Add: furaffinity views.py

This commit is contained in:
Aroy-Art 2023-10-21 17:43:50 +02:00
parent 16eaa5eed7
commit 12088d5880
Signed by: Aroy
GPG key ID: DB9689E9391DD156

View file

@ -0,0 +1,284 @@
from django.shortcuts import render, get_object_or_404
from django.http import HttpResponse, HttpResponseRedirect, StreamingHttpResponse
from django.contrib.auth.decorators import login_required
from django.core.files.storage import FileSystemStorage
from django.core.paginator import Paginator
from django.db.models import Q, Case, When, Value, IntegerField
from .forms import SearchForm, URLImportForm
from .models import FA_User, FA_Tags, FA_Submission, FA_Submission_File
from .tasks import scrape_fa_submission, fa_import_data, test_task, calculate_square
import time, datetime, math
from django.contrib.auth.models import User, Group
from rest_framework import viewsets
from rest_framework import permissions
from .serializers import UserSerializer, GroupSerializer
class UserViewSet(viewsets.ModelViewSet):
    """REST endpoint for viewing and editing Django user accounts.

    Requires an authenticated caller; newest accounts are listed first.
    """

    queryset = User.objects.all().order_by('-date_joined')
    serializer_class = UserSerializer
    permission_classes = [permissions.IsAuthenticated]
class GroupViewSet(viewsets.ModelViewSet):
    """REST endpoint for viewing and editing Django auth groups.

    Requires an authenticated caller.
    """

    queryset = Group.objects.all()
    serializer_class = GroupSerializer
    permission_classes = [permissions.IsAuthenticated]
# Create your views here.
def handle_task_result(task_result):
    """Report a completed background task's result on stdout."""
    print("Task completed! Result:", task_result)
@login_required(login_url="/login/")
def home(request):
    """Render the FA dashboard: the ten most recently archived and the
    ten most recently posted submissions."""
    context = {
        'new_archives': FA_Submission.objects.order_by('-date_added')[:10],
        "new_submissions": FA_Submission.objects.order_by('-date')[:10],
    }
    return render(request, 'fa/home.html', context)
# View Furaffinity submissions
@login_required(login_url="/login/")
def submission_detail(request, submission_id):
    """Show one FA submission, including its stored file URL when a file
    record exists (otherwise ``file_url`` is None)."""
    submission = get_object_or_404(FA_Submission, submission_id=submission_id)
    if submission.file:
        file_url = submission.file.file.url
    else:
        file_url = None
    context = {'submission': submission, 'file_url': file_url}
    return render(request, 'fa/submission_detail.html', context)
@login_required(login_url="/login/")
def search_results(request):
    """Search FA submissions by free text or by '#'-prefixed tags.

    A query starting with '#' (e.g. "#fox #wolf") is split on whitespace
    into tags and matched against tag names only; any other query is
    matched against titles, descriptions and tag names.  Matches are
    ranked by a simple binary relevance score and paginated.
    """

    def calculate_relevance(queryset, search_query, fields):
        """Annotate *queryset* with an integer ``relevance`` score.

        Parameters:
            queryset (QuerySet): queryset to annotate.
            search_query (str): substring to look for (case-insensitive).
            fields (list): field lookups to search within.

        Returns:
            QuerySet: each row gains ``relevance`` = 1 when *search_query*
            occurs in any of *fields*, else 0.
        """
        conditions = [When(**{f"{field}__icontains": search_query, 'then': Value(1)})
                      for field in fields]
        return queryset.annotate(
            relevance=Case(*conditions, default=Value(0), output_field=IntegerField()))

    if request.method == 'GET':
        form = SearchForm(request.GET)
        if form.is_valid():
            search_query = form.cleaned_data['search_query']

            if search_query.startswith('#'):
                # Strip the leading '#' from every whitespace-separated token.
                tags = [token.lstrip('#') for token in search_query.split()]

                # OR together one case-insensitive lookup per tag.
                q_objects = Q()
                for tag in tags:
                    q_objects |= Q(tags__tag__icontains=tag)

                # BUG FIX: the original passed the composite Q object as the
                # *value* of a tags__tag__icontains lookup (and as the search
                # string for calculate_relevance), which is not a valid lookup
                # value.  Filter with the Q object directly and score each
                # tag individually instead.
                conditions = [When(tags__tag__icontains=tag, then=Value(1)) for tag in tags]
                tag_results = FA_Submission.objects.filter(q_objects).annotate(
                    relevance=Case(*conditions, default=Value(0), output_field=IntegerField()))

                # Rank by relevance; distinct() drops duplicates introduced
                # by the many-to-many tag join.
                search_results = tag_results.order_by('-relevance').distinct()
            else:
                # Free-text search over titles, descriptions and tag names.
                title_description_results = calculate_relevance(
                    FA_Submission.objects.filter(
                        Q(title__icontains=search_query) | Q(description__icontains=search_query)),
                    search_query, ['title', 'description'])
                tag_results = calculate_relevance(
                    FA_Submission.objects.filter(tags__tag__icontains=search_query),
                    search_query, ['tags__tag'])

                # Merge both querysets, rank by relevance, drop duplicates.
                search_results = (title_description_results | tag_results).order_by('-relevance').distinct()

            # Paginate: 10 submissions per page, page number from ?page=.
            per_page = 10
            paginator = Paginator(search_results, per_page)
            page_number = request.GET.get('page')
            page_obj = paginator.get_page(page_number)

            context = {'search_results': page_obj}
            return render(request, 'fa/search_results.html', context)
    else:
        form = SearchForm()

    # GET with an invalid form re-renders the bound form; any other method
    # renders a fresh one.
    return render(request, 'fa/search_results.html', {'form': form})
@login_required(login_url="/login/")
def fa_users(request):
    """List every archived FA artist, ordered alphabetically."""
    context = {'list_users': FA_User.objects.order_by('artist')}
    return render(request, 'fa/users.html', context)
@login_required(login_url="/login/")
def fa_user_page(request, fa_user):
    """Paginated gallery for one FA artist, newest submissions first.

    *fa_user* is the artist's URL slug; 404s when no such artist exists.
    """
    user = get_object_or_404(FA_User, artist_url=fa_user)
    submissions = FA_Submission.objects.filter(artist=user).order_by('-date')

    # Ten submissions per page; page number comes from the ?page= parameter.
    per_page = 10
    paginator = Paginator(submissions, per_page)
    page_obj = paginator.get_page(request.GET.get('page'))

    return render(request, 'fa/user_page.html', {'user': user, 'page_obj': page_obj})
@login_required(login_url="/login/")
def fa_tags(request):
    """List every known FA tag, ordered alphabetically."""
    context = {'list_tags': FA_Tags.objects.order_by("tag")}
    return render(request, 'fa/tags.html', context)
@login_required(login_url="/login/")
def tag_details(request, tag):
    """Show all submissions carrying the tag whose slug is *tag*,
    newest first; 404s for an unknown slug."""
    tag_slug = get_object_or_404(FA_Tags, tag_slug=tag)
    context = {
        'submissions': FA_Submission.objects.filter(tags=tag_slug).order_by('-date'),
        'tag': tag_slug,
    }
    return render(request, 'fa/tag_details.html', context)
@login_required(login_url="/login/")
def fa_config(request):
    """Render the FA configuration page.

    NOTE(review): template path lacks the 'fa/' prefix used by every
    other view here — confirm 'fa_config.html' lives at the template root.
    """
    return render(request, 'fa_config.html')
@login_required(login_url="/login/")
def fa_import(request):
    """Queue a background scrape of an FA submission URL.

    A POST with a valid URL dispatches the Celery task and redirects to
    the FA index; a GET (or an invalid form) renders the import form.
    """
    if request.method == 'POST':
        form = URLImportForm(request.POST)
        if form.is_valid():
            # Fire-and-forget: the scrape runs asynchronously in Celery.
            result = scrape_fa_submission.delay(form.cleaned_data['url'])
            print(result)
            return HttpResponseRedirect("/fa/")
    else:
        form = URLImportForm()
    return render(request, 'fa/import.html', {'form': form})
def stream_task_output(request, task_id):
    """Server-sent-events stream of a running import task's output.

    Polls the Celery task roughly once per second until it completes,
    pushing each output snapshot to the client as an SSE ``data:`` frame.
    """
    def event_stream():
        # BUG FIX: was ``run_import_data`` — a name that is never imported
        # in this module; the imported task is ``fa_import_data``.
        task = fa_import_data.AsyncResult(task_id)
        while not task.ready():
            # NOTE(review): ``get_task_output`` is not defined or imported
            # anywhere in this module — implement or import it before
            # wiring this view into the URLconf.
            task_output = get_task_output(task_id)
            yield f"data: {task_output}\n\n"
            time.sleep(1)  # delay between polls

    # BUG FIX: plain HttpResponse consumes the generator up-front, defeating
    # streaming; StreamingHttpResponse sends frames as they are produced
    # (consistent with stream_datetime below).
    response = StreamingHttpResponse(event_stream(), content_type='text/event-stream')
    response['Cache-Control'] = 'no-cache'
    return response
def stream_datetime(request):
    """SSE endpoint that emits the server time every three seconds."""
    def ticker():
        while True:
            time.sleep(3)
            yield 'data: The server time is: %s\n\n' % datetime.datetime.now()

    response = StreamingHttpResponse(ticker(), content_type='text/event-stream')
    response['Cache-Control'] = 'no-cache'
    return response
@login_required(login_url="/login/")
def stats(request):
    """Render archive-wide statistics: row counts, missing-file count and
    total media size in human-readable IEC units."""

    def convert_size(size_bytes):
        # Format a byte count as the nearest IEC unit (KiB, MiB, ...),
        # rounded to two decimals.
        if size_bytes == 0:
            return "0B"
        size_name = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
        i = int(math.floor(math.log(size_bytes, 1024)))
        p = math.pow(1024, i)
        s = round(size_bytes / p, 2)
        return "%s %s" % (s, size_name[i])

    # NOTE(review): this asks the storage backend for every file's size, one
    # by one — consider caching sizes in the DB if this page becomes slow.
    totalMediaSizeBytes = 0
    for files in FA_Submission_File.objects.all():
        totalMediaSizeBytes += files.file.size

    context = {
        "submissions": FA_Submission.objects.count(),
        "users": FA_User.objects.count(),
        "tags": FA_Tags.objects.count(),
        "mediaFiles": FA_Submission_File.objects.count(),
        # BUG FIX: len(queryset) fetched every row just to count them;
        # .count() issues a single COUNT(*) query instead.
        "missingFiles": FA_Submission.objects.filter(file__isnull=True).count(),
        "totalMediaSize": convert_size(totalMediaSizeBytes),
    }
    return render(request, "fa/stats.html", context)
@login_required(login_url="/login/")
def simple_upload(request):
    """Save a file POSTed as 'myfile' via the default storage backend and
    re-render the page with the stored file's URL."""
    if request.method == 'POST':
        # BUG FIX: request.FILES['myfile'] raised KeyError on a POST that
        # omitted the field; .get() makes the presence check safe.
        myfile = request.FILES.get('myfile')
        if myfile:
            fs = FileSystemStorage()
            filename = fs.save(myfile.name, myfile)
            uploaded_file_url = fs.url(filename)
            return render(request, 'fa/simple_upload.html', {
                'uploaded_file_url': uploaded_file_url
            })
    return render(request, 'fa/simple_upload.html')
#class FA_ImportView(generic.DetailView):
# pass