micropub-flask-gitea/example.py

555 lines
15 KiB
Python

from typing import Dict, List, Optional
import requests
import os
import functools
import dotenv
import giteapy
import giteapy.rest
import time
import json
import base64
from werkzeug.datastructures import FileStorage
import yaml
import hashlib
from urllib.parse import urlparse
from slugify import slugify
from datetime import date, datetime
from xml.etree import ElementTree
from flask import Flask, request, url_for, Response
from requests import api
from flask_micropub import MicropubClient
dotenv.load_dotenv()
PERMITTED_DOMAIN = os.environ.get(
'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';')
app = Flask(__name__)
app.config['SECRET_KEY'] = 'my super secret key'
micropub = MicropubClient(app, client_id='https://brainsteam.co.uk')
ENTITY_TYPE_PLURAL_MAP = {
"post": "posts",
"note": "notes",
"reply": "replies",
"bookmark": "bookmarks"
}
def authed_endpoint(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
authtok = request.headers.get('Authorization')
if authtok is None:
return {
"error": "unauthorized",
"error_description": "An auth token was not provided"
}, 401
auth = requests.get("https://tokens.indieauth.com/token", headers={
"Authorization": authtok, "Accept": "application/json"}).json()
if auth.get('me','') not in PERMITTED_DOMAIN:
return {"error": "insufficient_scope", "error_description": f"User \"{auth.get('me','')}\" not permitted to post here"}, 401
return f(*args, *kwargs)
return wrapper
_api_client = None
def get_api_client() -> giteapy.RepositoryApi:
global _api_client
if _api_client is None:
config = giteapy.Configuration()
config.host = os.environ.get('GITEA_URL')
config.api_key['access_token'] = os.environ.get('GITEA_API_KEY')
_api_client = giteapy.RepositoryApi(giteapy.ApiClient(config))
return _api_client
def process_photo_url(now: datetime, doc: Dict[str, List[str]], suffix: str = ""):
"""Process photo submitted via URL"""
now_ts = int(time.mktime(now.timetuple()))
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
# download the photo
r = requests.get(doc['photo'])
ext = os.path.splitext(doc['photo'])[1]
# generate local filename
filename = os.path.join(os.environ.get(
'MICROPUB_MEDIA_PATH'), now.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}{ext}")
photo_url = os.path.join(os.environ.get(
'MICROPUB_MEDIA_URL_PREFIX'), now.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}{ext}")
# make directory if needed
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
with open(filename, 'wb') as f:
f.write(r.content)
else:
photo_url = doc['photo']
return photo_url
def process_photo_upload(now: datetime, file: FileStorage, suffix: str=""):
"""Process photo directly uploaded to micropub"""
now_ts = int(time.mktime(now.timetuple()))
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
ext = os.path.splitext(file.filename)[1]
# generate local filename
filename = os.path.join(os.environ.get(
'MICROPUB_MEDIA_PATH'), now.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
photo_url = os.path.join(os.environ.get(
'MICROPUB_MEDIA_URL_PREFIX'), now.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
# make directory if needed
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
file.save(filename)
return photo_url
else:
return None
def init_frontmatter(now: datetime, post_type: str, name: Optional[str]=None):
now_ts = int(time.mktime(now.timetuple()))
if name:
slug = slugify(name) + str(now_ts)
else:
slug = str(now_ts)
url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
now.strftime("%Y/%m/%d"), slug)
file_path = os.path.join(os.environ.get(
'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), now.strftime("%Y/%m/%d"), slug + ".md")
frontmatter = {
"url": url,
"type": post_type,
"date": now.isoformat(sep='T'),
}
return frontmatter, file_path
@app.route("/webmentions", methods=['POST'])
def webmention_hook():
"""Accept web mention webhook request"""
body = request.get_json()
# webmention should always have a json body
if body is None:
return {"error":"invalid_request"}, 400
# assert that secret matches
if body.get('secret') != os.environ.get("WEBMENTION_SECRET", "changeme"):
return {"error":"invalid_secret"}, 401
# get existing mentions from gitea
api = get_api_client()
old_sha = None
try:
mentions_meta = api.repo_get_contents(os.environ.get('GITEA_REPO_OWNER'),
os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'))
old_sha = mentions_meta.sha
mentions = json.loads(base64.decodebytes(mentions_meta.content.encode('utf8')))
except giteapy.rest.ApiException as e:
if e.status == 404:
print("no mentions yet, create new mentions file")
mentions = {}
# parse target url and get path on server
url_path = urlparse(body.get('target')).path
# create mention entry if needed
if url_path not in mentions:
mentions[url_path] = []
# if the mention is already processed, do nothing.
for entry in mentions[url_path]:
if entry['source'] == body['source']:
return {"message": "mention already processed, no action taken."}
activity_types = {
"in-reply-to": "reply",
"like-of": "like",
"repost-of": "repost",
"bookmark-of": "bookmark",
"mention-of": "mention",
"rsvp":"rsvp"
}
# format the mention like the data from webmention.io api
new_mention = {
"source": body['source'],
"target": body['target'],
"activity":{
"type": activity_types[body['post']['wm-property']]
},
"verified_date": body.get('wm-received', datetime.now().isoformat()),
"data":{
"author": body['post']['author'],
"content": body['post'].get('content',{}).get('html', None),
"published": body['post'].get('published')
}
}
# append the new mention
mentions[url_path].append(new_mention)
content = base64.encodestring(json.dumps(mentions, indent=2).encode("utf8")).decode("utf8")
# store to repo
if old_sha:
print(f"Update {os.environ.get('WEBMENTIONS_JSON_FILE')}")
body = giteapy.UpdateFileOptions(content=content, sha=old_sha)
try:
r = api.repo_update_file(os.environ.get(
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'), body)
return {"message":"ok"}
except Exception as e:
return {"error": str(e)}, 500
else:
print(f"Create {os.environ.get('WEBMENTIONS_JSON_FILE')}")
body = giteapy.CreateFileOptions(content=content)
try:
r = api.repo_create_file(os.environ.get(
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'), body)
return {"message":"ok"}
except Exception as e:
return {"error": str(e)}, 500
def process_multipart_post():
doc = request.form.to_dict(flat=True)
if 'in-reply-to' in doc:
entry_type = "reply"
elif 'bookmark-of' in doc:
entry_type = "bookmark"
elif 'repost-of' in doc:
entry_type = "repost"
elif 'like-of' in doc:
entry_type = "like"
elif 'name' in doc:
entry_type = "post"
else:
entry_type = "note"
now = datetime.now()
frontmatter, file_path = init_frontmatter(now, entry_type, doc.get('name'))
for key in ['bookmark-of', 'in-reply-to', 'repost-of', 'like-of']:
if key in doc:
frontmatter[key] = doc[key]
if 'category' in doc:
categories = [doc['category']]
else:
categories = request.form.getlist('category[]')
if 'name' in doc:
frontmatter['title'] = doc['name']
if len(categories) > 0:
frontmatter['tags'] = categories
if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files):
frontmatter['photo'] = []
if 'photo[]' in request.files:
photos = request.files.getlist('photo[]')
docstr = ""
for i, photo in enumerate(photos):
photo_url = process_photo_upload(now, photo, suffix=i)
frontmatter['photo'].append(photo_url)
docstr += f"\n\n<img src=\"{photo_url}\" class=\"u-photo\" />"
docstr += f"\n\n {doc['content']}"
else:
if 'photo' in doc:
photo_url = process_photo_url(now, doc)
else:
photo_url = process_photo_upload(now, request.files['photo'])
frontmatter['photo'] = [photo_url]
docstr = f"<img src=\"{photo_url}\" class=\"u-photo\" /> \n\n {doc['content']}"
else:
docstr = doc.get('content','') if 'content' in doc else ""
if 'mp-syndicate-to' in doc:
frontmatter['mp-syndicate-to'] = doc['mp-syndicate-to'].split(",")
for url in doc['mp-syndicate-to'].split(","):
docstr += f"\n<a href=\"{url}\"></a>"
if 'mp-syndicate-to[]' in request.form:
frontmatter['mp-syndicate-to'] = request.form.getlist('mp-syndicate-to[]')
for url in request.form.getlist('mp-syndicate-to[]'):
docstr += f"\n<a href=\"{url}\"></a>"
return docstr, frontmatter, file_path
def process_json_post():
"""Process JSON POST submission"""
@app.route('/', methods=['POST'])
@authed_endpoint
def req():
if request.get_json():
docstr, frontmatter, file_path = process_json_post()
else:
docstr, frontmatter, file_path = process_multipart_post()
frontmatter_str = yaml.dump(frontmatter)
content = base64.encodestring(
f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8")
api = get_api_client()
body = giteapy.CreateFileOptions(content=content)
try:
r = api.repo_create_file(os.environ.get(
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body)
return Response(status=202, headers={"Location": frontmatter['url']})
except Exception as e:
return {"error": str(e)}, 500
def parse_categories():
strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY')
if strategy == 'feed':
tree = ElementTree.parse(os.environ.get('MICROPUB_CATEGORY_LIST_FILE'))
tags = tree.findall('.//item/title')
return {"categories": [tag.text for tag in tags] }
def get_syndication_targets():
targets = os.environ.get("SYNDICATION_TARGETS", "").split(",")
defs = []
for target in targets:
if os.environ.get(f"SYNDICATION_TARGET_{target}_URL") is None:
print(f"No url for SYNDICATION_TARGET_{target}_URL")
continue
target_def = {
"uid": os.environ.get(f"SYNDICATION_TARGET_{target}_URL", target),
"name": os.environ.get(f"SYNDICATION_TARGET_{target}_NAME", target),
}
defs.append(target_def)
return defs
@app.route("/media", methods=["POST"])
@authed_endpoint
def media_endpoint():
now = datetime.now()
url = process_photo_upload(now, request.files['file'])
return Response(status=201, headers={"Location": url})
def generate_config_json():
return {
"media-endpoint": request.base_url + "media",
"syndicate-to": get_syndication_targets(),
"post-types": [
{
"type": "note",
"name": "Note"
},
{
"type": "article",
"name": "Blog Post"
},
{
"type": "photo",
"name": "Photo"
},
{
"type": "reply",
"name": "Reply"
},
{
"type": "bookmark",
"name": "Bookmark"
}
]
}
@app.route("/", methods=['GET'])
@authed_endpoint
def index():
if request.args.get('q') == 'config':
return generate_config_json()
elif request.args.get('q') == 'category':
return parse_categories()
elif request.args.get('q') == 'syndicate-to':
return {"syndicate-to": get_syndication_targets()}
@app.route('/form', methods=['GET'])
def authform():
return """
<!DOCTYPE html>
<html>
<body>
<form action="/authenticate" method="GET">
<input type="text" name="me" placeholder="your domain.com"/>
<button type="submit">Authenticate</button>
</form>
<form action="/authorize" method="GET">
<input type="text" name="me" placeholder="your domain.com"/>
<select name="scope">
<option>read</option>
<option>post</option>
<option>comment</option>
<option>create draft update delete media read follow mute block create</option>
</select>
<button type="submit">Authorize</button>
</form>
</body>
</html>
"""
@app.route('/authenticate')
def authenticate():
return micropub.authenticate(
request.args.get('me'), next_url=url_for('index'))
@app.route('/authorize')
def authorize():
return micropub.authorize(
request.args.get('me'), next_url=url_for('index'),
scope=request.args.get('scope'))
@app.route('/indieauth-callback')
@micropub.authenticated_handler
def indieauth_callback(resp):
return """
<!DOCTYPE html>
<html>
<body>
Authenticated:
<ul>
<li>me: {}</li>
<li>next: {}</li>
<li>error: {}</li>
</ul>
</body>
</html>
""".format(resp.me, resp.next_url, resp.error)
@app.route('/micropub-callback')
@micropub.authorized_handler
def micropub_callback(resp):
return """
<!DOCTYPE html>
<html>
<body>
Authorized:
<ul>
<li>me: {}</li>
<li>endpoint: {}</li>
<li>token: {}</li>
<li>next: {}</li>
<li>error: {}</li>
</ul>
</body>
</html>
""".format(resp.me, resp.micropub_endpoint, resp.access_token,
resp.next_url, resp.error)
if __name__ == '__main__':
app.run(debug=True)