Refactor: import data management command for better modularity

Aroy-Art 2025-03-15 22:53:41 +01:00
parent a9d33bd85c
commit 72f3a9b7b1
Signed by: Aroy
GPG key ID: 583642324A1D2070


@@ -1,12 +1,13 @@
from datetime import datetime
import os
import json
import logging
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional

from django.utils import timezone
from tqdm.auto import tqdm
from PIL import Image as PillowImage

from django.core.management.base import BaseCommand

from apps.files.models import (
@@ -35,88 +36,79 @@ from apps.files.tasks import (
)


class BaseImporter(ABC):
    """Base abstract class for all site importers."""

    def __init__(self, command_instance):
        self.command = command_instance
        self.logger = logging.getLogger(f"importer.{self.__class__.__name__}")

    @abstractmethod
    def import_data(
        self, data: Dict[str, Any], file_path_json: str, delete: bool
    ) -> None:
        """Import data from a JSON file into the database."""
        pass

    def log_info(self, message: str) -> None:
        """Log an informational message."""
        tqdm.write(message)
        self.logger.info(message)

    def log_success(self, message: str) -> None:
        """Log a success message."""
        styled_message = self.command.style.SUCCESS(message)
        tqdm.write(styled_message)
        self.logger.info(message)

    def log_error(self, message: str) -> None:
        """Log an error message."""
        styled_message = self.command.style.ERROR(message)
        tqdm.write(styled_message)
        self.logger.error(message)

    def log_warning(self, message: str) -> None:
        """Log a warning message."""
        styled_message = self.command.style.WARNING(message)
        tqdm.write(styled_message)
        self.logger.warning(message)

    def get_or_create_source_site(self, category: str) -> SourceSiteModel:
        """Get or create a source site model instance."""
        source_site_instance, _ = SourceSiteModel.objects.get_or_create(slug=category)
        source_site_instance.save()
        return source_site_instance

    def import_file(
        self, file_path: str, delete: bool = False
    ) -> Optional[PostFileModel]:
        """
        Import a file if it doesn't already exist in the database and return the instance.

        Args:
            file_path: The path to the file to import.
            delete: Whether to delete the imported file after processing.

        Returns:
            The file instance, or None if the file doesn't exist or the import fails.
        """
        if not os.path.exists(file_path):
            self.log_warning(f"File not found: {file_path}")
            return None

        try:
            file_hash = compute_file_hash_blake3(file_path, logger=self.command)
            file_name = os.path.basename(file_path)
            _, file_ext = os.path.splitext(file_name)
            hash_file_name = file_hash + file_ext

            # Get or create file name
            file_name_instance, _ = FileNameModel.objects.get_or_create(
                filename=file_name
            )

            # Get or create file
            file_instance, created = PostFileModel.objects.get_or_create(
                hash_blake3=file_hash
            )
@@ -124,8 +116,8 @@ class Command(BaseCommand):
            if created:
                with open(file_path, "rb") as file:
                    file_instance.file.save(hash_file_name, file)
                file_instance.save()

            # Add file metadata
            file_instance.name.add(file_name_instance)
            file_instance.extension = file_ext
            file_instance.size = os.path.getsize(file_path)
@@ -138,189 +130,396 @@ class Command(BaseCommand):
            file_instance.save()

            # Process image-specific properties
            if file_instance.mimetype.startswith("image/"):
                # Add Image blur hash if not existing
                if not file_instance.blur_hash:
                    generate_blur_hash_PostFile.delay(file_instance.id)

                # Get image resolution (PIL's Image.size is (width, height))
                try:
                    im = PillowImage.open(file_instance.file)
                    file_instance.width, file_instance.height = im.size
                    file_instance.save()
                except Exception as e:
                    self.log_error(f"Error getting image dimensions: {str(e)}")

            # Process video thumbnails
            if file_instance.file_type in ["video", "gif"]:
                if not file_instance.thumbnail:
                    generate_video_thumbnail.delay(file_instance.id)

            # Generate MD5 hash if not exists
            if not file_instance.hash_md5:
                generate_md5_hash_PostFile.delay(file_instance.id)

            if created:
                self.log_success(f"Imported: {file_path} file, new instance created")
            else:
                self.log_success(f"Imported: {file_path} file, instance updated")

            # Delete the imported file if the --delete flag is used
            if delete and os.path.exists(file_path):
                os.remove(file_path)
                self.log_success(f"Deleted: {file_path}")

            return file_instance

        except Exception as e:
            self.log_error(f"Error importing file {file_path}: {str(e)}")
            return None

    def add_description(
        self,
        description_text: str,
        date_str: str,
        date_format: str,
        owner_instance,
        owner_type: str,
        file_date,
    ) -> None:
        """
        Add a description to a post or creator.

        Args:
            description_text: The description text to add
            date_str: Date string of when the description was created
            date_format: Format of the date string
            owner_instance: The post or creator instance
            owner_type: Either 'post' or 'creator'
            file_date: Timestamp of the file for the imported date
        """
        try:
            description_hash = compute_string_hash_blake3(
                description_text, logger=self.command
            )
            description_instance, created = DescriptionModel.objects.get_or_create(
                hash=description_hash
            )

            if created:
                description_instance.content = description_text
                description_instance.date_created = timezone.make_aware(
                    datetime.strptime(date_str, date_format)
                )
                description_instance.save()

            if owner_type == "creator":
                relation, created = CreatorDescription.objects.get_or_create(
                    creator=owner_instance, description=description_instance
                )
            else:  # post
                relation, created = PostDescription.objects.get_or_create(
                    post=owner_instance, description=description_instance
                )

            relation.date_imported = timezone.make_aware(
                datetime.fromtimestamp(file_date)
            )
            relation.save()

            if owner_type == "post":
                owner_instance.description.add(description_instance)

        except Exception as e:
            self.log_error(f"Error adding description: {str(e)}")

    def add_tags(self, tags_list, post_instance):
        """Add tags to a post."""
        for tag in tags_list:
            try:
                tag_instance, created = TagModel.objects.get_or_create(slug=tag)
                if created or not tag_instance.name:
                    tag_instance.name = tag
                    tag_instance.save()
                post_instance.tags.add(tag_instance)
            except Exception as e:
                self.log_error(f"Error adding tag '{tag}': {str(e)}")

    def ensure_boolean_field(self, value, default=False):
        """Convert potentially null/None values to boolean."""
        if value is None:
            return default
        return bool(value)


class TwitterImporter(BaseImporter):
    """Importer for Twitter data."""

    def import_data(
        self, data: Dict[str, Any], file_path_json: str, delete: bool
    ) -> None:
        """Import Twitter data from JSON into the database."""
        try:
            category = data.get("category", "twitter")
            source_site_instance = self.get_or_create_source_site(category)

            # Process creator if present
            creator_instance = None
            if "author" in data:
                creator_instance = self._process_creator(
                    data, source_site_instance, file_path_json
                )

            # Get subcategory if available
            category_instance = None
            if "subcategory" in data:
                category_instance = self._process_category(data)

            # Process the post
            self._process_post(
                data,
                source_site_instance,
                creator_instance,
                category_instance,
                file_path_json,
                delete,
            )
        except Exception as e:
            self.log_error(f"Error importing Twitter data: {str(e)}")

    def _process_creator(self, data, source_site_instance, file_path_json):
        """Process creator data for Twitter."""
        creator_instance, _ = CreatorModel.objects.get_or_create(
            slug=data["author"]["name"], source_site=source_site_instance
        )
        creator_instance.creator_id = data["author"]["id"]
        creator_instance.name = data["author"]["nick"]

        # Add creator description if available
        if "description" in data["author"]:
            self.add_description(
                description_text=data["author"]["description"],
                date_str=data["author"]["date"],
                date_format="%Y-%m-%d %H:%M:%S",
                owner_instance=creator_instance,
                owner_type="creator",
                file_date=os.path.getmtime(file_path_json),
            )

        creator_instance.date_created = timezone.make_aware(
            datetime.strptime(data["author"]["date"], "%Y-%m-%d %H:%M:%S")
        )
        creator_instance.save()
        return creator_instance

    def _process_category(self, data):
        """Process category data."""
        category_instance, created = CategoryModel.objects.get_or_create(
            slug=data["subcategory"]
        )
        if created:
            category_instance.name = data["subcategory"].capitalize()
            category_instance.save()
        return category_instance

    def _process_post(
        self,
        data,
        source_site_instance,
        creator_instance,
        category_instance,
        file_path_json,
        delete,
    ):
        """Process post data for Twitter."""
        post_instance, _ = PostModel.objects.get_or_create(
            post_id=data["tweet_id"],
            source_site=source_site_instance,
            defaults={
                # Set a default for mature to avoid null constraint error
                "mature": False
            },
        )

        if category_instance:
            if creator_instance:
                creator_instance.refresh_from_db()
                creator_instance.categories.add(category_instance)
                creator_instance.save()
            post_instance.category.add(category_instance)

        if creator_instance:
            post_instance.creator = creator_instance

        post_instance.date_created = timezone.make_aware(
            datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
        )

        # Set mature flag if available
        if "sensitive" in data:
            post_instance.mature = self.ensure_boolean_field(data.get("sensitive"))

        # Add post description if available
        if "content" in data:
            self.add_description(
                description_text=data["content"],
                date_str=data["date"],
                date_format="%Y-%m-%d %H:%M:%S",
                owner_instance=post_instance,
                owner_type="post",
                file_date=os.path.getmtime(file_path_json),
            )

        # Add hashtags if available
        if "hashtags" in data:
            self.add_tags(data["hashtags"], post_instance)

        # Handle file import (the media file sits next to the JSON file)
        file_path = file_path_json.removesuffix(".json")
        file_instance = self.import_file(file_path, delete)

        if file_instance:
            post_instance.files.add(file_instance)

            # Handle profile images
            if category_instance and creator_instance:
                if category_instance.slug == "avatar":
                    creator_instance.refresh_from_db()
                    creator_instance.avatar = file_instance
                    creator_instance.save()

                if category_instance.slug == "background":
                    creator_instance.refresh_from_db()
                    creator_instance.banner = file_instance
                    creator_instance.save()

        post_instance.save()


class Command(BaseCommand):
    help = (
        "Import data from JSON files in a folder or a single JSON file to the archive"
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.importers = {
            "twitter": TwitterImporter(self),
        }

        # Set up logging
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            filename="import.log",
        )
        self.logger = logging.getLogger("import_command")

    def add_arguments(self, parser):
        parser.add_argument(
            "path",
            type=str,
            help="Path to the folder containing JSON files or a single JSON file",
        )
        parser.add_argument(
            "--delete", action="store_true", help="Delete imported files"
        )
        parser.add_argument(
            "--site",
            type=str,
            choices=list(self.importers.keys()),
            help="Only import files for the specified site",
        )

    def handle(self, *args, **kwargs):
        path = kwargs["path"]
        delete = kwargs["delete"]
        site_filter = kwargs.get("site")

        if os.path.isfile(path):
            self.process_json_file(path, delete, site_filter)
        elif os.path.isdir(path):
            self.process_json_folder(path, delete, site_filter)
        else:
            self.stdout.write(
                self.style.ERROR(f"The path '{path}' is not a valid file or folder.")
            )
            return

    def process_json_file(self, file_path, delete, site_filter=None):
        tqdm.write(f"Importing data from: {file_path}")

        try:
            with open(file_path, "r") as f:
                data = json.load(f)

            category = data.get("category", "")

            # Skip if site filter is set and doesn't match
            if site_filter and category != site_filter:
                tqdm.write(
                    f"Skipping {file_path}, category {category} doesn't match filter {site_filter}"
                )
                return

            # Check if we have an importer for this category
            if category in self.importers:
                self.importers[category].import_data(data, file_path, delete)
                tqdm.write(
                    self.style.SUCCESS(f"Data imported successfully for {category}.")
                )
            else:
                tqdm.write(
                    self.style.WARNING(f"No importer found for category: {category}")
                )
        except json.JSONDecodeError:
            tqdm.write(self.style.ERROR(f"Invalid JSON file: {file_path}"))
        except Exception as e:
            tqdm.write(self.style.ERROR(f"Error processing {file_path}: {str(e)}"))

    def process_json_folder(self, folder_path, delete, site_filter=None):
        if not os.path.exists(folder_path):
            tqdm.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
            return

        # Count total files up front so tqdm can show overall progress
        tqdm.write("Counting total files...")
        total_files = sum(len(files) for _, _, files in os.walk(folder_path))

        with tqdm(
            total=total_files, desc="Processing JSON files", dynamic_ncols=True
        ) as progress_bar:
            for root, dirs, files in os.walk(folder_path):
                for file_name in files:
                    progress_bar.update(1)  # Increment progress for each file
                    if file_name.endswith(".json"):
                        file_path = os.path.join(root, file_name)
                        self.process_json_file(file_path, delete, site_filter)
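
With importers keyed by category in Command.__init__, supporting another site later only takes a new BaseImporter subclass plus a registry entry. A minimal sketch, assuming a hypothetical "mastodon" category; the class name and field mapping below are illustrative, not part of this commit:

    class MastodonImporter(BaseImporter):
        """Hypothetical sketch of a second site importer."""

        def import_data(self, data, file_path_json, delete):
            source_site_instance = self.get_or_create_source_site(
                data.get("category", "mastodon")
            )
            # Map this site's JSON fields onto the shared models here, reusing
            # the base-class helpers (add_description, add_tags, import_file).
            self.log_info(f"Imported data from {file_path_json}")

    # Registered alongside the existing importer in Command.__init__:
    # self.importers = {"twitter": TwitterImporter(self), "mastodon": MastodonImporter(self)}

Invocation is the usual management-command form; assuming the module file is named import_data.py (the file name is not shown in this view):

    python manage.py import_data /path/to/archive --site twitter --delete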