update to use forgejo api instead of giteapy

This commit is contained in:
James Ravenscroft 2024-09-08 15:56:49 +01:00
parent 77926ec92b
commit b483ebf869
4 changed files with 359 additions and 205 deletions

132
poetry.lock generated
View File

@ -1,5 +1,17 @@
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. # This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
[[package]]
name = "aenum"
version = "3.1.15"
description = "Advanced Enumerations (compatible with Python's stdlib Enum), NamedTuples, and NamedConstants"
optional = false
python-versions = "*"
files = [
{file = "aenum-3.1.15-py2-none-any.whl", hash = "sha256:27b1710b9d084de6e2e695dab78fe9f269de924b51ae2850170ee7e1ca6288a5"},
{file = "aenum-3.1.15-py3-none-any.whl", hash = "sha256:e0dfaeea4c2bd362144b87377e2c61d91958c5ed0b4daf89cb6f45ae23af6288"},
{file = "aenum-3.1.15.tar.gz", hash = "sha256:8cbd76cd18c4f870ff39b24284d3ea028fbe8731a58df3aa581e434c575b9559"},
]
[[package]] [[package]]
name = "atomicwrites" name = "atomicwrites"
version = "1.4.0" version = "1.4.0"
@ -195,6 +207,23 @@ files = [
[package.dependencies] [package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""} colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "clientapi-forgejo"
version = "1.0.0"
description = "Forgejo API."
optional = false
python-versions = "*"
files = [
{file = "clientapi_forgejo-1.0.0-py3-none-any.whl", hash = "sha256:67c5551838857eac9f697785c4affe462e085f2d4d9a7fb851d770d31ed2d94f"},
{file = "clientapi_forgejo-1.0.0.tar.gz", hash = "sha256:94b339372650e0398088023c98c3516f12cbb600bb453dd4142b52d8711a7dc7"},
]
[package.dependencies]
aenum = "*"
pydantic = ">=1.10.5,<2"
python-dateutil = "*"
urllib3 = ">=1.25.3,<2.1.0"
[[package]] [[package]]
name = "colorama" name = "colorama"
version = "0.4.4" version = "0.4.4"
@ -455,6 +484,24 @@ SecretStorage = {version = ">=3.2", markers = "sys_platform == \"linux\""}
docs = ["jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx"] docs = ["jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx"]
testing = ["flake8 (<5)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"] testing = ["flake8 (<5)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"]
[[package]]
name = "loguru"
version = "0.7.2"
description = "Python logging made (stupidly) simple"
optional = false
python-versions = ">=3.5"
files = [
{file = "loguru-0.7.2-py3-none-any.whl", hash = "sha256:003d71e3d3ed35f0f8984898359d65b79e5b21943f78af86aa5491210429b8eb"},
{file = "loguru-0.7.2.tar.gz", hash = "sha256:e671a53522515f34fd406340ee968cb9ecafbc4b36c679da03c18fd8d0bd51ac"},
]
[package.dependencies]
colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""}
win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""}
[package.extras]
dev = ["Sphinx (==7.2.5)", "colorama (==0.4.5)", "colorama (==0.4.6)", "exceptiongroup (==1.1.3)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v1.4.1)", "mypy (==v1.5.1)", "pre-commit (==3.4.0)", "pytest (==6.1.2)", "pytest (==7.4.0)", "pytest-cov (==2.12.1)", "pytest-cov (==4.1.0)", "pytest-mypy-plugins (==1.9.3)", "pytest-mypy-plugins (==3.0.0)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.3.0)", "tox (==3.27.1)", "tox (==4.11.0)"]
[[package]] [[package]]
name = "markupsafe" name = "markupsafe"
version = "2.0.1" version = "2.0.1"
@ -689,6 +736,65 @@ files = [
{file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"}, {file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"},
] ]
[[package]]
name = "pydantic"
version = "1.10.18"
description = "Data validation and settings management using python type hints"
optional = false
python-versions = ">=3.7"
files = [
{file = "pydantic-1.10.18-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e405ffcc1254d76bb0e760db101ee8916b620893e6edfbfee563b3c6f7a67c02"},
{file = "pydantic-1.10.18-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e306e280ebebc65040034bff1a0a81fd86b2f4f05daac0131f29541cafd80b80"},
{file = "pydantic-1.10.18-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:11d9d9b87b50338b1b7de4ebf34fd29fdb0d219dc07ade29effc74d3d2609c62"},
{file = "pydantic-1.10.18-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b661ce52c7b5e5f600c0c3c5839e71918346af2ef20062705ae76b5c16914cab"},
{file = "pydantic-1.10.18-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:c20f682defc9ef81cd7eaa485879ab29a86a0ba58acf669a78ed868e72bb89e0"},
{file = "pydantic-1.10.18-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:c5ae6b7c8483b1e0bf59e5f1843e4fd8fd405e11df7de217ee65b98eb5462861"},
{file = "pydantic-1.10.18-cp310-cp310-win_amd64.whl", hash = "sha256:74fe19dda960b193b0eb82c1f4d2c8e5e26918d9cda858cbf3f41dd28549cb70"},
{file = "pydantic-1.10.18-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:72fa46abace0a7743cc697dbb830a41ee84c9db8456e8d77a46d79b537efd7ec"},
{file = "pydantic-1.10.18-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ef0fe7ad7cbdb5f372463d42e6ed4ca9c443a52ce544472d8842a0576d830da5"},
{file = "pydantic-1.10.18-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a00e63104346145389b8e8f500bc6a241e729feaf0559b88b8aa513dd2065481"},
{file = "pydantic-1.10.18-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ae6fa2008e1443c46b7b3a5eb03800121868d5ab6bc7cda20b5df3e133cde8b3"},
{file = "pydantic-1.10.18-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:9f463abafdc92635da4b38807f5b9972276be7c8c5121989768549fceb8d2588"},
{file = "pydantic-1.10.18-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:3445426da503c7e40baccefb2b2989a0c5ce6b163679dd75f55493b460f05a8f"},
{file = "pydantic-1.10.18-cp311-cp311-win_amd64.whl", hash = "sha256:467a14ee2183bc9c902579bb2f04c3d3dac00eff52e252850509a562255b2a33"},
{file = "pydantic-1.10.18-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:efbc8a7f9cb5fe26122acba1852d8dcd1e125e723727c59dcd244da7bdaa54f2"},
{file = "pydantic-1.10.18-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:24a4a159d0f7a8e26bf6463b0d3d60871d6a52eac5bb6a07a7df85c806f4c048"},
{file = "pydantic-1.10.18-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b74be007703547dc52e3c37344d130a7bfacca7df112a9e5ceeb840a9ce195c7"},
{file = "pydantic-1.10.18-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fcb20d4cb355195c75000a49bb4a31d75e4295200df620f454bbc6bdf60ca890"},
{file = "pydantic-1.10.18-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:46f379b8cb8a3585e3f61bf9ae7d606c70d133943f339d38b76e041ec234953f"},
{file = "pydantic-1.10.18-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:cbfbca662ed3729204090c4d09ee4beeecc1a7ecba5a159a94b5a4eb24e3759a"},
{file = "pydantic-1.10.18-cp312-cp312-win_amd64.whl", hash = "sha256:c6d0a9f9eccaf7f438671a64acf654ef0d045466e63f9f68a579e2383b63f357"},
{file = "pydantic-1.10.18-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:3d5492dbf953d7d849751917e3b2433fb26010d977aa7a0765c37425a4026ff1"},
{file = "pydantic-1.10.18-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fe734914977eed33033b70bfc097e1baaffb589517863955430bf2e0846ac30f"},
{file = "pydantic-1.10.18-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:15fdbe568beaca9aacfccd5ceadfb5f1a235087a127e8af5e48df9d8a45ae85c"},
{file = "pydantic-1.10.18-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:c3e742f62198c9eb9201781fbebe64533a3bbf6a76a91b8d438d62b813079dbc"},
{file = "pydantic-1.10.18-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:19a3bd00b9dafc2cd7250d94d5b578edf7a0bd7daf102617153ff9a8fa37871c"},
{file = "pydantic-1.10.18-cp37-cp37m-win_amd64.whl", hash = "sha256:2ce3fcf75b2bae99aa31bd4968de0474ebe8c8258a0110903478bd83dfee4e3b"},
{file = "pydantic-1.10.18-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:335a32d72c51a313b33fa3a9b0fe283503272ef6467910338e123f90925f0f03"},
{file = "pydantic-1.10.18-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:34a3613c7edb8c6fa578e58e9abe3c0f5e7430e0fc34a65a415a1683b9c32d9a"},
{file = "pydantic-1.10.18-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e9ee4e6ca1d9616797fa2e9c0bfb8815912c7d67aca96f77428e316741082a1b"},
{file = "pydantic-1.10.18-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:23e8ec1ce4e57b4f441fc91e3c12adba023fedd06868445a5b5f1d48f0ab3682"},
{file = "pydantic-1.10.18-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:44ae8a3e35a54d2e8fa88ed65e1b08967a9ef8c320819a969bfa09ce5528fafe"},
{file = "pydantic-1.10.18-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:d5389eb3b48a72da28c6e061a247ab224381435256eb541e175798483368fdd3"},
{file = "pydantic-1.10.18-cp38-cp38-win_amd64.whl", hash = "sha256:069b9c9fc645474d5ea3653788b544a9e0ccd3dca3ad8c900c4c6eac844b4620"},
{file = "pydantic-1.10.18-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:80b982d42515632eb51f60fa1d217dfe0729f008e81a82d1544cc392e0a50ddf"},
{file = "pydantic-1.10.18-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:aad8771ec8dbf9139b01b56f66386537c6fe4e76c8f7a47c10261b69ad25c2c9"},
{file = "pydantic-1.10.18-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:941a2eb0a1509bd7f31e355912eb33b698eb0051730b2eaf9e70e2e1589cae1d"},
{file = "pydantic-1.10.18-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:65f7361a09b07915a98efd17fdec23103307a54db2000bb92095457ca758d485"},
{file = "pydantic-1.10.18-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:6951f3f47cb5ca4da536ab161ac0163cab31417d20c54c6de5ddcab8bc813c3f"},
{file = "pydantic-1.10.18-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:7a4c5eec138a9b52c67f664c7d51d4c7234c5ad65dd8aacd919fb47445a62c86"},
{file = "pydantic-1.10.18-cp39-cp39-win_amd64.whl", hash = "sha256:49e26c51ca854286bffc22b69787a8d4063a62bf7d83dc21d44d2ff426108518"},
{file = "pydantic-1.10.18-py3-none-any.whl", hash = "sha256:06a189b81ffc52746ec9c8c007f16e5167c8b0a696e1a726369327e3db7b2a82"},
{file = "pydantic-1.10.18.tar.gz", hash = "sha256:baebdff1907d1d96a139c25136a9bb7d17e118f133a76a2ef3b845e831e3403a"},
]
[package.dependencies]
typing-extensions = ">=4.2.0"
[package.extras]
dotenv = ["python-dotenv (>=0.10.4)"]
email = ["email-validator (>=1.0.3)"]
[[package]] [[package]]
name = "pygments" name = "pygments"
version = "2.13.0" version = "2.13.0"
@ -1052,13 +1158,13 @@ urllib3 = ">=1.26.0"
[[package]] [[package]]
name = "typing-extensions" name = "typing-extensions"
version = "4.0.1" version = "4.12.2"
description = "Backported and Experimental Type Hints for Python 3.6+" description = "Backported and Experimental Type Hints for Python 3.8+"
optional = false optional = false
python-versions = ">=3.6" python-versions = ">=3.8"
files = [ files = [
{file = "typing_extensions-4.0.1-py3-none-any.whl", hash = "sha256:7f001e5ac290a0c0401508864c7ec868be4e701886d5b573a9528ed3973d9d3b"}, {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
{file = "typing_extensions-4.0.1.tar.gz", hash = "sha256:4ca091dea149f945ec56afb48dae714f21e8692ef22a395223bcd328961b6a0e"}, {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
] ]
[[package]] [[package]]
@ -1102,6 +1208,20 @@ files = [
[package.extras] [package.extras]
watchdog = ["watchdog"] watchdog = ["watchdog"]
[[package]]
name = "win32-setctime"
version = "1.1.0"
description = "A small Python utility to set file creation time on Windows"
optional = false
python-versions = ">=3.5"
files = [
{file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"},
{file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"},
]
[package.extras]
dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"]
[[package]] [[package]]
name = "zipp" name = "zipp"
version = "3.7.0" version = "3.7.0"
@ -1120,4 +1240,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = ">=3.8,<4.0.0" python-versions = ">=3.8,<4.0.0"
content-hash = "3c54043af6bd9117fe0a28662c707018331a2ce2d91e10cf29137a80256e4de0" content-hash = "49a16ba341bf565decdbda3c7f050c9007eceb08d43535ac0f54c0ea4c6bb447"

View File

@ -23,6 +23,8 @@ python-slugify = "^5.0.2"
PyYAML = "^6.0" PyYAML = "^6.0"
Flask-Micropub = "^0.2.8" Flask-Micropub = "^0.2.8"
pillow = "^10.0.0" pillow = "^10.0.0"
clientapi-forgejo = "^1.0.0"
loguru = "^0.7.2"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
pytest = "^6.2.5" pytest = "^6.2.5"

View File

@ -3,12 +3,16 @@ import requests
import os import os
import functools import functools
import dotenv import dotenv
import giteapy
import giteapy.rest
import time import time
import base64 import base64
import mimetypes import mimetypes
import sys
from loguru import logger
import logging
import clientapi_forgejo as forgejo
from werkzeug.datastructures import FileStorage from werkzeug.datastructures import FileStorage
import yaml import yaml
@ -24,39 +28,47 @@ from flask import Flask, jsonify, request, Response, Blueprint
dotenv.load_dotenv() dotenv.load_dotenv()
PERMITTED_DOMAIN = os.environ.get( PERMITTED_DOMAIN = os.environ.get(
'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';') "PERMITTED_DOMAINS", "https://brainsteam.co.uk/"
).split(";")
ENTITY_TYPE_PLURAL_MAP = {"reply": "replies", "watch": "watches"}
ENTITY_TYPE_PLURAL_MAP = {
"reply": "replies",
"watch": "watches"
}
core_bp = Blueprint("core", __name__) core_bp = Blueprint("core", __name__)
class InvalidRequestException(Exception): class InvalidRequestException(Exception):
"""Class of exception raised when the server receives an invalid request""" """Class of exception raised when the server receives an invalid request"""
# create a custom handler
class InterceptHandler(logging.Handler):
def emit(self, record):
logger_opt = logger.opt(depth=6, exception=record.exc_info)
logger_opt.log(record.levelno, record.getMessage())
def create_app(): def create_app():
app = Flask(__name__) app = Flask(__name__)
app.config['SECRET_KEY'] = 'my super secret key' app.config["SECRET_KEY"] = "my super secret key"
#app.config.from_file(os.path.join(os.getcwd(), "config.yaml"), yaml.safe_load) # app.config.from_file(os.path.join(os.getcwd(), "config.yaml"), yaml.safe_load)
from .indieauth import micropub, auth_bp from .indieauth import micropub, auth_bp
from .webmentions import webhook_bp from .webmentions import webhook_bp
print(app.config) print(app.config)
micropub.init_app(app, os.environ.get('INDIEAUTH_CLIENT_ID', 'test.com')) micropub.init_app(app, os.environ.get("INDIEAUTH_CLIENT_ID", "test.com"))
app.register_blueprint(auth_bp) app.register_blueprint(auth_bp)
app.register_blueprint(core_bp) app.register_blueprint(core_bp)
app.register_blueprint(webhook_bp) app.register_blueprint(webhook_bp)
logger.add(sys.stderr, level=logging.WARN, backtrace=True, diagnose=True)
# logger.start()
app.logger.addHandler(InterceptHandler())
return app return app
@ -65,19 +77,24 @@ def authed_endpoint(f):
@functools.wraps(f) @functools.wraps(f)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
authtok = request.headers.get('Authorization') authtok = request.headers.get("Authorization")
if authtok is None: if authtok is None:
return { return {
"error": "unauthorized", "error": "unauthorized",
"error_description": "An auth token was not provided" "error_description": "An auth token was not provided",
}, 401 }, 401
auth = requests.get("https://tokens.indieauth.com/token", headers={ auth = requests.get(
"Authorization": authtok, "Accept": "application/json"}).json() "https://tokens.indieauth.com/token",
headers={"Authorization": authtok, "Accept": "application/json"},
).json()
if auth.get('me','') not in PERMITTED_DOMAIN: if auth.get("me", "") not in PERMITTED_DOMAIN:
return {"error": "insufficient_scope", "error_description": f"User \"{auth.get('me','')}\" not permitted to post here"}, 401 return {
"error": "insufficient_scope",
"error_description": f"User \"{auth.get('me','')}\" not permitted to post here",
}, 401
return f(*args, *kwargs) return f(*args, *kwargs)
@ -86,63 +103,69 @@ def authed_endpoint(f):
_api_client = None _api_client = None
class InvalidRequestException(Exception): class InvalidRequestException(Exception):
"""Invalid Request""" """Invalid Request"""
def process_photo_url(created_at: datetime, doc: Dict[str, List[str]], suffix: str = ""): def process_photo_url(
created_at: datetime, doc: Dict[str, List[str]], suffix: str = ""
):
"""Process photo submitted via URL""" """Process photo submitted via URL"""
now_ts = int(time.mktime(created_at.timetuple())) now_ts = int(time.mktime(created_at.timetuple()))
photo_urls = [] photo_urls = []
if isinstance(doc['photo'], str): if isinstance(doc["photo"], str):
doc['photo'] = [doc['photo']] doc["photo"] = [doc["photo"]]
for i, photo in enumerate(doc["photo"]):
for i, photo in enumerate(doc['photo']): if isinstance(photo, str):
photo = {"url": photo, "alt": ""}
if(isinstance(photo, str)): if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
photo = {"value": photo, "alt": ""}
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
# download the photo # download the photo
r = requests.get(photo['value']) r = requests.get(photo["url"])
ext = os.path.splitext(photo['value'])[1] ext = os.path.splitext(photo["url"])[1]
# generate local filename # generate local filename
filename = os.path.join(os.environ.get( filename = os.path.join(
'MICROPUB_MEDIA_PATH'), created_at.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}") os.environ.get("MICROPUB_MEDIA_PATH"),
created_at.strftime("%Y/%m/%d"),
str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}",
)
photo_url = os.path.join(os.environ.get( photo_url = os.path.join(
'MICROPUB_MEDIA_URL_PREFIX'), created_at.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}") os.environ.get("MICROPUB_MEDIA_URL_PREFIX"),
created_at.strftime("%Y/%m/%d"),
str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}",
)
photo_urls.append((photo_url, photo['alt'])) photo_urls.append((photo_url, photo["alt"]))
# make directory if needed # make directory if needed
if not os.path.exists(os.path.dirname(filename)): if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename)) os.makedirs(os.path.dirname(filename))
with open(filename, 'wb') as f: with open(filename, "wb") as f:
f.write(r.content) f.write(r.content)
else: else:
photo_urls.append((photo['value'], photo['alt'])) photo_urls.append((photo["value"], photo["alt"]))
return photo_urls return photo_urls
def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str=""):
def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str = ""):
"""Process photo directly uploaded to micropub""" """Process photo directly uploaded to micropub"""
now_ts = int(time.mktime(created_at.timetuple())) now_ts = int(time.mktime(created_at.timetuple()))
if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy': if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
file.mimetype file.mimetype
@ -152,10 +175,16 @@ def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str=""
ext = mimetypes.guess_extension(file.mimetype) ext = mimetypes.guess_extension(file.mimetype)
# generate local filename # generate local filename
filename = os.path.join(os.environ.get( filename = os.path.join(
'MICROPUB_MEDIA_PATH'), created_at.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}") os.environ.get("MICROPUB_MEDIA_PATH"),
photo_url = os.path.join(os.environ.get( created_at.strftime("%Y/%m/%d"),
'MICROPUB_MEDIA_URL_PREFIX'), created_at.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}") f"{now_ts}_{suffix}{ext}",
)
photo_url = os.path.join(
os.environ.get("MICROPUB_MEDIA_URL_PREFIX"),
created_at.strftime("%Y/%m/%d"),
f"{now_ts}_{suffix}{ext}",
)
# make directory if needed # make directory if needed
if not os.path.exists(os.path.dirname(filename)): if not os.path.exists(os.path.dirname(filename)):
@ -169,9 +198,7 @@ def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str=""
return None return None
def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str] = None):
def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str]=None):
now_ts = int(time.mktime(created_at.timetuple())) now_ts = int(time.mktime(created_at.timetuple()))
@ -183,100 +210,117 @@ def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str]=N
else: else:
slug = str(now_ts) slug = str(now_ts)
url = os.path.join(
"/",
ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
created_at.strftime("%Y/%m/%d"),
slug,
)
url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), print(os.environ.get("CONTENT_PREFIX"))
created_at.strftime("%Y/%m/%d"), slug)
print(os.environ.get( file_path = os.path.join(
'CONTENT_PREFIX')) os.environ.get("CONTENT_PREFIX"),
ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
file_path = os.path.join(os.environ.get( created_at.strftime("%Y/%m/%d"),
'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), created_at.strftime("%Y/%m/%d"), slug + ".md") slug + ".md",
)
frontmatter = { frontmatter = {
"post_meta": ['date'], "post_meta": ["date"],
"url": url, "url": url,
"type": ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), "type": ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
"date": created_at.isoformat(sep='T'), "date": created_at.isoformat(sep="T"),
} }
return frontmatter, file_path return frontmatter, file_path
def detect_entry_type(doc: dict) -> str: def detect_entry_type(doc: dict) -> str:
"""Given a dictionary object from either form or json, detect type of post""" """Given a dictionary object from either form or json, detect type of post"""
if "hypothesis-link" in doc:
if 'hypothesis-link' in doc:
entry_type = "annotation" entry_type = "annotation"
elif ('in-reply-to' in doc) or ('u-in-reply-to' in doc): elif ("in-reply-to" in doc) or ("u-in-reply-to" in doc):
entry_type = "reply" entry_type = "reply"
elif ('bookmark-of' in doc) or ('u-bookmark-of' in doc): elif ("bookmark-of" in doc) or ("u-bookmark-of" in doc):
entry_type = "bookmark" entry_type = "bookmark"
elif ('repost-of' in doc) or ('u-repost-of' in doc): elif ("repost-of" in doc) or ("u-repost-of" in doc):
entry_type = "repost" entry_type = "repost"
elif ('like-of' in doc) or ('u-like-of' in doc): elif ("like-of" in doc) or ("u-like-of" in doc):
entry_type = "like" entry_type = "like"
elif ('read-of' in doc): elif "read-of" in doc:
entry_type = "read" entry_type = "read"
elif ('watch-of' in doc): elif "watch-of" in doc:
entry_type = "watch" entry_type = "watch"
elif ('name' in doc) or ('p-name' in doc): elif ("name" in doc) or ("p-name" in doc):
entry_type = "post" entry_type = "post"
else: else:
entry_type = "note" entry_type = "note"
return entry_type return entry_type
def capture_frontmatter_props(doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str,List[str]]]):
def capture_frontmatter_props(
doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str, List[str]]]
):
keys = ['summary', 'bookmark-of', 'in-reply-to', 'repost-of', 'like-of', 'read-of', 'watch-of', 'listen-of', 'read-status', 'rating'] keys = [
"summary",
"bookmark-of",
"in-reply-to",
"repost-of",
"like-of",
"read-of",
"watch-of",
"listen-of",
"read-status",
"rating",
]
keys += [f'u-{key}' for key in keys] keys += [f"u-{key}" for key in keys]
for key in keys: for key in keys:
if key in doc: if key in doc:
if isinstance(doc[key], dict) and ('type' in doc[key]): if isinstance(doc[key], dict) and ("type" in doc[key]):
if doc[key]['type'][0] == 'h-cite': if doc[key]["type"][0] == "h-cite":
if 'citations' not in frontmatter: if "citations" not in frontmatter:
frontmatter['citations'] = [] frontmatter["citations"] = []
frontmatter['citations'].append(doc[key]['properties']) frontmatter["citations"].append(doc[key]["properties"])
elif isinstance(doc[key], list) and (len(doc[key]) < 2): elif isinstance(doc[key], list) and (len(doc[key]) < 2):
frontmatter[key] = doc[key][0] frontmatter[key] = doc[key][0]
else: else:
frontmatter[key] = doc[key] frontmatter[key] = doc[key]
if 'hypothesis-link' in doc: if "hypothesis-link" in doc:
# get the hypothesis data and store it # get the hypothesis data and store it
r = requests.get(doc['hypothesis-link'][0]) r = requests.get(doc["hypothesis-link"][0])
frontmatter['hypothesis-meta'] = r.json() frontmatter["hypothesis-meta"] = r.json()
if 'category' in doc: if "category" in doc:
if isinstance(doc['category'], list): if isinstance(doc["category"], list):
categories = doc['category'] categories = doc["category"]
else: else:
categories = [doc['category']] categories = [doc["category"]]
elif 'p-category' in doc: elif "p-category" in doc:
categories = doc['p-category'] categories = doc["p-category"]
else: else:
categories = request.form.getlist('category[]') categories = request.form.getlist("category[]")
if len(categories) > 0: if len(categories) > 0:
frontmatter['tags'] = categories frontmatter["tags"] = categories
def process_multipart_post(): def process_multipart_post():
doc = request.form.to_dict(flat=True) doc = request.form.to_dict(flat=True)
@ -285,66 +329,66 @@ def process_multipart_post():
now = datetime.now() now = datetime.now()
frontmatter, file_path = init_frontmatter(now, entry_type, doc.get('name')) frontmatter, file_path = init_frontmatter(now, entry_type, doc.get("name"))
capture_frontmatter_props(doc, frontmatter) capture_frontmatter_props(doc, frontmatter)
if "name" in doc:
frontmatter["title"] = doc["name"]
if 'name' in doc: if ("photo" in doc) or ("photo" in request.files) or ("photo[]" in request.files):
frontmatter['title'] = doc['name']
frontmatter["photo"] = []
if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files): if "photo[]" in request.files:
photos = request.files.getlist("photo[]")
frontmatter['photo'] = []
if 'photo[]' in request.files:
photos = request.files.getlist('photo[]')
docstr = "" docstr = ""
for i, photo in enumerate(photos): for i, photo in enumerate(photos):
photo_url = process_photo_upload(now, photo, suffix=i) photo_url = process_photo_upload(now, photo, suffix=i)
if 'thumbnail' not in frontmatter: if "thumbnail" not in frontmatter:
frontmatter['thumbnail'] = photo_url frontmatter["thumbnail"] = photo_url
frontmatter['photo'].append(photo_url) frontmatter["photo"].append(photo_url)
docstr += f"\n\n<img src=\"{photo_url}\" class=\"u-photo\" />" docstr += f'\n\n<img src="{photo_url}" class="u-photo" />'
docstr += f"\n\n {doc['content']}" docstr += f"\n\n {doc['content']}"
else: else:
if 'photo' in doc: if "photo" in doc:
photo_objects = process_photo_url(now, doc) photo_objects = process_photo_url(now, doc)
else: else:
photo_objects = [ (process_photo_upload(now, request.files['photo']), "") ] photo_objects = [
(process_photo_upload(now, request.files["photo"]), "")
frontmatter['photo'] = [ {"value": photo[0], "alt": photo[1]} for photo in photo_objects] ]
frontmatter['thumbnail'] = photo_objects[0][0]
frontmatter["photo"] = [
{"value": photo[0], "alt": photo[1]} for photo in photo_objects
]
frontmatter["thumbnail"] = photo_objects[0][0]
docstr = "" docstr = ""
for photo in photo_objects: for photo in photo_objects:
docstr += f"<img src=\"{photo[0]}\" alt=\"{photo[1]}\" class=\"u-photo\" /> \n\n {doc['content']}" docstr += f"<img src=\"{photo[0]}\" alt=\"{photo[1]}\" class=\"u-photo\" /> \n\n {doc['content']}"
else: else:
docstr = doc.get('content','') if 'content' in doc else "" docstr = doc.get("content", "") if "content" in doc else ""
if 'mp-syndicate-to' in doc: if "mp-syndicate-to" in doc:
frontmatter['mp-syndicate-to'] = doc['mp-syndicate-to'].split(",") frontmatter["mp-syndicate-to"] = doc["mp-syndicate-to"].split(",")
for url in doc['mp-syndicate-to'].split(","): for url in doc["mp-syndicate-to"].split(","):
docstr += f"\n<a href=\"{url}\"></a>" docstr += f'\n<a href="{url}"></a>'
if 'mp-syndicate-to[]' in request.form: if "mp-syndicate-to[]" in request.form:
frontmatter['mp-syndicate-to'] = request.form.getlist('mp-syndicate-to[]') frontmatter["mp-syndicate-to"] = request.form.getlist("mp-syndicate-to[]")
for url in request.form.getlist('mp-syndicate-to[]'): for url in request.form.getlist("mp-syndicate-to[]"):
docstr += f"\n<a href=\"{url}\"></a>" docstr += f'\n<a href="{url}"></a>'
return docstr, frontmatter, file_path return docstr, frontmatter, file_path
@ -353,66 +397,64 @@ def process_image_alt_texts(doc):
alts = [] alts = []
if isinstance(doc['photo'], str): if isinstance(doc["photo"], str):
doc['photo'] = [doc['photo']] doc["photo"] = [doc["photo"]]
for i, photo in enumerate(doc["photo"]):
for i, photo in enumerate(doc['photo']):
if isinstance(photo, dict): if isinstance(photo, dict):
alts.append(doc['alt']) alts.append(doc["alt"])
else: else:
alts.append("") alts.append("")
return alts return alts
def process_json_post(): def process_json_post():
"""Process JSON POST submission""" """Process JSON POST submission"""
body = request.get_json() body = request.get_json()
# get post type - take the first item in the array # get post type - take the first item in the array
if body['type'][0] != 'h-entry': if body["type"][0] != "h-entry":
return jsonify({"error":"invalid_format"}), 400 return jsonify({"error": "invalid_format"}), 400
props = body['properties'] props = body["properties"]
entry_type = detect_entry_type(props) entry_type = detect_entry_type(props)
if 'published' in props: if "published" in props:
from dateutil import parser from dateutil import parser
now = parser.parse(props['published'][0]) now = parser.parse(props["published"][0])
else: else:
now = datetime.now() now = datetime.now()
frontmatter, file_path = init_frontmatter(now, entry_type, props.get('name')) frontmatter, file_path = init_frontmatter(now, entry_type, props.get("name"))
capture_frontmatter_props(props, frontmatter) capture_frontmatter_props(props, frontmatter)
if 'name' in props: if "name" in props:
frontmatter['title'] = props['name'][0] frontmatter["title"] = props["name"][0]
docstr = "" docstr = ""
if 'photo' in props: if "photo" in props:
photo_objects = process_photo_url(now, props) photo_objects = process_photo_url(now, props)
frontmatter["photo"] = [
frontmatter['photo'] = [ {"value": photo[0], "alt": photo[1]} for photo in photo_objects] {"value": photo[0], "alt": photo[1]} for photo in photo_objects
frontmatter['thumbnail'] = frontmatter['photo'][0]['value'] ]
frontmatter["thumbnail"] = frontmatter["photo"][0]["value"]
docstr = "" docstr = ""
for photo in photo_objects: for photo in photo_objects:
docstr += f"<img src=\"{photo[0]}\" alt=\"{photo[1]}\" class=\"u-photo\" /> \n\n" docstr += f'<img src="{photo[0]}" alt="{photo[1]}" class="u-photo" /> \n\n'
for content in props.get("content", []):
for content in props.get('content', []):
if isinstance(content, dict): if isinstance(content, dict):
if 'html' in content: if "html" in content:
docstr += f"\n\n {content.get('html')}" docstr += f"\n\n {content.get('html')}"
else: else:
@ -420,19 +462,20 @@ def process_json_post():
return docstr, frontmatter, file_path return docstr, frontmatter, file_path
def get_api_client() -> giteapy.RepositoryApi:
def get_api_client() -> forgejo.RepositoryApi:
global _api_client global _api_client
if _api_client is None: if _api_client is None:
config = giteapy.Configuration() config = forgejo.Configuration()
config.host = os.environ.get('GITEA_URL') config.host = os.environ.get("GITEA_URL")
config.api_key['access_token'] = os.environ.get('GITEA_API_KEY') config.api_key["Token"] = os.environ.get("GITEA_API_KEY")
_api_client = giteapy.RepositoryApi(giteapy.ApiClient(config)) _api_client = forgejo.RepositoryApi(forgejo.ApiClient(config))
return _api_client return _api_client
@core_bp.route('/', methods=['POST']) @core_bp.route("/", methods=["POST"])
@authed_endpoint @authed_endpoint
def req(): def req():
@ -443,33 +486,38 @@ def req():
frontmatter_str = yaml.dump(frontmatter) frontmatter_str = yaml.dump(frontmatter)
content = base64.encodebytes( content = base64.encodebytes(
f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8") f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")
).decode("utf8")
api = get_api_client() api = get_api_client()
body = giteapy.CreateFileOptions(content=content) body = forgejo.CreateFileOptions(content=content)
try: try:
r = api.repo_create_file(os.environ.get( r = api.repo_create_file(
'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body) os.environ.get("GITEA_REPO_OWNER"),
os.environ.get("GITEA_REPO_NAME"),
file_path,
body,
)
return Response(status=202, headers={"Location": frontmatter['url']}) return Response(status=202, headers={"Location": frontmatter["url"]})
except Exception as e: except Exception as e:
return {"error": str(e)}, 500
logger.error(e, exc_info=True)
return {"error": str(e)}, 500
def parse_categories(): def parse_categories():
strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY') strategy = os.environ.get("MICROPUB_CATEGORY_LIST_STRATEGY")
if strategy == 'feed': if strategy == "feed":
tree = ElementTree.parse(os.environ.get('MICROPUB_CATEGORY_LIST_FILE')) tree = ElementTree.parse(os.environ.get("MICROPUB_CATEGORY_LIST_FILE"))
tags = tree.findall('.//item/title') tags = tree.findall(".//item/title")
return {"categories": [tag.text for tag in tags]}
return {"categories": [tag.text for tag in tags] }
def get_syndication_targets(): def get_syndication_targets():
@ -497,7 +545,7 @@ def get_syndication_targets():
def media_endpoint(): def media_endpoint():
now = datetime.now() now = datetime.now()
url = process_photo_upload(now, request.files['file']) url = process_photo_upload(now, request.files["file"])
return Response(status=201, headers={"Location": url}) return Response(status=201, headers={"Location": url})
@ -505,50 +553,33 @@ def media_endpoint():
def generate_config_json(): def generate_config_json():
return { return {
"media-endpoint": os.environ.get(f"MICROCOSM_BASE_URL", request.base_url) + "media", "media-endpoint": os.environ.get(f"MICROCOSM_BASE_URL", request.base_url)
+ "media",
"syndicate-to": get_syndication_targets(), "syndicate-to": get_syndication_targets(),
"post-types": [ "post-types": [
{ {"type": "note", "name": "Note"},
"type": "note", {"type": "article", "name": "Blog Post"},
"name": "Note" {"type": "photo", "name": "Photo"},
}, {"type": "reply", "name": "Reply"},
{ {"type": "bookmark", "name": "Bookmark"},
"type": "article", {"type": "like", "name": "Like"},
"name": "Blog Post" ],
},
{
"type": "photo",
"name": "Photo"
},
{
"type": "reply",
"name": "Reply"
},
{
"type": "bookmark",
"name": "Bookmark"
},
{
"type": "like",
"name":"Like"
}
]
} }
@core_bp.route("/", methods=['GET']) @core_bp.route("/", methods=["GET"])
@authed_endpoint @authed_endpoint
def index(): def index():
if request.args.get('q') == 'config': if request.args.get("q") == "config":
return generate_config_json() return generate_config_json()
elif request.args.get('q') == 'category': elif request.args.get("q") == "category":
return parse_categories() return parse_categories()
elif request.args.get('q') == 'syndicate-to': elif request.args.get("q") == "syndicate-to":
return {"syndicate-to": get_syndication_targets()} return {"syndicate-to": get_syndication_targets()}
if __name__ == '__main__': if __name__ == "__main__":
app.run(debug=False) app.run(debug=False)

View File

@ -65,6 +65,7 @@ def indieauth_callback(resp):
""".format(resp.me, resp.next_url, resp.error) """.format(resp.me, resp.next_url, resp.error)
@auth_bp.route('/micropub-callback') @auth_bp.route('/micropub-callback')
@micropub.authorized_handler @micropub.authorized_handler
def micropub_callback(resp): def micropub_callback(resp):