""" This module contains the Django admin classes of the :py:mod:`osusers` app. The module starts Celery_ tasks. .. _Celery: http://www.celeryproject.org/ """ from django import forms from django.utils.translation import ugettext_lazy as _ from django.contrib import admin from fileservertasks.tasks import set_file_ssh_authorized_keys from gvawebcore.forms import ( PASSWORD_MISMATCH_ERROR ) from taskresults.models import TaskResult from .forms import ( INVALID_SSH_PUBLIC_KEY, DUPLICATE_SSH_PUBLIC_KEY_FOR_USER, ) from .models import ( AdditionalGroup, Group, Shadow, SshPublicKey, User, ) class AdditionalGroupInline(admin.TabularInline): """ Inline for :py:class:`osusers.models.AdditionalGroup` instances. """ model = AdditionalGroup class ShadowInline(admin.TabularInline): """ Inline for :py:class:`osusers.models.ShadowInline` instances. """ model = Shadow readonly_fields = ['passwd'] can_delete = False class UserCreationForm(forms.ModelForm): """ A form for creating :py:class:`operating system users `. """ password1 = forms.CharField( label=_('Password'), widget=forms.PasswordInput, required=False, ) password2 = forms.CharField( label=_('Password (again)'), widget=forms.PasswordInput, required=False, ) class Meta: model = User fields = ['customer'] def clean_password2(self): """ Check that the two password entries match. :return: the validated password :rtype: str or None """ password1 = self.cleaned_data.get('password1') password2 = self.cleaned_data.get('password2') if password1 and password2 and password1 != password2: raise forms.ValidationError(PASSWORD_MISMATCH_ERROR) return password2 def save(self, commit=True): """ Save the provided password in hashed format. :param boolean commit: whether to save the created user :return: user instance :rtype: :py:class:`osusers.models.User` """ user = User.objects.create_user( customer=self.cleaned_data['customer'], password=self.cleaned_data['password1'], commit=commit) return user def save_m2m(self): """ No additional groups are created when this form is saved, so this method just does nothing. """ class UserAdmin(admin.ModelAdmin): """ Admin class for working with :py:class:`operating system users `. """ actions = ['perform_delete_selected'] add_form = UserCreationForm inlines = [AdditionalGroupInline, ShadowInline] add_fieldsets = ( (None, { 'classes': ('wide',), 'fields': ('customer', 'password1', 'password2')}), ) def get_form(self, request, obj=None, **kwargs): """ Use special form during user creation. :param request: the current HTTP request :param obj: either a :py:class:`User ` instance or None for a new user :param kwargs: keyword arguments to be passed to :py:meth:`django.contrib.admin.ModelAdmin.get_form` :return: form instance """ defaults = {} if obj is None: defaults.update({ 'form': self.add_form, 'fields': admin.options.flatten_fieldsets(self.add_fieldsets), }) defaults.update(kwargs) return super(UserAdmin, self).get_form(request, obj, **defaults) def get_readonly_fields(self, request, obj=None): """ Make sure that uid is not editable for existing users. :param request: the current HTTP request :param obj: either a :py:class:`User ` instance or None for a new user :return: a list of fields :rtype: list """ if obj: return ['uid'] return [] def perform_delete_selected(self, request, queryset): """ Action to delete a list of selected users. This action calls the delete method of each selected user in contrast to the default `delete_selected`. :param request: the current HTTP request :param queryset: Django ORM queryset representing the selected users """ for user in queryset.all(): user.delete() perform_delete_selected.short_description = _('Delete selected users') def get_actions(self, request): """ Get the available actions for users. This overrides the default behavior to remove the default `delete_selected` action. :param request: the current HTTP request :return: list of actions :rtype: list """ actions = super(UserAdmin, self).get_actions(request) if 'delete_selected' in actions: # pragma: no cover del actions['delete_selected'] return actions class GroupAdmin(admin.ModelAdmin): """ Admin class for workint with :py:class:`operating system groups `. """ actions = ['perform_delete_selected'] def perform_delete_selected(self, request, queryset): """ Action to delete a list of selected groups. This action calls the delete method of each selected group in contrast to the default `delete_selected`. :param request: the current HTTP request :param queryset: Django ORM queryset representing the selected groups """ for group in queryset.all(): group.delete() perform_delete_selected.short_description = _('Delete selected groups') def get_actions(self, request): """ Get the available actions for groups. This overrides the default behavior to remove the default `delete_selected` action. :param request: the current HTTP request :return: list of actions :rtype: list """ actions = super(GroupAdmin, self).get_actions(request) if 'delete_selected' in actions: # pragma: no cover del actions['delete_selected'] return actions class SshPublicKeyCreationForm(forms.ModelForm): """ A form for creating :py:class:`SSH public keys `. """ publickeytext = forms.CharField( label=_('Key text'), widget=forms.Textarea, help_text=_('A SSH public key in either OpenSSH or RFC 4716 format')) class Meta: model = SshPublicKey fields = ['user'] def clean_publickeytext(self): keytext = self.cleaned_data.get('publickeytext') try: SshPublicKey.objects.parse_keytext(keytext) except: raise forms.ValidationError(INVALID_SSH_PUBLIC_KEY) return keytext def clean(self): user = self.cleaned_data.get('user') keytext = self.cleaned_data.get('publickeytext') if user and keytext: alg, data, comment = SshPublicKey.objects.parse_keytext(keytext) if SshPublicKey.objects.filter( user=user, algorithm=alg, data=data ).exists(): self.add_error( 'publickeytext', forms.ValidationError(DUPLICATE_SSH_PUBLIC_KEY_FOR_USER)) super(SshPublicKeyCreationForm, self).clean() def save(self, commit=True): """ Save the provided ssh public key in properly split format. :param boolean commit: whether to save the created public key :return: ssh public key instance :rtype: :py:class:`osusers.models.SshPublicKey` """ algorithm, keydata, comment = SshPublicKey.objects.parse_keytext( self.cleaned_data.get('publickeytext')) self.instance.algorithm = algorithm self.instance.data = keydata self.instance.comment = comment return super(SshPublicKeyCreationForm, self).save(commit) class SshPublicKeyAdmin(admin.ModelAdmin): """ Admin class for working with :py:class:`SSH public keys `. """ actions = ['perform_delete_selected'] add_form = SshPublicKeyCreationForm list_display = ['user', 'algorithm', 'comment'] add_fieldsets = ( (None, { 'classes': ('wide',), 'fields': ('user', 'publickeytext')}), ) def get_form(self, request, obj=None, **kwargs): """ Use special form for ssh public key creation." :param request: the current HTTP request :param obj: either a :py:class:`SshPublicKey ` instance or None for a new SSH public key :param kwargs: keyword arguments to be passed to :py:meth:`django.contrib.admin.ModelAdmin.get_form` :return: form instance """ defaults = {} if obj is None: defaults.update({ 'form': self.add_form, 'fields': admin.options.flatten_fieldsets(self.add_fieldsets), }) defaults.update(kwargs) return super(SshPublicKeyAdmin, self).get_form( request, obj, **defaults) def get_readonly_fields(self, request, obj=None): """ Make sure that algorithm and data of SSH public keys are not editable. :param request: the current HTTP request :param obj: either a :py:class:`SshPublicKey ` instance or None for a new SSH public key :return: a list of fields :rtype: list """ if obj: return ['algorithm', 'data'] return [] def perform_delete_selected(self, request, queryset): """ Action to delete a list of selected ssh keys. This action makes sure that the ssh keys of all users affected by the current deletion are refreshed on the file server. :param request: the current HTTP request :param queryset: Django ORM queryset representing the selected ssh keys This method starts a Celery_ task to update the list of authorized keys for each affected user. .. blockdiag:: :desctable: blockdiag { node_width = 200; A -> B; A [ label = "", shape = beginpoint, description = "this method" ]; B [ label = "set file ssh authorized_keys", description = ":py:func:`set_file_ssh_authorized_keys() ` called with username and a list of keys, returning the path of the ssh authorized_keys file", color = "LightGreen", stacked ]; } """ users = set([ item['user'] for item in queryset.values('user').distinct() ]) queryset.delete() for user in users: # TODO: move to model/signal TaskResult.objects.create_task_result( 'perform_delete_selected', set_file_ssh_authorized_keys.s( User.objects.get(uid=user).username, [str(key) for key in SshPublicKey.objects.filter( user_id=user)] ) ) perform_delete_selected.short_description = _( 'Delete selected SSH public keys') def get_actions(self, request): """ Get the available actions for SSH public keys. This overrides the default behavior to remove the default `delete_selected` action. :param request: the current HTTP request :return: list of actions :rtype: list """ actions = super(SshPublicKeyAdmin, self).get_actions(request) if 'delete_selected' in actions: # pragma: no cover del actions['delete_selected'] return actions admin.site.register(Group, GroupAdmin) admin.site.register(User, UserAdmin) admin.site.register(SshPublicKey, SshPublicKeyAdmin)