from datetime import date from django.core.exceptions import ValidationError from django.test import TestCase from django.test.utils import override_settings from django.utils import timezone from mock import patch, MagicMock from passlib.hash import sha512_crypt from osusers.models import ( CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL, AdditionalGroup, DeleteTaskResult, Group, GroupTaskResult, Shadow, User, UserTaskResult, ) @override_settings( CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND='memory', BROKER_BACKEND='memory' ) class TestCaseWithCeleryTasks(TestCase): pass class AdditionalGroupTest(TestCaseWithCeleryTasks): def setUp(self): self.group1 = Group.objects.create(groupname='test1', gid=1000) self.user = User.objects.create( username='test', uid=1000, group=self.group1, homedir='/home/test', shell='/bin/bash') def test_clean_primary_group(self): testsubj = AdditionalGroup(user=self.user, group=self.group1) with self.assertRaises(ValidationError) as cm: testsubj.clean() self.assertEqual( cm.exception.message, CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL) def test_clean_other_group(self): group2 = Group.objects.create(groupname='test2', gid=1001) testsubj = AdditionalGroup(user=self.user, group=group2) testsubj.clean() def test_save(self): group2 = Group.objects.create(groupname='test2', gid=1001) GroupTaskResult.objects.all().delete() addgroup = AdditionalGroup(user=self.user, group=group2) addgroup.save() taskres = GroupTaskResult.objects.all() self.assertTrue(len(taskres), 1) self.assertEqual(taskres[0].task_name, 'add_ldap_user_to_group') self.assertEqual(taskres[0].group, group2) def test_delete(self): group2 = Group.objects.create(groupname='test2', gid=1001) addgroup = AdditionalGroup.objects.create(user=self.user, group=group2) DeleteTaskResult.objects.all().delete() addgroup.delete() taskres = DeleteTaskResult.objects.all() self.assertTrue(len(taskres), 1) self.assertEqual(taskres[0].task_name, 'remove_ldap_user_from_group') self.assertEqual(taskres[0].modeltype, 'usergroup') self.assertEqual(taskres[0].modelname, 'test (1000) in test2 (1001)') self.assertEqual(len(AdditionalGroup.objects.all()), 0) def test___str__(self): group2 = Group.objects.create(groupname='test2', gid=1001) addgroup = AdditionalGroup.objects.create(user=self.user, group=group2) self.assertEqual(str(addgroup), 'test (1000) in test2 (1001)') @override_settings(OSUSER_MINGID=10000) class GroupManagerTest(TestCase): def test_get_next_gid_first(self): self.assertEqual(Group.objects.get_next_gid(), 10000) def test_get_next_gid_second(self): Group.objects.create(gid=10010, groupname='test') self.assertEqual(Group.objects.get_next_gid(), 10011) class GroupTest(TestCaseWithCeleryTasks): def test___str__(self): group = Group.objects.create(gid=10000, groupname='test') self.assertEqual(str(group), 'test (10000)') def test_save(self): group = Group(gid=10000, groupname='test') self.assertIs(group.save(), group) taskres = GroupTaskResult.objects.all() self.assertEqual(len(taskres), 1) self.assertEqual(taskres[0].group, group) self.assertEqual(taskres[0].task_name, 'create_ldap_group') def test_delete(self): group = Group.objects.create(gid=10000, groupname='test') self.assertEqual(len(Group.objects.all()), 1) self.assertEqual(len(GroupTaskResult.objects.all()), 1) group.delete() self.assertEqual(len(Group.objects.all()), 0) self.assertEqual(len(GroupTaskResult.objects.all()), 0) taskres = DeleteTaskResult.objects.all() self.assertEqual(len(taskres), 1) self.assertEqual(taskres[0].task_name, 'delete_ldap_group_if_empty') self.assertEqual(taskres[0].modeltype, 'group') self.assertEqual(taskres[0].modelname, 'test') class ShadowManagerTest(TestCaseWithCeleryTasks): def test_create_shadow(self): user = User( username='test', uid=1000, group=Group(gid=1000, groupname='test'), homedir='/home/test', shell='/bin/fooshell') shadow = Shadow.objects.create_shadow(user, 'test') self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) self.assertEqual(shadow.changedays, (timezone.now().date() - date(1970, 1, 1)).days) self.assertEqual(shadow.user, user) self.assertEqual(shadow.minage, 0) self.assertIsNone(shadow.maxage) self.assertEqual(shadow.gracedays, 7) self.assertEqual(shadow.inactdays, 30) self.assertIsNone(shadow.expiredays) class ShadowTest(TestCaseWithCeleryTasks): def test___str__(self): group = Group.objects.create( groupname='test', gid=1000) user = User.objects.create( username='test', uid=1000, group=group, homedir='/home/test', shell='/bin/bash') shadow = Shadow(user=user) self.assertEqual(str(shadow), 'for user test (1000)') def test_set_password(self): group = Group.objects.create( groupname='test', gid=1000) user = User.objects.create( username='test', uid=1000, group=group, homedir='/home/test', shell='/bin/bash') shadow = Shadow(user=user) shadow.set_password('test') self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) TEST_TASK_UUID = '3120f6a8-2665-4fa3-a785-79efd28bfe92' TEST_TASK_NAME = 'test.task' TEST_TASK_RESULT = '4ll y0ur b453 4r3 b3l0ng t0 u5' class TaskResultTest(TestCase): def test__set_result_fields_not_ready(self): mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME) mock.ready.return_value = False tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) self.assertFalse(tr.is_finished) self.assertFalse(tr.is_success) self.assertEqual(tr.state, '') self.assertEqual(tr.result_body, '') def test__set_result_fields_ready(self): mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME, state='SUCCESS', result=TEST_TASK_RESULT) mock.ready.return_value = True tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) self.assertTrue(tr.is_finished) self.assertTrue(tr.is_success) self.assertEqual(tr.state, 'SUCCESS') self.assertEqual(tr.result_body, TEST_TASK_RESULT) def test__set_result_fields_exception(self): mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME, state='FAILURE', result=Exception('Fail')) mock.ready.return_value = True tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) self.assertTrue(tr.is_finished) self.assertFalse(tr.is_success) self.assertEqual(tr.state, 'FAILURE') self.assertEqual(tr.result_body, 'Fail') @patch('osusers.models.AsyncResult') def test_update_taskstatus_unfinished(self, asyncres): mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME) mock.ready.return_value = False tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) self.assertFalse(tr.is_finished) mymock = asyncres(TEST_TASK_UUID) mymock.ready.return_value = True mymock.state = 'SUCCESS' mymock.result = TEST_RESULT tr.update_taskstatus() mymock.ready.assert_called_with() self.assertTrue(tr.is_finished) @patch('osusers.models.AsyncResult') def test_update_taskstatus_finished(self, asyncres): mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME) mock.ready.return_value = True mock.state = 'SUCCESS' mock.result = TEST_RESULT tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) self.assertTrue(tr.is_finished) mymock = asyncres(TEST_TASK_UUID) tr.update_taskstatus() self.assertFalse(mymock.ready.called) self.assertTrue(tr.is_finished) TEST_RESULT = MagicMock() TEST_RESULT.task_id = TEST_TASK_UUID TEST_RESULT.task_name = TEST_TASK_NAME TEST_RESULT.ready.return_value = False class TaskResultManagerTest(TestCase): def test_create(self): tr = DeleteTaskResult.objects.create(TEST_RESULT, TEST_TASK_NAME) self.assertIsInstance(tr, DeleteTaskResult) self.assertEqual(tr.task_uuid, TEST_TASK_UUID) self.assertEqual(tr.task_name, TEST_TASK_NAME) @override_settings( OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test', OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' ) class UserManagerTest(TestCaseWithCeleryTasks): def _create_group(self): return Group.objects.create(gid=10000, groupname='foo') def test_get_next_uid_first(self): self.assertEqual(User.objects.get_next_uid(), 10000) def test_get_next_uid_second(self): User.objects.create( uid=10010, username='foo', group=self._create_group(), homedir='/home/foo', shell='/bin/fooshell') self.assertEqual(User.objects.get_next_uid(), 10011) def test_get_next_username_first(self): self.assertEqual(User.objects.get_next_username(), 'test01') def test_get_next_username_second(self): User.objects.create( uid=10000, username='test01', group=self._create_group(), homedir='/home/foo', shell='/bin/fooshell') self.assertEqual(User.objects.get_next_username(), 'test02') def test_get_next_username_gaps(self): group = self._create_group() User.objects.create( uid=10000, username='test01', group=group, homedir='/home/foo', shell='/bin/fooshell') User.objects.create( uid=10002, username='test03', group=group, homedir='/home/foo', shell='/bin/fooshell') self.assertEqual(User.objects.get_next_username(), 'test02') def test_create_user_first(self): user = User.objects.create_user() self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.groupname, 'test01') self.assertEqual(user.username, 'test01') self.assertEqual(user.homedir, '/home/test01') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) def test_create_user_tasks(self): user = User.objects.create_user() gtaskres = GroupTaskResult.objects.all() self.assertEqual(len(gtaskres), 1) self.assertEqual(gtaskres[0].task_name, 'create_ldap_group') self.assertEqual(gtaskres[0].group, user.group) def test_create_user_second(self): User.objects.create_user() user = User.objects.create_user() self.assertIsInstance(user, User) self.assertEqual(user.uid, 10001) self.assertEqual(user.group.gid, 10001) self.assertEqual(user.group.groupname, 'test02') self.assertEqual(user.username, 'test02') self.assertEqual(user.homedir, '/home/test02') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) self.assertEqual(len(User.objects.all()), 2) def test_create_user_known_password(self): user = User.objects.create_user(password='foobar') self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.groupname, 'test01') self.assertEqual(user.username, 'test01') self.assertEqual(user.homedir, '/home/test01') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) self.assertTrue(sha512_crypt.verify('foobar', user.shadow.passwd)) def test_create_user_predefined_username(self): user = User.objects.create_user(username='tester') self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.groupname, 'tester') self.assertEqual(user.username, 'tester') self.assertEqual(user.homedir, '/home/tester') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) def test_create_user_commit(self): user = User.objects.create_user(commit=True) self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.groupname, 'test01') self.assertEqual(user.username, 'test01') self.assertEqual(user.homedir, '/home/test01') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) @override_settings( OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test', OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' ) class UserTest(TestCaseWithCeleryTasks): def test___str__(self): user = User.objects.create_user() self.assertEqual(str(user), 'test01 (10000)') def test_set_password(self): user = User.objects.create_user() self.assertFalse(sha512_crypt.verify('test', user.shadow.passwd)) UserTaskResult.objects.all().delete() user.set_password('test') self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd)) taskres = UserTaskResult.objects.all() self.assertEqual(len(taskres), 1) self.assertEqual(taskres[0].user, user) self.assertEqual(taskres[0].task_name, 'create_ldap_user') def test_save(self): user = User.objects.create_user() UserTaskResult.objects.all().delete() user.save() taskres = UserTaskResult.objects.all() self.assertEqual(len(taskres), 1) self.assertEqual(taskres[0].user, user) self.assertEqual(taskres[0].task_name, 'create_ldap_user') def test_delete_only_user(self): user = User.objects.create_user() user.delete() taskres = DeleteTaskResult.objects.all() self.assertEqual(len(taskres), 2) self.assertIn('delete_ldap_user', [r.task_name for r in taskres]) self.assertIn('delete_ldap_group_if_empty', [r.task_name for r in taskres]) self.assertEqual(len(User.objects.all()), 0) def test_delete_additional_groups(self): group1 = Group.objects.create(gid=2000, groupname='group1') group2 = Group.objects.create(gid=2001, groupname='group2') user = User.objects.create_user() for group in [group1, group2]: user.additionalgroup_set.add( AdditionalGroup.objects.create(user=user, group=group)) user.delete() taskres = DeleteTaskResult.objects.all() self.assertEqual(len(taskres), 4) tasknames = [t.task_name for t in taskres] self.assertEqual(tasknames.count('remove_ldap_user_from_group'), 2) self.assertEqual(tasknames.count('delete_ldap_user'), 1) self.assertEqual(tasknames.count('delete_ldap_group_if_empty'), 1) self.assertEqual(len(User.objects.all()), 0) self.assertEqual(len(AdditionalGroup.objects.all()), 0)