# tasks.py
import requests
import subprocess
import os

from sys import stdout, stderr

from bs4 import BeautifulSoup
from blake3 import blake3

from celery import shared_task

from django.core.files.base import ContentFile

from .models import FA_User, FA_UserIconFile

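# These tasks assume the Django project has a Celery app configured
# (e.g. a celery.py that creates the app and calls app.autodiscover_tasks())
# and at least one worker running to execute them.
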
@shared_task
def fa_import_data():
    """Run the project's import_data management command over the gallery-dl/ directory."""
    try:
        # Get the current working directory
        # current_dir = os.getcwd()

        # Change the working directory to the parent folder
        # os.chdir(os.path.dirname(current_dir))

        result = subprocess.run(
            ["python", "manage.py", "import_data", "gallery-dl/", "--delete"],
            capture_output=True,
            text=True,
        )

        return {
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode,
        }
    except Exception as e:
        return {'error': str(e)}

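# Note: "import_data" is assumed to be a custom management command in this
# project that walks the gallery-dl/ download directory and imports each file,
# with --delete presumably removing files once they have been imported.
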
@shared_task
def scrape_fa_submission(url):
    """Download a FurAffinity submission with gallery-dl, then queue fa_import_data."""
    # print(url)
    # print(subprocess.run(['pwd'], capture_output=True, text=True))
    try:
        # Get the current working directory
        # current_dir = os.getcwd()

        # Change the working directory to the parent folder
        # os.chdir(os.path.dirname(current_dir))

        result = subprocess.run(
            ['gallery-dl', '-c', '../gallery-dl.conf', '-d', 'gallery-dl',
             '--mtime-from-date', '--write-metadata', url],
            capture_output=True,
            text=True,
        )

        # Queue the database import once the download has finished
        import_task = fa_import_data.delay()
        print(import_task)

        return {
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode,
        }
    except Exception as e:
        return {'error': str(e)}

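# Example usage (illustrative): queue a scrape from a view or the Django shell:
#   scrape_fa_submission.delay("https://www.furaffinity.net/view/<submission-id>/")
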
@shared_task
def get_fa_user_info(user):
    """Scrape a FurAffinity user's profile page and return their avatar image URL."""

    # try:
    #     file_instance = FA_UserIconFile.objects.get(file_hash=file_hash)
    #     # self.stdout.write(self.style.NOTICE(f"Skipping: {file_path} file, already imported"))
    #     print(f"Skipping: {file_path} file, already imported")
    # except FA_UserIconFile.DoesNotExist:
    #     # If the file doesn't exist, create a new file instance and link it to the submission
    #     with open(file_path, 'rb') as file:
    #         file_instance = FA_Submission_File()
    #         file_instance.file_hash = file_hash
    #
    #         file_name = os.path.basename(file_path)
    #         _, file_ext = os.path.splitext(file_name)
    #         hash_file_name = file_hash + file_ext
    #         file_instance.file.save(hash_file_name, file)
    #
    #         file_instance.file_name = file_name
    #         file_instance.save()
    #
    #     # Now link the image_instance to your_model_instance
    #     submission.file = file_instance

    # Profile page for the given username
    url = "https://www.furaffinity.net/user/" + user

    # Fetch the web page content
    response = requests.get(url)
    if response.status_code == 200:
        html_content = response.text
    else:
        return f"Error: Unable to fetch the page. Status code: {response.status_code}"

    # Parse the HTML content using BeautifulSoup
    soup = BeautifulSoup(html_content, "html.parser")

    # title = soup.title.text

    # Find the 'img' tag inside the 'a' tag with class 'current' inside the 'userpage-nav-avatar' tag
    img_tag = soup.select_one('userpage-nav-avatar a.current img')

    if img_tag:
        # Extract the 'src' attribute of the 'img' tag to get the image URL
        image_url = img_tag['src']
        return image_url
    else:
        stderr.write("Image not found on the page.\n")
        return None

    # return title

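# Example usage (illustrative): fetch a user's avatar URL, then persist it with
# the save_image_to_model helper below:
#   image_url = get_fa_user_info("<username>")
#   if image_url and not image_url.startswith("Error"):
#       save_image_to_model(image_url)
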
def compute_file_hash(file_path):
    """Return the BLAKE3 hex digest of a file, or None on error."""
    try:
        # Compute the BLAKE3 hash of the file in 64 KiB chunks
        hasher = blake3()
        with open(file_path, 'rb') as f:
            while chunk := f.read(65536):
                hasher.update(chunk)
        return hasher.hexdigest()
    except Exception as e:
        stderr.write(f"Error computing file hash: {e}\n")
        return None

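# Example usage (illustrative):
#   file_hash = compute_file_hash("gallery-dl/<downloaded-file>")
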
def save_image_to_model(image_url):
    """Download an image and store it on a model's FileField.

    FA_UserIconFile and its `file` field are assumed to be the intended
    target model and field here.
    """
    try:
        # Download the image from the URL
        response = requests.get(image_url)
        response.raise_for_status()

        # Create a new model instance (assumed target model: FA_UserIconFile)
        instance = FA_UserIconFile()

        # Save the image to the FileField (assumed field name: file).
        # Note: instance.pk is still None before the first save, so the
        # generated filename comes out as "image_None.jpg".
        instance.file.save(f'image_{instance.pk}.jpg', ContentFile(response.content), save=True)

        # Save the model instance to the database
        instance.save()

        return instance
    except requests.exceptions.RequestException as e:
        print(f"Failed to download the image: {e}")
        return None

@shared_task
def test_task():
    """Simple smoke test to confirm that Celery workers are picking up tasks."""
    print("This is a test task. Celery is working!")
    num = 12 * 2
    return num

@shared_task
def calculate_square(number1, number2):
    # Despite the name, this multiplies its two arguments (another worker test).
    result = number1 * number2
    return result
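
# Example usage (illustrative; assumes a running Celery worker):
#   from .tasks import test_task, calculate_square
#   result = test_task.delay()
#   result.get(timeout=10)              # -> 24
#   calculate_square.delay(3, 4).get()  # -> 12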