from typing import Dict, List import requests import os import functools import dotenv import giteapy import time import base64 from werkzeug.datastructures import FileStorage import yaml from slugify import slugify from datetime import datetime from xml.etree import ElementTree from flask import Flask, request, url_for, Response from requests import api from flask_micropub import MicropubClient dotenv.load_dotenv() PERMITTED_DOMAIN = os.environ.get( 'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';') app = Flask(__name__) app.config['SECRET_KEY'] = 'my super secret key' micropub = MicropubClient(app, client_id='https://brainsteam.co.uk') ENTITY_TYPE_PLURAL_MAP = { "post": "posts", "note": "notes", "reply": "replies", "bookmark": "bookmarks" } def authed_endpoint(f): @functools.wraps(f) def wrapper(*args, **kwargs): authtok = request.headers.get('Authorization') if authtok is None: return { "error": "unauthorized", "error_description": "An auth token was not provided" }, 401 auth = requests.get("https://tokens.indieauth.com/token", headers={ "Authorization": authtok, "Accept": "application/json"}).json() if auth['me'] not in PERMITTED_DOMAIN: return {"error": "forbidden", "error_description": f"User {auth['me']} not permitted to post here"}, 403 return f(*args, *kwargs) return wrapper _api_client = None def get_api_client(): global _api_client if _api_client is None: config = giteapy.Configuration() config.host = os.environ.get('GITEA_URL') config.api_key['access_token'] = os.environ.get('GITEA_API_KEY') _api_client = giteapy.RepositoryApi(giteapy.ApiClient(config)) return _api_client def process_photo_url(now: datetime, doc: Dict[str, List[str]], suffix: str = ""): """Process photo submitted via URL""" now_ts = int(time.mktime(now.timetuple())) if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy': # download the photo r = requests.get(doc['photo']) ext = os.path.splitext(doc['photo'])[1] # generate local filename filename = os.path.join(os.environ.get( 'MICROPUB_MEDIA_PATH'), now.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}{ext}") photo_url = os.path.join(os.environ.get( 'MICROPUB_MEDIA_URL_PREFIX'), now.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}{ext}") # make directory if needed if not os.path.exists(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) with open(filename, 'wb') as f: f.write(r.content) # elif os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'gitea': else: photo_url = doc['photo'] return photo_url def process_photo_upload(now: datetime, file: FileStorage, suffix: str=""): """Process photo directly uploaded to micropub""" now_ts = int(time.mktime(now.timetuple())) if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy': ext = os.path.splitext(file.filename)[1] # generate local filename filename = os.path.join(os.environ.get( 'MICROPUB_MEDIA_PATH'), now.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}") photo_url = os.path.join(os.environ.get( 'MICROPUB_MEDIA_URL_PREFIX'), now.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}") # make directory if needed if not os.path.exists(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) file.save(filename) return photo_url else: return None @app.route('/', methods=['POST']) @authed_endpoint def req(): doc = request.form.to_dict(flat=True) if 'in-reply-to' in doc: entry_type = "reply" elif 'bookmark-of' in doc: entry_type = "bookmark" elif 'repost-of' in doc: entry_type = "repost" elif 'like-of' in doc: entry_type = "like" # elif ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files): # entry_type = "photo" elif 'name' in doc: entry_type = "post" else: entry_type = "note" now = datetime.now() now_ts = int(time.mktime(now.timetuple())) url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(entry_type, entry_type + "s"), now.strftime("%Y/%m/%d"), str(now_ts)) if 'name' in doc: slug = slugify(doc['name']) + str(now_ts) else: slug = str(now_ts) file_path = os.path.join(os.environ.get( 'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(entry_type, entry_type + "s"), now.strftime("%Y/%m/%d"), slug + ".md") frontmatter = { "url": url, "type": entry_type, "date": now.isoformat(sep='T'), } for key in ['bookmark-of', 'in-reply-to', 'repost-of', 'like-of']: if key in doc: frontmatter[key] = doc[key] if 'category' in doc: categories = [doc['category']] else: categories = request.form.getlist('category[]') if 'name' in doc: frontmatter['title'] = doc['name'] if len(categories) > 0: frontmatter['tags'] = categories frontmatter_str = yaml.dump(frontmatter) if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files): if 'photo[]' in request.files: photos = request.files.getlist('photo[]') docstr = "" for i, photo in enumerate(photos): photo_url = process_photo_upload(now, photo, suffix=i) docstr += f"\n\n![image]({photo_url})" docstr += f"\n\n {doc['content']}" else: if 'photo' in doc: photo_url = process_photo_url(now, doc) else: photo_url = process_photo_upload(now, request.files['photo']) docstr = f"![image]({photo_url}) \n\n {doc['content']}" else: docstr = doc['content'] content = base64.encodestring( f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8") api = get_api_client() body = giteapy.CreateFileOptions(content=content) try: r = api.repo_create_file(os.environ.get( 'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body) return Response(status=202, headers={"Location": url}) except Exception as e: return {"error": str(e)}, 500 def parse_categories(): strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY') if strategy == 'feed': tree = ElementTree.parse(os.environ.get('MICROPUB_CATEGORY_LIST_FILE')) tags = tree.findall('.//item/title') return {"categories": [tag.text for tag in tags] } def generate_config_json(): return { "media-endpoint": request.base_url + "media", "syndicate-to": [ { "uid": "mastodon", "name": "Mastodon" } ], "post-types": [ { "type": "note", "name": "Note" }, { "type": "article", "name": "Blog Post" }, { "type": "photo", "name": "Photo" }, { "type": "reply", "name": "Reply" }, { "type": "bookmark", "name": "Bookmark" } ] } @app.route("/", methods=['GET']) @authed_endpoint def index(): if request.args.get('q') == 'config': return generate_config_json() elif request.args.get('q') == 'category': return parse_categories() @app.route('/form', methods=['GET']) def authform(): return """
""" @app.route('/authenticate') def authenticate(): return micropub.authenticate( request.args.get('me'), next_url=url_for('index')) @app.route('/authorize') def authorize(): return micropub.authorize( request.args.get('me'), next_url=url_for('index'), scope=request.args.get('scope')) @app.route('/indieauth-callback') @micropub.authenticated_handler def indieauth_callback(resp): return """ Authenticated: """.format(resp.me, resp.next_url, resp.error) @app.route('/micropub-callback') @micropub.authorized_handler def micropub_callback(resp): return """ Authorized: """.format(resp.me, resp.micropub_endpoint, resp.access_token, resp.next_url, resp.error) if __name__ == '__main__': app.run(debug=True)