Gallery-Archivist/backend/apps/archive/management/commands/import_data.py

326 lines
11 KiB
Python

from datetime import datetime
import os
import json
from django.utils import timezone
from tqdm.auto import tqdm
from PIL import Image as PillowImage
from django.core.management.base import BaseCommand
from apps.files.models import (
FileNameModel,
PostFileModel,
)
from apps.archive.models import (
SourceSiteModel,
CategoryModel,
CreatorModel,
PostModel,
DescriptionModel,
CreatorDescription,
PostDescription,
TagModel,
)
from utils.hash import compute_string_hash_blake3, compute_file_hash_blake3
from utils.files import get_mime_type, categorize_mime_type
from apps.files.tasks import (
generate_blur_hash_PostFile,
generate_md5_hash_PostFile,
generate_video_thumbnail,
)
class Command(BaseCommand):
    """Import post/creator metadata from JSON files (a single file or a
    folder walked recursively) into the archive models, together with the
    sibling media file located at the JSON path minus its ".json" suffix.
    """

    help = "Import data from JSON files in a folder or a single JSON file to the Twitter archive"
def add_arguments(self, parser):
parser.add_argument(
"path",
type=str,
help="Path to the folder containing JSON files or a single JSON file",
)
parser.add_argument(
"--delete", action="store_true", help="Delete imported files"
)
def handle(self, *args, **kwargs):
path = kwargs["path"]
delete = kwargs["delete"]
if os.path.isfile(path):
self.process_json_file(path, delete)
elif os.path.isdir(path):
self.process_json_folder(path, delete)
else:
self.stdout.write(
self.style.ERROR(f"The path '{path}' is not a valid file or folder.")
)
return
def process_json_file(self, file_path, delete):
# self.stdout.write(self.style.NOTICE(f"Importing data from: {file_path}"))
tqdm.write(f"Importing data from: {file_path}")
with open(file_path, "r") as f:
data = json.load(f)
self.import_data(data, file_path, delete)
# self.stdout.write(self.style.SUCCESS('Data imported successfully.'))
tqdm.write(self.style.SUCCESS("Data imported successfully."))
def process_json_folder(self, folder_path, delete):
if not os.path.exists(folder_path):
# self.stdout.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
tqdm.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
return
# Use os.walk dynamically, and count total files to show progress incrementally
tqdm.write("Counting total files...")
total_files = sum(len(files) for _, _, files in os.walk(folder_path))
with tqdm(
total=total_files, desc="Processing JSON files", dynamic_ncols=True
) as progress_bar:
for root, dirs, files in os.walk(folder_path):
for file_name in files:
progress_bar.update(1) # Increment progress for each file
if file_name.endswith(".json"):
file_path = os.path.join(root, file_name)
self.process_json_file(file_path, delete)
def import_file(self, file_path, model, delete=False):
"""
Imports a file if it doesn't already exist in the database and returns the instance.
:param file_path: The path to the file to import.
:param model: The model class to which the file instance should be linked.
:param delete: Whether to delete the imported file after processing.
:return: The file instance.
"""
file_instance = None # Initialize file_instance to None
if os.path.exists(file_path):
file_hash = compute_file_hash_blake3(file_path, logger=self)
file_name = os.path.basename(file_path)
_, file_ext = os.path.splitext(file_name)
hash_file_name = file_hash + file_ext
file_name_instance, _ = FileNameModel.objects.get_or_create(
filename=file_name
)
file_instance, created = PostFileModel.objects.get_or_create(
hash_blake3=file_hash
)
if created:
with open(file_path, "rb") as file:
file_instance.file.save(hash_file_name, file)
file_instance.save()
file_instance.name.add(file_name_instance)
file_instance.extension = file_ext
file_instance.size = os.path.getsize(file_path)
file_mime = get_mime_type(file_path)
file_type = categorize_mime_type(file_mime)
file_instance.file_type = file_type
file_instance.mimetype = file_mime
file_instance.save()
if file_instance.mimetype.startswith("image/"):
# Add Image blur hash if not existing.
if not file_instance.blur_hash:
generate_blur_hash_PostFile.delay(file_instance.id)
# Get image resolution
im = PillowImage.open(file_instance.file)
file_instance.height, file_instance.width = im.size
file_instance.save()
if file_instance.file_type in ["video", "gif"]:
if not file_instance.thumbnail:
generate_video_thumbnail.delay(file_instance.id)
if not file_instance.hash_md5:
generate_md5_hash_PostFile.delay(file_instance.id)
if created:
tqdm.write(
self.style.SUCCESS(
f"Imported: {file_path} file, new instance created"
)
)
else:
tqdm.write(
self.style.SUCCESS(f"Imported: {file_path} file, instance updated")
)
# Delete the imported file if the --delete flag is used
self.delete_imported_file(file_path, delete)
return file_instance
def delete_imported_file(self, file_path, delete=False):
"""
Delete the file if the --delete flag is used
:param delete: Whether to delete the imported file after processing.
"""
if delete:
if os.path.exists(file_path):
os.remove(file_path)
tqdm.write(self.style.SUCCESS(f"Deleted: {file_path}"))
else:
tqdm.write(self.style.WARNING(f"File not found: {file_path}"))
def import_data(self, data, file_path_json, delete):
""" """
# Get source site and create it if it doesn't exist
category = data.get("category")
source_site_instance, Null = SourceSiteModel.objects.get_or_create(
slug=category
)
source_site_instance.save()
if category == "twitter":
if "author" in data.keys():
creator_instance, Null = CreatorModel.objects.get_or_create(
slug=data["author"]["name"], source_site=source_site_instance
)
creator_instance.creator_id = data["author"]["id"]
creator_instance.name = data["author"]["nick"]
if "description" in data["author"].keys():
description_text = data["author"]["description"]
description_hash = compute_string_hash_blake3(
description_text, logger=self
)
description_instance, created = (
DescriptionModel.objects.get_or_create(hash=description_hash)
)
if created:
description_instance.content = description_text
description_instance.save()
# Add to CreatorDescription through model with a custom date_imported
creator_description_instance, created = (
CreatorDescription.objects.get_or_create(
creator=creator_instance, description=description_instance
)
)
creator_description_instance.date_imported = timezone.make_aware(
datetime.fromtimestamp(os.path.getmtime(file_path_json))
)
creator_description_instance.save()
creator_instance.date_created = timezone.make_aware(
datetime.strptime(data["author"]["date"], "%Y-%m-%d %H:%M:%S")
)
creator_instance.save()
post_instance, Null = PostModel.objects.get_or_create(
post_id=data["tweet_id"], source_site=source_site_instance
)
if "subcategory" in data.keys():
category_instance, _ = CategoryModel.objects.get_or_create(
slug=data["subcategory"]
)
if _:
category_instance.name = data["subcategory"].capitalize()
category_instance.save()
creator_instance.refresh_from_db()
creator_instance.categories.add(category_instance)
creator_instance.save()
post_instance.category.add(category_instance)
post_instance.creator = creator_instance
post_instance.date_created = timezone.make_aware(
datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S"),
)
if "sensitive" in data.keys():
if data["sensitive"]:
post_instance.mature = data["sensitive"]
if "content" in data.keys():
description_text = data["content"]
description_hash = compute_string_hash_blake3(description_text, logger=self)
description_instance, created = DescriptionModel.objects.get_or_create(
hash=description_hash
)
description_instance.save()
if created:
description_instance.date_created = timezone.make_aware(
datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S")
)
description_instance.content = description_text
description_instance.save()
post_description_instance, created = PostDescription.objects.get_or_create(
post=post_instance, description=description_instance
)
if created:
post_description_instance.date_imported = timezone.make_aware(
datetime.fromtimestamp(os.path.getmtime(file_path_json))
)
post_description_instance.save()
post_instance.description.add(description_instance)
if "hashtags" in data.keys():
for tag in data["hashtags"]:
tag_instance, Null = TagModel.objects.get_or_create(slug=tag)
if tag_instance.name == "":
tag_instance.name = tag
tag_instance.save()
post_instance.tags.add(tag_instance)
file_path = file_path_json.removesuffix(".json")
# Handle file import
file_instance = self.import_file(file_path, PostFileModel, delete)
if file_instance:
post_instance.files.add(file_instance)
if category_instance.slug == "avatar":
creator_instance.refresh_from_db()
creator_instance.avatar = file_instance
creator_instance.save()
if category_instance.slug == "background":
creator_instance.refresh_from_db()
creator_instance.banner = file_instance
creator_instance.save()
post_instance.save()