diff --git a/backend/apps/archive/management/commands/import_data.py b/backend/apps/archive/management/commands/import_data.py
index b037daa..dc52454 100644
--- a/backend/apps/archive/management/commands/import_data.py
+++ b/backend/apps/archive/management/commands/import_data.py
@@ -1,12 +1,13 @@
 from datetime import datetime
 import os
 import json
+import logging
+from abc import ABC, abstractmethod
+from typing import Dict, Any, Optional
 from django.utils import timezone
 from tqdm.auto import tqdm
-
 from PIL import Image as PillowImage
-
 from django.core.management.base import BaseCommand
 
 from apps.files.models import (
@@ -35,88 +36,79 @@ from apps.files.tasks import (
 )
 
 
-class Command(BaseCommand):
-    help = "Import data from JSON files in a folder or a single JSON file to the Twitter archive"
+class BaseImporter(ABC):
+    """Base abstract class for all site importers."""
 
-    def add_arguments(self, parser):
-        parser.add_argument(
-            "path",
-            type=str,
-            help="Path to the folder containing JSON files or a single JSON file",
-        )
-        parser.add_argument(
-            "--delete", action="store_true", help="Delete imported files"
-        )
+    def __init__(self, command_instance):
+        self.command = command_instance
+        self.logger = logging.getLogger(f"importer.{self.__class__.__name__}")
 
-    def handle(self, *args, **kwargs):
-        path = kwargs["path"]
-        delete = kwargs["delete"]
+    @abstractmethod
+    def import_data(
+        self, data: Dict[str, Any], file_path_json: str, delete: bool
+    ) -> None:
+        """Import data from a JSON file into the database."""
+        pass
 
-        if os.path.isfile(path):
-            self.process_json_file(path, delete)
-        elif os.path.isdir(path):
-            self.process_json_folder(path, delete)
-        else:
-            self.stdout.write(
-                self.style.ERROR(f"The path '{path}' is not a valid file or folder.")
-            )
-            return
+    def log_info(self, message: str) -> None:
+        """Log an informational message."""
+        tqdm.write(message)
+        self.logger.info(message)
 
-    def process_json_file(self, file_path, delete):
-        # self.stdout.write(self.style.NOTICE(f"Importing data from: {file_path}"))
-        tqdm.write(f"Importing data from: {file_path}")
+    def log_success(self, message: str) -> None:
+        """Log a success message."""
+        styled_message = self.command.style.SUCCESS(message)
+        tqdm.write(styled_message)
+        self.logger.info(message)
 
-        with open(file_path, "r") as f:
-            data = json.load(f)
+    def log_error(self, message: str) -> None:
+        """Log an error message."""
+        styled_message = self.command.style.ERROR(message)
+        tqdm.write(styled_message)
+        self.logger.error(message)
 
-        self.import_data(data, file_path, delete)
+    def log_warning(self, message: str) -> None:
+        """Log a warning message."""
+        styled_message = self.command.style.WARNING(message)
+        tqdm.write(styled_message)
+        self.logger.warning(message)
 
-        # self.stdout.write(self.style.SUCCESS('Data imported successfully.'))
-        tqdm.write(self.style.SUCCESS("Data imported successfully."))
+    def get_or_create_source_site(self, category: str) -> SourceSiteModel:
+        """Get or create a source site model instance."""
+        # get_or_create already persists the row, so no extra save() is needed
+        source_site_instance, _ = SourceSiteModel.objects.get_or_create(slug=category)
+        return source_site_instance
 
-    def process_json_folder(self, folder_path, delete):
-        if not os.path.exists(folder_path):
-            # self.stdout.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
-            tqdm.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
-            return
-
-        # Use os.walk dynamically, and count total files to show progress incrementally
-        tqdm.write("Counting total files...")
files...") - total_files = sum(len(files) for _, _, files in os.walk(folder_path)) - - with tqdm( - total=total_files, desc="Processing JSON files", dynamic_ncols=True - ) as progress_bar: - for root, dirs, files in os.walk(folder_path): - for file_name in files: - progress_bar.update(1) # Increment progress for each file - if file_name.endswith(".json"): - file_path = os.path.join(root, file_name) - self.process_json_file(file_path, delete) - - def import_file(self, file_path, model, delete=False): + def import_file( + self, file_path: str, delete: bool = False + ) -> Optional[PostFileModel]: """ - Imports a file if it doesn't already exist in the database and returns the instance. + Import a file if it doesn't already exist in the database and returns the instance. - :param file_path: The path to the file to import. - :param model: The model class to which the file instance should be linked. - :param delete: Whether to delete the imported file after processing. - :return: The file instance. + Args: + file_path: The path to the file to import. + delete: Whether to delete the imported file after processing. + + Returns: + The file instance or None if file doesn't exist. """ + if not os.path.exists(file_path): + self.log_warning(f"File not found: {file_path}") + return None - file_instance = None # Initialize file_instance to None - - if os.path.exists(file_path): - file_hash = compute_file_hash_blake3(file_path, logger=self) + try: + file_hash = compute_file_hash_blake3(file_path, logger=self.command) file_name = os.path.basename(file_path) _, file_ext = os.path.splitext(file_name) hash_file_name = file_hash + file_ext + # Get or create file name file_name_instance, _ = FileNameModel.objects.get_or_create( filename=file_name ) + # Get or create file file_instance, created = PostFileModel.objects.get_or_create( hash_blake3=file_hash ) @@ -124,8 +116,8 @@ class Command(BaseCommand): if created: with open(file_path, "rb") as file: file_instance.file.save(hash_file_name, file) - file_instance.save() + # Add file metadata file_instance.name.add(file_name_instance) file_instance.extension = file_ext file_instance.size = os.path.getsize(file_path) @@ -138,189 +130,396 @@ class Command(BaseCommand): file_instance.save() + # Process image-specific properties if file_instance.mimetype.startswith("image/"): - # Add Image blur hash if not existing. 
+            # Get or create file name
             file_name_instance, _ = FileNameModel.objects.get_or_create(
                 filename=file_name
             )
 
+            # Get or create file
             file_instance, created = PostFileModel.objects.get_or_create(
                 hash_blake3=file_hash
             )
@@ -124,8 +116,8 @@ class Command(BaseCommand):
             if created:
                 with open(file_path, "rb") as file:
                     file_instance.file.save(hash_file_name, file)
-                file_instance.save()
 
+            # Add file metadata
             file_instance.name.add(file_name_instance)
             file_instance.extension = file_ext
             file_instance.size = os.path.getsize(file_path)
@@ -138,189 +130,396 @@ class Command(BaseCommand):
             file_instance.save()
 
+            # Process image-specific properties
             if file_instance.mimetype.startswith("image/"):
-                # Add Image blur hash if not existing.
+                # Add image blur hash if not already present
                 if not file_instance.blur_hash:
                     generate_blur_hash_PostFile.delay(file_instance.id)
 
                 # Get image resolution
-                im = PillowImage.open(file_instance.file)
-                file_instance.height, file_instance.width = im.size
-                file_instance.save()
+                try:
+                    im = PillowImage.open(file_instance.file)
+                    # Pillow's Image.size is (width, height)
+                    file_instance.width, file_instance.height = im.size
+                    file_instance.save()
+                except Exception as e:
+                    self.log_error(f"Error getting image dimensions: {str(e)}")
 
+            # Queue video thumbnail generation
             if file_instance.file_type in ["video", "gif"]:
                 if not file_instance.thumbnail:
                     generate_video_thumbnail.delay(file_instance.id)
 
+            # Generate MD5 hash if missing
             if not file_instance.hash_md5:
                 generate_md5_hash_PostFile.delay(file_instance.id)
 
             if created:
-                tqdm.write(
-                    self.style.SUCCESS(
-                        f"Imported: {file_path} file, new instance created"
-                    )
-                )
+                self.log_success(f"Imported: {file_path} file, new instance created")
             else:
-                tqdm.write(
-                    self.style.SUCCESS(f"Imported: {file_path} file, instance updated")
-                )
+                self.log_success(f"Imported: {file_path} file, instance updated")
 
             # Delete the imported file if the --delete flag is used
-            self.delete_imported_file(file_path, delete)
-
-            return file_instance
-
-    def delete_imported_file(self, file_path, delete=False):
-        """
-        Delete the file if the --delete flag is used
-
-        :param delete: Whether to delete the imported file after processing.
-        """
-        if delete:
-            if os.path.exists(file_path):
+            if delete and os.path.exists(file_path):
                 os.remove(file_path)
-                tqdm.write(self.style.SUCCESS(f"Deleted: {file_path}"))
-            else:
-                tqdm.write(self.style.WARNING(f"File not found: {file_path}"))
+                self.log_success(f"Deleted: {file_path}")
 
-    def import_data(self, data, file_path_json, delete):
-        """ """
+            return file_instance
 
-        # Get source site and create it if it doesn't exist
-        category = data.get("category")
+        except Exception as e:
+            self.log_error(f"Error importing file {file_path}: {str(e)}")
+            return None
 
-        source_site_instance, Null = SourceSiteModel.objects.get_or_create(
-            slug=category
-        )
+    def add_description(
+        self,
+        description_text: str,
+        date_str: str,
+        date_format: str,
+        owner_instance,
+        owner_type: str,
+        file_date,
+    ) -> None:
+        """
+        Add a description to a post or creator.
-        source_site_instance.save()
-
-        if category == "twitter":
-            if "author" in data.keys():
-                creator_instance, Null = CreatorModel.objects.get_or_create(
-                    slug=data["author"]["name"], source_site=source_site_instance
-                )
-
-                creator_instance.creator_id = data["author"]["id"]
-                creator_instance.name = data["author"]["nick"]
-
-                if "description" in data["author"].keys():
-                    description_text = data["author"]["description"]
-                    description_hash = compute_string_hash_blake3(
-                        description_text, logger=self
-                    )
-
-                    description_instance, created = (
-                        DescriptionModel.objects.get_or_create(hash=description_hash)
-                    )
-
-                    if created:
-                        description_instance.content = description_text
-                        description_instance.save()
-
-                    # Add to CreatorDescription through model with a custom date_imported
-                    creator_description_instance, created = (
-                        CreatorDescription.objects.get_or_create(
-                            creator=creator_instance, description=description_instance
-                        )
-                    )
-
-                    creator_description_instance.date_imported = timezone.make_aware(
-                        datetime.fromtimestamp(os.path.getmtime(file_path_json))
-                    )
-                    creator_description_instance.save()
-
-                creator_instance.date_created = timezone.make_aware(
-                    datetime.strptime(data["author"]["date"], "%Y-%m-%d %H:%M:%S")
-                )
-
-                creator_instance.save()
-
-        post_instance, Null = PostModel.objects.get_or_create(
-            post_id=data["tweet_id"], source_site=source_site_instance
-        )
-
-        if "subcategory" in data.keys():
-            category_instance, _ = CategoryModel.objects.get_or_create(
-                slug=data["subcategory"]
+
+        Args:
+            description_text: The description text to add
+            date_str: Date string of when the description was created
+            date_format: Format of the date string
+            owner_instance: The post or creator instance
+            owner_type: Either 'post' or 'creator'
+            file_date: File modification timestamp used as the imported date
+        """
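+        # Like files, descriptions are deduplicated by content hash; the
+        # through models (CreatorDescription / PostDescription) record when
+        # each description was seen for a given owner via date_imported.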
+        try:
+            description_hash = compute_string_hash_blake3(
+                description_text, logger=self.command
+            )
-            if _:
-                category_instance.name = data["subcategory"].capitalize()
-                category_instance.save()
-
-            creator_instance.refresh_from_db()
-            creator_instance.categories.add(category_instance)
-            creator_instance.save()
-
-            post_instance.category.add(category_instance)
-
-        post_instance.creator = creator_instance
-
-        post_instance.date_created = timezone.make_aware(
-            datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S"),
-        )
-
-        if "sensitive" in data.keys():
-            if data["sensitive"]:
-                post_instance.mature = data["sensitive"]
-
-        if "content" in data.keys():
-            description_text = data["content"]
-            description_hash = compute_string_hash_blake3(description_text, logger=self)
             description_instance, created = DescriptionModel.objects.get_or_create(
                 hash=description_hash
             )
-            description_instance.save()
 
             if created:
-                description_instance.date_created = timezone.make_aware(
-                    datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
-                )
                 description_instance.content = description_text
+                description_instance.date_created = timezone.make_aware(
+                    datetime.strptime(date_str, date_format)
+                )
                 description_instance.save()
 
-            post_description_instance, created = PostDescription.objects.get_or_create(
-                post=post_instance, description=description_instance
-            )
-
-            if created:
-                post_description_instance.date_imported = timezone.make_aware(
-                    datetime.fromtimestamp(os.path.getmtime(file_path_json))
-                )
-                post_description_instance.save()
+            if owner_type == "creator":
+                relation, created = CreatorDescription.objects.get_or_create(
+                    creator=owner_instance, description=description_instance
+                )
+            else:  # post
+                relation, created = PostDescription.objects.get_or_create(
+                    post=owner_instance, description=description_instance
+                )
+
+            relation.date_imported = timezone.make_aware(
+                datetime.fromtimestamp(file_date)
+            )
+            relation.save()
 
-            post_instance.description.add(description_instance)
+            if owner_type == "post":
+                owner_instance.description.add(description_instance)
 
-        if "hashtags" in data.keys():
-            for tag in data["hashtags"]:
-                tag_instance, Null = TagModel.objects.get_or_create(slug=tag)
+        except Exception as e:
+            self.log_error(f"Error adding description: {str(e)}")
 
-                if tag_instance.name == "":
+    def add_tags(self, tags_list, post_instance):
+        """Add tags to a post."""
+        for tag in tags_list:
+            try:
+                tag_instance, created = TagModel.objects.get_or_create(slug=tag)
+
+                if created or not tag_instance.name:
                     tag_instance.name = tag
-
-                tag_instance.save()
+                    tag_instance.save()
 
                 post_instance.tags.add(tag_instance)
+            except Exception as e:
+                self.log_error(f"Error adding tag '{tag}': {str(e)}")
 
+    def ensure_boolean_field(self, value, default=False):
+        """Convert potentially null/None values to a boolean."""
+        if value is None:
+            return default
+        return bool(value)
+
+
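+# Site-specific importers subclass BaseImporter and implement import_data();
+# Command registers one instance per site in self.importers, keyed by the
+# JSON "category" value, so supporting another site only needs a new
+# subclass plus one registry entry.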
+class TwitterImporter(BaseImporter):
+    """Importer for Twitter data."""
+
+    def import_data(
+        self, data: Dict[str, Any], file_path_json: str, delete: bool
+    ) -> None:
+        """Import Twitter data from JSON into the database."""
+        try:
+            category = data.get("category", "twitter")
+            source_site_instance = self.get_or_create_source_site(category)
+
+            # Process creator if present
+            creator_instance = None
+            if "author" in data:
+                creator_instance = self._process_creator(
+                    data, source_site_instance, file_path_json
+                )
+
+            # Get subcategory if available
+            category_instance = None
+            if "subcategory" in data:
+                category_instance = self._process_category(data)
+
+            # Process the post
+            self._process_post(
+                data,
+                source_site_instance,
+                creator_instance,
+                category_instance,
+                file_path_json,
+                delete,
+            )
+
+        except Exception as e:
+            self.log_error(f"Error importing Twitter data: {str(e)}")
+
+    def _process_creator(self, data, source_site_instance, file_path_json):
+        """Process creator data for Twitter."""
+        creator_instance, _ = CreatorModel.objects.get_or_create(
+            slug=data["author"]["name"], source_site=source_site_instance
+        )
+
+        creator_instance.creator_id = data["author"]["id"]
+        creator_instance.name = data["author"]["nick"]
+
+        # Add creator description if available
+        if "description" in data["author"]:
+            self.add_description(
+                description_text=data["author"]["description"],
+                date_str=data["author"]["date"],
+                date_format="%Y-%m-%d %H:%M:%S",
+                owner_instance=creator_instance,
+                owner_type="creator",
+                file_date=os.path.getmtime(file_path_json),
+            )
+
+        creator_instance.date_created = timezone.make_aware(
+            datetime.strptime(data["author"]["date"], "%Y-%m-%d %H:%M:%S")
+        )
+
+        creator_instance.save()
+        return creator_instance
+
+    def _process_category(self, data):
+        """Process category data."""
+        category_instance, created = CategoryModel.objects.get_or_create(
+            slug=data["subcategory"]
+        )
+
+        if created:
+            category_instance.name = data["subcategory"].capitalize()
+            category_instance.save()
+
+        return category_instance
+
+    def _process_post(
+        self,
+        data,
+        source_site_instance,
+        creator_instance,
+        category_instance,
+        file_path_json,
+        delete,
+    ):
+        """Process post data for Twitter."""
+        post_instance, _ = PostModel.objects.get_or_create(
+            post_id=data["tweet_id"],
+            source_site=source_site_instance,
+            defaults={
+                # Set a default for mature to avoid a null constraint error
+                "mature": False
+            },
+        )
+
+        if category_instance:
+            if creator_instance:
+                creator_instance.refresh_from_db()
+                creator_instance.categories.add(category_instance)
+                creator_instance.save()
+
+            post_instance.category.add(category_instance)
+
+        if creator_instance:
+            post_instance.creator = creator_instance
+
+        post_instance.date_created = timezone.make_aware(
+            datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
+        )
+
+        # Set mature flag if available
+        if "sensitive" in data:
+            post_instance.mature = self.ensure_boolean_field(data.get("sensitive"))
+
+        # Add post description if available
+        if "content" in data:
+            self.add_description(
+                description_text=data["content"],
+                date_str=data["date"],
+                date_format="%Y-%m-%d %H:%M:%S",
+                owner_instance=post_instance,
+                owner_type="post",
+                file_date=os.path.getmtime(file_path_json),
+            )
+
+        # Add hashtags if available
+        if "hashtags" in data:
+            self.add_tags(data["hashtags"], post_instance)
+
+        # Import the file
         file_path = file_path_json.removesuffix(".json")
-
-        # Handle file import
-        file_instance = self.import_file(file_path, PostFileModel, delete)
+        file_instance = self.import_file(file_path, delete)
 
         if file_instance:
             post_instance.files.add(file_instance)
 
-            if category_instance.slug == "avatar":
-                creator_instance.refresh_from_db()
-                creator_instance.avatar = file_instance
-                creator_instance.save()
+            # Handle profile images
+            if category_instance:
+                if category_instance.slug == "avatar" and creator_instance:
+                    creator_instance.refresh_from_db()
+                    creator_instance.avatar = file_instance
+                    creator_instance.save()
 
-            if category_instance.slug == "background":
-                creator_instance.refresh_from_db()
-                creator_instance.banner = file_instance
-                creator_instance.save()
+                if category_instance.slug == "background" and creator_instance:
+                    creator_instance.refresh_from_db()
+                    creator_instance.banner = file_instance
+                    creator_instance.save()
 
         post_instance.save()
+
+
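+# For reference, the Twitter payloads consumed above look roughly like this
+# (only the fields actually read are shown; the shape is inferred from the
+# code, not from a spec):
+# {
+#     "category": "twitter",
+#     "subcategory": "avatar",
+#     "tweet_id": 123,
+#     "date": "2024-01-01 12:00:00",
+#     "content": "tweet text",
+#     "sensitive": false,
+#     "hashtags": ["tag1", "tag2"],
+#     "author": {
+#         "name": "slug",
+#         "nick": "Display Name",
+#         "id": 456,
+#         "date": "2020-01-01 12:00:00",
+#         "description": "profile bio",
+#     },
+# }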
folder.") + ) + return + + def process_json_file(self, file_path, delete, site_filter=None): + tqdm.write(f"Importing data from: {file_path}") + + try: + with open(file_path, "r") as f: + data = json.load(f) + + category = data.get("category", "") + + # Skip if site filter is set and doesn't match + if site_filter and category != site_filter: + tqdm.write( + f"Skipping {file_path}, category {category} doesn't match filter {site_filter}" + ) + return + + # Check if we have an importer for this category + if category in self.importers: + self.importers[category].import_data(data, file_path, delete) + tqdm.write( + self.style.SUCCESS(f"Data imported successfully for {category}.") + ) + else: + tqdm.write( + self.style.WARNING(f"No importer found for category: {category}") + ) + + except json.JSONDecodeError: + tqdm.write(self.style.ERROR(f"Invalid JSON file: {file_path}")) + except Exception as e: + tqdm.write(self.style.ERROR(f"Error processing {file_path}: {str(e)}")) + + def process_json_folder(self, folder_path, delete, site_filter=None): + if not os.path.exists(folder_path): + tqdm.write(self.style.ERROR(f"The folder '{folder_path}' does not exist.")) + return + + # Count total files + tqdm.write("Counting total files...") + total_files = sum(len(files) for _, _, files in os.walk(folder_path)) + + with tqdm( + total=total_files, desc="Processing JSON files", dynamic_ncols=True + ) as progress_bar: + for root, dirs, files in os.walk(folder_path): + for file_name in files: + progress_bar.update(1) # Increment progress for each file + if file_name.endswith(".json"): + file_path = os.path.join(root, file_name) + self.process_json_file(file_path, delete, site_filter)