"""Management command that imports site-scraper JSON metadata (and the media
files sitting next to the JSON) into the archive database.

Each supported site has its own importer derived from ``BaseImporter``; the
``Command`` reads a JSON file's ``category`` field and dispatches to the
matching importer.
"""

from datetime import datetime
import os
import json
import logging
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional

from django.utils import timezone
from tqdm.auto import tqdm
from PIL import Image as PillowImage
from django.core.management.base import BaseCommand

from apps.files.models import (
    FileNameModel,
    PostFileModel,
)
from apps.archive.models import (
    SourceSiteModel,
    CategoryModel,
    CreatorModel,
    PostModel,
    DescriptionModel,
    CreatorDescription,
    PostDescription,
    TagModel,
)
from utils.hash import compute_string_hash_blake3, compute_file_hash_blake3
from utils.files import get_mime_type, categorize_mime_type
from apps.files.tasks import (
    generate_blur_hash_PostFile,
    generate_md5_hash_PostFile,
    generate_video_thumbnail,
    generate_pdf_thumbnail,
)


class BaseImporter(ABC):
    """Base abstract class for all site importers.

    Provides shared logging helpers (console via tqdm + file via ``logging``)
    and the common file/description/tag import machinery used by all
    site-specific importers.
    """

    def __init__(self, command_instance):
        # Keep a handle on the management command so we can reuse its
        # ``self.style`` for colored console output.
        self.command = command_instance
        self.logger = logging.getLogger(f"importer.{self.__class__.__name__}")

    @abstractmethod
    def import_data(
        self, data: Dict[str, Any], file_path_json: str, delete: bool
    ) -> None:
        """Import data from JSON file into the database."""
        pass

    def log_info(self, message: str) -> None:
        """Log an informational message."""
        tqdm.write(message)
        self.logger.info(message)

    def log_success(self, message: str) -> None:
        """Log a success message."""
        styled_message = self.command.style.SUCCESS(message)
        tqdm.write(styled_message)
        self.logger.info(message)

    def log_error(self, message: str) -> None:
        """Log an error message."""
        styled_message = self.command.style.ERROR(message)
        tqdm.write(styled_message)
        self.logger.error(message)

    def log_warning(self, message: str) -> None:
        """Log a warning message."""
        styled_message = self.command.style.WARNING(message)
        tqdm.write(styled_message)
        self.logger.warning(message)

    def get_or_create_source_site(self, category: str) -> SourceSiteModel:
        """Get or create a source site model instance keyed by slug."""
        # get_or_create already persists new rows; the extra save() the
        # original code did here was a redundant second DB write.
        source_site_instance, _ = SourceSiteModel.objects.get_or_create(
            slug=category
        )
        return source_site_instance

    def import_file(
        self, file_path: str, delete: bool = False
    ) -> Optional[PostFileModel]:
        """
        Import a file if it doesn't already exist in the database and
        returns the instance.

        Files are deduplicated by their BLAKE3 hash; metadata (names, size,
        mime type) is refreshed even for already-known files. Thumbnail and
        hash generation is delegated to async tasks.

        Args:
            file_path: The path to the file to import.
            delete: Whether to delete the imported file after processing.

        Returns:
            The file instance or None if the file doesn't exist or an
            error occurred.
        """
        if not os.path.exists(file_path):
            self.log_warning(f"File not found: {file_path}")
            return None

        try:
            file_hash = compute_file_hash_blake3(file_path, logger=self.command)
            file_name = os.path.basename(file_path)
            _, file_ext = os.path.splitext(file_name)
            hash_file_name = file_hash + file_ext

            # Get or create file name
            file_name_instance, _ = FileNameModel.objects.get_or_create(
                filename=file_name
            )

            # Get or create file (deduplicated by content hash)
            file_instance, created = PostFileModel.objects.get_or_create(
                hash_blake3=file_hash
            )
            if created:
                with open(file_path, "rb") as file:
                    file_instance.file.save(hash_file_name, file)

            # Add file metadata (intentionally refreshed on every import so
            # existing instances pick up new alternate filenames).
            file_instance.name.add(file_name_instance)
            file_instance.extension = file_ext
            file_instance.size = os.path.getsize(file_path)
            file_mime = get_mime_type(file_path)
            file_type = categorize_mime_type(file_mime)
            file_instance.file_type = file_type
            file_instance.mimetype = file_mime
            file_instance.save()

            # Process image-specific properties
            if file_instance.mimetype.startswith("image/"):
                # Add Image blur hash if not existing
                if not file_instance.blur_hash:
                    generate_blur_hash_PostFile.delay(file_instance.id)

                # Get image resolution.
                try:
                    # PIL's Image.size is (width, height); the original code
                    # assigned it to (height, width), storing the dimensions
                    # swapped. The context manager also closes the handle.
                    with PillowImage.open(file_instance.file) as im:
                        file_instance.width, file_instance.height = im.size
                    file_instance.save()
                except Exception as e:
                    self.log_error(f"Error getting image dimensions: {str(e)}")

            # Process video thumbnails
            if file_instance.file_type in ["video", "gif"]:
                if not file_instance.thumbnail:
                    generate_video_thumbnail.delay(file_instance.id)

            if file_instance.file_type in ["pdf"]:
                if not file_instance.thumbnail:
                    generate_pdf_thumbnail.delay(file_instance.id)

            # Generate MD5 hash if not exists
            if not file_instance.hash_md5:
                generate_md5_hash_PostFile.delay(file_instance.id)

            if created:
                self.log_success(f"Imported: {file_path} file, new instance created")
            else:
                self.log_success(f"Imported: {file_path} file, instance updated")

            # Delete the imported file if the --delete flag is used
            if delete and os.path.exists(file_path):
                os.remove(file_path)
                self.log_success(f"Deleted: {file_path}")

            return file_instance

        except Exception as e:
            self.log_error(f"Error importing file {file_path}: {str(e)}")
            return None

    def add_description(
        self,
        description_text: str,
        date_str: str,
        date_format: str,
        owner_instance,
        owner_type: str,
        file_date,
    ) -> None:
        """
        Add description to a post or creator.

        Descriptions are deduplicated by BLAKE3 hash of their text; the
        through-model relation records when the description was imported.

        Args:
            description_text: The description text to add
            date_str: Date string of when the description was created
            date_format: Format of the date string
            owner_instance: The post or creator instance
            owner_type: Either 'post' or 'creator'
            file_date: Timestamp of the file for imported date
        """
        try:
            description_hash = compute_string_hash_blake3(
                description_text, logger=self.command
            )
            description_instance, created = DescriptionModel.objects.get_or_create(
                hash=description_hash
            )
            if created:
                description_instance.content = description_text
                description_instance.date_created = timezone.make_aware(
                    datetime.strptime(date_str, date_format)
                )
                description_instance.save()

            if owner_type == "creator":
                relation, _ = CreatorDescription.objects.get_or_create(
                    creator=owner_instance, description=description_instance
                )
            else:  # post
                relation, _ = PostDescription.objects.get_or_create(
                    post=owner_instance, description=description_instance
                )

            # Record when this description was imported, based on the
            # JSON file's mtime.
            relation.date_imported = timezone.make_aware(
                datetime.fromtimestamp(file_date)
            )
            relation.save()

            if owner_type == "post":
                owner_instance.description.add(description_instance)

        except Exception as e:
            self.log_error(f"Error adding description: {str(e)}")

    def add_tags(self, tags_list, post_instance):
        """Add tags to a post, creating missing TagModel rows by slug."""
        for tag in tags_list:
            try:
                tag_instance, created = TagModel.objects.get_or_create(slug=tag)
                # Backfill the display name for new or name-less tags.
                if created or not tag_instance.name:
                    tag_instance.name = tag
                    tag_instance.save()
                post_instance.tags.add(tag_instance)
            except Exception as e:
                self.log_error(f"Error adding tag '{tag}': {str(e)}")

    def ensure_boolean_field(self, value, default=False):
        """Convert potentially null/None values to boolean."""
        if value is None:
            return default
        return bool(value)


class TwitterImporter(BaseImporter):
    """Importer for Twitter data."""

    def import_data(
        self, data: Dict[str, Any], file_path_json: str, delete: bool
    ) -> None:
        """Import Twitter data from JSON into the database."""
        try:
            category = data.get("category", "twitter")
            source_site_instance = self.get_or_create_source_site(category)

            # Process creator if present
            creator_instance = None
            if "author" in data:
                creator_instance = self._process_creator(
                    data, source_site_instance, file_path_json
                )

            # Get subcategory if available
            category_instance = None
            if "subcategory" in data:
                category_instance = self._process_category(data)

            # Process the post
            self._process_post(
                data,
                source_site_instance,
                creator_instance,
                category_instance,
                file_path_json,
                delete,
            )
        except Exception as e:
            self.log_error(f"Error importing Twitter data: {str(e)}")

    def _process_creator(self, data, source_site_instance, file_path_json):
        """Process creator data for Twitter."""
        creator_instance, _ = CreatorModel.objects.get_or_create(
            slug=data["author"]["name"], source_site=source_site_instance
        )
        creator_instance.creator_id = data["author"]["id"]
        creator_instance.name = data["author"]["nick"]

        # Add creator description if available
        if "description" in data["author"]:
            self.add_description(
                description_text=data["author"]["description"],
                date_str=data["author"]["date"],
                date_format="%Y-%m-%d %H:%M:%S",
                owner_instance=creator_instance,
                owner_type="creator",
                file_date=os.path.getmtime(file_path_json),
            )

        creator_instance.date_created = timezone.make_aware(
            datetime.strptime(data["author"]["date"], "%Y-%m-%d %H:%M:%S")
        )
        creator_instance.save()
        return creator_instance

    def _process_category(self, data):
        """Process category data."""
        category_instance, created = CategoryModel.objects.get_or_create(
            slug=data["subcategory"]
        )
        if created:
            category_instance.name = data["subcategory"].capitalize()
            category_instance.save()
        return category_instance

    def _process_post(
        self,
        data,
        source_site_instance,
        creator_instance,
        category_instance,
        file_path_json,
        delete,
    ):
        """Process post data for Twitter."""
        post_instance, _ = PostModel.objects.get_or_create(
            post_id=data["tweet_id"],
            source_site=source_site_instance,
            defaults={
                # Set a default for mature to avoid null constraint error
                "mature": False
            },
        )

        if category_instance:
            if creator_instance:
                creator_instance.refresh_from_db()
                creator_instance.categories.add(category_instance)
                creator_instance.save()
            post_instance.category.add(category_instance)

        if creator_instance:
            post_instance.creator = creator_instance

        post_instance.date_created = timezone.make_aware(
            datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
        )

        # Set mature flag if available
        if "sensitive" in data:
            post_instance.mature = self.ensure_boolean_field(data.get("sensitive"))

        # Add post description if available
        if "content" in data:
            self.add_description(
                description_text=data["content"],
                date_str=data["date"],
                date_format="%Y-%m-%d %H:%M:%S",
                owner_instance=post_instance,
                owner_type="post",
                file_date=os.path.getmtime(file_path_json),
            )

        # Add hashtags if available
        if "hashtags" in data:
            self.add_tags(data["hashtags"], post_instance)

        # Import the file: the media file sits next to the JSON, with the
        # ".json" suffix stripped.
        file_path = file_path_json.removesuffix(".json")
        file_instance = self.import_file(file_path, delete)
        if file_instance:
            post_instance.files.add(file_instance)

            # Handle profile images
            if category_instance:
                if category_instance.slug == "avatar" and creator_instance:
                    creator_instance.refresh_from_db()
                    creator_instance.avatar = file_instance
                    creator_instance.save()
                if category_instance.slug == "background" and creator_instance:
                    creator_instance.refresh_from_db()
                    creator_instance.banner = file_instance
                    creator_instance.save()

        post_instance.save()


class FurAffinityImporter(BaseImporter):
    """Importer for FurAffinity data."""

    def import_data(
        self, data: Dict[str, Any], file_path_json: str, delete: bool
    ) -> None:
        """Import FurAffinity data from JSON into the database."""
        try:
            category = data.get("category", "furaffinity")
            source_site_instance = self.get_or_create_source_site(category)

            # Process creator
            creator_instance = self._process_creator(data, source_site_instance)

            # Process category
            category_instance = self._process_category(data)

            # Process post
            self._process_post(
                data,
                source_site_instance,
                creator_instance,
                category_instance,
                file_path_json,
                delete,
            )
        except Exception as e:
            self.log_error(f"Error importing FurAffinity data: {str(e)}")

    def _process_creator(self, data, source_site_instance):
        """Process creator data for FurAffinity."""
        artist = data.get("artist", "")
        artist_url = data.get("artist_url", artist.lower())

        creator_instance, _ = CreatorModel.objects.get_or_create(
            slug=artist_url, source_site=source_site_instance
        )
        creator_instance.name = artist
        creator_instance.creator_id = artist_url

        # We don't have creator creation date in FurAffinity data
        # Using post date as an approximation
        if "date" in data:
            creator_instance.date_created = timezone.make_aware(
                datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
            )

        creator_instance.save()
        return creator_instance

    def _process_category(self, data):
        """Process category data for FurAffinity."""
        subcategory = data.get("subcategory", "gallery")
        category_instance, created = CategoryModel.objects.get_or_create(
            slug=subcategory
        )
        if created:
            category_instance.name = subcategory.capitalize()

        # Process FA-specific categories (stored as their own CategoryModel
        # rows; not linked to the post here).
        if "fa_category" in data:
            fa_category = data["fa_category"]
            fa_category_instance, _ = CategoryModel.objects.get_or_create(
                slug=fa_category.lower().replace(" ", "_")
            )
            fa_category_instance.name = fa_category
            fa_category_instance.save()

        category_instance.save()
        return category_instance

    def _process_post(
        self,
        data,
        source_site_instance,
        creator_instance,
        category_instance,
        file_path_json,
        delete,
    ):
        """Process post data for FurAffinity."""
        post_id = str(data.get("id", ""))
        post_instance, _ = PostModel.objects.get_or_create(
            post_id=post_id, source_site=source_site_instance
        )

        # Add category
        if category_instance:
            post_instance.category.add(category_instance)

            # Add category to creator
            if creator_instance:
                creator_instance.refresh_from_db()
                creator_instance.categories.add(category_instance)
                creator_instance.save()

        # Link creator
        if creator_instance:
            post_instance.creator = creator_instance

        # Set creation date
        if "date" in data:
            post_instance.date_created = timezone.make_aware(
                datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
            )

        # Set mature content flag based on rating
        rating = data.get("rating", "").lower()
        post_instance.mature = rating in ["mature", "adult"]

        # Add description (title + description)
        title = data.get("title", "")
        description = data.get("description", "")
        full_description = f"{title}\n\n{description}" if title else description
        if full_description:
            self.add_description(
                description_text=full_description,
                date_str=data["date"],
                date_format="%Y-%m-%d %H:%M:%S",
                owner_instance=post_instance,
                owner_type="post",
                file_date=os.path.getmtime(file_path_json),
            )

        # Add tags
        if "tags" in data:
            self.add_tags(data["tags"], post_instance)

        # Add species as a special tag if present
        if "species" in data and data["species"] not in [
            "Unspecified / Any",
            "Any",
        ]:
            species_tags = [s.strip() for s in data["species"].split("/")]
            self.add_tags(species_tags, post_instance)

        # Add gender as a special tag if present
        if "gender" in data and data["gender"] not in ["Unspecified / Any", "Any"]:
            gender_tags = [g.strip() for g in data["gender"].split("/")]
            self.add_tags(gender_tags, post_instance)

        # Add metadata as JSON field if your model supports it
        metadata = {}
        for field in ["views", "favorites", "comments", "theme", "fa_category"]:
            if field in data:
                metadata[field] = data[field]
        # If your PostModel has a metadata JSONField, uncomment this
        # post_instance.metadata = metadata

        # Import the file
        file_path = file_path_json.removesuffix(".json")

        # Check if the file exists, otherwise try to construct from filename and extension
        if not os.path.exists(file_path) and "filename" in data and "extension" in data:
            alt_file_path = f"{os.path.dirname(file_path_json)}/{data['filename']}.{data['extension']}"
            file_instance = self.import_file(alt_file_path, delete)
        else:
            file_instance = self.import_file(file_path, delete)

        if file_instance:
            post_instance.files.add(file_instance)

            # Add known image dimensions if available
            if not file_instance.width and "width" in data:
                file_instance.width = data.get("width")
            if not file_instance.height and "height" in data:
                file_instance.height = data.get("height")
            if "width" in data or "height" in data:
                file_instance.save()

        post_instance.save()


class Command(BaseCommand):
    help = (
        "Import data from JSON files in a folder or a single JSON file to the archive"
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One importer instance per supported site category.
        self.importers = {
            "twitter": TwitterImporter(self),
            "furaffinity": FurAffinityImporter(self),
        }

        # Set up logging
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            filename="import.log",
        )
        self.logger = logging.getLogger("import_command")

    def add_arguments(self, parser):
        parser.add_argument(
            "path",
            type=str,
            help="Path to the folder containing JSON files or a single JSON file",
        )
        parser.add_argument(
            "--delete", action="store_true", help="Delete imported files"
        )
        parser.add_argument(
            "--site",
            type=str,
            choices=list(self.importers.keys()),
            help="Only import files for the specified site",
        )

    def handle(self, *args, **kwargs):
        path = kwargs["path"]
        delete = kwargs["delete"]
        site_filter = kwargs.get("site")

        if os.path.isfile(path):
            self.process_json_file(path, delete, site_filter)
        elif os.path.isdir(path):
            self.process_json_folder(path, delete, site_filter)
        else:
            self.stdout.write(
                self.style.ERROR(f"The path '{path}' is not a valid file or folder.")
            )
            return

    def process_json_file(self, file_path, delete, site_filter=None):
        """Load one JSON file and dispatch it to the matching importer."""
        tqdm.write(f"Importing data from: {file_path}")
        try:
            with open(file_path, "r") as f:
                data = json.load(f)

            category = data.get("category", "")

            # Skip if site filter is set and doesn't match
            if site_filter and category != site_filter:
                tqdm.write(
                    f"Skipping {file_path}, category {category} doesn't match filter {site_filter}"
                )
                return

            # Check if we have an importer for this category
            if category in self.importers:
                self.importers[category].import_data(data, file_path, delete)
                tqdm.write(
                    self.style.SUCCESS(f"Data imported successfully for {category}.")
                )
            else:
                tqdm.write(
                    self.style.WARNING(f"No importer found for category: {category}")
                )

        except json.JSONDecodeError:
            tqdm.write(self.style.ERROR(f"Invalid JSON file: {file_path}"))
        except Exception as e:
            tqdm.write(self.style.ERROR(f"Error processing {file_path}: {str(e)}"))

    def process_json_folder(self, folder_path, delete, site_filter=None):
        """Walk a folder tree and import every ``*.json`` file found."""
        if not os.path.exists(folder_path):
            tqdm.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
            return

        # Count total files (all files, not just JSON, so the progress bar
        # reflects overall traversal).
        tqdm.write("Counting total files...")
        total_files = sum(len(files) for _, _, files in os.walk(folder_path))

        with tqdm(
            total=total_files, desc="Processing JSON files", dynamic_ncols=True
        ) as progress_bar:
            for root, dirs, files in os.walk(folder_path):
                for file_name in files:
                    progress_bar.update(1)  # Increment progress for each file
                    if file_name.endswith(".json"):
                        file_path = os.path.join(root, file_name)
                        self.process_json_file(file_path, delete, site_filter)