gva/gnuviechadmin/hostingpackages/views.py

396 lines
13 KiB
Python

"""
This module defines views related to hosting packages.
"""
from __future__ import absolute_import
import http
import logging
from datetime import timedelta
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.mixins import PermissionRequiredMixin, UserPassesTestMixin
from django.http import Http404
from django.shortcuts import get_object_or_404, redirect
from django.utils import timezone
from django.utils.translation import gettext as _
from django.views.generic import DetailView, ListView
from django.views.generic.edit import CreateView, FormView
import rest_framework.request
from rest_framework.permissions import BasePermission
from rest_framework.response import Response
from rest_framework.views import APIView
from gvacommon.viewmixins import StaffOrSelfLoginRequiredMixin
from managemails.models import Mailbox
from .forms import (
AddDiskspaceOptionForm,
AddMailboxOptionForm,
AddUserDatabaseOptionForm,
CreateCustomerHostingPackageForm,
CreateHostingPackageForm,
)
from .models import (
CustomerHostingPackage,
CustomerPackageDiskUsage,
DiskSpaceOption,
MailboxOption,
UserDatabaseOption,
)
from .serializers import DiskUsageSerializer
logger = logging.getLogger(__name__)
class CreateHostingPackage(PermissionRequiredMixin, CreateView):
"""
Create a hosting package.
"""
model = CustomerHostingPackage
raise_exception = True
permission_required = "domains.add_customerhostingpackage"
template_name_suffix = "_create"
form_class = CreateHostingPackageForm
def form_valid(self, form):
hosting_package = form.save()
messages.success(
self.request,
_("Started setup of new hosting package {name}.").format(
name=hosting_package.name
),
)
return redirect(hosting_package)
class CreateCustomerHostingPackage(CreateHostingPackage):
"""
Create a hosting package for a selected customer.
"""
form_class = CreateCustomerHostingPackageForm
def get_form_kwargs(self):
kwargs = super(CreateCustomerHostingPackage, self).get_form_kwargs()
kwargs.update(self.kwargs)
return kwargs
def get_customer_object(self):
return get_object_or_404(get_user_model(), username=self.kwargs["user"])
def get_context_data(self, **kwargs):
context = super(CreateCustomerHostingPackage, self).get_context_data(**kwargs)
context["customer"] = self.get_customer_object()
return context
def form_valid(self, form):
hosting_package = form.save(commit=False)
hosting_package.customer = self.get_customer_object()
hosting_package.save()
messages.success(
self.request,
_("Started setup of new hosting package {name}.").format(
name=hosting_package.name
),
)
return redirect(hosting_package)
class CustomerHostingPackageDetails(StaffOrSelfLoginRequiredMixin, DetailView):
"""
This view is for showing details of a customer hosting package.
"""
model = CustomerHostingPackage
context_object_name = "hostingpackage"
customer = None
def get_customer_object(self):
if self.customer is None:
self.customer = get_object_or_404(
get_user_model(), username=self.kwargs["user"]
)
return self.customer
def get_context_data(self, **kwargs):
context = super(CustomerHostingPackageDetails, self).get_context_data(**kwargs)
context.update(
{
"customer": self.get_customer_object(),
"uploadserver": settings.OSUSER_UPLOAD_SERVER,
"databases": context["hostingpackage"].databases,
"osuser": context["hostingpackage"].osuser,
"hostingoptions": context["hostingpackage"].get_hostingoptions(),
"domains": context["hostingpackage"].domains.all(),
"mailboxes": context["hostingpackage"].mailboxes,
}
)
context["sshkeys"] = context["osuser"].sshpublickey_set.all()
return context
class StaffUserRequiredMixin(UserPassesTestMixin):
"""
Mixin to make views available to staff members only.
"""
def test_func(self):
return self.request.user.is_staff
class AllCustomerHostingPackageList(StaffUserRequiredMixin, ListView):
"""
This view is used for showing a list of all hosting packages.
"""
model = CustomerHostingPackage
template_name_suffix = "_admin_list"
def get_queryset(self):
return (
super()
.get_queryset()
.select_related("osuser", "customer")
.only("name", "pk", "created", "customer__username", "osuser__username")
)
class HostingOptionChoices(StaffUserRequiredMixin, DetailView):
"""
This view displays choices of hosting options for a customer hosting
package.
"""
model = CustomerHostingPackage
context_object_name = "hostingpackage"
template_name_suffix = "_option_choices"
def get_context_data(self, **kwargs):
context = super(HostingOptionChoices, self).get_context_data(**kwargs)
context.update(
{
"customer": self.get_object().customer,
"hosting_options": (
(
_("Disk space"),
[
(option, "diskspace")
for option in DiskSpaceOption.objects.all()
],
),
(
_("Mailboxes"),
[
(option, "mailboxes")
for option in MailboxOption.objects.all()
],
),
(
_("Databases"),
[
(option, "databases")
for option in UserDatabaseOption.objects.all()
],
),
),
}
)
return context
class AddHostingOption(StaffUserRequiredMixin, FormView):
template_name = "hostingpackages/add_hosting_option.html"
def get_form_class(self):
optiontype = self.kwargs["type"]
if optiontype == "diskspace":
return AddDiskspaceOptionForm
elif optiontype == "mailboxes":
return AddMailboxOptionForm
elif optiontype == "databases":
return AddUserDatabaseOptionForm
raise Http404()
def get_hosting_package(self):
return get_object_or_404(CustomerHostingPackage, pk=int(self.kwargs["package"]))
def get_option_template(self):
optiontype = self.kwargs["type"]
optionid = int(self.kwargs["optionid"])
if optiontype == "diskspace":
return get_object_or_404(DiskSpaceOption, pk=optionid)
elif optiontype == "mailboxes":
return get_object_or_404(MailboxOption, pk=optionid)
elif optiontype == "databases":
return get_object_or_404(UserDatabaseOption, pk=optionid)
raise Http404()
def get_form_kwargs(self):
kwargs = super(AddHostingOption, self).get_form_kwargs()
kwargs["hostingpackage"] = self.get_hosting_package()
kwargs["option_template"] = self.get_option_template()
return kwargs
def get_initial(self):
initial = super(AddHostingOption, self).get_initial()
template = self.get_option_template()
if type(template) == DiskSpaceOption:
initial.update(
{
"diskspace": template.diskspace,
"diskspace_unit": template.diskspace_unit,
}
)
elif type(template) == MailboxOption:
initial["number"] = template.number
elif type(template) == UserDatabaseOption:
initial["number"] = template.number
else:
raise Http404()
return initial
def get_context_data(self, **kwargs):
context = super(AddHostingOption, self).get_context_data(**kwargs)
context["option_template"] = self.get_option_template()
return context
def form_valid(self, form):
option = form.save()
hosting_package = self.get_hosting_package()
messages.success(
self.request,
_(
"Successfully added option {option} to hosting package " "{package}."
).format(option=option, package=hosting_package.name),
)
return redirect(hosting_package)
class HasDiskUsageUploadPermission(BasePermission):
def has_permission(self, request, view):
return (
request.user.has_perm("hostingpackages.add_customerpackagediskusage")
and request.method == "POST"
)
class UploadCustomerPackageDiskUsage(APIView):
permission_classes = [HasDiskUsageUploadPermission]
allowed_methods = ("POST",)
serializer = DiskUsageSerializer(many=True)
def post(self, request: rest_framework.request.Request, format=None):
if request.content_type != "application/json":
return Response("Unacceptable", status=http.HTTPStatus.BAD_REQUEST)
submitted_sources = set()
for row in request.data:
user = row["user"]
for key in row:
if key == "user":
continue
else:
submitted_sources.add(key)
for item, size in row[key].items():
try:
package = CustomerHostingPackage.objects.get(
osuser__username=user
)
(
metric,
created,
) = CustomerPackageDiskUsage.objects.get_or_create(
package=package,
source=key,
item=item,
)
metric.used_kb = size
if key == "mail":
try:
ma_mb = package.mailboxes.get(
username=item
).mailaddressmailbox_set.first()
if ma_mb:
metric.email_address_id = ma_mb.mailaddress_id
except Mailbox.DoesNotExist:
logger.warning("mail box %s does not exist", item)
metric.save()
except CustomerHostingPackage.DoesNotExist:
logger.warning(
"hosting package for user %s does not exist", user
)
if submitted_sources:
CustomerPackageDiskUsage.objects.filter(
source__in=submitted_sources,
modified__lt=timezone.now() - timedelta(minutes=30),
).delete()
logger.info("usage data submitted by %s", request.user)
return Response("Accepted", status=http.HTTPStatus.ACCEPTED)
class CustomerHostingPackageDiskUsageDetails(StaffOrSelfLoginRequiredMixin, DetailView):
template_name_suffix = "_disk_usage_details"
model = CustomerHostingPackage
pk_url_kwarg = "package"
context_object_name = "hostingpackage"
customer = None
def get_customer_object(self):
if self.customer is None:
self.customer = get_object_or_404(
get_user_model(), username=self.kwargs["user"]
)
return self.customer
def get_queryset(self, queryset=None):
return super().get_queryset().prefetch_related("customerpackagediskusage_set")
def get_context_data(self, **kwargs):
context_data = super().get_context_data(**kwargs)
mail_usage, web_usage, mysql_usage, pgsql_usage = [], [], [], []
for usage in self.get_object().customerpackagediskusage_set.order_by(
"-used_kb"
):
if usage.used_kb <= 0:
continue
if usage.source == "mail":
mail_usage.append(usage)
elif usage.source == "web":
web_usage.append(usage)
elif usage.source == "mysql":
mysql_usage.append(usage)
elif usage.source == "pgsql":
pgsql_usage.append(usage)
context_data.update(
{
"customer": self.get_customer_object(),
"mail_usage": mail_usage,
"web_usage": web_usage,
"mysql_usage": mysql_usage,
"pgsql_usage": pgsql_usage,
}
)
return context_data