Fix tests for Python 3

- drop Python 2 __future__ imports
- fix tests to handle new Django and Python 3 module names
- reformat changed files with black
This commit is contained in:
Jan Dittberner 2019-01-30 21:27:25 +01:00
parent ddec6b4184
commit 3d18392b67
32 changed files with 2707 additions and 2675 deletions

View file

@ -7,26 +7,14 @@ The module starts Celery_ tasks.
"""
from django import forms
from django.utils.translation import ugettext_lazy as _
from django.contrib import admin
from django.utils.translation import ugettext_lazy as _
from fileservertasks.tasks import set_file_ssh_authorized_keys
from gvawebcore.forms import (
PASSWORD_MISMATCH_ERROR
)
from gvawebcore.forms import PASSWORD_MISMATCH_ERROR
from taskresults.models import TaskResult
from .forms import (
INVALID_SSH_PUBLIC_KEY,
DUPLICATE_SSH_PUBLIC_KEY_FOR_USER,
)
from .models import (
AdditionalGroup,
Group,
Shadow,
SshPublicKey,
User,
)
from .forms import DUPLICATE_SSH_PUBLIC_KEY_FOR_USER, INVALID_SSH_PUBLIC_KEY
from .models import AdditionalGroup, Group, Shadow, SshPublicKey, User
class AdditionalGroupInline(admin.TabularInline):
@ -34,6 +22,7 @@ class AdditionalGroupInline(admin.TabularInline):
Inline for :py:class:`osusers.models.AdditionalGroup` instances.
"""
model = AdditionalGroup
@ -42,8 +31,9 @@ class ShadowInline(admin.TabularInline):
Inline for :py:class:`osusers.models.ShadowInline` instances.
"""
model = Shadow
readonly_fields = ['passwd']
readonly_fields = ["passwd"]
can_delete = False
@ -53,18 +43,17 @@ class UserCreationForm(forms.ModelForm):
<osusers.models.User>`.
"""
password1 = forms.CharField(
label=_('Password'), widget=forms.PasswordInput,
required=False,
label=_("Password"), widget=forms.PasswordInput, required=False
)
password2 = forms.CharField(
label=_('Password (again)'), widget=forms.PasswordInput,
required=False,
label=_("Password (again)"), widget=forms.PasswordInput, required=False
)
class Meta:
model = User
fields = ['customer']
fields = ["customer"]
def clean_password2(self):
"""
@ -74,8 +63,8 @@ class UserCreationForm(forms.ModelForm):
:rtype: str or None
"""
password1 = self.cleaned_data.get('password1')
password2 = self.cleaned_data.get('password2')
password1 = self.cleaned_data.get("password1")
password2 = self.cleaned_data.get("password2")
if password1 and password2 and password1 != password2:
raise forms.ValidationError(PASSWORD_MISMATCH_ERROR)
return password2
@ -90,8 +79,10 @@ class UserCreationForm(forms.ModelForm):
"""
user = User.objects.create_user(
customer=self.cleaned_data['customer'],
password=self.cleaned_data['password1'], commit=commit)
customer=self.cleaned_data["customer"],
password=self.cleaned_data["password1"],
commit=commit,
)
return user
def save_m2m(self):
@ -108,14 +99,16 @@ class UserAdmin(admin.ModelAdmin):
<osusers.models.User>`.
"""
actions = ['perform_delete_selected']
actions = ["perform_delete_selected"]
add_form = UserCreationForm
inlines = [AdditionalGroupInline, ShadowInline]
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('customer', 'password1', 'password2')}),
(
None,
{"classes": ("wide",), "fields": ("customer", "password1", "password2")},
),
)
def get_form(self, request, obj=None, **kwargs):
@ -132,10 +125,12 @@ class UserAdmin(admin.ModelAdmin):
"""
defaults = {}
if obj is None:
defaults.update({
'form': self.add_form,
'fields': admin.options.flatten_fieldsets(self.add_fieldsets),
})
defaults.update(
{
"form": self.add_form,
"fields": admin.options.flatten_fieldsets(self.add_fieldsets),
}
)
defaults.update(kwargs)
return super(UserAdmin, self).get_form(request, obj, **defaults)
@ -151,7 +146,7 @@ class UserAdmin(admin.ModelAdmin):
"""
if obj:
return ['uid']
return ["uid"]
return []
def perform_delete_selected(self, request, queryset):
@ -167,7 +162,8 @@ class UserAdmin(admin.ModelAdmin):
"""
for user in queryset.all():
user.delete()
perform_delete_selected.short_description = _('Delete selected users')
perform_delete_selected.short_description = _("Delete selected users")
def get_actions(self, request):
"""
@ -182,8 +178,8 @@ class UserAdmin(admin.ModelAdmin):
"""
actions = super(UserAdmin, self).get_actions(request)
if 'delete_selected' in actions: # pragma: no cover
del actions['delete_selected']
if "delete_selected" in actions: # pragma: no cover
del actions["delete_selected"]
return actions
@ -193,7 +189,8 @@ class GroupAdmin(admin.ModelAdmin):
<osusers.models.Group>`.
"""
actions = ['perform_delete_selected']
actions = ["perform_delete_selected"]
def perform_delete_selected(self, request, queryset):
"""
@ -208,7 +205,8 @@ class GroupAdmin(admin.ModelAdmin):
"""
for group in queryset.all():
group.delete()
perform_delete_selected.short_description = _('Delete selected groups')
perform_delete_selected.short_description = _("Delete selected groups")
def get_actions(self, request):
"""
@ -223,8 +221,8 @@ class GroupAdmin(admin.ModelAdmin):
"""
actions = super(GroupAdmin, self).get_actions(request)
if 'delete_selected' in actions: # pragma: no cover
del actions['delete_selected']
if "delete_selected" in actions: # pragma: no cover
del actions["delete_selected"]
return actions
@ -234,33 +232,37 @@ class SshPublicKeyCreationForm(forms.ModelForm):
<osusers.models.SshPublicKey>`.
"""
publickeytext = forms.CharField(
label=_('Key text'), widget=forms.Textarea,
help_text=_('A SSH public key in either OpenSSH or RFC 4716 format'))
label=_("Key text"),
widget=forms.Textarea,
help_text=_("A SSH public key in either OpenSSH or RFC 4716 format"),
)
class Meta:
model = SshPublicKey
fields = ['user']
fields = ["user"]
def clean_publickeytext(self):
keytext = self.cleaned_data.get('publickeytext')
keytext = self.cleaned_data.get("publickeytext")
try:
SshPublicKey.objects.parse_keytext(keytext)
SshPublicKey.objects.parse_key_text(keytext)
except:
raise forms.ValidationError(INVALID_SSH_PUBLIC_KEY)
return keytext
def clean(self):
user = self.cleaned_data.get('user')
keytext = self.cleaned_data.get('publickeytext')
user = self.cleaned_data.get("user")
keytext = self.cleaned_data.get("publickeytext")
if user and keytext:
alg, data, comment = SshPublicKey.objects.parse_keytext(keytext)
alg, data, comment = SshPublicKey.objects.parse_key_text(keytext)
if SshPublicKey.objects.filter(
user=user, algorithm=alg, data=data
).exists():
self.add_error(
'publickeytext',
forms.ValidationError(DUPLICATE_SSH_PUBLIC_KEY_FOR_USER))
"publickeytext",
forms.ValidationError(DUPLICATE_SSH_PUBLIC_KEY_FOR_USER),
)
super(SshPublicKeyCreationForm, self).clean()
def save(self, commit=True):
@ -272,8 +274,9 @@ class SshPublicKeyCreationForm(forms.ModelForm):
:rtype: :py:class:`osusers.models.SshPublicKey`
"""
algorithm, keydata, comment = SshPublicKey.objects.parse_keytext(
self.cleaned_data.get('publickeytext'))
algorithm, keydata, comment = SshPublicKey.objects.parse_key_text(
self.cleaned_data.get("publickeytext")
)
self.instance.algorithm = algorithm
self.instance.data = keydata
self.instance.comment = comment
@ -286,14 +289,13 @@ class SshPublicKeyAdmin(admin.ModelAdmin):
<osusers.models.SshPublicKey>`.
"""
actions = ['perform_delete_selected']
actions = ["perform_delete_selected"]
add_form = SshPublicKeyCreationForm
list_display = ['user', 'algorithm', 'comment']
list_display = ["user", "algorithm", "comment"]
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('user', 'publickeytext')}),
(None, {"classes": ("wide",), "fields": ("user", "publickeytext")}),
)
def get_form(self, request, obj=None, **kwargs):
@ -311,13 +313,14 @@ class SshPublicKeyAdmin(admin.ModelAdmin):
"""
defaults = {}
if obj is None:
defaults.update({
'form': self.add_form,
'fields': admin.options.flatten_fieldsets(self.add_fieldsets),
})
defaults.update(
{
"form": self.add_form,
"fields": admin.options.flatten_fieldsets(self.add_fieldsets),
}
)
defaults.update(kwargs)
return super(SshPublicKeyAdmin, self).get_form(
request, obj, **defaults)
return super(SshPublicKeyAdmin, self).get_form(request, obj, **defaults)
def get_readonly_fields(self, request, obj=None):
"""
@ -332,7 +335,7 @@ class SshPublicKeyAdmin(admin.ModelAdmin):
"""
if obj:
return ['algorithm', 'data']
return ["algorithm", "data"]
return []
def perform_delete_selected(self, request, queryset):
@ -370,23 +373,19 @@ class SshPublicKeyAdmin(admin.ModelAdmin):
}
"""
users = set([
item['user'] for item in
queryset.values('user').distinct()
])
users = set([item["user"] for item in queryset.values("user").distinct()])
queryset.delete()
for user in users:
# TODO: move to model/signal
TaskResult.objects.create_task_result(
'perform_delete_selected',
"perform_delete_selected",
set_file_ssh_authorized_keys.s(
User.objects.get(uid=user).username,
[str(key) for key in SshPublicKey.objects.filter(
user_id=user)]
)
[str(key) for key in SshPublicKey.objects.filter(user_id=user)],
),
)
perform_delete_selected.short_description = _(
'Delete selected SSH public keys')
perform_delete_selected.short_description = _("Delete selected SSH public keys")
def get_actions(self, request):
"""
@ -401,8 +400,8 @@ class SshPublicKeyAdmin(admin.ModelAdmin):
"""
actions = super(SshPublicKeyAdmin, self).get_actions(request)
if 'delete_selected' in actions: # pragma: no cover
del actions['delete_selected']
if "delete_selected" in actions: # pragma: no cover
del actions["delete_selected"]
return actions

View file

@ -13,14 +13,12 @@ from crispy_forms.layout import Submit
from gvawebcore.forms import PasswordModelFormMixin
from .models import (
SshPublicKey,
User,
)
from .models import SshPublicKey, User
INVALID_SSH_PUBLIC_KEY = _('Invalid SSH public key data format.')
INVALID_SSH_PUBLIC_KEY = _("Invalid SSH public key data format.")
DUPLICATE_SSH_PUBLIC_KEY_FOR_USER = _(
'This SSH public key is already assigned to this user.')
"This SSH public key is already assigned to this user."
)
class ChangeOsUserPasswordForm(PasswordModelFormMixin, forms.ModelForm):
@ -28,6 +26,7 @@ class ChangeOsUserPasswordForm(PasswordModelFormMixin, forms.ModelForm):
A form for setting an OS user's password.
"""
class Meta:
model = User
fields = []
@ -36,8 +35,9 @@ class ChangeOsUserPasswordForm(PasswordModelFormMixin, forms.ModelForm):
self.helper = FormHelper()
super(ChangeOsUserPasswordForm, self).__init__(*args, **kwargs)
self.helper.form_action = reverse(
'set_osuser_password', kwargs={'slug': self.instance.username})
self.helper.add_input(Submit('submit', _('Set password')))
"set_osuser_password", kwargs={"slug": self.instance.username}
)
self.helper.add_input(Submit("submit", _("Set password")))
def save(self, commit=True):
"""
@ -48,7 +48,7 @@ class ChangeOsUserPasswordForm(PasswordModelFormMixin, forms.ModelForm):
:rtype: :py:class:`osusers.models.User`
"""
self.instance.set_password(self.cleaned_data['password1'])
self.instance.set_password(self.cleaned_data["password1"])
return super(ChangeOsUserPasswordForm, self).save(commit=commit)
@ -58,41 +58,45 @@ class AddSshPublicKeyForm(forms.ModelForm):
<osusers.models.SshPublicKey>`.
"""
publickeytext = forms.CharField(
label=_('Key text'), widget=forms.Textarea,
help_text=_('A SSH public key in either OpenSSH or RFC 4716 format'))
label=_("Key text"),
widget=forms.Textarea,
help_text=_("A SSH public key in either OpenSSH or RFC 4716 format"),
)
class Meta:
model = SshPublicKey
fields = []
def __init__(self, *args, **kwargs):
hosting_package = kwargs.pop('hostingpackage')
hosting_package = kwargs.pop("hostingpackage")
self.osuser = hosting_package.osuser
super(AddSshPublicKeyForm, self).__init__(*args, **kwargs)
self.helper = FormHelper()
self.helper.form_action = reverse(
'add_ssh_key', kwargs={'package': hosting_package.id})
self.helper.add_input(Submit('submit', _('Add SSH public key')))
"add_ssh_key", kwargs={"package": hosting_package.id}
)
self.helper.add_input(Submit("submit", _("Add SSH public key")))
def clean_publickeytext(self):
keytext = self.cleaned_data.get('publickeytext')
keytext = self.cleaned_data.get("publickeytext")
try:
SshPublicKey.objects.parse_keytext(keytext)
SshPublicKey.objects.parse_key_text(keytext)
except ValueError:
raise forms.ValidationError(INVALID_SSH_PUBLIC_KEY)
return keytext
def clean(self):
keytext = self.cleaned_data.get('publickeytext')
keytext = self.cleaned_data.get("publickeytext")
if keytext is not None:
alg, data, comment = SshPublicKey.objects.parse_keytext(keytext)
alg, data, comment = SshPublicKey.objects.parse_key_text(keytext)
if SshPublicKey.objects.filter(
user=self.osuser, algorithm=alg, data=data
).exists():
self.add_error(
'publickeytext',
forms.ValidationError(DUPLICATE_SSH_PUBLIC_KEY_FOR_USER)
"publickeytext",
forms.ValidationError(DUPLICATE_SSH_PUBLIC_KEY_FOR_USER),
)
def save(self, commit=True):
@ -104,8 +108,9 @@ class AddSshPublicKeyForm(forms.ModelForm):
:rtype: :py:class:`osusers.models.SshPublicKey`
"""
algorithm, keydata, comment = SshPublicKey.objects.parse_keytext(
self.cleaned_data.get('publickeytext'))
algorithm, keydata, comment = SshPublicKey.objects.parse_key_text(
self.cleaned_data.get("publickeytext")
)
self.instance.user = self.osuser
self.instance.algorithm = algorithm
self.instance.data = keydata
@ -119,17 +124,19 @@ class EditSshPublicKeyCommentForm(forms.ModelForm):
<osusers.models.SshPublicKey>` comment fields.
"""
class Meta:
model = SshPublicKey
fields = ['comment']
fields = ["comment"]
def __init__(self, *args, **kwargs):
hosting_package = kwargs.pop('hostingpackage')
hosting_package = kwargs.pop("hostingpackage")
self.osuser = hosting_package.osuser
super(EditSshPublicKeyCommentForm, self).__init__(*args, **kwargs)
self.fields['comment'].widget = forms.TextInput()
self.fields["comment"].widget = forms.TextInput()
self.helper = FormHelper()
self.helper.form_action = reverse(
'edit_ssh_key_comment',
kwargs={'package': hosting_package.id, 'pk': self.instance.id})
self.helper.add_input(Submit('submit', _('Change Comment')))
"edit_ssh_key_comment",
kwargs={"package": hosting_package.id, "pk": self.instance.id},
)
self.helper.add_input(Submit("submit", _("Change Comment")))

View file

@ -2,20 +2,16 @@
This module defines the database models of operating system users.
"""
from __future__ import unicode_literals
import base64
from datetime import date
import logging
import os
import six
from django.db import models, transaction
from django.conf import settings
from django.core.exceptions import ValidationError
from django.dispatch import Signal
from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext as _
from model_utils.models import TimeStampedModel
@ -27,11 +23,10 @@ from passlib.utils import generate_password
_LOGGER = logging.getLogger(__name__)
password_set = Signal(providing_args=['instance', 'password'])
password_set = Signal(providing_args=["instance", "password"])
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL = _(
"You can not use a user's primary group.")
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL = _("You can not use a user's primary group.")
class GroupManager(models.Manager):
@ -48,34 +43,31 @@ class GroupManager(models.Manager):
:rtype: int
"""
q = self.aggregate(models.Max('gid'))
if q['gid__max'] is None:
q = self.aggregate(models.Max("gid"))
if q["gid__max"] is None:
return settings.OSUSER_MINGID
return max(settings.OSUSER_MINGID, q['gid__max'] + 1)
return max(settings.OSUSER_MINGID, q["gid__max"] + 1)
@python_2_unicode_compatible
class Group(TimeStampedModel, models.Model):
"""
This entity class corresponds to an operating system group.
"""
groupname = models.CharField(
_('Group name'), max_length=16, unique=True)
gid = models.PositiveSmallIntegerField(
_('Group ID'), unique=True, primary_key=True)
descr = models.TextField(_('Description'), blank=True)
passwd = models.CharField(
_('Group password'), max_length=128, blank=True)
groupname = models.CharField(_("Group name"), max_length=16, unique=True)
gid = models.PositiveSmallIntegerField(_("Group ID"), unique=True, primary_key=True)
descr = models.TextField(_("Description"), blank=True)
passwd = models.CharField(_("Group password"), max_length=128, blank=True)
objects = GroupManager()
class Meta:
verbose_name = _('Group')
verbose_name_plural = _('Groups')
verbose_name = _("Group")
verbose_name_plural = _("Groups")
def __str__(self):
return '{0} ({1})'.format(self.groupname, self.gid)
return "{0} ({1})".format(self.groupname, self.gid)
@transaction.atomic
def save(self, *args, **kwargs):
@ -122,10 +114,10 @@ class UserManager(models.Manager):
:rtype: int
"""
q = self.aggregate(models.Max('uid'))
if q['uid__max'] is None:
q = self.aggregate(models.Max("uid"))
if q["uid__max"] is None:
return settings.OSUSER_MINUID
return max(settings.OSUSER_MINUID, q['uid__max'] + 1)
return max(settings.OSUSER_MINUID, q["uid__max"] + 1)
def get_next_username(self):
"""
@ -137,23 +129,21 @@ class UserManager(models.Manager):
"""
count = 1
usernameformat = "{0}{1:02d}"
nextuser = usernameformat.format(settings.OSUSER_USERNAME_PREFIX,
count)
for user in self.values('username').filter(
username__startswith=settings.OSUSER_USERNAME_PREFIX
).order_by('username'):
if user['username'] == nextuser:
nextuser = usernameformat.format(settings.OSUSER_USERNAME_PREFIX, count)
for user in (
self.values("username")
.filter(username__startswith=settings.OSUSER_USERNAME_PREFIX)
.order_by("username")
):
if user["username"] == nextuser:
count += 1
nextuser = usernameformat.format(
settings.OSUSER_USERNAME_PREFIX, count)
nextuser = usernameformat.format(settings.OSUSER_USERNAME_PREFIX, count)
else:
break
return nextuser
@transaction.atomic
def create_user(
self, customer, username=None, password=None, commit=False
):
def create_user(self, customer, username=None, password=None, commit=False):
"""
Create a new user with a primary group named the same as the user and
an initial password.
@ -179,41 +169,42 @@ class UserManager(models.Manager):
password = generate_password()
homedir = os.path.join(settings.OSUSER_HOME_BASEPATH, username)
group = Group.objects.create(groupname=username, gid=gid)
user = self.create(username=username, group=group, uid=uid,
homedir=homedir, customer=customer,
shell=settings.OSUSER_DEFAULT_SHELL)
user = self.create(
username=username,
group=group,
uid=uid,
homedir=homedir,
customer=customer,
shell=settings.OSUSER_DEFAULT_SHELL,
)
user.set_password(password)
if commit:
user.save()
return user
@python_2_unicode_compatible
class User(TimeStampedModel, models.Model):
"""
This entity class corresponds to an operating system user.
"""
username = models.CharField(
_('User name'), max_length=64, unique=True)
uid = models.PositiveSmallIntegerField(
_('User ID'), unique=True, primary_key=True)
group = models.ForeignKey(
Group, verbose_name=_('Group'), on_delete=models.CASCADE)
gecos = models.CharField(_('Gecos field'), max_length=128, blank=True)
homedir = models.CharField(_('Home directory'), max_length=256)
shell = models.CharField(_('Login shell'), max_length=64)
customer = models.ForeignKey(
settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
username = models.CharField(_("User name"), max_length=64, unique=True)
uid = models.PositiveSmallIntegerField(_("User ID"), unique=True, primary_key=True)
group = models.ForeignKey(Group, verbose_name=_("Group"), on_delete=models.CASCADE)
gecos = models.CharField(_("Gecos field"), max_length=128, blank=True)
homedir = models.CharField(_("Home directory"), max_length=256)
shell = models.CharField(_("Login shell"), max_length=64)
customer = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
objects = UserManager()
class Meta:
verbose_name = _('User')
verbose_name_plural = _('Users')
verbose_name = _("User")
verbose_name_plural = _("Users")
def __str__(self):
return '{0} ({1})'.format(self.username, self.uid)
return "{0} ({1})".format(self.username, self.uid)
@transaction.atomic
def set_password(self, password):
@ -226,17 +217,15 @@ class User(TimeStampedModel, models.Model):
:param str password: the new password
"""
if hasattr(self, 'shadow'):
if hasattr(self, "shadow"):
self.shadow.set_password(password)
else:
self.shadow = Shadow.objects.create_shadow(
user=self, password=password
)
password_set.send(
sender=self.__class__, password=password, instance=self)
self.shadow = Shadow.objects.create_shadow(user=self, password=password)
password_set.send(sender=self.__class__, password=password, instance=self)
return True
def is_sftp_user(self):
# noinspection PyUnresolvedReferences
return self.additionalgroup_set.filter(
group__groupname=settings.OSUSER_SFTP_GROUP
).exists()
@ -269,6 +258,7 @@ class User(TimeStampedModel, models.Model):
:py:meth:`django.db.Model.delete`
"""
# noinspection PyUnresolvedReferences
self.group.delete()
super(User, self).delete(*args, **kwargs)
@ -293,64 +283,84 @@ class ShadowManager(models.Manager):
"""
changedays = (timezone.now().date() - date(1970, 1, 1)).days
shadow = self.create(
user=user, changedays=changedays,
minage=0, maxage=None, gracedays=7,
inactdays=30, expiredays=None
user=user,
changedays=changedays,
minage=0,
maxage=None,
gracedays=7,
inactdays=30,
expiredays=None,
)
shadow.set_password(password)
shadow.save()
return shadow
@python_2_unicode_compatible
class Shadow(TimeStampedModel, models.Model):
"""
This entity class corresponds to an operating system user's shadow file
entry.
"""
user = models.OneToOneField(
User, primary_key=True, verbose_name=_('User'),
on_delete=models.CASCADE)
passwd = models.CharField(_('Encrypted password'), max_length=128)
User, primary_key=True, verbose_name=_("User"), on_delete=models.CASCADE
)
passwd = models.CharField(_("Encrypted password"), max_length=128)
changedays = models.PositiveSmallIntegerField(
_('Date of last change'),
help_text=_('This is expressed in days since Jan 1, 1970'),
blank=True, null=True)
_("Date of last change"),
help_text=_("This is expressed in days since Jan 1, 1970"),
blank=True,
null=True,
)
minage = models.PositiveSmallIntegerField(
_('Minimum age'),
help_text=_('Minimum number of days before the password can be'
' changed'),
blank=True, null=True)
_("Minimum age"),
help_text=_("Minimum number of days before the password can be" " changed"),
blank=True,
null=True,
)
maxage = models.PositiveSmallIntegerField(
_('Maximum age'),
help_text=_('Maximum number of days after which the password has to'
' be changed'),
blank=True, null=True)
_("Maximum age"),
help_text=_(
"Maximum number of days after which the password has to" " be changed"
),
blank=True,
null=True,
)
gracedays = models.PositiveSmallIntegerField(
_('Grace period'),
help_text=_('The number of days before the password is going to'
' expire'),
blank=True, null=True)
_("Grace period"),
help_text=_("The number of days before the password is going to" " expire"),
blank=True,
null=True,
)
inactdays = models.PositiveSmallIntegerField(
_('Inactivity period'),
help_text=_('The number of days after the password has expired during'
' which the password should still be accepted'),
blank=True, null=True)
_("Inactivity period"),
help_text=_(
"The number of days after the password has expired during"
" which the password should still be accepted"
),
blank=True,
null=True,
)
expiredays = models.PositiveSmallIntegerField(
_('Account expiration date'),
help_text=_('The date of expiration of the account, expressed as'
' number of days since Jan 1, 1970'),
blank=True, null=True, default=None)
_("Account expiration date"),
help_text=_(
"The date of expiration of the account, expressed as"
" number of days since Jan 1, 1970"
),
blank=True,
null=True,
default=None,
)
objects = ShadowManager()
class Meta:
verbose_name = _('Shadow password')
verbose_name_plural = _('Shadow passwords')
verbose_name = _("Shadow password")
verbose_name_plural = _("Shadow passwords")
def __str__(self):
return 'for user {0}'.format(self.user)
return "for user {0}".format(self.user)
def set_password(self, password):
"""
@ -361,23 +371,23 @@ class Shadow(TimeStampedModel, models.Model):
self.passwd = sha512_crypt.encrypt(password)
@python_2_unicode_compatible
class AdditionalGroup(TimeStampedModel, models.Model):
"""
This entity class corresponds to additional group assignments for an
:py:class:`operating system user <osusers.models.User>`.
"""
user = models.ForeignKey(User, on_delete=models.CASCADE)
group = models.ForeignKey(Group, on_delete=models.CASCADE)
class Meta:
unique_together = ('user', 'group')
verbose_name = _('Additional group')
verbose_name_plural = _('Additional groups')
unique_together = ("user", "group")
verbose_name = _("Additional group")
verbose_name_plural = _("Additional groups")
def __str__(self):
return '{0} in {1}'.format(self.user, self.group)
return "{0} in {1}".format(self.user, self.group)
def clean(self):
"""
@ -385,6 +395,7 @@ class AdditionalGroup(TimeStampedModel, models.Model):
group.
"""
# noinspection PyUnresolvedReferences
if self.user.group == self.group:
raise ValidationError(CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)
@ -423,60 +434,62 @@ class SshPublicKeyManager(models.Manager):
"""
def parse_keytext(self, keytext):
def parse_key_text(self, key_text: str):
"""
Parse a SSH public key in OpenSSH or :rfc:`4716` format into its
components algorithm, key data and comment.
:param str keytext: key text
:param str key_text: key text
:return: triple of algorithm name, key data and comment
:rtype: triple of str
"""
if keytext.startswith('---- BEGIN SSH2 PUBLIC KEY ----'):
comment = ''
data = ''
continued = ''
if key_text.startswith("---- BEGIN SSH2 PUBLIC KEY ----"):
comment = ""
data = ""
continued = ""
headers = {}
for line in keytext.splitlines():
if line == '---- BEGIN SSH2 PUBLIC KEY ----':
header_tag = None
for line in key_text.splitlines():
if line == "---- BEGIN SSH2 PUBLIC KEY ----":
continue
elif ':' in line: # a header line
elif ":" in line: # a header line
header_tag, header_value = [
item.strip() for item in line.split(':', 1)]
if header_value.endswith('\\'):
item.strip() for item in line.split(":", 1)
]
if header_value.endswith("\\"):
continued = header_value[:-1]
else:
headers[header_tag.lower()] = header_value
elif continued:
if line.endswith('\\'):
if line.endswith("\\"):
continued += line[:-1]
continue
header_value = continued + line
headers[header_tag.lower()] = header_value
continued = ''
elif line == '---- END SSH2 PUBLIC KEY ----':
continued = ""
elif line == "---- END SSH2 PUBLIC KEY ----":
break
elif line: # ignore empty lines
data += line
if 'comment' in headers:
comment = headers['comment']
if "comment" in headers:
comment = headers["comment"]
else:
parts = keytext.split(None, 2)
parts = key_text.split(None, 2)
if len(parts) < 2:
raise ValueError('invalid SSH public key')
raise ValueError("invalid SSH public key")
data = parts[1]
comment = len(parts) == 3 and parts[2] or ""
try:
keybytes = base64.b64decode(data)
except TypeError:
raise ValueError('invalid SSH public key')
parts = keybytes.split(b'\x00' * 3)
raise ValueError("invalid SSH public key")
parts = keybytes.split(b"\x00" * 3)
if len(parts) < 2:
raise ValueError('invalid SSH public key')
alglength = six.byte2int(parts[1])
algname = parts[1][1:1+alglength]
return algname, data, comment
raise ValueError("invalid SSH public key")
key_length = int.from_bytes(parts[1], byteorder="big")
key_algorithm = parts[1][1 : 1 + key_length].decode("utf-8")
return key_algorithm, data, comment
def create_ssh_public_key(self, user, keytext):
"""
@ -490,31 +503,28 @@ class SshPublicKeyManager(models.Manager):
:retype: :py:class:`osusers.models.SshPublicKey`
"""
algorithm, data, comment = self.parse_keytext(keytext)
return self.create(
user=user, algorithm=algorithm, data=data, comment=comment)
algorithm, data, comment = self.parse_key_text(keytext)
return self.create(user=user, algorithm=algorithm, data=data, comment=comment)
@python_2_unicode_compatible
class SshPublicKey(TimeStampedModel):
"""
This entity class represents single SSH keys for an :py:class:`operating
system user <osusers.models.User>`.
"""
user = models.ForeignKey(
User, verbose_name=_('User'), on_delete=models.CASCADE)
algorithm = models.CharField(_('Algorithm'), max_length=20)
data = models.TextField(_('Key bytes'),
help_text=_('Base64 encoded key bytes'))
comment = models.TextField(_('Comment'), blank=True)
user = models.ForeignKey(User, verbose_name=_("User"), on_delete=models.CASCADE)
algorithm = models.CharField(_("Algorithm"), max_length=20)
data = models.TextField(_("Key bytes"), help_text=_("Base64 encoded key bytes"))
comment = models.TextField(_("Comment"), blank=True)
objects = SshPublicKeyManager()
class Meta:
verbose_name = _('SSH public key')
verbose_name_plural = _('SSH public keys')
unique_together = [('user', 'algorithm', 'data')]
verbose_name = _("SSH public key")
verbose_name_plural = _("SSH public keys")
unique_together = [("user", "algorithm", "data")]
def __str__(self):
return "{algorithm} {data} {comment}".format(

View file

@ -5,17 +5,10 @@ from django.test.utils import override_settings
from django.contrib.auth import get_user_model
from mock import MagicMock, Mock, patch
from unittest.mock import MagicMock, Mock, patch
from osusers.forms import (
INVALID_SSH_PUBLIC_KEY,
DUPLICATE_SSH_PUBLIC_KEY_FOR_USER,
)
from osusers.models import (
Group,
SshPublicKey,
User,
)
from osusers.forms import INVALID_SSH_PUBLIC_KEY, DUPLICATE_SSH_PUBLIC_KEY_FOR_USER
from osusers.models import Group, SshPublicKey, User
from osusers.admin import (
GroupAdmin,
PASSWORD_MISMATCH_ERROR,
@ -30,20 +23,19 @@ Customer = get_user_model()
class CustomerTestCase(TestCase):
def setUp(self):
self.customer = Customer.objects.create_user('test')
self.customer = Customer.objects.create_user("test")
super(CustomerTestCase, self).setUp()
class UserCreationFormTest(CustomerTestCase):
def test_clean_password2_same(self):
form = UserCreationForm()
form.cleaned_data = {
'customer': self.customer,
'password1': 'secret',
'password2': 'secret'
"customer": self.customer,
"password1": "secret",
"password2": "secret",
}
self.assertEqual(form.clean_password2(), 'secret')
self.assertEqual(form.clean_password2(), "secret")
def test_clean_password2_empty(self):
form = UserCreationForm()
@ -53,25 +45,23 @@ class UserCreationFormTest(CustomerTestCase):
def test_clean_password2_mismatch(self):
form = UserCreationForm()
form.cleaned_data = {
'customer': self.customer,
'password1': 'secretx',
'password2': 'secrety'
"customer": self.customer,
"password1": "secretx",
"password2": "secrety",
}
with self.assertRaises(forms.ValidationError) as cm:
form.clean_password2()
self.assertEqual(cm.exception.message, PASSWORD_MISMATCH_ERROR)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND="memory", BROKER_BACKEND="memory"
)
def test_save_commit(self):
form = UserCreationForm()
form.cleaned_data = {
'customer': self.customer,
'password1': 'secret',
'password2': 'secret'
"customer": self.customer,
"password1": "secret",
"password2": "secret",
}
user = form.save()
self.assertIsNotNone(user)
@ -89,121 +79,99 @@ class UserAdminTest(CustomerTestCase):
super(UserAdminTest, self).setUp()
def test_get_form_without_object(self):
form = self.uadmin.get_form(Mock(name='request'))
self.assertEqual(
form.Meta.fields,
['customer', 'password1', 'password2']
)
form = self.uadmin.get_form(Mock(name="request"))
self.assertEqual(form.Meta.fields, ["customer", "password1", "password2"])
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND="memory", BROKER_BACKEND="memory"
)
def test_get_form_with_object(self):
user = User.objects.create_user(customer=self.customer)
form = self.uadmin.get_form(Mock(name='request'), user)
form = self.uadmin.get_form(Mock(name="request"), user)
self.assertEqual(
form.Meta.fields,
['username', 'group', 'gecos', 'homedir', 'shell', 'customer',
'uid']
["username", "group", "gecos", "homedir", "shell", "customer", "uid"],
)
def test_get_inline_instances_without_object(self):
inlines = self.uadmin.get_inline_instances(Mock(name='request'))
inlines = self.uadmin.get_inline_instances(Mock(name="request"))
self.assertEqual(len(inlines), 2)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND="memory", BROKER_BACKEND="memory"
)
def test_get_inline_instances_with_object(self):
user = User.objects.create_user(customer=self.customer)
inlines = self.uadmin.get_inline_instances(
Mock(name='request'), user)
inlines = self.uadmin.get_inline_instances(Mock(name="request"), user)
self.assertEqual(len(inlines), len(UserAdmin.inlines))
for index in range(len(inlines)):
self.assertIsInstance(inlines[index], UserAdmin.inlines[index])
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND="memory", BROKER_BACKEND="memory"
)
def test_perform_delete_selected(self):
user = User.objects.create_user(customer=self.customer)
self.uadmin.perform_delete_selected(
Mock(name='request'), User.objects.filter(uid=user.uid))
Mock(name="request"), User.objects.filter(uid=user.uid)
)
self.assertEqual(User.objects.filter(uid=user.uid).count(), 0)
def test_get_actions(self):
requestmock = MagicMock(name='request')
self.assertNotIn(
'delete_selected',
self.uadmin.get_actions(requestmock))
self.assertIn(
'perform_delete_selected',
self.uadmin.get_actions(requestmock))
requestmock = MagicMock(name="request")
self.assertNotIn("delete_selected", self.uadmin.get_actions(requestmock))
self.assertIn("perform_delete_selected", self.uadmin.get_actions(requestmock))
class GroupAdminTest(TestCase):
def setUp(self):
site = AdminSite()
self.gadmin = GroupAdmin(Group, site)
super(GroupAdminTest, self).setUp()
def test_get_inline_instances_without_object(self):
inlines = self.gadmin.get_inline_instances(Mock(name='request'))
inlines = self.gadmin.get_inline_instances(Mock(name="request"))
self.assertEqual(inlines, [])
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND="memory", BROKER_BACKEND="memory"
)
def test_get_inline_instances_with_object(self):
group = Group.objects.create(gid=1000, groupname='test')
inlines = self.gadmin.get_inline_instances(
Mock(name='request'), group)
group = Group.objects.create(gid=1000, groupname="test")
inlines = self.gadmin.get_inline_instances(Mock(name="request"), group)
self.assertEqual(len(inlines), len(GroupAdmin.inlines))
for index in range(len(inlines)):
self.assertIsInstance(inlines[index], GroupAdmin.inlines[index])
def test_perform_delete_selected(self):
group = Group.objects.create(gid=1000, groupname='test')
group = Group.objects.create(gid=1000, groupname="test")
self.gadmin.perform_delete_selected(
Mock(name='request'), Group.objects.filter(gid=group.gid))
Mock(name="request"), Group.objects.filter(gid=group.gid)
)
self.assertEqual(Group.objects.filter(gid=group.gid).count(), 0)
def test_get_actions(self):
requestmock = MagicMock(name='request')
self.assertNotIn(
'delete_selected',
self.gadmin.get_actions(requestmock))
self.assertIn(
'perform_delete_selected',
self.gadmin.get_actions(requestmock))
requestmock = MagicMock(name="request")
self.assertNotIn("delete_selected", self.gadmin.get_actions(requestmock))
self.assertIn("perform_delete_selected", self.gadmin.get_actions(requestmock))
class SshPublicKeyCreationFormTest(CustomerTestCase):
@patch('osusers.admin.SshPublicKey.objects')
@patch("osusers.admin.SshPublicKey.objects")
def test_clean_publickeytext_valid_key(self, sshpkmanager):
form = SshPublicKeyCreationForm()
sshpkmanager.parse_keytext = MagicMock(side_effect=ValueError)
form.cleaned_data = {'publickeytext': 'wrongkey'}
sshpkmanager.parse_key_text = MagicMock(side_effect=ValueError)
form.cleaned_data = {"publickeytext": "wrongkey"}
with self.assertRaises(forms.ValidationError) as ve:
form.clean_publickeytext()
self.assertEqual(ve.exception.message, INVALID_SSH_PUBLIC_KEY)
@patch('osusers.admin.SshPublicKey.objects')
@patch("osusers.admin.SshPublicKey.objects")
def test_clean_publickeytext_invalid_key(self, sshpkmanager):
form = SshPublicKeyCreationForm()
sshpkmanager.parse_keytext = MagicMock(return_value='goodkey')
form.cleaned_data = {'publickeytext': 'goodkey'}
self.assertEqual(form.clean_publickeytext(), 'goodkey')
sshpkmanager.parse_key_text = MagicMock(return_value="goodkey")
form.cleaned_data = {"publickeytext": "goodkey"}
self.assertEqual(form.clean_publickeytext(), "goodkey")
def test_clean_missing_data(self):
form = SshPublicKeyCreationForm()
@ -211,95 +179,83 @@ class SshPublicKeyCreationFormTest(CustomerTestCase):
form.clean()
self.assertEqual(len(form.errors), 0)
@patch('osusers.admin.SshPublicKey.objects.parse_keytext')
def test_clean_once(self, parse_keytext):
parse_keytext.return_value = ('good', 'key', 'comment')
@patch("osusers.admin.SshPublicKey.objects.parse_key_text")
def test_clean_once(self, parse_key_text):
parse_key_text.return_value = ("good", "key", "comment")
user = User.objects.create_user(customer=self.customer)
form = SshPublicKeyCreationForm()
form.cleaned_data = {
'user': user,
'publickeytext': 'good key comment'
}
form.cleaned_data = {"user": user, "publickeytext": "good key comment"}
form.clean()
self.assertEqual(len(form.errors), 0)
@patch('osusers.admin.SshPublicKey.objects.parse_keytext')
def test_clean_again(self, parse_keytext):
parse_keytext.return_value = ('good', 'key', 'comment')
@patch("osusers.admin.SshPublicKey.objects.parse_key_text")
def test_clean_again(self, parse_key_text):
parse_key_text.return_value = ("good", "key", "comment")
user = User.objects.create_user(customer=self.customer)
SshPublicKey.objects.create(
user=user, algorithm='good', data='key', comment='comment')
user=user, algorithm="good", data="key", comment="comment"
)
form = SshPublicKeyCreationForm()
form.cleaned_data = {
'user': user,
'publickeytext': 'good key comment'
}
form.cleaned_data = {"user": user, "publickeytext": "good key comment"}
form.clean()
self.assertIn('publickeytext', form.errors)
self.assertIn("publickeytext", form.errors)
self.assertEqual(
form.errors['publickeytext'],
[DUPLICATE_SSH_PUBLIC_KEY_FOR_USER])
form.errors["publickeytext"], [DUPLICATE_SSH_PUBLIC_KEY_FOR_USER]
)
@patch('osusers.admin.SshPublicKey.objects.parse_keytext')
def test_save(self, parse_keytext):
parse_keytext.return_value = ('good', 'key', 'comment')
@patch("osusers.admin.SshPublicKey.objects.parse_key_text")
def test_save(self, parse_key_text):
parse_key_text.return_value = ("good", "key", "comment")
user = User.objects.create_user(customer=self.customer)
form = SshPublicKeyCreationForm()
form.cleaned_data = {
'user': user,
'publickeytext': 'good key comment'
}
form.cleaned_data = {"user": user, "publickeytext": "good key comment"}
form.instance.user = user
form.save()
self.assertTrue(
SshPublicKey.objects.filter(
user=user, algorithm='good', data='key'))
SshPublicKey.objects.filter(user=user, algorithm="good", data="key")
)
class SshPublicKeyAdminTest(CustomerTestCase):
def setUp(self):
site = AdminSite()
self.sadmin = SshPublicKeyAdmin(SshPublicKey, site)
super(SshPublicKeyAdminTest, self).setUp()
def test_get_form_no_instance(self):
form = self.sadmin.get_form(request=Mock(name='request'))
form = self.sadmin.get_form(request=Mock(name="request"))
self.assertEqual(form.Meta.model, SshPublicKey)
def test_get_form_with_instance(self):
user = User.objects.create_user(customer=self.customer)
key = SshPublicKey.objects.create(
user=user, algorithm='good', data='key', comment='comment')
form = self.sadmin.get_form(request=Mock(name='request'), obj=key)
user=user, algorithm="good", data="key", comment="comment"
)
form = self.sadmin.get_form(request=Mock(name="request"), obj=key)
self.assertEqual(form.Meta.model, SshPublicKey)
self.assertEqual(
form.Meta.fields,
['user', 'comment', 'algorithm', 'data'])
self.assertEqual(form.Meta.fields, ["user", "comment", "algorithm", "data"])
def test_get_readonly_fields_no_instance(self):
readonly_fields = self.sadmin.get_readonly_fields(
request=Mock(name='request'))
readonly_fields = self.sadmin.get_readonly_fields(request=Mock(name="request"))
self.assertEqual(readonly_fields, [])
def test_get_readonly_fields_with_instance(self):
readonly_fields = self.sadmin.get_readonly_fields(
request=Mock(name='request'), obj=Mock())
self.assertEqual(readonly_fields, ['algorithm', 'data'])
request=Mock(name="request"), obj=Mock()
)
self.assertEqual(readonly_fields, ["algorithm", "data"])
def test_perform_delete_selected(self):
user = User.objects.create_user(customer=self.customer)
key = SshPublicKey.objects.create(
user=user, algorithm='good', data='key', comment='comment')
user=user, algorithm="good", data="key", comment="comment"
)
self.sadmin.perform_delete_selected(
Mock(name='request'), SshPublicKey.objects.filter(id=key.id))
Mock(name="request"), SshPublicKey.objects.filter(id=key.id)
)
self.assertFalse(SshPublicKey.objects.filter(id=key.id).exists())
def test_get_actions(self):
requestmock = MagicMock(name='request')
self.assertNotIn(
'delete_selected',
self.sadmin.get_actions(requestmock))
self.assertIn(
'perform_delete_selected',
self.sadmin.get_actions(requestmock))
requestmock = MagicMock(name="request")
self.assertNotIn("delete_selected", self.sadmin.get_actions(requestmock))
self.assertIn("perform_delete_selected", self.sadmin.get_actions(requestmock))

View file

@ -2,15 +2,13 @@
This module provides tests for :py:mod:`osusers.forms`.
"""
from __future__ import absolute_import, unicode_literals
from mock import MagicMock, Mock, patch
from unittest.mock import MagicMock, Mock, patch
from django import forms
from django.core.urlresolvers import reverse
from django.test import TestCase
from django.contrib.auth import get_user_model
from django.urls import reverse
from passlib.hash import sha512_crypt
@ -34,7 +32,7 @@ class AddSshPublicKeyFormTest(TestCase):
"""
def _setup_hostingpackage(self):
customer = Customer.objects.create_user('test')
customer = Customer.objects.create_user("test")
user = User.objects.create_user(customer=customer)
self.hostingpackage = Mock(id=42, osuser=user)
@ -42,97 +40,92 @@ class AddSshPublicKeyFormTest(TestCase):
instance = MagicMock()
with self.assertRaises(KeyError) as ke:
AddSshPublicKeyForm(instance)
self.assertEqual(ke.exception.args[0], 'hostingpackage')
self.assertEqual(ke.exception.args[0], "hostingpackage")
def test_constructor(self):
self._setup_hostingpackage()
instance = MagicMock()
form = AddSshPublicKeyForm(
instance, hostingpackage=self.hostingpackage)
self.assertTrue(hasattr(form, 'osuser'))
form = AddSshPublicKeyForm(instance, hostingpackage=self.hostingpackage)
self.assertTrue(hasattr(form, "osuser"))
self.assertEqual(form.osuser, self.hostingpackage.osuser)
self.assertTrue(hasattr(form, 'helper'))
self.assertEqual(form.helper.form_action, reverse(
'add_ssh_key', kwargs={'package': self.hostingpackage.id}))
self.assertIn('publickeytext', form.fields)
self.assertEqual(form.helper.inputs[0].name, 'submit')
self.assertTrue(hasattr(form, "helper"))
self.assertEqual(
form.helper.form_action,
reverse("add_ssh_key", kwargs={"package": self.hostingpackage.id}),
)
self.assertIn("publickeytext", form.fields)
self.assertEqual(form.helper.inputs[0].name, "submit")
@patch('osusers.forms.SshPublicKey.objects.parse_keytext')
def test_clean_publickeytext_invalid(self, parse_keytext):
@patch("osusers.forms.SshPublicKey.objects.parse_key_text")
def test_clean_publickeytext_invalid(self, parse_key_text):
self._setup_hostingpackage()
instance = MagicMock()
form = AddSshPublicKeyForm(
instance, hostingpackage=self.hostingpackage)
form.cleaned_data = {'publickeytext': 'a bad key'}
parse_keytext.side_effect = ValueError
form = AddSshPublicKeyForm(instance, hostingpackage=self.hostingpackage)
form.cleaned_data = {"publickeytext": "a bad key"}
parse_key_text.side_effect = ValueError
with self.assertRaises(forms.ValidationError) as ve:
form.clean_publickeytext()
self.assertEqual(ve.exception.message, INVALID_SSH_PUBLIC_KEY)
@patch('osusers.forms.SshPublicKey.objects.parse_keytext')
def test_clean_publickeytext_valid(self, parse_keytext):
@patch("osusers.forms.SshPublicKey.objects.parse_key_text")
def test_clean_publickeytext_valid(self, _):
self._setup_hostingpackage()
instance = MagicMock()
form = AddSshPublicKeyForm(
instance, hostingpackage=self.hostingpackage)
form.cleaned_data = {'publickeytext': 'good key comment'}
form = AddSshPublicKeyForm(instance, hostingpackage=self.hostingpackage)
form.cleaned_data = {"publickeytext": "good key comment"}
retval = form.clean_publickeytext()
self.assertEqual(retval, 'good key comment')
self.assertEqual(retval, "good key comment")
def test_clean_none(self):
self._setup_hostingpackage()
instance = MagicMock()
form = AddSshPublicKeyForm(
instance, hostingpackage=self.hostingpackage)
form.cleaned_data = {'publickeytext': None}
form = AddSshPublicKeyForm(instance, hostingpackage=self.hostingpackage)
form.cleaned_data = {"publickeytext": None}
form.clean()
self.assertIsNone(form.cleaned_data['publickeytext'])
self.assertIsNone(form.cleaned_data["publickeytext"])
@patch('osusers.forms.SshPublicKey.objects.parse_keytext')
def test_clean_fresh(self, parse_keytext):
@patch("osusers.forms.SshPublicKey.objects.parse_key_text")
def test_clean_fresh(self, parse_key_text):
self._setup_hostingpackage()
instance = MagicMock()
form = AddSshPublicKeyForm(
instance, hostingpackage=self.hostingpackage)
sshpubkey = 'good key comment'
form.cleaned_data = {'publickeytext': sshpubkey}
parse_keytext.return_value = sshpubkey.split(' ')
form = AddSshPublicKeyForm(instance, hostingpackage=self.hostingpackage)
sshpubkey = "good key comment"
form.cleaned_data = {"publickeytext": sshpubkey}
parse_key_text.return_value = sshpubkey.split(" ")
form.clean()
self.assertEqual(
form.cleaned_data['publickeytext'], 'good key comment')
self.assertEqual(form.cleaned_data["publickeytext"], "good key comment")
@patch('osusers.forms.SshPublicKey.objects.parse_keytext')
def test_clean_duplicate(self, parse_keytext):
@patch("osusers.forms.SshPublicKey.objects.parse_key_text")
def test_clean_duplicate(self, parse_key_text):
self._setup_hostingpackage()
instance = MagicMock()
form = AddSshPublicKeyForm(
instance, hostingpackage=self.hostingpackage)
form = AddSshPublicKeyForm(instance, hostingpackage=self.hostingpackage)
SshPublicKey.objects.create(
user=self.hostingpackage.osuser, algorithm='good', data='key',
comment='comment')
sshpubkey = 'good key comment'
form.cleaned_data = {'publickeytext': sshpubkey}
parse_keytext.return_value = sshpubkey.split(' ')
user=self.hostingpackage.osuser,
algorithm="good",
data="key",
comment="comment",
)
sshpubkey = "good key comment"
form.cleaned_data = {"publickeytext": sshpubkey}
parse_key_text.return_value = sshpubkey.split(" ")
form.clean()
self.assertIn('publickeytext', form.errors)
self.assertIn(
DUPLICATE_SSH_PUBLIC_KEY_FOR_USER,
form.errors['publickeytext'])
self.assertIn("publickeytext", form.errors)
self.assertIn(DUPLICATE_SSH_PUBLIC_KEY_FOR_USER, form.errors["publickeytext"])
@patch('osusers.admin.SshPublicKey.objects.parse_keytext')
def test_save(self, parse_keytext):
@patch("osusers.admin.SshPublicKey.objects.parse_key_text")
def test_save(self, parse_key_text):
self._setup_hostingpackage()
instance = MagicMock()
form = AddSshPublicKeyForm(
instance, hostingpackage=self.hostingpackage)
sshpubkey = 'good key comment'
form.cleaned_data = {'publickeytext': sshpubkey}
parse_keytext.return_value = sshpubkey.split(' ')
form = AddSshPublicKeyForm(instance, hostingpackage=self.hostingpackage)
sshpubkey = "good key comment"
form.cleaned_data = {"publickeytext": sshpubkey}
parse_key_text.return_value = sshpubkey.split(" ")
retval = form.save()
self.assertTrue(isinstance(retval, SshPublicKey))
self.assertEqual(retval.algorithm, 'good')
self.assertEqual(retval.data, 'key')
self.assertEqual(retval.comment, 'comment')
self.assertEqual(retval.algorithm, "good")
self.assertEqual(retval.data, "key")
self.assertEqual(retval.comment, "comment")
class ChangeOsUserPasswordFormTest(TestCase):
@ -142,25 +135,27 @@ class ChangeOsUserPasswordFormTest(TestCase):
"""
def _setup_user(self):
customer = Customer.objects.create_user('test')
customer = Customer.objects.create_user("test")
self.user = User.objects.create_user(customer=customer)
def test_constructor(self):
self._setup_user()
form = ChangeOsUserPasswordForm(instance=self.user)
self.assertTrue(hasattr(form, 'instance'))
self.assertTrue(hasattr(form, "instance"))
self.assertEqual(form.instance, self.user)
self.assertTrue(hasattr(form, 'helper'))
self.assertEqual(form.helper.form_action, reverse(
'set_osuser_password', kwargs={'slug': self.user.username}))
self.assertEqual(form.helper.inputs[0].name, 'submit')
self.assertTrue(hasattr(form, "helper"))
self.assertEqual(
form.helper.form_action,
reverse("set_osuser_password", kwargs={"slug": self.user.username}),
)
self.assertEqual(form.helper.inputs[0].name, "submit")
def test_save(self):
self._setup_user()
form = ChangeOsUserPasswordForm(instance=self.user)
form.cleaned_data = {'password1': 'test'}
form.cleaned_data = {"password1": "test"}
user = form.save()
self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd))
self.assertTrue(sha512_crypt.verify("test", user.shadow.passwd))
class EditSshPublicKeyCommentFormTest(TestCase):
@ -170,7 +165,7 @@ class EditSshPublicKeyCommentFormTest(TestCase):
"""
def _setup_hostingpackage(self):
customer = Customer.objects.create_user('test')
customer = Customer.objects.create_user("test")
user = User.objects.create_user(customer=customer)
self.hostingpackage = Mock(id=42, osuser=user)
@ -178,18 +173,23 @@ class EditSshPublicKeyCommentFormTest(TestCase):
instance = MagicMock()
with self.assertRaises(KeyError) as ke:
EditSshPublicKeyCommentForm(instance)
self.assertEqual(ke.exception.args[0], 'hostingpackage')
self.assertEqual(ke.exception.args[0], "hostingpackage")
def test_constructor(self):
self._setup_hostingpackage()
instance = MagicMock(id=1)
form = EditSshPublicKeyCommentForm(
instance=instance, hostingpackage=self.hostingpackage)
self.assertTrue(hasattr(form, 'osuser'))
instance=instance, hostingpackage=self.hostingpackage
)
self.assertTrue(hasattr(form, "osuser"))
self.assertEqual(form.osuser, self.hostingpackage.osuser)
self.assertIn('comment', form.fields)
self.assertTrue(hasattr(form, 'helper'))
self.assertEqual(form.helper.form_action, reverse(
'edit_ssh_key_comment',
kwargs={'package': self.hostingpackage.id, 'pk': instance.id}))
self.assertEqual(form.helper.inputs[0].name, 'submit')
self.assertIn("comment", form.fields)
self.assertTrue(hasattr(form, "helper"))
self.assertEqual(
form.helper.form_action,
reverse(
"edit_ssh_key_comment",
kwargs={"package": self.hostingpackage.id, "pk": instance.id},
),
)
self.assertEqual(form.helper.inputs[0].name, "submit")

View file

@ -1,19 +1,17 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from datetime import date
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.test import TestCase
from django.test.utils import override_settings
from django.utils import timezone
from django.contrib.auth import get_user_model
from passlib.hash import sha512_crypt
from osusers.models import (
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL,
AdditionalGroup,
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL,
Group,
Shadow,
SshPublicKey,
@ -21,7 +19,6 @@ from osusers.models import (
)
from taskresults.models import TaskResult
EXAMPLE_KEY_1_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: "1024-bit RSA, converted from OpenSSH by me@example.com"
x-command: /home/me/bin/lock-in-guest.sh
@ -57,12 +54,14 @@ n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""
EXAMPLE_KEY_4_OPENSSH = "".join((
"ssh-rsa ",
"AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb",
"YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ",
"5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE="
))
EXAMPLE_KEY_4_OPENSSH = "".join(
(
"ssh-rsa ",
"AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb",
"YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ",
"5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=",
)
)
EXAMPLE_KEY_5_RFC4716_MULTILINE = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key \\
@ -99,10 +98,7 @@ YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ
5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=
---- END SSH2 PUBLIC KEY ----"""
EXAMPLE_KEY_8_OPENSSH_BROKEN = "".join((
"ssh-rsa ",
"AschrÖdderöd"
))
EXAMPLE_KEY_8_OPENSSH_BROKEN = "".join(("ssh-rsa ", "AschrÖdderöd"))
EXAMPLE_KEY_9_RFC4716_ONLY_HEADER = "---- BEGIN SSH2 PUBLIC KEY ----"
@ -110,9 +106,7 @@ Customer = get_user_model()
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND="memory", BROKER_BACKEND="memory"
)
class TestCaseWithCeleryTasks(TestCase):
pass
@ -120,38 +114,44 @@ class TestCaseWithCeleryTasks(TestCase):
class AdditionalGroupTest(TestCaseWithCeleryTasks):
def setUp(self):
customer = Customer.objects.create(username='test')
self.group1 = Group.objects.create(groupname='test1', gid=1000)
customer = Customer.objects.create(username="test")
self.group1 = Group.objects.create(groupname="test1", gid=1000)
self.user = User.objects.create(
customer=customer, username='test', uid=1000, group=self.group1,
homedir='/home/test', shell='/bin/bash')
customer=customer,
username="test",
uid=1000,
group=self.group1,
homedir="/home/test",
shell="/bin/bash",
)
def test_clean_primary_group(self):
testsubj = AdditionalGroup(user=self.user, group=self.group1)
with self.assertRaises(ValidationError) as cm:
testsubj.clean()
self.assertEqual(
cm.exception.message, CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)
self.assertEqual(cm.exception.message, CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)
def test_clean_other_group(self):
group2 = Group.objects.create(groupname='test2', gid=1001)
group2 = Group.objects.create(groupname="test2", gid=1001)
testsubj = AdditionalGroup(user=self.user, group=group2)
testsubj.clean()
def test_save(self):
group2 = Group.objects.create(groupname='test2', gid=1001)
group2 = Group.objects.create(groupname="test2", gid=1001)
addgroup = AdditionalGroup(user=self.user, group=group2)
addgroup.save()
taskres = TaskResult.objects.all()
self.assertTrue(len(taskres), 4)
creators = [r.creator for r in taskres]
for tcount, tcreator in [
(2, 'handle_group_created'), (1, 'handle_user_created'),
(1, 'handle_user_added_to_group')]:
(2, "handle_group_created"),
(1, "handle_user_created"),
(1, "handle_user_added_to_group"),
]:
self.assertEqual(creators.count(tcreator), tcount)
def test_save_again(self):
group2 = Group.objects.create(groupname='test2', gid=1001)
group2 = Group.objects.create(groupname="test2", gid=1001)
TaskResult.objects.all().delete()
addgroup = AdditionalGroup(user=self.user, group=group2)
addgroup.save()
@ -161,15 +161,18 @@ class AdditionalGroupTest(TestCaseWithCeleryTasks):
self.assertEqual(len(taskres), 0)
def test_delete(self):
group2 = Group.objects.create(groupname='test2', gid=1001)
group2 = Group.objects.create(groupname="test2", gid=1001)
# noinspection PyUnresolvedReferences
addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
addgroup.delete()
# noinspection PyUnresolvedReferences
self.assertEqual(len(AdditionalGroup.objects.all()), 0)
def test___str__(self):
group2 = Group.objects.create(groupname='test2', gid=1001)
group2 = Group.objects.create(groupname="test2", gid=1001)
# noinspection PyUnresolvedReferences
addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
self.assertEqual(str(addgroup), 'test (1000) in test2 (1001)')
self.assertEqual(str(addgroup), "test (1000) in test2 (1001)")
@override_settings(OSUSER_MINGID=10000)
@ -178,56 +181,61 @@ class GroupManagerTest(TestCaseWithCeleryTasks):
self.assertEqual(Group.objects.get_next_gid(), 10000)
def test_get_next_gid_second(self):
Group.objects.create(gid=10010, groupname='test')
Group.objects.create(gid=10010, groupname="test")
self.assertEqual(Group.objects.get_next_gid(), 10011)
class GroupTest(TestCaseWithCeleryTasks):
def test___str__(self):
group = Group.objects.create(gid=10000, groupname='test')
self.assertEqual(str(group), 'test (10000)')
group = Group.objects.create(gid=10000, groupname="test")
self.assertEqual(str(group), "test (10000)")
def test_save(self):
group = Group(gid=10000, groupname='test')
group = Group(gid=10000, groupname="test")
self.assertIs(group.save(), group)
taskres = TaskResult.objects.all()
self.assertTrue(len(taskres), 1)
creators = [r.creator for r in taskres]
for tcount, tcreator in [
(1, 'handle_group_created')]:
for tcount, tcreator in [(1, "handle_group_created")]:
self.assertEqual(creators.count(tcreator), tcount)
def test_save_again(self):
group = Group.objects.create(gid=10000, groupname='test')
taskres = TaskResult.objects.all().delete()
group = Group.objects.create(gid=10000, groupname="test")
TaskResult.objects.all().delete()
group.save()
taskres = TaskResult.objects.all()
self.assertEqual(len(taskres), 0)
def test_delete(self):
group = Group.objects.create(gid=10000, groupname='test')
group = Group.objects.create(gid=10000, groupname="test")
self.assertEqual(len(Group.objects.all()), 1)
group.delete()
self.assertEqual(len(Group.objects.all()), 0)
self.assertEqual(len(TaskResult.objects.all()), 2)
tr = TaskResult.objects.first()
self.assertEqual(tr.creator, 'handle_group_created')
self.assertEqual(tr.creator, "handle_group_created")
class ShadowManagerTest(TestCaseWithCeleryTasks):
def setUp(self):
self.customer = Customer.objects.create(username='test')
self.customer = Customer.objects.create(username="test")
super(ShadowManagerTest, self).setUp()
def test_create_shadow(self):
user = User(
customer=self.customer, username='test', uid=1000,
group=Group(gid=1000, groupname='test'), homedir='/home/test',
shell='/bin/fooshell')
shadow = Shadow.objects.create_shadow(user, 'test')
self.assertTrue(sha512_crypt.verify('test', shadow.passwd))
self.assertEqual(shadow.changedays,
(timezone.now().date() - date(1970, 1, 1)).days)
group = Group.objects.create(gid=1000, groupname="test")
user = User.objects.create(
customer=self.customer,
username="test",
uid=1000,
group=group,
homedir="/home/test",
shell="/bin/fooshell",
)
shadow = Shadow.objects.create_shadow(user, "test")
self.assertTrue(sha512_crypt.verify("test", shadow.passwd))
self.assertEqual(
shadow.changedays, (timezone.now().date() - date(1970, 1, 1)).days
)
self.assertEqual(shadow.user, user)
self.assertEqual(shadow.minage, 0)
self.assertIsNone(shadow.maxage)
@ -238,39 +246,50 @@ class ShadowManagerTest(TestCaseWithCeleryTasks):
class ShadowTest(TestCaseWithCeleryTasks):
def setUp(self):
self.customer = Customer.objects.create(username='test')
self.customer = Customer.objects.create(username="test")
super(ShadowTest, self).setUp()
def test___str__(self):
group = Group.objects.create(
groupname='test', gid=1000)
group = Group.objects.create(groupname="test", gid=1000)
user = User.objects.create(
customer=self.customer, username='test', uid=1000, group=group,
homedir='/home/test', shell='/bin/bash')
customer=self.customer,
username="test",
uid=1000,
group=group,
homedir="/home/test",
shell="/bin/bash",
)
shadow = Shadow(user=user)
self.assertEqual(str(shadow), 'for user test (1000)')
self.assertEqual(str(shadow), "for user test (1000)")
def test_set_password(self):
group = Group.objects.create(
groupname='test', gid=1000)
group = Group.objects.create(groupname="test", gid=1000)
user = User.objects.create(
customer=self.customer, username='test', uid=1000, group=group,
homedir='/home/test', shell='/bin/bash')
customer=self.customer,
username="test",
uid=1000,
group=group,
homedir="/home/test",
shell="/bin/bash",
)
shadow = Shadow(user=user)
shadow.set_password('test')
self.assertTrue(sha512_crypt.verify('test', shadow.passwd))
shadow.set_password("test")
self.assertTrue(sha512_crypt.verify("test", shadow.passwd))
@override_settings(
OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test',
OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell'
OSUSER_MINUID=10000,
OSUSER_MINGID=10000,
OSUSER_USERNAME_PREFIX="test",
OSUSER_HOME_BASEPATH="/home",
OSUSER_DEFAULT_SHELL="/bin/fooshell",
)
class UserManagerTest(TestCaseWithCeleryTasks):
def _create_group(self):
return Group.objects.create(gid=10000, groupname='foo')
return Group.objects.create(gid=10000, groupname="foo")
def setUp(self):
self.customer = Customer.objects.create(username='test')
self.customer = Customer.objects.create(username="test")
super(UserManagerTest, self).setUp()
def test_get_next_uid_first(self):
@ -278,40 +297,58 @@ class UserManagerTest(TestCaseWithCeleryTasks):
def test_get_next_uid_second(self):
User.objects.create(
customer=self.customer, uid=10010, username='foo',
group=self._create_group(), homedir='/home/foo',
shell='/bin/fooshell')
customer=self.customer,
uid=10010,
username="foo",
group=self._create_group(),
homedir="/home/foo",
shell="/bin/fooshell",
)
self.assertEqual(User.objects.get_next_uid(), 10011)
def test_get_next_username_first(self):
self.assertEqual(User.objects.get_next_username(), 'test01')
self.assertEqual(User.objects.get_next_username(), "test01")
def test_get_next_username_second(self):
User.objects.create(
customer=self.customer, uid=10000, username='test01',
group=self._create_group(), homedir='/home/foo',
shell='/bin/fooshell')
self.assertEqual(User.objects.get_next_username(), 'test02')
customer=self.customer,
uid=10000,
username="test01",
group=self._create_group(),
homedir="/home/foo",
shell="/bin/fooshell",
)
self.assertEqual(User.objects.get_next_username(), "test02")
def test_get_next_username_gaps(self):
group = self._create_group()
User.objects.create(
customer=self.customer, uid=10000, username='test01', group=group,
homedir='/home/foo', shell='/bin/fooshell')
customer=self.customer,
uid=10000,
username="test01",
group=group,
homedir="/home/foo",
shell="/bin/fooshell",
)
User.objects.create(
customer=self.customer, uid=10002, username='test03', group=group,
homedir='/home/foo', shell='/bin/fooshell')
self.assertEqual(User.objects.get_next_username(), 'test02')
customer=self.customer,
uid=10002,
username="test03",
group=group,
homedir="/home/foo",
shell="/bin/fooshell",
)
self.assertEqual(User.objects.get_next_username(), "test02")
def test_create_user_first(self):
user = User.objects.create_user(customer=self.customer)
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000)
self.assertEqual(user.group.groupname, 'test01')
self.assertEqual(user.username, 'test01')
self.assertEqual(user.homedir, '/home/test01')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertEqual(user.group.groupname, "test01")
self.assertEqual(user.username, "test01")
self.assertEqual(user.homedir, "/home/test01")
self.assertEqual(user.shell, "/bin/fooshell")
self.assertIsNotNone(user.shadow)
def test_create_user_tasks(self):
@ -320,8 +357,10 @@ class UserManagerTest(TestCaseWithCeleryTasks):
self.assertEqual(len(taskres), 3)
creators = [r.creator for r in taskres]
for creator in [
'handle_group_created', 'handle_user_created',
'handle_user_password_set']:
"handle_group_created",
"handle_user_created",
"handle_user_password_set",
]:
self.assertIn(creator, creators)
def test_create_user_second(self):
@ -330,36 +369,34 @@ class UserManagerTest(TestCaseWithCeleryTasks):
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10001)
self.assertEqual(user.group.gid, 10001)
self.assertEqual(user.group.groupname, 'test02')
self.assertEqual(user.username, 'test02')
self.assertEqual(user.homedir, '/home/test02')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertEqual(user.group.groupname, "test02")
self.assertEqual(user.username, "test02")
self.assertEqual(user.homedir, "/home/test02")
self.assertEqual(user.shell, "/bin/fooshell")
self.assertIsNotNone(user.shadow)
self.assertEqual(len(User.objects.all()), 2)
def test_create_user_known_password(self):
user = User.objects.create_user(
customer=self.customer, password='foobar')
user = User.objects.create_user(customer=self.customer, password="foobar")
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000)
self.assertEqual(user.group.groupname, 'test01')
self.assertEqual(user.username, 'test01')
self.assertEqual(user.homedir, '/home/test01')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertEqual(user.group.groupname, "test01")
self.assertEqual(user.username, "test01")
self.assertEqual(user.homedir, "/home/test01")
self.assertEqual(user.shell, "/bin/fooshell")
self.assertIsNotNone(user.shadow)
self.assertTrue(sha512_crypt.verify('foobar', user.shadow.passwd))
self.assertTrue(sha512_crypt.verify("foobar", user.shadow.passwd))
def test_create_user_predefined_username(self):
user = User.objects.create_user(
customer=self.customer, username='tester')
user = User.objects.create_user(customer=self.customer, username="tester")
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000)
self.assertEqual(user.group.groupname, 'tester')
self.assertEqual(user.username, 'tester')
self.assertEqual(user.homedir, '/home/tester')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertEqual(user.group.groupname, "tester")
self.assertEqual(user.username, "tester")
self.assertEqual(user.homedir, "/home/tester")
self.assertEqual(user.shell, "/bin/fooshell")
self.assertIsNotNone(user.shadow)
def test_create_user_commit(self):
@ -367,31 +404,34 @@ class UserManagerTest(TestCaseWithCeleryTasks):
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000)
self.assertEqual(user.group.groupname, 'test01')
self.assertEqual(user.username, 'test01')
self.assertEqual(user.homedir, '/home/test01')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertEqual(user.group.groupname, "test01")
self.assertEqual(user.username, "test01")
self.assertEqual(user.homedir, "/home/test01")
self.assertEqual(user.shell, "/bin/fooshell")
self.assertIsNotNone(user.shadow)
@override_settings(
OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test',
OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell'
OSUSER_MINUID=10000,
OSUSER_MINGID=10000,
OSUSER_USERNAME_PREFIX="test",
OSUSER_HOME_BASEPATH="/home",
OSUSER_DEFAULT_SHELL="/bin/fooshell",
)
class UserTest(TestCaseWithCeleryTasks):
def setUp(self):
self.customer = Customer.objects.create_user('test')
self.customer = Customer.objects.create_user("test")
super(UserTest, self).setUp()
def test___str__(self):
user = User.objects.create_user(self.customer)
self.assertEqual(str(user), 'test01 (10000)')
self.assertEqual(str(user), "test01 (10000)")
def test_set_password(self):
user = User.objects.create_user(self.customer)
self.assertFalse(sha512_crypt.verify('test', user.shadow.passwd))
user.set_password('test')
self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd))
self.assertFalse(sha512_crypt.verify("test", user.shadow.passwd))
user.set_password("test")
self.assertTrue(sha512_crypt.verify("test", user.shadow.passwd))
def test_save(self):
user = User.objects.create_user(self.customer)
@ -400,8 +440,10 @@ class UserTest(TestCaseWithCeleryTasks):
self.assertEqual(len(taskres), 3)
creators = [r.creator for r in taskres]
for task in [
'handle_group_created', 'handle_user_created',
'handle_user_password_set']:
"handle_group_created",
"handle_user_created",
"handle_user_password_set",
]:
self.assertIn(task, creators)
def test_delete_only_user(self):
@ -411,30 +453,38 @@ class UserTest(TestCaseWithCeleryTasks):
self.assertEqual(len(taskres), 6)
creators = [r.creator for r in taskres]
for task in [
'handle_group_created', 'handle_user_created',
'handle_user_password_set', 'handle_user_deleted',
'handle_group_deleted', 'handle_user_deleted']:
"handle_group_created",
"handle_user_created",
"handle_user_password_set",
"handle_user_deleted",
"handle_group_deleted",
"handle_user_deleted",
]:
self.assertIn(task, creators)
self.assertEqual(len(User.objects.all()), 0)
def test_delete_additional_groups(self):
group1 = Group.objects.create(gid=2000, groupname='group1')
group2 = Group.objects.create(gid=2001, groupname='group2')
group1 = Group.objects.create(gid=2000, groupname="group1")
group2 = Group.objects.create(gid=2001, groupname="group2")
user = User.objects.create_user(self.customer)
for group in [group1, group2]:
# noinspection PyUnresolvedReferences
user.additionalgroup_set.add(
AdditionalGroup.objects.create(user=user, group=group))
AdditionalGroup.objects.create(user=user, group=group)
)
TaskResult.objects.all().delete()
user.delete()
taskres = TaskResult.objects.all()
self.assertEqual(len(taskres), 5)
creators = [t.creator for t in taskres]
for tcount, tcreator in [
(2, 'handle_user_removed_from_group'),
(2, 'handle_user_deleted'),
(1, 'handle_group_deleted')]:
(2, "handle_user_removed_from_group"),
(2, "handle_user_deleted"),
(1, "handle_group_deleted"),
]:
self.assertEqual(creators.count(tcreator), tcount)
self.assertEqual(len(User.objects.all()), 0)
# noinspection PyUnresolvedReferences
self.assertEqual(len(AdditionalGroup.objects.all()), 0)
def test_is_sftp_user(self):
@ -442,138 +492,140 @@ class UserTest(TestCaseWithCeleryTasks):
self.assertFalse(user.is_sftp_user())
sftp_group = Group.objects.create(
gid=2000, groupname=settings.OSUSER_SFTP_GROUP)
gid=2000, groupname=settings.OSUSER_SFTP_GROUP
)
# noinspection PyUnresolvedReferences
user.additionalgroup_set.add(
AdditionalGroup.objects.create(user=user, group=sftp_group))
AdditionalGroup.objects.create(user=user, group=sftp_group)
)
self.assertTrue(user.is_sftp_user())
class SshPublicKeyManagerTest(TestCaseWithCeleryTasks):
def test_parse_keytext_rfc4716_1(self):
res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_1_RFC4716)
res = SshPublicKey.objects.parse_key_text(EXAMPLE_KEY_1_RFC4716)
self.assertEqual(len(res), 3)
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[0], 'ssh-rsa')
self.assertEqual(res[0], "ssh-rsa")
self.assertEqual(
res[2], '"1024-bit RSA, converted from OpenSSH by me@example.com"')
res[2], '"1024-bit RSA, converted from OpenSSH by me@example.com"'
)
def test_parse_keytext_rfc4716_2(self):
res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_2_RFC4716)
res = SshPublicKey.objects.parse_key_text(EXAMPLE_KEY_2_RFC4716)
self.assertEqual(len(res), 3)
self.assertEqual(res[0], 'ssh-dss')
self.assertEqual(res[0], "ssh-dss")
self.assertGreater(len(res[1]), 40)
self.assertEqual(
res[2],
"This is my public key for use on servers which I don't like.")
res[2], "This is my public key for use on servers which I don't like."
)
def test_parse_keytext_rfc4716_3(self):
res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_3_RFC4716)
res = SshPublicKey.objects.parse_key_text(EXAMPLE_KEY_3_RFC4716)
self.assertEqual(len(res), 3)
self.assertEqual(res[0], 'ssh-dss')
self.assertEqual(res[0], "ssh-dss")
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[2], "DSA Public Key for use with MyIsp")
def test_parse_keytext_openssh(self):
res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_4_OPENSSH)
res = SshPublicKey.objects.parse_key_text(EXAMPLE_KEY_4_OPENSSH)
self.assertEquals(len(res), 3)
self.assertEqual(res[0], 'ssh-rsa')
self.assertEqual(res[0], "ssh-rsa")
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[2], '')
self.assertEqual(res[2], "")
def test_parse_keytext_invalid_multiline(self):
with self.assertRaises(ValueError):
SshPublicKey.objects.parse_keytext("\r\n".join(["xx"]*10))
SshPublicKey.objects.parse_key_text("\r\n".join(["xx"] * 10))
def test_parse_keytext_empty_line(self):
res = SshPublicKey.objects.parse_keytext(
EXAMPLE_KEY_6_RFC4716_EMPTY_LINE)
res = SshPublicKey.objects.parse_key_text(EXAMPLE_KEY_6_RFC4716_EMPTY_LINE)
self.assertEqual(len(res), 3)
self.assertEqual(res[0], 'ssh-dss')
self.assertEqual(res[0], "ssh-dss")
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[2], "DSA Public Key for use with MyIsp")
def test_parse_keytext_invalid_empty_rfc4716_header(self):
with self.assertRaises(ValueError):
SshPublicKey.objects.parse_keytext(
EXAMPLE_KEY_9_RFC4716_ONLY_HEADER)
SshPublicKey.objects.parse_key_text(EXAMPLE_KEY_9_RFC4716_ONLY_HEADER)
def test_parse_keytext_no_comment(self):
res = SshPublicKey.objects.parse_keytext(
EXAMPLE_KEY_7_NO_COMMENT)
res = SshPublicKey.objects.parse_key_text(EXAMPLE_KEY_7_NO_COMMENT)
self.assertEqual(len(res), 3)
self.assertEqual(res[0], 'ssh-rsa')
self.assertEqual(res[0], "ssh-rsa")
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[2], '')
self.assertEqual(res[2], "")
def test_parse_keytext_multiline_comment(self):
res = SshPublicKey.objects.parse_keytext(
EXAMPLE_KEY_5_RFC4716_MULTILINE)
res = SshPublicKey.objects.parse_key_text(EXAMPLE_KEY_5_RFC4716_MULTILINE)
self.assertEqual(len(res), 3)
self.assertEqual(res[0], 'ssh-dss')
self.assertEqual(res[0], "ssh-dss")
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[2], "DSA Public Key for use with MyIsp")
def test_parse_keytext_invalid(self):
with self.assertRaises(ValueError):
SshPublicKey.objects.parse_keytext('invalid')
SshPublicKey.objects.parse_key_text("invalid")
def test_parse_keytext_invalid_openssh(self):
with self.assertRaises(ValueError):
SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_8_OPENSSH_BROKEN)
SshPublicKey.objects.parse_key_text(EXAMPLE_KEY_8_OPENSSH_BROKEN)
def test_create_ssh_public_key(self):
customer = Customer.objects.create_user('test')
customer = Customer.objects.create_user("test")
user = User.objects.create_user(customer)
key = SshPublicKey.objects.create_ssh_public_key(
user, EXAMPLE_KEY_4_OPENSSH)
key = SshPublicKey.objects.create_ssh_public_key(user, EXAMPLE_KEY_4_OPENSSH)
self.assertIsInstance(key, SshPublicKey)
self.assertEqual(key.user, user)
self.assertEqual(key.algorithm, 'ssh-rsa')
self.assertEqual(key.algorithm, "ssh-rsa")
self.assertEqual(key.data, EXAMPLE_KEY_4_OPENSSH.split()[1])
self.assertEqual(key.comment, '')
self.assertEqual(key.comment, "")
class SshPublicKeyTest(TestCaseWithCeleryTasks):
def setUp(self):
super(SshPublicKeyTest, self).setUp()
customer = Customer.objects.create_user('test')
customer = Customer.objects.create_user("test")
self.user = User.objects.create_user(customer)
TaskResult.objects.all().delete()
def test__str__rfc4716(self):
res = SshPublicKey.objects.create_ssh_public_key(
self.user, EXAMPLE_KEY_3_RFC4716)
self.user, EXAMPLE_KEY_3_RFC4716
)
self.maxDiff = None
self.assertEqual(
str(res), 'ssh-dss AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxae'
'hvx5wOJ0rzZdzoSOXxbETW6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7St'
'xyltHnXF1YLfKD1G4T6JYrdHYI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3g'
'Jq2e7Yisk/gF+1VAAAAFQDb8D5cvwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4'
'KLYk3IwRbXblwXdkPggA4pfdtW9vGfJ0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/F'
'XPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAAvioUPkmdMc0zuWoSOEsSNhVDtX3WdvVc'
'GcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACBAN7CY+KKv1gHpRzFwdQm7HK9bb1LA'
'o2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HSn24VYtYtsMu74qXviYjziVucWK'
'jjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5sY29ouezv4Xz2PuMch5VGPP'
'+CDqzCM4loWgV DSA Public Key for use with MyIsp')
str(res),
"ssh-dss AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxae"
"hvx5wOJ0rzZdzoSOXxbETW6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7St"
"xyltHnXF1YLfKD1G4T6JYrdHYI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3g"
"Jq2e7Yisk/gF+1VAAAAFQDb8D5cvwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4"
"KLYk3IwRbXblwXdkPggA4pfdtW9vGfJ0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/F"
"XPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAAvioUPkmdMc0zuWoSOEsSNhVDtX3WdvVc"
"GcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACBAN7CY+KKv1gHpRzFwdQm7HK9bb1LA"
"o2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HSn24VYtYtsMu74qXviYjziVucWK"
"jjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5sY29ouezv4Xz2PuMch5VGPP"
"+CDqzCM4loWgV DSA Public Key for use with MyIsp",
)
def test__str__openssh(self):
res = SshPublicKey.objects.create_ssh_public_key(
self.user, EXAMPLE_KEY_4_OPENSSH)
self.user, EXAMPLE_KEY_4_OPENSSH
)
self.assertEqual(str(res), EXAMPLE_KEY_4_OPENSSH)
def test_call_tasks_on_save(self):
SshPublicKey.objects.create_ssh_public_key(
self.user, EXAMPLE_KEY_4_OPENSSH)
SshPublicKey.objects.create_ssh_public_key(self.user, EXAMPLE_KEY_4_OPENSSH)
taskresults = TaskResult.objects.all()
self.assertEqual(len(taskresults), 1)
self.assertEqual(
taskresults[0].creator, 'handle_ssh_keys_changed')
self.assertEqual(taskresults[0].creator, "handle_ssh_keys_changed")
def test_call_tasks_on_delete(self):
key = SshPublicKey.objects.create_ssh_public_key(
self.user, EXAMPLE_KEY_4_OPENSSH)
self.user, EXAMPLE_KEY_4_OPENSSH
)
TaskResult.objects.all().delete()
key.delete()
taskresults = TaskResult.objects.all()
self.assertEqual(len(taskresults), 1)
self.assertEqual(
taskresults[0].creator, 'handle_ssh_keys_changed')
self.assertEqual(taskresults[0].creator, "handle_ssh_keys_changed")

View file

@ -2,66 +2,59 @@
This module provides tests for :py:mod:`osusers.views`.
"""
from __future__ import absolute_import, unicode_literals
try:
from unittest.mock import patch, MagicMock
except ImportError:
from mock import patch, MagicMock
from unittest.mock import patch, MagicMock
from django.core.urlresolvers import reverse
from django.test import TestCase, TransactionTestCase
from django.contrib.auth import get_user_model
from django.urls import reverse
from hostingpackages.models import (
CustomerHostingPackage,
HostingPackageTemplate,
)
from hostingpackages.models import CustomerHostingPackage, HostingPackageTemplate
from osusers.models import SshPublicKey
from osusers.views import (
AddSshPublicKey,
DeleteSshPublicKey,
EditSshPublicKeyComment,
)
from osusers.views import AddSshPublicKey, DeleteSshPublicKey, EditSshPublicKeyComment
User = get_user_model()
TEST_USER = 'test'
TEST_PASSWORD = 'secret'
TEST_EMAIL = 'test@example.org'
EXAMPLE_KEY = "".join((
"ssh-rsa ",
"AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb",
"YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ",
"5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE="
))
TEST_USER = "test"
TEST_PASSWORD = "secret"
TEST_EMAIL = "test@example.org"
EXAMPLE_KEY = "".join(
(
"ssh-rsa ",
"AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb",
"YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ",
"5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=",
)
)
class HostingPackageAwareTestMixin(object):
# noinspection PyMethodMayBeStatic
def _setup_hosting_package(self, customer):
template = HostingPackageTemplate.objects.create(
name='testpackagetemplate', mailboxcount=10, diskspace=1,
diskspace_unit=0)
name="testpackagetemplate", mailboxcount=10, diskspace=1, diskspace_unit=0
)
package = CustomerHostingPackage.objects.create_from_template(
customer, template, 'testpackage')
with patch('hostingpackages.models.settings') as hmsettings:
customer, template, "testpackage"
)
with patch("hostingpackages.models.settings") as hmsettings:
hmsettings.OSUSER_DEFAULT_GROUPS = []
package.save()
return package
class AddSshPublicKeyTest(HostingPackageAwareTestMixin, TestCase):
def setUp(self):
self.customer = User.objects.create_user(
username=TEST_USER, password=TEST_PASSWORD)
username=TEST_USER, password=TEST_PASSWORD
)
self.package = self._setup_hosting_package(self.customer)
def _get_url(self):
return reverse('add_ssh_key', kwargs={'package': self.package.id})
return reverse("add_ssh_key", kwargs={"package": self.package.id})
def test_get_anonymous(self):
response = self.client.get(self._get_url())
@ -73,77 +66,75 @@ class AddSshPublicKeyTest(HostingPackageAwareTestMixin, TestCase):
self.assertEqual(response.status_code, 200)
def test_get_other_regular_user(self):
User.objects.create_user(
'test2', password=TEST_PASSWORD)
self.client.login(username='test2', password=TEST_PASSWORD)
User.objects.create_user("test2", password=TEST_PASSWORD)
self.client.login(username="test2", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 403)
def test_get_staff_user(self):
User.objects.create_superuser(
'admin', email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username='admin', password=TEST_PASSWORD)
User.objects.create_superuser("admin", email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username="admin", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 200)
def test_get_template(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertTemplateUsed(response, 'osusers/sshpublickey_create.html')
self.assertTemplateUsed(response, "osusers/sshpublickey_create.html")
def test_get_form_kwargs(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
view = AddSshPublicKey(
request=MagicMock(), kwargs={'package': str(self.package.pk)})
request=MagicMock(), kwargs={"package": str(self.package.pk)}
)
the_kwargs = view.get_form_kwargs()
self.assertIn('hostingpackage', the_kwargs)
self.assertEqual(the_kwargs['hostingpackage'], self.package)
self.assertIn("hostingpackage", the_kwargs)
self.assertEqual(the_kwargs["hostingpackage"], self.package)
def test_get_context_data(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertIn('customer', response.context)
self.assertEqual(response.context['customer'], self.customer)
self.assertIn('osuser', response.context)
self.assertEqual(
response.context['osuser'], self.package.osuser.username)
self.assertIn("customer", response.context)
self.assertEqual(response.context["customer"], self.customer)
self.assertIn("osuser", response.context)
self.assertEqual(response.context["osuser"], self.package.osuser.username)
def test_form_valid_redirect(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.post(
self._get_url(),
data={'publickeytext': EXAMPLE_KEY})
self._get_url(), data={"publickeytext": EXAMPLE_KEY}
)
self.assertRedirects(response, self.package.get_absolute_url())
def test_form_valid_message(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.post(
self._get_url(), follow=True,
data={'publickeytext': EXAMPLE_KEY})
messages = list(response.context['messages'])
self._get_url(), follow=True, data={"publickeytext": EXAMPLE_KEY}
)
messages = list(response.context["messages"])
self.assertEqual(len(messages), 1)
self.assertEqual(
'Successfully added new ssh-rsa SSH public key.'.format(
"Successfully added new ssh-rsa SSH public key.".format(
username=self.package.osuser.username
), str(messages[0]))
),
str(messages[0]),
)
class DeleteSshPublicKeyTest(HostingPackageAwareTestMixin, TestCase):
def setUp(self):
self.customer = User.objects.create_user(
username=TEST_USER, password=TEST_PASSWORD)
username=TEST_USER, password=TEST_PASSWORD
)
self.package = self._setup_hosting_package(self.customer)
self.sshkey = SshPublicKey.objects.create(
user=self.package.osuser, algorithm='good', data='key',
comment='comment')
user=self.package.osuser, algorithm="good", data="key", comment="comment"
)
def _get_url(self):
return reverse(
'delete_ssh_key', kwargs={
'package': self.package.id,
'pk': self.sshkey.id
})
"delete_ssh_key", kwargs={"package": self.package.id, "pk": self.sshkey.id}
)
def test_get_anonymous(self):
response = self.client.get(self._get_url())
@ -155,71 +146,63 @@ class DeleteSshPublicKeyTest(HostingPackageAwareTestMixin, TestCase):
self.assertEqual(response.status_code, 200)
def test_get_other_regular_user(self):
User.objects.create_user(
'test2', password=TEST_PASSWORD)
self.client.login(username='test2', password=TEST_PASSWORD)
User.objects.create_user("test2", password=TEST_PASSWORD)
self.client.login(username="test2", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 403)
def test_get_staff_user(self):
User.objects.create_superuser(
'admin', email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username='admin', password=TEST_PASSWORD)
User.objects.create_superuser("admin", email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username="admin", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 200)
def test_get_template(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertTemplateUsed(
response, 'osusers/sshpublickey_confirm_delete.html')
self.assertTemplateUsed(response, "osusers/sshpublickey_confirm_delete.html")
def test_get_queryset(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
view = DeleteSshPublicKey(
request=MagicMock(), kwargs={
'package': str(self.package.pk),
'pk': str(self.sshkey.pk)
})
request=MagicMock(),
kwargs={"package": str(self.package.pk), "pk": str(self.sshkey.pk)},
)
queryset = view.get_queryset()
self.assertQuerysetEqual(queryset, [repr(self.sshkey)])
def test_get_context_data(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
for key in ('hostingpackage', 'customer', 'osuser'):
for key in ("hostingpackage", "customer", "osuser"):
self.assertIn(key, response.context)
self.assertEqual(response.context['hostingpackage'], self.package)
self.assertEqual(response.context['customer'], self.customer)
self.assertEqual(
response.context['osuser'], self.package.osuser.username)
self.assertEqual(response.context["hostingpackage"], self.package)
self.assertEqual(response.context["customer"], self.customer)
self.assertEqual(response.context["osuser"], self.package.osuser.username)
def test_get_success_url(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.post(
self._get_url(),
data={'comment': 'new comment'})
self.assertRedirects(response, reverse('list_ssh_keys', kwargs={
'package': self.package.id}))
response = self.client.post(self._get_url(), data={"comment": "new comment"})
self.assertRedirects(
response, reverse("list_ssh_keys", kwargs={"package": self.package.id})
)
class EditSshPublicKeyCommentTest(
HostingPackageAwareTestMixin, TransactionTestCase):
class EditSshPublicKeyCommentTest(HostingPackageAwareTestMixin, TransactionTestCase):
def setUp(self):
self.customer = User.objects.create_user(
username=TEST_USER, password=TEST_PASSWORD)
username=TEST_USER, password=TEST_PASSWORD
)
self.package = self._setup_hosting_package(self.customer)
self.sshkey = SshPublicKey.objects.create(
user=self.package.osuser, algorithm='good', data='key',
comment='comment')
user=self.package.osuser, algorithm="good", data="key", comment="comment"
)
def _get_url(self):
return reverse(
'edit_ssh_key_comment', kwargs={
'package': self.package.id,
'pk': self.sshkey.id
})
"edit_ssh_key_comment",
kwargs={"package": self.package.id, "pk": self.sshkey.id},
)
def test_get_anonymous(self):
response = self.client.get(self._get_url())
@ -231,74 +214,67 @@ class EditSshPublicKeyCommentTest(
self.assertEqual(response.status_code, 200)
def test_get_other_regular_user(self):
User.objects.create_user(
'test2', password=TEST_PASSWORD)
self.client.login(username='test2', password=TEST_PASSWORD)
User.objects.create_user("test2", password=TEST_PASSWORD)
self.client.login(username="test2", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 403)
def test_get_staff_user(self):
User.objects.create_superuser(
'admin', email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username='admin', password=TEST_PASSWORD)
User.objects.create_superuser("admin", email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username="admin", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 200)
def test_get_template(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertTemplateUsed(
response, 'osusers/sshpublickey_edit_comment.html')
self.assertTemplateUsed(response, "osusers/sshpublickey_edit_comment.html")
def test_get_queryset(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
view = EditSshPublicKeyComment(
request=MagicMock(), kwargs={
'package': str(self.package.pk),
'pk': str(self.sshkey.pk)
})
request=MagicMock(),
kwargs={"package": str(self.package.pk), "pk": str(self.sshkey.pk)},
)
queryset = view.get_queryset()
self.assertQuerysetEqual(queryset, [repr(self.sshkey)])
def test_get_form_kwargs(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
view = EditSshPublicKeyComment(
request=MagicMock(), kwargs={
'package': str(self.package.pk),
'pk': str(self.sshkey.pk)
})
request=MagicMock(),
kwargs={"package": str(self.package.pk), "pk": str(self.sshkey.pk)},
)
the_kwargs = view.get_form_kwargs()
self.assertIn('hostingpackage', the_kwargs)
self.assertEqual(the_kwargs['hostingpackage'], self.package)
self.assertIn("hostingpackage", the_kwargs)
self.assertEqual(the_kwargs["hostingpackage"], self.package)
def test_get_context_data(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
for key in ('hostingpackage', 'customer', 'osuser'):
for key in ("hostingpackage", "customer", "osuser"):
self.assertIn(key, response.context)
self.assertEqual(response.context['hostingpackage'], self.package)
self.assertEqual(response.context['customer'], self.customer)
self.assertEqual(
response.context['osuser'], self.package.osuser.username)
self.assertEqual(response.context["hostingpackage"], self.package)
self.assertEqual(response.context["customer"], self.customer)
self.assertEqual(response.context["osuser"], self.package.osuser.username)
def test_get_success_url(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.post(
self._get_url(),
data={'comment': 'new comment'})
self.assertRedirects(response, reverse('list_ssh_keys', kwargs={
'package': self.package.id}))
response = self.client.post(self._get_url(), data={"comment": "new comment"})
self.assertRedirects(
response, reverse("list_ssh_keys", kwargs={"package": self.package.id})
)
class ListSshPublicKeysTest(HostingPackageAwareTestMixin, TestCase):
def setUp(self):
self.customer = User.objects.create_user(
username=TEST_USER, password=TEST_PASSWORD)
username=TEST_USER, password=TEST_PASSWORD
)
self.package = self._setup_hosting_package(self.customer)
def _get_url(self):
return reverse('list_ssh_keys', kwargs={'package': self.package.id})
return reverse("list_ssh_keys", kwargs={"package": self.package.id})
def test_get_anonymous(self):
response = self.client.get(self._get_url())
@ -310,45 +286,43 @@ class ListSshPublicKeysTest(HostingPackageAwareTestMixin, TestCase):
self.assertEqual(response.status_code, 200)
def test_get_other_regular_user(self):
User.objects.create_user(
'test2', password=TEST_PASSWORD)
self.client.login(username='test2', password=TEST_PASSWORD)
User.objects.create_user("test2", password=TEST_PASSWORD)
self.client.login(username="test2", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 403)
def test_get_staff_user(self):
User.objects.create_superuser(
'admin', email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username='admin', password=TEST_PASSWORD)
User.objects.create_superuser("admin", email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username="admin", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 200)
def test_get_template(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertTemplateUsed(response, 'osusers/sshpublickey_list.html')
self.assertTemplateUsed(response, "osusers/sshpublickey_list.html")
def test_get_context_data(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
for key in ('hostingpackage', 'customer', 'osuser'):
for key in ("hostingpackage", "customer", "osuser"):
self.assertIn(key, response.context)
self.assertEqual(response.context['hostingpackage'], self.package)
self.assertEqual(response.context['customer'], self.customer)
self.assertEqual(
response.context['osuser'], self.package.osuser.username)
self.assertEqual(response.context["hostingpackage"], self.package)
self.assertEqual(response.context["customer"], self.customer)
self.assertEqual(response.context["osuser"], self.package.osuser.username)
class SetOsUserPasswordTest(HostingPackageAwareTestMixin, TestCase):
def setUp(self):
self.customer = User.objects.create_user(
username=TEST_USER, password=TEST_PASSWORD)
username=TEST_USER, password=TEST_PASSWORD
)
self.package = self._setup_hosting_package(self.customer)
def _get_url(self):
return reverse('set_osuser_password', kwargs={
'slug': self.package.osuser.username})
return reverse(
"set_osuser_password", kwargs={"slug": self.package.osuser.username}
)
def test_get_anonymous(self):
response = self.client.get(self._get_url())
@ -360,45 +334,48 @@ class SetOsUserPasswordTest(HostingPackageAwareTestMixin, TestCase):
self.assertEqual(response.status_code, 200)
def test_get_other_regular_user(self):
User.objects.create_user(
'test2', password=TEST_PASSWORD)
self.client.login(username='test2', password=TEST_PASSWORD)
User.objects.create_user("test2", password=TEST_PASSWORD)
self.client.login(username="test2", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 403)
def test_get_staff_user(self):
User.objects.create_superuser(
'admin', email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username='admin', password=TEST_PASSWORD)
User.objects.create_superuser("admin", email=TEST_EMAIL, password=TEST_PASSWORD)
self.client.login(username="admin", password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertEqual(response.status_code, 200)
def test_get_template(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertTemplateUsed(response, 'osusers/user_setpassword.html')
self.assertTemplateUsed(response, "osusers/user_setpassword.html")
def test_get_context_data(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.get(self._get_url())
self.assertIn('customer', response.context)
self.assertEqual(response.context['customer'], self.customer)
self.assertIn("customer", response.context)
self.assertEqual(response.context["customer"], self.customer)
def test_form_valid_redirect(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.post(
self._get_url(),
data={'password1': TEST_PASSWORD, 'password2': TEST_PASSWORD})
data={"password1": TEST_PASSWORD, "password2": TEST_PASSWORD},
)
self.assertRedirects(response, self.package.get_absolute_url())
def test_form_valid_message(self):
self.client.login(username=TEST_USER, password=TEST_PASSWORD)
response = self.client.post(
self._get_url(), follow=True,
data={'password1': TEST_PASSWORD, 'password2': TEST_PASSWORD})
messages = list(response.context['messages'])
self._get_url(),
follow=True,
data={"password1": TEST_PASSWORD, "password2": TEST_PASSWORD},
)
messages = list(response.context["messages"])
self.assertEqual(len(messages), 1)
self.assertEqual(
'New password for {username} has been set successfully.'.format(
"New password for {username} has been set successfully.".format(
username=self.package.osuser.username
), str(messages[0]))
),
str(messages[0]),
)