Compare commits

...

17 Commits

Author SHA1 Message Date
James Ravenscroft 7e9aa77769 fix some stuff 2024-09-08 15:24:48 +00:00
James Ravenscroft b483ebf869 update to use forgejo api instead of giteapy 2024-09-08 15:56:49 +01:00
James Ravenscroft 77926ec92b guess mimetype if extension not provided on upload 2023-10-28 13:01:33 +01:00
James Ravenscroft 6dcf787c4f allow overriding media endpoint 2023-08-20 18:14:02 +01:00
James Ravenscroft 8cc5482422 add little run helper script 2023-07-09 14:22:20 +00:00
James Ravenscroft 95662f180e add date to default post meta 2023-07-09 15:21:42 +01:00
James Ravenscroft 165fc400d4 update deps for microcosm 2023-07-09 15:11:50 +01:00
James Ravenscroft 0fb2380fd9 update behaviour for thumbnails and post types 2023-07-09 15:11:31 +01:00
James Ravenscroft f583942894 Merge branch 'develop' of ssh://git.jamesravey.me:222/ravenscroftj/pymicrocosm into develop 2023-06-17 13:31:30 +01:00
James Ravenscroft 0882d5ce24 implement alt text for photos 2023-06-17 13:30:36 +01:00
James Ravenscroft db34ff3d37 add annotation type 2022-11-26 06:45:29 +00:00
    continuous-integration/drone/push: Build is failing
ravenscroftj 01a80a6fc2 Merge pull request 'implemented hypothesis metadata storage' (#10) from james/feature/hypothesis into develop 2022-11-20 09:52:54 +00:00
    continuous-integration/drone/push: Build is failing
    Reviewed-on: #10
ravenscroftj 117a070203 Merge branch 'develop' into james/feature/hypothesis 2022-11-20 08:27:48 +00:00
    continuous-integration/drone/push: Build is failing
    continuous-integration/drone/pr: Build is failing
James Ravenscroft cab7a15728 implemented hypothesis metadata storage 2022-11-20 08:25:03 +00:00
    continuous-integration/drone/pr: Build is passing
    continuous-integration/drone/push: Build is passing
James Ravenscroft 55265d86a1 bump version number for next release 2022-10-30 15:58:07 +00:00
    continuous-integration/drone/push: Build is passing
ravenscroftj 36b3d90f4f Merge pull request 'bring develop branch up to date with master' (#7) from master into develop 2022-10-30 15:57:45 +00:00
    continuous-integration/drone/push: Build is failing
    Reviewed-on: #7
James Ravenscroft 47bdb1631b test env stuff 2022-10-22 16:03:27 +01:00
    continuous-integration/drone/push: Build is passing
8 changed files with 1106 additions and 719 deletions


@@ -11,12 +11,12 @@ steps:
   - poetry run pytest
 - name: publish
-  when:
-    branch:
-    - master
-    event:
-      exclude:
-      - pull_request
+  # when:
+  #   branch:
+  #   - master
+  #   event:
+  #     exclude:
+  #     - pull_request
   image: python:3.7
   environment:
     GITEA_PACKAGE_REPO:

poetry.lock (generated): 1350 lines changed
File diff suppressed because it is too large


@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "pymicrocosm"
-version = "0.2.0"
+version = "0.3.0"
 description = "A tiny python-based micropub endpoint that supports a Gitea + drone static website"
 authors = ["James Ravenscroft <ravenscroftj@gmail.com>"]
 license = "AGPL-3.0"
@@ -14,7 +14,7 @@ packages = [
 [tool.poetry.dependencies]
-python = "^3.7.1"
+python = ">=3.8,<4.0.0"
 Flask = "^2.0.2"
 giteapy = {url = "https://github.com/dblueai/giteapy/archive/master.zip"}
 requests = "^2.27.1"
@@ -22,6 +22,9 @@ python-dotenv = "^0.19.2"
 python-slugify = "^5.0.2"
 PyYAML = "^6.0"
 Flask-Micropub = "^0.2.8"
+pillow = "^10.0.0"
+clientapi-forgejo = "^1.0.0"
+loguru = "^0.7.2"

 [tool.poetry.dev-dependencies]
 pytest = "^6.2.5"

run.sh (new executable file): 2 lines changed

@@ -0,0 +1,2 @@
+#!/bin/bash
+FLASK_APP=microcosm.wsgi:app poetry run flask run --port 10183


@@ -3,11 +3,16 @@ import requests
 import os
 import functools
 import dotenv
-import giteapy
-import giteapy.rest
 import time
 import base64
+import mimetypes
+import sys
+from loguru import logger
+import logging
+import clientapi_forgejo as forgejo
 from werkzeug.datastructures import FileStorage
 import yaml
@@ -23,35 +28,48 @@ from flask import Flask, jsonify, request, Response, Blueprint
 dotenv.load_dotenv()

 PERMITTED_DOMAIN = os.environ.get(
-    'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';')
+    "PERMITTED_DOMAINS", "https://brainsteam.co.uk/"
+).split(";")

-ENTITY_TYPE_PLURAL_MAP = {
-    "reply": "replies",
-    "watch": "watches"
-}
+ENTITY_TYPE_PLURAL_MAP = {"reply": "replies", "watch": "watches"}

 core_bp = Blueprint("core", __name__)

-class InvalidRequestException(Exception):
-    """Class of exception raised when the server receives an invalid request"""
+# create a custom handler
+class InterceptHandler(logging.Handler):
+    def emit(self, record):
+        logger_opt = logger.opt(depth=6, exception=record.exc_info)
+        logger_opt.log(record.levelno, record.getMessage())

 def create_app():
     app = Flask(__name__)
-    app.config['SECRET_KEY'] = 'my super secret key'
-    #app.config.from_file(os.path.join(os.getcwd(), "config.yaml"), yaml.safe_load)
+    app.config["SECRET_KEY"] = "my super secret key"
+    # app.config.from_file(os.path.join(os.getcwd(), "config.yaml"), yaml.safe_load)

     from .indieauth import micropub, auth_bp
     from .webmentions import webhook_bp

     print(app.config)

-    micropub.init_app(app, app.config.get('INDIEAUTH_CLIENT_ID', 'test.com'))
+    micropub.init_app(app, os.environ.get("INDIEAUTH_CLIENT_ID", "test.com"))
     app.register_blueprint(auth_bp)
     app.register_blueprint(core_bp)
     app.register_blueprint(webhook_bp)

+    logger.add(sys.stderr, level=logging.WARN, backtrace=True, diagnose=True)
+    # logger.start()
+    app.logger.addHandler(InterceptHandler())
+
     return app
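The logging change in this hunk routes Flask's standard-library log records into loguru via the new InterceptHandler. A minimal, self-contained sketch of the same bridge outside the app factory, assuming only the stdlib and loguru; the "demo" logger name and the explicit logger.remove() call are illustrative, not part of the repository:

import logging
import sys
from loguru import logger

class InterceptHandler(logging.Handler):
    # re-emit stdlib records through loguru, preserving level and exception info
    def emit(self, record):
        logger.opt(depth=6, exception=record.exc_info).log(
            record.levelno, record.getMessage()
        )

logger.remove()  # drop loguru's default sink so messages are not printed twice
logger.add(sys.stderr, level=logging.WARNING, backtrace=True, diagnose=True)
logging.getLogger("demo").addHandler(InterceptHandler())
logging.getLogger("demo").warning("routed through loguru")

The depth=6 value mirrors the diff; it tells loguru how many stack frames to skip so the reported caller is the original logging call site rather than the handler.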
@@ -59,19 +77,24 @@ def authed_endpoint(f):
     @functools.wraps(f)
     def wrapper(*args, **kwargs):
-        authtok = request.headers.get('Authorization')
+        authtok = request.headers.get("Authorization")

         if authtok is None:
             return {
                 "error": "unauthorized",
-                "error_description": "An auth token was not provided"
+                "error_description": "An auth token was not provided",
             }, 401

-        auth = requests.get("https://tokens.indieauth.com/token", headers={
-            "Authorization": authtok, "Accept": "application/json"}).json()
+        auth = requests.get(
+            "https://tokens.indieauth.com/token",
+            headers={"Authorization": authtok, "Accept": "application/json"},
+        ).json()

-        if auth.get('me','') not in PERMITTED_DOMAIN:
-            return {"error": "insufficient_scope", "error_description": f"User \"{auth.get('me','')}\" not permitted to post here"}, 401
+        if auth.get("me", "") not in PERMITTED_DOMAIN:
+            return {
+                "error": "insufficient_scope",
+                "error_description": f"User \"{auth.get('me','')}\" not permitted to post here",
+            }, 401

         return f(*args, *kwargs)
@@ -81,64 +104,87 @@ def authed_endpoint(f):
 _api_client = None

+class InvalidRequestException(Exception):
+    """Invalid Request"""

-def process_photo_url(created_at: datetime, doc: Dict[str, List[str]], suffix: str = ""):
+def process_photo_url(
+    created_at: datetime, doc: Dict[str, List[str]], suffix: str = ""
+):
     """Process photo submitted via URL"""

     now_ts = int(time.mktime(created_at.timetuple()))

     photo_urls = []

-    if isinstance(doc['photo'], str):
-        doc['photo'] = [doc['photo']]
+    if isinstance(doc["photo"], str):
+        doc["photo"] = [doc["photo"]]

-    for i, photo in enumerate(doc['photo']):
+    for i, photo in enumerate(doc["photo"]):
+        if isinstance(photo, str):
+            photo = {"url": photo, "alt": ""}

-        if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
+        if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
             # download the photo
-            r = requests.get(photo)
-            ext = os.path.splitext(photo)[1]
+            r = requests.get(photo["url"])
+            ext = os.path.splitext(photo["url"])[1]

             # generate local filename
-            filename = os.path.join(os.environ.get(
-                'MICROPUB_MEDIA_PATH'), created_at.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}")
-            photo_url = os.path.join(os.environ.get(
-                'MICROPUB_MEDIA_URL_PREFIX'), created_at.strftime("%Y/%m/%d"), str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}")
-
-            photo_urls.append(photo_url)
+            filename = os.path.join(
+                os.environ.get("MICROPUB_MEDIA_PATH"),
+                created_at.strftime("%Y/%m/%d"),
+                str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}",
+            )
+            photo_url = os.path.join(
+                os.environ.get("MICROPUB_MEDIA_URL_PREFIX"),
+                created_at.strftime("%Y/%m/%d"),
+                str(now_ts) + f"{now_ts}_{suffix}_{i}_{ext}",
+            )
+
+            photo_urls.append((photo_url, photo["alt"]))

             # make directory if needed
             if not os.path.exists(os.path.dirname(filename)):
                 os.makedirs(os.path.dirname(filename))

-            with open(filename, 'wb') as f:
+            with open(filename, "wb") as f:
                 f.write(r.content)
         else:
-            photo_urls.append(photo)
+            photo_urls.append((photo["value"], photo["alt"]))

     return photo_urls

-def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str=""):
+def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str = ""):
     """Process photo directly uploaded to micropub"""

     now_ts = int(time.mktime(created_at.timetuple()))

-    if os.environ.get('MICROPUB_IMAGE_STRATEGY') == 'copy':
+    if os.environ.get("MICROPUB_IMAGE_STRATEGY") == "copy":
+        file.mimetype
         ext = os.path.splitext(file.filename)[1]

+        if ext == "":
+            ext = mimetypes.guess_extension(file.mimetype)
+
         # generate local filename
-        filename = os.path.join(os.environ.get(
-            'MICROPUB_MEDIA_PATH'), created_at.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
-        photo_url = os.path.join(os.environ.get(
-            'MICROPUB_MEDIA_URL_PREFIX'), created_at.strftime("%Y/%m/%d"), f"{now_ts}_{suffix}{ext}")
+        filename = os.path.join(
+            os.environ.get("MICROPUB_MEDIA_PATH"),
+            created_at.strftime("%Y/%m/%d"),
+            f"{now_ts}_{suffix}{ext}",
+        )
+        photo_url = os.path.join(
+            os.environ.get("MICROPUB_MEDIA_URL_PREFIX"),
+            created_at.strftime("%Y/%m/%d"),
+            f"{now_ts}_{suffix}{ext}",
+        )

         # make directory if needed
         if not os.path.exists(os.path.dirname(filename)):
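The upload handler above now falls back to mimetypes.guess_extension() when a file arrives without an extension, which is the change behind the "guess mimetype if extension not provided on upload" commit. A hedged sketch of just that fallback; pick_extension is a hypothetical helper, not a function from the repository:

import mimetypes
import os

def pick_extension(filename: str, mime: str) -> str:
    ext = os.path.splitext(filename)[1]
    if ext == "":
        # guess_extension() returns None for unknown MIME types
        ext = mimetypes.guess_extension(mime) or ""
    return ext

print(pick_extension("cat.png", "image/png"))  # ".png", taken from the filename
print(pick_extension("upload", "image/png"))   # ".png", guessed from the MIME type

Note that the sketch guards against guess_extension() returning None with `or ""`; the route code above does not.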
@@ -152,7 +198,7 @@ def process_photo_upload(created_at: datetime, file: FileStorage, suffix: str=""):
     return None

-def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str]=None):
+def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str] = None):

     now_ts = int(time.mktime(created_at.timetuple()))
@@ -164,89 +210,117 @@ def init_frontmatter(created_at: datetime, post_type: str, name: Optional[str]=None):
     else:
         slug = str(now_ts)

-    url = os.path.join("/", ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
-                       created_at.strftime("%Y/%m/%d"), slug)
+    url = os.path.join(
+        "/",
+        ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
+        created_at.strftime("%Y/%m/%d"),
+        slug,
+    )

-    print(os.environ.get(
-        'CONTENT_PREFIX'))
+    print(os.environ.get("CONTENT_PREFIX"))

-    file_path = os.path.join(os.environ.get(
-        'CONTENT_PREFIX'), ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"), created_at.strftime("%Y/%m/%d"), slug + ".md")
+    file_path = os.path.join(
+        os.environ.get("CONTENT_PREFIX"),
+        ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
+        created_at.strftime("%Y/%m/%d"),
+        slug + ".md",
+    )

     frontmatter = {
+        "post_meta": ["date"],
         "url": url,
-        "type": post_type,
-        "date": created_at.isoformat(sep='T'),
+        "type": ENTITY_TYPE_PLURAL_MAP.get(post_type, post_type + "s"),
+        "date": created_at.isoformat(sep="T"),
     }

     return frontmatter, file_path

 def detect_entry_type(doc: dict) -> str:
     """Given a dictionary object from either form or json, detect type of post"""

-    if ('in-reply-to' in doc) or ('u-in-reply-to' in doc):
+    if "hypothesis-link" in doc:
+        entry_type = "annotation"
+    elif ("in-reply-to" in doc) or ("u-in-reply-to" in doc):
         entry_type = "reply"
-    elif ('bookmark-of' in doc) or ('u-bookmark-of' in doc):
+    elif ("bookmark-of" in doc) or ("u-bookmark-of" in doc):
         entry_type = "bookmark"
-    elif ('repost-of' in doc) or ('u-repost-of' in doc):
+    elif ("repost-of" in doc) or ("u-repost-of" in doc):
         entry_type = "repost"
-    elif ('like-of' in doc) or ('u-like-of' in doc):
+    elif ("like-of" in doc) or ("u-like-of" in doc):
         entry_type = "like"
-    elif ('read-of' in doc):
+    elif "read-of" in doc:
         entry_type = "read"
-    elif ('watch-of' in doc):
+    elif "watch-of" in doc:
         entry_type = "watch"
-    elif ('name' in doc) or ('p-name' in doc):
+    elif ("name" in doc) or ("p-name" in doc):
         entry_type = "post"
     else:
         entry_type = "note"

     return entry_type

-def capture_frontmatter_props(doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str,List[str]]]):
+def capture_frontmatter_props(
+    doc: Dict[str, Union[str, List[str]]], frontmatter: Dict[str, Union[str, List[str]]]
+):

-    keys = ['summary', 'bookmark-of', 'in-reply-to', 'repost-of', 'like-of', 'read-of', 'watch-of', 'listen-of', 'read-status', 'rating']
+    keys = [
+        "summary",
+        "bookmark-of",
+        "in-reply-to",
+        "repost-of",
+        "like-of",
+        "read-of",
+        "watch-of",
+        "listen-of",
+        "read-status",
+        "rating",
+    ]

-    keys += [f'u-{key}' for key in keys]
+    keys += [f"u-{key}" for key in keys]

     for key in keys:
         if key in doc:
-            if isinstance(doc[key], dict) and ('type' in doc[key]):
-                if doc[key]['type'][0] == 'h-cite':
-                    if 'citations' not in frontmatter:
-                        frontmatter['citations'] = []
-                    frontmatter['citations'].append(doc[key]['properties'])
+            if isinstance(doc[key], dict) and ("type" in doc[key]):
+                if doc[key]["type"][0] == "h-cite":
+                    if "citations" not in frontmatter:
+                        frontmatter["citations"] = []
+                    frontmatter["citations"].append(doc[key]["properties"])
             elif isinstance(doc[key], list) and (len(doc[key]) < 2):
                 frontmatter[key] = doc[key][0]
             else:
                 frontmatter[key] = doc[key]

-    if 'category' in doc:
-        if isinstance(doc['category'], list):
-            categories = doc['category']
+    if "hypothesis-link" in doc:
+        # get the hypothesis data and store it
+        r = requests.get(doc["hypothesis-link"][0])
+        frontmatter["hypothesis-meta"] = r.json()
+
+    if "category" in doc:
+        if isinstance(doc["category"], list):
+            categories = doc["category"]
         else:
-            categories = [doc['category']]
-    elif 'p-category' in doc:
-        categories = doc['p-category']
+            categories = [doc["category"]]
+    elif "p-category" in doc:
+        categories = doc["p-category"]
     else:
-        categories = request.form.getlist('category[]')
+        categories = request.form.getlist("category[]")

     if len(categories) > 0:
-        frontmatter['tags'] = categories
+        frontmatter["tags"] = categories

 def process_multipart_post():
     doc = request.form.to_dict(flat=True)
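capture_frontmatter_props() now recognises a hypothesis-link property, fetches the linked annotation, and stores the raw JSON under hypothesis-meta, while detect_entry_type() maps such posts to the new "annotation" type. A minimal illustration of that fetch-and-store step, assuming the link points at a Hypothesis API annotation URL; the annotation ID below is a placeholder:

import requests

# placeholder input, shaped like the micropub form/JSON properties
doc = {"hypothesis-link": ["https://hypothes.is/api/annotations/EXAMPLE_ID"]}
frontmatter = {}

if "hypothesis-link" in doc:
    # fetch the annotation and keep the whole JSON payload in the post frontmatter
    r = requests.get(doc["hypothesis-link"][0])
    frontmatter["hypothesis-meta"] = r.json()

print(list(frontmatter.keys()))  # ["hypothesis-meta"]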
@@ -255,104 +329,132 @@ def process_multipart_post():
     now = datetime.now()

-    frontmatter, file_path = init_frontmatter(now, entry_type, doc.get('name'))
+    frontmatter, file_path = init_frontmatter(now, entry_type, doc.get("name"))

     capture_frontmatter_props(doc, frontmatter)

-    if 'name' in doc:
-        frontmatter['title'] = doc['name']
+    if "name" in doc:
+        frontmatter["title"] = doc["name"]

-    if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files):
+    if ("photo" in doc) or ("photo" in request.files) or ("photo[]" in request.files):

-        frontmatter['photo'] = []
+        frontmatter["photo"] = []

-        if 'photo[]' in request.files:
-            photos = request.files.getlist('photo[]')
+        if "photo[]" in request.files:
+            photos = request.files.getlist("photo[]")

             docstr = ""
             for i, photo in enumerate(photos):
                 photo_url = process_photo_upload(now, photo, suffix=i)
-                frontmatter['photo'].append(photo_url)
-
-                docstr += f"\n\n<img src=\"{photo_url}\" class=\"u-photo\" />"
+                if "thumbnail" not in frontmatter:
+                    frontmatter["thumbnail"] = photo_url
+
+                frontmatter["photo"].append(photo_url)
+                #docstr += f'\n\n<img src="{photo_url}" class="u-photo" />'

             docstr += f"\n\n {doc['content']}"
         else:
-            if 'photo' in doc:
-                photo_urls = process_photo_url(now, doc)
+            if "photo" in doc:
+                photo_objects = process_photo_url(now, doc)
             else:
-                photo_urls = [process_photo_upload(now, request.files['photo'])]
+                photo_objects = [
+                    (process_photo_upload(now, request.files["photo"]), "")
+                ]

-            frontmatter['photo'] = photo_urls
+            frontmatter["photo"] = [
+                {"value": photo[0], "alt": photo[1]} for photo in photo_objects
+            ]
+            frontmatter["thumbnail"] = photo_objects[0][0]

             docstr = ""
-            for photo in photo_urls:
-                docstr += f"<img src=\"{photo}\" class=\"u-photo\" /> \n\n {doc['content']}"
+            #for photo in photo_objects:
+            #    docstr += f"<img src=\"{photo[0]}\" alt=\"{photo[1]}\" class=\"u-photo\" /> \n\n {doc['content']}"
     else:
-        docstr = doc.get('content','') if 'content' in doc else ""
+        docstr = doc.get("content", "") if "content" in doc else ""

-    if 'mp-syndicate-to' in doc:
-        frontmatter['mp-syndicate-to'] = doc['mp-syndicate-to'].split(",")
-        for url in doc['mp-syndicate-to'].split(","):
-            docstr += f"\n<a href=\"{url}\"></a>"
+    if "mp-syndicate-to" in doc:
+        frontmatter["mp-syndicate-to"] = doc["mp-syndicate-to"].split(",")
+        for url in doc["mp-syndicate-to"].split(","):
+            docstr += f'\n<a href="{url}"></a>'

-    if 'mp-syndicate-to[]' in request.form:
-        frontmatter['mp-syndicate-to'] = request.form.getlist('mp-syndicate-to[]')
-        for url in request.form.getlist('mp-syndicate-to[]'):
-            docstr += f"\n<a href=\"{url}\"></a>"
+    if "mp-syndicate-to[]" in request.form:
+        frontmatter["mp-syndicate-to"] = request.form.getlist("mp-syndicate-to[]")
+        for url in request.form.getlist("mp-syndicate-to[]"):
+            docstr += f'\n<a href="{url}"></a>'

     return docstr, frontmatter, file_path

+def process_image_alt_texts(doc):
+    alts = []
+
+    if isinstance(doc["photo"], str):
+        doc["photo"] = [doc["photo"]]
+
+    for i, photo in enumerate(doc["photo"]):
+        if isinstance(photo, dict):
+            alts.append(doc["alt"])
+        else:
+            alts.append("")
+
+    return alts

 def process_json_post():
     """Process JSON POST submission"""
     body = request.get_json()

     # get post type - take the first item in the array
-    if body['type'][0] != 'h-entry':
-        return jsonify({"error":"invalid_format"}), 400
+    if body["type"][0] != "h-entry":
+        return jsonify({"error": "invalid_format"}), 400

-    props = body['properties']
+    props = body["properties"]

     entry_type = detect_entry_type(props)

-    if 'published' in props:
+    if "published" in props:
         from dateutil import parser
-        now = parser.parse(props['published'][0])
+        now = parser.parse(props["published"][0])
     else:
         now = datetime.now()

-    frontmatter, file_path = init_frontmatter(now, entry_type, props.get('name'))
+    frontmatter, file_path = init_frontmatter(now, entry_type, props.get("name"))

     capture_frontmatter_props(props, frontmatter)

-    if 'name' in props:
-        frontmatter['title'] = props['name'][0]
+    if "name" in props:
+        frontmatter["title"] = props["name"][0]

     docstr = ""

-    if 'photo' in props:
-        photo_urls = process_photo_url(now, props)
-        frontmatter['photo'] = photo_urls
-
-        for photo in photo_urls:
-            docstr += f"\n\n<img src=\"{photo}\" class=\"u-photo\" />"
-
-    for content in props.get('content', []):
+    if "photo" in props:
+        photo_objects = process_photo_url(now, props)
+        frontmatter["photo"] = [
+            {"value": photo[0], "alt": photo[1]} for photo in photo_objects
+        ]
+        frontmatter["thumbnail"] = frontmatter["photo"][0]["value"]
+
+        docstr = ""
+        #for photo in photo_objects:
+        #    docstr += f'<img src="{photo[0]}" alt="{photo[1]}" class="u-photo" /> \n\n'
+
+    for content in props.get("content", []):
         if isinstance(content, dict):
-            if 'html' in content:
+            if "html" in content:
                 docstr += f"\n\n {content.get('html')}"
             else:
@@ -360,19 +462,20 @@ def process_json_post():
     return docstr, frontmatter, file_path

-def get_api_client() -> giteapy.RepositoryApi:
+def get_api_client() -> forgejo.RepositoryApi:
     global _api_client

     if _api_client is None:
-        config = giteapy.Configuration()
-        config.host = os.environ.get('GITEA_URL')
-        config.api_key['access_token'] = os.environ.get('GITEA_API_KEY')
-        _api_client = giteapy.RepositoryApi(giteapy.ApiClient(config))
+        config = forgejo.Configuration()
+        config.host = os.environ.get("GITEA_URL")
+        config.api_key["Token"] = os.environ.get("GITEA_API_KEY")
+        _api_client = forgejo.RepositoryApi(forgejo.ApiClient(config))

     return _api_client

-@core_bp.route('/', methods=['POST'])
+@core_bp.route("/", methods=["POST"])
 @authed_endpoint
 def req():
@@ -382,34 +485,39 @@ def req():
         docstr, frontmatter, file_path = process_multipart_post()

     frontmatter_str = yaml.dump(frontmatter)

-    content = base64.encodestring(
-        f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("utf8")
+    content = base64.encodebytes(
+        f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")
+    ).decode("utf8")

     api = get_api_client()
-    body = giteapy.CreateFileOptions(content=content)
+    body = forgejo.CreateFileOptions(content=content)

     try:
-        r = api.repo_create_file(os.environ.get(
-            'GITEA_REPO_OWNER'), os.environ.get('GITEA_REPO_NAME'), file_path, body)
+        r = api.repo_create_file(
+            os.environ.get("GITEA_REPO_OWNER"),
+            os.environ.get("GITEA_REPO_NAME"),
+            file_path,
+            body,
+        )

-        return Response(status=202, headers={"Location": frontmatter['url']})
+        return Response(status=202, headers={"Location": frontmatter["url"]})
     except Exception as e:
-        return {"error": str(e)}, 500
+        logger.error(e, exc_info=True)
+        return {"error": str(e)}, 500

 def parse_categories():
-    strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY')
+    strategy = os.environ.get("MICROPUB_CATEGORY_LIST_STRATEGY")

-    if strategy == 'feed':
-        tree = ElementTree.parse(os.environ.get('MICROPUB_CATEGORY_LIST_FILE'))
-        tags = tree.findall('.//item/title')
-
-        return {"categories": [tag.text for tag in tags] }
+    if strategy == "feed":
+        tree = ElementTree.parse(os.environ.get("MICROPUB_CATEGORY_LIST_FILE"))
+        tags = tree.findall(".//item/title")
+
+        return {"categories": [tag.text for tag in tags]}

 def get_syndication_targets():
@@ -426,7 +534,7 @@ def get_syndication_targets():
         target_def = {
             "uid": os.environ.get(f"SYNDICATION_TARGET_{target}_URL", target),
             "name": os.environ.get(f"SYNDICATION_TARGET_{target}_NAME", target),
         }

         defs.append(target_def)

     return defs
@@ -437,7 +545,7 @@ def get_syndication_targets():
 def media_endpoint():
     now = datetime.now()
-    url = process_photo_upload(now, request.files['file'])
+    url = process_photo_upload(now, request.files["file"])
     return Response(status=201, headers={"Location": url})
@@ -445,50 +553,33 @@ def media_endpoint():
 def generate_config_json():
     return {
-        "media-endpoint": request.base_url + "media",
+        "media-endpoint": os.environ.get(f"MICROCOSM_BASE_URL", request.base_url)
+        + "media",
         "syndicate-to": get_syndication_targets(),
         "post-types": [
-            {
-                "type": "note",
-                "name": "Note"
-            },
-            {
-                "type": "article",
-                "name": "Blog Post"
-            },
-            {
-                "type": "photo",
-                "name": "Photo"
-            },
-            {
-                "type": "reply",
-                "name": "Reply"
-            },
-            {
-                "type": "bookmark",
-                "name": "Bookmark"
-            },
-            {
-                "type": "like",
-                "name":"Like"
-            }
-        ]
+            {"type": "note", "name": "Note"},
+            {"type": "article", "name": "Blog Post"},
+            {"type": "photo", "name": "Photo"},
+            {"type": "reply", "name": "Reply"},
+            {"type": "bookmark", "name": "Bookmark"},
+            {"type": "like", "name": "Like"},
+        ],
     }

-@core_bp.route("/", methods=['GET'])
+@core_bp.route("/", methods=["GET"])
 @authed_endpoint
 def index():
-    if request.args.get('q') == 'config':
+    if request.args.get("q") == "config":
         return generate_config_json()
-    elif request.args.get('q') == 'category':
+    elif request.args.get("q") == "category":
         return parse_categories()
-    elif request.args.get('q') == 'syndicate-to':
+    elif request.args.get("q") == "syndicate-to":
         return {"syndicate-to": get_syndication_targets()}

-if __name__ == '__main__':
+if __name__ == "__main__":
     app.run(debug=False)
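Taken together, the changes above swap giteapy for clientapi-forgejo: the client is configured with api_key["Token"] rather than api_key["access_token"], and req() commits the rendered post with repo_create_file(). A condensed sketch of that flow using only the calls that appear in the diff; the owner, repository, file path and front matter below are placeholders, and GITEA_URL is assumed to point at the Forgejo instance's API base URL:

import base64
import os
import clientapi_forgejo as forgejo

# configure the Forgejo client from the same environment variables the app uses
config = forgejo.Configuration()
config.host = os.environ.get("GITEA_URL")
config.api_key["Token"] = os.environ.get("GITEA_API_KEY")
api = forgejo.RepositoryApi(forgejo.ApiClient(config))

# commit a post the same way req() does: base64-encoded markdown with YAML front matter
post = "---\ntitle: example\n---\n\nHello, world.\n"
options = forgejo.CreateFileOptions(
    content=base64.encodebytes(post.encode("utf8")).decode("utf8")
)
api.repo_create_file("example-owner", "example-site", "content/notes/example.md", options)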


@@ -37,13 +37,13 @@ def authform():
 @auth_bp.route('/authenticate')
 def authenticate():
     return micropub.authenticate(
-        request.args.get('me'), next_url=url_for('index'))
+        request.args.get('me'), next_url=url_for('token.indieauth_callback'))

 @auth_bp.route('/authorize')
 def authorize():
     return micropub.authorize(
-        request.args.get('me'), next_url=url_for('index'),
+        request.args.get('me'), next_url=url_for('token.indieauth_callback'),
         scope=request.args.get('scope'))
@@ -65,6 +65,7 @@ def indieauth_callback(resp):
     """.format(resp.me, resp.next_url, resp.error)

 @auth_bp.route('/micropub-callback')
 @micropub.authorized_handler
 def micropub_callback(resp):

src/microcosm/photo.py (new file): 0 lines changed

src/microcosm/wsgi.py (new file): 4 lines changed

@@ -0,0 +1,4 @@
+from . import create_app
+
+app = create_app()
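The new wsgi module gives run.sh (FLASK_APP=microcosm.wsgi:app) and any other WSGI server a stable import target. A hedged smoke test with Flask's built-in test client; because the index route is wrapped in authed_endpoint, a request without an Authorization header should come back 401:

from microcosm.wsgi import app

client = app.test_client()
resp = client.get("/?q=config")
print(resp.status_code)  # 401 expected: no IndieAuth token was supplied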