hypopub/hypopub.py

93 lines
2.3 KiB
Python

import flask
import json
import os
import dotenv
import requests
import atoma
from datetime import datetime
from filelock import Timeout, FileLock
from tqdm import tqdm
dotenv.load_dotenv()
app = flask.Flask("hypopub")
session = requests.session()
token = os.getenv("MICROPUB_TOKEN")
session.headers = {"Authorization": f"Bearer {token}"}
HYPOTHESIS_USER=os.getenv("HYPOTHESIS_USER")
def send_micropub(entry: atoma.atom.AtomEntry):
# get the json corresponding to the annotation
json_link = None
for link in entry.links:
if (link.rel == 'alternate') and (link.type_ == "application/json"):
json_link = link.href
if json_link is None:
raise Exception("Cannot find json for annotation")
annotation = requests.get(json_link).json()
endpoint = os.getenv("MICROPUB_ENDPOINT")
r = session.post(f"{endpoint}", json={
"type": ["h-entry"],
"properties":{
"published": [entry.published.strftime("%Y-%m-%dT%H:%M:%S")],
"content":[{
"html": entry.content.value
}],
"category": annotation['tags'] + ['hypothes.is'],
"in-reply-to": [annotation['target'][0]['source']],
"hypothesis-link": [json_link]
}
})
if r.status_code > 300:
raise Exception(f"Request failed: {r.status_code}: {r.text}")
def main():
if HYPOTHESIS_USER is None:
raise Exception("You must set HYPOTHESIS_USER environment variable")
response = requests.get(f"https://hypothes.is/stream.atom?user={HYPOTHESIS_USER}")
feed = atoma.parse_atom_bytes(response.content)
print(f"Processing {feed.title.value}")
state = {}
last_run = datetime(1991,1,1).timestamp()
if os.path.exists("state.json"):
with open("state.json") as f:
state = json.load(f)
last_run = state['last_run']
for entry in tqdm(feed.entries):
if entry.published.timestamp() > last_run:
tqdm.write(f"Send micropub payload for '{entry.title.value}'")
send_micropub(entry)
else:
tqdm.write(f"Entry older than last run time: '{entry.title.value}'")
state['last_run'] = datetime.now().timestamp()
with open("state.json",'w') as f:
json.dump(state, f)
if __name__ == "__main__":
main()