Jan Dittberner
4f39c0d2c4
This commit raises the test coverage for osusers.admin to 100% by adding tests for UserAdmin, GroupAdmin, SshPublicKeyCreationForm and SshPublicKeyAdmin. The commit adds a refactoring TODO to SshPublicKeyAdmin.perform_delete_selected because the asynchronous background task should be launched from a signal handler.
383 lines
11 KiB
Python
383 lines
11 KiB
Python
"""
|
|
This module contains the Django admin classes of the :py:mod:`osusers` app.
|
|
|
|
"""
|
|
from django import forms
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.contrib import admin
|
|
|
|
from fileservertasks.tasks import set_file_ssh_authorized_keys
|
|
from gvawebcore.forms import (
|
|
PASSWORD_MISMATCH_ERROR
|
|
)
|
|
from taskresults.models import TaskResult
|
|
|
|
from .forms import (
|
|
INVALID_SSH_PUBLIC_KEY,
|
|
DUPLICATE_SSH_PUBLIC_KEY_FOR_USER,
|
|
)
|
|
from .models import (
|
|
AdditionalGroup,
|
|
Group,
|
|
Shadow,
|
|
SshPublicKey,
|
|
User,
|
|
)
|
|
|
|
|
|
class AdditionalGroupInline(admin.TabularInline):
|
|
"""
|
|
Inline for :py:class:`osusers.models.AdditionalGroup` instances.
|
|
|
|
"""
|
|
model = AdditionalGroup
|
|
|
|
|
|
class ShadowInline(admin.TabularInline):
|
|
"""
|
|
Inline for :py:class:`osusers.models.ShadowInline` instances.
|
|
|
|
"""
|
|
model = Shadow
|
|
readonly_fields = ['passwd']
|
|
can_delete = False
|
|
|
|
|
|
class UserCreationForm(forms.ModelForm):
|
|
"""
|
|
A form for creating :py:class:`operating system users
|
|
<osusers.models.User>`.
|
|
|
|
"""
|
|
password1 = forms.CharField(
|
|
label=_('Password'), widget=forms.PasswordInput,
|
|
required=False,
|
|
)
|
|
password2 = forms.CharField(
|
|
label=_('Password (again)'), widget=forms.PasswordInput,
|
|
required=False,
|
|
)
|
|
|
|
class Meta:
|
|
model = User
|
|
fields = ['customer']
|
|
|
|
def clean_password2(self):
|
|
"""
|
|
Check that the two password entries match.
|
|
|
|
:return: the validated password
|
|
:rtype: str or None
|
|
|
|
"""
|
|
password1 = self.cleaned_data.get('password1')
|
|
password2 = self.cleaned_data.get('password2')
|
|
if password1 and password2 and password1 != password2:
|
|
raise forms.ValidationError(PASSWORD_MISMATCH_ERROR)
|
|
return password2
|
|
|
|
def save(self, commit=True):
|
|
"""
|
|
Save the provided password in hashed format.
|
|
|
|
:param boolean commit: whether to save the created user
|
|
:return: user instance
|
|
:rtype: :py:class:`osusers.models.User`
|
|
|
|
"""
|
|
user = User.objects.create_user(
|
|
customer=self.cleaned_data['customer'],
|
|
password=self.cleaned_data['password1'], commit=commit)
|
|
return user
|
|
|
|
def save_m2m(self):
|
|
"""
|
|
No additional groups are created when this form is saved, so this
|
|
method just does nothing.
|
|
|
|
"""
|
|
|
|
|
|
class UserAdmin(admin.ModelAdmin):
|
|
"""
|
|
Admin class for working with :py:class:`operating system users
|
|
<osusers.models.User>`.
|
|
|
|
"""
|
|
actions = ['perform_delete_selected']
|
|
add_form = UserCreationForm
|
|
inlines = [AdditionalGroupInline, ShadowInline]
|
|
|
|
add_fieldsets = (
|
|
(None, {
|
|
'classes': ('wide',),
|
|
'fields': ('customer', 'password1', 'password2')}),
|
|
)
|
|
|
|
def get_form(self, request, obj=None, **kwargs):
|
|
"""
|
|
Use special form during user creation.
|
|
|
|
:param request: the current HTTP request
|
|
:param obj: either a :py:class:`User <osusers.models.User>` instance or
|
|
None for a new user
|
|
:param kwargs: keyword arguments to be passed to
|
|
:py:meth:`django.contrib.admin.ModelAdmin.get_form`
|
|
:return: form instance
|
|
|
|
"""
|
|
defaults = {}
|
|
if obj is None:
|
|
defaults.update({
|
|
'form': self.add_form,
|
|
'fields': admin.options.flatten_fieldsets(self.add_fieldsets),
|
|
})
|
|
defaults.update(kwargs)
|
|
return super(UserAdmin, self).get_form(request, obj, **defaults)
|
|
|
|
def get_readonly_fields(self, request, obj=None):
|
|
"""
|
|
Make sure that uid is not editable for existing users.
|
|
|
|
:param request: the current HTTP request
|
|
:param obj: either a :py:class:`User <osusers.models.User>` instance or
|
|
None for a new user
|
|
:return: a list of fields
|
|
:rtype: list
|
|
|
|
"""
|
|
if obj:
|
|
return ['uid']
|
|
return []
|
|
|
|
def perform_delete_selected(self, request, queryset):
|
|
"""
|
|
Action to delete a list of selected users.
|
|
|
|
This action calls the delete method of each selected user in contrast
|
|
to the default `delete_selected`.
|
|
|
|
:param request: the current HTTP request
|
|
:param queryset: Django ORM queryset representing the selected users
|
|
|
|
"""
|
|
for user in queryset.all():
|
|
user.delete()
|
|
perform_delete_selected.short_description = _('Delete selected users')
|
|
|
|
def get_actions(self, request):
|
|
"""
|
|
Get the available actions for users.
|
|
|
|
This overrides the default behavior to remove the default
|
|
`delete_selected` action.
|
|
|
|
:param request: the current HTTP request
|
|
:return: list of actions
|
|
:rtype: list
|
|
|
|
"""
|
|
actions = super(UserAdmin, self).get_actions(request)
|
|
if 'delete_selected' in actions: # pragma: no cover
|
|
del actions['delete_selected']
|
|
return actions
|
|
|
|
|
|
class GroupAdmin(admin.ModelAdmin):
|
|
"""
|
|
Admin class for workint with :py:class:`operating system groups
|
|
<osusers.models.Group>`.
|
|
|
|
"""
|
|
actions = ['perform_delete_selected']
|
|
|
|
def perform_delete_selected(self, request, queryset):
|
|
"""
|
|
Action to delete a list of selected groups.
|
|
|
|
This action calls the delete method of each selected group in contrast
|
|
to the default `delete_selected`.
|
|
|
|
:param request: the current HTTP request
|
|
:param queryset: Django ORM queryset representing the selected groups
|
|
|
|
"""
|
|
for group in queryset.all():
|
|
group.delete()
|
|
perform_delete_selected.short_description = _('Delete selected groups')
|
|
|
|
def get_actions(self, request):
|
|
"""
|
|
Get the available actions for groups.
|
|
|
|
This overrides the default behavior to remove the default
|
|
`delete_selected` action.
|
|
|
|
:param request: the current HTTP request
|
|
:return: list of actions
|
|
:rtype: list
|
|
|
|
"""
|
|
actions = super(GroupAdmin, self).get_actions(request)
|
|
if 'delete_selected' in actions: # pragma: no cover
|
|
del actions['delete_selected']
|
|
return actions
|
|
|
|
|
|
class SshPublicKeyCreationForm(forms.ModelForm):
|
|
"""
|
|
A form for creating :py:class:`SSH public keys
|
|
<osusers.models.SshPublicKey>`.
|
|
|
|
"""
|
|
publickeytext = forms.CharField(
|
|
label=_('Key text'), widget=forms.Textarea,
|
|
help_text=_('A SSH public key in either OpenSSH or RFC 4716 format'))
|
|
|
|
class Meta:
|
|
model = SshPublicKey
|
|
fields = ['user']
|
|
|
|
def clean_publickeytext(self):
|
|
keytext = self.cleaned_data.get('publickeytext')
|
|
try:
|
|
SshPublicKey.objects.parse_keytext(keytext)
|
|
except:
|
|
raise forms.ValidationError(INVALID_SSH_PUBLIC_KEY)
|
|
return keytext
|
|
|
|
def clean(self):
|
|
user = self.cleaned_data.get('user')
|
|
keytext = self.cleaned_data.get('publickeytext')
|
|
if user and keytext:
|
|
alg, data, comment = SshPublicKey.objects.parse_keytext(keytext)
|
|
if SshPublicKey.objects.filter(
|
|
user=user, algorithm=alg, data=data
|
|
).exists():
|
|
self.add_error(
|
|
'publickeytext',
|
|
forms.ValidationError(DUPLICATE_SSH_PUBLIC_KEY_FOR_USER))
|
|
super(SshPublicKeyCreationForm, self).clean()
|
|
|
|
def save(self, commit=True):
|
|
"""
|
|
Save the provided ssh public key in properly split format.
|
|
|
|
:param boolean commit: whether to save the created public key
|
|
:return: ssh public key instance
|
|
:rtype: :py:class:`osusers.models.SshPublicKey`
|
|
|
|
"""
|
|
algorithm, keydata, comment = SshPublicKey.objects.parse_keytext(
|
|
self.cleaned_data.get('publickeytext'))
|
|
self.instance.algorithm = algorithm
|
|
self.instance.data = keydata
|
|
self.instance.comment = comment
|
|
return super(SshPublicKeyCreationForm, self).save(commit)
|
|
|
|
|
|
class SshPublicKeyAdmin(admin.ModelAdmin):
|
|
"""
|
|
Admin class for working with :py:class:`SSH public keys
|
|
<osusers.models.SshPublicKey>`.
|
|
|
|
"""
|
|
actions = ['perform_delete_selected']
|
|
add_form = SshPublicKeyCreationForm
|
|
list_display = ['user', 'algorithm', 'comment']
|
|
|
|
add_fieldsets = (
|
|
(None, {
|
|
'classes': ('wide',),
|
|
'fields': ('user', 'publickeytext')}),
|
|
)
|
|
|
|
def get_form(self, request, obj=None, **kwargs):
|
|
"""
|
|
Use special form for ssh public key creation."
|
|
|
|
:param request: the current HTTP request
|
|
:param obj: either a :py:class:`SshPublicKey
|
|
<osusers.models.SshPublicKey>` instance or None for a new SSH
|
|
public key
|
|
:param kwargs: keyword arguments to be passed to
|
|
:py:meth:`django.contrib.admin.ModelAdmin.get_form`
|
|
:return: form instance
|
|
|
|
"""
|
|
defaults = {}
|
|
if obj is None:
|
|
defaults.update({
|
|
'form': self.add_form,
|
|
'fields': admin.options.flatten_fieldsets(self.add_fieldsets),
|
|
})
|
|
defaults.update(kwargs)
|
|
return super(SshPublicKeyAdmin, self).get_form(
|
|
request, obj, **defaults)
|
|
|
|
def get_readonly_fields(self, request, obj=None):
|
|
"""
|
|
Make sure that algorithm and data of SSH public keys are not editable.
|
|
|
|
:param request: the current HTTP request
|
|
:param obj: either a :py:class:`SshPublicKey
|
|
<osusers.models.SshPublicKey>` instance or None for a new SSH
|
|
public key
|
|
:return: a list of fields
|
|
:rtype: list
|
|
|
|
"""
|
|
if obj:
|
|
return ['algorithm', 'data']
|
|
return []
|
|
|
|
def perform_delete_selected(self, request, queryset):
|
|
"""
|
|
Action to delete a list of selected ssh keys.
|
|
|
|
This action makes sure that the ssh keys of all users affected by the
|
|
current deletion are refreshed on the file server.
|
|
|
|
:param request: the current HTTP request
|
|
:param queryset: Django ORM queryset representing the selected ssh keys
|
|
|
|
"""
|
|
users = set([
|
|
item['user'] for item in
|
|
queryset.values('user').distinct()
|
|
])
|
|
queryset.delete()
|
|
for user in users:
|
|
# TODO: move to model/signal
|
|
TaskResult.objects.create_task_result(
|
|
'perform_delete_selected',
|
|
set_file_ssh_authorized_keys.s(
|
|
User.objects.get(uid=user).username,
|
|
[str(key) for key in SshPublicKey.objects.filter(
|
|
user_id=user)]
|
|
)
|
|
)
|
|
perform_delete_selected.short_description = _(
|
|
'Delete selected SSH public keys')
|
|
|
|
def get_actions(self, request):
|
|
"""
|
|
Get the available actions for SSH public keys.
|
|
|
|
This overrides the default behavior to remove the default
|
|
`delete_selected` action.
|
|
|
|
:param request: the current HTTP request
|
|
:return: list of actions
|
|
:rtype: list
|
|
|
|
"""
|
|
actions = super(SshPublicKeyAdmin, self).get_actions(request)
|
|
if 'delete_selected' in actions: # pragma: no cover
|
|
del actions['delete_selected']
|
|
return actions
|
|
|
|
|
|
admin.site.register(Group, GroupAdmin)
|
|
admin.site.register(User, UserAdmin)
|
|
admin.site.register(SshPublicKey, SshPublicKeyAdmin)
|