Add: test import_data management command

This commit is contained in:
Aroy-Art 2024-09-10 22:03:27 +02:00
parent 680c60f190
commit 19c2a48b1a
Signed by: Aroy
GPG key ID: DB9689E9391DD156

View file

@ -0,0 +1,559 @@
# /management/commands/import_data.py
import os
import json
import requests
from blake3 import blake3
from tqdm.auto import tqdm
from PIL import Image
from datetime import datetime
from django.core.management.base import BaseCommand
from django.core.files.base import ContentFile
from django.utils.text import slugify
from django.utils import timezone
from django.core.exceptions import ObjectDoesNotExist
from django.contrib.contenttypes.models import ContentType
from apps.files.models import User_Profile_Images, User_Banner_Images, Submission_File, Metadata_Files
from apps.sites.models import Category, Submissions, Users, Tags
from apps.sites.furaffinity.models import FA_Submission, FA_Tags, FA_User, FA_Species, FA_Gender, FA_Mature
from apps.sites.twitter.models import Twitter_Submissions, Twitter_Users, Twitter_Tags
from utils.files import get_mime_type
from utils.strings import get_urls
class Command(BaseCommand):
help = 'Import data from JSON files in a folder or a single JSON file to the Twitter archive'
def add_arguments(self, parser):
parser.add_argument('path', type=str, help='Path to the folder containing JSON files or a single JSON file')
parser.add_argument('--delete', action='store_true', help='Delete imported files')
def handle(self, *args, **kwargs):
path = kwargs['path']
delete = kwargs['delete']
if os.path.isfile(path):
self.process_json_file(path, delete)
elif os.path.isdir(path):
self.process_json_folder(path, delete)
else:
self.stdout.write(self.style.ERROR(f"The path '{path}' is not a valid file or folder."))
return
def process_json_file(self, file_path, delete):
#self.stdout.write(self.style.NOTICE(f"Importing data from: {file_path}"))
tqdm.write(f"Importing data from: {file_path}")
with open(file_path) as f:
data = json.load(f)
self.import_data(data, file_path, delete)
tqdm.write(self.style.SUCCESS('Data imported successfully.'))
def process_json_folder(self, folder_path, delete):
if not os.path.exists(folder_path):
#self.stdout.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
tqdm.write(self.style.ERROR(f"The folder '{folder_path}' does not exist."))
return
for root, dirs, files in tqdm(os.walk(folder_path), dynamic_ncols=True):
for file_name in files:
if file_name.endswith('.json'):
file_path = os.path.join(root, file_name)
self.process_json_file(file_path, delete)
def compute_file_hash(self, file_path):
""" Compute BLAKE3 hash of the file """
try:
hasher = blake3()
with open(file_path, 'rb') as f:
while chunk := f.read(65536):
hasher.update(chunk)
return hasher.hexdigest()
except Exception as e:
tqdm.write(self.style.WARNING(f"Error computing file hash: {e}"))
return None
def compute_string_hash(self, string):
""" Compute BLAKE3 hash of the string """
try:
hasher = blake3()
hasher.update(string.encode())
return hasher.hexdigest()
except Exception as e:
tqdm.write(self.style.WARNING(f"Error computing string hash: {e}"))
return None
def import_file(self, file_path, model, delete=False):
"""
Imports a file if it doesn't already exist in the database and returns the instance.
:param file_path: The path to the file to import.
:param model: The model class to which the file instance should be linked.
:param delete: Whether to delete the imported file after processing.
:return: The file instance.
"""
file_instance = None # Initialize file_instance to None
if os.path.exists(file_path):
file_hash = self.compute_file_hash(file_path)
file_name = os.path.basename(file_path)
Null, file_ext = os.path.splitext(file_name)
hash_file_name = file_hash + file_ext
try:
file_instance = model.objects.get(file_hash=file_hash)
file_instance.file_ext = file_ext
file_instance.size = os.path.getsize(file_path)
file_instance.file_mime = get_mime_type(file_path)
if file_instance.file_mime.startswith("image/"):
im = Image.open(file_instance.file)
file_instance.image_height, file_instance.image_width = im.size
else:
file_instance.image_height = None
file_instance.image_width = None
file_instance.save()
tqdm.write(self.style.NOTICE(f"Skipping: {file_path} file, already imported"))
except model.DoesNotExist:
# If the file doesn't exist, create a new file instance
with open(file_path, 'rb') as file:
file_instance = model()
file_instance.file_hash = file_hash
file_instance.file.save(hash_file_name, file)
file_instance.file_ext = file_ext
file_instance.file_mime = get_mime_type(file_path)
file_instance.size = os.path.getsize(file_path)
if file_instance.file_mime.startswith("image/"):
im = Image.open(file_instance.file)
file_instance.image_height, file_instance.image_width = im.size
else:
file_instance.image_height = None
file_instance.image_width = None
file_instance.file_name = file_name
file_instance.save()
tqdm.write(self.style.NOTICE(f"Import file: {file_path}"))
if delete:
self.delete_imported_file(file_path)
return file_instance
def delete_imported_file(self, file_path, delete=False):
"""
Delete the file if the --delete flag is used
:param delete: Whether to delete the imported file after processing.
"""
if delete:
if os.path.exists(file_path):
os.remove(file_path)
tqdm.write(self.style.SUCCESS(f"Deleted: {file_path}"))
else:
tqdm.write(self.style.WARNING(f"File not found: {file_path}"))
def import_data(self, data, json_file_path, delete):
category = data['category']
if category == "twitter":
self.import_from_twitter(data, json_file_path, delete)
elif category == "furaffinity":
self.import_from_furaffinity(data, json_file_path, delete)
else:
tqdm.write(f"Skipping '{category}' not implemented")
def import_twitter_user(self, data, file_path, category, delete=False):
"""
Import a Twitter user from the provided data into the database.
Parameters:
data (dict): The data containing information about the Twitter user.
file_path (str): The file path for importing user images.
delete (bool): Flag indicating whether to delete user images after importing it.
Returns:
Twitter_Users: The Twitter user object imported or retrieved from the database.
"""
content_type = ContentType.objects.get_for_model(Twitter_Users)
author, created = Twitter_Users.objects.get_or_create(artist_id=data['author']['id'])
author.artist = data['author']['nick']
author.artist_url = data['author']['name']
author.date = timezone.make_aware(datetime.strptime(data['author']["date"], "%Y-%m-%d %H:%M:%S"))
author.description = data['author']['description']
if 'url' in data['author'].keys():
author.extra_url = data['author']['url']
author.location = data['author']['location']
author.verified = data['author']['verified']
if author.favourites_count == None or data['author']["favourites_count"] > author.favourites_count:
author.favourites_count = data['author']["favourites_count"]
if author.followers_count == None or data['author']["followers_count"] > author.followers_count:
author.followers_count = data['author']["followers_count"]
if author.friends_count == None or data['author']["friends_count"] > author.friends_count:
author.friends_count = data['author']["friends_count"]
if author.media_count == None or data['author']["media_count"] > author.media_count:
author.media_count = data['author']["media_count"]
if author.listed_count == None or data['author']["listed_count"] > author.listed_count:
author.listed_count = data['author']["listed_count"]
if author.statuses_count == None or data['author']["statuses_count"] > author.statuses_count:
author.statuses_count = data['author']["statuses_count"]
if data['subcategory'] == "avatar":
author.profile_image = data['author']['profile_image']
author.icon = self.import_file(file_path, User_Profile_Images, delete)
elif data['subcategory'] == "background":
author.profile_banner = data['author']['profile_banner']
author.banner = self.import_file(file_path, User_Banner_Images, delete)
author_hash = self.compute_string_hash(data['author']['name'] + data['category'])
site_user, created = Users.objects.get_or_create(user_hash=author_hash)
site_user.category = category
# Get the primary key of the twitter_submission instance
site_user_id = author.pk
# Create the SubmissionsLink instance
site_user.content_type=content_type
site_user.object_id=site_user_id
site_user.save()
author.save()
return author, site_user
def import_twitter_tags(self, data: dict, category: str) -> list[Twitter_Tags]:
"""
Import a Twitter tag from the provided data into the database.
Parameters:
data (dict): The data containing information about the Twitter tag.
Returns:
list[Twitter_Tags]: A list of imported or retrieved Twitter tag objects.
"""
content_type = ContentType.objects.get_for_model(Twitter_Tags)
tags: list[Twitter_Tags] = []
if "hashtags" in data:
for t_tag_name in data["hashtags"]:
t_tag_slug = slugify(t_tag_name)
try:
# Check if the tag already exists in the database by name
tag: Twitter_Tags = Twitter_Tags.objects.get(tag_slug=t_tag_slug)
tag_id = tag.pk
except ObjectDoesNotExist:
# If the tag does not exist, create a new tag and generate the slug
tag = Twitter_Tags(tag=t_tag_name)
tag.tag_slug = t_tag_slug
tag_id = tag.pk
site_tags, created = Tags.objects.get_or_create(tag_slug=t_tag_slug)
site_tags.category.add(category)
site_tags.content_type=content_type
site_tags.object_id=tag_id
site_tags.save()
tag.save() # Save the tag (either new or existing)
tags.append(tag)
return tags
def import_from_twitter(self, data, json_file_path, delete):
category, created = Category.objects.get_or_create(name=data['category'])
category.save()
twitter_submission, created = Twitter_Submissions.objects.get_or_create(submission_id=data["tweet_id"])
file_path = json_file_path.removesuffix(".json")
# Handle author import
author, site_user = self.import_twitter_user(data, file_path, category, delete)
twitter_submission.author = author
# Handle tag import
tags = self.import_twitter_tags(data, category)
for tag in tags:
twitter_submission.tags.add(tag) # Add the tag to the submission
twitter_submission.gallery_type = data['subcategory']
# Handle file import
twitter_submission.files.add(self.import_file(file_path, Submission_File, delete))
# Handle metadata file import
twitter_submission.metadata.add(self.import_file(json_file_path, Metadata_Files, delete))
twitter_submission.description = data['content']
twitter_submission.date = timezone.make_aware(datetime.strptime(data['date'], "%Y-%m-%d %H:%M:%S"))
twitter_submission.origin_site = data['category']
twitter_submission.file_extension = data['extension']
twitter_submission.origin_filename = data['filename']
if twitter_submission.media_num is None or data['num'] > twitter_submission.media_num:
twitter_submission.media_num = data['num']
if "height" in data.keys():
twitter_submission.image_height = data['height']
if "width" in data.keys():
twitter_submission.image_width = data['width']
if "sensitive" in data.keys():
twitter_submission.sensitive = data['sensitive']
if "favorite_count" in data.keys():
twitter_submission.favorites_count = data['favorite_count']
if "quote_count" in data.keys():
twitter_submission.quote_count = data['quote_count']
if "reply_count" in data.keys():
twitter_submission.reply_count = data['reply_count']
if "retweet_count" in data.keys():
twitter_submission.retweet_count = data['retweet_count']
twitter_submission.lang = data['lang']
twitter_submission.save()
submission_hash = self.compute_string_hash(category.name + data['author']['name'] + str(data["tweet_id"]))
submission, created = Submissions.objects.get_or_create(submission_hash=submission_hash)
submission.category = category
submission.author = site_user
if twitter_submission.sensitive is not None:
submission.mature = twitter_submission.sensitive
else:
submission.mature = False
submission.date = timezone.make_aware(datetime.strptime(data['date'], "%Y-%m-%d %H:%M:%S"))
content_type = ContentType.objects.get_for_model(Twitter_Submissions)
# Get the primary key of the twitter_submission instance
twitter_submission_id = twitter_submission.pk
# Create the SubmissionsLink instance
submission.content_type=content_type
submission.object_id=twitter_submission_id
submission.save()
self.delete_imported_file(json_file_path, delete)
self.delete_imported_file(file_path, delete)
def import_furaffinity_user(self, data, json_file_path, category, delete):
content_type = ContentType.objects.get_for_model(FA_User)
artist, created = FA_User.objects.get_or_create(artist_url=data["artist_url"], artist=data["artist"])
author_hash = self.compute_string_hash(data["artist_url"] + data['category'])
site_user, created = Users.objects.get_or_create(user_hash=author_hash)
site_user.category = category
# Get the primary key of the furaffinity_submission instance
site_user_id = artist.pk
# Create the SubmissionsLink instance
site_user.content_type=content_type
site_user.object_id=site_user_id
site_user.save()
return artist, site_user
def import_furaffinity_tags(self, data, category):
content_type = ContentType.objects.get_for_model(FA_Tags)
tags: list[FA_Tags] = []
site_tags: list[Tags] = []
if "tags" in data:
for t_tag_name in data["tags"]:
t_tag_slug = slugify(t_tag_name)
try:
# Check if the tag already exists in the database by name
tag: FA_Tags = FA_Tags.objects.get(tag_slug=t_tag_slug)
tag_id = tag.pk
except ObjectDoesNotExist:
# If the tag does not exist, create a new tag and generate the slug
tag = FA_Tags(tag=t_tag_name)
tag.tag_slug = t_tag_slug
tag_id = tag.pk
site_tag, created = Tags.objects.get_or_create(tag_slug=t_tag_slug)
site_tag.category.add(category)
site_tag.content_type=content_type
site_tag.object_id=tag_id
site_tag.save()
tag.save() # Save the tag (either new or existing)
tags.append(tag)
site_tags.append(site_tag)
return tags, site_tags
def import_from_furaffinity(self, data, json_file_path, delete):
category, created = Category.objects.get_or_create(name=data['category'])
category.save()
furaffinity_submission, created = FA_Submission.objects.get_or_create(submission_id=data["id"])
furaffinity_submission.media_url = data["url"]
furaffinity_submission.title = data["title"]
furaffinity_submission.description = data["description"]
furaffinity_submission.date = timezone.make_aware(datetime.strptime(data["date"], "%Y-%m-%d %H:%M:%S"))
file_path = json_file_path.removesuffix(".json")
# Handle author import
author, site_user = self.import_furaffinity_user(data, file_path, category, delete)
furaffinity_submission.artist = author
# Handle tag import
tags, site_tags = self.import_furaffinity_tags(data, category)
for tag in tags:
furaffinity_submission.tags.add(tag) # Add the tag to the submission
species, created = FA_Species.objects.get_or_create(species=data["species"])
furaffinity_submission.species = species
# Handle mature rating import
mature, created = FA_Mature.objects.get_or_create(mature=data["rating"])
furaffinity_submission.mature_rating = mature
furaffinity_submission.number_of_comments = data["comments"]
furaffinity_submission.views = data["views"]
gender, created = FA_Gender.objects.get_or_create(gender=data["gender"])
furaffinity_submission.gender = gender
furaffinity_submission.fa_theme = data["theme"]
furaffinity_submission.fa_category = data["fa_category"]
furaffinity_submission.gallery_type = data["subcategory"]
furaffinity_submission.file_extension = data["extension"]
furaffinity_submission.image_height = data["height"]
furaffinity_submission.image_width = data["width"]
# Handle file import
furaffinity_submission.files.add(self.import_file(file_path, Submission_File, delete))
# Handle metadata file import
furaffinity_submission.metadata.add(self.import_file(json_file_path, Metadata_Files, delete))
furaffinity_submission.save()
submission_hash = self.compute_string_hash(category.name + data["artist_url"] + str(data["id"]))
submission, created = Submissions.objects.get_or_create(submission_hash=submission_hash)
submission.category = category
submission.tags.add(*site_tags)
submission.author = site_user
if furaffinity_submission.mature_rating.mature != "General" and not None:
print("Mature")
submission.mature = True
else:
submission.mature = False
submission.date = timezone.make_aware(datetime.strptime(data['date'], "%Y-%m-%d %H:%M:%S"))
content_type = ContentType.objects.get_for_model(FA_Submission)
# Get the primary key of the twitter_submission instance
furaffinity_submission_id = furaffinity_submission.pk
# Create the SubmissionsLink instance
submission.content_type=content_type
submission.object_id=furaffinity_submission_id
submission.save()
self.delete_imported_file(json_file_path, delete)
self.delete_imported_file(file_path, delete)