40 lines
1.4 KiB
Python
40 lines
1.4 KiB
Python
|
import os
|
||
|
import zipfile
|
||
|
import json
|
||
|
from celery import shared_task
|
||
|
|
||
|
@shared_task
|
||
|
def list_zip_contents(zip_path):
    """Describe every entry of a ZIP archive as a JSON document.

    Args:
        zip_path (str): Filesystem location of the archive to inspect.

    Returns:
        str: JSON text in one of three shapes:

        * ``{"files": [...]}`` -- one record per archive member holding its
          name, uncompressed size, modification time, CRC and compressed
          size.
        * ``{"warning": ...}`` -- the archive is 2 GiB or larger and was
          skipped without being opened.
        * ``{"error": ...}`` -- the text of whatever exception was raised
          while measuring or reading the archive.
    """
    size_limit = 2 * 1024 ** 3  # 2 GiB, expressed in bytes

    try:
        # Guard clause: oversized archives are reported and never opened.
        if os.path.getsize(zip_path) >= size_limit:
            return json.dumps({"warning": "archive file is too big (>2GiB), ignoring"})

        with zipfile.ZipFile(zip_path, 'r') as archive:
            # One plain dict per member; ``date_time`` is a tuple, which the
            # JSON encoder emits as an array.
            entries = [
                {
                    "name": member.filename,
                    "size": member.file_size,
                    "date": member.date_time,
                    "crc": member.CRC,
                    "compressed_size": member.compress_size,
                }
                for member in archive.infolist()
            ]
            return json.dumps({"files": entries})
    except Exception as exc:
        # Failures are handed back to the caller as data instead of
        # propagating out of the function.
        return json.dumps({"error": str(exc)})
|