87 lines
2.1 KiB
Python
87 lines
2.1 KiB
Python
import requests
|
|
from bs4 import BeautifulSoup
|
|
import os
|
|
import json
|
|
import click
|
|
import feedparser
|
|
|
|
from datetime import datetime
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
# Link targets that trigger a webmention: `update` only sends a mention for
# links that match one of these URLs exactly and ignores every other link.
ALLOWED_MENTIONS = [
    "https://fed.brid.gy/",
    "https://brid.gy/publish/mastodon",
]
|
|
|
|
|
|
@click.group()
def cli():
    # Root click group for this script. Its callback runs before any
    # subcommand, so load_dotenv() is called once up front and the commands
    # can read their settings through os.environ / os.getenv.
    #
    # Documented with a comment rather than a docstring on purpose: click
    # would show a docstring as the group's --help text.
    load_dotenv()
|
|
|
|
|
|
def check_for_links(url, timeout=10):
    """Fetch *url* and return the distinct ``href`` values of its ``<a>`` tags.

    Args:
        url: Address of the page to download and scan.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled host.

    Returns:
        A set holding the raw ``href`` attribute of every anchor on the page
        (values are returned as written in the HTML; relative links are not
        resolved). An empty set is returned when the page cannot be fetched,
        so success and failure yield the same type.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        # Best effort: report the failure and let the caller carry on with
        # "no links" instead of aborting the whole run.
        print(f"Error fetching URL: {e}")
        return set()

    soup = BeautifulSoup(response.content, "html.parser")
    # href=True skips anchors that have no href attribute at all.
    return {a_tag["href"] for a_tag in soup.find_all("a", href=True)}
|
|
|
|
|
|
def send_telegraph_webmention(from_url, to_url, timeout=10):
    """Ask the Telegraph service to send a webmention.

    Args:
        from_url: Page that contains the link (sent as ``source``).
        to_url: Page being linked to (sent as ``target``).
        timeout: Seconds to wait for Telegraph before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled host.

    Returns:
        The body of Telegraph's HTTP response as text. The status code is not
        inspected, so error responses are returned the same way.

    Raises:
        requests.RequestException: If the request cannot be completed
            (connection failure, timeout, ...). It is not caught here.
    """
    payload = {
        "source": from_url,
        "target": to_url,
        # API token is taken from the environment (TELEGRAPH_TOKEN).
        "token": os.getenv("TELEGRAPH_TOKEN"),
    }
    r = requests.post(
        "https://telegraph.p3k.io/webmention",
        data=payload,
        timeout=timeout,
    )
    return r.text
|
|
|
|
|
|
@cli.command(name="update")
def update():
    """Run update"""
    # The docstring above is left as-is because click shows it as the
    # command's help text. What the command does:
    #   1. Load the previous run's timestamp from wm_state.json (falling back
    #      to 2024-10-10 when there is none).
    #   2. Parse the feed at $RSS_URL and, for every entry published after
    #      that timestamp, scan the entry page for links.
    #   3. Send a Telegraph webmention for each link in ALLOWED_MENTIONS.
    #   4. Store the new timestamp back into wm_state.json.
    state_file = "wm_state.json"

    if os.path.exists(state_file):
        with open(state_file, encoding="utf-8") as f:
            state = json.load(f)
    else:
        state = {}

    # .get() also covers a stored null, which would otherwise fail on slicing.
    if state.get("last_updated"):
        last_updated = datetime(*state["last_updated"][:6])
    else:
        last_updated = datetime(2024, 10, 10)

    rss_url = os.environ.get("RSS_URL")
    if not rss_url:
        # Fail with a clear message instead of handing None to feedparser.
        raise click.ClickException("RSS_URL is not set")

    feed: feedparser.FeedParserDict = feedparser.parse(rss_url)

    print(f"Checking for new entries on {rss_url} since {last_updated}")
    newest_published = None
    for entry in feed["entries"]:
        published = entry.get("published_parsed")
        if not published:
            # No usable publication date: cannot tell whether it is new.
            continue
        if newest_published is None or published > newest_published:
            newest_published = published
        if datetime(*published[:6]) <= last_updated:
            continue

        for link in check_for_links(entry["link"]):
            if link in ALLOWED_MENTIONS:
                print(f"Try to send webmention from {entry['link']} to {link}")
                print(send_telegraph_webmention(entry['link'], link))

    # Prefer the feed-level timestamp. When the feed does not provide one,
    # fall back to the newest entry date seen so the state is still advanced
    # and already-handled entries are not mentioned again on the next run.
    new_stamp = feed.get("updated_parsed") or newest_published
    if new_stamp:
        # Stored as a plain list of ints so it round-trips through JSON.
        state["last_updated"] = list(new_stamp)

    with open(state_file, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
|
|
|
|
|
|
# Script entry point: hand control to the click group only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    cli()
|