diff --git a/example.py b/example.py
index b7ca057..0c21d56 100644
--- a/example.py
+++ b/example.py
@@ -1,22 +1,29 @@
-from typing import Dict, List
+from typing import Dict, List, Optional
import requests
import os
import functools
import dotenv
import giteapy
+import giteapy.rest
import time
+import json
import base64
from werkzeug.datastructures import FileStorage
import yaml
+import hashlib
+
+from urllib.parse import urlparse
from slugify import slugify
-from datetime import datetime
+from datetime import date, datetime
from xml.etree import ElementTree
from flask import Flask, request, url_for, Response
from requests import api
from flask_micropub import MicropubClient
+
+
dotenv.load_dotenv()
PERMITTED_DOMAIN = os.environ.get(
@@ -50,8 +57,8 @@ def authed_endpoint(f):
auth = requests.get("https://tokens.indieauth.com/token", headers={
"Authorization": authtok, "Accept": "application/json"}).json()
- if auth['me'] not in PERMITTED_DOMAIN:
- return {"error": "forbidden", "error_description": f"User {auth['me']} not permitted to post here"}, 403
+ if auth.get('me','') not in PERMITTED_DOMAIN:
+ return {"error": "insufficient_scope", "error_description": f"User \"{auth.get('me','')}\" not permitted to post here"}, 401
return f(*args, *kwargs)
@@ -61,7 +68,7 @@ def authed_endpoint(f):
_api_client = None
-def get_api_client():
+def get_api_client() -> giteapy.RepositoryApi:
global _api_client
if _api_client is None:
@@ -96,7 +103,6 @@ def process_photo_url(now: datetime, doc: Dict[str, List[str]], suffix: str = ""
with open(filename, 'wb') as f:
f.write(r.content)
- # elif os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'gitea':
else:
photo_url = doc['photo']
@@ -130,14 +136,140 @@ def process_photo_upload(now: datetime, file: FileStorage, suffix: str=""):
return None
+def init_frontmatter(now: datetime, post_type: str, name: Optional[str]=None):
-@app.route('/', methods=['POST'])
-@authed_endpoint
-def req():
+ now_ts = int(time.mktime(now.timetuple()))
+ if name:
+ slug = slugify(name) + str(now_ts)
+ else:
+ slug = str(now_ts)
+
+
+ url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
+ now.strftime("%Y/%m/%d"), slug)
+
+
+ file_path = os.path.join(os.environ.get(
+ 'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), now.strftime("%Y/%m/%d"), slug + ".md")
+
+ frontmatter = {
+ "url": url,
+ "type": post_type,
+ "date": now.isoformat(sep='T'),
+ }
+
+ return frontmatter, file_path
+
+
+@app.route("/webmentions", methods=['POST'])
+def webmention_hook():
+ """Accept web mention webhook request"""
+
+
+ body = request.get_json()
+
+ # webmention should always have a json body
+ if body is None:
+ return {"error":"invalid_request"}, 400
+
+ # assert that secret matches
+ if body.get('secret') != os.environ.get("WEBMENTION_SECRET", "changeme"):
+ return {"error":"invalid_secret"}, 401
+
+ # get existing mentions from gitea
+
+ api = get_api_client()
+
+ old_sha = None
+
+
+ try:
+ mentions_meta = api.repo_get_contents(os.environ.get('GITEA_REPO_OWNER'),
+ os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'))
+
+ old_sha = mentions_meta.sha
+
+ mentions = json.loads(base64.decodebytes(mentions_meta.content.encode('utf8')))
+
+ except giteapy.rest.ApiException as e:
+ if e.status == 404:
+ print("no mentions yet, create new mentions file")
+ mentions = {}
+
+ # parse target url and get path on server
+ url_path = urlparse(body.get('target')).path
+
+ # create mention entry if needed
+ if url_path not in mentions:
+ mentions[url_path] = []
+
+ # if the mention is already processed, do nothing.
+ for entry in mentions[url_path]:
+ if entry['source'] == body['source']:
+ return {"message": "mention already processed, no action taken."}
+
+ activity_types = {
+ "in-reply-to": "reply",
+ "like-of": "like",
+ "repost-of": "repost",
+ "bookmark-of": "bookmark",
+ "mention-of": "mention",
+ "rsvp":"rsvp"
+ }
+
+ # format the mention like the data from webmention.io api
+ new_mention = {
+ "source": body['source'],
+ "target": body['target'],
+ "activity":{
+ "type": activity_types[body['post']['wm-property']]
+ },
+ "verified_date": body.get('wm-received', datetime.now().isoformat()),
+ "data":{
+ "author": body['post']['author'],
+ "content": body['post'].get('content',{}).get('html', None),
+ "published": body['post'].get('published')
+ }
+ }
+
+ # append the new mention
+ mentions[url_path].append(new_mention)
+
+ content = base64.encodestring(json.dumps(mentions, indent=2).encode("utf8")).decode("utf8")
+
+ # store to repo
+ if old_sha:
+ print(f"Update {os.environ.get('WEBMENTIONS_JSON_FILE')}")
+ body = giteapy.UpdateFileOptions(content=content, sha=old_sha)
+
+
+ try:
+ r = api.repo_update_file(os.environ.get(
+ 'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'), body)
+
+ return {"message":"ok"}
+
+ except Exception as e:
+ return {"error": str(e)}, 500
+
+ else:
+ print(f"Create {os.environ.get('WEBMENTIONS_JSON_FILE')}")
+ body = giteapy.CreateFileOptions(content=content)
+
+ try:
+ r = api.repo_create_file(os.environ.get(
+ 'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'), body)
+
+ return {"message":"ok"}
+
+ except Exception as e:
+ return {"error": str(e)}, 500
+
+
+def process_multipart_post():
doc = request.form.to_dict(flat=True)
-
if 'in-reply-to' in doc:
entry_type = "reply"
@@ -148,34 +280,16 @@ def req():
elif 'like-of' in doc:
entry_type = "like"
-
- # elif ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files):
- # entry_type = "photo"
-
elif 'name' in doc:
entry_type = "post"
else:
entry_type = "note"
now = datetime.now()
- now_ts = int(time.mktime(now.timetuple()))
- url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(entry_type, entry_type + "s"),
- now.strftime("%Y/%m/%d"), str(now_ts))
+ frontmatter, file_path = init_frontmatter(now, entry_type, doc.get('name'))
- if 'name' in doc:
- slug = slugify(doc['name']) + str(now_ts)
- else:
- slug = str(now_ts)
- file_path = os.path.join(os.environ.get(
- 'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(entry_type, entry_type + "s"), now.strftime("%Y/%m/%d"), slug + ".md")
-
- frontmatter = {
- "url": url,
- "type": entry_type,
- "date": now.isoformat(sep='T'),
- }
for key in ['bookmark-of', 'in-reply-to', 'repost-of', 'like-of']:
if key in doc:
@@ -205,7 +319,7 @@ def req():
for i, photo in enumerate(photos):
photo_url = process_photo_upload(now, photo, suffix=i)
- frontmatter.photo.append(photo_url)
+ frontmatter['photo'].append(photo_url)
docstr += f"\n\n"
@@ -223,18 +337,40 @@ def req():
docstr = f" \n\n {doc['content']}"
else:
- docstr = doc['content']
+ docstr = doc.get('content','') if 'content' in doc else ""
if 'mp-syndicate-to' in doc:
+ frontmatter['mp-syndicate-to'] = doc['mp-syndicate-to'].split(",")
+
for url in doc['mp-syndicate-to'].split(","):
docstr += f"\n"
+
+ if 'mp-syndicate-to[]' in request.form:
+ frontmatter['mp-syndicate-to'] = request.form.getlist('mp-syndicate-to[]')
+
+ for url in request.form.getlist('mp-syndicate-to[]'):
+ docstr += f"\n"
+
+ return docstr, frontmatter, file_path
+
+def process_json_post():
+ """Process JSON POST submission"""
+
+
+
+@app.route('/', methods=['POST'])
+@authed_endpoint
+def req():
+
+ if request.get_json():
+ docstr, frontmatter, file_path = process_json_post()
+ else:
+ docstr, frontmatter, file_path = process_multipart_post()
frontmatter_str = yaml.dump(frontmatter)
content = base64.encodestring(
f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8")
-
-
api = get_api_client()
body = giteapy.CreateFileOptions(content=content)
@@ -243,7 +379,7 @@ def req():
r = api.repo_create_file(os.environ.get(
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body)
- return Response(status=202, headers={"Location": url})
+ return Response(status=202, headers={"Location": frontmatter['url']})
except Exception as e:
return {"error": str(e)}, 500
@@ -282,6 +418,15 @@ def get_syndication_targets():
return defs
+@app.route("/media", methods=["POST"])
+@authed_endpoint
+def media_endpoint():
+
+ now = datetime.now()
+ url = process_photo_upload(now, request.files['file'])
+
+ return Response(status=201, headers={"Location": url})
+
def generate_config_json():
@@ -322,6 +467,9 @@ def index():
elif request.args.get('q') == 'category':
return parse_categories()
+
+ elif request.args.get('q') == 'syndicate-to':
+ return {"syndicate-to": get_syndication_targets()}
@app.route('/form', methods=['GET'])