from datetime import datetime
import os
import json

from django.utils import timezone
from tqdm.auto import tqdm
from PIL import Image as PillowImage
from django.core.management.base import BaseCommand

from apps.files.models import (
    FileNameModel,
    PostFileModel,
)
from apps.archive.models import (
    SourceSiteModel,
    CategoryModel,
    CreatorModel,
    PostModel,
    DescriptionModel,
    CreatorDescription,
    PostDescription,
    TagModel,
)
from utils.hash import compute_string_hash_blake3, compute_file_hash_blake3
from utils.files import get_mime_type, categorize_mime_type
from apps.files.tasks import (
    generate_blur_hash_PostFile,
    generate_md5_hash_PostFile,
    generate_video_thumbnail,
)


class Command(BaseCommand):
    help = "Import data from JSON files in a folder or a single JSON file to the Twitter archive"

    def add_arguments(self, parser):
        parser.add_argument(
            "path",
            type=str,
            help="Path to the folder containing JSON files or a single JSON file",
        )
        parser.add_argument(
            "--delete", action="store_true", help="Delete imported files"
        )

    def handle(self, *args, **kwargs):
        path = kwargs["path"]
        delete = kwargs["delete"]

        if os.path.isfile(path):
            self.process_json_file(path, delete)
        elif os.path.isdir(path):
            self.process_json_folder(path, delete)
        else:
            self.stdout.write(
                self.style.ERROR(f"The path '{path}' is not a valid file or folder.")
            )
            return

    def process_json_file(self, file_path, delete):
        # self.stdout.write(self.style.NOTICE(f"Importing data from: {file_path}"))
        tqdm.write(f"Importing data from: {file_path}")

        with open(file_path, "r") as f:
            data = json.load(f)
            self.import_data(data, file_path, delete)

        # self.stdout.write(self.style.SUCCESS('Data imported successfully.'))
        tqdm.write(self.style.SUCCESS("Data imported successfully."))

    def process_json_folder(self, folder_path, delete):
        if not os.path.exists(folder_path):
            # self.stdout.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
            tqdm.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
            return

        # Use os.walk dynamically, and count total files to show progress incrementally
        tqdm.write("Counting total files...")
        total_files = sum(len(files) for _, _, files in os.walk(folder_path))

        with tqdm(
            total=total_files, desc="Processing JSON files", dynamic_ncols=True
        ) as progress_bar:
            for root, dirs, files in os.walk(folder_path):
                for file_name in files:
                    progress_bar.update(1)  # Increment progress for each file
                    if file_name.endswith(".json"):
                        file_path = os.path.join(root, file_name)
                        self.process_json_file(file_path, delete)

    def import_file(self, file_path, model, delete=False):
        """
        Imports a file if it doesn't already exist in the database and returns the instance.

        :param file_path: The path to the file to import.
        :param model: The model class to which the file instance should be linked.
        :param delete: Whether to delete the imported file after processing.
        :return: The file instance.
""" file_instance = None # Initialize file_instance to None if os.path.exists(file_path): file_hash = compute_file_hash_blake3(file_path, logger=self) file_name = os.path.basename(file_path) _, file_ext = os.path.splitext(file_name) hash_file_name = file_hash + file_ext file_name_instance, _ = FileNameModel.objects.get_or_create( filename=file_name ) file_instance, created = PostFileModel.objects.get_or_create( hash_blake3=file_hash ) if created: with open(file_path, "rb") as file: file_instance.file.save(hash_file_name, file) file_instance.save() file_instance.name.add(file_name_instance) file_instance.extension = file_ext file_instance.size = os.path.getsize(file_path) file_mime = get_mime_type(file_path) file_type = categorize_mime_type(file_mime) file_instance.file_type = file_type file_instance.mimetype = file_mime file_instance.save() if file_instance.mimetype.startswith("image/"): # Add Image blur hash if not existing. if not file_instance.blur_hash: generate_blur_hash_PostFile.delay(file_instance.id) # Get image resolution im = PillowImage.open(file_instance.file) file_instance.height, file_instance.width = im.size file_instance.save() if file_instance.file_type in ["video", "gif"]: if not file_instance.thumbnail: generate_video_thumbnail.delay(file_instance.id) if not file_instance.hash_md5: generate_md5_hash_PostFile.delay(file_instance.id) if created: tqdm.write( self.style.SUCCESS( f"Imported: {file_path} file, new instance created" ) ) else: tqdm.write( self.style.SUCCESS(f"Imported: {file_path} file, instance updated") ) # Delete the imported file if the --delete flag is used self.delete_imported_file(file_path, delete) return file_instance def delete_imported_file(self, file_path, delete=False): """ Delete the file if the --delete flag is used :param delete: Whether to delete the imported file after processing. 
""" if delete: if os.path.exists(file_path): os.remove(file_path) tqdm.write(self.style.SUCCESS(f"Deleted: {file_path}")) else: tqdm.write(self.style.WARNING(f"File not found: {file_path}")) def import_data(self, data, file_path_json, delete): """ """ # Get source site and create it if it doesn't exist category = data.get("category") source_site_instance, Null = SourceSiteModel.objects.get_or_create( slug=category ) source_site_instance.save() if category == "twitter": if "author" in data.keys(): creator_instance, Null = CreatorModel.objects.get_or_create( slug=data["author"]["name"], source_site=source_site_instance ) creator_instance.creator_id = data["author"]["id"] creator_instance.name = data["author"]["nick"] if "description" in data["author"].keys(): description_text = data["author"]["description"] description_hash = compute_string_hash_blake3( description_text, logger=self ) description_instance, created = ( DescriptionModel.objects.get_or_create(hash=description_hash) ) if created: description_instance.content = description_text description_instance.save() # Add to CreatorDescription through model with a custom date_imported creator_description_instance, created = ( CreatorDescription.objects.get_or_create( creator=creator_instance, description=description_instance ) ) creator_description_instance.date_imported = timezone.make_aware( datetime.fromtimestamp(os.path.getmtime(file_path_json)) ) creator_description_instance.save() creator_instance.date_created = timezone.make_aware( datetime.strptime(data["author"]["date"], "%Y-%m-%d %H:%M:%S") ) creator_instance.save() post_instance, Null = PostModel.objects.get_or_create( post_id=data["tweet_id"], source_site=source_site_instance ) if "subcategory" in data.keys(): category_instance, _ = CategoryModel.objects.get_or_create( slug=data["subcategory"] ) if _: category_instance.name = data["subcategory"].capitalize() category_instance.save() creator_instance.refresh_from_db() creator_instance.categories.add(category_instance) creator_instance.save() post_instance.category.add(category_instance) post_instance.creator = creator_instance post_instance.date_created = timezone.make_aware( datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S"), ) if "sensitive" in data.keys(): if data["sensitive"]: post_instance.mature = data["sensitive"] if "content" in data.keys(): description_text = data["content"] description_hash = compute_string_hash_blake3(description_text, logger=self) description_instance, created = DescriptionModel.objects.get_or_create( hash=description_hash ) description_instance.save() if created: description_instance.date_created = timezone.make_aware( datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S") ) description_instance.content = description_text description_instance.save() post_description_instance, created = PostDescription.objects.get_or_create( post=post_instance, description=description_instance ) if created: post_description_instance.date_imported = timezone.make_aware( datetime.fromtimestamp(os.path.getmtime(file_path_json)) ) post_description_instance.save() post_instance.description.add(description_instance) if "hashtags" in data.keys(): for tag in data["hashtags"]: tag_instance, Null = TagModel.objects.get_or_create(slug=tag) if tag_instance.name == "": tag_instance.name = tag tag_instance.save() post_instance.tags.add(tag_instance) file_path = file_path_json.removesuffix(".json") # Handle file import file_instance = self.import_file(file_path, PostFileModel, delete) if file_instance: 
                post_instance.files.add(file_instance)

                if category_instance.slug == "avatar":
                    creator_instance.refresh_from_db()
                    creator_instance.avatar = file_instance
                    creator_instance.save()

                if category_instance.slug == "background":
                    creator_instance.refresh_from_db()
                    creator_instance.banner = file_instance
                    creator_instance.save()

            post_instance.save()
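
# Usage sketch. The command name below is an assumption: Django derives it from
# this file's name under <app>/management/commands/, which is not shown here.
#
#   python manage.py import_twitter_archive /path/to/archive/folder
#   python manage.py import_twitter_archive /path/to/post.json --delete
#
# The importer expects each media file to sit next to a metadata file named
# "<media file>.json". An illustrative record containing the keys this command
# reads (values are made up) looks like:
#
#   {
#       "category": "twitter",
#       "subcategory": "media",
#       "tweet_id": 1234567890,
#       "date": "2024-01-01 12:00:00",
#       "content": "Post text",
#       "sensitive": false,
#       "hashtags": ["example"],
#       "author": {
#           "id": 42,
#           "name": "username",
#           "nick": "Display Name",
#           "date": "2020-01-01 00:00:00",
#           "description": "Profile bio"
#       }
#   }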