Compare commits

..

No commits in common. "develop" and "master" have entirely different histories.

8 changed files with 719 additions and 1106 deletions

View File

@ -11,12 +11,12 @@ steps:
- poetry run pytest
- name: publish
# when:
# branch:
# - master
# event:
# exclude:
# - pull_request
when:
branch:
- master
event:
exclude:
- pull_request
image: python:3.7
environment:
GITEA_PACKAGE_REPO:

1350
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "pymicrocosm"
version = "0.3.0"
version = "0.2.0"
description = "A tiny python-based micropub endpoint that supports a Gitea + drone static website"
authors = ["James Ravenscroft <ravenscroftj@gmail.com>"]
license = "AGPL-3.0"
@ -14,7 +14,7 @@ packages = [
[tool.poetry.dependencies]
python = ">=3.8,<4.0.0"
python = "^3.7.1"
Flask = "^2.0.2"
giteapy = {url = "https://github.com/dblueai/giteapy/archive/master.zip"}
requests = "^2.27.1"
@ -22,9 +22,6 @@ python-dotenv = "^0.19.2"
python-slugify = "^5.0.2"
PyYAML = "^6.0"
Flask-Micropub = "^0.2.8"
pillow = "^10.0.0"
clientapi-forgejo = "^1.0.0"
loguru = "^0.7.2"
[tool.poetry.dev-dependencies]
pytest = "^6.2.5"

2
run.sh
View File

@ -1,2 +0,0 @@
#!/bin/bash
FLASK_APP=microcosm.wsgi:app poetry run flask run --port 10183

View File

@ -3,16 +3,11 @@ import requests
import os
import functools
import dotenv
import giteapy
import giteapy.rest
import time
import base64
import mimetypes
import sys
from loguru import logger
import logging
import clientapi_forgejo as forgejo
from werkzeug.datastructures import FileStorage
import yaml
@ -28,48 +23,35 @@ from flask import Flask, jsonify, request, Response, Blueprint
dotenv.load_dotenv()
PERMITTED_DOMAIN = os.environ.get(
"PERMITTED_DOMAINS", "https://brainsteam.co.uk/"
).split(";")
'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';')
ENTITY_TYPE_PLURAL_MAP = {"reply": "replies", "watch": "watches"}
ENTITY_TYPE_PLURAL_MAP = {
"reply": "replies",
"watch": "watches"
}
core_bp = Blueprint("core", __name__)
class InvalidRequestException(Exception):
"""Class of exception raised when the server receives an invalid request"""
# create a custom handler
class InterceptHandler(logging.Handler):
def emit(self, record):
logger_opt = logger.opt(depth=6, exception=record.exc_info)
logger_opt.log(record.levelno, record.getMessage())
def create_app():
app = Flask(__name__)
app.config["SECRET_KEY"] = "my super secret key"
# app.config.from_file(os.path.join(os.getcwd(), "config.yaml"), yaml.safe_load)
app.config['SECRET_KEY'] = 'my super secret key'
#app.config.from_file(os.path.join(os.getcwd(), "config.yaml"), yaml.safe_load)
from .indieauth import micropub, auth_bp
from .webmentions import webhook_bp
print(app.config)
micropub.init_app(app, os.environ.get("INDIEAUTH_CLIENT_ID", "test.com"))
micropub.init_app(app, app.config.get('INDIEAUTH_CLIENT_ID', 'test.com'))
app.register_blueprint(auth_bp)
app.register_blueprint(core_bp)
app.register_blueprint(webhook_bp)
logger.add(sys.stderr, level=logging.WARN, backtrace=True, diagnose=True)
# logger.start()
app.logger.addHandler(InterceptHandler())
return app
@ -77,24 +59,19 @@ def authed_endpoint(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
authtok = request.headers.get("Authorization")
authtok = request.headers.get('Authorization')
if authtok is None:
return {
"error": "unauthorized",
"error_description": "An auth token was not provided",
"error_description": "An auth token was not provided"
}, 401
auth = requests.get(
"https://tokens.indieauth.com/token",
headers={"Authorization": authtok, "Accept": "application/json"},
).json()
auth = requests.get("https://tokens.indieauth.com/token", headers={
"Authorization": authtok, "Accept": "application/json"}).json()
if auth.get("me", "") not in PERMITTED_DOMAIN:
return {
"error": "insufficient_scope",
"error_description": f"User \"{auth.get('me','')}\" not permitted to post here",
}, 401
if auth.get('me','') not in PERMITTED_DOMAIN:
return {"error": "insufficient_scope", "error_description": f"User \"{auth.get('me','')}\" not permitted to post here"}, 401
return f(*args, *kwargs)
@ -104,87 +81,64 @@ def authed_endpoint(f):
_api_client = None
class InvalidRequestException(Exception):
"""Invalid Request"""
def process_photo_url(
created_at: datetime, doc: Dict[str, List[str]], suffix: str = ""
):
def process_photo_url(created_at: datetime, doc: Dict[str, List[str]], suffix: str = ""):
"""Process photo submitted via URL"""
now_ts = int(time.mktime(created_at.timetuple()))
photo_urls = []
if isinstance(doc["photo"], str):
doc["photo"] = [doc["photo"]]
if isinstance(doc['photo'], str):
doc['photo'] = [doc['photo']]
for i, photo in enumerate(doc["photo"]):
for i, photo in enumerate(doc['photo']):
if isinstance(photo, str):
photo = {"url": photo, "alt": ""}
if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
# download the photo
r = requests.get(photo["url"])
r = requests.get(photo)
ext = os.path.splitext(photo["url"])[1]
ext = os.path.splitext(photo)[1]
# generate local filename
filename = os.path.join(
os.environ.get("MICROPUB_MEDIA_PATH"),
created_at.strftime("%Y/%m/%d"),
str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}",
)
filename = os.path.join(os.environ.get(
'MICROPUB_MEDIA_PATH'), created_at.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}")
photo_url = os.path.join(os.environ.get(
'MICROPUB_MEDIA_URL_PREFIX'), created_at.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}")
photo_url = os.path.join(
os.environ.get("MICROPUB_MEDIA_URL_PREFIX"),
created_at.strftime("%Y/%m/%d"),
str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}",
)
photo_urls.append((photo_url, photo["alt"]))
photo_urls.append(photo_url)
# make directory if needed
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
with open(filename, "wb") as f:
with open(filename, 'wb') as f:
f.write(r.content)
else:
photo_urls.append((photo["value"], photo["alt"]))
photo_urls.append(photo)
return photo_urls
def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str = ""):
def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str=""):
"""Process photo directly uploaded to micropub"""
now_ts = int(time.mktime(created_at.timetuple()))
if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
file.mimetype
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
ext = os.path.splitext(file.filename)[1]
if ext == "":
ext = mimetypes.guess_extension(file.mimetype)
# generate local filename
filename = os.path.join(
os.environ.get("MICROPUB_MEDIA_PATH"),
created_at.strftime("%Y/%m/%d"),
f"{now_ts}_{suffix}{ext}",
)
photo_url = os.path.join(
os.environ.get("MICROPUB_MEDIA_URL_PREFIX"),
created_at.strftime("%Y/%m/%d"),
f"{now_ts}_{suffix}{ext}",
)
filename = os.path.join(os.environ.get(
'MICROPUB_MEDIA_PATH'), created_at.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
photo_url = os.path.join(os.environ.get(
'MICROPUB_MEDIA_URL_PREFIX'), created_at.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
# make directory if needed
if not os.path.exists(os.path.dirname(filename)):
@ -198,7 +152,7 @@ def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str =
return None
def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str] = None):
def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str]=None):
now_ts = int(time.mktime(created_at.timetuple()))
@ -210,117 +164,89 @@ def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str] =
else:
slug = str(now_ts)
url = os.path.join(
"/",
ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
created_at.strftime("%Y/%m/%d"),
slug,
)
print(os.environ.get("CONTENT_PREFIX"))
url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
created_at.strftime("%Y/%m/%d"), slug)
file_path = os.path.join(
os.environ.get("CONTENT_PREFIX"),
ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
created_at.strftime("%Y/%m/%d"),
slug + ".md",
)
print(os.environ.get(
'CONTENT_PREFIX'))
file_path = os.path.join(os.environ.get(
'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), created_at.strftime("%Y/%m/%d"), slug + ".md")
frontmatter = {
"post_meta": ["date"],
"url": url,
"type": ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
"date": created_at.isoformat(sep="T"),
"type": post_type,
"date": created_at.isoformat(sep='T'),
}
return frontmatter, file_path
def detect_entry_type(doc: dict) -> str:
"""Given a dictionary object from either form or json, detect type of post"""
if "hypothesis-link" in doc:
entry_type = "annotation"
elif ("in-reply-to" in doc) or ("u-in-reply-to" in doc):
if ('in-reply-to' in doc) or ('u-in-reply-to' in doc):
entry_type = "reply"
elif ("bookmark-of" in doc) or ("u-bookmark-of" in doc):
elif ('bookmark-of' in doc) or ('u-bookmark-of' in doc):
entry_type = "bookmark"
elif ("repost-of" in doc) or ("u-repost-of" in doc):
elif ('repost-of' in doc) or ('u-repost-of' in doc):
entry_type = "repost"
elif ("like-of" in doc) or ("u-like-of" in doc):
elif ('like-of' in doc) or ('u-like-of' in doc):
entry_type = "like"
elif "read-of" in doc:
elif ('read-of' in doc):
entry_type = "read"
elif "watch-of" in doc:
elif ('watch-of' in doc):
entry_type = "watch"
elif ("name" in doc) or ("p-name" in doc):
elif ('name' in doc) or ('p-name' in doc):
entry_type = "post"
else:
entry_type = "note"
return entry_type
def capture_frontmatter_props(doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str,List[str]]]):
def capture_frontmatter_props(
doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str, List[str]]]
):
keys = [
"summary",
"bookmark-of",
"in-reply-to",
"repost-of",
"like-of",
"read-of",
"watch-of",
"listen-of",
"read-status",
"rating",
]
keys += [f"u-{key}" for key in keys]
keys = ['summary', 'bookmark-of', 'in-reply-to', 'repost-of', 'like-of', 'read-of', 'watch-of', 'listen-of', 'read-status', 'rating']
keys += [f'u-{key}' for key in keys]
for key in keys:
if key in doc:
if isinstance(doc[key], dict) and ("type" in doc[key]):
if doc[key]["type"][0] == "h-cite":
if "citations" not in frontmatter:
frontmatter["citations"] = []
frontmatter["citations"].append(doc[key]["properties"])
if isinstance(doc[key], dict) and ('type' in doc[key]):
if doc[key]['type'][0] == 'h-cite':
if 'citations' not in frontmatter:
frontmatter['citations'] = []
frontmatter['citations'].append(doc[key]['properties'])
elif isinstance(doc[key], list) and (len(doc[key]) < 2):
frontmatter[key] = doc[key][0]
else:
frontmatter[key] = doc[key]
if "hypothesis-link" in doc:
# get the hypothesis data and store it
r = requests.get(doc["hypothesis-link"][0])
frontmatter["hypothesis-meta"] = r.json()
if "category" in doc:
if isinstance(doc["category"], list):
categories = doc["category"]
if 'category' in doc:
if isinstance(doc['category'], list):
categories = doc['category']
else:
categories = [doc["category"]]
elif "p-category" in doc:
categories = doc["p-category"]
categories = [doc['category']]
elif 'p-category' in doc:
categories = doc['p-category']
else:
categories = request.form.getlist("category[]")
categories = request.form.getlist('category[]')
if len(categories) > 0:
frontmatter["tags"] = categories
frontmatter['tags'] = categories
def process_multipart_post():
doc = request.form.to_dict(flat=True)
@ -329,132 +255,104 @@ def process_multipart_post():
now = datetime.now()
frontmatter, file_path = init_frontmatter(now, entry_type, doc.get("name"))
frontmatter, file_path = init_frontmatter(now, entry_type, doc.get('name'))
capture_frontmatter_props(doc, frontmatter)
if "name" in doc:
frontmatter["title"] = doc["name"]
if ("photo" in doc) or ("photo" in request.files) or ("photo[]" in request.files):
if 'name' in doc:
frontmatter['title'] = doc['name']
frontmatter["photo"] = []
if "photo[]" in request.files:
photos = request.files.getlist("photo[]")
if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files):
frontmatter['photo'] = []
if 'photo[]' in request.files:
photos = request.files.getlist('photo[]')
docstr = ""
for i, photo in enumerate(photos):
photo_url = process_photo_upload(now, photo, suffix=i)
if "thumbnail" not in frontmatter:
frontmatter["thumbnail"] = photo_url
frontmatter['photo'].append(photo_url)
frontmatter["photo"].append(photo_url)
#docstr += f'\n\n<img src="{photo_url}" class="u-photo" />'
docstr += f"\n\n<img src=\"{photo_url}\" class=\"u-photo\" />"
docstr += f"\n\n {doc['content']}"
else:
if "photo" in doc:
photo_objects = process_photo_url(now, doc)
if 'photo' in doc:
photo_urls = process_photo_url(now, doc)
else:
photo_objects = [
(process_photo_upload(now, request.files["photo"]), "")
]
frontmatter["photo"] = [
{"value": photo[0], "alt": photo[1]} for photo in photo_objects
]
frontmatter["thumbnail"] = photo_objects[0][0]
photo_urls = [process_photo_upload(now, request.files['photo'])]
frontmatter['photo'] = photo_urls
docstr = ""
#for photo in photo_objects:
# docstr += f"<img src=\"{photo[0]}\" alt=\"{photo[1]}\" class=\"u-photo\" /> \n\n {doc['content']}"
for photo in photo_urls:
docstr += f"<img src=\"{photo}\" class=\"u-photo\" /> \n\n {doc['content']}"
else:
docstr = doc.get("content", "") if "content" in doc else ""
docstr = doc.get('content','') if 'content' in doc else ""
if "mp-syndicate-to" in doc:
frontmatter["mp-syndicate-to"] = doc["mp-syndicate-to"].split(",")
if 'mp-syndicate-to' in doc:
frontmatter['mp-syndicate-to'] = doc['mp-syndicate-to'].split(",")
for url in doc["mp-syndicate-to"].split(","):
docstr += f'\n<a href="{url}"></a>'
for url in doc['mp-syndicate-to'].split(","):
docstr += f"\n<a href=\"{url}\"></a>"
if 'mp-syndicate-to[]' in request.form:
frontmatter['mp-syndicate-to'] = request.form.getlist('mp-syndicate-to[]')
if "mp-syndicate-to[]" in request.form:
frontmatter["mp-syndicate-to"] = request.form.getlist("mp-syndicate-to[]")
for url in request.form.getlist("mp-syndicate-to[]"):
docstr += f'\n<a href="{url}"></a>'
for url in request.form.getlist('mp-syndicate-to[]'):
docstr += f"\n<a href=\"{url}\"></a>"
return docstr, frontmatter, file_path
def process_image_alt_texts(doc):
alts = []
if isinstance(doc["photo"], str):
doc["photo"] = [doc["photo"]]
for i, photo in enumerate(doc["photo"]):
if isinstance(photo, dict):
alts.append(doc["alt"])
else:
alts.append("")
return alts
def process_json_post():
"""Process JSON POST submission"""
body = request.get_json()
# get post type - take the first item in the array
if body["type"][0] != "h-entry":
return jsonify({"error": "invalid_format"}), 400
if body['type'][0] != 'h-entry':
return jsonify({"error":"invalid_format"}), 400
props = body["properties"]
props = body['properties']
entry_type = detect_entry_type(props)
if "published" in props:
if 'published' in props:
from dateutil import parser
now = parser.parse(props["published"][0])
now = parser.parse(props['published'][0])
else:
now = datetime.now()
frontmatter, file_path = init_frontmatter(now, entry_type, props.get("name"))
frontmatter, file_path = init_frontmatter(now, entry_type, props.get('name'))
capture_frontmatter_props(props, frontmatter)
if "name" in props:
frontmatter["title"] = props["name"][0]
if 'name' in props:
frontmatter['title'] = props['name'][0]
docstr = ""
if "photo" in props:
if 'photo' in props:
photo_objects = process_photo_url(now, props)
photo_urls = process_photo_url(now, props)
frontmatter["photo"] = [
{"value": photo[0], "alt": photo[1]} for photo in photo_objects
]
frontmatter["thumbnail"] = frontmatter["photo"][0]["value"]
docstr = ""
#for photo in photo_objects:
# docstr += f'<img src="{photo[0]}" alt="{photo[1]}" class="u-photo" /> \n\n'
frontmatter['photo'] = photo_urls
for content in props.get("content", []):
for photo in photo_urls:
docstr += f"\n\n<img src=\"{photo}\" class=\"u-photo\" />"
for content in props.get('content', []):
if isinstance(content, dict):
if "html" in content:
if 'html' in content:
docstr += f"\n\n {content.get('html')}"
else:
@ -462,20 +360,19 @@ def process_json_post():
return docstr, frontmatter, file_path
def get_api_client() -> forgejo.RepositoryApi:
def get_api_client() -> giteapy.RepositoryApi:
global _api_client
if _api_client is None:
config = forgejo.Configuration()
config.host = os.environ.get("GITEA_URL")
config.api_key["Token"] = os.environ.get("GITEA_API_KEY")
_api_client = forgejo.RepositoryApi(forgejo.ApiClient(config))
config = giteapy.Configuration()
config.host = os.environ.get('GITEA_URL')
config.api_key['access_token'] = os.environ.get('GITEA_API_KEY')
_api_client = giteapy.RepositoryApi(giteapy.ApiClient(config))
return _api_client
@core_bp.route("/", methods=["POST"])
@core_bp.route('/', methods=['POST'])
@authed_endpoint
def req():
@ -485,39 +382,34 @@ def req():
docstr, frontmatter, file_path = process_multipart_post()
frontmatter_str = yaml.dump(frontmatter)
content = base64.encodebytes(
f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")
).decode("utf8")
content = base64.encodestring(
f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8")
api = get_api_client()
body = forgejo.CreateFileOptions(content=content)
body = giteapy.CreateFileOptions(content=content)
try:
r = api.repo_create_file(
os.environ.get("GITEA_REPO_OWNER"),
os.environ.get("GITEA_REPO_NAME"),
file_path,
body,
)
r = api.repo_create_file(os.environ.get(
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body)
return Response(status=202, headers={"Location": frontmatter["url"]})
return Response(status=202, headers={"Location": frontmatter['url']})
except Exception as e:
logger.error(e, exc_info=True)
return {"error": str(e)}, 500
def parse_categories():
strategy = os.environ.get("MICROPUB_CATEGORY_LIST_STRATEGY")
strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY')
if strategy == "feed":
tree = ElementTree.parse(os.environ.get("MICROPUB_CATEGORY_LIST_FILE"))
tags = tree.findall(".//item/title")
if strategy == 'feed':
tree = ElementTree.parse(os.environ.get('MICROPUB_CATEGORY_LIST_FILE'))
tags = tree.findall('.//item/title')
return {"categories": [tag.text for tag in tags]}
return {"categories": [tag.text for tag in tags] }
def get_syndication_targets():
@ -532,9 +424,9 @@ def get_syndication_targets():
continue
target_def = {
"uid": os.environ.get(f"SYNDICATION_TARGET_{target}_URL", target),
"uid": os.environ.get(f"SYNDICATION_TARGET_{target}_URL", target),
"name": os.environ.get(f"SYNDICATION_TARGET_{target}_NAME", target),
}
}
defs.append(target_def)
return defs
@ -545,7 +437,7 @@ def get_syndication_targets():
def media_endpoint():
now = datetime.now()
url = process_photo_upload(now, request.files["file"])
url = process_photo_upload(now, request.files['file'])
return Response(status=201, headers={"Location": url})
@ -553,33 +445,50 @@ def media_endpoint():
def generate_config_json():
return {
"media-endpoint": os.environ.get(f"MICROCOSM_BASE_URL", request.base_url)
+ "media",
"media-endpoint": request.base_url + "media",
"syndicate-to": get_syndication_targets(),
"post-types": [
{"type": "note", "name": "Note"},
{"type": "article", "name": "Blog Post"},
{"type": "photo", "name": "Photo"},
{"type": "reply", "name": "Reply"},
{"type": "bookmark", "name": "Bookmark"},
{"type": "like", "name": "Like"},
],
{
"type": "note",
"name": "Note"
},
{
"type": "article",
"name": "Blog Post"
},
{
"type": "photo",
"name": "Photo"
},
{
"type": "reply",
"name": "Reply"
},
{
"type": "bookmark",
"name": "Bookmark"
},
{
"type": "like",
"name":"Like"
}
]
}
@core_bp.route("/", methods=["GET"])
@core_bp.route("/", methods=['GET'])
@authed_endpoint
def index():
if request.args.get("q") == "config":
if request.args.get('q') == 'config':
return generate_config_json()
elif request.args.get("q") == "category":
elif request.args.get('q') == 'category':
return parse_categories()
elif request.args.get("q") == "syndicate-to":
elif request.args.get('q') == 'syndicate-to':
return {"syndicate-to": get_syndication_targets()}
if __name__ == "__main__":
if __name__ == '__main__':
app.run(debug=False)

View File

@ -37,13 +37,13 @@ def authform():
@auth_bp.route('/authenticate')
def authenticate():
return micropub.authenticate(
request.args.get('me'), next_url=url_for('token.indieauth_callback'))
request.args.get('me'), next_url=url_for('index'))
@auth_bp.route('/authorize')
def authorize():
return micropub.authorize(
request.args.get('me'), next_url=url_for('token.indieauth_callback'),
request.args.get('me'), next_url=url_for('index'),
scope=request.args.get('scope'))
@ -65,7 +65,6 @@ def indieauth_callback(resp):
""".format(resp.me, resp.next_url, resp.error)
@auth_bp.route('/micropub-callback')
@micropub.authorized_handler
def micropub_callback(resp):

View File

View File

@ -1,4 +0,0 @@
from . import create_app
app = create_app()