implement SshPublicKey model, manager and tests

- implement osusers.models.SshPublicKey and osusers.models.SshPublicKeyManager
- fix broken osusers.models.tests.test_models
- add new test classes SshPublicKeyManagerTest and SshPublicKeyTest
- add migration for SshPublicKey model
This commit is contained in:
Jan Dittberner 2015-01-31 21:48:07 +01:00
parent 9fa351f801
commit 20359681db
3 changed files with 393 additions and 140 deletions

View file

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
import django.utils.timezone
import model_utils.fields
class Migration(migrations.Migration):
dependencies = [
('osusers', '0004_auto_20150104_1751'),
]
operations = [
migrations.CreateModel(
name='SshPublicKey',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, verbose_name='created', editable=False)),
('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, verbose_name='modified', editable=False)),
('algorithm', models.CharField(max_length=20, verbose_name='Algorithm')),
('data', models.TextField(help_text='Base64 encoded key bytes', verbose_name='Key bytes')),
('comment', models.TextField(verbose_name='Comment', blank=True)),
('user', models.ForeignKey(verbose_name='User', to='osusers.User')),
],
options={
'verbose_name': 'SSH public key',
'verbose_name_plural': 'SSH public keys',
},
bases=(models.Model,),
),
migrations.AlterUniqueTogether(
name='sshpublickey',
unique_together=set([('user', 'algorithm', 'data')]),
),
]

View file

@ -4,9 +4,11 @@ This module defines the database models of operating system users.
""" """
from __future__ import unicode_literals from __future__ import unicode_literals
import base64
from datetime import date from datetime import date
import logging import logging
import os import os
import six
from django.db import models, transaction from django.db import models, transaction
from django.conf import settings from django.conf import settings
@ -477,3 +479,103 @@ class AdditionalGroup(TimeStampedModel, models.Model):
'remove_ldap_user_from_group' 'remove_ldap_user_from_group'
) )
super(AdditionalGroup, self).delete(*args, **kwargs) super(AdditionalGroup, self).delete(*args, **kwargs)
class SshPublicKeyManager(models.Manager):
"""
Default manager for :py:class:`SSH public key
<osusers.models.SshPublicKey>` instances.
"""
def parse_keytext(self, keytext):
"""
Parse a SSH public key in OpenSSH or :rfc:`4716` format into its
components algorithm, key data and comment.
:param str keytext: key text
:return: triple of algorithm name, key data and comment
:rtype: triple of str
"""
if keytext.startswith('---- BEGIN SSH2 PUBLIC KEY ----'):
comment = ''
data = ''
continued = ''
headers = {}
for line in keytext.splitlines():
if line == '---- BEGIN SSH2 PUBLIC KEY ----':
continue
elif ':' in line: # a header line
header_tag, header_value = [
item.strip() for item in line.split(':', 1)]
if header_value.endswith('\\'):
continued = header_value[:-1]
else:
headers[header_tag.lower()] = header_value
elif continued:
if line.endswith('\\'):
continued += line[:-1]
continue
header_value = continued + line
headers[header_tag.lower()] = header_value
continued = ''
elif line == '---- END SSH2 PUBLIC KEY ----':
break
elif line: # ignore empty lines
data += line
if 'comment' in headers:
comment = headers['comment']
else:
parts = keytext.split()
if len(parts) > 3:
raise ValueError("unsupported key format")
data = parts[1]
comment = len(parts) == 3 and parts[2] or ""
keybytes = base64.b64decode(data)
parts = keybytes.split(b'\x00' * 3)
alglength = six.byte2int(parts[1])
algname = parts[1][1:1+alglength]
return algname, data, comment
def create_ssh_public_key(self, user, keytext):
"""
Create a new :py:class:`SSH public key <osusers.models.SshPublicKey>`
for a user from the given key text representation. The text can be
either in openssh format or :rfc:`4716` format.
:param user: :py:class:`operating system user <osusers.models.User>`
:param str keytext: key text
:return: public key
:retype: :py:class:`osusers.models.SshPublicKey`
"""
algorithm, data, comment = self.parse_keytext(keytext)
return self.create(
user=user, algorithm=algorithm, data=data, comment=comment)
@python_2_unicode_compatible
class SshPublicKey(TimeStampedModel):
"""
This entity class represents single SSH keys for an :py:class:`operating
system user <osusers.models.User>`.
"""
user = models.ForeignKey(User, verbose_name=_('User'))
algorithm = models.CharField(_('Algorithm'), max_length=20)
data = models.TextField(_('Key bytes'),
help_text=_('Base64 encoded key bytes'))
comment = models.TextField(_('Comment'), blank=True)
objects = SshPublicKeyManager()
class Meta:
verbose_name = _('SSH public key')
verbose_name_plural = _('SSH public keys')
unique_together = [('user', 'algorithm', 'data')]
def __str__(self):
return "{algorithm} {data} {comment}".format(
algorithm=self.algorithm, data=self.data, comment=self.comment
).strip()

View file

@ -1,11 +1,11 @@
from datetime import date from datetime import date
from django.conf import settings
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.test import TestCase from django.test import TestCase
from django.test.utils import override_settings from django.test.utils import override_settings
from django.utils import timezone from django.utils import timezone
from django.contrib.auth import get_user_model
from mock import patch, MagicMock
from passlib.hash import sha512_crypt from passlib.hash import sha512_crypt
@ -14,8 +14,84 @@ from osusers.models import (
AdditionalGroup, AdditionalGroup,
Group, Group,
Shadow, Shadow,
SshPublicKey,
User, User,
) )
from taskresults.models import TaskResult
EXAMPLE_KEY_1_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: "1024-bit RSA, converted from OpenSSH by me@example.com"
x-command: /home/me/bin/lock-in-guest.sh
AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb
YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ
5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=
---- END SSH2 PUBLIC KEY ----"""
EXAMPLE_KEY_2_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: This is my public key for use on \
servers which I don't like.
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""
EXAMPLE_KEY_3_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key for use with MyIsp
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""
EXAMPLE_KEY_4_OPENSSH = "".join((
"ssh-rsa ",
"AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb",
"YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ",
"5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE="
))
EXAMPLE_KEY_5_RFC4716_MULTILINE = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key \\
for use with \\
MyIsp
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""
EXAMPLE_KEY_6_RFC4716_EMPTY_LINE = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key for use with MyIsp
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""
Customer = get_user_model()
@override_settings( @override_settings(
@ -29,9 +105,10 @@ class TestCaseWithCeleryTasks(TestCase):
class AdditionalGroupTest(TestCaseWithCeleryTasks): class AdditionalGroupTest(TestCaseWithCeleryTasks):
def setUp(self): def setUp(self):
customer = Customer.objects.create(username='test')
self.group1 = Group.objects.create(groupname='test1', gid=1000) self.group1 = Group.objects.create(groupname='test1', gid=1000)
self.user = User.objects.create( self.user = User.objects.create(
username='test', uid=1000, group=self.group1, customer=customer, username='test', uid=1000, group=self.group1,
homedir='/home/test', shell='/bin/bash') homedir='/home/test', shell='/bin/bash')
def test_clean_primary_group(self): def test_clean_primary_group(self):
@ -48,13 +125,11 @@ class AdditionalGroupTest(TestCaseWithCeleryTasks):
def test_save(self): def test_save(self):
group2 = Group.objects.create(groupname='test2', gid=1001) group2 = Group.objects.create(groupname='test2', gid=1001)
GroupTaskResult.objects.all().delete()
addgroup = AdditionalGroup(user=self.user, group=group2) addgroup = AdditionalGroup(user=self.user, group=group2)
addgroup.save() addgroup.save()
taskres = GroupTaskResult.objects.all() taskres = TaskResult.objects.all()
self.assertTrue(len(taskres), 1) self.assertTrue(len(taskres), 1)
self.assertEqual(taskres[0].task_name, 'add_ldap_user_to_group') self.assertEqual(taskres[0].task_name, 'setup_file_sftp_userdir')
self.assertEqual(taskres[0].group, group2)
def test_delete(self): def test_delete(self):
group2 = Group.objects.create(groupname='test2', gid=1001) group2 = Group.objects.create(groupname='test2', gid=1001)
@ -86,26 +161,27 @@ class GroupTest(TestCaseWithCeleryTasks):
def test_save(self): def test_save(self):
group = Group(gid=10000, groupname='test') group = Group(gid=10000, groupname='test')
self.assertIs(group.save(), group) self.assertIs(group.save(), group)
taskres = GroupTaskResult.objects.all()
self.assertEqual(len(taskres), 1)
self.assertEqual(taskres[0].group, group)
self.assertEqual(taskres[0].task_name, 'create_ldap_group')
def test_delete(self): def test_delete(self):
group = Group.objects.create(gid=10000, groupname='test') group = Group.objects.create(gid=10000, groupname='test')
self.assertEqual(len(Group.objects.all()), 1) self.assertEqual(len(Group.objects.all()), 1)
self.assertEqual(len(GroupTaskResult.objects.all()), 1)
group.delete() group.delete()
self.assertEqual(len(Group.objects.all()), 0) self.assertEqual(len(Group.objects.all()), 0)
self.assertEqual(len(GroupTaskResult.objects.all()), 0) self.assertEqual(len(TaskResult.objects.all()), 1)
tr = TaskResult.objects.first()
self.assertEqual(tr.task_name, 'delete_ldap_group')
class ShadowManagerTest(TestCaseWithCeleryTasks): class ShadowManagerTest(TestCaseWithCeleryTasks):
def setUp(self):
self.customer = Customer.objects.create(username='test')
super(ShadowManagerTest, self).setUp()
def test_create_shadow(self): def test_create_shadow(self):
user = User( user = User(
username='test', uid=1000, customer=self.customer, username='test', uid=1000,
group=Group(gid=1000, groupname='test'), group=Group(gid=1000, groupname='test'), homedir='/home/test',
homedir='/home/test', shell='/bin/fooshell') shell='/bin/fooshell')
shadow = Shadow.objects.create_shadow(user, 'test') shadow = Shadow.objects.create_shadow(user, 'test')
self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) self.assertTrue(sha512_crypt.verify('test', shadow.passwd))
self.assertEqual(shadow.changedays, self.assertEqual(shadow.changedays,
@ -119,12 +195,16 @@ class ShadowManagerTest(TestCaseWithCeleryTasks):
class ShadowTest(TestCaseWithCeleryTasks): class ShadowTest(TestCaseWithCeleryTasks):
def setUp(self):
self.customer = Customer.objects.create(username='test')
super(ShadowTest, self).setUp()
def test___str__(self): def test___str__(self):
group = Group.objects.create( group = Group.objects.create(
groupname='test', gid=1000) groupname='test', gid=1000)
user = User.objects.create( user = User.objects.create(
username='test', uid=1000, group=group, homedir='/home/test', customer=self.customer, username='test', uid=1000, group=group,
shell='/bin/bash') homedir='/home/test', shell='/bin/bash')
shadow = Shadow(user=user) shadow = Shadow(user=user)
self.assertEqual(str(shadow), 'for user test (1000)') self.assertEqual(str(shadow), 'for user test (1000)')
@ -132,90 +212,13 @@ class ShadowTest(TestCaseWithCeleryTasks):
group = Group.objects.create( group = Group.objects.create(
groupname='test', gid=1000) groupname='test', gid=1000)
user = User.objects.create( user = User.objects.create(
username='test', uid=1000, group=group, homedir='/home/test', customer=self.customer, username='test', uid=1000, group=group,
shell='/bin/bash') homedir='/home/test', shell='/bin/bash')
shadow = Shadow(user=user) shadow = Shadow(user=user)
shadow.set_password('test') shadow.set_password('test')
self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) self.assertTrue(sha512_crypt.verify('test', shadow.passwd))
TEST_TASK_UUID = '3120f6a8-2665-4fa3-a785-79efd28bfe92'
TEST_TASK_NAME = 'test.task'
TEST_TASK_RESULT = '4ll y0ur b453 4r3 b3l0ng t0 u5'
class TaskResultTest(TestCase):
def test__set_result_fields_not_ready(self):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
mock.ready.return_value = False
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertFalse(tr.is_finished)
self.assertFalse(tr.is_success)
self.assertEqual(tr.state, '')
self.assertEqual(tr.result_body, '')
def test__set_result_fields_ready(self):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME,
state='SUCCESS', result=TEST_TASK_RESULT)
mock.ready.return_value = True
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertTrue(tr.is_finished)
self.assertTrue(tr.is_success)
self.assertEqual(tr.state, 'SUCCESS')
self.assertEqual(tr.result_body, TEST_TASK_RESULT)
def test__set_result_fields_exception(self):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME,
state='FAILURE', result=Exception('Fail'))
mock.ready.return_value = True
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertTrue(tr.is_finished)
self.assertFalse(tr.is_success)
self.assertEqual(tr.state, 'FAILURE')
self.assertEqual(tr.result_body, 'Fail')
@patch('osusers.models.AsyncResult')
def test_update_taskstatus_unfinished(self, asyncres):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
mock.ready.return_value = False
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertFalse(tr.is_finished)
mymock = asyncres(TEST_TASK_UUID)
mymock.ready.return_value = True
mymock.state = 'SUCCESS'
mymock.result = TEST_RESULT
tr.update_taskstatus()
mymock.ready.assert_called_with()
self.assertTrue(tr.is_finished)
@patch('osusers.models.AsyncResult')
def test_update_taskstatus_finished(self, asyncres):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
mock.ready.return_value = True
mock.state = 'SUCCESS'
mock.result = TEST_RESULT
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertTrue(tr.is_finished)
mymock = asyncres(TEST_TASK_UUID)
tr.update_taskstatus()
self.assertFalse(mymock.ready.called)
self.assertTrue(tr.is_finished)
TEST_RESULT = MagicMock()
TEST_RESULT.task_id = TEST_TASK_UUID
TEST_RESULT.task_name = TEST_TASK_NAME
TEST_RESULT.ready.return_value = False
class TaskResultManagerTest(TestCase):
def test_create(self):
tr = DeleteTaskResult.objects.create(TEST_RESULT, TEST_TASK_NAME)
self.assertIsInstance(tr, DeleteTaskResult)
self.assertEqual(tr.task_uuid, TEST_TASK_UUID)
self.assertEqual(tr.task_name, TEST_TASK_NAME)
@override_settings( @override_settings(
OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test', OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test',
OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell'
@ -224,13 +227,18 @@ class UserManagerTest(TestCaseWithCeleryTasks):
def _create_group(self): def _create_group(self):
return Group.objects.create(gid=10000, groupname='foo') return Group.objects.create(gid=10000, groupname='foo')
def setUp(self):
self.customer = Customer.objects.create(username='test')
super(UserManagerTest, self).setUp()
def test_get_next_uid_first(self): def test_get_next_uid_first(self):
self.assertEqual(User.objects.get_next_uid(), 10000) self.assertEqual(User.objects.get_next_uid(), 10000)
def test_get_next_uid_second(self): def test_get_next_uid_second(self):
User.objects.create( User.objects.create(
uid=10010, username='foo', group=self._create_group(), customer=self.customer, uid=10010, username='foo',
homedir='/home/foo', shell='/bin/fooshell') group=self._create_group(), homedir='/home/foo',
shell='/bin/fooshell')
self.assertEqual(User.objects.get_next_uid(), 10011) self.assertEqual(User.objects.get_next_uid(), 10011)
def test_get_next_username_first(self): def test_get_next_username_first(self):
@ -238,22 +246,23 @@ class UserManagerTest(TestCaseWithCeleryTasks):
def test_get_next_username_second(self): def test_get_next_username_second(self):
User.objects.create( User.objects.create(
uid=10000, username='test01', group=self._create_group(), customer=self.customer, uid=10000, username='test01',
homedir='/home/foo', shell='/bin/fooshell') group=self._create_group(), homedir='/home/foo',
shell='/bin/fooshell')
self.assertEqual(User.objects.get_next_username(), 'test02') self.assertEqual(User.objects.get_next_username(), 'test02')
def test_get_next_username_gaps(self): def test_get_next_username_gaps(self):
group = self._create_group() group = self._create_group()
User.objects.create( User.objects.create(
uid=10000, username='test01', group=group, customer=self.customer, uid=10000, username='test01', group=group,
homedir='/home/foo', shell='/bin/fooshell') homedir='/home/foo', shell='/bin/fooshell')
User.objects.create( User.objects.create(
uid=10002, username='test03', group=group, customer=self.customer, uid=10002, username='test03', group=group,
homedir='/home/foo', shell='/bin/fooshell') homedir='/home/foo', shell='/bin/fooshell')
self.assertEqual(User.objects.get_next_username(), 'test02') self.assertEqual(User.objects.get_next_username(), 'test02')
def test_create_user_first(self): def test_create_user_first(self):
user = User.objects.create_user() user = User.objects.create_user(customer=self.customer)
self.assertIsInstance(user, User) self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000) self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.gid, 10000)
@ -264,15 +273,16 @@ class UserManagerTest(TestCaseWithCeleryTasks):
self.assertIsNotNone(user.shadow) self.assertIsNotNone(user.shadow)
def test_create_user_tasks(self): def test_create_user_tasks(self):
user = User.objects.create_user() User.objects.create_user(customer=self.customer)
gtaskres = GroupTaskResult.objects.all() taskres = TaskResult.objects.all()
self.assertEqual(len(gtaskres), 1) self.assertEqual(len(taskres), 2)
self.assertEqual(gtaskres[0].task_name, 'create_ldap_group') tasknames = [r.task_name for r in taskres]
self.assertEqual(gtaskres[0].group, user.group) self.assertEqual(tasknames.count('setup_file_sftp_userdir'), 1)
self.assertEqual(tasknames.count('setup_file_mail_userdir'), 1)
def test_create_user_second(self): def test_create_user_second(self):
User.objects.create_user() User.objects.create_user(customer=self.customer)
user = User.objects.create_user() user = User.objects.create_user(customer=self.customer)
self.assertIsInstance(user, User) self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10001) self.assertEqual(user.uid, 10001)
self.assertEqual(user.group.gid, 10001) self.assertEqual(user.group.gid, 10001)
@ -284,7 +294,8 @@ class UserManagerTest(TestCaseWithCeleryTasks):
self.assertEqual(len(User.objects.all()), 2) self.assertEqual(len(User.objects.all()), 2)
def test_create_user_known_password(self): def test_create_user_known_password(self):
user = User.objects.create_user(password='foobar') user = User.objects.create_user(
customer=self.customer, password='foobar')
self.assertIsInstance(user, User) self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000) self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.gid, 10000)
@ -296,7 +307,8 @@ class UserManagerTest(TestCaseWithCeleryTasks):
self.assertTrue(sha512_crypt.verify('foobar', user.shadow.passwd)) self.assertTrue(sha512_crypt.verify('foobar', user.shadow.passwd))
def test_create_user_predefined_username(self): def test_create_user_predefined_username(self):
user = User.objects.create_user(username='tester') user = User.objects.create_user(
customer=self.customer, username='tester')
self.assertIsInstance(user, User) self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000) self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.gid, 10000)
@ -307,7 +319,7 @@ class UserManagerTest(TestCaseWithCeleryTasks):
self.assertIsNotNone(user.shadow) self.assertIsNotNone(user.shadow)
def test_create_user_commit(self): def test_create_user_commit(self):
user = User.objects.create_user(commit=True) user = User.objects.create_user(customer=self.customer, commit=True)
self.assertIsInstance(user, User) self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000) self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.gid, 10000)
@ -323,55 +335,157 @@ class UserManagerTest(TestCaseWithCeleryTasks):
OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell'
) )
class UserTest(TestCaseWithCeleryTasks): class UserTest(TestCaseWithCeleryTasks):
def setUp(self):
self.customer = Customer.objects.create_user('test')
super(UserTest, self).setUp()
def test___str__(self): def test___str__(self):
user = User.objects.create_user() user = User.objects.create_user(self.customer)
self.assertEqual(str(user), 'test01 (10000)') self.assertEqual(str(user), 'test01 (10000)')
def test_set_password(self): def test_set_password(self):
user = User.objects.create_user() user = User.objects.create_user(self.customer)
self.assertFalse(sha512_crypt.verify('test', user.shadow.passwd)) self.assertFalse(sha512_crypt.verify('test', user.shadow.passwd))
UserTaskResult.objects.all().delete()
user.set_password('test') user.set_password('test')
self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd)) self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd))
taskres = UserTaskResult.objects.all()
self.assertEqual(len(taskres), 1)
self.assertEqual(taskres[0].user, user)
self.assertEqual(taskres[0].task_name, 'create_ldap_user')
def test_save(self): def test_save(self):
user = User.objects.create_user() user = User.objects.create_user(self.customer)
UserTaskResult.objects.all().delete() TaskResult.objects.all().delete()
user.save() user.save()
taskres = UserTaskResult.objects.all() taskres = TaskResult.objects.all()
self.assertEqual(len(taskres), 1) self.assertEqual(len(taskres), 2)
self.assertEqual(taskres[0].user, user) task_names = [r.task_name for r in taskres]
self.assertEqual(taskres[0].task_name, 'create_ldap_user') self.assertIn('setup_file_sftp_userdir', task_names)
self.assertIn('setup_file_mail_userdir', task_names)
def test_delete_only_user(self): def test_delete_only_user(self):
user = User.objects.create_user() user = User.objects.create_user(self.customer)
TaskResult.objects.all().delete()
user.delete() user.delete()
taskres = DeleteTaskResult.objects.all() taskres = TaskResult.objects.all()
self.assertEqual(len(taskres), 2) self.assertEqual(len(taskres), 3)
self.assertIn('delete_ldap_user', tasknames = [r.task_name for r in taskres]
[r.task_name for r in taskres]) self.assertEqual(tasknames.count('delete_file_mail_userdir'), 1)
self.assertIn('delete_ldap_group_if_empty', self.assertEqual(tasknames.count('delete_file_sftp_userdir'), 1)
[r.task_name for r in taskres]) self.assertEqual(tasknames.count('delete_ldap_group'), 1)
self.assertEqual(len(User.objects.all()), 0) self.assertEqual(len(User.objects.all()), 0)
def test_delete_additional_groups(self): def test_delete_additional_groups(self):
group1 = Group.objects.create(gid=2000, groupname='group1') group1 = Group.objects.create(gid=2000, groupname='group1')
group2 = Group.objects.create(gid=2001, groupname='group2') group2 = Group.objects.create(gid=2001, groupname='group2')
user = User.objects.create_user() user = User.objects.create_user(self.customer)
for group in [group1, group2]: for group in [group1, group2]:
user.additionalgroup_set.add( user.additionalgroup_set.add(
AdditionalGroup.objects.create(user=user, group=group)) AdditionalGroup.objects.create(user=user, group=group))
TaskResult.objects.all().delete()
user.delete() user.delete()
taskres = DeleteTaskResult.objects.all() taskres = TaskResult.objects.all()
self.assertEqual(len(taskres), 4) self.assertEqual(len(taskres), 3)
tasknames = [t.task_name for t in taskres] tasknames = [t.task_name for t in taskres]
self.assertEqual(tasknames.count('remove_ldap_user_from_group'), 2) self.assertEqual(tasknames.count('delete_file_mail_userdir'), 1)
self.assertEqual(tasknames.count('delete_ldap_user'), 1) self.assertEqual(tasknames.count('delete_file_sftp_userdir'), 1)
self.assertEqual(tasknames.count('delete_ldap_group_if_empty'), 1) self.assertEqual(tasknames.count('delete_ldap_group'), 1)
self.assertEqual(len(User.objects.all()), 0) self.assertEqual(len(User.objects.all()), 0)
self.assertEqual(len(AdditionalGroup.objects.all()), 0) self.assertEqual(len(AdditionalGroup.objects.all()), 0)
def test_is_sftp_user(self):
user = User.objects.create_user(self.customer)
self.assertFalse(user.is_sftp_user())
sftp_group = Group.objects.create(
gid=2000, groupname=settings.OSUSER_SFTP_GROUP)
user.additionalgroup_set.add(
AdditionalGroup.objects.create(user=user, group=sftp_group))
self.assertTrue(user.is_sftp_user())
class SshPublicKeyManagerTest(TestCaseWithCeleryTasks):
def test_parse_keytext_rfc4716_1(self):
res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_1_RFC4716)
self.assertEqual(len(res), 3)
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[0], 'ssh-rsa')
self.assertEqual(
res[2], '"1024-bit RSA, converted from OpenSSH by me@example.com"')
def test_parse_keytext_rfc4716_2(self):
res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_2_RFC4716)
self.assertEqual(len(res), 3)
self.assertEqual(res[0], 'ssh-dss')
self.assertGreater(len(res[1]), 40)
self.assertEqual(
res[2],
"This is my public key for use on servers which I don't like.")
def test_parse_keytext_rfc4716_3(self):
res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_3_RFC4716)
self.assertEqual(len(res), 3)
self.assertEqual(res[0], 'ssh-dss')
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[2], "DSA Public Key for use with MyIsp")
def test_parse_keytext_openssh(self):
res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_4_OPENSSH)
self.assertEquals(len(res), 3)
self.assertEqual(res[0], 'ssh-rsa')
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[2], '')
def test_parse_keytext_invalid(self):
with self.assertRaises(ValueError):
SshPublicKey.objects.parse_keytext("\r\n".join(["xx"]*10))
def test_parse_keytext_empty_line(self):
res = SshPublicKey.objects.parse_keytext(
EXAMPLE_KEY_6_RFC4716_EMPTY_LINE)
self.assertEqual(len(res), 3)
self.assertEqual(res[0], 'ssh-dss')
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[2], "DSA Public Key for use with MyIsp")
def test_parse_keytext_multiline_comment(self):
res = SshPublicKey.objects.parse_keytext(
EXAMPLE_KEY_5_RFC4716_MULTILINE)
self.assertEqual(len(res), 3)
self.assertEqual(res[0], 'ssh-dss')
self.assertGreater(len(res[1]), 40)
self.assertEqual(res[2], "DSA Public Key for use with MyIsp")
def test_create_ssh_public_key(self):
customer = Customer.objects.create_user('test')
user = User.objects.create_user(customer)
key = SshPublicKey.objects.create_ssh_public_key(
user, EXAMPLE_KEY_4_OPENSSH)
self.assertIsInstance(key, SshPublicKey)
self.assertEqual(key.user, user)
self.assertEqual(key.algorithm, 'ssh-rsa')
self.assertEqual(key.data, EXAMPLE_KEY_4_OPENSSH.split()[1])
self.assertEqual(key.comment, '')
class SshPublicKeyTest(TestCaseWithCeleryTasks):
def setUp(self):
super(SshPublicKeyTest, self).setUp()
customer = Customer.objects.create_user('test')
self.user = User.objects.create_user(customer)
def test__str__rfc4716(self):
res = SshPublicKey.objects.create_ssh_public_key(
self.user, EXAMPLE_KEY_3_RFC4716)
self.assertEqual(
str(res), 'ssh-dss AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxae'
'hvx5wOJ0rzZdzoSOXxbETW6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7St'
'xyltHnXF1YLfKD1G4T6JYrdHYI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3g'
'Jq2e7Yisk/gF+1VAAAAFQDb8D5cvwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4'
'KLYk3IwRbXblwXdkPggA4pfdtW9vGfJ0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/F'
'XPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAAvioUPkmdMc0zuWoSOEsSNhVDtX3WdvVc'
'GcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACBAN7CY+KKv1gHpRzFwdQm7HK9bb1LA'
'o2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HSn24VYtYtsMu74qXviYjziVucWK'
'jjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5sY29ouezv4Xz2PuMch5VGPP'
'+CDqzCM4loWgV DSA Public Key for use with MyIsp')
def test__str__openssh(self):
res = SshPublicKey.objects.create_ssh_public_key(
self.user, EXAMPLE_KEY_4_OPENSSH)
self.assertEqual(str(res), EXAMPLE_KEY_4_OPENSSH)