micropub-flask-gitea/example.py

407 lines
10 KiB
Python

from typing import Dict, List
import requests
import os
import functools
import dotenv
import giteapy
import time
import base64
from werkzeug.datastructures import FileStorage
import yaml
from slugify import slugify
from datetime import datetime
from xml.etree import ElementTree
from flask import Flask, request, url_for, Response
from requests import api
from flask_micropub import MicropubClient
# Pull settings from a .env file (if any) into the environment before the
# os.environ lookups below run.
dotenv.load_dotenv()

# Identity URLs that are allowed to post, read from the semicolon-separated
# PERMITTED_DOMAINS environment variable.
PERMITTED_DOMAIN = os.environ.get(
    'PERMITTED_DOMAINS', 'https://brainsteam.co.uk/').split(';')

app = Flask(__name__)
# Prefer a secret supplied through the environment so it does not live in
# source control. The literal is kept only as a fallback so deployments that
# do not set SECRET_KEY keep starting exactly as before.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'my super secret key')
micropub = MicropubClient(app, client_id='https://brainsteam.co.uk')

# Entry type -> plural name used when building URLs and content paths.
# Types missing from this map are pluralised by appending "s".
ENTITY_TYPE_PLURAL_MAP = {
    "post": "posts",
    "note": "notes",
    "reply": "replies",
    "bookmark": "bookmarks",
}
def authed_endpoint(f):
    """Decorate a view so it only runs for a permitted IndieAuth identity.

    The request's ``Authorization`` header is forwarded to
    https://tokens.indieauth.com/token and the ``me`` value of the JSON reply
    is checked against ``PERMITTED_DOMAIN``.

    The wrapped view's own response is returned on success. Otherwise the
    wrapper returns an error dict with:

    * 401 when no ``Authorization`` header was sent, or the token endpoint's
      reply carries no ``me`` value;
    * 403 when ``me`` is not in ``PERMITTED_DOMAIN``.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        authtok = request.headers.get('Authorization')
        if authtok is None:
            return {
                "error": "unauthorized",
                "error_description": "An auth token was not provided"
            }, 401

        auth = requests.get("https://tokens.indieauth.com/token", headers={
            "Authorization": authtok, "Accept": "application/json"}).json()

        # A reply without "me" means the token was not verified; answer with
        # 401 instead of failing on a missing key.
        me = auth.get('me')
        if me is None:
            return {
                "error": "unauthorized",
                "error_description": auth.get(
                    "error_description",
                    "The auth token could not be verified")
            }, 401

        if me not in PERMITTED_DOMAIN:
            return {"error": "forbidden", "error_description": f"User {me} not permitted to post here"}, 403

        # Keyword arguments must be forwarded with ** so they reach the view
        # as keywords rather than as their key names in positional slots.
        return f(*args, **kwargs)
    return wrapper
# Module-level cache for the repository API client built by get_api_client().
_api_client = None


def get_api_client():
    """Return the shared ``giteapy.RepositoryApi`` instance.

    The client is built on the first call from the ``GITEA_URL`` and
    ``GITEA_API_KEY`` environment variables and reused afterwards.
    """
    global _api_client
    if _api_client is not None:
        return _api_client

    settings = giteapy.Configuration()
    settings.host = os.environ.get('GITEA_URL')
    settings.api_key['access_token'] = os.environ.get('GITEA_API_KEY')
    _api_client = giteapy.RepositoryApi(giteapy.ApiClient(settings))
    return _api_client
def process_photo_url(now: datetime, doc: Dict[str, List[str]], suffix: str = ""):
    """Process a photo submitted via URL and return the URL to embed.

    Args:
        now: Timestamp of the post; it determines the dated sub-directory and
            the numeric prefix of the stored file name.
        doc: Submitted form fields. ``doc['photo']`` is used as a single URL
            string here (NOTE(review): the annotation says list values, but
            the caller passes a flat dict - confirm and tighten the type).
        suffix: Distinguishes several photos saved for the same timestamp.

    Returns:
        The submitted URL unchanged unless ``MICROPUB_IMAGE_STRATEGY`` is
        ``copy``. In ``copy`` mode the photo is downloaded below
        ``MICROPUB_MEDIA_PATH`` and the matching URL below
        ``MICROPUB_MEDIA_URL_PREFIX`` is returned.

    Raises:
        requests.HTTPError: In ``copy`` mode, when the download does not
            succeed.
    """
    from urllib.parse import urlparse

    # Any strategy other than "copy" links straight to the remote photo
    # (a "gitea" strategy is not implemented).
    if os.environ.get('MICROPUB_IMAGE_STRATEGY') != 'copy':
        return doc['photo']

    now_ts = int(time.mktime(now.timetuple()))

    # Download the photo; fail loudly rather than saving an error page as if
    # it were the image.
    r = requests.get(doc['photo'])
    r.raise_for_status()

    # Take the extension from the URL *path* so a query string or fragment
    # ("a.jpg?w=100") does not end up in the file name.
    ext = os.path.splitext(urlparse(doc['photo']).path)[1]

    # Same "<timestamp>_<suffix><ext>" naming as process_photo_upload (the
    # timestamp used to be written twice here).
    basename = f"{now_ts}_{suffix}{ext}"
    day_dir = now.strftime("%Y/%m/%d")
    filename = os.path.join(
        os.environ.get('MICROPUB_MEDIA_PATH'), day_dir, basename)
    photo_url = os.path.join(
        os.environ.get('MICROPUB_MEDIA_URL_PREFIX'), day_dir, basename)

    # exist_ok avoids a failure when two requests create the day's directory
    # at the same time.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'wb') as f:
        f.write(r.content)
    return photo_url
def process_photo_upload(now: datetime, file: FileStorage, suffix: str=""):
    """Process a photo uploaded directly to the micropub endpoint.

    Args:
        now: Timestamp of the post; it determines the dated sub-directory and
            the numeric prefix of the stored file name.
        file: The uploaded file.
        suffix: Distinguishes several photos saved for the same timestamp.

    Returns:
        The URL of the stored photo (below ``MICROPUB_MEDIA_URL_PREFIX``) when
        ``MICROPUB_IMAGE_STRATEGY`` is ``copy``; ``None`` for any other
        strategy, in which case nothing is saved.
    """
    if os.environ.get('MICROPUB_IMAGE_STRATEGY') != 'copy':
        return None

    now_ts = int(time.mktime(now.timetuple()))

    # An upload may arrive without a filename; treat that as "no extension"
    # instead of failing inside splitext.
    ext = os.path.splitext(file.filename or "")[1]

    basename = f"{now_ts}_{suffix}{ext}"
    day_dir = now.strftime("%Y/%m/%d")
    filename = os.path.join(
        os.environ.get('MICROPUB_MEDIA_PATH'), day_dir, basename)
    photo_url = os.path.join(
        os.environ.get('MICROPUB_MEDIA_URL_PREFIX'), day_dir, basename)

    # exist_ok avoids a failure when two requests create the day's directory
    # at the same time.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    file.save(filename)
    return photo_url
def _detect_entry_type(doc):
    """Return the entry type implied by the submitted Micropub properties."""
    # Checked in priority order; the first property present wins. Photo posts
    # are deliberately not given a type of their own.
    for prop, entry_type in (
        ('in-reply-to', "reply"),
        ('bookmark-of', "bookmark"),
        ('repost-of', "repost"),
        ('like-of', "like"),
        ('name', "post"),
    ):
        if prop in doc:
            return entry_type
    return "note"


@app.route('/', methods=['POST'])
@authed_endpoint
def req():
    """Create a new entry from a form-encoded Micropub request.

    A markdown document with YAML front matter is built from the form fields
    (and any photos), then committed to the Gitea repository named by
    ``GITEA_REPO_OWNER`` / ``GITEA_REPO_NAME`` at a dated path below
    ``CONTENT_PREFIX``.

    Returns:
        A 202 response whose ``Location`` header is the new entry's URL, or
        an error dict with status 500 when the Gitea call fails.
    """
    doc = request.form.to_dict(flat=True)
    entry_type = _detect_entry_type(doc)

    now = datetime.now()
    now_ts = int(time.mktime(now.timetuple()))
    section = ENTITY_TYPE_PLURAL_MAP.get(entry_type, entry_type + "s")
    day_path = now.strftime("%Y/%m/%d")

    url = os.path.join("/", section, day_path, str(now_ts))

    if 'name' in doc:
        slug = slugify(doc['name']) + str(now_ts)
    else:
        slug = str(now_ts)

    file_path = os.path.join(
        os.environ.get('CONTENT_PREFIX'), section, day_path, slug + ".md")

    frontmatter = {
        "url": url,
        "type": entry_type,
        "date": now.isoformat(sep='T'),
    }

    for key in ['bookmark-of', 'in-reply-to', 'repost-of', 'like-of']:
        if key in doc:
            frontmatter[key] = doc[key]

    if 'category' in doc:
        categories = [doc['category']]
    else:
        categories = request.form.getlist('category[]')

    if 'name' in doc:
        frontmatter['title'] = doc['name']

    if categories:
        frontmatter['tags'] = categories

    # Likes, reposts and bookmarks may legitimately arrive without content.
    content_text = doc.get('content', '')

    if ('photo' in doc) or ('photo' in request.files) or ('photo[]' in request.files):
        if 'photo[]' in request.files:
            # Several uploads: number them so their file names do not clash.
            photo_urls = [
                process_photo_upload(now, photo, suffix=i)
                for i, photo in enumerate(request.files.getlist('photo[]'))
            ]
            docstr = "".join(
                f"\n\n<img src=\"{photo_url}\" class=\"u-photo\" />"
                for photo_url in photo_urls)
            docstr += f"\n\n {content_text}"
        else:
            if 'photo' in doc:
                photo_url = process_photo_url(now, doc)
            else:
                photo_url = process_photo_upload(now, request.files['photo'])
            photo_urls = [photo_url]
            docstr = f"<img src=\"{photo_url}\" class=\"u-photo\" /> \n\n {content_text}"
        frontmatter['photo'] = photo_urls
    else:
        docstr = content_text

    if 'mp-syndicate-to' in doc:
        # Use a dedicated loop name: rebinding `url` here would change the
        # Location header returned below.
        for target in doc['mp-syndicate-to'].split(","):
            docstr += f"\n<a href=\"{target}\"></a>"

    frontmatter_str = yaml.dump(frontmatter)

    # base64.encodestring was removed in Python 3.9; b64encode yields the
    # same encoding without embedded line breaks.
    content = base64.b64encode(
        f"---\n{frontmatter_str}\n---\n\n{docstr}".encode("utf8")).decode("ascii")

    repo_api = get_api_client()
    body = giteapy.CreateFileOptions(content=content)

    try:
        repo_api.repo_create_file(
            os.environ.get('GITEA_REPO_OWNER'),
            os.environ.get('GITEA_REPO_NAME'),
            file_path,
            body)
    except Exception as e:
        # Request boundary: report the failure to the client as a 500.
        return {"error": str(e)}, 500

    return Response(status=202, headers={"Location": url})
def parse_categories():
    """Return the known categories as ``{"categories": [...]}``.

    With ``MICROPUB_CATEGORY_LIST_STRATEGY`` set to ``feed`` the names are the
    text of every ``item/title`` element in the XML file named by
    ``MICROPUB_CATEGORY_LIST_FILE``. For any other (or no) strategy an empty
    list is returned, so callers always receive a dict rather than ``None``.
    """
    strategy = os.environ.get('MICROPUB_CATEGORY_LIST_STRATEGY')

    if strategy != 'feed':
        return {"categories": []}

    tree = ElementTree.parse(os.environ.get('MICROPUB_CATEGORY_LIST_FILE'))
    tags = tree.findall('.//item/title')
    return {"categories": [tag.text for tag in tags]}
def get_syndication_targets():
    """Return the configured syndication targets as ``uid``/``name`` dicts.

    Target keys come from the comma-separated ``SYNDICATION_TARGETS``
    environment variable. For each key, ``SYNDICATION_TARGET_<key>_URL``
    supplies the ``uid`` and ``SYNDICATION_TARGET_<key>_NAME`` the ``name``
    (defaulting to the key). Keys without a URL are reported on stdout and
    skipped.
    """
    raw_targets = os.environ.get("SYNDICATION_TARGETS", "").split(",")
    defs = []

    for raw_target in raw_targets:
        # Tolerate "a, b" style lists and skip blanks: splitting the empty
        # default yields [""] rather than an empty list.
        target = raw_target.strip()
        if not target:
            continue

        target_url = os.environ.get(f"SYNDICATION_TARGET_{target}_URL")
        if target_url is None:
            print(f"No url for SYNDICATION_TARGET_{target}_URL")
            continue

        defs.append({
            "uid": target_url,
            "name": os.environ.get(f"SYNDICATION_TARGET_{target}_NAME", target),
        })

    return defs
def generate_config_json():
    """Build the configuration document served for ``q=config``.

    It holds the media endpoint (the request's base URL with ``media``
    appended), the syndication targets and the list of offered post types.
    """
    # (type, display name) pairs, in the order they are advertised.
    offered = (
        ("note", "Note"),
        ("article", "Blog Post"),
        ("photo", "Photo"),
        ("reply", "Reply"),
        ("bookmark", "Bookmark"),
    )

    config = {}
    config["media-endpoint"] = request.base_url + "media"
    config["syndicate-to"] = get_syndication_targets()
    config["post-types"] = [
        {"type": kind, "name": label} for kind, label in offered
    ]
    return config
@app.route("/", methods=['GET'])
@authed_endpoint
def index():
    """Answer Micropub ``q=`` queries.

    Supported values of ``q``:

    * ``config`` - the full configuration document;
    * ``category`` - the known categories;
    * ``syndicate-to`` - just the syndication targets.

    Any other (or missing) value gets a 400 ``invalid_request`` error instead
    of an empty view result.
    """
    query = request.args.get('q')

    if query == 'config':
        return generate_config_json()

    if query == 'category':
        # Guard against a missing result so the view always returns a body.
        return parse_categories() or {"categories": []}

    if query == 'syndicate-to':
        return {"syndicate-to": get_syndication_targets()}

    return {
        "error": "invalid_request",
        "error_description": f"Unsupported query: {query!r}"
    }, 400
@app.route('/form', methods=['GET'])
def authform():
    """Serve a static page with two forms.

    The first submits ``me`` to ``/authenticate``; the second submits ``me``
    and a ``scope`` choice to ``/authorize``.
    """
    page = """
<!DOCTYPE html>
<html>
<body>
<form action="/authenticate" method="GET">
<input type="text" name="me" placeholder="your domain.com"/>
<button type="submit">Authenticate</button>
</form>
<form action="/authorize" method="GET">
<input type="text" name="me" placeholder="your domain.com"/>
<select name="scope">
<option>read</option>
<option>post</option>
<option>comment</option>
<option>create draft update delete media read follow mute block create</option>
</select>
<button type="submit">Authorize</button>
</form>
</body>
</html>
"""
    return page
@app.route('/authenticate')
def authenticate():
    """Hand the ``me`` query parameter to ``micropub.authenticate``.

    ``next_url`` is set to the URL of the ``index`` view.
    """
    me = request.args.get('me')
    return micropub.authenticate(me, next_url=url_for('index'))
@app.route('/authorize')
def authorize():
    """Hand the ``me`` and ``scope`` query parameters to ``micropub.authorize``.

    ``next_url`` is set to the URL of the ``index`` view.
    """
    me = request.args.get('me')
    next_url = url_for('index')
    requested_scope = request.args.get('scope')
    return micropub.authorize(me, next_url=next_url, scope=requested_scope)
@app.route('/indieauth-callback')
@micropub.authenticated_handler
def indieauth_callback(resp):
    """Render the outcome of an authentication attempt as a small HTML page.

    Args:
        resp: Result object passed in by ``micropub.authenticated_handler``;
            its ``me``, ``next_url`` and ``error`` attributes are displayed.
    """
    from html import escape

    # These values can be influenced by the request, so escape them before
    # they are placed into markup.
    shown = [escape(str(value))
             for value in (resp.me, resp.next_url, resp.error)]
    return """
<!DOCTYPE html>
<html>
<body>
Authenticated:
<ul>
<li>me: {}</li>
<li>next: {}</li>
<li>error: {}</li>
</ul>
</body>
</html>
""".format(*shown)
@app.route('/micropub-callback')
@micropub.authorized_handler
def micropub_callback(resp):
    """Render the outcome of an authorization attempt as a small HTML page.

    Args:
        resp: Result object passed in by ``micropub.authorized_handler``; its
            ``me``, ``micropub_endpoint``, ``access_token``, ``next_url`` and
            ``error`` attributes are displayed.
    """
    from html import escape

    # NOTE(review): the access token is shown in the page body - acceptable
    # for a demo page, worth removing for anything public.
    # These values can be influenced by the request, so escape them before
    # they are placed into markup.
    shown = [escape(str(value))
             for value in (resp.me, resp.micropub_endpoint,
                           resp.access_token, resp.next_url, resp.error)]
    return """
<!DOCTYPE html>
<html>
<body>
Authorized:
<ul>
<li>me: {}</li>
<li>endpoint: {}</li>
<li>token: {}</li>
<li>next: {}</li>
<li>error: {}</li>
</ul>
</body>
</html>
""".format(*shown)
if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which must never be
    # reachable from an untrusted network. It stays on by default (as before)
    # for local runs; set FLASK_DEBUG to 0/false/no/off to disable it.
    debug_enabled = os.environ.get(
        'FLASK_DEBUG', '1').strip().lower() not in ('0', 'false', 'no', 'off')
    app.run(debug=debug_enabled)