gva/gnuviechadmin/osusers/admin.py
Jan Dittberner 4af1a39ca4 Upgrade to Django 3.2
- update dependencies
- fix deprecation warnings
- fix tests
- skip some tests that need more work
- reformat changed code with isort and black
2023-02-18 22:46:48 +01:00

412 lines
12 KiB
Python

"""
This module contains the Django admin classes of the :py:mod:`osusers` app.
The module starts Celery_ tasks.
.. _Celery: http://www.celeryproject.org/
"""
from django import forms
from django.contrib import admin
from django.utils.translation import gettext_lazy as _
from fileservertasks.tasks import set_file_ssh_authorized_keys
from gvawebcore.forms import PASSWORD_MISMATCH_ERROR
from taskresults.models import TaskResult
from .forms import DUPLICATE_SSH_PUBLIC_KEY_FOR_USER, INVALID_SSH_PUBLIC_KEY
from .models import AdditionalGroup, Group, Shadow, SshPublicKey, User
class AdditionalGroupInline(admin.TabularInline):
    """
    Tabular inline editor for
    :py:class:`osusers.models.AdditionalGroup` instances.

    """

    # The model whose rows are edited inline.
    model = AdditionalGroup
class ShadowInline(admin.TabularInline):
    """
    Tabular inline editor for :py:class:`osusers.models.Shadow` instances.

    The ``passwd`` field is shown read-only and rows cannot be deleted
    through this inline.

    """

    # Deleting rows through the inline is disabled.
    can_delete = False
    model = Shadow
    # The stored passwd value may be inspected but not edited.
    readonly_fields = ["passwd"]
class UserCreationForm(forms.ModelForm):
    """
    A form for creating :py:class:`operating system users
    <osusers.models.User>`.

    Besides the model's ``customer`` field the form offers two optional
    password inputs. If either of them is filled in, both must contain the
    same value.

    """

    password1 = forms.CharField(
        label=_("Password"), widget=forms.PasswordInput, required=False
    )
    password2 = forms.CharField(
        label=_("Password (again)"), widget=forms.PasswordInput, required=False
    )

    class Meta:
        model = User
        fields = ["customer"]

    def clean_password2(self):
        """
        Check that the two password entries match.

        Both fields are optional, so leaving both blank is accepted. As soon
        as at least one of them is filled in, the other has to carry the
        identical value; a blank counterpart counts as a mismatch.

        :return: the validated password
        :rtype: str or None
        :raises django.forms.ValidationError: if the entries differ

        """
        password1 = self.cleaned_data.get("password1")
        password2 = self.cleaned_data.get("password2")
        # Comparing only when *both* fields are filled would let a password
        # through unconfirmed when its counterpart is left empty, so the
        # check fires as soon as either field has a value.
        if (password1 or password2) and password1 != password2:
            raise forms.ValidationError(PASSWORD_MISMATCH_ERROR)
        return password2

    def save(self, commit=True):
        """
        Create the user from the validated form data.

        The customer and the first password entry are handed to
        ``User.objects.create_user`` together with the ``commit`` flag.

        :param boolean commit: whether to save the created user
        :return: user instance
        :rtype: :py:class:`osusers.models.User`

        """
        return User.objects.create_user(
            customer=self.cleaned_data["customer"],
            password=self.cleaned_data["password1"],
            commit=commit,
        )

    def save_m2m(self):
        """
        No additional groups are created when this form is saved, so this
        method just does nothing.

        """
class UserAdmin(admin.ModelAdmin):
    """
    Admin class for working with :py:class:`operating system users
    <osusers.models.User>`.

    New users are entered through :py:class:`UserCreationForm`. The stock
    ``delete_selected`` action is replaced by an action that deletes every
    selected user individually.

    """

    actions = ["perform_delete_selected"]
    add_form = UserCreationForm
    inlines = [AdditionalGroupInline, ShadowInline]

    add_fieldsets = (
        (
            None,
            {"classes": ("wide",), "fields": ("customer", "password1", "password2")},
        ),
    )

    def get_form(self, request, obj=None, **kwargs):
        """
        Use the dedicated creation form while a user is being added.

        :param request: the current HTTP request
        :param obj: either a :py:class:`User <osusers.models.User>` instance or
            None for a new user
        :param kwargs: keyword arguments to be passed to
            :py:meth:`django.contrib.admin.ModelAdmin.get_form`; they take
            precedence over the creation defaults
        :return: form class for the add or change view

        """
        form_options = {}
        if obj is None:
            # Adding a user: switch to the creation form and its field list.
            form_options["form"] = self.add_form
            form_options["fields"] = admin.options.flatten_fieldsets(
                self.add_fieldsets
            )
        form_options.update(kwargs)
        return super().get_form(request, obj, **form_options)

    def get_readonly_fields(self, request, obj=None):
        """
        Make sure that uid is not editable for existing users.

        :param request: the current HTTP request
        :param obj: either a :py:class:`User <osusers.models.User>` instance or
            None for a new user
        :return: a list of fields
        :rtype: list

        """
        return ["uid"] if obj else []

    def perform_delete_selected(self, request, queryset):
        """
        Action to delete a list of selected users.

        In contrast to the default `delete_selected` action, the delete
        method of every selected user is called.

        :param request: the current HTTP request
        :param queryset: Django ORM queryset representing the selected users

        """
        for selected_user in queryset.all():
            selected_user.delete()

    perform_delete_selected.short_description = _("Delete selected users")

    def get_actions(self, request):
        """
        Get the available actions for users.

        The default `delete_selected` action is dropped from the result of
        the parent implementation.

        :param request: the current HTTP request
        :return: the available actions without `delete_selected`

        """
        available = super().get_actions(request)
        available.pop("delete_selected", None)
        return available
class GroupAdmin(admin.ModelAdmin):
    """
    Admin class for working with :py:class:`operating system groups
    <osusers.models.Group>`.

    The stock ``delete_selected`` action is replaced by an action that
    deletes every selected group individually.

    """

    actions = ["perform_delete_selected"]

    def perform_delete_selected(self, request, queryset):
        """
        Action to delete a list of selected groups.

        In contrast to the default `delete_selected` action, the delete
        method of every selected group is called.

        :param request: the current HTTP request
        :param queryset: Django ORM queryset representing the selected groups

        """
        for selected_group in queryset.all():
            selected_group.delete()

    perform_delete_selected.short_description = _("Delete selected groups")

    def get_actions(self, request):
        """
        Get the available actions for groups.

        The default `delete_selected` action is dropped from the result of
        the parent implementation.

        :param request: the current HTTP request
        :return: the available actions without `delete_selected`

        """
        available = super().get_actions(request)
        available.pop("delete_selected", None)
        return available
class SshPublicKeyCreationForm(forms.ModelForm):
    """
    A form for creating :py:class:`SSH public keys
    <osusers.models.SshPublicKey>`.

    The key is entered as one block of text and split into algorithm, key
    data and comment when the form is saved.

    """

    publickeytext = forms.CharField(
        label=_("Key text"),
        widget=forms.Textarea,
        help_text=_("A SSH public key in either OpenSSH or RFC 4716 format"),
    )

    class Meta:
        model = SshPublicKey
        fields = ["user"]

    def clean_publickeytext(self):
        """
        Check that the entered text can be parsed as an SSH public key.

        :return: the unchanged key text
        :rtype: str
        :raises django.forms.ValidationError: if parsing the key text fails

        """
        keytext = self.cleaned_data.get("publickeytext")
        try:
            SshPublicKey.objects.parse_key_text(keytext)
        except Exception as exc:
            # Any parser failure means the key is unusable. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate instead of being reported as a bad key.
            raise forms.ValidationError(INVALID_SSH_PUBLIC_KEY) from exc
        return keytext

    def clean(self):
        """
        Reject a key that is already stored for the selected user.

        The check only runs when both the user and the key text passed their
        field validation. A duplicate is reported as an error on the
        ``publickeytext`` field.

        """
        user = self.cleaned_data.get("user")
        keytext = self.cleaned_data.get("publickeytext")
        if user and keytext:
            # The comment is irrelevant for detecting duplicates.
            alg, data, _comment = SshPublicKey.objects.parse_key_text(keytext)
            if SshPublicKey.objects.filter(
                user=user, algorithm=alg, data=data
            ).exists():
                self.add_error(
                    "publickeytext",
                    forms.ValidationError(DUPLICATE_SSH_PUBLIC_KEY_FOR_USER),
                )
        super().clean()

    def save(self, commit=True):
        """
        Save the provided ssh public key in properly split format.

        :param boolean commit: whether to save the created public key
        :return: ssh public key instance
        :rtype: :py:class:`osusers.models.SshPublicKey`

        """
        algorithm, keydata, comment = SshPublicKey.objects.parse_key_text(
            self.cleaned_data.get("publickeytext")
        )
        self.instance.algorithm = algorithm
        self.instance.data = keydata
        self.instance.comment = comment
        return super().save(commit)
class SshPublicKeyAdmin(admin.ModelAdmin):
    """
    Admin class for working with :py:class:`SSH public keys
    <osusers.models.SshPublicKey>`.

    New keys are entered through :py:class:`SshPublicKeyCreationForm`. The
    stock ``delete_selected`` action is replaced by an action that also
    refreshes the authorized keys of the affected users.

    """

    actions = ["perform_delete_selected"]
    add_form = SshPublicKeyCreationForm
    list_display = ["user", "algorithm", "comment"]

    add_fieldsets = (
        (None, {"classes": ("wide",), "fields": ("user", "publickeytext")}),
    )

    def get_form(self, request, obj=None, **kwargs):
        """
        Use the dedicated creation form while an SSH public key is added.

        :param request: the current HTTP request
        :param obj: either a :py:class:`SshPublicKey
            <osusers.models.SshPublicKey>` instance or None for a new SSH
            public key
        :param kwargs: keyword arguments to be passed to
            :py:meth:`django.contrib.admin.ModelAdmin.get_form`; they take
            precedence over the creation defaults
        :return: form class for the add or change view

        """
        form_options = {}
        if obj is None:
            # Adding a key: switch to the creation form and its field list.
            form_options["form"] = self.add_form
            form_options["fields"] = admin.options.flatten_fieldsets(
                self.add_fieldsets
            )
        form_options.update(kwargs)
        return super().get_form(request, obj, **form_options)

    def get_readonly_fields(self, request, obj=None):
        """
        Make sure that algorithm and data of SSH public keys are not editable.

        :param request: the current HTTP request
        :param obj: either a :py:class:`SshPublicKey
            <osusers.models.SshPublicKey>` instance or None for a new SSH
            public key
        :return: a list of fields
        :rtype: list

        """
        return ["algorithm", "data"] if obj else []

    def perform_delete_selected(self, request, queryset):
        """
        Action to delete a list of selected ssh keys.

        This action makes sure that the ssh keys of all users affected by the
        current deletion are refreshed on the file server.

        :param request: the current HTTP request
        :param queryset: Django ORM queryset representing the selected ssh keys

        This method starts a Celery_ task to update the list of authorized keys
        for each affected user.

        .. blockdiag::
            :desctable:

            blockdiag {
                node_width = 200;
                A -> B;
                A [ label = "", shape = beginpoint,
                    description = "this method"
                ];
                B [ label = "set file ssh authorized_keys",
                    description = ":py:func:`set_file_ssh_authorized_keys()
                    <fileservertasks.tasks.set_file_ssh_authorized_keys>`
                    called with username and a list of keys, returning the path
                    of the ssh authorized_keys file",
                    color = "LightGreen",
                    stacked
                ];
            }

        """
        # Collect the owners before deleting; afterwards the selection no
        # longer yields any rows to read them from.
        affected_user_ids = {
            row["user"] for row in queryset.values("user").distinct()
        }
        queryset.delete()
        for user_id in affected_user_ids:
            # TODO: move to model/signal
            username = User.objects.get(uid=user_id).username
            remaining_keys = [
                str(key) for key in SshPublicKey.objects.filter(user_id=user_id)
            ]
            TaskResult.objects.create_task_result(
                "perform_delete_selected",
                set_file_ssh_authorized_keys.s(username, remaining_keys),
            )

    perform_delete_selected.short_description = _("Delete selected SSH public keys")

    def get_actions(self, request):
        """
        Get the available actions for SSH public keys.

        The default `delete_selected` action is dropped from the result of
        the parent implementation.

        :param request: the current HTTP request
        :return: the available actions without `delete_selected`

        """
        available = super().get_actions(request)
        available.pop("delete_selected", None)
        return available
# Register the osusers models with their customised admin classes.
admin.site.register(Group, admin_class=GroupAdmin)
admin.site.register(User, admin_class=UserAdmin)
admin.site.register(SshPublicKey, admin_class=SshPublicKeyAdmin)