gva/gnuviechadmin/osusers/admin.py
Jan Dittberner 5dc3549896 Improve documentation
This commit adds a lot of documentation including block diagramms for
message flows.
2016-09-24 21:57:28 +02:00

411 lines
12 KiB
Python

"""
This module contains the Django admin classes of the :py:mod:`osusers` app.
The module starts Celery_ tasks.
.. _Celery: http://www.celeryproject.org/
"""
from django import forms
from django.utils.translation import ugettext_lazy as _
from django.contrib import admin
from fileservertasks.tasks import set_file_ssh_authorized_keys
from gvawebcore.forms import (
PASSWORD_MISMATCH_ERROR
)
from taskresults.models import TaskResult
from .forms import (
INVALID_SSH_PUBLIC_KEY,
DUPLICATE_SSH_PUBLIC_KEY_FOR_USER,
)
from .models import (
AdditionalGroup,
Group,
Shadow,
SshPublicKey,
User,
)
class AdditionalGroupInline(admin.TabularInline):
    """
    Tabular inline for editing
    :py:class:`osusers.models.AdditionalGroup` instances.

    """

    model = AdditionalGroup
class ShadowInline(admin.TabularInline):
    """
    Tabular inline for :py:class:`osusers.models.Shadow` instances.

    The ``passwd`` field is displayed read-only and the inline offers no
    delete option.

    """

    model = Shadow
    can_delete = False
    readonly_fields = ['passwd']
class UserCreationForm(forms.ModelForm):
    """
    A form for creating :py:class:`operating system users
    <osusers.models.User>`.

    Both password fields are optional, but as soon as one of them is filled
    in the other one has to contain the same value.

    """

    password1 = forms.CharField(
        label=_('Password'), widget=forms.PasswordInput,
        required=False,
    )
    password2 = forms.CharField(
        label=_('Password (again)'), widget=forms.PasswordInput,
        required=False,
    )

    class Meta:
        model = User
        fields = ['customer']

    def clean_password2(self):
        """
        Check that the two password entries match.

        A missing entry is treated like an empty one. Leaving both fields
        empty is therefore valid, while filling in only one of them is
        reported as a mismatch instead of being accepted silently.

        :return: the validated password
        :rtype: str or None
        :raises django.forms.ValidationError: if the two entries differ

        """
        password1 = self.cleaned_data.get('password1')
        password2 = self.cleaned_data.get('password2')
        # Normalise None and '' to the same value so that "both left empty"
        # still validates; any other difference is a mismatch.
        if (password1 or '') != (password2 or ''):
            raise forms.ValidationError(PASSWORD_MISMATCH_ERROR)
        return password2

    def save(self, commit=True):
        """
        Create the user from the cleaned customer and password.

        The work is delegated to ``User.objects.create_user``; the model
        instance bound to this form is not used.

        :param boolean commit: whether to save the created user
        :return: user instance
        :rtype: :py:class:`osusers.models.User`

        """
        return User.objects.create_user(
            customer=self.cleaned_data['customer'],
            password=self.cleaned_data['password1'],
            commit=commit,
        )

    def save_m2m(self):
        """
        No additional groups are created when this form is saved, so this
        method just does nothing.

        """
class UserAdmin(admin.ModelAdmin):
    """
    Admin class for working with :py:class:`operating system users
    <osusers.models.User>`.

    """

    actions = ['perform_delete_selected']
    add_form = UserCreationForm
    inlines = [AdditionalGroupInline, ShadowInline]

    add_fieldsets = (
        (None, {
            'classes': ('wide',),
            'fields': ('customer', 'password1', 'password2')}),
    )

    def get_form(self, request, obj=None, **kwargs):
        """
        Use the special creation form while a new user is being added.

        :param request: the current HTTP request
        :param obj: either a :py:class:`User <osusers.models.User>` instance or
            None for a new user
        :param kwargs: keyword arguments to be passed to
            :py:meth:`django.contrib.admin.ModelAdmin.get_form`; they take
            precedence over the creation defaults
        :return: form class

        """
        if obj is None:
            form_options = {
                'form': self.add_form,
                'fields': admin.options.flatten_fieldsets(self.add_fieldsets),
            }
        else:
            form_options = {}
        # Explicit keyword arguments of the caller win over the defaults.
        form_options.update(kwargs)
        return super(UserAdmin, self).get_form(request, obj, **form_options)

    def get_readonly_fields(self, request, obj=None):
        """
        Make sure that uid is not editable for existing users.

        :param request: the current HTTP request
        :param obj: either a :py:class:`User <osusers.models.User>` instance or
            None for a new user
        :return: a list of fields
        :rtype: list

        """
        return ['uid'] if obj else []

    def perform_delete_selected(self, request, queryset):
        """
        Action to delete a list of selected users.

        In contrast to the default `delete_selected` action, the delete
        method of every selected user is called individually.

        :param request: the current HTTP request
        :param queryset: Django ORM queryset representing the selected users

        """
        for selected_user in queryset.all():
            selected_user.delete()
    perform_delete_selected.short_description = _('Delete selected users')

    def get_actions(self, request):
        """
        Get the available actions for users.

        The default `delete_selected` action is removed from the actions
        returned by the parent class.

        :param request: the current HTTP request
        :return: list of actions
        :rtype: list

        """
        available = super(UserAdmin, self).get_actions(request)
        available.pop('delete_selected', None)
        return available
class GroupAdmin(admin.ModelAdmin):
    """
    Admin class for working with :py:class:`operating system groups
    <osusers.models.Group>`.

    """

    actions = ['perform_delete_selected']

    def perform_delete_selected(self, request, queryset):
        """
        Action to delete a list of selected groups.

        In contrast to the default `delete_selected` action, the delete
        method of every selected group is called individually.

        :param request: the current HTTP request
        :param queryset: Django ORM queryset representing the selected groups

        """
        for selected_group in queryset.all():
            selected_group.delete()
    perform_delete_selected.short_description = _('Delete selected groups')

    def get_actions(self, request):
        """
        Get the available actions for groups.

        The default `delete_selected` action is removed from the actions
        returned by the parent class.

        :param request: the current HTTP request
        :return: list of actions
        :rtype: list

        """
        available = super(GroupAdmin, self).get_actions(request)
        available.pop('delete_selected', None)
        return available
class SshPublicKeyCreationForm(forms.ModelForm):
    """
    A form for creating :py:class:`SSH public keys
    <osusers.models.SshPublicKey>`.

    The key is entered as one block of text and split into algorithm, data
    and comment when the form is saved.

    """

    publickeytext = forms.CharField(
        label=_('Key text'), widget=forms.Textarea,
        help_text=_('A SSH public key in either OpenSSH or RFC 4716 format'))

    class Meta:
        model = SshPublicKey
        fields = ['user']

    def clean_publickeytext(self):
        """
        Check that the entered text can be parsed as an SSH public key.

        :return: the unchanged key text
        :rtype: str
        :raises django.forms.ValidationError: if parsing the key text fails

        """
        keytext = self.cleaned_data.get('publickeytext')
        try:
            SshPublicKey.objects.parse_keytext(keytext)
        except Exception:
            # Any parser failure means the key is unusable. Exception is
            # caught instead of a bare except so that KeyboardInterrupt and
            # SystemExit are not turned into validation errors.
            raise forms.ValidationError(INVALID_SSH_PUBLIC_KEY)
        return keytext

    def clean(self):
        """
        Reject a key that is already stored for the selected user.

        The check only runs when both the user and the key text passed their
        field validation. A duplicate is reported as an error on the
        ``publickeytext`` field.

        """
        user = self.cleaned_data.get('user')
        keytext = self.cleaned_data.get('publickeytext')
        if user and keytext:
            alg, data, _comment = SshPublicKey.objects.parse_keytext(keytext)
            if SshPublicKey.objects.filter(
                user=user, algorithm=alg, data=data
            ).exists():
                self.add_error(
                    'publickeytext',
                    forms.ValidationError(DUPLICATE_SSH_PUBLIC_KEY_FOR_USER))
        super(SshPublicKeyCreationForm, self).clean()

    def save(self, commit=True):
        """
        Save the provided ssh public key in properly split format.

        :param boolean commit: whether to save the created public key
        :return: ssh public key instance
        :rtype: :py:class:`osusers.models.SshPublicKey`

        """
        algorithm, keydata, comment = SshPublicKey.objects.parse_keytext(
            self.cleaned_data.get('publickeytext'))
        self.instance.algorithm = algorithm
        self.instance.data = keydata
        self.instance.comment = comment
        return super(SshPublicKeyCreationForm, self).save(commit)
class SshPublicKeyAdmin(admin.ModelAdmin):
    """
    Admin class for working with :py:class:`SSH public keys
    <osusers.models.SshPublicKey>`.

    """

    actions = ['perform_delete_selected']
    add_form = SshPublicKeyCreationForm
    list_display = ['user', 'algorithm', 'comment']

    add_fieldsets = (
        (None, {
            'classes': ('wide',),
            'fields': ('user', 'publickeytext')}),
    )

    def get_form(self, request, obj=None, **kwargs):
        """
        Use the special creation form while a new SSH public key is added.

        :param request: the current HTTP request
        :param obj: either a :py:class:`SshPublicKey
            <osusers.models.SshPublicKey>` instance or None for a new SSH
            public key
        :param kwargs: keyword arguments to be passed to
            :py:meth:`django.contrib.admin.ModelAdmin.get_form`; they take
            precedence over the creation defaults
        :return: form class

        """
        if obj is None:
            form_options = {
                'form': self.add_form,
                'fields': admin.options.flatten_fieldsets(self.add_fieldsets),
            }
        else:
            form_options = {}
        # Explicit keyword arguments of the caller win over the defaults.
        form_options.update(kwargs)
        return super(SshPublicKeyAdmin, self).get_form(
            request, obj, **form_options)

    def get_readonly_fields(self, request, obj=None):
        """
        Make sure that algorithm and data of SSH public keys are not editable.

        :param request: the current HTTP request
        :param obj: either a :py:class:`SshPublicKey
            <osusers.models.SshPublicKey>` instance or None for a new SSH
            public key
        :return: a list of fields
        :rtype: list

        """
        return ['algorithm', 'data'] if obj else []

    def perform_delete_selected(self, request, queryset):
        """
        Action to delete a list of selected ssh keys.

        This action makes sure that the ssh keys of all users affected by the
        current deletion are refreshed on the file server.

        :param request: the current HTTP request
        :param queryset: Django ORM queryset representing the selected ssh keys

        This method starts a Celery_ task to update the list of authorized keys
        for each affected user.

        .. blockdiag::
           :desctable:

           blockdiag {
              node_width = 200;
              A -> B;
              A [ label = "", shape = beginpoint,
                  description = "this method"
              ];
              B [ label = "set file ssh authorized_keys",
                  description = ":py:func:`set_file_ssh_authorized_keys()
                  <fileservertasks.tasks.set_file_ssh_authorized_keys>`
                  called with username and a list of keys, returning the path
                  of the ssh authorized_keys file",
                  color = "LightGreen",
                  stacked
              ];
           }

        """
        # Collect the owners first: once the keys are deleted the queryset
        # can no longer tell which users were affected.
        affected_user_ids = {
            row['user'] for row in queryset.values('user').distinct()
        }
        queryset.delete()
        for user_id in affected_user_ids:
            # TODO: move to model/signal
            username = User.objects.get(uid=user_id).username
            remaining_keys = [
                str(key)
                for key in SshPublicKey.objects.filter(user_id=user_id)
            ]
            TaskResult.objects.create_task_result(
                'perform_delete_selected',
                set_file_ssh_authorized_keys.s(username, remaining_keys)
            )
    perform_delete_selected.short_description = _(
        'Delete selected SSH public keys')

    def get_actions(self, request):
        """
        Get the available actions for SSH public keys.

        The default `delete_selected` action is removed from the actions
        returned by the parent class.

        :param request: the current HTTP request
        :return: list of actions
        :rtype: list

        """
        available = super(SshPublicKeyAdmin, self).get_actions(request)
        available.pop('delete_selected', None)
        return available
# Register the osusers models with the default admin site, each with its
# customised ModelAdmin class defined in this module.
admin.site.register(Group, GroupAdmin)
admin.site.register(User, UserAdmin)
admin.site.register(SshPublicKey, SshPublicKeyAdmin)