286 lines
8.8 KiB
Python
286 lines
8.8 KiB
Python
import os
|
|
import io
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Optional, Tuple
|
|
|
|
from django.db import transaction
|
|
|
|
from celery import shared_task
|
|
from celery.exceptions import Retry
|
|
from PIL import Image as PillowImage
|
|
import blurhash
|
|
|
|
from .models import PostFileModel
|
|
from utils.hash import compute_file_hash_blake3, compute_md5_hash, compute_blur_hash
|
|
|
|
|
|
class ThumbnailGenerationError(Exception):
|
|
"""Custom exception for thumbnail generation errors."""
|
|
|
|
pass
|
|
|
|
|
|
def _setup_output_path(file_hash: str, prefix: str = "thumbnail") -> Tuple[str, str]:
|
|
"""
|
|
Set up the output directory and generate a unique filename.
|
|
|
|
Args:
|
|
file_hash (str): Hash to use in the filename
|
|
prefix (str): Prefix for the filename
|
|
|
|
Returns:
|
|
Tuple[str, str]: Output directory path and full file path
|
|
"""
|
|
output_dir = "/tmp/thumbgen/"
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
filename = f"{prefix}_{file_hash}.png"
|
|
filepath = os.path.join(output_dir, filename)
|
|
|
|
return output_dir, filepath
|
|
|
|
|
|
def _update_file_model(
|
|
file_model: PostFileModel, thumbnail_path: str, thumbnail_filename: str
|
|
) -> None:
|
|
"""
|
|
Update the PostFileModel with the new thumbnail and related hashes.
|
|
|
|
Args:
|
|
file_model (PostFileModel): The model to update
|
|
thumbnail_path (str): Path to the generated thumbnail
|
|
thumbnail_filename (str): Filename for the saved thumbnail
|
|
"""
|
|
# Compute the hash for the generated thumbnail
|
|
thumbnail_hash_blake3 = compute_file_hash_blake3(thumbnail_path)
|
|
|
|
# Update the PostFileModel's thumbnail field with the new file
|
|
with open(thumbnail_path, "rb") as file:
|
|
file_model.thumbnail.save(thumbnail_filename, file)
|
|
|
|
# Set the thumbnail hash
|
|
file_model.thumbnail_hash_blake3 = thumbnail_hash_blake3
|
|
|
|
# Generate and set the blur hash for the thumbnail
|
|
file_model.thumbnail_blur_hash = compute_blur_hash(thumbnail_path)
|
|
|
|
# Save the model
|
|
file_model.save()
|
|
|
|
|
|
def _handle_task_error(e: Exception, file_id: int, process_name: str):
|
|
"""
|
|
Handle errors in thumbnail generation tasks.
|
|
|
|
Args:
|
|
e (Exception): The exception that occurred
|
|
file_id (int): ID of the file being processed
|
|
process_name (str): Name of the process for error reporting
|
|
|
|
Raises:
|
|
Retry: To trigger Celery retry mechanism
|
|
"""
|
|
error_message = f"Error in {process_name} for file {file_id}: {str(e)}"
|
|
print(error_message)
|
|
raise Retry(exc=e)
|
|
|
|
|
|
@shared_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=5)
|
|
def generate_blur_hash_PostFile(file_id: int) -> str:
|
|
"""
|
|
Generate and save a blur hash for an image stored in PostFileModel.
|
|
|
|
Args:
|
|
file_id (int): ID of the PostFileModel instance
|
|
|
|
Returns:
|
|
str: Success message
|
|
"""
|
|
try:
|
|
with transaction.atomic():
|
|
img = PostFileModel.objects.select_for_update().get(id=file_id)
|
|
image_data = io.BytesIO(img.file.read())
|
|
pil_img = PillowImage.open(image_data)
|
|
|
|
blurhash_string = blurhash.encode(pil_img, 4, 3)
|
|
|
|
img.refresh_from_db()
|
|
img.blur_hash = blurhash_string
|
|
img.save()
|
|
|
|
return f"Successfully generated blur hash for file {file_id}"
|
|
|
|
except Exception as e:
|
|
_handle_task_error(e, file_id, "blur hash generation")
|
|
|
|
|
|
@shared_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=5)
|
|
def generate_md5_hash_PostFile(file_id: int) -> str:
|
|
"""
|
|
Generate and save an MD5 hash for a file stored in PostFileModel.
|
|
|
|
Args:
|
|
file_id (int): ID of the PostFileModel instance
|
|
|
|
Returns:
|
|
str: Success message
|
|
"""
|
|
try:
|
|
with transaction.atomic():
|
|
pstfile = PostFileModel.objects.select_for_update().get(id=file_id)
|
|
|
|
# Compute the MD5 hash
|
|
md5_hash = compute_md5_hash(pstfile.file.path)
|
|
|
|
# Save the computed hash
|
|
pstfile.refresh_from_db()
|
|
pstfile.hash_md5 = md5_hash
|
|
pstfile.save()
|
|
|
|
return f"Successfully generated MD5 hash for file {file_id}"
|
|
|
|
except Exception as e:
|
|
_handle_task_error(e, file_id, "MD5 hash generation")
|
|
|
|
|
|
@shared_task(name="generate_video_thumbnail")
|
|
def generate_video_thumbnail(
|
|
file_id: int,
|
|
size: int = 0,
|
|
timestamp: Optional[float] = None,
|
|
movie_strip: bool = False,
|
|
) -> str:
|
|
"""
|
|
Generate video thumbnails using ffmpegthumbnailer and update the PostFileModel instance.
|
|
|
|
Args:
|
|
file_id (int): ID of the PostFileModel instance
|
|
size (int): Desired thumbnail width or height (defaults to video size)
|
|
timestamp (float): Timestamp in seconds where the thumbnail should be extracted
|
|
movie_strip (bool): Create a movie strip overlay
|
|
|
|
Returns:
|
|
str: Success message or error message
|
|
"""
|
|
try:
|
|
with transaction.atomic():
|
|
# Retrieve the PostFileModel instance with a lock
|
|
pstfile = PostFileModel.objects.select_for_update().get(id=file_id)
|
|
|
|
if not pstfile.file:
|
|
return "Error: Video file not found for the given file_id."
|
|
|
|
video_path = pstfile.file.path
|
|
|
|
# Setup output path
|
|
_, thumbnail_file_path = _setup_output_path(
|
|
pstfile.hash_blake3, "video_thumbnail"
|
|
)
|
|
thumbnail_filename = Path(thumbnail_file_path).name
|
|
|
|
# Build command
|
|
cmd = [
|
|
"ffmpegthumbnailer",
|
|
"-i",
|
|
video_path,
|
|
"-o",
|
|
thumbnail_file_path,
|
|
"-s",
|
|
str(size),
|
|
"-m",
|
|
]
|
|
|
|
if movie_strip:
|
|
cmd.extend(["-f"])
|
|
|
|
# Generate thumbnail at specified timestamps
|
|
if timestamp is not None:
|
|
cmd.extend(["-t", f"{timestamp}"])
|
|
|
|
# Execute command
|
|
subprocess.run(cmd, check=True)
|
|
|
|
# Update model with new thumbnail
|
|
_update_file_model(pstfile, thumbnail_file_path, thumbnail_filename)
|
|
|
|
# Clean up temporary file
|
|
os.remove(thumbnail_file_path)
|
|
|
|
return f"Video thumbnail generated successfully for file {file_id}"
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
_handle_task_error(e, file_id, "video thumbnail generation")
|
|
except Exception as e:
|
|
_handle_task_error(e, file_id, "video thumbnail generation")
|
|
|
|
|
|
@shared_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=5)
|
|
def generate_pdf_thumbnail(
|
|
file_id: int, page: int = 1, size: Optional[Tuple[int, int]] = None, dpi: int = 200
|
|
) -> str:
|
|
"""
|
|
Generate PDF thumbnails using pdf2image and update the PostFileModel instance.
|
|
|
|
Args:
|
|
file_id (int): ID of the PostFileModel instance
|
|
page (int): Page number to use for thumbnail (defaults to first page)
|
|
size (Tuple[int, int], optional): Desired thumbnail (width, height) or None to maintain original size
|
|
dpi (int): DPI for rendering the PDF (higher values result in larger images)
|
|
|
|
Returns:
|
|
str: Success message or error message
|
|
"""
|
|
try:
|
|
from pdf2image import convert_from_path
|
|
|
|
with transaction.atomic():
|
|
# Retrieve the PostFileModel instance with a lock
|
|
pstfile = PostFileModel.objects.select_for_update().get(id=file_id)
|
|
|
|
if not pstfile.file:
|
|
return "Error: PDF file not found for the given file_id."
|
|
|
|
pdf_path = pstfile.file.path
|
|
|
|
# Setup output path
|
|
_, thumbnail_file_path = _setup_output_path(
|
|
pstfile.hash_blake3, "pdf_thumbnail"
|
|
)
|
|
thumbnail_filename = Path(thumbnail_file_path).name
|
|
|
|
# Convert PDF to image using pdf2image
|
|
# first_page and last_page are 1-indexed
|
|
images = convert_from_path(
|
|
pdf_path, dpi=dpi, first_page=page, last_page=page
|
|
)
|
|
|
|
# Get the first page (should be the only one based on our parameters)
|
|
if not images:
|
|
raise ValueError(f"Could not extract page {page} from PDF")
|
|
|
|
image = images[0]
|
|
|
|
# Resize if size is specified
|
|
if size:
|
|
image = image.resize(size, PillowImage.LANCZOS)
|
|
|
|
# Save the image
|
|
image.save(thumbnail_file_path, "PNG")
|
|
|
|
# Update model with new thumbnail
|
|
_update_file_model(pstfile, thumbnail_file_path, thumbnail_filename)
|
|
|
|
# Clean up temporary file
|
|
os.remove(thumbnail_file_path)
|
|
|
|
return f"PDF thumbnail generated successfully for file {file_id}"
|
|
|
|
except ImportError:
|
|
error_message = (
|
|
"pdf2image library is not installed. Install it with: pip install pdf2image"
|
|
)
|
|
print(error_message)
|
|
raise
|
|
except Exception as e:
|
|
_handle_task_error(e, file_id, "PDF thumbnail generation")
|