Gallery-Archivist/backend/apps/archive/management/commands/import_data.py


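"""Import archived posts from JSON metadata files into the database.

Each media file is expected to sit next to a matching "<media file>.json"
metadata file (the layout produced by scrapers such as gallery-dl with
--write-metadata). Site-specific importers share common file, title,
description, and tag handling through BaseImporter.

Usage:
    python manage.py import_data /path/to/downloads [--site twitter] [--delete]
"""
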
from datetime import datetime
import os
import json
import logging
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional

from django.utils import timezone
from tqdm.auto import tqdm
from PIL import Image as PillowImage

from django.core.management.base import BaseCommand

from apps.files.models import (
    FileNameModel,
    PostFileModel,
)
from apps.archive.models import (
    SourceSiteModel,
    CategoryModel,
    CreatorModel,
    PostModel,
    TitleModel,
    CreatorTitle,
    PostTitle,
    DescriptionModel,
    CreatorDescription,
    PostDescription,
    TagModel,
)
from utils.hash import compute_string_hash_blake3, compute_file_hash_blake3
from utils.files import get_mime_type, categorize_mime_type
from apps.files.tasks import (
    generate_blur_hash_PostFile,
    generate_md5_hash_PostFile,
    generate_video_thumbnail,
    generate_pdf_thumbnail,
)


class BaseImporter(ABC):
    """Base abstract class for all site importers."""

    def __init__(self, command_instance):
        self.command = command_instance
        self.logger = logging.getLogger(f"importer.{self.__class__.__name__}")

    @abstractmethod
    def import_data(
        self, data: Dict[str, Any], file_path_json: str, delete: bool
    ) -> None:
        """Import data from JSON file into the database."""
        pass

    def log_info(self, message: str) -> None:
        """Log an informational message."""
        tqdm.write(message)
        self.logger.info(message)

    def log_success(self, message: str) -> None:
        """Log a success message."""
        styled_message = self.command.style.SUCCESS(message)
        tqdm.write(styled_message)
        self.logger.info(message)

    def log_error(self, message: str) -> None:
        """Log an error message."""
        styled_message = self.command.style.ERROR(message)
        tqdm.write(styled_message)
        self.logger.error(message)

    def log_warning(self, message: str) -> None:
        """Log a warning message."""
        styled_message = self.command.style.WARNING(message)
        tqdm.write(styled_message)
        self.logger.warning(message)

    def get_or_create_source_site(self, category: str) -> SourceSiteModel:
        """Get or create a source site model instance."""
        source_site_instance, _ = SourceSiteModel.objects.get_or_create(slug=category)
        return source_site_instance

    def import_file(
        self, file_path: str, delete: bool = False
    ) -> Optional[PostFileModel]:
        """
        Import a file if it doesn't already exist in the database and return the instance.

        Args:
            file_path: The path to the file to import.
            delete: Whether to delete the imported file after processing.

        Returns:
            The file instance, or None if the file doesn't exist or the import fails.
        """
        if not os.path.exists(file_path):
            self.log_warning(f"File not found: {file_path}")
            return None
        try:
            file_hash = compute_file_hash_blake3(file_path, logger=self.command)
            file_name = os.path.basename(file_path)
            _, file_ext = os.path.splitext(file_name)
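            # Files are stored under their BLAKE3 hash, so identical content is
            # deduplicated regardless of how many original filenames point at it.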
            hash_file_name = file_hash + file_ext
            # Get or create file name
            file_name_instance, _ = FileNameModel.objects.get_or_create(
                filename=file_name
            )
            # Get or create file
            file_instance, created = PostFileModel.objects.get_or_create(
                hash_blake3=file_hash
            )
            if created or not file_instance.file:
                with open(file_path, "rb") as file:
                    file_instance.file.save(hash_file_name, file)
            # Add file metadata
            file_instance.name.add(file_name_instance)
            file_instance.extension = file_ext
            file_instance.size = os.path.getsize(file_path)
            file_mime = get_mime_type(file_path)
            file_type = categorize_mime_type(file_mime)
            file_instance.file_type = file_type
            file_instance.mimetype = file_mime
            file_instance.save()
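            # Blur hash, thumbnail, and MD5 generation are expensive, so they
            # are queued as background tasks with .delay() rather than run inline.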
            # Process image-specific properties
            if file_instance.mimetype.startswith("image/"):
                # Add Image blur hash if not existing
                if not file_instance.blur_hash:
                    generate_blur_hash_PostFile.delay(file_instance.id)
                # Get image resolution; PIL's Image.size is (width, height)
                try:
                    im = PillowImage.open(file_instance.file)
                    file_instance.width, file_instance.height = im.size
                    file_instance.save()
                except Exception as e:
                    self.log_error(f"Error getting image dimensions: {str(e)}")
            # Process video thumbnails
            if file_instance.file_type in ["video", "gif"]:
                if not file_instance.thumbnail:
                    generate_video_thumbnail.delay(file_instance.id)
            # Process PDF thumbnails
            if file_instance.file_type in ["pdf"]:
                if not file_instance.thumbnail:
                    generate_pdf_thumbnail.delay(file_instance.id)
            # Generate MD5 hash if not exists
            if not file_instance.hash_md5:
                generate_md5_hash_PostFile.delay(file_instance.id)
            if created:
                self.log_success(f"Imported: {file_path} file, new instance created")
            else:
                self.log_success(f"Imported: {file_path} file, instance updated")
            # Delete the imported file if the --delete flag is used
            if delete and os.path.exists(file_path):
                os.remove(file_path)
                self.log_success(f"Deleted: {file_path}")
            return file_instance
        except Exception as e:
            self.log_error(f"Error importing file {file_path}: {str(e)}")
            return None

    def add_title(
        self,
        title_text: str,
        date_str: str,
        date_format: str,
        owner_instance,
        owner_type: str,
        file_date,
    ) -> None:
        """
        Add a title to a post or creator.

        Args:
            title_text: The title text to add
            date_str: Date string of when the title was created
            date_format: Format of the date string
            owner_instance: The post or creator instance
            owner_type: Either 'post' or 'creator'
            file_date: Timestamp of the file for the imported date
        """
        try:
            title_hash = compute_string_hash_blake3(title_text, logger=self.command)
            title_instance, created = TitleModel.objects.get_or_create(hash=title_hash)
            if created:
                title_instance.content = title_text
                title_instance.date_created = timezone.make_aware(
                    datetime.strptime(date_str, date_format)
                )
                title_instance.save()
            if owner_type == "creator":
                relation, created = CreatorTitle.objects.get_or_create(
                    creator=owner_instance, title=title_instance
                )
            else:  # post
                relation, created = PostTitle.objects.get_or_create(
                    post=owner_instance, title=title_instance
                )
            relation.date_imported = timezone.make_aware(
                datetime.fromtimestamp(file_date)
            )
            relation.save()
            if owner_type == "post":
                owner_instance.title.add(title_instance)
        except Exception as e:
            self.log_error(f"Error adding title: {str(e)}")

    def add_description(
        self,
        description_text: str,
        date_str: str,
        date_format: str,
        owner_instance,
        owner_type: str,
        file_date,
    ) -> None:
        """
        Add a description to a post or creator.

        Args:
            description_text: The description text to add
            date_str: Date string of when the description was created
            date_format: Format of the date string
            owner_instance: The post or creator instance
            owner_type: Either 'post' or 'creator'
            file_date: Timestamp of the file for the imported date
        """
        try:
            description_hash = compute_string_hash_blake3(
                description_text, logger=self.command
            )
            description_instance, created = DescriptionModel.objects.get_or_create(
                hash=description_hash
            )
            if created:
                description_instance.content = description_text
                description_instance.date_created = timezone.make_aware(
                    datetime.strptime(date_str, date_format)
                )
                description_instance.save()
            if owner_type == "creator":
                relation, created = CreatorDescription.objects.get_or_create(
                    creator=owner_instance, description=description_instance
                )
            else:  # post
                relation, created = PostDescription.objects.get_or_create(
                    post=owner_instance, description=description_instance
                )
            relation.date_imported = timezone.make_aware(
                datetime.fromtimestamp(file_date)
            )
            relation.save()
            if owner_type == "post":
                owner_instance.description.add(description_instance)
        except Exception as e:
            self.log_error(f"Error adding description: {str(e)}")

    def add_tags(self, tags_list, post_instance):
        """Add tags to a post."""
        for tag in tags_list:
            try:
                tag_instance, created = TagModel.objects.get_or_create(slug=tag)
                if created or not tag_instance.name:
                    tag_instance.name = tag
                    tag_instance.save()
                post_instance.tags.add(tag_instance)
            except Exception as e:
                self.log_error(f"Error adding tag '{tag}': {str(e)}")

    def ensure_boolean_field(self, value, default=False):
        """Convert potentially null/None values to boolean."""
        if value is None:
            return default
        return bool(value)
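

# The Twitter importer expects JSON metadata shaped like: "tweet_id", "date",
# optional "subcategory", "sensitive", "content", "hashtags", and an "author"
# object with "name", "id", "nick", and optionally "description" and "date".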
class TwitterImporter(BaseImporter):
    """Importer for Twitter data."""

    def import_data(
        self, data: Dict[str, Any], file_path_json: str, delete: bool
    ) -> None:
        """Import Twitter data from JSON into the database."""
        try:
            category = data.get("category", "twitter")
            source_site_instance = self.get_or_create_source_site(category)
            # Process creator if present
            creator_instance = None
            if "author" in data:
                creator_instance = self._process_creator(
                    data, source_site_instance, file_path_json
                )
            # Get subcategory if available
            category_instance = None
            if "subcategory" in data:
                category_instance = self._process_category(data)
            # Process the post
            self._process_post(
                data,
                source_site_instance,
                creator_instance,
                category_instance,
                file_path_json,
                delete,
            )
        except Exception as e:
            self.log_error(f"Error importing Twitter data: {str(e)}")

    def _process_creator(self, data, source_site_instance, file_path_json):
        """Process creator data for Twitter."""
        creator_instance, _ = CreatorModel.objects.get_or_create(
            slug=data["author"]["name"], source_site=source_site_instance
        )
        creator_instance.creator_id = data["author"]["id"]
        creator_instance.name = data["author"]["nick"]
        # Add creator description if available
        if "description" in data["author"]:
            self.add_description(
                description_text=data["author"]["description"],
                date_str=data["author"]["date"],
                date_format="%Y-%m-%d %H:%M:%S",
                owner_instance=creator_instance,
                owner_type="creator",
                file_date=os.path.getmtime(file_path_json),
            )
        creator_instance.date_created = timezone.make_aware(
            datetime.strptime(data["author"]["date"], "%Y-%m-%d %H:%M:%S")
        )
        creator_instance.save()
        return creator_instance

    def _process_category(self, data):
        """Process category data."""
        category_instance, created = CategoryModel.objects.get_or_create(
            slug=data["subcategory"]
        )
        if created:
            category_instance.name = data["subcategory"].capitalize()
            category_instance.save()
        return category_instance

    def _process_post(
        self,
        data,
        source_site_instance,
        creator_instance,
        category_instance,
        file_path_json,
        delete,
    ):
        """Process post data for Twitter."""
        post_instance, _ = PostModel.objects.get_or_create(
            post_id=data["tweet_id"],
            source_site=source_site_instance,
            defaults={
                # Set a default for mature to avoid null constraint error
                "mature": False
            },
        )
        if category_instance:
            if creator_instance:
                creator_instance.refresh_from_db()
                creator_instance.categories.add(category_instance)
                creator_instance.save()
            post_instance.category.add(category_instance)
        # Link creator
        if creator_instance:
            post_instance.creator = creator_instance
            post_instance.save()
        post_instance.date_created = timezone.make_aware(
            datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
        )
        # Set mature flag if available
        if "sensitive" in data:
            post_instance.mature = self.ensure_boolean_field(data.get("sensitive"))
        # Add post description if available
        if "content" in data:
            self.add_description(
                description_text=data["content"],
                date_str=data["date"],
                date_format="%Y-%m-%d %H:%M:%S",
                owner_instance=post_instance,
                owner_type="post",
                file_date=os.path.getmtime(file_path_json),
            )
        # Add hashtags if available
        if "hashtags" in data:
            self.add_tags(data["hashtags"], post_instance)
        # Import the file
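        # The metadata file is assumed to sit beside the media file as
        # "<media file>.json", so stripping the suffix yields the media path.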
        file_path = file_path_json.removesuffix(".json")
        file_instance = self.import_file(file_path, delete)
        if file_instance:
            post_instance.files.add(file_instance)
            # Handle profile images
            if category_instance:
                if category_instance.slug == "avatar" and creator_instance:
                    creator_instance.refresh_from_db()
                    creator_instance.avatar = file_instance
                    creator_instance.save()
                if category_instance.slug == "background" and creator_instance:
                    creator_instance.refresh_from_db()
                    creator_instance.banner = file_instance
                    creator_instance.save()
        post_instance.save()
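

# The FurAffinity importer expects JSON metadata with "id", "date", "rating",
# "title", "description", and "tags", plus optional "artist", "artist_url",
# "user", "subcategory", "fa_category", "species", "gender",
# "filename"/"extension", "width"/"height", and stat fields such as "views",
# "favorites", "comments", and "theme".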
class FurAffinityImporter(BaseImporter):
    """Importer for FurAffinity data."""

    def import_data(
        self, data: Dict[str, Any], file_path_json: str, delete: bool
    ) -> None:
        """Import FurAffinity data from JSON into the database."""
        try:
            category = data.get("category", "furaffinity")
            source_site_instance = self.get_or_create_source_site(category)
            # Process creator
            creator_instance = self._process_creator(data, source_site_instance)
            # Process category
            category_instance = self._process_category(data)
            # Process post
            self._process_post(
                data,
                source_site_instance,
                creator_instance,
                category_instance,
                file_path_json,
                delete,
            )
        except Exception as e:
            self.log_error(f"Error importing FurAffinity data: {str(e)}")

    def _process_creator(self, data, source_site_instance):
        """Process creator data for FurAffinity."""
        # Use artist if available, otherwise fall back to the user field
        artist = data.get("artist", "")
        artist_url = data.get("artist_url", artist.lower())
        if not artist_url and "user" in data:
            artist_url = data.get("user", "")
        creator_instance, _ = CreatorModel.objects.get_or_create(
            slug=artist_url, source_site=source_site_instance
        )
        if artist:
            creator_instance.name = artist
        else:
            creator_instance.name = artist_url
        creator_instance.creator_id = artist_url
        # We don't have a creator creation date in FurAffinity data,
        # so use the post date as an approximation
        if "date" in data:
            creator_instance.date_created = timezone.make_aware(
                datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
            )
        creator_instance.save()
        return creator_instance

    def _process_category(self, data):
        """Process category data for FurAffinity."""
        subcategory = data.get("subcategory", "gallery")
        category_instance, created = CategoryModel.objects.get_or_create(
            slug=subcategory
        )
        if created:
            category_instance.name = subcategory.capitalize()
        # Process FA-specific categories
        if "fa_category" in data:
            fa_category = data["fa_category"]
            fa_category_instance, _ = CategoryModel.objects.get_or_create(
                slug=fa_category.lower().replace(" ", "_")
            )
            fa_category_instance.name = fa_category
            fa_category_instance.save()
        category_instance.save()
        return category_instance

    def _process_post(
        self,
        data,
        source_site_instance,
        creator_instance,
        category_instance,
        file_path_json,
        delete,
    ):
        """Process post data for FurAffinity."""
        post_id = str(data.get("id", ""))
        post_instance, _ = PostModel.objects.get_or_create(
            post_id=post_id,
            source_site=source_site_instance,
            defaults={
                # Set a default for mature to avoid null constraint error,
                # mirroring the Twitter importer
                "mature": False
            },
        )
        # Add category
        if category_instance:
            post_instance.category.add(category_instance)
            # Add category to creator
            if creator_instance:
                creator_instance.refresh_from_db()
                creator_instance.categories.add(category_instance)
                creator_instance.save()
        # Link creator
        if creator_instance:
            post_instance.creator = creator_instance
            post_instance.save()
        # Set creation date
        if "date" in data:
            post_instance.date_created = timezone.make_aware(
                datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
            )
        # Set mature content flag based on rating
        rating = data.get("rating", "").lower()
        post_instance.mature = rating in ["mature", "adult"]
        # Add title
        title_text = data.get("title", "")
        if title_text:
            self.add_title(
                title_text=title_text,
                date_str=data["date"],
                date_format="%Y-%m-%d %H:%M:%S",
                owner_instance=post_instance,
                owner_type="post",
                file_date=os.path.getmtime(file_path_json),
            )
        # Add description
        description_text = data.get("description", "")
        if description_text:
            self.add_description(
                description_text=description_text,
                date_str=data["date"],
                date_format="%Y-%m-%d %H:%M:%S",
                owner_instance=post_instance,
                owner_type="post",
                file_date=os.path.getmtime(file_path_json),
            )
        # Add tags
        if "tags" in data:
            self.add_tags(data["tags"], post_instance)
        # Add species as a special tag if present
        if "species" in data and data["species"] not in [
            "Unspecified / Any",
            "Any",
        ]:
            species_tags = [s.strip() for s in data["species"].split("/")]
            self.add_tags(species_tags, post_instance)
        # Add gender as a special tag if present
        if "gender" in data and data["gender"] not in ["Unspecified / Any", "Any"]:
            gender_tags = [g.strip() for g in data["gender"].split("/")]
            self.add_tags(gender_tags, post_instance)
        # Add metadata as JSON field if your model supports it
        metadata = {}
        for field in ["views", "favorites", "comments", "theme", "fa_category"]:
            if field in data:
                metadata[field] = data[field]
        # If your PostModel has a metadata JSONField, uncomment this
        # post_instance.metadata = metadata
        # Import the file
        file_path = file_path_json.removesuffix(".json")
        # Check if the file exists; otherwise construct the path from the
        # metadata's filename and extension
        if (
            not os.path.exists(file_path)
            and "filename" in data
            and "extension" in data
        ):
            alt_file_path = os.path.join(
                os.path.dirname(file_path_json),
                f"{data['filename']}.{data['extension']}",
            )
            file_instance = self.import_file(alt_file_path, delete)
        else:
            file_instance = self.import_file(file_path, delete)
        if file_instance:
            post_instance.files.add(file_instance)
            # Add known image dimensions if available
            if not file_instance.width and "width" in data:
                file_instance.width = data.get("width")
            if not file_instance.height and "height" in data:
                file_instance.height = data.get("height")
            if "width" in data or "height" in data:
                file_instance.save()
        post_instance.save()


class Command(BaseCommand):
    help = (
        "Import data from JSON files in a folder or a single JSON file to the archive"
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.importers = {
            "twitter": TwitterImporter(self),
            "furaffinity": FurAffinityImporter(self),
        }
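        # New site importers are registered here; the keys must match the
        # "category" field of the JSON metadata, which process_json_file
        # uses to dispatch.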
        # Set up logging
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            filename="import.log",
        )
        self.logger = logging.getLogger("import_command")

    def add_arguments(self, parser):
        parser.add_argument(
            "path",
            type=str,
            help="Path to the folder containing JSON files or a single JSON file",
        )
        parser.add_argument(
            "--delete", action="store_true", help="Delete imported files"
        )
        parser.add_argument(
            "--site",
            type=str,
            choices=list(self.importers.keys()),
            help="Only import files for the specified site",
        )

    def handle(self, *args, **kwargs):
        path = kwargs["path"]
        delete = kwargs["delete"]
        site_filter = kwargs.get("site")
        if os.path.isfile(path):
            self.process_json_file(path, delete, site_filter)
        elif os.path.isdir(path):
            self.process_json_folder(path, delete, site_filter)
        else:
            self.stdout.write(
                self.style.ERROR(f"The path '{path}' is not a valid file or folder.")
            )

    def process_json_file(self, file_path, delete, site_filter=None):
        tqdm.write(f"Importing data from: {file_path}")
        try:
            with open(file_path, "r") as f:
                data = json.load(f)
            category = data.get("category", "")
            # Skip if a site filter is set and doesn't match
            if site_filter and category != site_filter:
                tqdm.write(
                    f"Skipping {file_path}, category {category} doesn't match filter {site_filter}"
                )
                return
            # Check if we have an importer for this category
            if category in self.importers:
                self.importers[category].import_data(data, file_path, delete)
                tqdm.write(
                    self.style.SUCCESS(f"Data imported successfully for {category}.")
                )
            else:
                tqdm.write(
                    self.style.WARNING(f"No importer found for category: {category}")
                )
        except json.JSONDecodeError:
            tqdm.write(self.style.ERROR(f"Invalid JSON file: {file_path}"))
        except Exception as e:
            tqdm.write(self.style.ERROR(f"Error processing {file_path}: {str(e)}"))

    def process_json_folder(self, folder_path, delete, site_filter=None):
        if not os.path.exists(folder_path):
            tqdm.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
            return
        # Count every file (not just .json) so the total matches the walk
        # below, which advances the progress bar once per file it sees
        tqdm.write("Counting total files...")
        total_files = sum(len(files) for _, _, files in os.walk(folder_path))
        with tqdm(
            total=total_files, desc="Processing JSON files", dynamic_ncols=True
        ) as progress_bar:
            for root, dirs, files in os.walk(folder_path):
                for file_name in files:
                    progress_bar.update(1)  # Increment progress for each file
                    if file_name.endswith(".json"):
                        file_path = os.path.join(root, file_name)
                        self.process_json_file(file_path, delete, site_filter)