Jan Dittberner
842e207acc
- add tests for AdditionalGroups methods save, delete and __str__ - add TaskResultTest.test_update_taskstatus_finished
392 lines
15 KiB
Python
392 lines
15 KiB
Python
from datetime import date
|
|
|
|
from django.core.exceptions import ValidationError
|
|
from django.test import TestCase
|
|
from django.test.utils import override_settings
|
|
from django.utils import timezone
|
|
|
|
from mock import patch, MagicMock
|
|
|
|
from passlib.hash import sha512_crypt
|
|
|
|
from osusers.models import (
|
|
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL,
|
|
AdditionalGroup,
|
|
DeleteTaskResult,
|
|
Group,
|
|
GroupTaskResult,
|
|
Shadow,
|
|
User,
|
|
UserTaskResult,
|
|
)
|
|
|
|
|
|
@override_settings(CELERY_ALWAYS_EAGER=True,
                   CELERY_CACHE_BACKEND='memory',
                   BROKER_BACKEND='memory')
class TestCaseWithCeleryTasks(TestCase):
    """
    Base class for test cases that run with Celery related settings
    overridden.

    ``CELERY_ALWAYS_EAGER`` is switched on and both the Celery cache backend
    and the broker backend are set to ``'memory'``.  Presumably this lets
    tasks triggered by model operations run in-process without an external
    broker -- confirm against the Celery version in use.

    """
|
|
|
|
|
|
class AdditionalGroupTest(TestCaseWithCeleryTasks):
    """
    Tests for the ``clean``, ``save``, ``delete`` and ``__str__`` methods of
    :py:class:`AdditionalGroup`.

    """

    def setUp(self):
        """Create a group and a user that has it as primary group."""
        self.group1 = Group.objects.create(groupname='test1', gid=1000)
        self.user = User.objects.create(
            username='test', uid=1000, group=self.group1,
            homedir='/home/test', shell='/bin/bash')

    def test_clean_primary_group(self):
        """``clean`` must reject the user's own primary group."""
        testsubj = AdditionalGroup(user=self.user, group=self.group1)
        with self.assertRaises(ValidationError) as cm:
            testsubj.clean()
        self.assertEqual(
            cm.exception.message, CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)

    def test_clean_other_group(self):
        """``clean`` must not raise for a group other than the primary one."""
        group2 = Group.objects.create(groupname='test2', gid=1001)
        testsubj = AdditionalGroup(user=self.user, group=group2)
        testsubj.clean()

    def test_save(self):
        """Saving must record exactly one ``add_ldap_user_to_group`` result."""
        group2 = Group.objects.create(groupname='test2', gid=1001)
        # Drop the results recorded so far so that only results caused by
        # the save() call below are counted.
        GroupTaskResult.objects.all().delete()
        addgroup = AdditionalGroup(user=self.user, group=group2)
        addgroup.save()
        taskres = GroupTaskResult.objects.all()
        # assertEqual is required here: assertTrue(len(taskres), 1) would use
        # 1 as the failure message and never compare the count.
        self.assertEqual(len(taskres), 1)
        self.assertEqual(taskres[0].task_name, 'add_ldap_user_to_group')
        self.assertEqual(taskres[0].group, group2)

    def test_delete(self):
        """Deleting must record one ``remove_ldap_user_from_group`` result."""
        group2 = Group.objects.create(groupname='test2', gid=1001)
        addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
        # Start from a clean slate so that only results caused by the
        # delete() call below are counted.
        DeleteTaskResult.objects.all().delete()
        addgroup.delete()
        taskres = DeleteTaskResult.objects.all()
        # assertEqual instead of assertTrue, see test_save.
        self.assertEqual(len(taskres), 1)
        self.assertEqual(taskres[0].task_name, 'remove_ldap_user_from_group')
        self.assertEqual(taskres[0].modeltype, 'usergroup')
        self.assertEqual(taskres[0].modelname, 'test (1000) in test2 (1001)')
        self.assertEqual(len(AdditionalGroup.objects.all()), 0)

    def test___str__(self):
        """The string form combines the user's and the group's string form."""
        group2 = Group.objects.create(groupname='test2', gid=1001)
        addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
        self.assertEqual(str(addgroup), 'test (1000) in test2 (1001)')
|
|
|
|
|
|
@override_settings(OSUSER_MINGID=10000)
class GroupManagerTest(TestCase):
    """
    Tests for ``Group.objects.get_next_gid`` with ``OSUSER_MINGID`` set to
    10000.

    """

    def test_get_next_gid_first(self):
        """Without any group the next gid is 10000."""
        next_gid = Group.objects.get_next_gid()
        self.assertEqual(next_gid, 10000)

    def test_get_next_gid_second(self):
        """With a group that has gid 10010 the next gid is 10011."""
        Group.objects.create(groupname='test', gid=10010)
        next_gid = Group.objects.get_next_gid()
        self.assertEqual(next_gid, 10011)
|
|
|
|
|
|
class GroupTest(TestCaseWithCeleryTasks):
    """
    Tests for the ``__str__``, ``save`` and ``delete`` methods of
    :py:class:`Group`.

    """

    def test___str__(self):
        """The string form is the group name followed by the gid."""
        testgroup = Group.objects.create(groupname='test', gid=10000)
        self.assertEqual(str(testgroup), 'test (10000)')

    def test_save(self):
        """``save`` returns the group and records one group task result."""
        testgroup = Group(groupname='test', gid=10000)
        saved = testgroup.save()
        self.assertIs(saved, testgroup)
        results = GroupTaskResult.objects.all()
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0].group, testgroup)
        self.assertEqual(results[0].task_name, 'create_ldap_group')

    def test_delete(self):
        """
        ``delete`` removes the group together with its group task results
        and records one delete task result.

        """
        testgroup = Group.objects.create(groupname='test', gid=10000)
        groups_before = Group.objects.all()
        self.assertEqual(len(groups_before), 1)
        results_before = GroupTaskResult.objects.all()
        self.assertEqual(len(results_before), 1)

        testgroup.delete()

        groups_after = Group.objects.all()
        self.assertEqual(len(groups_after), 0)
        results_after = GroupTaskResult.objects.all()
        self.assertEqual(len(results_after), 0)
        deleteresults = DeleteTaskResult.objects.all()
        self.assertEqual(len(deleteresults), 1)
        self.assertEqual(
            deleteresults[0].task_name, 'delete_ldap_group_if_empty')
        self.assertEqual(deleteresults[0].modeltype, 'group')
        self.assertEqual(deleteresults[0].modelname, 'test')
|
|
|
|
|
|
class ShadowManagerTest(TestCaseWithCeleryTasks):
    """Tests for ``Shadow.objects.create_shadow``."""

    def test_create_shadow(self):
        """
        ``create_shadow`` returns a shadow for the given user whose password
        hash verifies against the given password and whose aging fields have
        the expected values.

        """
        unsaved_group = Group(gid=1000, groupname='test')
        user = User(
            username='test', uid=1000, group=unsaved_group,
            homedir='/home/test', shell='/bin/fooshell')
        shadow = Shadow.objects.create_shadow(user, 'test')
        password_matches = sha512_crypt.verify('test', shadow.passwd)
        self.assertTrue(password_matches)
        # changedays is expected to be the number of days between the Unix
        # epoch and today
        epoch = date(1970, 1, 1)
        today = timezone.now().date()
        self.assertEqual(shadow.changedays, (today - epoch).days)
        self.assertEqual(shadow.user, user)
        self.assertEqual(shadow.minage, 0)
        self.assertIsNone(shadow.maxage)
        self.assertEqual(shadow.gracedays, 7)
        self.assertEqual(shadow.inactdays, 30)
        self.assertIsNone(shadow.expiredays)
|
|
|
|
|
|
class ShadowTest(TestCaseWithCeleryTasks):
    """
    Tests for the ``__str__`` and ``set_password`` methods of
    :py:class:`Shadow`.

    """

    def _create_user(self):
        """Create the group ``test`` and return a new user in that group."""
        testgroup = Group.objects.create(groupname='test', gid=1000)
        return User.objects.create(
            username='test', uid=1000, group=testgroup, homedir='/home/test',
            shell='/bin/bash')

    def test___str__(self):
        """The string form refers to the string form of the user."""
        testshadow = Shadow(user=self._create_user())
        self.assertEqual(str(testshadow), 'for user test (1000)')

    def test_set_password(self):
        """``set_password`` stores a hash that verifies the new password."""
        testshadow = Shadow(user=self._create_user())
        testshadow.set_password('test')
        password_matches = sha512_crypt.verify('test', testshadow.passwd)
        self.assertTrue(password_matches)
|
|
|
|
|
|
# Shared fixture values for the task result tests below.

# task id reported by the fake asynchronous results
TEST_TASK_UUID = '3120f6a8-2665-4fa3-a785-79efd28bfe92'
# task name used when creating task result objects
TEST_TASK_NAME = 'test.task'
# result payload of a successfully finished fake task
TEST_TASK_RESULT = '4ll y0ur b453 4r3 b3l0ng t0 u5'
|
|
|
|
|
|
class TaskResultTest(TestCase):
    """
    Tests for the result handling of task result models, exercised through
    :py:class:`DeleteTaskResult`.

    Asynchronous results are simulated with ``MagicMock`` objects that
    provide ``task_id``, ``task_name``, ``ready()``, ``state`` and
    ``result``.

    """

    def test__set_result_fields_not_ready(self):
        """A result that is not ready leaves the result fields empty."""
        mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
        mock.ready.return_value = False
        tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
        self.assertFalse(tr.is_finished)
        self.assertFalse(tr.is_success)
        self.assertEqual(tr.state, '')
        self.assertEqual(tr.result_body, '')

    def test__set_result_fields_ready(self):
        """A successful ready result is copied into the result fields."""
        mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME,
                         state='SUCCESS', result=TEST_TASK_RESULT)
        mock.ready.return_value = True
        tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
        self.assertTrue(tr.is_finished)
        self.assertTrue(tr.is_success)
        self.assertEqual(tr.state, 'SUCCESS')
        self.assertEqual(tr.result_body, TEST_TASK_RESULT)

    def test__set_result_fields_exception(self):
        """A failed ready result is finished but not successful."""
        mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME,
                         state='FAILURE', result=Exception('Fail'))
        mock.ready.return_value = True
        tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
        self.assertTrue(tr.is_finished)
        self.assertFalse(tr.is_success)
        self.assertEqual(tr.state, 'FAILURE')
        self.assertEqual(tr.result_body, 'Fail')

    @patch('osusers.models.AsyncResult')
    def test_update_taskstatus_unfinished(self, asyncres):
        """
        ``update_taskstatus`` on an unfinished task result asks the (patched)
        ``AsyncResult`` whether it is ready and picks up the finished state.

        """
        mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
        mock.ready.return_value = False
        tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
        self.assertFalse(tr.is_finished)
        # Calling the patched class returns its shared return_value mock, so
        # the attributes configured here are what the model code gets when
        # it instantiates AsyncResult.
        mymock = asyncres(TEST_TASK_UUID)
        mymock.ready.return_value = True
        mymock.state = 'SUCCESS'
        # use the plain result string as payload, as in the tests above
        mymock.result = TEST_TASK_RESULT
        tr.update_taskstatus()
        mymock.ready.assert_called_with()
        self.assertTrue(tr.is_finished)

    @patch('osusers.models.AsyncResult')
    def test_update_taskstatus_finished(self, asyncres):
        """
        ``update_taskstatus`` on an already finished task result does not ask
        the (patched) ``AsyncResult`` again.

        """
        mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
        mock.ready.return_value = True
        mock.state = 'SUCCESS'
        # use the plain result string as payload, as in the tests above
        mock.result = TEST_TASK_RESULT
        tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
        self.assertTrue(tr.is_finished)
        mymock = asyncres(TEST_TASK_UUID)
        tr.update_taskstatus()
        self.assertFalse(mymock.ready.called)
        self.assertTrue(tr.is_finished)
|
|
|
|
|
|
# Fake asynchronous result that reports the test task id and name and is
# not ready yet.
TEST_RESULT = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
TEST_RESULT.ready.return_value = False
|
|
|
|
|
|
class TaskResultManagerTest(TestCase):
    """Tests for the ``create`` method of the task result manager."""

    def test_create(self):
        """
        ``create`` returns a :py:class:`DeleteTaskResult` that carries the
        task id of the passed result and the passed task name.

        """
        created = DeleteTaskResult.objects.create(TEST_RESULT, TEST_TASK_NAME)
        self.assertIsInstance(created, DeleteTaskResult)
        expectations = (
            ('task_uuid', TEST_TASK_UUID),
            ('task_name', TEST_TASK_NAME),
        )
        for fieldname, expected in expectations:
            self.assertEqual(getattr(created, fieldname), expected)
|
|
|
|
|
|
@override_settings(
    OSUSER_MINUID=10000,
    OSUSER_MINGID=10000,
    OSUSER_USERNAME_PREFIX='test',
    OSUSER_HOME_BASEPATH='/home',
    OSUSER_DEFAULT_SHELL='/bin/fooshell'
)
class UserManagerTest(TestCaseWithCeleryTasks):
    """
    Tests for the manager methods ``get_next_uid``, ``get_next_username``
    and ``create_user`` of :py:class:`User`.

    """

    def _create_group(self):
        """Create and return the group ``foo`` with gid 10000."""
        return Group.objects.create(groupname='foo', gid=10000)

    def _create_user(self, uid, username, group):
        """Create a user with fixed home directory ``/home/foo`` and shell."""
        return User.objects.create(
            uid=uid, username=username, group=group,
            homedir='/home/foo', shell='/bin/fooshell')

    def _check_created_user(self, user, uid, gid, name, homedir):
        """
        Check the attributes that every user returned by ``create_user`` is
        expected to have in these tests.

        :param user: the object returned by ``create_user``
        :param uid: expected user id
        :param gid: expected id of the user's group
        :param name: expected user name and group name
        :param homedir: expected home directory

        """
        self.assertIsInstance(user, User)
        self.assertEqual(user.uid, uid)
        self.assertEqual(user.group.gid, gid)
        self.assertEqual(user.group.groupname, name)
        self.assertEqual(user.username, name)
        self.assertEqual(user.homedir, homedir)
        self.assertEqual(user.shell, '/bin/fooshell')
        self.assertIsNotNone(user.shadow)

    def test_get_next_uid_first(self):
        """Without any user the next uid is 10000."""
        next_uid = User.objects.get_next_uid()
        self.assertEqual(next_uid, 10000)

    def test_get_next_uid_second(self):
        """With a user that has uid 10010 the next uid is 10011."""
        self._create_user(10010, 'foo', self._create_group())
        next_uid = User.objects.get_next_uid()
        self.assertEqual(next_uid, 10011)

    def test_get_next_username_first(self):
        """Without any user the next user name is ``test01``."""
        next_name = User.objects.get_next_username()
        self.assertEqual(next_name, 'test01')

    def test_get_next_username_second(self):
        """With ``test01`` taken the next user name is ``test02``."""
        self._create_user(10000, 'test01', self._create_group())
        next_name = User.objects.get_next_username()
        self.assertEqual(next_name, 'test02')

    def test_get_next_username_gaps(self):
        """With ``test01`` and ``test03`` taken the gap ``test02`` is used."""
        shared_group = self._create_group()
        self._create_user(10000, 'test01', shared_group)
        self._create_user(10002, 'test03', shared_group)
        next_name = User.objects.get_next_username()
        self.assertEqual(next_name, 'test02')

    def test_create_user_first(self):
        """The first created user gets uid/gid 10000 and name ``test01``."""
        created = User.objects.create_user()
        self._check_created_user(
            created, uid=10000, gid=10000, name='test01',
            homedir='/home/test01')

    def test_create_user_tasks(self):
        """Creating a user records one ``create_ldap_group`` task result."""
        created = User.objects.create_user()
        groupresults = GroupTaskResult.objects.all()
        self.assertEqual(len(groupresults), 1)
        self.assertEqual(groupresults[0].task_name, 'create_ldap_group')
        self.assertEqual(groupresults[0].group, created.group)

    def test_create_user_second(self):
        """The second created user gets uid/gid 10001 and name ``test02``."""
        User.objects.create_user()
        created = User.objects.create_user()
        self._check_created_user(
            created, uid=10001, gid=10001, name='test02',
            homedir='/home/test02')
        all_users = User.objects.all()
        self.assertEqual(len(all_users), 2)

    def test_create_user_known_password(self):
        """A password passed to ``create_user`` verifies against the hash."""
        created = User.objects.create_user(password='foobar')
        self._check_created_user(
            created, uid=10000, gid=10000, name='test01',
            homedir='/home/test01')
        password_matches = sha512_crypt.verify(
            'foobar', created.shadow.passwd)
        self.assertTrue(password_matches)

    def test_create_user_predefined_username(self):
        """A passed user name is used for user, group and home directory."""
        created = User.objects.create_user(username='tester')
        self._check_created_user(
            created, uid=10000, gid=10000, name='tester',
            homedir='/home/tester')

    def test_create_user_commit(self):
        """``create_user`` with ``commit=True`` returns a complete user."""
        created = User.objects.create_user(commit=True)
        self._check_created_user(
            created, uid=10000, gid=10000, name='test01',
            homedir='/home/test01')
|
|
|
|
|
|
@override_settings(
    OSUSER_MINUID=10000,
    OSUSER_MINGID=10000,
    OSUSER_USERNAME_PREFIX='test',
    OSUSER_HOME_BASEPATH='/home',
    OSUSER_DEFAULT_SHELL='/bin/fooshell'
)
class UserTest(TestCaseWithCeleryTasks):
    """
    Tests for the ``__str__``, ``set_password``, ``save`` and ``delete``
    methods of :py:class:`User`.

    """

    def test___str__(self):
        """The string form is the user name followed by the uid."""
        testuser = User.objects.create_user()
        self.assertEqual(str(testuser), 'test01 (10000)')

    def test_set_password(self):
        """
        ``set_password`` replaces the password hash and records one
        ``create_ldap_user`` task result for the user.

        """
        testuser = User.objects.create_user()
        matches_before = sha512_crypt.verify('test', testuser.shadow.passwd)
        self.assertFalse(matches_before)
        # only results recorded by set_password() should be counted below
        UserTaskResult.objects.all().delete()
        testuser.set_password('test')
        matches_after = sha512_crypt.verify('test', testuser.shadow.passwd)
        self.assertTrue(matches_after)
        results = UserTaskResult.objects.all()
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0].user, testuser)
        self.assertEqual(results[0].task_name, 'create_ldap_user')

    def test_save(self):
        """``save`` records one ``create_ldap_user`` task result."""
        testuser = User.objects.create_user()
        # only results recorded by the explicit save() should be counted
        UserTaskResult.objects.all().delete()
        testuser.save()
        results = UserTaskResult.objects.all()
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0].user, testuser)
        self.assertEqual(results[0].task_name, 'create_ldap_user')

    def test_delete_only_user(self):
        """
        Deleting a user without additional groups records a user deletion
        and a group deletion task result and removes the user.

        """
        testuser = User.objects.create_user()
        testuser.delete()
        results = DeleteTaskResult.objects.all()
        self.assertEqual(len(results), 2)
        recorded_names = [result.task_name for result in results]
        self.assertIn('delete_ldap_user', recorded_names)
        self.assertIn('delete_ldap_group_if_empty', recorded_names)
        remaining_users = User.objects.all()
        self.assertEqual(len(remaining_users), 0)

    def test_delete_additional_groups(self):
        """
        Deleting a user with two additional groups records one removal per
        additional group plus the user and group deletion task results and
        removes the user and the additional group assignments.

        """
        extra_groups = [
            Group.objects.create(gid=2000, groupname='group1'),
            Group.objects.create(gid=2001, groupname='group2'),
        ]
        testuser = User.objects.create_user()
        for extra_group in extra_groups:
            assignment = AdditionalGroup.objects.create(
                user=testuser, group=extra_group)
            testuser.additionalgroup_set.add(assignment)
        testuser.delete()
        results = DeleteTaskResult.objects.all()
        self.assertEqual(len(results), 4)
        recorded_names = [result.task_name for result in results]
        expected_counts = (
            ('remove_ldap_user_from_group', 2),
            ('delete_ldap_user', 1),
            ('delete_ldap_group_if_empty', 1),
        )
        for task_name, expected in expected_counts:
            self.assertEqual(recorded_names.count(task_name), expected)
        self.assertEqual(len(User.objects.all()), 0)
        self.assertEqual(len(AdditionalGroup.objects.all()), 0)
|