""" This module defines views related to hosting packages. """ from __future__ import absolute_import import http import logging from datetime import timedelta from django.conf import settings from django.contrib import messages from django.contrib.auth import get_user_model from django.contrib.auth.mixins import PermissionRequiredMixin, UserPassesTestMixin from django.http import Http404 from django.shortcuts import get_object_or_404, redirect from django.utils import timezone from django.utils.translation import gettext as _ from django.views.generic import DetailView, ListView from django.views.generic.edit import CreateView, FormView import rest_framework.request from rest_framework.permissions import BasePermission from rest_framework.response import Response from rest_framework.views import APIView from gvacommon.viewmixins import StaffOrSelfLoginRequiredMixin from managemails.models import Mailbox from .forms import ( AddDiskspaceOptionForm, AddMailboxOptionForm, AddUserDatabaseOptionForm, CreateCustomerHostingPackageForm, CreateHostingPackageForm, ) from .models import ( CustomerHostingPackage, CustomerPackageDiskUsage, DiskSpaceOption, MailboxOption, UserDatabaseOption, ) from .serializers import DiskUsageSerializer logger = logging.getLogger(__name__) class CreateHostingPackage(PermissionRequiredMixin, CreateView): """ Create a hosting package. """ model = CustomerHostingPackage raise_exception = True permission_required = "domains.add_customerhostingpackage" template_name_suffix = "_create" form_class = CreateHostingPackageForm def form_valid(self, form): hosting_package = form.save() messages.success( self.request, _("Started setup of new hosting package {name}.").format( name=hosting_package.name ), ) return redirect(hosting_package) class CreateCustomerHostingPackage(CreateHostingPackage): """ Create a hosting package for a selected customer. """ form_class = CreateCustomerHostingPackageForm def get_form_kwargs(self): kwargs = super(CreateCustomerHostingPackage, self).get_form_kwargs() kwargs.update(self.kwargs) return kwargs def get_customer_object(self): return get_object_or_404(get_user_model(), username=self.kwargs["user"]) def get_context_data(self, **kwargs): context = super(CreateCustomerHostingPackage, self).get_context_data(**kwargs) context["customer"] = self.get_customer_object() return context def form_valid(self, form): hosting_package = form.save(commit=False) hosting_package.customer = self.get_customer_object() hosting_package.save() messages.success( self.request, _("Started setup of new hosting package {name}.").format( name=hosting_package.name ), ) return redirect(hosting_package) class CustomerHostingPackageDetails(StaffOrSelfLoginRequiredMixin, DetailView): """ This view is for showing details of a customer hosting package. """ model = CustomerHostingPackage context_object_name = "hostingpackage" customer = None def get_customer_object(self): if self.customer is None: self.customer = get_object_or_404( get_user_model(), username=self.kwargs["user"] ) return self.customer def get_context_data(self, **kwargs): context = super(CustomerHostingPackageDetails, self).get_context_data(**kwargs) context.update( { "customer": self.get_customer_object(), "uploadserver": settings.OSUSER_UPLOAD_SERVER, "databases": context["hostingpackage"].databases, "osuser": context["hostingpackage"].osuser, "hostingoptions": context["hostingpackage"].get_hostingoptions(), "domains": context["hostingpackage"].domains.all(), "mailboxes": context["hostingpackage"].mailboxes, } ) context["sshkeys"] = context["osuser"].sshpublickey_set.all() return context class StaffUserRequiredMixin(UserPassesTestMixin): """ Mixin to make views available to staff members only. """ def test_func(self): return self.request.user.is_staff class AllCustomerHostingPackageList(StaffUserRequiredMixin, ListView): """ This view is used for showing a list of all hosting packages. """ model = CustomerHostingPackage template_name_suffix = "_admin_list" def get_queryset(self): return ( super() .get_queryset() .select_related("osuser", "customer") .only("name", "pk", "created", "customer__username", "osuser__username") ) class HostingOptionChoices(StaffUserRequiredMixin, DetailView): """ This view displays choices of hosting options for a customer hosting package. """ model = CustomerHostingPackage context_object_name = "hostingpackage" template_name_suffix = "_option_choices" def get_context_data(self, **kwargs): context = super(HostingOptionChoices, self).get_context_data(**kwargs) context.update( { "customer": self.get_object().customer, "hosting_options": ( ( _("Disk space"), [ (option, "diskspace") for option in DiskSpaceOption.objects.all() ], ), ( _("Mailboxes"), [ (option, "mailboxes") for option in MailboxOption.objects.all() ], ), ( _("Databases"), [ (option, "databases") for option in UserDatabaseOption.objects.all() ], ), ), } ) return context class AddHostingOption(StaffUserRequiredMixin, FormView): template_name = "hostingpackages/add_hosting_option.html" def get_form_class(self): optiontype = self.kwargs["type"] if optiontype == "diskspace": return AddDiskspaceOptionForm elif optiontype == "mailboxes": return AddMailboxOptionForm elif optiontype == "databases": return AddUserDatabaseOptionForm raise Http404() def get_hosting_package(self): return get_object_or_404(CustomerHostingPackage, pk=int(self.kwargs["package"])) def get_option_template(self): optiontype = self.kwargs["type"] optionid = int(self.kwargs["optionid"]) if optiontype == "diskspace": return get_object_or_404(DiskSpaceOption, pk=optionid) elif optiontype == "mailboxes": return get_object_or_404(MailboxOption, pk=optionid) elif optiontype == "databases": return get_object_or_404(UserDatabaseOption, pk=optionid) raise Http404() def get_form_kwargs(self): kwargs = super(AddHostingOption, self).get_form_kwargs() kwargs["hostingpackage"] = self.get_hosting_package() kwargs["option_template"] = self.get_option_template() return kwargs def get_initial(self): initial = super(AddHostingOption, self).get_initial() template = self.get_option_template() if type(template) == DiskSpaceOption: initial.update( { "diskspace": template.diskspace, "diskspace_unit": template.diskspace_unit, } ) elif type(template) == MailboxOption: initial["number"] = template.number elif type(template) == UserDatabaseOption: initial["number"] = template.number else: raise Http404() return initial def get_context_data(self, **kwargs): context = super(AddHostingOption, self).get_context_data(**kwargs) context["option_template"] = self.get_option_template() return context def form_valid(self, form): option = form.save() hosting_package = self.get_hosting_package() messages.success( self.request, _( "Successfully added option {option} to hosting package " "{package}." ).format(option=option, package=hosting_package.name), ) return redirect(hosting_package) class HasDiskUsageUploadPermission(BasePermission): def has_permission(self, request, view): return ( request.user.has_perm("hostingpackages.add_customerpackagediskusage") and request.method == "POST" ) class UploadCustomerPackageDiskUsage(APIView): permission_classes = [HasDiskUsageUploadPermission] allowed_methods = ("POST",) serializer = DiskUsageSerializer(many=True) def post(self, request: rest_framework.request.Request, format=None): if request.content_type != "application/json": return Response("Unacceptable", status=http.HTTPStatus.BAD_REQUEST) submitted_sources = set() for row in request.data: user = row["user"] for key in row: if key == "user": continue else: submitted_sources.add(key) for item, size in row[key].items(): try: package = CustomerHostingPackage.objects.get( osuser__username=user ) ( metric, created, ) = CustomerPackageDiskUsage.objects.get_or_create( package=package, source=key, item=item, ) metric.used_kb = size if key == "mail": try: ma_mb = package.mailboxes.get( username=item ).mailaddressmailbox_set.first() if ma_mb: metric.email_address_id = ma_mb.mailaddress_id except Mailbox.DoesNotExist: logger.warning("mail box %s does not exist", item) metric.save() except CustomerHostingPackage.DoesNotExist: logger.warning( "hosting package for user %s does not exist", user ) if submitted_sources: CustomerPackageDiskUsage.objects.filter( source__in=submitted_sources, modified__lt=timezone.now() - timedelta(minutes=30), ).delete() logger.info("usage data submitted by %s", request.user) return Response("Accepted", status=http.HTTPStatus.ACCEPTED) class CustomerHostingPackageDiskUsageDetails(StaffOrSelfLoginRequiredMixin, DetailView): template_name_suffix = "_disk_usage_details" model = CustomerHostingPackage pk_url_kwarg = "package" context_object_name = "hostingpackage" customer = None def get_customer_object(self): if self.customer is None: self.customer = get_object_or_404( get_user_model(), username=self.kwargs["user"] ) return self.customer def get_queryset(self, queryset=None): return super().get_queryset().prefetch_related("customerpackagediskusage_set") def get_context_data(self, **kwargs): context_data = super().get_context_data(**kwargs) mail_usage, web_usage, mysql_usage, pgsql_usage = [], [], [], [] for usage in self.get_object().customerpackagediskusage_set.order_by( "-used_kb" ): if usage.used_kb <= 0: continue if usage.source == "mail": mail_usage.append(usage) elif usage.source == "web": web_usage.append(usage) elif usage.source == "mysql": mysql_usage.append(usage) elif usage.source == "pgsql": pgsql_usage.append(usage) context_data.update( { "customer": self.get_customer_object(), "mail_usage": mail_usage, "web_usage": web_usage, "mysql_usage": mysql_usage, "pgsql_usage": pgsql_usage, } ) return context_data