add python bits

This commit is contained in:
James Ravenscroft 2022-01-29 14:15:43 +00:00
parent 5c56889fb6
commit 8d259cb203
5 changed files with 555 additions and 1 deletions

25
env.example Normal file
View File

@ -0,0 +1,25 @@
PERMITTED_DOMAINS=mydomain.com
SECRET_KEY=SomeTopSecretStringUsedForEncryption
MICROPUB_IMAGE_STRATEGY=copy
MICROPUB_MEDIA_PATH=/some/media/directory
MICROPUB_MEDIA_URL_PREFIX=/media/
GITEA_URL=https://git.mysite.com
GITEA_API_KEY=somesecretapikey
GITEA_CONTENT_PREFIX=content/
GITEA_REPO_OWNER=myuser
GITEA_REPO_NAME=myblog
WEBMENTIONS_JSON_FILE=data/mentions.json
WEBMENTION_SECRET=ASuperSecretPasswordForWebmentionRequests
MICROPUB_CATEGORY_LIST_STRATEGY=feed
MICROPUB_CATEGORY_LIST_FILE=/path/to/index.xml
SYNDICATION_TARGETS=TWITTER,MASTODON
SYNDICATION_TARGET_MASTODON_NAME=Mastodon
SYNDICATION_TARGET_MASTODON_URL=https://brid.gy/publish/mastodon
SYNDICATION_TARGET_TWITTER_NAME=Twitter
SYNDICATION_TARGET_TWITTER_URL=https://brid.gy/publish/twitter

40
poetry.lock generated
View File

@ -270,6 +270,20 @@ toml = "*"
[package.extras]
testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"]
[[package]]
name = "pytest-mock"
version = "3.6.1"
description = "Thin-wrapper around the mock package for easier use with pytest"
category = "dev"
optional = false
python-versions = ">=3.6"
[package.dependencies]
pytest = ">=5.0"
[package.extras]
dev = ["pre-commit", "tox", "pytest-asyncio"]
[[package]]
name = "python-dateutil"
version = "2.8.2"
@ -324,6 +338,22 @@ urllib3 = ">=1.21.1,<1.27"
socks = ["PySocks (>=1.5.6,!=1.5.7)", "win-inet-pton"]
use_chardet_on_py3 = ["chardet (>=3.0.2,<5)"]
[[package]]
name = "requests-mock"
version = "1.9.3"
description = "Mock out responses from the requests package"
category = "dev"
optional = false
python-versions = "*"
[package.dependencies]
requests = ">=2.3,<3"
six = "*"
[package.extras]
fixture = ["fixtures"]
test = ["fixtures", "mock", "purl", "pytest", "sphinx", "testrepository (>=0.0.18)", "testtools"]
[[package]]
name = "six"
version = "1.16.0"
@ -412,7 +442,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata]
lock-version = "1.1"
python-versions = "^3.7.1"
content-hash = "46428d2ce843954ed6b34656cd29d6eac4d59dd57cf09822ffd5a98cf7df69bf"
content-hash = "6691ba3a14c5dd724cacd0ef4b346e473e23359919c874041ff2c9e9b891b716"
[metadata.files]
asgiref = [
@ -571,6 +601,10 @@ pytest = [
{file = "pytest-6.2.5-py3-none-any.whl", hash = "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134"},
{file = "pytest-6.2.5.tar.gz", hash = "sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89"},
]
pytest-mock = [
{file = "pytest-mock-3.6.1.tar.gz", hash = "sha256:40217a058c52a63f1042f0784f62009e976ba824c418cced42e88d5f40ab0e62"},
{file = "pytest_mock-3.6.1-py3-none-any.whl", hash = "sha256:30c2f2cc9759e76eee674b81ea28c9f0b94f8f0445a1b87762cadf774f0df7e3"},
]
python-dateutil = [
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
{file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
@ -587,6 +621,10 @@ requests = [
{file = "requests-2.27.1-py2.py3-none-any.whl", hash = "sha256:f22fa1e554c9ddfd16e6e41ac79759e17be9e492b3587efa038054674760e72d"},
{file = "requests-2.27.1.tar.gz", hash = "sha256:68d7c56fd5a8999887728ef304a6d12edc7be74f1cfa47714fc8b414525c9a61"},
]
requests-mock = [
{file = "requests-mock-1.9.3.tar.gz", hash = "sha256:8d72abe54546c1fc9696fa1516672f1031d72a55a1d66c85184f972a24ba0eba"},
{file = "requests_mock-1.9.3-py2.py3-none-any.whl", hash = "sha256:0a2d38a117c08bb78939ec163522976ad59a6b7fdd82b709e23bb98004a44970"},
]
six = [
{file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},

View File

@ -5,6 +5,11 @@ description = "A tiny python-based micropub endpoint that supports a Gitea + dro
authors = ["James Ravenscroft <ravenscroftj@gmail.com>"]
license = "AGPL-3.0"
packages = [
{ include = "microcosm", from="src" },
]
[tool.poetry.dependencies]
python = "^3.7.1"
Flask = "^2.0.2"
@ -17,6 +22,8 @@ uvicorn = "^0.16.0"
[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
autopep8 = "^1.6.0"
pytest-mock = "^3.6.1"
requests-mock = "^1.9.3"
[build-system]
requires = ["poetry-core>=1.0.0"]

480
src/microcosm/__init__.py Normal file
View File

@ -0,0 +1,480 @@
from typing import Dict, List, Optional
import requests
import os
import functools
import dotenv
import giteapy
import giteapy.rest
import time
import json
import base64
from werkzeug.datastructures import FileStorage
import yaml
import hashlib
from urllib.parse import urlparse
from slugify import slugify
from datetime import date, datetime
from xml.etree import ElementTree
from flask import Flask, request, url_for, Response
from requests import api
dotenv.load_dotenv()
PERMITTED_DOMAIN = os.environ.get(
'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';')
app = Flask(__name__)
app.config['SECRET_KEY'] = 'my super secret key'
ENTITY_TYPE_PLURAL_MAP = {
"post": "posts",
"note": "notes",
"reply": "replies",
"bookmark": "bookmarks"
}
def authed_endpoint(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
authtok = request.headers.get('Authorization')
if authtok is None:
return {
"error": "unauthorized",
"error_description": "An auth token was not provided"
}, 401
auth = requests.get("https://tokens.indieauth.com/token", headers={
"Authorization": authtok, "Accept": "application/json"}).json()
if auth.get('me','') not in PERMITTED_DOMAIN:
return {"error": "insufficient_scope", "error_description": f"User \"{auth.get('me','')}\" not permitted to post here"}, 401
return f(*args, *kwargs)
return wrapper
_api_client = None
def get_api_client() -> giteapy.RepositoryApi:
global _api_client
if _api_client is None:
config = giteapy.Configuration()
config.host = os.environ.get('GITEA_URL')
config.api_key['access_token'] = os.environ.get('GITEA_API_KEY')
_api_client = giteapy.RepositoryApi(giteapy.ApiClient(config))
return _api_client
def process_photo_url(now: datetime, doc: Dict[str, List[str]], suffix: str = ""):
"""Process photo submitted via URL"""
now_ts = int(time.mktime(now.timetuple()))
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
# download the photo
r = requests.get(doc['photo'])
ext = os.path.splitext(doc['photo'])[1]
# generate local filename
filename = os.path.join(os.environ.get(
'MICROPUB_MEDIA_PATH'), now.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}{ext}")
photo_url = os.path.join(os.environ.get(
'MICROPUB_MEDIA_URL_PREFIX'), now.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}{ext}")
# make directory if needed
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
with open(filename, 'wb') as f:
f.write(r.content)
else:
photo_url = doc['photo']
return photo_url
def process_photo_upload(now: datetime, file: FileStorage, suffix: str=""):
"""Process photo directly uploaded to micropub"""
now_ts = int(time.mktime(now.timetuple()))
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
ext = os.path.splitext(file.filename)[1]
# generate local filename
filename = os.path.join(os.environ.get(
'MICROPUB_MEDIA_PATH'), now.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
photo_url = os.path.join(os.environ.get(
'MICROPUB_MEDIA_URL_PREFIX'), now.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
# make directory if needed
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
file.save(filename)
return photo_url
else:
return None
def init_frontmatter(now: datetime, post_type: str, name: Optional[str]=None):
now_ts = int(time.mktime(now.timetuple()))
if name:
slug = slugify(name) + str(now_ts)
else:
slug = str(now_ts)
url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
now.strftime("%Y/%m/%d"), slug)
file_path = os.path.join(os.environ.get(
'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), now.strftime("%Y/%m/%d"), slug + ".md")
frontmatter = {
"url": url,
"type": post_type,
"date": now.isoformat(sep='T'),
}
return frontmatter, file_path
@app.route("/webmentions", methods=['POST'])
def webmention_hook():
"""Accept web mention webhook request"""
body = request.get_json()
# webmention should always have a json body
if body is None:
return {"error":"invalid_request"}, 400
# assert that secret matches
if body.get('secret') != os.environ.get("WEBMENTION_SECRET", "changeme"):
return {"error":"invalid_secret"}, 401
# get existing mentions from gitea
api = get_api_client()
old_sha = None
try:
mentions_meta = api.repo_get_contents(os.environ.get('GITEA_REPO_OWNER'),
os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'))
old_sha = mentions_meta.sha
mentions = json.loads(base64.decodebytes(mentions_meta.content.encode('utf8')))
except giteapy.rest.ApiException as e:
if e.status == 404:
print("no mentions yet, create new mentions file")
mentions = {}
# parse target url and get path on server
url_path = urlparse(body.get('target')).path
# create mention entry if needed
if url_path not in mentions:
mentions[url_path] = []
# if the mention is already processed, do nothing.
for entry in mentions[url_path]:
if entry['source'] == body['source']:
return {"message": "mention already processed, no action taken."}
activity_types = {
"in-reply-to": "reply",
"like-of": "like",
"repost-of": "repost",
"bookmark-of": "bookmark",
"mention-of": "mention",
"rsvp":"rsvp"
}
# format the mention like the data from webmention.io api
new_mention = {
"id": body['wm-id'],
"source": body['source'],
"target": body['target'],
"activity":{
"type": activity_types[body['post']['wm-property']]
},
"verified_date": body.get('wm-received', datetime.now().isoformat()),
"data":{
"author": body['post']['author'],
"content": body['post'].get('content',{}).get('html', None),
"published": body['post'].get('published')
}
}
# append the new mention
mentions[url_path].append(new_mention)
content = base64.encodestring(json.dumps(mentions, indent=2).encode("utf8")).decode("utf8")
# store to repo
if old_sha:
print(f"Update {os.environ.get('WEBMENTIONS_JSON_FILE')}")
body = giteapy.UpdateFileOptions(content=content, sha=old_sha)
try:
r = api.repo_update_file(os.environ.get(
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'), body)
return {"message":"ok"}
except Exception as e:
return {"error": str(e)}, 500
else:
print(f"Create {os.environ.get('WEBMENTIONS_JSON_FILE')}")
body = giteapy.CreateFileOptions(content=content)
try:
r = api.repo_create_file(os.environ.get(
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), os.environ.get('WEBMENTIONS_JSON_FILE'), body)
return {"message":"ok"}
except Exception as e:
return {"error": str(e)}, 500
def process_multipart_post():
doc = request.form.to_dict(flat=True)
if 'in-reply-to' in doc:
entry_type = "reply"
elif 'bookmark-of' in doc:
entry_type = "bookmark"
elif 'repost-of' in doc:
entry_type = "repost"
elif 'like-of' in doc:
entry_type = "like"
elif 'name' in doc:
entry_type = "post"
else:
entry_type = "note"
now = datetime.now()
frontmatter, file_path = init_frontmatter(now, entry_type, doc.get('name'))
for key in ['bookmark-of', 'in-reply-to', 'repost-of', 'like-of']:
if key in doc:
frontmatter[key] = doc[key]
if 'category' in doc:
categories = [doc['category']]
else:
categories = request.form.getlist('category[]')
if 'name' in doc:
frontmatter['title'] = doc['name']
if len(categories) > 0:
frontmatter['tags'] = categories
if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files):
frontmatter['photo'] = []
if 'photo[]' in request.files:
photos = request.files.getlist('photo[]')
docstr = ""
for i, photo in enumerate(photos):
photo_url = process_photo_upload(now, photo, suffix=i)
frontmatter['photo'].append(photo_url)
docstr += f"\n\n<img src=\"{photo_url}\" class=\"u-photo\" />"
docstr += f"\n\n {doc['content']}"
else:
if 'photo' in doc:
photo_url = process_photo_url(now, doc)
else:
photo_url = process_photo_upload(now, request.files['photo'])
frontmatter['photo'] = [photo_url]
docstr = f"<img src=\"{photo_url}\" class=\"u-photo\" /> \n\n {doc['content']}"
else:
docstr = doc.get('content','') if 'content' in doc else ""
if 'mp-syndicate-to' in doc:
frontmatter['mp-syndicate-to'] = doc['mp-syndicate-to'].split(",")
for url in doc['mp-syndicate-to'].split(","):
docstr += f"\n<a href=\"{url}\"></a>"
if 'mp-syndicate-to[]' in request.form:
frontmatter['mp-syndicate-to'] = request.form.getlist('mp-syndicate-to[]')
for url in request.form.getlist('mp-syndicate-to[]'):
docstr += f"\n<a href=\"{url}\"></a>"
return docstr, frontmatter, file_path
def process_json_post():
"""Process JSON POST submission"""
@app.route('/', methods=['POST'])
@authed_endpoint
def req():
if request.get_json():
docstr, frontmatter, file_path = process_json_post()
else:
docstr, frontmatter, file_path = process_multipart_post()
frontmatter_str = yaml.dump(frontmatter)
content = base64.encodestring(
f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8")
api = get_api_client()
body = giteapy.CreateFileOptions(content=content)
try:
r = api.repo_create_file(os.environ.get(
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body)
return Response(status=202, headers={"Location": frontmatter['url']})
except Exception as e:
return {"error": str(e)}, 500
def parse_categories():
strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY')
if strategy == 'feed':
tree = ElementTree.parse(os.environ.get('MICROPUB_CATEGORY_LIST_FILE'))
tags = tree.findall('.//item/title')
return {"categories": [tag.text for tag in tags] }
def get_syndication_targets():
targets = os.environ.get("SYNDICATION_TARGETS", "").split(",")
defs = []
for target in targets:
if os.environ.get(f"SYNDICATION_TARGET_{target}_URL") is None:
print(f"No url for SYNDICATION_TARGET_{target}_URL")
continue
target_def = {
"uid": os.environ.get(f"SYNDICATION_TARGET_{target}_URL", target),
"name": os.environ.get(f"SYNDICATION_TARGET_{target}_NAME", target),
}
defs.append(target_def)
return defs
@app.route("/media", methods=["POST"])
@authed_endpoint
def media_endpoint():
now = datetime.now()
url = process_photo_upload(now, request.files['file'])
return Response(status=201, headers={"Location": url})
def generate_config_json():
return {
"media-endpoint": request.base_url + "media",
"syndicate-to": get_syndication_targets(),
"post-types": [
{
"type": "note",
"name": "Note"
},
{
"type": "article",
"name": "Blog Post"
},
{
"type": "photo",
"name": "Photo"
},
{
"type": "reply",
"name": "Reply"
},
{
"type": "bookmark",
"name": "Bookmark"
}
]
}
@app.route("/", methods=['GET'])
@authed_endpoint
def index():
if request.args.get('q') == 'config':
return generate_config_json()
elif request.args.get('q') == 'category':
return parse_categories()
elif request.args.get('q') == 'syndicate-to':
return {"syndicate-to": get_syndication_targets()}
if __name__ == '__main__':
app.run(debug=True)

View File

@ -0,0 +1,4 @@
from . import app
app.run(debug=True)