from datetime import date from django.conf import settings from django.core.exceptions import ValidationError from django.test import TestCase from django.test.utils import override_settings from django.utils import timezone from django.contrib.auth import get_user_model from passlib.hash import sha512_crypt from osusers.models import ( CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL, AdditionalGroup, Group, Shadow, SshPublicKey, User, ) from taskresults.models import TaskResult EXAMPLE_KEY_1_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ---- Comment: "1024-bit RSA, converted from OpenSSH by me@example.com" x-command: /home/me/bin/lock-in-guest.sh AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ 5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE= ---- END SSH2 PUBLIC KEY ----""" EXAMPLE_KEY_2_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ---- Comment: This is my public key for use on \ servers which I don't like. AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5 sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV ---- END SSH2 PUBLIC KEY ----""" EXAMPLE_KEY_3_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ---- Comment: DSA Public Key for use with MyIsp AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5 sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV ---- END SSH2 PUBLIC KEY ----""" EXAMPLE_KEY_4_OPENSSH = "".join(( "ssh-rsa ", "AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb", "YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ", "5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=" )) EXAMPLE_KEY_5_RFC4716_MULTILINE = """---- BEGIN SSH2 PUBLIC KEY ---- Comment: DSA Public Key \\ for use with \\ MyIsp AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5 sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV ---- END SSH2 PUBLIC KEY ----""" EXAMPLE_KEY_6_RFC4716_EMPTY_LINE = """---- BEGIN SSH2 PUBLIC KEY ---- Comment: DSA Public Key for use with MyIsp AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5 sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV ---- END SSH2 PUBLIC KEY ----""" Customer = get_user_model() @override_settings( CELERY_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND='memory', BROKER_BACKEND='memory' ) class TestCaseWithCeleryTasks(TestCase): pass class AdditionalGroupTest(TestCaseWithCeleryTasks): def setUp(self): customer = Customer.objects.create(username='test') self.group1 = Group.objects.create(groupname='test1', gid=1000) self.user = User.objects.create( customer=customer, username='test', uid=1000, group=self.group1, homedir='/home/test', shell='/bin/bash') def test_clean_primary_group(self): testsubj = AdditionalGroup(user=self.user, group=self.group1) with self.assertRaises(ValidationError) as cm: testsubj.clean() self.assertEqual( cm.exception.message, CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL) def test_clean_other_group(self): group2 = Group.objects.create(groupname='test2', gid=1001) testsubj = AdditionalGroup(user=self.user, group=group2) testsubj.clean() def test_save(self): group2 = Group.objects.create(groupname='test2', gid=1001) addgroup = AdditionalGroup(user=self.user, group=group2) addgroup.save() taskres = TaskResult.objects.all() self.assertTrue(len(taskres), 4) creators = [r.creator for r in taskres] for tcount, tcreator in [ (2, 'handle_group_created'), (1, 'handle_user_created'), (1, 'handle_user_added_to_group')]: self.assertEqual(creators.count(tcreator), tcount) def test_delete(self): group2 = Group.objects.create(groupname='test2', gid=1001) addgroup = AdditionalGroup.objects.create(user=self.user, group=group2) addgroup.delete() self.assertEqual(len(AdditionalGroup.objects.all()), 0) def test___str__(self): group2 = Group.objects.create(groupname='test2', gid=1001) addgroup = AdditionalGroup.objects.create(user=self.user, group=group2) self.assertEqual(str(addgroup), 'test (1000) in test2 (1001)') @override_settings(OSUSER_MINGID=10000) class GroupManagerTest(TestCaseWithCeleryTasks): def test_get_next_gid_first(self): self.assertEqual(Group.objects.get_next_gid(), 10000) def test_get_next_gid_second(self): Group.objects.create(gid=10010, groupname='test') self.assertEqual(Group.objects.get_next_gid(), 10011) class GroupTest(TestCaseWithCeleryTasks): def test___str__(self): group = Group.objects.create(gid=10000, groupname='test') self.assertEqual(str(group), 'test (10000)') def test_save(self): group = Group(gid=10000, groupname='test') self.assertIs(group.save(), group) def test_delete(self): group = Group.objects.create(gid=10000, groupname='test') self.assertEqual(len(Group.objects.all()), 1) group.delete() self.assertEqual(len(Group.objects.all()), 0) self.assertEqual(len(TaskResult.objects.all()), 2) tr = TaskResult.objects.first() self.assertEqual(tr.creator, 'handle_group_created') class ShadowManagerTest(TestCaseWithCeleryTasks): def setUp(self): self.customer = Customer.objects.create(username='test') super(ShadowManagerTest, self).setUp() def test_create_shadow(self): user = User( customer=self.customer, username='test', uid=1000, group=Group(gid=1000, groupname='test'), homedir='/home/test', shell='/bin/fooshell') shadow = Shadow.objects.create_shadow(user, 'test') self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) self.assertEqual(shadow.changedays, (timezone.now().date() - date(1970, 1, 1)).days) self.assertEqual(shadow.user, user) self.assertEqual(shadow.minage, 0) self.assertIsNone(shadow.maxage) self.assertEqual(shadow.gracedays, 7) self.assertEqual(shadow.inactdays, 30) self.assertIsNone(shadow.expiredays) class ShadowTest(TestCaseWithCeleryTasks): def setUp(self): self.customer = Customer.objects.create(username='test') super(ShadowTest, self).setUp() def test___str__(self): group = Group.objects.create( groupname='test', gid=1000) user = User.objects.create( customer=self.customer, username='test', uid=1000, group=group, homedir='/home/test', shell='/bin/bash') shadow = Shadow(user=user) self.assertEqual(str(shadow), 'for user test (1000)') def test_set_password(self): group = Group.objects.create( groupname='test', gid=1000) user = User.objects.create( customer=self.customer, username='test', uid=1000, group=group, homedir='/home/test', shell='/bin/bash') shadow = Shadow(user=user) shadow.set_password('test') self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) @override_settings( OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test', OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' ) class UserManagerTest(TestCaseWithCeleryTasks): def _create_group(self): return Group.objects.create(gid=10000, groupname='foo') def setUp(self): self.customer = Customer.objects.create(username='test') super(UserManagerTest, self).setUp() def test_get_next_uid_first(self): self.assertEqual(User.objects.get_next_uid(), 10000) def test_get_next_uid_second(self): User.objects.create( customer=self.customer, uid=10010, username='foo', group=self._create_group(), homedir='/home/foo', shell='/bin/fooshell') self.assertEqual(User.objects.get_next_uid(), 10011) def test_get_next_username_first(self): self.assertEqual(User.objects.get_next_username(), 'test01') def test_get_next_username_second(self): User.objects.create( customer=self.customer, uid=10000, username='test01', group=self._create_group(), homedir='/home/foo', shell='/bin/fooshell') self.assertEqual(User.objects.get_next_username(), 'test02') def test_get_next_username_gaps(self): group = self._create_group() User.objects.create( customer=self.customer, uid=10000, username='test01', group=group, homedir='/home/foo', shell='/bin/fooshell') User.objects.create( customer=self.customer, uid=10002, username='test03', group=group, homedir='/home/foo', shell='/bin/fooshell') self.assertEqual(User.objects.get_next_username(), 'test02') def test_create_user_first(self): user = User.objects.create_user(customer=self.customer) self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.groupname, 'test01') self.assertEqual(user.username, 'test01') self.assertEqual(user.homedir, '/home/test01') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) def test_create_user_tasks(self): User.objects.create_user(customer=self.customer) taskres = TaskResult.objects.all() self.assertEqual(len(taskres), 3) creators = [r.creator for r in taskres] for creator in [ 'handle_group_created', 'handle_user_created', 'handle_user_password_set']: self.assertIn(creator, creators) def test_create_user_second(self): User.objects.create_user(customer=self.customer) user = User.objects.create_user(customer=self.customer) self.assertIsInstance(user, User) self.assertEqual(user.uid, 10001) self.assertEqual(user.group.gid, 10001) self.assertEqual(user.group.groupname, 'test02') self.assertEqual(user.username, 'test02') self.assertEqual(user.homedir, '/home/test02') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) self.assertEqual(len(User.objects.all()), 2) def test_create_user_known_password(self): user = User.objects.create_user( customer=self.customer, password='foobar') self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.groupname, 'test01') self.assertEqual(user.username, 'test01') self.assertEqual(user.homedir, '/home/test01') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) self.assertTrue(sha512_crypt.verify('foobar', user.shadow.passwd)) def test_create_user_predefined_username(self): user = User.objects.create_user( customer=self.customer, username='tester') self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.groupname, 'tester') self.assertEqual(user.username, 'tester') self.assertEqual(user.homedir, '/home/tester') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) def test_create_user_commit(self): user = User.objects.create_user(customer=self.customer, commit=True) self.assertIsInstance(user, User) self.assertEqual(user.uid, 10000) self.assertEqual(user.group.gid, 10000) self.assertEqual(user.group.groupname, 'test01') self.assertEqual(user.username, 'test01') self.assertEqual(user.homedir, '/home/test01') self.assertEqual(user.shell, '/bin/fooshell') self.assertIsNotNone(user.shadow) @override_settings( OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test', OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' ) class UserTest(TestCaseWithCeleryTasks): def setUp(self): self.customer = Customer.objects.create_user('test') super(UserTest, self).setUp() def test___str__(self): user = User.objects.create_user(self.customer) self.assertEqual(str(user), 'test01 (10000)') def test_set_password(self): user = User.objects.create_user(self.customer) self.assertFalse(sha512_crypt.verify('test', user.shadow.passwd)) user.set_password('test') self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd)) def test_save(self): user = User.objects.create_user(self.customer) user.save() taskres = TaskResult.objects.all() self.assertEqual(len(taskres), 3) creators = [r.creator for r in taskres] for task in [ 'handle_group_created', 'handle_user_created', 'handle_user_password_set']: self.assertIn(task, creators) def test_delete_only_user(self): user = User.objects.create_user(self.customer) user.delete() taskres = TaskResult.objects.all() self.assertEqual(len(taskres), 6) creators = [r.creator for r in taskres] for task in [ 'handle_group_created', 'handle_user_created', 'handle_user_password_set', 'handle_user_deleted', 'handle_group_deleted', 'handle_user_deleted']: self.assertIn(task, creators) self.assertEqual(len(User.objects.all()), 0) def test_delete_additional_groups(self): group1 = Group.objects.create(gid=2000, groupname='group1') group2 = Group.objects.create(gid=2001, groupname='group2') user = User.objects.create_user(self.customer) for group in [group1, group2]: user.additionalgroup_set.add( AdditionalGroup.objects.create(user=user, group=group)) TaskResult.objects.all().delete() user.delete() taskres = TaskResult.objects.all() self.assertEqual(len(taskres), 5) creators = [t.creator for t in taskres] for tcount, tcreator in [ (2, 'handle_user_removed_from_group'), (2, 'handle_user_deleted'), (1, 'handle_group_deleted')]: self.assertEqual(creators.count(tcreator), tcount) self.assertEqual(len(User.objects.all()), 0) self.assertEqual(len(AdditionalGroup.objects.all()), 0) def test_is_sftp_user(self): user = User.objects.create_user(self.customer) self.assertFalse(user.is_sftp_user()) sftp_group = Group.objects.create( gid=2000, groupname=settings.OSUSER_SFTP_GROUP) user.additionalgroup_set.add( AdditionalGroup.objects.create(user=user, group=sftp_group)) self.assertTrue(user.is_sftp_user()) class SshPublicKeyManagerTest(TestCaseWithCeleryTasks): def test_parse_keytext_rfc4716_1(self): res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_1_RFC4716) self.assertEqual(len(res), 3) self.assertGreater(len(res[1]), 40) self.assertEqual(res[0], 'ssh-rsa') self.assertEqual( res[2], '"1024-bit RSA, converted from OpenSSH by me@example.com"') def test_parse_keytext_rfc4716_2(self): res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_2_RFC4716) self.assertEqual(len(res), 3) self.assertEqual(res[0], 'ssh-dss') self.assertGreater(len(res[1]), 40) self.assertEqual( res[2], "This is my public key for use on servers which I don't like.") def test_parse_keytext_rfc4716_3(self): res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_3_RFC4716) self.assertEqual(len(res), 3) self.assertEqual(res[0], 'ssh-dss') self.assertGreater(len(res[1]), 40) self.assertEqual(res[2], "DSA Public Key for use with MyIsp") def test_parse_keytext_openssh(self): res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_4_OPENSSH) self.assertEquals(len(res), 3) self.assertEqual(res[0], 'ssh-rsa') self.assertGreater(len(res[1]), 40) self.assertEqual(res[2], '') def test_parse_keytext_invalid(self): with self.assertRaises(ValueError): SshPublicKey.objects.parse_keytext("\r\n".join(["xx"]*10)) def test_parse_keytext_empty_line(self): res = SshPublicKey.objects.parse_keytext( EXAMPLE_KEY_6_RFC4716_EMPTY_LINE) self.assertEqual(len(res), 3) self.assertEqual(res[0], 'ssh-dss') self.assertGreater(len(res[1]), 40) self.assertEqual(res[2], "DSA Public Key for use with MyIsp") def test_parse_keytext_multiline_comment(self): res = SshPublicKey.objects.parse_keytext( EXAMPLE_KEY_5_RFC4716_MULTILINE) self.assertEqual(len(res), 3) self.assertEqual(res[0], 'ssh-dss') self.assertGreater(len(res[1]), 40) self.assertEqual(res[2], "DSA Public Key for use with MyIsp") def test_create_ssh_public_key(self): customer = Customer.objects.create_user('test') user = User.objects.create_user(customer) key = SshPublicKey.objects.create_ssh_public_key( user, EXAMPLE_KEY_4_OPENSSH) self.assertIsInstance(key, SshPublicKey) self.assertEqual(key.user, user) self.assertEqual(key.algorithm, 'ssh-rsa') self.assertEqual(key.data, EXAMPLE_KEY_4_OPENSSH.split()[1]) self.assertEqual(key.comment, '') class SshPublicKeyTest(TestCaseWithCeleryTasks): def setUp(self): super(SshPublicKeyTest, self).setUp() customer = Customer.objects.create_user('test') self.user = User.objects.create_user(customer) TaskResult.objects.all().delete() def test__str__rfc4716(self): res = SshPublicKey.objects.create_ssh_public_key( self.user, EXAMPLE_KEY_3_RFC4716) self.assertEqual( str(res), 'ssh-dss AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxae' 'hvx5wOJ0rzZdzoSOXxbETW6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7St' 'xyltHnXF1YLfKD1G4T6JYrdHYI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3g' 'Jq2e7Yisk/gF+1VAAAAFQDb8D5cvwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4' 'KLYk3IwRbXblwXdkPggA4pfdtW9vGfJ0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/F' 'XPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAAvioUPkmdMc0zuWoSOEsSNhVDtX3WdvVc' 'GcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACBAN7CY+KKv1gHpRzFwdQm7HK9bb1LA' 'o2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HSn24VYtYtsMu74qXviYjziVucWK' 'jjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5sY29ouezv4Xz2PuMch5VGPP' '+CDqzCM4loWgV DSA Public Key for use with MyIsp') def test__str__openssh(self): res = SshPublicKey.objects.create_ssh_public_key( self.user, EXAMPLE_KEY_4_OPENSSH) self.assertEqual(str(res), EXAMPLE_KEY_4_OPENSSH) def test_call_tasks_on_save(self): SshPublicKey.objects.create_ssh_public_key( self.user, EXAMPLE_KEY_4_OPENSSH) taskresults = TaskResult.objects.all() self.assertEqual(len(taskresults), 1) self.assertEqual( taskresults[0].creator, 'handle_ssh_keys_changed') def test_call_tasks_on_delete(self): key = SshPublicKey.objects.create_ssh_public_key( self.user, EXAMPLE_KEY_4_OPENSSH) TaskResult.objects.all().delete() key.delete() taskresults = TaskResult.objects.all() self.assertEqual(len(taskresults), 1) self.assertEqual( taskresults[0].creator, 'handle_ssh_keys_changed')