from typing import Dict, List, Optional, Union import requests import os import functools import dotenv import giteapy import giteapy.rest import time import base64 from werkzeug.datastructures import FileStorage import yaml from urllib.parse import urlparse from slugify import slugify from datetime import date, datetime from xml.etree import ElementTree from flask import Flask, jsonify, request, Response, Blueprint dotenv.load_dotenv() PERMITTED_DOMAIN = os.environ.get( 'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';') ENTITY_TYPE_PLURAL_MAP = { "reply": "replies", "watch":"watches" } core_bp = Blueprint("core", __name__) def create_app(): app = Flask(__name__) app.config['SECRET_KEY'] = 'my super secret key' #app.config.from_file(os.path.join(os.getcwd(), "config.yaml"), yaml.safe_load) from .indieauth import micropub, auth_bp from .webmentions import webhook_bp print(app.config) micropub.init_app(app, app.config.get('INDIEAUTH_CLIENT_ID', 'test.com')) app.register_blueprint(auth_bp) app.register_blueprint(core_bp) app.register_blueprint(webhook_bp) return app def authed_endpoint(f): @functools.wraps(f) def wrapper(*args, **kwargs): authtok = request.headers.get('Authorization') if authtok is None: return { "error": "unauthorized", "error_description": "An auth token was not provided" }, 401 auth = requests.get("https://tokens.indieauth.com/token", headers={ "Authorization": authtok, "Accept": "application/json"}).json() if auth.get('me','') not in PERMITTED_DOMAIN: return {"error": "insufficient_scope", "error_description": f"User \"{auth.get('me','')}\" not permitted to post here"}, 401 return f(*args, *kwargs) return wrapper _api_client = None def process_photo_url(now: datetime, doc: Dict[str, List[str]], suffix: str = ""): """Process photo submitted via URL""" now_ts = int(time.mktime(now.timetuple())) photo_urls = [] if isinstance(doc['photo'], str): doc['photo'] = [doc['photo']] for i, photo in enumerate(doc['photo']): if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy': # download the photo r = requests.get(photo) ext = os.path.splitext(photo)[1] # generate local filename filename = os.path.join(os.environ.get( 'MICROPUB_MEDIA_PATH'), now.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}") photo_url = os.path.join(os.environ.get( 'MICROPUB_MEDIA_URL_PREFIX'), now.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}") photo_urls.append(photo_url) # make directory if needed if not os.path.exists(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) with open(filename, 'wb') as f: f.write(r.content) else: photo_urls.append(photo) return photo_urls def process_photo_upload(now: datetime, file: FileStorage, suffix: str=""): """Process photo directly uploaded to micropub""" now_ts = int(time.mktime(now.timetuple())) if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy': ext = os.path.splitext(file.filename)[1] # generate local filename filename = os.path.join(os.environ.get( 'MICROPUB_MEDIA_PATH'), now.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}") photo_url = os.path.join(os.environ.get( 'MICROPUB_MEDIA_URL_PREFIX'), now.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}") # make directory if needed if not os.path.exists(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) file.save(filename) return photo_url else: return None def init_frontmatter(now: datetime, post_type: str, name: Optional[str]=None): now_ts = int(time.mktime(now.timetuple())) if name: if isinstance(name, list): slug = slugify(name[0]) + str(now_ts) else: slug = slugify(name) + str(now_ts) else: slug = str(now_ts) url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), now.strftime("%Y/%m/%d"), slug) print(os.environ.get( 'CONTENT_PREFIX')) file_path = os.path.join(os.environ.get( 'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), now.strftime("%Y/%m/%d"), slug + ".md") frontmatter = { "url": url, "type": post_type, "date": now.isoformat(sep='T'), } return frontmatter, file_path def detect_entry_type(doc: dict) -> str: """Given a dictionary object from either form or json, detect type of post""" if ('in-reply-to' in doc) or ('u-in-reply-to' in doc): entry_type = "reply" elif ('bookmark-of' in doc) or ('u-bookmark-of' in doc): entry_type = "bookmark" elif ('repost-of' in doc) or ('u-repost-of' in doc): entry_type = "repost" elif ('like-of' in doc) or ('u-like-of' in doc): entry_type = "like" elif ('read-of' in doc): entry_type = "read" elif ('watch-of' in doc): entry_type = "watch" elif ('name' in doc) or ('p-name' in doc): entry_type = "post" else: entry_type = "note" return entry_type def capture_frontmatter_props(doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str,List[str]]]): keys = ['summary', 'bookmark-of', 'in-reply-to', 'repost-of', 'like-of', 'read-of', 'watch-of', 'listen-of', 'read-status', 'rating'] keys += [f'u-{key}' for key in keys] for key in keys: if key in doc: if isinstance(doc[key], dict) and ('type' in doc[key]): if doc[key]['type'][0] == 'h-cite': if 'citations' not in frontmatter: frontmatter['citations'] = [] frontmatter['citations'].append(doc[key]['properties']) elif isinstance(doc[key], list) and (len(doc[key]) < 2): frontmatter[key] = doc[key][0] else: frontmatter[key] = doc[key] if 'category' in doc: if isinstance(doc['category'], list): categories = doc['category'] else: categories = [doc['category']] elif 'p-category' in doc: categories = doc['p-category'] else: categories = request.form.getlist('category[]') if len(categories) > 0: frontmatter['tags'] = categories def process_multipart_post(): doc = request.form.to_dict(flat=True) entry_type = detect_entry_type(doc) now = datetime.now() frontmatter, file_path = init_frontmatter(now, entry_type, doc.get('name')) capture_frontmatter_props(doc, frontmatter) if 'name' in doc: frontmatter['title'] = doc['name'] if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files): frontmatter['photo'] = [] if 'photo[]' in request.files: photos = request.files.getlist('photo[]') docstr = "" for i, photo in enumerate(photos): photo_url = process_photo_upload(now, photo, suffix=i) frontmatter['photo'].append(photo_url) docstr += f"\n\n" docstr += f"\n\n {doc['content']}" else: if 'photo' in doc: photo_urls = process_photo_url(now, doc) else: photo_urls = [process_photo_upload(now, request.files['photo'])] frontmatter['photo'] = photo_urls docstr = "" for photo in photo_urls: docstr += f" \n\n {doc['content']}" else: docstr = doc.get('content','') if 'content' in doc else "" if 'mp-syndicate-to' in doc: frontmatter['mp-syndicate-to'] = doc['mp-syndicate-to'].split(",") for url in doc['mp-syndicate-to'].split(","): docstr += f"\n" if 'mp-syndicate-to[]' in request.form: frontmatter['mp-syndicate-to'] = request.form.getlist('mp-syndicate-to[]') for url in request.form.getlist('mp-syndicate-to[]'): docstr += f"\n" return docstr, frontmatter, file_path def process_json_post(): """Process JSON POST submission""" body = request.get_json() # get post type - take the first item in the array if body['type'][0] != 'h-entry': return jsonify({"error":"invalid_format"}), 400 props = body['properties'] entry_type = detect_entry_type(props) now = datetime.now() frontmatter, file_path = init_frontmatter(now, entry_type, props.get('name')) capture_frontmatter_props(props, frontmatter) if 'name' in props: frontmatter['title'] = props['name'][0] docstr = "" if 'photo' in props: photo_urls = process_photo_url(now, props) frontmatter['photo'] = photo_urls for photo in photo_urls: docstr += f"\n\n" for content in props.get('content', []): if isinstance(content, dict): if 'html' in content: docstr += f"\n\n {content.get('html')}" else: docstr += f"\n\n {content}" return docstr, frontmatter, file_path def get_api_client() -> giteapy.RepositoryApi: global _api_client if _api_client is None: config = giteapy.Configuration() config.host = os.environ.get('GITEA_URL') config.api_key['access_token'] = os.environ.get('GITEA_API_KEY') _api_client = giteapy.RepositoryApi(giteapy.ApiClient(config)) return _api_client @core_bp.route('/', methods=['POST']) @authed_endpoint def req(): if request.get_json(): docstr, frontmatter, file_path = process_json_post() else: docstr, frontmatter, file_path = process_multipart_post() frontmatter_str = yaml.dump(frontmatter) content = base64.encodestring( f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8") api = get_api_client() body = giteapy.CreateFileOptions(content=content) try: r = api.repo_create_file(os.environ.get( 'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body) return Response(status=202, headers={"Location": frontmatter['url']}) except Exception as e: return {"error": str(e)}, 500 def parse_categories(): strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY') if strategy == 'feed': tree = ElementTree.parse(os.environ.get('MICROPUB_CATEGORY_LIST_FILE')) tags = tree.findall('.//item/title') return {"categories": [tag.text for tag in tags] } def get_syndication_targets(): targets = os.environ.get("SYNDICATION_TARGETS", "").split(",") defs = [] for target in targets: if os.environ.get(f"SYNDICATION_TARGET_{target}_URL") is None: print(f"No url for SYNDICATION_TARGET_{target}_URL") continue target_def = { "uid": os.environ.get(f"SYNDICATION_TARGET_{target}_URL", target), "name": os.environ.get(f"SYNDICATION_TARGET_{target}_NAME", target), } defs.append(target_def) return defs @core_bp.route("/media", methods=["POST"]) @authed_endpoint def media_endpoint(): now = datetime.now() url = process_photo_upload(now, request.files['file']) return Response(status=201, headers={"Location": url}) def generate_config_json(): return { "media-endpoint": request.base_url + "media", "syndicate-to": get_syndication_targets(), "post-types": [ { "type": "note", "name": "Note" }, { "type": "article", "name": "Blog Post" }, { "type": "photo", "name": "Photo" }, { "type": "reply", "name": "Reply" }, { "type": "bookmark", "name": "Bookmark" }, { "type": "like", "name":"Like" } ] } @core_bp.route("/", methods=['GET']) @authed_endpoint def index(): if request.args.get('q') == 'config': return generate_config_json() elif request.args.get('q') == 'category': return parse_categories() elif request.args.get('q') == 'syndicate-to': return {"syndicate-to": get_syndication_targets()} if __name__ == '__main__': app.run(debug=False)