Compare commits
17 Commits
Author | SHA1 | Date |
---|---|---|
James Ravenscroft | 7e9aa77769 | |
James Ravenscroft | b483ebf869 | |
James Ravenscroft | 77926ec92b | |
James Ravenscroft | 6dcf787c4f | |
James Ravenscroft | 8cc5482422 | |
James Ravenscroft | 95662f180e | |
James Ravenscroft | 165fc400d4 | |
James Ravenscroft | 0fb2380fd9 | |
James Ravenscroft | f583942894 | |
James Ravenscroft | 0882d5ce24 | |
James Ravenscroft | db34ff3d37 | |
ravenscroftj | 01a80a6fc2 | |
ravenscroftj | 117a070203 | |
James Ravenscroft | cab7a15728 | |
James Ravenscroft | 55265d86a1 | |
ravenscroftj | 36b3d90f4f | |
James Ravenscroft | 47bdb1631b |
12
.drone.yml
12
.drone.yml
|
@ -11,12 +11,12 @@ steps:
|
||||||
- poetry run pytest
|
- poetry run pytest
|
||||||
|
|
||||||
- name: publish
|
- name: publish
|
||||||
when:
|
# when:
|
||||||
branch:
|
# branch:
|
||||||
- master
|
# - master
|
||||||
event:
|
# event:
|
||||||
exclude:
|
# exclude:
|
||||||
- pull_request
|
# - pull_request
|
||||||
image: python:3.7
|
image: python:3.7
|
||||||
environment:
|
environment:
|
||||||
GITEA_PACKAGE_REPO:
|
GITEA_PACKAGE_REPO:
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,6 +1,6 @@
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "pymicrocosm"
|
name = "pymicrocosm"
|
||||||
version = "0.2.0"
|
version = "0.3.0"
|
||||||
description = "A tiny python-based micropub endpoint that supports a Gitea + drone static website"
|
description = "A tiny python-based micropub endpoint that supports a Gitea + drone static website"
|
||||||
authors = ["James Ravenscroft <ravenscroftj@gmail.com>"]
|
authors = ["James Ravenscroft <ravenscroftj@gmail.com>"]
|
||||||
license = "AGPL-3.0"
|
license = "AGPL-3.0"
|
||||||
|
@ -14,7 +14,7 @@ packages = [
|
||||||
|
|
||||||
|
|
||||||
[tool.poetry.dependencies]
|
[tool.poetry.dependencies]
|
||||||
python = "^3.7.1"
|
python = ">=3.8,<4.0.0"
|
||||||
Flask = "^2.0.2"
|
Flask = "^2.0.2"
|
||||||
giteapy = {url = "https://github.com/dblueai/giteapy/archive/master.zip"}
|
giteapy = {url = "https://github.com/dblueai/giteapy/archive/master.zip"}
|
||||||
requests = "^2.27.1"
|
requests = "^2.27.1"
|
||||||
|
@ -22,6 +22,9 @@ python-dotenv = "^0.19.2"
|
||||||
python-slugify = "^5.0.2"
|
python-slugify = "^5.0.2"
|
||||||
PyYAML = "^6.0"
|
PyYAML = "^6.0"
|
||||||
Flask-Micropub = "^0.2.8"
|
Flask-Micropub = "^0.2.8"
|
||||||
|
pillow = "^10.0.0"
|
||||||
|
clientapi-forgejo = "^1.0.0"
|
||||||
|
loguru = "^0.7.2"
|
||||||
|
|
||||||
[tool.poetry.dev-dependencies]
|
[tool.poetry.dev-dependencies]
|
||||||
pytest = "^6.2.5"
|
pytest = "^6.2.5"
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
#!/bin/bash
|
||||||
|
FLASK_APP=microcosm.wsgi:app poetry run flask run --port 10183
|
|
@ -3,11 +3,16 @@ import requests
|
||||||
import os
|
import os
|
||||||
import functools
|
import functools
|
||||||
import dotenv
|
import dotenv
|
||||||
import giteapy
|
|
||||||
import giteapy.rest
|
|
||||||
import time
|
import time
|
||||||
import base64
|
import base64
|
||||||
|
import mimetypes
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import clientapi_forgejo as forgejo
|
||||||
|
|
||||||
from werkzeug.datastructures import FileStorage
|
from werkzeug.datastructures import FileStorage
|
||||||
import yaml
|
import yaml
|
||||||
|
@ -23,35 +28,48 @@ from flask import Flask, jsonify, request, Response, Blueprint
|
||||||
dotenv.load_dotenv()
|
dotenv.load_dotenv()
|
||||||
|
|
||||||
PERMITTED_DOMAIN = os.environ.get(
|
PERMITTED_DOMAIN = os.environ.get(
|
||||||
'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';')
|
"PERMITTED_DOMAINS", "https://brainsteam.co.uk/"
|
||||||
|
).split(";")
|
||||||
|
|
||||||
|
|
||||||
|
ENTITY_TYPE_PLURAL_MAP = {"reply": "replies", "watch": "watches"}
|
||||||
ENTITY_TYPE_PLURAL_MAP = {
|
|
||||||
"reply": "replies",
|
|
||||||
"watch": "watches"
|
|
||||||
}
|
|
||||||
|
|
||||||
core_bp = Blueprint("core", __name__)
|
core_bp = Blueprint("core", __name__)
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidRequestException(Exception):
|
||||||
|
"""Class of exception raised when the server receives an invalid request"""
|
||||||
|
|
||||||
|
|
||||||
|
# create a custom handler
|
||||||
|
class InterceptHandler(logging.Handler):
|
||||||
|
def emit(self, record):
|
||||||
|
logger_opt = logger.opt(depth=6, exception=record.exc_info)
|
||||||
|
logger_opt.log(record.levelno, record.getMessage())
|
||||||
|
|
||||||
|
|
||||||
def create_app():
|
def create_app():
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
app.config['SECRET_KEY'] = 'my super secret key'
|
app.config["SECRET_KEY"] = "my super secret key"
|
||||||
|
|
||||||
#app.config.from_file(os.path.join(os.getcwd(), "config.yaml"), yaml.safe_load)
|
# app.config.from_file(os.path.join(os.getcwd(), "config.yaml"), yaml.safe_load)
|
||||||
|
|
||||||
from .indieauth import micropub, auth_bp
|
from .indieauth import micropub, auth_bp
|
||||||
from .webmentions import webhook_bp
|
from .webmentions import webhook_bp
|
||||||
|
|
||||||
print(app.config)
|
print(app.config)
|
||||||
|
|
||||||
micropub.init_app(app, app.config.get('INDIEAUTH_CLIENT_ID', 'test.com'))
|
micropub.init_app(app, os.environ.get("INDIEAUTH_CLIENT_ID", "test.com"))
|
||||||
|
|
||||||
app.register_blueprint(auth_bp)
|
app.register_blueprint(auth_bp)
|
||||||
app.register_blueprint(core_bp)
|
app.register_blueprint(core_bp)
|
||||||
app.register_blueprint(webhook_bp)
|
app.register_blueprint(webhook_bp)
|
||||||
|
|
||||||
|
logger.add(sys.stderr, level=logging.WARN, backtrace=True, diagnose=True)
|
||||||
|
|
||||||
|
# logger.start()
|
||||||
|
app.logger.addHandler(InterceptHandler())
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
@ -59,19 +77,24 @@ def authed_endpoint(f):
|
||||||
@functools.wraps(f)
|
@functools.wraps(f)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
|
|
||||||
authtok = request.headers.get('Authorization')
|
authtok = request.headers.get("Authorization")
|
||||||
|
|
||||||
if authtok is None:
|
if authtok is None:
|
||||||
return {
|
return {
|
||||||
"error": "unauthorized",
|
"error": "unauthorized",
|
||||||
"error_description": "An auth token was not provided"
|
"error_description": "An auth token was not provided",
|
||||||
}, 401
|
}, 401
|
||||||
|
|
||||||
auth = requests.get("https://tokens.indieauth.com/token", headers={
|
auth = requests.get(
|
||||||
"Authorization": authtok, "Accept": "application/json"}).json()
|
"https://tokens.indieauth.com/token",
|
||||||
|
headers={"Authorization": authtok, "Accept": "application/json"},
|
||||||
|
).json()
|
||||||
|
|
||||||
if auth.get('me','') not in PERMITTED_DOMAIN:
|
if auth.get("me", "") not in PERMITTED_DOMAIN:
|
||||||
return {"error": "insufficient_scope", "error_description": f"User \"{auth.get('me','')}\" not permitted to post here"}, 401
|
return {
|
||||||
|
"error": "insufficient_scope",
|
||||||
|
"error_description": f"User \"{auth.get('me','')}\" not permitted to post here",
|
||||||
|
}, 401
|
||||||
|
|
||||||
return f(*args, *kwargs)
|
return f(*args, *kwargs)
|
||||||
|
|
||||||
|
@ -81,64 +104,87 @@ def authed_endpoint(f):
|
||||||
_api_client = None
|
_api_client = None
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidRequestException(Exception):
|
||||||
|
"""Invalid Request"""
|
||||||
|
|
||||||
|
|
||||||
def process_photo_url(created_at: datetime, doc: Dict[str, List[str]], suffix: str = ""):
|
def process_photo_url(
|
||||||
|
created_at: datetime, doc: Dict[str, List[str]], suffix: str = ""
|
||||||
|
):
|
||||||
"""Process photo submitted via URL"""
|
"""Process photo submitted via URL"""
|
||||||
|
|
||||||
now_ts = int(time.mktime(created_at.timetuple()))
|
now_ts = int(time.mktime(created_at.timetuple()))
|
||||||
|
|
||||||
photo_urls = []
|
photo_urls = []
|
||||||
|
|
||||||
if isinstance(doc['photo'], str):
|
if isinstance(doc["photo"], str):
|
||||||
doc['photo'] = [doc['photo']]
|
doc["photo"] = [doc["photo"]]
|
||||||
|
|
||||||
|
for i, photo in enumerate(doc["photo"]):
|
||||||
|
|
||||||
for i, photo in enumerate(doc['photo']):
|
if isinstance(photo, str):
|
||||||
|
photo = {"url": photo, "alt": ""}
|
||||||
|
|
||||||
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
|
if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
|
||||||
# download the photo
|
# download the photo
|
||||||
|
|
||||||
r = requests.get(photo)
|
r = requests.get(photo["url"])
|
||||||
|
|
||||||
ext = os.path.splitext(photo)[1]
|
ext = os.path.splitext(photo["url"])[1]
|
||||||
|
|
||||||
# generate local filename
|
# generate local filename
|
||||||
filename = os.path.join(os.environ.get(
|
filename = os.path.join(
|
||||||
'MICROPUB_MEDIA_PATH'), created_at.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}")
|
os.environ.get("MICROPUB_MEDIA_PATH"),
|
||||||
photo_url = os.path.join(os.environ.get(
|
created_at.strftime("%Y/%m/%d"),
|
||||||
'MICROPUB_MEDIA_URL_PREFIX'), created_at.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}")
|
str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}",
|
||||||
|
)
|
||||||
|
|
||||||
photo_urls.append(photo_url)
|
photo_url = os.path.join(
|
||||||
|
os.environ.get("MICROPUB_MEDIA_URL_PREFIX"),
|
||||||
|
created_at.strftime("%Y/%m/%d"),
|
||||||
|
str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}",
|
||||||
|
)
|
||||||
|
|
||||||
|
photo_urls.append((photo_url, photo["alt"]))
|
||||||
|
|
||||||
# make directory if needed
|
# make directory if needed
|
||||||
if not os.path.exists(os.path.dirname(filename)):
|
if not os.path.exists(os.path.dirname(filename)):
|
||||||
os.makedirs(os.path.dirname(filename))
|
os.makedirs(os.path.dirname(filename))
|
||||||
|
|
||||||
with open(filename, 'wb') as f:
|
with open(filename, "wb") as f:
|
||||||
f.write(r.content)
|
f.write(r.content)
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
photo_urls.append(photo)
|
photo_urls.append((photo["value"], photo["alt"]))
|
||||||
|
|
||||||
|
|
||||||
return photo_urls
|
return photo_urls
|
||||||
|
|
||||||
def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str=""):
|
|
||||||
|
def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str = ""):
|
||||||
"""Process photo directly uploaded to micropub"""
|
"""Process photo directly uploaded to micropub"""
|
||||||
|
|
||||||
now_ts = int(time.mktime(created_at.timetuple()))
|
now_ts = int(time.mktime(created_at.timetuple()))
|
||||||
|
|
||||||
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
|
if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
|
||||||
|
|
||||||
|
file.mimetype
|
||||||
|
|
||||||
ext = os.path.splitext(file.filename)[1]
|
ext = os.path.splitext(file.filename)[1]
|
||||||
|
|
||||||
|
if ext == "":
|
||||||
|
ext = mimetypes.guess_extension(file.mimetype)
|
||||||
|
|
||||||
# generate local filename
|
# generate local filename
|
||||||
filename = os.path.join(os.environ.get(
|
filename = os.path.join(
|
||||||
'MICROPUB_MEDIA_PATH'), created_at.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
|
os.environ.get("MICROPUB_MEDIA_PATH"),
|
||||||
photo_url = os.path.join(os.environ.get(
|
created_at.strftime("%Y/%m/%d"),
|
||||||
'MICROPUB_MEDIA_URL_PREFIX'), created_at.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
|
f"{now_ts}_{suffix}{ext}",
|
||||||
|
)
|
||||||
|
photo_url = os.path.join(
|
||||||
|
os.environ.get("MICROPUB_MEDIA_URL_PREFIX"),
|
||||||
|
created_at.strftime("%Y/%m/%d"),
|
||||||
|
f"{now_ts}_{suffix}{ext}",
|
||||||
|
)
|
||||||
|
|
||||||
# make directory if needed
|
# make directory if needed
|
||||||
if not os.path.exists(os.path.dirname(filename)):
|
if not os.path.exists(os.path.dirname(filename)):
|
||||||
|
@ -152,7 +198,7 @@ def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str=""
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str]=None):
|
def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str] = None):
|
||||||
|
|
||||||
now_ts = int(time.mktime(created_at.timetuple()))
|
now_ts = int(time.mktime(created_at.timetuple()))
|
||||||
|
|
||||||
|
@ -164,89 +210,117 @@ def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str]=N
|
||||||
else:
|
else:
|
||||||
slug = str(now_ts)
|
slug = str(now_ts)
|
||||||
|
|
||||||
|
url = os.path.join(
|
||||||
|
"/",
|
||||||
|
ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
|
||||||
|
created_at.strftime("%Y/%m/%d"),
|
||||||
|
slug,
|
||||||
|
)
|
||||||
|
|
||||||
url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
|
print(os.environ.get("CONTENT_PREFIX"))
|
||||||
created_at.strftime("%Y/%m/%d"), slug)
|
|
||||||
|
|
||||||
print(os.environ.get(
|
file_path = os.path.join(
|
||||||
'CONTENT_PREFIX'))
|
os.environ.get("CONTENT_PREFIX"),
|
||||||
|
ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
|
||||||
file_path = os.path.join(os.environ.get(
|
created_at.strftime("%Y/%m/%d"),
|
||||||
'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), created_at.strftime("%Y/%m/%d"), slug + ".md")
|
slug + ".md",
|
||||||
|
)
|
||||||
|
|
||||||
frontmatter = {
|
frontmatter = {
|
||||||
|
"post_meta": ["date"],
|
||||||
"url": url,
|
"url": url,
|
||||||
"type": post_type,
|
"type": ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
|
||||||
"date": created_at.isoformat(sep='T'),
|
"date": created_at.isoformat(sep="T"),
|
||||||
}
|
}
|
||||||
|
|
||||||
return frontmatter, file_path
|
return frontmatter, file_path
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def detect_entry_type(doc: dict) -> str:
|
def detect_entry_type(doc: dict) -> str:
|
||||||
"""Given a dictionary object from either form or json, detect type of post"""
|
"""Given a dictionary object from either form or json, detect type of post"""
|
||||||
|
|
||||||
|
if "hypothesis-link" in doc:
|
||||||
|
entry_type = "annotation"
|
||||||
|
|
||||||
if ('in-reply-to' in doc) or ('u-in-reply-to' in doc):
|
elif ("in-reply-to" in doc) or ("u-in-reply-to" in doc):
|
||||||
entry_type = "reply"
|
entry_type = "reply"
|
||||||
|
|
||||||
elif ('bookmark-of' in doc) or ('u-bookmark-of' in doc):
|
elif ("bookmark-of" in doc) or ("u-bookmark-of" in doc):
|
||||||
entry_type = "bookmark"
|
entry_type = "bookmark"
|
||||||
elif ('repost-of' in doc) or ('u-repost-of' in doc):
|
elif ("repost-of" in doc) or ("u-repost-of" in doc):
|
||||||
entry_type = "repost"
|
entry_type = "repost"
|
||||||
elif ('like-of' in doc) or ('u-like-of' in doc):
|
elif ("like-of" in doc) or ("u-like-of" in doc):
|
||||||
entry_type = "like"
|
entry_type = "like"
|
||||||
|
|
||||||
elif ('read-of' in doc):
|
elif "read-of" in doc:
|
||||||
entry_type = "read"
|
entry_type = "read"
|
||||||
|
|
||||||
elif ('watch-of' in doc):
|
elif "watch-of" in doc:
|
||||||
entry_type = "watch"
|
entry_type = "watch"
|
||||||
|
|
||||||
elif ('name' in doc) or ('p-name' in doc):
|
elif ("name" in doc) or ("p-name" in doc):
|
||||||
entry_type = "post"
|
entry_type = "post"
|
||||||
else:
|
else:
|
||||||
entry_type = "note"
|
entry_type = "note"
|
||||||
|
|
||||||
return entry_type
|
return entry_type
|
||||||
|
|
||||||
def capture_frontmatter_props(doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str,List[str]]]):
|
|
||||||
|
|
||||||
|
def capture_frontmatter_props(
|
||||||
|
doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str, List[str]]]
|
||||||
|
):
|
||||||
|
|
||||||
keys = ['summary', 'bookmark-of', 'in-reply-to', 'repost-of', 'like-of', 'read-of', 'watch-of', 'listen-of', 'read-status', 'rating']
|
keys = [
|
||||||
|
"summary",
|
||||||
|
"bookmark-of",
|
||||||
|
"in-reply-to",
|
||||||
|
"repost-of",
|
||||||
|
"like-of",
|
||||||
|
"read-of",
|
||||||
|
"watch-of",
|
||||||
|
"listen-of",
|
||||||
|
"read-status",
|
||||||
|
"rating",
|
||||||
|
]
|
||||||
|
|
||||||
keys += [f'u-{key}' for key in keys]
|
keys += [f"u-{key}" for key in keys]
|
||||||
|
|
||||||
for key in keys:
|
for key in keys:
|
||||||
|
|
||||||
if key in doc:
|
if key in doc:
|
||||||
|
|
||||||
if isinstance(doc[key], dict) and ('type' in doc[key]):
|
if isinstance(doc[key], dict) and ("type" in doc[key]):
|
||||||
|
|
||||||
if doc[key]['type'][0] == 'h-cite':
|
if doc[key]["type"][0] == "h-cite":
|
||||||
|
|
||||||
if 'citations' not in frontmatter:
|
if "citations" not in frontmatter:
|
||||||
frontmatter['citations'] = []
|
frontmatter["citations"] = []
|
||||||
frontmatter['citations'].append(doc[key]['properties'])
|
frontmatter["citations"].append(doc[key]["properties"])
|
||||||
|
|
||||||
elif isinstance(doc[key], list) and (len(doc[key]) < 2):
|
elif isinstance(doc[key], list) and (len(doc[key]) < 2):
|
||||||
frontmatter[key] = doc[key][0]
|
frontmatter[key] = doc[key][0]
|
||||||
else:
|
else:
|
||||||
frontmatter[key] = doc[key]
|
frontmatter[key] = doc[key]
|
||||||
|
|
||||||
if 'category' in doc:
|
if "hypothesis-link" in doc:
|
||||||
if isinstance(doc['category'], list):
|
# get the hypothesis data and store it
|
||||||
categories = doc['category']
|
r = requests.get(doc["hypothesis-link"][0])
|
||||||
|
|
||||||
|
frontmatter["hypothesis-meta"] = r.json()
|
||||||
|
|
||||||
|
if "category" in doc:
|
||||||
|
if isinstance(doc["category"], list):
|
||||||
|
categories = doc["category"]
|
||||||
else:
|
else:
|
||||||
categories = [doc['category']]
|
categories = [doc["category"]]
|
||||||
elif 'p-category' in doc:
|
elif "p-category" in doc:
|
||||||
categories = doc['p-category']
|
categories = doc["p-category"]
|
||||||
else:
|
else:
|
||||||
categories = request.form.getlist('category[]')
|
categories = request.form.getlist("category[]")
|
||||||
|
|
||||||
if len(categories) > 0:
|
if len(categories) > 0:
|
||||||
frontmatter['tags'] = categories
|
frontmatter["tags"] = categories
|
||||||
|
|
||||||
|
|
||||||
def process_multipart_post():
|
def process_multipart_post():
|
||||||
doc = request.form.to_dict(flat=True)
|
doc = request.form.to_dict(flat=True)
|
||||||
|
@ -255,104 +329,132 @@ def process_multipart_post():
|
||||||
|
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
|
|
||||||
frontmatter, file_path = init_frontmatter(now, entry_type, doc.get('name'))
|
frontmatter, file_path = init_frontmatter(now, entry_type, doc.get("name"))
|
||||||
|
|
||||||
capture_frontmatter_props(doc, frontmatter)
|
capture_frontmatter_props(doc, frontmatter)
|
||||||
|
|
||||||
|
if "name" in doc:
|
||||||
|
frontmatter["title"] = doc["name"]
|
||||||
|
|
||||||
if 'name' in doc:
|
if ("photo" in doc) or ("photo" in request.files) or ("photo[]" in request.files):
|
||||||
frontmatter['title'] = doc['name']
|
|
||||||
|
|
||||||
|
frontmatter["photo"] = []
|
||||||
|
|
||||||
if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files):
|
if "photo[]" in request.files:
|
||||||
|
photos = request.files.getlist("photo[]")
|
||||||
frontmatter['photo'] = []
|
|
||||||
|
|
||||||
if 'photo[]' in request.files:
|
|
||||||
photos = request.files.getlist('photo[]')
|
|
||||||
|
|
||||||
docstr = ""
|
docstr = ""
|
||||||
|
|
||||||
for i, photo in enumerate(photos):
|
for i, photo in enumerate(photos):
|
||||||
photo_url = process_photo_upload(now, photo, suffix=i)
|
photo_url = process_photo_upload(now, photo, suffix=i)
|
||||||
|
|
||||||
frontmatter['photo'].append(photo_url)
|
if "thumbnail" not in frontmatter:
|
||||||
|
frontmatter["thumbnail"] = photo_url
|
||||||
|
|
||||||
docstr += f"\n\n<img src=\"{photo_url}\" class=\"u-photo\" />"
|
frontmatter["photo"].append(photo_url)
|
||||||
|
|
||||||
|
#docstr += f'\n\n<img src="{photo_url}" class="u-photo" />'
|
||||||
|
|
||||||
docstr += f"\n\n {doc['content']}"
|
docstr += f"\n\n {doc['content']}"
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
if 'photo' in doc:
|
if "photo" in doc:
|
||||||
photo_urls = process_photo_url(now, doc)
|
photo_objects = process_photo_url(now, doc)
|
||||||
else:
|
|
||||||
photo_urls = [process_photo_upload(now, request.files['photo'])]
|
else:
|
||||||
|
photo_objects = [
|
||||||
|
(process_photo_upload(now, request.files["photo"]), "")
|
||||||
|
]
|
||||||
|
|
||||||
|
frontmatter["photo"] = [
|
||||||
|
{"value": photo[0], "alt": photo[1]} for photo in photo_objects
|
||||||
|
]
|
||||||
|
frontmatter["thumbnail"] = photo_objects[0][0]
|
||||||
|
|
||||||
frontmatter['photo'] = photo_urls
|
|
||||||
docstr = ""
|
docstr = ""
|
||||||
for photo in photo_urls:
|
#for photo in photo_objects:
|
||||||
docstr += f"<img src=\"{photo}\" class=\"u-photo\" /> \n\n {doc['content']}"
|
# docstr += f"<img src=\"{photo[0]}\" alt=\"{photo[1]}\" class=\"u-photo\" /> \n\n {doc['content']}"
|
||||||
else:
|
else:
|
||||||
docstr = doc.get('content','') if 'content' in doc else ""
|
docstr = doc.get("content", "") if "content" in doc else ""
|
||||||
|
|
||||||
if 'mp-syndicate-to' in doc:
|
if "mp-syndicate-to" in doc:
|
||||||
frontmatter['mp-syndicate-to'] = doc['mp-syndicate-to'].split(",")
|
frontmatter["mp-syndicate-to"] = doc["mp-syndicate-to"].split(",")
|
||||||
|
|
||||||
for url in doc['mp-syndicate-to'].split(","):
|
for url in doc["mp-syndicate-to"].split(","):
|
||||||
docstr += f"\n<a href=\"{url}\"></a>"
|
docstr += f'\n<a href="{url}"></a>'
|
||||||
|
|
||||||
if 'mp-syndicate-to[]' in request.form:
|
if "mp-syndicate-to[]" in request.form:
|
||||||
frontmatter['mp-syndicate-to'] = request.form.getlist('mp-syndicate-to[]')
|
frontmatter["mp-syndicate-to"] = request.form.getlist("mp-syndicate-to[]")
|
||||||
|
|
||||||
for url in request.form.getlist('mp-syndicate-to[]'):
|
for url in request.form.getlist("mp-syndicate-to[]"):
|
||||||
docstr += f"\n<a href=\"{url}\"></a>"
|
docstr += f'\n<a href="{url}"></a>'
|
||||||
|
|
||||||
return docstr, frontmatter, file_path
|
return docstr, frontmatter, file_path
|
||||||
|
|
||||||
|
|
||||||
|
def process_image_alt_texts(doc):
|
||||||
|
|
||||||
|
alts = []
|
||||||
|
|
||||||
|
if isinstance(doc["photo"], str):
|
||||||
|
doc["photo"] = [doc["photo"]]
|
||||||
|
|
||||||
|
for i, photo in enumerate(doc["photo"]):
|
||||||
|
|
||||||
|
if isinstance(photo, dict):
|
||||||
|
alts.append(doc["alt"])
|
||||||
|
else:
|
||||||
|
alts.append("")
|
||||||
|
|
||||||
|
return alts
|
||||||
|
|
||||||
|
|
||||||
def process_json_post():
|
def process_json_post():
|
||||||
"""Process JSON POST submission"""
|
"""Process JSON POST submission"""
|
||||||
|
|
||||||
body = request.get_json()
|
body = request.get_json()
|
||||||
|
|
||||||
# get post type - take the first item in the array
|
# get post type - take the first item in the array
|
||||||
if body['type'][0] != 'h-entry':
|
if body["type"][0] != "h-entry":
|
||||||
return jsonify({"error":"invalid_format"}), 400
|
return jsonify({"error": "invalid_format"}), 400
|
||||||
|
|
||||||
props = body['properties']
|
props = body["properties"]
|
||||||
entry_type = detect_entry_type(props)
|
entry_type = detect_entry_type(props)
|
||||||
|
|
||||||
if 'published' in props:
|
if "published" in props:
|
||||||
|
|
||||||
from dateutil import parser
|
from dateutil import parser
|
||||||
|
|
||||||
now = parser.parse(props['published'][0])
|
now = parser.parse(props["published"][0])
|
||||||
else:
|
else:
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
|
|
||||||
frontmatter, file_path = init_frontmatter(now, entry_type, props.get('name'))
|
frontmatter, file_path = init_frontmatter(now, entry_type, props.get("name"))
|
||||||
|
|
||||||
capture_frontmatter_props(props, frontmatter)
|
capture_frontmatter_props(props, frontmatter)
|
||||||
|
|
||||||
if 'name' in props:
|
if "name" in props:
|
||||||
frontmatter['title'] = props['name'][0]
|
frontmatter["title"] = props["name"][0]
|
||||||
|
|
||||||
docstr = ""
|
docstr = ""
|
||||||
|
|
||||||
if 'photo' in props:
|
if "photo" in props:
|
||||||
|
|
||||||
photo_urls = process_photo_url(now, props)
|
photo_objects = process_photo_url(now, props)
|
||||||
|
|
||||||
frontmatter['photo'] = photo_urls
|
frontmatter["photo"] = [
|
||||||
|
{"value": photo[0], "alt": photo[1]} for photo in photo_objects
|
||||||
|
]
|
||||||
|
frontmatter["thumbnail"] = frontmatter["photo"][0]["value"]
|
||||||
|
docstr = ""
|
||||||
|
#for photo in photo_objects:
|
||||||
|
# docstr += f'<img src="{photo[0]}" alt="{photo[1]}" class="u-photo" /> \n\n'
|
||||||
|
|
||||||
for photo in photo_urls:
|
for content in props.get("content", []):
|
||||||
docstr += f"\n\n<img src=\"{photo}\" class=\"u-photo\" />"
|
|
||||||
|
|
||||||
for content in props.get('content', []):
|
|
||||||
|
|
||||||
if isinstance(content, dict):
|
if isinstance(content, dict):
|
||||||
if 'html' in content:
|
if "html" in content:
|
||||||
docstr += f"\n\n {content.get('html')}"
|
docstr += f"\n\n {content.get('html')}"
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -360,19 +462,20 @@ def process_json_post():
|
||||||
|
|
||||||
return docstr, frontmatter, file_path
|
return docstr, frontmatter, file_path
|
||||||
|
|
||||||
def get_api_client() -> giteapy.RepositoryApi:
|
|
||||||
|
def get_api_client() -> forgejo.RepositoryApi:
|
||||||
global _api_client
|
global _api_client
|
||||||
|
|
||||||
if _api_client is None:
|
if _api_client is None:
|
||||||
config = giteapy.Configuration()
|
config = forgejo.Configuration()
|
||||||
config.host = os.environ.get('GITEA_URL')
|
config.host = os.environ.get("GITEA_URL")
|
||||||
config.api_key['access_token'] = os.environ.get('GITEA_API_KEY')
|
config.api_key["Token"] = os.environ.get("GITEA_API_KEY")
|
||||||
_api_client = giteapy.RepositoryApi(giteapy.ApiClient(config))
|
_api_client = forgejo.RepositoryApi(forgejo.ApiClient(config))
|
||||||
|
|
||||||
return _api_client
|
return _api_client
|
||||||
|
|
||||||
|
|
||||||
@core_bp.route('/', methods=['POST'])
|
@core_bp.route("/", methods=["POST"])
|
||||||
@authed_endpoint
|
@authed_endpoint
|
||||||
def req():
|
def req():
|
||||||
|
|
||||||
|
@ -382,34 +485,39 @@ def req():
|
||||||
docstr, frontmatter, file_path = process_multipart_post()
|
docstr, frontmatter, file_path = process_multipart_post()
|
||||||
|
|
||||||
frontmatter_str = yaml.dump(frontmatter)
|
frontmatter_str = yaml.dump(frontmatter)
|
||||||
content = base64.encodestring(
|
content = base64.encodebytes(
|
||||||
f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8")
|
f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")
|
||||||
|
).decode("utf8")
|
||||||
|
|
||||||
api = get_api_client()
|
api = get_api_client()
|
||||||
|
|
||||||
body = giteapy.CreateFileOptions(content=content)
|
body = forgejo.CreateFileOptions(content=content)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
r = api.repo_create_file(os.environ.get(
|
r = api.repo_create_file(
|
||||||
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body)
|
os.environ.get("GITEA_REPO_OWNER"),
|
||||||
|
os.environ.get("GITEA_REPO_NAME"),
|
||||||
|
file_path,
|
||||||
|
body,
|
||||||
|
)
|
||||||
|
|
||||||
return Response(status=202, headers={"Location": frontmatter['url']})
|
return Response(status=202, headers={"Location": frontmatter["url"]})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {"error": str(e)}, 500
|
|
||||||
|
|
||||||
|
logger.error(e, exc_info=True)
|
||||||
|
return {"error": str(e)}, 500
|
||||||
|
|
||||||
|
|
||||||
def parse_categories():
|
def parse_categories():
|
||||||
|
|
||||||
strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY')
|
strategy = os.environ.get("MICROPUB_CATEGORY_LIST_STRATEGY")
|
||||||
|
|
||||||
if strategy == 'feed':
|
if strategy == "feed":
|
||||||
tree = ElementTree.parse(os.environ.get('MICROPUB_CATEGORY_LIST_FILE'))
|
tree = ElementTree.parse(os.environ.get("MICROPUB_CATEGORY_LIST_FILE"))
|
||||||
tags = tree.findall('.//item/title')
|
tags = tree.findall(".//item/title")
|
||||||
|
|
||||||
|
return {"categories": [tag.text for tag in tags]}
|
||||||
return {"categories": [tag.text for tag in tags] }
|
|
||||||
|
|
||||||
|
|
||||||
def get_syndication_targets():
|
def get_syndication_targets():
|
||||||
|
@ -437,7 +545,7 @@ def get_syndication_targets():
|
||||||
def media_endpoint():
|
def media_endpoint():
|
||||||
|
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
url = process_photo_upload(now, request.files['file'])
|
url = process_photo_upload(now, request.files["file"])
|
||||||
|
|
||||||
return Response(status=201, headers={"Location": url})
|
return Response(status=201, headers={"Location": url})
|
||||||
|
|
||||||
|
@ -445,50 +553,33 @@ def media_endpoint():
|
||||||
def generate_config_json():
|
def generate_config_json():
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"media-endpoint": request.base_url + "media",
|
"media-endpoint": os.environ.get(f"MICROCOSM_BASE_URL", request.base_url)
|
||||||
|
+ "media",
|
||||||
"syndicate-to": get_syndication_targets(),
|
"syndicate-to": get_syndication_targets(),
|
||||||
"post-types": [
|
"post-types": [
|
||||||
{
|
{"type": "note", "name": "Note"},
|
||||||
"type": "note",
|
{"type": "article", "name": "Blog Post"},
|
||||||
"name": "Note"
|
{"type": "photo", "name": "Photo"},
|
||||||
},
|
{"type": "reply", "name": "Reply"},
|
||||||
{
|
{"type": "bookmark", "name": "Bookmark"},
|
||||||
"type": "article",
|
{"type": "like", "name": "Like"},
|
||||||
"name": "Blog Post"
|
],
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "photo",
|
|
||||||
"name": "Photo"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "reply",
|
|
||||||
"name": "Reply"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "bookmark",
|
|
||||||
"name": "Bookmark"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "like",
|
|
||||||
"name":"Like"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@core_bp.route("/", methods=['GET'])
|
@core_bp.route("/", methods=["GET"])
|
||||||
@authed_endpoint
|
@authed_endpoint
|
||||||
def index():
|
def index():
|
||||||
|
|
||||||
if request.args.get('q') == 'config':
|
if request.args.get("q") == "config":
|
||||||
return generate_config_json()
|
return generate_config_json()
|
||||||
|
|
||||||
elif request.args.get('q') == 'category':
|
elif request.args.get("q") == "category":
|
||||||
return parse_categories()
|
return parse_categories()
|
||||||
|
|
||||||
elif request.args.get('q') == 'syndicate-to':
|
elif request.args.get("q") == "syndicate-to":
|
||||||
return {"syndicate-to": get_syndication_targets()}
|
return {"syndicate-to": get_syndication_targets()}
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
app.run(debug=False)
|
app.run(debug=False)
|
||||||
|
|
|
@ -37,13 +37,13 @@ def authform():
|
||||||
@auth_bp.route('/authenticate')
|
@auth_bp.route('/authenticate')
|
||||||
def authenticate():
|
def authenticate():
|
||||||
return micropub.authenticate(
|
return micropub.authenticate(
|
||||||
request.args.get('me'), next_url=url_for('index'))
|
request.args.get('me'), next_url=url_for('token.indieauth_callback'))
|
||||||
|
|
||||||
|
|
||||||
@auth_bp.route('/authorize')
|
@auth_bp.route('/authorize')
|
||||||
def authorize():
|
def authorize():
|
||||||
return micropub.authorize(
|
return micropub.authorize(
|
||||||
request.args.get('me'), next_url=url_for('index'),
|
request.args.get('me'), next_url=url_for('token.indieauth_callback'),
|
||||||
scope=request.args.get('scope'))
|
scope=request.args.get('scope'))
|
||||||
|
|
||||||
|
|
||||||
|
@ -65,6 +65,7 @@ def indieauth_callback(resp):
|
||||||
""".format(resp.me, resp.next_url, resp.error)
|
""".format(resp.me, resp.next_url, resp.error)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@auth_bp.route('/micropub-callback')
|
@auth_bp.route('/micropub-callback')
|
||||||
@micropub.authorized_handler
|
@micropub.authorized_handler
|
||||||
def micropub_callback(resp):
|
def micropub_callback(resp):
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
from . import create_app
|
||||||
|
|
||||||
|
|
||||||
|
app = create_app()
|
Loading…
Reference in New Issue