micropub-flask-gitea/example.py

570 lines
15 KiB
Python
Raw Permalink Normal View History

2022-01-28 14:30:12 +00:00
from typing import Dict, List, Optional
2021-12-23 21:39:21 +00:00
import requests
import os
import functools
import dotenv
import giteapy
2022-01-28 14:30:12 +00:00
import giteapy.rest
2021-12-23 21:39:21 +00:00
import time
2022-01-28 14:30:12 +00:00
import json
2021-12-23 21:39:21 +00:00
import base64
2021-12-24 11:56:12 +00:00
from werkzeug.datastructures import FileStorage
2021-12-23 21:39:21 +00:00
import yaml
2022-01-28 14:30:12 +00:00
import hashlib
from urllib.parse import urlparse
2021-12-23 21:39:21 +00:00
from slugify import slugify
2022-01-28 14:30:12 +00:00
from datetime import date, datetime
2021-12-24 11:03:28 +00:00
from xml.etree import ElementTree
2021-12-23 21:39:21 +00:00
from flask import Flask, request, url_for, Response
from requests import api
from flask_micropub import MicropubClient
2022-01-28 14:30:12 +00:00
2021-12-23 21:39:21 +00:00
# Pull settings from a local .env file (if any) into os.environ before the
# configuration values below are read.
dotenv.load_dotenv()

# Semicolon-separated list of IndieAuth "me" URLs allowed to use the
# authenticated endpoints.
PERMITTED_DOMAIN = os.environ.get(
    'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';')

app = Flask(__name__)

# The session-signing key can be supplied through the SECRET_KEY environment
# variable; the historical hard-coded value is kept only as a fallback.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'my super secret key')

micropub = MicropubClient(app, client_id='https://brainsteam.co.uk')

# Maps an entry type to the plural directory/URL segment used for it. Types
# not listed here are pluralised by the callers by appending "s".
ENTITY_TYPE_PLURAL_MAP = {
    "post": "posts",
    "note": "notes",
    "reply": "replies",
    "bookmark": "bookmarks",
}
2021-12-23 21:39:21 +00:00
def authed_endpoint(f):
    """Decorate a view so it only runs for permitted IndieAuth users.

    The request's ``Authorization`` header is forwarded to
    ``https://tokens.indieauth.com/token``; the ``me`` value of the JSON reply
    must be one of the entries in ``PERMITTED_DOMAIN``.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that returns a ``(dict, 401)`` error response when the
        header is missing, the token cannot be verified, or the user is not
        permitted; otherwise the result of calling ``f``.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        authtok = request.headers.get('Authorization')
        if authtok is None:
            return {
                "error": "unauthorized",
                "error_description": "An auth token was not provided"
            }, 401

        try:
            # A timeout stops a stalled token endpoint from hanging the worker.
            auth = requests.get(
                "https://tokens.indieauth.com/token",
                headers={"Authorization": authtok,
                         "Accept": "application/json"},
                timeout=10,
            ).json()
        except (requests.RequestException, ValueError):
            # Network failure or a non-JSON reply: treat as unverified.
            return {
                "error": "unauthorized",
                "error_description": "The auth token could not be verified"
            }, 401

        me = auth.get('me', '') if isinstance(auth, dict) else ''
        if me not in PERMITTED_DOMAIN:
            return {"error": "insufficient_scope", "error_description": f"User \"{me}\" not permitted to post here"}, 401

        # Keyword arguments must be forwarded with ** (a single * would pass
        # the keyword *names* as extra positional arguments).
        return f(*args, **kwargs)
    return wrapper
# Module-wide cache for the Gitea client; populated on first request for it.
_api_client = None


def get_api_client() -> giteapy.RepositoryApi:
    """Return the shared Gitea repository API client, creating it on demand.

    On the first call the client is configured from the ``GITEA_URL`` and
    ``GITEA_API_KEY`` environment variables and stored in ``_api_client``;
    subsequent calls hand back that same object.
    """
    global _api_client
    if _api_client is not None:
        return _api_client

    settings = giteapy.Configuration()
    settings.host = os.environ.get('GITEA_URL')
    settings.api_key['access_token'] = os.environ.get('GITEA_API_KEY')
    _api_client = giteapy.RepositoryApi(giteapy.ApiClient(settings))
    return _api_client
2021-12-24 11:56:12 +00:00
def process_photo_url(now: datetime, doc: Dict[str, str], suffix: str = ""):
    """Process photo submitted via URL.

    When the ``MICROPUB_IMAGE_STRATEGY`` environment variable is ``copy`` the
    photo is downloaded and stored under ``MICROPUB_MEDIA_PATH`` in a
    ``YYYY/MM/DD`` sub-directory, and the matching URL under
    ``MICROPUB_MEDIA_URL_PREFIX`` is returned. For any other strategy the
    submitted URL is returned untouched and nothing is downloaded.

    Args:
        now: Timestamp used for the dated directory and the file name.
        doc: Submitted properties; ``doc['photo']`` is the photo URL.
        suffix: Optional tag inserted into the generated file name.

    Returns:
        The URL the post should reference for the photo.

    Raises:
        requests.RequestException: If the download fails or the remote
            server answers with an error status (``copy`` strategy only).
    """
    source_url = doc['photo']
    if os.environ.get('MICROPUB_IMAGE_STRATEGY') != 'copy':
        return source_url

    now_ts = int(time.mktime(now.timetuple()))
    # Use only the URL path for the extension so a query string such as
    # "?size=large" does not end up in the file name.
    ext = os.path.splitext(urlparse(source_url).path)[1]
    # Same naming scheme as process_photo_upload: "<timestamp>_<suffix><ext>".
    basename = f"{now_ts}_{suffix}{ext}"
    day_dir = now.strftime("%Y/%m/%d")

    filename = os.path.join(
        os.environ.get('MICROPUB_MEDIA_PATH'), day_dir, basename)
    photo_url = os.path.join(
        os.environ.get('MICROPUB_MEDIA_URL_PREFIX'), day_dir, basename)

    # download the photo; fail loudly instead of saving an error page as an image
    r = requests.get(source_url, timeout=30)
    r.raise_for_status()

    # make directory if needed (exist_ok avoids a race between check and create)
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'wb') as f:
        f.write(r.content)

    return photo_url
def process_photo_upload(now: datetime, file: FileStorage, suffix: str=""):
    """Store a photo that was uploaded directly to the micropub endpoint.

    The upload is only kept when ``MICROPUB_IMAGE_STRATEGY`` is ``copy``: it
    is saved below ``MICROPUB_MEDIA_PATH`` in a ``YYYY/MM/DD`` directory as
    ``<timestamp>_<suffix><ext>`` and the corresponding URL below
    ``MICROPUB_MEDIA_URL_PREFIX`` is returned. With any other strategy the
    file is not saved and ``None`` is returned.
    """
    stamp = int(time.mktime(now.timetuple()))
    if os.environ.get('MICROPUB_IMAGE_STRATEGY') != 'copy':
        return None

    extension = os.path.splitext(file.filename)[1]
    leaf = f"{stamp}_{suffix}{extension}"

    # Where the file lives on disk, and where it is served from.
    target = os.path.join(
        os.environ.get('MICROPUB_MEDIA_PATH'), now.strftime("%Y/%m/%d"), leaf)
    public_url = os.path.join(
        os.environ.get('MICROPUB_MEDIA_URL_PREFIX'), now.strftime("%Y/%m/%d"), leaf)

    # Create the dated directory on first use.
    parent = os.path.dirname(target)
    if not os.path.exists(parent):
        os.makedirs(parent)

    file.save(target)
    return public_url
2022-01-28 14:30:12 +00:00
def init_frontmatter(now: datetime, post_type: str, name: Optional[str]=None):
    """Create the initial front matter and repository path for a new entry.

    Args:
        now: Creation time; used for the dated path and the ``date`` field.
        post_type: Entry type (e.g. ``"note"``). Pluralised through
            ``ENTITY_TYPE_PLURAL_MAP``, falling back to ``post_type + "s"``.
        name: Optional title; when given its slug prefixes the timestamp.

    Returns:
        A ``(frontmatter, file_path)`` tuple: ``frontmatter`` holds ``url``,
        ``type`` and ``date``; ``file_path`` is the ``.md`` path below the
        ``CONTENT_PREFIX`` environment variable (empty prefix when unset).
    """
    # posixpath always joins with "/", which is what URLs and repository
    # paths need regardless of the operating system this runs on.
    import posixpath

    now_ts = int(time.mktime(now.timetuple()))

    # NOTE(review): the title slug and the timestamp are concatenated without
    # a separator - confirm this is the intended slug format.
    slug = slugify(name) + str(now_ts) if name else str(now_ts)

    section = ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s")
    day_dir = now.strftime("%Y/%m/%d")

    url = posixpath.join("/", section, day_dir, slug)
    file_path = posixpath.join(
        os.environ.get('CONTENT_PREFIX', ''), section, day_dir, slug + ".md")

    frontmatter = {
        "url": url,
        "type": post_type,
        "date": now.isoformat(sep='T'),
    }

    return frontmatter, file_path
@app.route("/webmentions", methods=['POST'])
def webmention_hook():
    """Accept web mention webhook request.

    The JSON body must carry a ``secret`` equal to the ``WEBMENTION_SECRET``
    environment variable (default ``"changeme"``), plus ``source``, ``target``
    and a ``post`` object. The mention is appended to the JSON file named by
    ``WEBMENTIONS_JSON_FILE`` in the configured Gitea repository, keyed by the
    path component of ``target``; the file is created if it does not exist.

    Returns:
        ``{"message": ...}`` on success or when the source was already
        recorded for that path; ``({"error": ...}, status)`` with 400 for a
        malformed body, 401 for a wrong secret and 500 for Gitea failures.
    """
    body = request.get_json()

    # webmention should always have a json (object) body
    if not isinstance(body, dict):
        return {"error": "invalid_request"}, 400

    # assert that secret matches
    if body.get('secret') != os.environ.get("WEBMENTION_SECRET", "changeme"):
        return {"error": "invalid_secret"}, 401

    # Log the payload without the shared secret.
    redacted = {k: v for k, v in body.items() if k != 'secret'}
    print(f"Incoming webmention {redacted}")

    post = body.get('post')
    if (not body.get('source') or not body.get('target')
            or not isinstance(post, dict)
            or 'wm-id' not in post or 'wm-property' not in post):
        return {"error": "invalid_request"}, 400

    owner = os.environ.get('GITEA_REPO_OWNER')
    repo = os.environ.get('GITEA_REPO_NAME')
    mentions_file = os.environ.get('WEBMENTIONS_JSON_FILE')

    # get existing mentions from gitea
    api = get_api_client()
    old_sha = None
    try:
        mentions_meta = api.repo_get_contents(owner, repo, mentions_file)
    except giteapy.rest.ApiException as e:
        if e.status != 404:
            # Any failure other than "file missing" must not fall through:
            # there is no mentions data to work with.
            return {"error": str(e)}, 500
        print("no mentions yet, create new mentions file")
        mentions = {}
    else:
        old_sha = mentions_meta.sha
        mentions = json.loads(
            base64.decodebytes(mentions_meta.content.encode('utf8')))

    # parse target url and get path on server
    url_path = urlparse(body['target']).path

    # create mention entry if needed
    entries = mentions.setdefault(url_path, [])

    # if the mention is already processed, do nothing.
    if any(entry.get('source') == body['source'] for entry in entries):
        return {"message": "mention already processed, no action taken."}

    activity_types = {
        "in-reply-to": "reply",
        "like-of": "like",
        "repost-of": "repost",
        "bookmark-of": "bookmark",
        "mention-of": "mention",
        "rsvp": "rsvp"
    }

    # format the mention like the data from webmention.io api
    new_mention = {
        "id": post['wm-id'],
        "source": body['source'],
        "target": body['target'],
        "activity": {
            # Unknown properties are recorded as a plain mention rather than
            # failing the whole request.
            "type": activity_types.get(post['wm-property'], "mention")
        },
        "verified_date": body.get('wm-received', datetime.now().isoformat()),
        "data": {
            "author": post.get('author'),
            "content": (post.get('content') or {}).get('html', None),
            "published": post.get('published')
        }
    }

    # append the new mention
    entries.append(new_mention)

    # base64.encodestring was removed in Python 3.9; encodebytes is the
    # equivalent replacement.
    content = base64.encodebytes(
        json.dumps(mentions, indent=2).encode("utf8")).decode("utf8")

    # store to repo: update when the file already existed, otherwise create
    if old_sha:
        print(f"Update {mentions_file}")
        options = giteapy.UpdateFileOptions(content=content, sha=old_sha)
        store = api.repo_update_file
    else:
        print(f"Create {mentions_file}")
        options = giteapy.CreateFileOptions(content=content)
        store = api.repo_create_file

    try:
        store(owner, repo, mentions_file, options)
    except Exception as e:
        return {"error": str(e)}, 500

    return {"message": "ok"}
def process_multipart_post():
    """Build a new entry from a form-encoded or multipart request.

    Reads the submitted form fields and files from the current Flask
    ``request``: determines the entry type, builds the front matter (reply /
    bookmark / repost / like targets, title, tags, photos, syndication
    targets) and assembles the document body.

    Returns:
        A ``(docstr, frontmatter, file_path)`` tuple where ``docstr`` is the
        body text, ``frontmatter`` the metadata dict and ``file_path`` the
        repository path produced by ``init_frontmatter``.
    """
    doc = request.form.to_dict(flat=True)

    # The first matching property decides the type; without any of them a
    # named entry is a "post" and everything else a "note".
    entry_type = "post" if 'name' in doc else "note"
    for prop, kind in (('in-reply-to', "reply"), ('bookmark-of', "bookmark"),
                       ('repost-of', "repost"), ('like-of', "like")):
        if prop in doc:
            entry_type = kind
            break

    now = datetime.now()
    frontmatter, file_path = init_frontmatter(now, entry_type, doc.get('name'))

    for key in ['bookmark-of', 'in-reply-to', 'repost-of', 'like-of']:
        if key in doc:
            frontmatter[key] = doc[key]

    if 'category' in doc:
        categories = [doc['category']]
    else:
        categories = request.form.getlist('category[]')

    if 'name' in doc:
        frontmatter['title'] = doc['name']

    if categories:
        frontmatter['tags'] = categories

    # Photo-only posts may legitimately have no content field.
    content = doc.get('content', '')

    if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files):
        if 'photo[]' in request.files:
            uploaded = [
                process_photo_upload(now, photo, suffix=i)
                for i, photo in enumerate(request.files.getlist('photo[]'))
            ]
            # process_photo_upload returns None when uploads are not stored;
            # such photos have no URL to reference.
            photo_urls = [u for u in uploaded if u is not None]
            docstr = "".join(
                f"\n\n<img src=\"{photo_url}\" class=\"u-photo\" />"
                for photo_url in photo_urls)
            docstr += f"\n\n {content}"
        else:
            if 'photo' in doc:
                photo_url = process_photo_url(now, doc)
            else:
                photo_url = process_photo_upload(now, request.files['photo'])

            if photo_url is None:
                photo_urls = []
                docstr = content
            else:
                photo_urls = [photo_url]
                docstr = f"<img src=\"{photo_url}\" class=\"u-photo\" /> \n\n {content}"

        frontmatter['photo'] = photo_urls
    else:
        docstr = content

    # Syndication targets may arrive comma-separated in one field and/or as a
    # repeated "[]" field; when both are present the "[]" list ends up in the
    # front matter while links for both are appended to the body.
    target_lists = []
    if 'mp-syndicate-to' in doc:
        target_lists.append(doc['mp-syndicate-to'].split(","))
    if 'mp-syndicate-to[]' in request.form:
        target_lists.append(request.form.getlist('mp-syndicate-to[]'))

    for targets in target_lists:
        frontmatter['mp-syndicate-to'] = targets
        docstr += "".join(f"\n<a href=\"{url}\"></a>" for url in targets)

    return docstr, frontmatter, file_path
def process_json_post():
    """Process JSON POST submission.

    Placeholder: no JSON handling is implemented here, so the function
    yields ``None``.

    NOTE(review): ``req`` unpacks this function's result into three values
    (docstr, frontmatter, file_path), which cannot succeed with ``None``.
    """
    return None
@app.route('/', methods=['POST'])
@authed_endpoint
def req():
    """Create a new entry in the Gitea repository from a Micropub POST.

    JSON requests are handed to ``process_json_post`` and everything else to
    ``process_multipart_post``. The resulting front matter (as YAML) and body
    are combined into one document, base64-encoded and committed as a new
    file.

    Returns:
        A 202 response whose ``Location`` header is the entry URL, or
        ``({"error": ...}, 500)`` when the file could not be created.
    """
    # silent=True makes get_json return None for non-JSON (form/multipart)
    # requests instead of raising, so those reach the multipart handler.
    if request.get_json(silent=True):
        docstr, frontmatter, file_path = process_json_post()
    else:
        docstr, frontmatter, file_path = process_multipart_post()

    frontmatter_str = yaml.dump(frontmatter)

    # base64.encodestring was removed in Python 3.9; encodebytes is the
    # equivalent replacement.
    content = base64.encodebytes(
        f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8")

    api = get_api_client()
    body = giteapy.CreateFileOptions(content=content)

    try:
        api.repo_create_file(os.environ.get(
            'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body)
    except Exception as e:
        return {"error": str(e)}, 500

    return Response(status=202, headers={"Location": frontmatter['url']})
2021-12-23 21:39:21 +00:00
2021-12-24 11:03:28 +00:00
def parse_categories():
    """Return the known categories as ``{"categories": [...]}``.

    With ``MICROPUB_CATEGORY_LIST_STRATEGY`` set to ``feed`` the XML file
    named by ``MICROPUB_CATEGORY_LIST_FILE`` is parsed and the text of every
    ``item/title`` element is returned. For any other strategy, or when no
    file is configured, the list is empty so callers always receive a
    well-formed response.
    """
    strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY')
    feed_path = os.environ.get('MICROPUB_CATEGORY_LIST_FILE')

    if strategy != 'feed' or not feed_path:
        return {"categories": []}

    tree = ElementTree.parse(feed_path)
    tags = tree.findall('.//item/title')
    return {"categories": [tag.text for tag in tags]}
2021-12-26 11:09:50 +00:00
def get_syndication_targets():
    """List the configured syndication targets.

    ``SYNDICATION_TARGETS`` is a comma-separated list of target keys. For
    each key ``SYNDICATION_TARGET_<key>_URL`` supplies the ``uid`` and
    ``SYNDICATION_TARGET_<key>_NAME`` the display name (defaulting to the
    key). Keys without a URL are reported on stdout and skipped; blank
    entries (e.g. when the variable is unset) are ignored.

    Returns:
        A list of ``{"uid": ..., "name": ...}`` dicts, in configured order.
    """
    defs = []

    for raw_target in os.environ.get("SYNDICATION_TARGETS", "").split(","):
        # Tolerate "a, b" style lists and the [""] produced by an empty value.
        target = raw_target.strip()
        if not target:
            continue

        url = os.environ.get(f"SYNDICATION_TARGET_{target}_URL")
        if url is None:
            print(f"No url for SYNDICATION_TARGET_{target}_URL")
            continue

        defs.append({
            "uid": url,
            "name": os.environ.get(f"SYNDICATION_TARGET_{target}_NAME", target),
        })

    return defs
2022-01-28 14:30:12 +00:00
@app.route("/media", methods=["POST"])
@authed_endpoint
def media_endpoint():
    """Store an uploaded file and report where it can be fetched.

    Expects the upload in the ``file`` form field and passes it to
    ``process_photo_upload``.

    Returns:
        A 201 response with the stored file's URL in ``Location``; a 400
        error when no ``file`` part was sent; a 501 error when
        ``process_photo_upload`` returned no URL (uploads are not stored).
    """
    upload = request.files.get('file')
    if upload is None:
        return {
            "error": "invalid_request",
            "error_description": "No 'file' part was provided"
        }, 400

    url = process_photo_upload(datetime.now(), upload)
    if url is None:
        # process_photo_upload only stores files (and returns a URL) when
        # MICROPUB_IMAGE_STRATEGY is "copy"; there is nothing to point to.
        return {
            "error": "not_supported",
            "error_description": "Media uploads are not stored by this server"
        }, 501

    return Response(status=201, headers={"Location": url})
2021-12-26 11:09:50 +00:00
2021-12-24 11:03:28 +00:00
def generate_config_json():
    """Assemble the configuration document returned for ``q=config``.

    The result names the media endpoint (the current request's base URL with
    ``media`` appended), the syndication targets from
    ``get_syndication_targets`` and the list of supported post types.
    """
    # (type identifier, display name) pairs, in the order they are advertised.
    supported = (
        ("note", "Note"),
        ("article", "Blog Post"),
        ("photo", "Photo"),
        ("reply", "Reply"),
        ("bookmark", "Bookmark"),
        ("like", "Like"),
        ("repost", "Re-post"),
        ("read", "Read"),
    )
    post_types = [{"type": ident, "name": label} for ident, label in supported]

    config = {}
    config["media-endpoint"] = request.base_url + "media"
    config["syndicate-to"] = get_syndication_targets()
    config["post-types"] = post_types
    return config
2021-12-24 10:49:31 +00:00
@app.route("/", methods=['GET'])
@authed_endpoint
def index():
    """Answer Micropub ``q=`` queries.

    Supported values of the ``q`` query parameter are ``config``,
    ``category`` and ``syndicate-to``.

    Returns:
        The matching JSON document, or a 400 ``invalid_request`` error when
        ``q`` is missing or not one of the supported values.
    """
    query = request.args.get('q')

    if query == 'config':
        return generate_config_json()
    if query == 'category':
        return parse_categories()
    if query == 'syndicate-to':
        return {"syndicate-to": get_syndication_targets()}

    # Falling through would return None, which is not a valid response.
    return {
        "error": "invalid_request",
        "error_description": "Missing or unsupported 'q' parameter"
    }, 400
2021-12-24 10:49:31 +00:00
@app.route('/form', methods=['GET'])
def authform():
    """Serve a bare HTML page with two forms.

    The first form submits ``me`` to ``/authenticate``; the second submits
    ``me`` and a selected ``scope`` to ``/authorize``.
    """
    page = """
<!DOCTYPE html>
<html>
<body>
<form action="/authenticate" method="GET">
<input type="text" name="me" placeholder="your domain.com"/>
<button type="submit">Authenticate</button>
</form>
<form action="/authorize" method="GET">
<input type="text" name="me" placeholder="your domain.com"/>
<select name="scope">
<option>read</option>
<option>post</option>
<option>comment</option>
<option>create draft update delete media read follow mute block create</option>
</select>
<button type="submit">Authorize</button>
</form>
</body>
</html>
"""
    return page
@app.route('/authenticate')
def authenticate():
    """Start authentication for the ``me`` value given in the query string.

    Delegates to ``micropub.authenticate`` with the index view's URL as the
    ``next_url``.
    """
    me = request.args.get('me')
    landing = url_for('index')
    return micropub.authenticate(me, next_url=landing)
@app.route('/authorize')
def authorize():
    """Start authorization for the ``me`` and ``scope`` query parameters.

    Delegates to ``micropub.authorize`` with the index view's URL as the
    ``next_url``.
    """
    me = request.args.get('me')
    landing = url_for('index')
    requested_scope = request.args.get('scope')
    return micropub.authorize(me, next_url=landing, scope=requested_scope)
@app.route('/indieauth-callback')
@micropub.authenticated_handler
def indieauth_callback(resp):
    """Show the outcome of an authentication attempt as a small HTML page.

    Args:
        resp: Result object passed in by ``micropub.authenticated_handler``;
            its ``me``, ``next_url`` and ``error`` attributes are displayed.

    Returns:
        The HTML page as a string.
    """
    # Local import keeps this block self-contained.
    from html import escape

    # These values come from the auth round trip; escape them before placing
    # them in markup.
    me, next_url, error = (
        escape(str(value)) for value in (resp.me, resp.next_url, resp.error))

    return """
<!DOCTYPE html>
<html>
<body>
Authenticated:
<ul>
<li>me: {}</li>
<li>next: {}</li>
<li>error: {}</li>
</ul>
</body>
</html>
""".format(me, next_url, error)
@app.route('/micropub-callback')
@micropub.authorized_handler
def micropub_callback(resp):
    """Show the outcome of an authorization attempt as a small HTML page.

    Args:
        resp: Result object passed in by ``micropub.authorized_handler``; its
            ``me``, ``micropub_endpoint``, ``access_token``, ``next_url`` and
            ``error`` attributes are displayed.

    Returns:
        The HTML page as a string.
    """
    # Local import keeps this block self-contained.
    from html import escape

    # These values come from the auth round trip; escape them before placing
    # them in markup.
    fields = [
        escape(str(value))
        for value in (resp.me, resp.micropub_endpoint, resp.access_token,
                      resp.next_url, resp.error)
    ]

    return """
<!DOCTYPE html>
<html>
<body>
Authorized:
<ul>
<li>me: {}</li>
<li>endpoint: {}</li>
<li>token: {}</li>
<li>next: {}</li>
<li>error: {}</li>
</ul>
</body>
</html>
""".format(*fields)
if __name__ == '__main__':
    # Debug mode enables the interactive debugger and must not be exposed
    # publicly. It stays on by default for local use (as before) and can be
    # switched off by setting MICROPUB_DEBUG to 0/false/no/off.
    debug_enabled = os.environ.get(
        'MICROPUB_DEBUG', '1').strip().lower() not in ('0', 'false', 'no', 'off', '')
    app.run(debug=debug_enabled)