"""Convert a WordPress XML export into Markdown files with YAML frontmatter.

Images referenced by a post (inline ``<img>`` tags and the ``mf2_photo``
post meta) are downloaded into ``<output-dir>/media`` and the references are
rewritten to point at the local copies.
"""

from datetime import datetime
from html import unescape
from multiprocessing import process
import os
import re
from typing import List, Optional
from urllib.parse import urlparse
import uuid
import xml.etree.ElementTree as ET

import click
import phpserialize
import requests
import yaml


# Maps WordPress custom post types onto the short names used in the output.
TYPE_MAP = {
    'indieblocks_note': 'note',
    'indieblocks_like': 'like',
}

# Only posts whose (mapped) type is listed here are exported.
WHITELIST_TYPES = ['post', 'note', 'like']

# Wrapper that may surround a meta value; the slice widths in the original
# code (9 and 3) correspond to these two markers.
_CDATA_PREFIX = '<![CDATA['
_CDATA_SUFFIX = ']]>'

# Captures the src attribute of every <img> tag in a post body.
_IMG_SRC_PATTERN = re.compile(r'<img[^>]+src="([^">]+)"')

# Upper bound for a single image download so one dead host cannot hang the run.
_DOWNLOAD_TIMEOUT_SECONDS = 30


def generate_unique_filename(original_filename):
    """Return *original_filename* with an 8-hex-digit random suffix.

    The suffix is inserted before the extension, e.g. ``cat.jpg`` becomes
    ``cat_1a2b3c4d.jpg``.
    """
    name, ext = os.path.splitext(original_filename)
    return f"{name}_{uuid.uuid4().hex[:8]}{ext}"


def download_image(url, output_dir):
    """Download *url* into *output_dir* and return the stored file name.

    The file name is the last path component of the URL. Returns ``None``
    when the URL has no usable file name or the server does not answer with
    HTTP 200. Network and file-system errors propagate to the caller.
    """
    filename = os.path.basename(urlparse(url).path)
    if not filename:
        # A path ending in "/" has no basename; writing would target the
        # directory itself.
        return None

    response = requests.get(url, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
    if response.status_code != 200:
        return None

    filepath = os.path.join(output_dir, filename)
    with open(filepath, 'wb') as f:
        f.write(response.content)
    return filename


def sanitize_filename(filename):
    """Replace every character that is not a word char, ``-``, ``.`` or space with ``_``."""
    return re.sub(r'[^\w\-_\. ]', '_', filename)


def _download_with_unique_name(url, media_dir) -> Optional[str]:
    """Download *url* into *media_dir* and rename it to a unique file name.

    Returns the new file name, or ``None`` if nothing was downloaded.
    """
    original_filename = download_image(url, media_dir)
    if not original_filename:
        return None
    new_filename = generate_unique_filename(original_filename)
    os.rename(
        os.path.join(media_dir, original_filename),
        os.path.join(media_dir, new_filename),
    )
    return new_filename


def _to_text(value):
    """Decode *value* as UTF-8 if it is ``bytes``; return anything else unchanged."""
    return value.decode('utf-8') if isinstance(value, bytes) else value


def _find_text(element, path, namespaces=None):
    """Return the text of the first child matching *path*, or ``None`` if absent."""
    child = element.find(path, namespaces)
    return child.text if child is not None else None


def process_mf2_photo(postmeta, namespaces):
    """Extract photo URLs from a ``wp:postmeta`` element holding ``mf2_photo``.

    Args:
        postmeta: A ``wp:postmeta`` element.
        namespaces: Prefix-to-URI mapping used to resolve the ``wp:`` prefix.

    Returns:
        A non-empty list of the decoded values (``bytes`` are decoded to
        ``str``), or ``None`` when the element is not an ``mf2_photo`` entry,
        is empty, or cannot be parsed.
    """
    if _find_text(postmeta, 'wp:meta_key', namespaces) != 'mf2_photo':
        return None

    meta_value = _find_text(postmeta, 'wp:meta_value', namespaces)
    if not meta_value:
        return None

    try:
        # Remove the CDATA wrapper only if it is really there; stripping
        # unconditionally would truncate the serialized payload.
        if meta_value.startswith(_CDATA_PREFIX) and meta_value.endswith(_CDATA_SUFFIX):
            meta_value = meta_value[len(_CDATA_PREFIX):-len(_CDATA_SUFFIX)]

        # Decode the serialized PHP data
        decoded_value = phpserialize.loads(meta_value.encode('utf-8'))
        print(decoded_value)

        # Normalize every shape (mapping, sequence, single value) to a flat
        # list so callers can always iterate over whole URLs.
        if isinstance(decoded_value, dict):
            raw_values = list(decoded_value.values())
        elif isinstance(decoded_value, (list, tuple)):
            raw_values = list(decoded_value)
        else:
            raw_values = [decoded_value] if decoded_value else []

        photos = [_to_text(value) for value in raw_values]
    except Exception as e:
        # Best effort: a malformed meta value must not abort the whole export.
        print(f"Warning: Unable to parse mf2_photo: {str(e)}")
        return None

    return photos or None


def process_mf2_download(mf2_photos, output_dir) -> List[str]:
    """Download every photo in *mf2_photos* into ``<output_dir>/media``.

    Args:
        mf2_photos: Iterable of photo URLs; a single ``str``/``bytes`` URL is
            accepted as well. Falsy entries are ignored.
        output_dir: Root output directory; files go into its ``media`` folder.

    Returns:
        The site-relative URLs (``/media/<file>``) of the photos that were
        downloaded successfully. Photos that fail to download are reported
        and left out of the result.
    """
    if isinstance(mf2_photos, (str, bytes)):
        # Iterating a bare string would yield single characters, not URLs.
        mf2_photos = [_to_text(mf2_photos)]

    media_dir = os.path.join(output_dir, 'media')
    os.makedirs(media_dir, exist_ok=True)

    photos = []
    for mf2_photo in mf2_photos:
        if not mf2_photo:
            continue
        try:
            new_filename = _download_with_unique_name(mf2_photo, media_dir)
        except Exception as e:
            # Best effort: one broken photo must not stop the others.
            print(f"Error downloading mf2_photo {mf2_photo}: {str(e)}")
            continue
        if new_filename:
            print(f"Downloaded and updated mf2_photo: {new_filename}")
            photos.append(f'/media/{new_filename}')
    return photos


def _localize_images(content_text, media_dir):
    """Download the images referenced in *content_text* and rewrite their URLs.

    Returns the content with each successfully downloaded image URL replaced
    by its ``/media/<file>`` counterpart; failed downloads keep their
    original URL.
    """
    # Create media directory if it doesn't exist
    os.makedirs(media_dir, exist_ok=True)

    # dict.fromkeys drops repeated URLs (keeping first-seen order) so an image
    # used twice in a post is downloaded only once.
    for img_url in dict.fromkeys(_IMG_SRC_PATTERN.findall(content_text)):
        try:
            new_filename = _download_with_unique_name(img_url, media_dir)
        except Exception as e:
            # Best effort: keep the original URL when the download fails.
            print(f"Error downloading image {img_url}: {str(e)}")
            continue
        if new_filename:
            # Update the image URL in the content
            content_text = content_text.replace(img_url, f'/media/{new_filename}')
            print(f"Downloaded and updated image: {new_filename}")
    return content_text


@click.command()
@click.option('--input', '-i', required=True, help='Input WordPress XML file')
@click.option('--output-dir', '-o', default='output', help='Output directory for Markdown files')
@click.option('--date-cutoff', '-d', type=click.DateTime(formats=["%Y-%m-%d"]), help='Date cutoff in YYYY-MM-DD format. Articles before this date will be skipped.')
def parse_wordpress_xml(input, output_dir, date_cutoff):
    """Convert a WordPress XML export into Markdown files with YAML frontmatter."""
    # NOTE: the parameter name ``input`` shadows the builtin, but it is fixed
    # by the ``--input`` option name and is part of the command's interface.
    tree = ET.parse(input)
    root = tree.getroot()

    namespaces = {
        'content': 'http://purl.org/rss/1.0/modules/content/',
        'wp': 'http://wordpress.org/export/1.2/',
        'excerpt': 'http://wordpress.org/export/1.2/excerpt/',
    }

    os.makedirs(output_dir, exist_ok=True)
    media_dir = os.path.join(output_dir, 'media')

    for item in root.findall('.//item', namespaces):
        post_id = _find_text(item, 'wp:post_id', namespaces)

        post_date = _find_text(item, 'wp:post_date', namespaces)
        if not post_date:
            print(f"Warning: skipping item {post_id} without a post date")
            continue
        try:
            post_datetime = datetime.strptime(post_date, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            print(f"Warning: skipping item {post_id} with unparseable date {post_date!r}")
            continue

        if date_cutoff and post_datetime < date_cutoff:
            continue  # Skip articles before the cutoff date

        title = _find_text(item, 'title')
        post_type = _find_text(item, 'wp:post_type', namespaces)
        status = _find_text(item, 'wp:status', namespaces)

        post_type = TYPE_MAP.get(post_type, post_type)
        if post_type not in WHITELIST_TYPES:
            # Filter first so no meta parsing or downloads happen for posts
            # that will not be exported.
            continue

        # Extract mf2_photo: the first postmeta entry that yields photos wins.
        mf2_photo = None
        for postmeta in item.findall('wp:postmeta', namespaces):
            mf2_photo = process_mf2_photo(postmeta, namespaces)
            if mf2_photo:
                break

        if mf2_photo:
            print(f"Post ID: {post_id}, mf2_photo: {mf2_photo}")

        content_element = item.find('content:encoded', namespaces)
        if content_element is not None and content_element.text:
            content = _localize_images(unescape(content_element.text), media_dir)
        else:
            content = ''

        excerpt_element = item.find('excerpt:encoded', namespaces)
        if excerpt_element is not None and excerpt_element.text:
            excerpt = unescape(excerpt_element.text)
        else:
            excerpt = ''

        categories = [cat.text for cat in item.findall(
            'category[@domain="category"]')]
        tags = [tag.text for tag in item.findall(
            'category[@domain="post_tag"]')]

        # Prepare frontmatter
        frontmatter = {
            'title': title,
            'date': post_date,
            'draft': status != 'publish',
            'categories': categories,
            'tags': tags,
            'type': post_type + 's'
        }

        if mf2_photo:
            mf2_urls = process_mf2_download(mf2_photo, output_dir)
            frontmatter['photo'] = [{'url': url} for url in mf2_urls]

        if title:
            post_name = sanitize_filename(title)
        else:
            # If no title, use datestamp and random suffix
            datestamp = post_datetime.strftime('%Y%m%d')
            random_suffix = uuid.uuid4().hex[:8]
            post_name = f"{datestamp}_{random_suffix}"

        # Folder structure: <output>/<type>s/<year>/<month>/<day>/
        folder_path = os.path.join(
            output_dir,
            post_type + 's',
            post_datetime.strftime('%Y'),
            post_datetime.strftime('%m'),
            post_datetime.strftime('%d'),
        )
        os.makedirs(folder_path, exist_ok=True)

        filepath = os.path.join(folder_path, f"{post_name}.md")

        with open(filepath, 'w', encoding='utf-8') as f:
            # Write YAML frontmatter
            f.write('---\n')
            yaml.dump(frontmatter, f, default_flow_style=False)
            f.write('---\n\n')

            # Write content
            if excerpt:
                # NOTE(review): four consecutive newlines look like a marker
                # between excerpt and body may have been lost - confirm.
                f.write(f"{excerpt}\n\n\n\n")
            f.write(content)

        print(f"Created file: {filepath}")


if __name__ == '__main__':
    parse_wordpress_xml()