From 20359681dbf97ce79e5607338cfcf9a6dd42742f Mon Sep 17 00:00:00 2001 From: Jan Dittberner Date: Sat, 31 Jan 2015 21:48:07 +0100 Subject: [PATCH] implement SshPublicKey model, manager and tests - implement osusers.models.SshPublicKey and osusers.models.SshPublicKeyManager - fix broken osusers.models.tests.test_models - add new test classes SshPublicKeyManagerTest and SshPublicKeyTest - add migration for SshPublicKey model --- .../migrations/0005_auto_20150131_2009.py | 37 ++ gnuviechadmin/osusers/models.py | 102 +++++ gnuviechadmin/osusers/tests/test_models.py | 394 +++++++++++------- 3 files changed, 393 insertions(+), 140 deletions(-) create mode 100644 gnuviechadmin/osusers/migrations/0005_auto_20150131_2009.py diff --git a/gnuviechadmin/osusers/migrations/0005_auto_20150131_2009.py b/gnuviechadmin/osusers/migrations/0005_auto_20150131_2009.py new file mode 100644 index 0000000..809f1a2 --- /dev/null +++ b/gnuviechadmin/osusers/migrations/0005_auto_20150131_2009.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import models, migrations +import django.utils.timezone +import model_utils.fields + + +class Migration(migrations.Migration): + + dependencies = [ + ('osusers', '0004_auto_20150104_1751'), + ] + + operations = [ + migrations.CreateModel( + name='SshPublicKey', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, verbose_name='created', editable=False)), + ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, verbose_name='modified', editable=False)), + ('algorithm', models.CharField(max_length=20, verbose_name='Algorithm')), + ('data', models.TextField(help_text='Base64 encoded key bytes', verbose_name='Key bytes')), + ('comment', models.TextField(verbose_name='Comment', blank=True)), + ('user', models.ForeignKey(verbose_name='User', to='osusers.User')), + ], + options={ + 'verbose_name': 'SSH public key', + 'verbose_name_plural': 'SSH public keys', + }, + bases=(models.Model,), + ), + migrations.AlterUniqueTogether( + name='sshpublickey', + unique_together=set([('user', 'algorithm', 'data')]), + ), + ] diff --git a/gnuviechadmin/osusers/models.py b/gnuviechadmin/osusers/models.py index 2f62463..9f13e7f 100644 --- a/gnuviechadmin/osusers/models.py +++ b/gnuviechadmin/osusers/models.py @@ -4,9 +4,11 @@ This module defines the database models of operating system users. """ from __future__ import unicode_literals +import base64 from datetime import date import logging import os +import six from django.db import models, transaction from django.conf import settings @@ -477,3 +479,103 @@ class AdditionalGroup(TimeStampedModel, models.Model): 'remove_ldap_user_from_group' ) super(AdditionalGroup, self).delete(*args, **kwargs) + + +class SshPublicKeyManager(models.Manager): + """ + Default manager for :py:class:`SSH public key + ` instances. + + """ + + def parse_keytext(self, keytext): + """ + Parse a SSH public key in OpenSSH or :rfc:`4716` format into its + components algorithm, key data and comment. + + :param str keytext: key text + :return: triple of algorithm name, key data and comment + :rtype: triple of str + + """ + if keytext.startswith('---- BEGIN SSH2 PUBLIC KEY ----'): + comment = '' + data = '' + continued = '' + headers = {} + for line in keytext.splitlines(): + if line == '---- BEGIN SSH2 PUBLIC KEY ----': + continue + elif ':' in line: # a header line + header_tag, header_value = [ + item.strip() for item in line.split(':', 1)] + if header_value.endswith('\\'): + continued = header_value[:-1] + else: + headers[header_tag.lower()] = header_value + elif continued: + if line.endswith('\\'): + continued += line[:-1] + continue + header_value = continued + line + headers[header_tag.lower()] = header_value + continued = '' + elif line == '---- END SSH2 PUBLIC KEY ----': + break + elif line: # ignore empty lines + data += line + if 'comment' in headers: + comment = headers['comment'] + else: + parts = keytext.split() + if len(parts) > 3: + raise ValueError("unsupported key format") + data = parts[1] + comment = len(parts) == 3 and parts[2] or "" + keybytes = base64.b64decode(data) + parts = keybytes.split(b'\x00' * 3) + alglength = six.byte2int(parts[1]) + algname = parts[1][1:1+alglength] + return algname, data, comment + + def create_ssh_public_key(self, user, keytext): + """ + Create a new :py:class:`SSH public key ` + for a user from the given key text representation. The text can be + either in openssh format or :rfc:`4716` format. + + :param user: :py:class:`operating system user ` + :param str keytext: key text + :return: public key + :retype: :py:class:`osusers.models.SshPublicKey` + + """ + algorithm, data, comment = self.parse_keytext(keytext) + return self.create( + user=user, algorithm=algorithm, data=data, comment=comment) + + +@python_2_unicode_compatible +class SshPublicKey(TimeStampedModel): + """ + This entity class represents single SSH keys for an :py:class:`operating + system user `. + + """ + user = models.ForeignKey(User, verbose_name=_('User')) + algorithm = models.CharField(_('Algorithm'), max_length=20) + data = models.TextField(_('Key bytes'), + help_text=_('Base64 encoded key bytes')) + comment = models.TextField(_('Comment'), blank=True) + + objects = SshPublicKeyManager() + + class Meta: + verbose_name = _('SSH public key') + verbose_name_plural = _('SSH public keys') + unique_together = [('user', 'algorithm', 'data')] + + def __str__(self): + return "{algorithm} {data} {comment}".format( + algorithm=self.algorithm, data=self.data, comment=self.comment + ).strip() diff --git a/gnuviechadmin/osusers/tests/test_models.py b/gnuviechadmin/osusers/tests/test_models.py index f1a2c11..cf137f8 100644 --- a/gnuviechadmin/osusers/tests/test_models.py +++ b/gnuviechadmin/osusers/tests/test_models.py @@ -1,11 +1,11 @@ from datetime import date +from django.conf import settings from django.core.exceptions import ValidationError from django.test import TestCase from django.test.utils import override_settings from django.utils import timezone - -from mock import patch, MagicMock +from django.contrib.auth import get_user_model from passlib.hash import sha512_crypt @@ -14,8 +14,84 @@ from osusers.models import ( AdditionalGroup, Group, Shadow, + SshPublicKey, User, ) +from taskresults.models import TaskResult + + +EXAMPLE_KEY_1_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ---- +Comment: "1024-bit RSA, converted from OpenSSH by me@example.com" +x-command: /home/me/bin/lock-in-guest.sh +AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb +YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ +5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE= +---- END SSH2 PUBLIC KEY ----""" + +EXAMPLE_KEY_2_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ---- +Comment: This is my public key for use on \ +servers which I don't like. +AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET +W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH +YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c +vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf +J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA +vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB +AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS +n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5 +sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV +---- END SSH2 PUBLIC KEY ----""" + +EXAMPLE_KEY_3_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ---- +Comment: DSA Public Key for use with MyIsp +AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET +W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH +YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c +vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf +J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA +vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB +AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS +n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5 +sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV +---- END SSH2 PUBLIC KEY ----""" + +EXAMPLE_KEY_4_OPENSSH = "".join(( +"ssh-rsa ", +"AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb", +"YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ", +"5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=" +)) + +EXAMPLE_KEY_5_RFC4716_MULTILINE = """---- BEGIN SSH2 PUBLIC KEY ---- +Comment: DSA Public Key \\ +for use with \\ +MyIsp +AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET +W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH +YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c +vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf +J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA +vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB +AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS +n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5 +sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV +---- END SSH2 PUBLIC KEY ----""" + +EXAMPLE_KEY_6_RFC4716_EMPTY_LINE = """---- BEGIN SSH2 PUBLIC KEY ---- +Comment: DSA Public Key for use with MyIsp + +AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET +W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH +YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c +vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf +J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA +vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB +AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS +n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5 +sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV +---- END SSH2 PUBLIC KEY ----""" + +Customer = get_user_model() @override_settings( @@ -29,9 +105,10 @@ class TestCaseWithCeleryTasks(TestCase): class AdditionalGroupTest(TestCaseWithCeleryTasks): def setUp(self): + customer = Customer.objects.create(username='test') self.group1 = Group.objects.create(groupname='test1', gid=1000) self.user = User.objects.create( - username='test', uid=1000, group=self.group1, + customer=customer, username='test', uid=1000, group=self.group1, homedir='/home/test', shell='/bin/bash') def test_clean_primary_group(self): @@ -48,13 +125,11 @@ class AdditionalGroupTest(TestCaseWithCeleryTasks): def test_save(self): group2 = Group.objects.create(groupname='test2', gid=1001) - GroupTaskResult.objects.all().delete() addgroup = AdditionalGroup(user=self.user, group=group2) addgroup.save() - taskres = GroupTaskResult.objects.all() + taskres = TaskResult.objects.all() self.assertTrue(len(taskres), 1) - self.assertEqual(taskres[0].task_name, 'add_ldap_user_to_group') - self.assertEqual(taskres[0].group, group2) + self.assertEqual(taskres[0].task_name, 'setup_file_sftp_userdir') def test_delete(self): group2 = Group.objects.create(groupname='test2', gid=1001) @@ -86,26 +161,27 @@ class GroupTest(TestCaseWithCeleryTasks): def test_save(self): group = Group(gid=10000, groupname='test') self.assertIs(group.save(), group) - taskres = GroupTaskResult.objects.all() - self.assertEqual(len(taskres), 1) - self.assertEqual(taskres[0].group, group) - self.assertEqual(taskres[0].task_name, 'create_ldap_group') def test_delete(self): group = Group.objects.create(gid=10000, groupname='test') self.assertEqual(len(Group.objects.all()), 1) - self.assertEqual(len(GroupTaskResult.objects.all()), 1) group.delete() self.assertEqual(len(Group.objects.all()), 0) - self.assertEqual(len(GroupTaskResult.objects.all()), 0) + self.assertEqual(len(TaskResult.objects.all()), 1) + tr = TaskResult.objects.first() + self.assertEqual(tr.task_name, 'delete_ldap_group') class ShadowManagerTest(TestCaseWithCeleryTasks): + def setUp(self): + self.customer = Customer.objects.create(username='test') + super(ShadowManagerTest, self).setUp() + def test_create_shadow(self): user = User( - username='test', uid=1000, - group=Group(gid=1000, groupname='test'), - homedir='/home/test', shell='/bin/fooshell') + customer=self.customer, username='test', uid=1000, + group=Group(gid=1000, groupname='test'), homedir='/home/test', + shell='/bin/fooshell') shadow = Shadow.objects.create_shadow(user, 'test') self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) self.assertEqual(shadow.changedays, @@ -119,12 +195,16 @@ class ShadowManagerTest(TestCaseWithCeleryTasks): class ShadowTest(TestCaseWithCeleryTasks): + def setUp(self): + self.customer = Customer.objects.create(username='test') + super(ShadowTest, self).setUp() + def test___str__(self): group = Group.objects.create( groupname='test', gid=1000) user = User.objects.create( - username='test', uid=1000, group=group, homedir='/home/test', - shell='/bin/bash') + customer=self.customer, username='test', uid=1000, group=group, + homedir='/home/test', shell='/bin/bash') shadow = Shadow(user=user) self.assertEqual(str(shadow), 'for user test (1000)') @@ -132,90 +212,13 @@ class ShadowTest(TestCaseWithCeleryTasks): group = Group.objects.create( groupname='test', gid=1000) user = User.objects.create( - username='test', uid=1000, group=group, homedir='/home/test', - shell='/bin/bash') + customer=self.customer, username='test', uid=1000, group=group, + homedir='/home/test', shell='/bin/bash') shadow = Shadow(user=user) shadow.set_password('test') self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) -TEST_TASK_UUID = '3120f6a8-2665-4fa3-a785-79efd28bfe92' -TEST_TASK_NAME = 'test.task' -TEST_TASK_RESULT = '4ll y0ur b453 4r3 b3l0ng t0 u5' - - -class TaskResultTest(TestCase): - def test__set_result_fields_not_ready(self): - mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME) - mock.ready.return_value = False - tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) - self.assertFalse(tr.is_finished) - self.assertFalse(tr.is_success) - self.assertEqual(tr.state, '') - self.assertEqual(tr.result_body, '') - - def test__set_result_fields_ready(self): - mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME, - state='SUCCESS', result=TEST_TASK_RESULT) - mock.ready.return_value = True - tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) - self.assertTrue(tr.is_finished) - self.assertTrue(tr.is_success) - self.assertEqual(tr.state, 'SUCCESS') - self.assertEqual(tr.result_body, TEST_TASK_RESULT) - - def test__set_result_fields_exception(self): - mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME, - state='FAILURE', result=Exception('Fail')) - mock.ready.return_value = True - tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) - self.assertTrue(tr.is_finished) - self.assertFalse(tr.is_success) - self.assertEqual(tr.state, 'FAILURE') - self.assertEqual(tr.result_body, 'Fail') - - @patch('osusers.models.AsyncResult') - def test_update_taskstatus_unfinished(self, asyncres): - mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME) - mock.ready.return_value = False - tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) - self.assertFalse(tr.is_finished) - mymock = asyncres(TEST_TASK_UUID) - mymock.ready.return_value = True - mymock.state = 'SUCCESS' - mymock.result = TEST_RESULT - tr.update_taskstatus() - mymock.ready.assert_called_with() - self.assertTrue(tr.is_finished) - - @patch('osusers.models.AsyncResult') - def test_update_taskstatus_finished(self, asyncres): - mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME) - mock.ready.return_value = True - mock.state = 'SUCCESS' - mock.result = TEST_RESULT - tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) - self.assertTrue(tr.is_finished) - mymock = asyncres(TEST_TASK_UUID) - tr.update_taskstatus() - self.assertFalse(mymock.ready.called) - self.assertTrue(tr.is_finished) - - -TEST_RESULT = MagicMock() -TEST_RESULT.task_id = TEST_TASK_UUID -TEST_RESULT.task_name = TEST_TASK_NAME -TEST_RESULT.ready.return_value = False - - -class TaskResultManagerTest(TestCase): - def test_create(self): - tr = DeleteTaskResult.objects.create(TEST_RESULT, TEST_TASK_NAME) - self.assertIsInstance(tr, DeleteTaskResult) - self.assertEqual(tr.task_uuid, TEST_TASK_UUID) - self.assertEqual(tr.task_name, TEST_TASK_NAME) - - @override_settings( OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test', OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' @@ -224,13 +227,18 @@ class UserManagerTest(TestCaseWithCeleryTasks): def _create_group(self): return Group.objects.create(gid=10000, groupname='foo') + def setUp(self): + self.customer = Customer.objects.create(username='test') + super(UserManagerTest, self).setUp() + def test_get_next_uid_first(self): self.assertEqual(User.objects.get_next_uid(), 10000) def test_get_next_uid_second(self): User.objects.create( - uid=10010, username='foo', group=self._create_group(), - homedir='/home/foo', shell='/bin/fooshell') + customer=self.customer, uid=10010, username='foo', + group=self._create_group(), homedir='/home/foo', + shell='/bin/fooshell') self.assertEqual(User.objects.get_next_uid(), 10011) def test_get_next_username_first(self): @@ -238,22 +246,23 @@ class UserManagerTest(TestCaseWithCeleryTasks): def test_get_next_username_second(self): User.objects.create( - uid=10000, username='test01', group=self._create_group(), - homedir='/home/foo', shell='/bin/fooshell') + customer=self.customer, uid=10000, username='test01', + group=self._create_group(), homedir='/home/foo', + shell='/bin/fooshell') self.assertEqual(User.objects.get_next_username(), 'test02') def test_get_next_username_gaps(self): group = self._create_group() User.objects.create( - uid=10000, username='test01', group=group, + customer=self.customer, uid=10000, username='test01', group=group, homedir='/home/foo', shell='/bin/fooshell') User.objects.create( - uid=10002, username='test03', group=group, + customer=self.customer, uid=10002, username='test03', group=group, homedir='/home/foo', shell='/bin/fooshell') self.assertEqual(User.objects.get_next_username(), 'test02') def test_create_user_first(self): - user = User.objects.create_user() + user = User.objects.create_user(customer=self.customer) self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) @@ -264,15 +273,16 @@ class UserManagerTest(TestCaseWithCeleryTasks): self.assertIsNotNone(user.shadow) def test_create_user_tasks(self): - user = User.objects.create_user() - gtaskres = GroupTaskResult.objects.all() - self.assertEqual(len(gtaskres), 1) - self.assertEqual(gtaskres[0].task_name, 'create_ldap_group') - self.assertEqual(gtaskres[0].group, user.group) + User.objects.create_user(customer=self.customer) + taskres = TaskResult.objects.all() + self.assertEqual(len(taskres), 2) + tasknames = [r.task_name for r in taskres] + self.assertEqual(tasknames.count('setup_file_sftp_userdir'), 1) + self.assertEqual(tasknames.count('setup_file_mail_userdir'), 1) def test_create_user_second(self): - User.objects.create_user() - user = User.objects.create_user() + User.objects.create_user(customer=self.customer) + user = User.objects.create_user(customer=self.customer) self.assertIsInstance(user, User) self.assertEqual(user.uid, 10001) self.assertEqual(user.group.gid, 10001) @@ -284,7 +294,8 @@ class UserManagerTest(TestCaseWithCeleryTasks): self.assertEqual(len(User.objects.all()), 2) def test_create_user_known_password(self): - user = User.objects.create_user(password='foobar') + user = User.objects.create_user( + customer=self.customer, password='foobar') self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) @@ -296,7 +307,8 @@ class UserManagerTest(TestCaseWithCeleryTasks): self.assertTrue(sha512_crypt.verify('foobar', user.shadow.passwd)) def test_create_user_predefined_username(self): - user = User.objects.create_user(username='tester') + user = User.objects.create_user( + customer=self.customer, username='tester') self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) @@ -307,7 +319,7 @@ class UserManagerTest(TestCaseWithCeleryTasks): self.assertIsNotNone(user.shadow) def test_create_user_commit(self): - user = User.objects.create_user(commit=True) + user = User.objects.create_user(customer=self.customer, commit=True) self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) @@ -323,55 +335,157 @@ class UserManagerTest(TestCaseWithCeleryTasks): OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' ) class UserTest(TestCaseWithCeleryTasks): + def setUp(self): + self.customer = Customer.objects.create_user('test') + super(UserTest, self).setUp() def test___str__(self): - user = User.objects.create_user() + user = User.objects.create_user(self.customer) self.assertEqual(str(user), 'test01 (10000)') def test_set_password(self): - user = User.objects.create_user() + user = User.objects.create_user(self.customer) self.assertFalse(sha512_crypt.verify('test', user.shadow.passwd)) - UserTaskResult.objects.all().delete() user.set_password('test') self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd)) - taskres = UserTaskResult.objects.all() - self.assertEqual(len(taskres), 1) - self.assertEqual(taskres[0].user, user) - self.assertEqual(taskres[0].task_name, 'create_ldap_user') def test_save(self): - user = User.objects.create_user() - UserTaskResult.objects.all().delete() + user = User.objects.create_user(self.customer) + TaskResult.objects.all().delete() user.save() - taskres = UserTaskResult.objects.all() - self.assertEqual(len(taskres), 1) - self.assertEqual(taskres[0].user, user) - self.assertEqual(taskres[0].task_name, 'create_ldap_user') + taskres = TaskResult.objects.all() + self.assertEqual(len(taskres), 2) + task_names = [r.task_name for r in taskres] + self.assertIn('setup_file_sftp_userdir', task_names) + self.assertIn('setup_file_mail_userdir', task_names) def test_delete_only_user(self): - user = User.objects.create_user() + user = User.objects.create_user(self.customer) + TaskResult.objects.all().delete() user.delete() - taskres = DeleteTaskResult.objects.all() - self.assertEqual(len(taskres), 2) - self.assertIn('delete_ldap_user', - [r.task_name for r in taskres]) - self.assertIn('delete_ldap_group_if_empty', - [r.task_name for r in taskres]) + taskres = TaskResult.objects.all() + self.assertEqual(len(taskres), 3) + tasknames = [r.task_name for r in taskres] + self.assertEqual(tasknames.count('delete_file_mail_userdir'), 1) + self.assertEqual(tasknames.count('delete_file_sftp_userdir'), 1) + self.assertEqual(tasknames.count('delete_ldap_group'), 1) self.assertEqual(len(User.objects.all()), 0) def test_delete_additional_groups(self): group1 = Group.objects.create(gid=2000, groupname='group1') group2 = Group.objects.create(gid=2001, groupname='group2') - user = User.objects.create_user() + user = User.objects.create_user(self.customer) for group in [group1, group2]: user.additionalgroup_set.add( AdditionalGroup.objects.create(user=user, group=group)) + TaskResult.objects.all().delete() user.delete() - taskres = DeleteTaskResult.objects.all() - self.assertEqual(len(taskres), 4) + taskres = TaskResult.objects.all() + self.assertEqual(len(taskres), 3) tasknames = [t.task_name for t in taskres] - self.assertEqual(tasknames.count('remove_ldap_user_from_group'), 2) - self.assertEqual(tasknames.count('delete_ldap_user'), 1) - self.assertEqual(tasknames.count('delete_ldap_group_if_empty'), 1) + self.assertEqual(tasknames.count('delete_file_mail_userdir'), 1) + self.assertEqual(tasknames.count('delete_file_sftp_userdir'), 1) + self.assertEqual(tasknames.count('delete_ldap_group'), 1) self.assertEqual(len(User.objects.all()), 0) self.assertEqual(len(AdditionalGroup.objects.all()), 0) + + def test_is_sftp_user(self): + user = User.objects.create_user(self.customer) + self.assertFalse(user.is_sftp_user()) + + sftp_group = Group.objects.create( + gid=2000, groupname=settings.OSUSER_SFTP_GROUP) + user.additionalgroup_set.add( + AdditionalGroup.objects.create(user=user, group=sftp_group)) + self.assertTrue(user.is_sftp_user()) + + +class SshPublicKeyManagerTest(TestCaseWithCeleryTasks): + def test_parse_keytext_rfc4716_1(self): + res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_1_RFC4716) + self.assertEqual(len(res), 3) + self.assertGreater(len(res[1]), 40) + self.assertEqual(res[0], 'ssh-rsa') + self.assertEqual( + res[2], '"1024-bit RSA, converted from OpenSSH by me@example.com"') + + def test_parse_keytext_rfc4716_2(self): + res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_2_RFC4716) + self.assertEqual(len(res), 3) + self.assertEqual(res[0], 'ssh-dss') + self.assertGreater(len(res[1]), 40) + self.assertEqual( + res[2], + "This is my public key for use on servers which I don't like.") + + def test_parse_keytext_rfc4716_3(self): + res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_3_RFC4716) + self.assertEqual(len(res), 3) + self.assertEqual(res[0], 'ssh-dss') + self.assertGreater(len(res[1]), 40) + self.assertEqual(res[2], "DSA Public Key for use with MyIsp") + + def test_parse_keytext_openssh(self): + res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_4_OPENSSH) + self.assertEquals(len(res), 3) + self.assertEqual(res[0], 'ssh-rsa') + self.assertGreater(len(res[1]), 40) + self.assertEqual(res[2], '') + + def test_parse_keytext_invalid(self): + with self.assertRaises(ValueError): + SshPublicKey.objects.parse_keytext("\r\n".join(["xx"]*10)) + + def test_parse_keytext_empty_line(self): + res = SshPublicKey.objects.parse_keytext( + EXAMPLE_KEY_6_RFC4716_EMPTY_LINE) + self.assertEqual(len(res), 3) + self.assertEqual(res[0], 'ssh-dss') + self.assertGreater(len(res[1]), 40) + self.assertEqual(res[2], "DSA Public Key for use with MyIsp") + + def test_parse_keytext_multiline_comment(self): + res = SshPublicKey.objects.parse_keytext( + EXAMPLE_KEY_5_RFC4716_MULTILINE) + self.assertEqual(len(res), 3) + self.assertEqual(res[0], 'ssh-dss') + self.assertGreater(len(res[1]), 40) + self.assertEqual(res[2], "DSA Public Key for use with MyIsp") + + def test_create_ssh_public_key(self): + customer = Customer.objects.create_user('test') + user = User.objects.create_user(customer) + key = SshPublicKey.objects.create_ssh_public_key( + user, EXAMPLE_KEY_4_OPENSSH) + self.assertIsInstance(key, SshPublicKey) + self.assertEqual(key.user, user) + self.assertEqual(key.algorithm, 'ssh-rsa') + self.assertEqual(key.data, EXAMPLE_KEY_4_OPENSSH.split()[1]) + self.assertEqual(key.comment, '') + + +class SshPublicKeyTest(TestCaseWithCeleryTasks): + def setUp(self): + super(SshPublicKeyTest, self).setUp() + customer = Customer.objects.create_user('test') + self.user = User.objects.create_user(customer) + + def test__str__rfc4716(self): + res = SshPublicKey.objects.create_ssh_public_key( + self.user, EXAMPLE_KEY_3_RFC4716) + self.assertEqual( + str(res), 'ssh-dss AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxae' + 'hvx5wOJ0rzZdzoSOXxbETW6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7St' + 'xyltHnXF1YLfKD1G4T6JYrdHYI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3g' + 'Jq2e7Yisk/gF+1VAAAAFQDb8D5cvwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4' + 'KLYk3IwRbXblwXdkPggA4pfdtW9vGfJ0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/F' + 'XPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAAvioUPkmdMc0zuWoSOEsSNhVDtX3WdvVc' + 'GcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACBAN7CY+KKv1gHpRzFwdQm7HK9bb1LA' + 'o2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HSn24VYtYtsMu74qXviYjziVucWK' + 'jjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5sY29ouezv4Xz2PuMch5VGPP' + '+CDqzCM4loWgV DSA Public Key for use with MyIsp') + + def test__str__openssh(self): + res = SshPublicKey.objects.create_ssh_public_key( + self.user, EXAMPLE_KEY_4_OPENSSH) + self.assertEqual(str(res), EXAMPLE_KEY_4_OPENSSH)