248 lines
8.6 KiB
Python
248 lines
8.6 KiB
Python
|
from multiprocessing import process
|
||
|
from typing import List
|
||
|
import click
|
||
|
import xml.etree.ElementTree as ET
|
||
|
from html import unescape
|
||
|
from datetime import datetime
|
||
|
import os
|
||
|
import re
|
||
|
import yaml
|
||
|
import requests
|
||
|
import os
|
||
|
from urllib.parse import urlparse
|
||
|
import uuid
|
||
|
import phpserialize
|
||
|
|
||
|
|
||
|
def generate_unique_filename(original_filename):
    """Return ``original_filename`` with a random 8-hex-digit suffix.

    The suffix is inserted between the base name and the extension, e.g.
    ``photo.jpg`` becomes ``photo_1a2b3c4d.jpg``.
    """
    stem, extension = os.path.splitext(original_filename)
    # First 8 hex characters of a random UUID.
    suffix = uuid.uuid4().hex[:8]
    return "{}_{}{}".format(stem, suffix, extension)
|
||
|
|
||
|
|
||
|
def download_image(url, output_dir, timeout=30):
    """Download ``url`` into ``output_dir`` and return the saved file name.

    The file is stored under the last path component of the URL.

    Args:
        url: Address of the image to fetch.
        output_dir: Existing directory the file is written to.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a single stalled server would block the whole run.

    Returns:
        The file name (not the full path) of the saved image, or ``None``
        when the URL path has no file name (e.g. it ends with ``/``) or the
        server does not answer with HTTP 200.

    Raises:
        requests.RequestException: On connection errors or timeouts.
        OSError: If the file cannot be written.
    """
    filename = os.path.basename(urlparse(url).path)
    if not filename:
        # Nothing to name the file after; writing would target the
        # directory itself, so bail out before making the request.
        return None

    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        return None

    filepath = os.path.join(output_dir, filename)
    with open(filepath, 'wb') as f:
        f.write(response.content)
    return filename
|
||
|
|
||
|
|
||
|
def sanitize_filename(filename):
    """Replace characters unsuitable for a file name with underscores.

    Word characters, hyphens, underscores, dots and spaces are kept; every
    other character becomes ``_``.
    """
    unsafe_chars = re.compile(r'[^\w\-_\. ]')
    return unsafe_chars.sub('_', filename)
|
||
|
|
||
|
|
||
|
# Translates custom post types found in the export into the short type names
# used for the output.
TYPE_MAP = dict(
    indieblocks_note='note',
    indieblocks_like='like',
)

# Post types (after TYPE_MAP translation) that are converted; items of any
# other type are skipped.
WHITELIST_TYPES = ['post', 'note', 'like']
|
||
|
|
||
|
|
||
|
def process_mf2_photo(postmeta, namespaces):
    """Extract photo URLs from a ``wp:postmeta`` element keyed ``mf2_photo``.

    The meta value is expected to be PHP-serialized data; it is decoded with
    ``phpserialize`` and normalised to a list of strings.

    Args:
        postmeta: A ``wp:postmeta`` XML element.
        namespaces: Prefix-to-URI mapping used to resolve the ``wp:`` prefix.

    Returns:
        A list of non-empty strings, or ``None`` when the element is not an
        ``mf2_photo`` entry, has no value, cannot be parsed, or contains no
        string values.
    """
    key_node = postmeta.find('wp:meta_key', namespaces)
    if key_node is None or key_node.text != 'mf2_photo':
        return None

    value_node = postmeta.find('wp:meta_value', namespaces)
    meta_value = value_node.text if value_node is not None else None
    if not meta_value:
        return None

    def _to_text(value):
        # phpserialize returns PHP strings as bytes.
        return value.decode('utf-8') if isinstance(value, bytes) else value

    try:
        # Remove CDATA wrapper if present
        if meta_value.startswith('<![CDATA[') and meta_value.endswith(']]>'):
            meta_value = meta_value[9:-3]

        # Decode the serialized PHP data
        decoded_value = phpserialize.loads(meta_value.encode('utf-8'))

        if isinstance(decoded_value, dict):
            # PHP arrays decode to dicts; only the values are of interest.
            candidates = [_to_text(v) for v in decoded_value.values()]
        elif isinstance(decoded_value, (list, tuple)):
            candidates = [_to_text(v) for v in decoded_value]
        else:
            # A single serialized string: wrap it so callers can always
            # iterate over whole URLs rather than over bytes/characters.
            candidates = [_to_text(decoded_value)]
    except Exception as e:
        # Best effort: a malformed value must not abort the whole export.
        print(f"Warning: Unable to parse mf2_photo: {str(e)}")
        return None

    photos = [c for c in candidates if isinstance(c, str) and c]
    return photos or None
|
||
|
|
||
|
|
||
|
def process_mf2_download(mf2_photos, output_dir) -> List[str]:
    """Download mf2 photos into ``<output_dir>/media`` and return their URLs.

    Each successfully downloaded photo is renamed to a unique file name and
    reported as ``/media/<new name>``. When a download fails, the original
    URL is kept so the photo reference is not lost.

    Args:
        mf2_photos: Iterable of photo URLs. A single string/bytes URL is
            accepted too; ``None`` or an empty iterable yields ``[]``.
        output_dir: Root output directory; files go into its ``media``
            sub-directory, which is created on demand.

    Returns:
        One URL per usable input entry, in input order. Empty and
        non-string entries are skipped.
    """
    if not mf2_photos:
        return []
    if isinstance(mf2_photos, (str, bytes)):
        # Iterating a bare string would yield single characters.
        mf2_photos = [mf2_photos]

    media_dir = os.path.join(output_dir, 'media')
    photos = []

    for mf2_photo in mf2_photos:
        if isinstance(mf2_photo, bytes):
            mf2_photo = mf2_photo.decode('utf-8', errors='replace')
        if not mf2_photo or not isinstance(mf2_photo, str):
            continue

        os.makedirs(media_dir, exist_ok=True)

        new_url = None
        try:
            original_filename = download_image(mf2_photo, media_dir)
            if original_filename:
                new_filename = generate_unique_filename(original_filename)
                old_path = os.path.join(media_dir, original_filename)
                new_path = os.path.join(media_dir, new_filename)
                os.rename(old_path, new_path)

                new_url = f'/media/{new_filename}'
                print(f"Downloaded and updated mf2_photo: {new_filename}")
        except Exception as e:
            # Best effort: one broken photo must not abort the export.
            print(f"Error downloading mf2_photo {mf2_photo}: {str(e)}")

        # Fall back to the original URL if the download failed.
        photos.append(new_url if new_url is not None else mf2_photo)

    return photos
|
||
|
|
||
|
|
||
|
def _localize_content_images(content_text, media_dir):
    """Download the ``<img src="...">`` targets of ``content_text`` into
    ``media_dir`` and return the content with those URLs pointing at
    ``/media/<unique file name>``. Images that fail to download keep their
    original URL."""
    img_urls = re.findall(r'<img[^>]+src="([^">]+)"', content_text)

    # str.replace below rewrites every occurrence of a URL at once, so each
    # distinct URL is fetched only once (dict.fromkeys keeps document order).
    for img_url in dict.fromkeys(img_urls):
        try:
            original_filename = download_image(img_url, media_dir)
            if not original_filename:
                continue

            # Generate a unique filename with a pseudorandom suffix
            new_filename = generate_unique_filename(original_filename)
            old_path = os.path.join(media_dir, original_filename)
            new_path = os.path.join(media_dir, new_filename)
            os.rename(old_path, new_path)

            # Update the image URL in the content
            new_url = f'/media/{new_filename}'
            content_text = content_text.replace(img_url, new_url)
            print(f"Downloaded and updated image: {new_filename}")
        except Exception as e:
            # Best effort: one broken image must not abort the export.
            print(f"Error downloading image {img_url}: {str(e)}")

    return content_text


@click.command()
@click.option('--input', '-i', required=True, help='Input WordPress XML file')
@click.option('--output-dir', '-o', default='output', help='Output directory for Markdown files')
@click.option('--date-cutoff', '-d', type=click.DateTime(formats=["%Y-%m-%d"]),
              help='Date cutoff in YYYY-MM-DD format. Articles before this date will be skipped.')
def parse_wordpress_xml(input, output_dir, date_cutoff):
    """Convert a WordPress XML export into Markdown files with YAML front matter."""
    # For every <item> whose (mapped) post type is whitelisted, one file is
    # written to <output_dir>/<type>s/<YYYY>/<MM>/<DD>/<name>.md. Images
    # referenced by the content and by the mf2_photo post meta are
    # downloaded into <output_dir>/media.
    tree = ET.parse(input)
    root = tree.getroot()

    namespaces = {
        'content': 'http://purl.org/rss/1.0/modules/content/',
        'wp': 'http://wordpress.org/export/1.2/',
        'excerpt': 'http://wordpress.org/export/1.2/excerpt/',
    }

    os.makedirs(output_dir, exist_ok=True)
    media_dir = os.path.join(output_dir, 'media')

    for item in root.findall('.//item', namespaces):
        post_date = item.find('wp:post_date', namespaces).text
        post_datetime = datetime.strptime(post_date, "%Y-%m-%d %H:%M:%S")

        if date_cutoff and post_datetime < date_cutoff:
            continue  # Skip articles before the cutoff date

        title = item.find('title').text
        post_type = item.find('wp:post_type', namespaces).text
        post_id = item.find('wp:post_id', namespaces).text
        status = item.find('wp:status', namespaces).text

        # Extract mf2_photo: the first postmeta entry that yields photos wins.
        mf2_photo = None
        for postmeta in item.findall('wp:postmeta', namespaces):
            mf2_photo = process_mf2_photo(postmeta, namespaces)
            if mf2_photo:
                break

        if mf2_photo:
            print(f"Post ID: {post_id}, mf2_photo: {mf2_photo}")

        if post_type in TYPE_MAP:
            post_type = TYPE_MAP[post_type]

        if post_type not in WHITELIST_TYPES:
            continue

        # Output section / folder name, e.g. "post" -> "posts".
        type_dir = post_type + 's'

        content = item.find('content:encoded', namespaces)
        if content is not None and content.text:
            # Create media directory if it doesn't exist
            os.makedirs(media_dir, exist_ok=True)
            content = _localize_content_images(unescape(content.text), media_dir)
        else:
            content = ''

        excerpt = item.find('excerpt:encoded', namespaces)
        excerpt = unescape(
            excerpt.text) if excerpt is not None and excerpt.text else ''

        categories = [cat.text for cat in item.findall(
            'category[@domain="category"]')]
        tags = [tag.text for tag in item.findall(
            'category[@domain="post_tag"]')]

        # Prepare frontmatter
        frontmatter = {
            'title': title,
            'date': post_date,
            'draft': status != 'publish',
            'categories': categories,
            'tags': tags,
            'type': type_dir,
        }

        # Download the mf2 photos and reference them from the front matter.
        if mf2_photo:
            mf2_urls = process_mf2_download(mf2_photo, output_dir)
            frontmatter['photo'] = [{'url': url} for url in mf2_urls]

        # Create folder structure and filename
        year = post_datetime.strftime('%Y')
        month = post_datetime.strftime('%m')
        day = post_datetime.strftime('%d')
        if title:
            post_name = sanitize_filename(title)
        else:
            # If no title, use datestamp and random suffix
            datestamp = post_datetime.strftime('%Y%m%d')
            random_suffix = uuid.uuid4().hex[:8]
            post_name = f"{datestamp}_{random_suffix}"

        # Create folder structure with post type before date
        folder_path = os.path.join(output_dir, type_dir, year, month, day)
        os.makedirs(folder_path, exist_ok=True)

        filename = f"{post_name}.md"
        filepath = os.path.join(folder_path, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            # Write YAML frontmatter. The file is UTF-8, so non-ASCII text is
            # emitted as-is instead of as \u escapes.
            f.write('---\n')
            yaml.dump(frontmatter, f, default_flow_style=False,
                      allow_unicode=True)
            f.write('---\n\n')

            # Write content
            if excerpt:
                f.write(f"{excerpt}\n\n<!--more-->\n\n")
            f.write(content)

        print(f"Created file: {filepath}")
|
||
|
|
||
|
|
||
|
# Run the command-line entry point when this file is executed as a script.
if __name__ == "__main__":
    parse_wordpress_xml()
|