diff --git a/src/microcosm/__init__.py b/src/microcosm/__init__.py
index a2d4456..c4a3471 100644
--- a/src/microcosm/__init__.py
+++ b/src/microcosm/__init__.py
@@ -76,7 +76,6 @@ def create_app():
def authed_endpoint(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
-
authtok = request.headers.get("Authorization")
if authtok is None:
@@ -121,10 +120,12 @@ def process_photo_url(
doc["photo"] = [doc["photo"]]
for i, photo in enumerate(doc["photo"]):
-
if isinstance(photo, str):
photo = {"url": photo, "alt": ""}
+ elif "value" in photo:
+ photo["url"] = photo["value"]
+
if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
# download the photo
@@ -166,7 +167,6 @@ def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str =
now_ts = int(time.mktime(created_at.timetuple()))
if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
-
file.mimetype
ext = os.path.splitext(file.filename)[1]
@@ -199,7 +199,6 @@ def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str =
def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str] = None):
-
now_ts = int(time.mktime(created_at.timetuple()))
if name:
@@ -269,7 +268,6 @@ def detect_entry_type(doc: dict) -> str:
def capture_frontmatter_props(
doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str, List[str]]]
):
-
keys = [
"summary",
"bookmark-of",
@@ -286,13 +284,9 @@ def capture_frontmatter_props(
keys += [f"u-{key}" for key in keys]
for key in keys:
-
if key in doc:
-
if isinstance(doc[key], dict) and ("type" in doc[key]):
-
if doc[key]["type"][0] == "h-cite":
-
if "citations" not in frontmatter:
frontmatter["citations"] = []
frontmatter["citations"].append(doc[key]["properties"])
@@ -337,7 +331,6 @@ def process_multipart_post():
frontmatter["title"] = doc["name"]
if ("photo" in doc) or ("photo" in request.files) or ("photo[]" in request.files):
-
frontmatter["photo"] = []
if "photo[]" in request.files:
@@ -353,12 +346,11 @@ def process_multipart_post():
frontmatter["photo"].append(photo_url)
- #docstr += f'\n\n'
+ # docstr += f'\n\n'
docstr += f"\n\n {doc['content']}"
else:
-
if "photo" in doc:
photo_objects = process_photo_url(now, doc)
@@ -373,7 +365,7 @@ def process_multipart_post():
frontmatter["thumbnail"] = photo_objects[0][0]
docstr = ""
- #for photo in photo_objects:
+ # for photo in photo_objects:
# docstr += f" \n\n {doc['content']}"
else:
docstr = doc.get("content", "") if "content" in doc else ""
@@ -394,14 +386,12 @@ def process_multipart_post():
def process_image_alt_texts(doc):
-
alts = []
if isinstance(doc["photo"], str):
doc["photo"] = [doc["photo"]]
for i, photo in enumerate(doc["photo"]):
-
if isinstance(photo, dict):
alts.append(doc["alt"])
else:
@@ -423,7 +413,6 @@ def process_json_post():
entry_type = detect_entry_type(props)
if "published" in props:
-
from dateutil import parser
now = parser.parse(props["published"][0])
@@ -440,7 +429,6 @@ def process_json_post():
docstr = ""
if "photo" in props:
-
photo_objects = process_photo_url(now, props)
frontmatter["photo"] = [
@@ -448,11 +436,10 @@ def process_json_post():
]
frontmatter["thumbnail"] = frontmatter["photo"][0]["value"]
docstr = ""
- #for photo in photo_objects:
+ # for photo in photo_objects:
# docstr += f' \n\n'
for content in props.get("content", []):
-
if isinstance(content, dict):
if "html" in content:
docstr += f"\n\n {content.get('html')}"
@@ -478,7 +465,6 @@ def get_api_client() -> forgejo.RepositoryApi:
@core_bp.route("/", methods=["POST"])
@authed_endpoint
def req():
-
if request.get_json():
docstr, frontmatter, file_path = process_json_post()
else:
@@ -504,13 +490,11 @@ def req():
return Response(status=202, headers={"Location": frontmatter["url"]})
except Exception as e:
-
logger.error(e, exc_info=True)
return {"error": str(e)}, 500
def parse_categories():
-
strategy = os.environ.get("MICROPUB_CATEGORY_LIST_STRATEGY")
if strategy == "feed":
@@ -521,12 +505,10 @@ def parse_categories():
def get_syndication_targets():
-
targets = os.environ.get("SYNDICATION_TARGETS", "").split(",")
defs = []
for target in targets:
-
if os.environ.get(f"SYNDICATION_TARGET_{target}_URL") is None:
print(f"No url for SYNDICATION_TARGET_{target}_URL")
continue
@@ -543,7 +525,6 @@ def get_syndication_targets():
@core_bp.route("/media", methods=["POST"])
@authed_endpoint
def media_endpoint():
-
now = datetime.now()
url = process_photo_upload(now, request.files["file"])
@@ -551,7 +532,6 @@ def media_endpoint():
def generate_config_json():
-
return {
"media-endpoint": os.environ.get(f"MICROCOSM_BASE_URL", request.base_url)
+ "media",
@@ -570,7 +550,6 @@ def generate_config_json():
@core_bp.route("/", methods=["GET"])
@authed_endpoint
def index():
-
if request.args.get("q") == "config":
return generate_config_json()
diff --git a/src/microcosm/webmentions.py b/src/microcosm/webmentions.py
index ad4726b..b036233 100644
--- a/src/microcosm/webmentions.py
+++ b/src/microcosm/webmentions.py
@@ -3,6 +3,8 @@ import base64
import giteapy
import os
+import clientapi_forgejo as forgejo
+
from flask import request, Blueprint
from urllib.parse import urlparse
@@ -11,24 +13,21 @@ from datetime import datetime
webhook_bp = Blueprint("webmention", __name__)
-
-
-@webhook_bp.route("/webmentions", methods=['POST'])
+@webhook_bp.route("/webmentions", methods=["POST"])
def webmention_hook():
"""Accept web mention webhook request"""
-
body = request.get_json()
-
+
print(f"Incoming webmention {body}")
# webmention should always have a json body
if body is None:
- return {"error":"invalid_request"}, 400
-
+ return {"error": "invalid_request"}, 400
+
# assert that secret matches
- if body.get('secret') != os.environ.get("WEBMENTION_SECRET", "changeme"):
- return {"error":"invalid_secret"}, 401
+ if body.get("secret") != os.environ.get("WEBMENTION_SECRET", "changeme"):
+ return {"error": "invalid_secret"}, 401
# get existing mentions from gitea
from microcosm import get_api_client
@@ -36,23 +35,25 @@ def webmention_hook():
api = get_api_client()
old_sha = None
-
try:
- mentions_meta = api.repo_get_contents(os.environ.get('GITEA_REPO_OWNER'),
- os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'))
+ mentions_meta = api.repo_get_contents(
+ os.environ.get("GITEA_REPO_OWNER"),
+ os.environ.get("GITEA_REPO_NAME"),
+ os.environ.get("WEBMENTIONS_JSON_FILE"),
+ )
old_sha = mentions_meta.sha
- mentions = json.loads(base64.decodebytes(mentions_meta.content.encode('utf8')))
+ mentions = json.loads(base64.decodebytes(mentions_meta.content.encode("utf8")))
- except giteapy.rest.ApiException as e:
+ except forgejo.rest.ApiException as e:
if e.status == 404:
print("no mentions yet, create new mentions file")
mentions = {}
# parse target url and get path on server
- url_path = urlparse(body.get('target')).path
+ url_path = urlparse(body.get("target")).path
# create mention entry if needed
if url_path not in mentions:
@@ -60,7 +61,7 @@ def webmention_hook():
# if the mention is already processed, do nothing.
for entry in mentions[url_path]:
- if entry['source'] == body['source']:
+ if entry["source"] == body["source"]:
return {"message": "mention already processed, no action taken."}
activity_types = {
@@ -69,54 +70,61 @@ def webmention_hook():
"repost-of": "repost",
"bookmark-of": "bookmark",
"mention-of": "mention",
- "rsvp":"rsvp"
+ "rsvp": "rsvp",
}
# format the mention like the data from webmention.io api
new_mention = {
- "id": body['post']['wm-id'],
- "source": body['source'],
- "target": body['target'],
- "activity":{
- "type": activity_types[body['post']['wm-property']]
+ "id": body["post"]["wm-id"],
+ "source": body["source"],
+ "target": body["target"],
+ "activity": {"type": activity_types[body["post"]["wm-property"]]},
+ "verified_date": body.get("wm-received", datetime.now().isoformat()),
+ "data": {
+ "author": body["post"]["author"],
+ "content": body["post"].get("content", {}).get("html", None),
+ "published": body["post"].get("published"),
},
- "verified_date": body.get('wm-received', datetime.now().isoformat()),
- "data":{
- "author": body['post']['author'],
- "content": body['post'].get('content',{}).get('html', None),
- "published": body['post'].get('published')
- }
}
# append the new mention
mentions[url_path].append(new_mention)
- content = base64.encodestring(json.dumps(mentions, indent=2).encode("utf8")).decode("utf8")
+ content = base64.b64encode(json.dumps(mentions, indent=2).encode("utf8")).decode(
+ "utf8"
+ )
# store to repo
if old_sha:
print(f"Update {os.environ.get('WEBMENTIONS_JSON_FILE')}")
- body = giteapy.UpdateFileOptions(content=content, sha=old_sha)
-
+ body = forgejo.UpdateFileOptions(content=content, sha=old_sha)
try:
- r = api.repo_update_file(os.environ.get(
- 'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'), body)
+ r = api.repo_update_file(
+ os.environ.get("GITEA_REPO_OWNER"),
+ os.environ.get("GITEA_REPO_NAME"),
+ os.environ.get("WEBMENTIONS_JSON_FILE"),
+ body,
+ )
- return {"message":"ok"}
+ return {"message": "ok"}
except Exception as e:
return {"error": str(e)}, 500
-
+
else:
print(f"Create {os.environ.get('WEBMENTIONS_JSON_FILE')}")
body = giteapy.CreateFileOptions(content=content)
try:
- r = api.repo_create_file(os.environ.get(
- 'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'), body)
+ r = api.repo_create_file(
+ os.environ.get("GITEA_REPO_OWNER"),
+ os.environ.get("GITEA_REPO_NAME"),
+ os.environ.get("WEBMENTIONS_JSON_FILE"),
+ body,
+ )
- return {"message":"ok"}
+ return {"message": "ok"}
except Exception as e:
- return {"error": str(e)}, 500
\ No newline at end of file
+ return {"error": str(e)}, 500