2022-10-22 11:56:50 +01:00
|
|
|
import json
|
|
|
|
import base64
|
|
|
|
import giteapy
|
|
|
|
import os
|
|
|
|
|
2024-11-16 14:31:21 +00:00
|
|
|
import clientapi_forgejo as forgejo
|
|
|
|
|
2022-10-22 11:56:50 +01:00
|
|
|
from flask import request, Blueprint
|
|
|
|
|
|
|
|
from urllib.parse import urlparse
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
# Flask blueprint, registered under the name "webmention", that the
# webmention webhook route in this module is attached to.
webhook_bp = Blueprint("webmention", __name__)
|
|
|
|
|
|
|
|
|
2024-11-16 14:31:21 +00:00
|
|
|
@webhook_bp.route("/webmentions", methods=["POST"])
def webmention_hook():
    """Accept a webmention webhook request and record it in the mentions file.

    The JSON body must be an object carrying the shared ``secret`` plus
    ``source``, ``target`` and a ``post`` object with ``wm-id``,
    ``wm-property`` (one of the known activity properties) and ``author``.
    The mention is appended, keyed by the path component of ``target``, to
    the JSON file named by ``WEBMENTIONS_JSON_FILE`` in the repository given
    by ``GITEA_REPO_OWNER`` / ``GITEA_REPO_NAME``. The file is created when
    the repository API reports 404 for it.

    Returns:
        ``{"message": ...}`` on success or when the same source was already
        recorded for the target path; otherwise an ``{"error": ...}`` dict
        with status 400 (malformed payload), 401 (wrong secret) or 500
        (repository API failure).
    """
    # Function-scope imports, following the existing local-import style.
    import hmac

    from microcosm import get_api_client

    body = request.get_json()

    # Never write the shared secret to the log.
    if isinstance(body, dict):
        loggable = {key: value for key, value in body.items() if key != "secret"}
    else:
        loggable = body
    print(f"Incoming webmention {loggable}")

    # webmention should always have a json object body
    if not isinstance(body, dict):
        return {"error": "invalid_request"}, 400

    # assert that secret matches; compare bytes in constant time so the
    # check neither leaks timing nor raises on non-ASCII input
    expected_secret = os.environ.get("WEBMENTION_SECRET", "changeme")
    supplied_secret = body.get("secret")
    if not isinstance(supplied_secret, str) or not hmac.compare_digest(
        supplied_secret.encode("utf8"), expected_secret.encode("utf8")
    ):
        return {"error": "invalid_secret"}, 401

    activity_types = {
        "in-reply-to": "reply",
        "like-of": "like",
        "repost-of": "repost",
        "bookmark-of": "bookmark",
        "mention-of": "mention",
        "rsvp": "rsvp",
    }

    # Validate the payload up front so malformed requests get a 400 instead
    # of an unhandled KeyError/AttributeError, and before any API call.
    source = body.get("source")
    target = body.get("target")
    post = body.get("post")
    if (
        not isinstance(source, str)
        or not isinstance(target, str)
        or not isinstance(post, dict)
    ):
        return {"error": "invalid_request"}, 400

    wm_property = post.get("wm-property")
    if (
        not isinstance(wm_property, str)
        or wm_property not in activity_types
        or "wm-id" not in post
        or "author" not in post
    ):
        return {"error": "invalid_request"}, 400

    owner = os.environ.get("GITEA_REPO_OWNER")
    repo = os.environ.get("GITEA_REPO_NAME")
    mentions_file = os.environ.get("WEBMENTIONS_JSON_FILE")

    # get existing mentions from the repository
    api = get_api_client()
    old_sha = None
    try:
        mentions_meta = api.repo_get_contents(owner, repo, mentions_file)
    except forgejo.rest.ApiException as e:
        if e.status != 404:
            # Any other API failure previously fell through with `mentions`
            # unbound; report it instead.
            return {"error": str(e)}, 500
        print("no mentions yet, create new mentions file")
        mentions = {}
    else:
        old_sha = mentions_meta.sha
        mentions = json.loads(
            base64.decodebytes(mentions_meta.content.encode("utf8"))
        )

    # parse target url and get path on server
    url_path = urlparse(target).path

    # create mention entry if needed
    entries = mentions.setdefault(url_path, [])

    # if the mention is already processed, do nothing.
    if any(entry.get("source") == source for entry in entries):
        return {"message": "mention already processed, no action taken."}

    # `content` may be absent or null; only read "html" from a mapping.
    post_content = post.get("content")
    content_html = (
        post_content.get("html") if isinstance(post_content, dict) else None
    )

    # format the mention like the data from webmention.io api
    new_mention = {
        "id": post["wm-id"],
        "source": source,
        "target": target,
        "activity": {"type": activity_types[wm_property]},
        "verified_date": body.get("wm-received", datetime.now().isoformat()),
        "data": {
            "author": post["author"],
            "content": content_html,
            "published": post.get("published"),
        },
    }

    # append the new mention
    entries.append(new_mention)

    content = base64.b64encode(
        json.dumps(mentions, indent=2).encode("utf8")
    ).decode("utf8")

    # store to repo
    try:
        if old_sha:
            print(f"Update {mentions_file}")
            options = forgejo.UpdateFileOptions(content=content, sha=old_sha)
            api.repo_update_file(owner, repo, mentions_file, options)
        else:
            print(f"Create {mentions_file}")
            # Use the forgejo model so it matches the forgejo client used for
            # the update path (this branch previously built a giteapy model).
            options = forgejo.CreateFileOptions(content=content)
            api.repo_create_file(owner, repo, mentions_file, options)
    except Exception as e:
        return {"error": str(e)}, 500

    return {"message": "ok"}
|