import os
import zipfile
import json

from celery import shared_task


@shared_task
def list_zip_contents(zip_path):
    """Describe every member of a ZIP archive as a JSON document.

    Args:
        zip_path (str): Filesystem path of the ZIP archive to inspect.

    Returns:
        str: A JSON-encoded object holding exactly one of these keys:

            * ``"files"`` -- a list with one entry per archive member, each
              carrying ``name``, ``size`` (uncompressed), ``date``, ``crc``
              and ``compressed_size`` as reported by ``zipfile``.
            * ``"warning"`` -- the archive on disk is 2 GiB or larger and
              was not opened.
            * ``"error"`` -- the text of whatever exception was raised while
              sizing or reading the archive.
    """
    try:
        # Archives at or above this on-disk size are skipped without opening.
        size_limit = 2 * 1024 ** 3  # 2 GiB
        if os.path.getsize(zip_path) >= size_limit:
            return json.dumps({"warning": "archive file is too big (>2GiB), ignoring"})

        with zipfile.ZipFile(zip_path, 'r') as archive:
            # One metadata record per member, in central-directory order.
            entries = [
                {
                    "name": member.filename,
                    "size": member.file_size,
                    "date": member.date_time,
                    "crc": member.CRC,
                    "compressed_size": member.compress_size,
                }
                for member in archive.infolist()
            ]
        return json.dumps({"files": entries})
    except Exception as exc:
        # Any failure is reported in-band as JSON instead of propagating.
        return json.dumps({"error": str(exc)})