Refactoring of ldaptasks
All LDAP tasks return a dictionary now to allow chaining with other tasks, the new task delete_ldap_user_chained has been added to allow user deletion after other Celery tasks.
This commit is contained in:
		
							parent
							
								
									54875619aa
								
							
						
					
					
						commit
						2ff7dd8902
					
				
					 2 changed files with 208 additions and 86 deletions
				
			
		|  | @ -7,6 +7,7 @@ This module defines `Celery`_ tasks to manage LDAP entities. | |||
| 
 | ||||
| from __future__ import absolute_import | ||||
| 
 | ||||
| from copy import deepcopy | ||||
| from django.core.exceptions import ObjectDoesNotExist | ||||
| from celery import shared_task | ||||
| from celery.utils.log import get_task_logger | ||||
|  | @ -22,7 +23,7 @@ _LOGGER = get_task_logger(__name__) | |||
| 
 | ||||
| 
 | ||||
| @shared_task | ||||
| def create_ldap_group(groupname, gid, descr): | ||||
| def create_ldap_group(groupname, gid, description): | ||||
|     """ | ||||
|     This task creates an :py:class:`LDAP group <ldapentities.models.LdapGroup>` | ||||
|     if it does not exist yet. | ||||
|  | @ -32,9 +33,12 @@ def create_ldap_group(groupname, gid, descr): | |||
| 
 | ||||
|     :param str groupname: the group name | ||||
|     :param int gid: the group id | ||||
|     :param str descr: description text for the group | ||||
|     :return: the distinguished name of the group | ||||
|     :rtype: str | ||||
|     :param str description: description text for the group | ||||
|     :return: dictionary containing groupname, gid, description and | ||||
|         :py:const:`group_dn` set to the distinguished name of the newly created | ||||
|         or existing LDAP group | ||||
|     :rtype: dict | ||||
| 
 | ||||
|     """ | ||||
|     try: | ||||
|         ldapgroup = LdapGroup.objects.get(name=groupname) | ||||
|  | @ -45,10 +49,15 @@ def create_ldap_group(groupname, gid, descr): | |||
|     except LdapGroup.DoesNotExist: | ||||
|         ldapgroup = LdapGroup(gid=gid, name=groupname) | ||||
|         _LOGGER.info('created LDAP group %s', ldapgroup.dn) | ||||
|     ldapgroup.description = descr | ||||
|     ldapgroup.description = description | ||||
|     ldapgroup.save() | ||||
|     _LOGGER.info('set description of LDAP group %s', ldapgroup.dn) | ||||
|     return ldapgroup.dn | ||||
|     return { | ||||
|         'groupname': groupname, | ||||
|         'gid': gid, | ||||
|         'description': description, | ||||
|         'group_dn': ldapgroup.dn, | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| @shared_task | ||||
|  | @ -71,8 +80,10 @@ def create_ldap_user(username, uid, gid, gecos, homedir, shell, password): | |||
|         is passed the password is not touched | ||||
|     :raises celery.exceptions.Reject: if the specified primary group does not | ||||
|         exist | ||||
|     :return: the distinguished name of the user | ||||
|     :rtype: str | ||||
|     :return: dictionary containing username, uid, gid, gecos, homedir, shell, | ||||
|         password and :py:const:`user_dn` set to the distinguished name of the | ||||
|         newly created or existing LDAP user | ||||
|     :rtype: dict | ||||
| 
 | ||||
|     """ | ||||
|     try: | ||||
|  | @ -109,30 +120,42 @@ def create_ldap_user(username, uid, gid, gecos, homedir, shell, password): | |||
|         _LOGGER.info( | ||||
|             'LDAP user %s has been added to LDAP group %s', | ||||
|             ldapuser.dn, ldapgroup.dn) | ||||
|     return ldapuser.dn | ||||
|     return { | ||||
|         'username': username, | ||||
|         'uid': uid, | ||||
|         'gid': gid, | ||||
|         'gecos': gecos, | ||||
|         'homedir': homedir, | ||||
|         'shell': shell, | ||||
|         'user_dn': ldapuser.dn, | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| @shared_task(bind=True) | ||||
| def set_ldap_user_password(self, username, password): | ||||
| @shared_task | ||||
| def set_ldap_user_password(username, password): | ||||
|     """ | ||||
|     This task sets the password of an existing :py:class:`LDAP user | ||||
|     <ldapentities.models.LdapUser>`. | ||||
| 
 | ||||
|     :param str username: the user name | ||||
|     :param str password: teh clear text password | ||||
|     :return: :py:const:`True` if the password has been set, :py:const:`False` | ||||
|         if the user does not exist. | ||||
|     :return: dictionary containing the username and a flag | ||||
|         :py:const:`password_set` that is set to  :py:const:`True` if the | ||||
|         password has been set, :py:const:`False` if the user does not exist. | ||||
|     :rtype: dict | ||||
| 
 | ||||
|     """ | ||||
|     retval = {'username': username, 'password_set': False} | ||||
|     try: | ||||
|         ldapuser = LdapUser.objects.get(username=username) | ||||
|     except LdapUser.DoesNotExist: | ||||
|         _LOGGER.info('there is no LDAP user with username %s', username) | ||||
|         return False | ||||
|         return retval | ||||
|     ldapuser.set_password(password) | ||||
|     ldapuser.save() | ||||
|     _LOGGER.info("set new password for LDAP user %s", ldapuser.dn) | ||||
|     return True | ||||
|     retval['password_set'] = True | ||||
|     return retval | ||||
| 
 | ||||
| 
 | ||||
| @shared_task(bind=True) | ||||
|  | @ -146,10 +169,13 @@ def add_ldap_user_to_group(self, username, groupname): | |||
|     :param str groupname: the group name | ||||
|     :raises celery.exceptions.Retry: if the user does not exist yet, | ||||
|         :py:func:`create_ldap_user` should be called before | ||||
|     :return: True if the user has been added to the group otherwise False | ||||
|     :rtype: boolean | ||||
|     :return: dictionary containing the username, groupname and a flag | ||||
|         :py:const`added` that is as a :py:const:`True` if the user has been | ||||
|         added to the group otherwise to :py:const:`False` | ||||
|     :rtype: dict | ||||
| 
 | ||||
|     """ | ||||
|     retval = {'username': username, 'groupname': groupname, 'added': False} | ||||
|     try: | ||||
|         ldapgroup = LdapGroup.objects.get(name=groupname) | ||||
|         ldapuser = LdapUser.objects.get(username=username) | ||||
|  | @ -169,8 +195,8 @@ def add_ldap_user_to_group(self, username, groupname): | |||
|             _LOGGER.info( | ||||
|                 'LDAP user %s is already in LDAP group %s', | ||||
|                 ldapuser.username, ldapgroup.dn) | ||||
|         return True | ||||
|     return False | ||||
|         retval['added'] = True | ||||
|     return retval | ||||
| 
 | ||||
| 
 | ||||
| @shared_task | ||||
|  | @ -180,10 +206,13 @@ def remove_ldap_user_from_group(username, groupname): | |||
| 
 | ||||
|     :param str username: the user name | ||||
|     :param str groupname: the group name | ||||
|     :return: True if the user has been removed, False otherwise | ||||
|     :rtype: boolean | ||||
|     :return: dictionary containing the input parameters and a flag | ||||
|         :py:const:`removed` that is set to :py:const:`True` if the user has | ||||
|         been removed, False otherwise | ||||
|     :rtype: dict | ||||
| 
 | ||||
|     """ | ||||
|     retval = {'username': username, 'groupname': groupname, 'removed': False} | ||||
|     try: | ||||
|         ldapgroup = LdapGroup.objects.get(name=groupname) | ||||
|         ldapuser = LdapUser.objects.get(username=username) | ||||
|  | @ -198,24 +227,35 @@ def remove_ldap_user_from_group(username, groupname): | |||
|                 'removed LDAP user %s from LDAP group %s', | ||||
|                 ldapuser.dn, ldapgroup.dn) | ||||
|             ldapgroup.save() | ||||
|             return True | ||||
|             retval['removed'] = True | ||||
|         else: | ||||
|             _LOGGER.info( | ||||
|                 'LDAP user %s is not a member of LDAP group %s', | ||||
|                 ldapuser.dn, ldapgroup.dn) | ||||
|     return False | ||||
|     return retval | ||||
| 
 | ||||
| 
 | ||||
| @shared_task | ||||
| def delete_ldap_user(username): | ||||
| def delete_ldap_user(username, *args, **kwargs): | ||||
|     """ | ||||
|     This task deletes the given user. | ||||
| 
 | ||||
|     :param str username: the user name | ||||
|     :return: True if the user has been deleted, False otherwise | ||||
|     :rtype: boolean | ||||
|     :return: dictionary containing the username and a flag :py:const:`deleted` | ||||
|         that is set to :py:const:`True` if the user has been deleted and is set | ||||
|         to :py:const:`False` otherwise | ||||
|     :rtype: dict | ||||
| 
 | ||||
|     .. note:: | ||||
| 
 | ||||
|         This variant can only be used at the beginning of a Celery task chain | ||||
|         or as a standalone task. | ||||
| 
 | ||||
|         Use :py:func:`ldaptasks.tasks.delete_ldap_user_chained` at other | ||||
|         positions in the task chain | ||||
| 
 | ||||
|     """ | ||||
|     retval = {'username': username, 'deleted': False} | ||||
|     try: | ||||
|         ldapuser = LdapUser.objects.get(username=username) | ||||
|     except LdapUser.DoesNotExist: | ||||
|  | @ -237,20 +277,43 @@ def delete_ldap_user(username): | |||
|         userdn = ldapuser.dn | ||||
|         ldapuser.delete() | ||||
|         _LOGGER.info('deleted LDAP user %s', userdn) | ||||
|         return True | ||||
|     return False | ||||
|         retval['deleted'] = True | ||||
|     return retval | ||||
| 
 | ||||
| 
 | ||||
| @shared_task | ||||
| def delete_ldap_user_chained(previous_result, *args, **kwargs): | ||||
|     """ | ||||
|     This task deletes the given user. | ||||
| 
 | ||||
|     :param dict previous_result: a dictionary describing the result of the | ||||
|         previous step in the Celery task chain. This dictionary must contain a | ||||
|         :py:const:`username` key | ||||
|     :return: a copy of the :py:obj:`previous_result` dictionary with a new | ||||
|         :py:const:`deleted` key set to :py:const:`True` if the user has been | ||||
|         deleted and set to :py:const:`False` otherwise | ||||
|     :rtype: dict | ||||
| 
 | ||||
|     """ | ||||
|     username = previous_result['username'] | ||||
|     retval = deepcopy(previous_result) | ||||
|     retval.update(delete_ldap_user(username)) | ||||
|     return retval | ||||
| 
 | ||||
| 
 | ||||
| @shared_task | ||||
| def delete_ldap_group_if_empty(groupname): | ||||
|     """ | ||||
|     This task deletes the given group. | ||||
|     This task deletes the given group if it is empty. | ||||
| 
 | ||||
|     :param str groupname: the group name | ||||
|     :return: True if the user has been deleted, False otherwise | ||||
|     :rtype: boolean | ||||
|     :return: dictionary that contains the groupname and a flag | ||||
|         :py:const:`deleted` that is set to :py:const:`True` if the group has | ||||
|         been deleted and is set to :py:const:`False` otherwise | ||||
|     :rtype: dict | ||||
| 
 | ||||
|     """ | ||||
|     retval = {'groupname': groupname, 'deleted': False} | ||||
|     try: | ||||
|         ldapgroup = LdapGroup.objects.get(name=groupname) | ||||
|     except LdapGroup.DoesNotExist: | ||||
|  | @ -261,24 +324,27 @@ def delete_ldap_group_if_empty(groupname): | |||
|             ldapgroup.delete() | ||||
|             _LOGGER.info( | ||||
|                 'deleted LDAP group %s', groupdn) | ||||
|             return True | ||||
|             retval['deleted'] = True | ||||
|         else: | ||||
|             _LOGGER.info( | ||||
|                 'LDAP group %s has not been deleted. It still has %d members', | ||||
|                 ldapgroup.dn, len(ldapgroup.members)) | ||||
|     return False | ||||
|     return retval | ||||
| 
 | ||||
| 
 | ||||
| @shared_task | ||||
| def delete_ldap_group(groupname): | ||||
|     """ | ||||
|     This taks deletes the given group. | ||||
|     This task deletes the given group. | ||||
| 
 | ||||
|     :param str groupname: the group name | ||||
|     :return: True if the user has been deleted, False otherwise | ||||
|     :rtype: boolean | ||||
|     :return: dictionary that contains the groupname and a flag | ||||
|         :py:const:`deleted` that is set to :py:const:`True` if the group has | ||||
|         been deleted and is set to :py:const:`False` otherwise | ||||
|     :rtype: dict | ||||
| 
 | ||||
|     """ | ||||
|     retval = {'groupname': groupname, 'deleted': False} | ||||
|     try: | ||||
|         ldapgroup = LdapGroup.objects.get(name=groupname) | ||||
|     except LdapGroup.DoesNotExist: | ||||
|  | @ -287,5 +353,5 @@ def delete_ldap_group(groupname): | |||
|         groupdn = ldapgroup.dn | ||||
|         ldapgroup.delete() | ||||
|         _LOGGER.info('deleted LDAP group %s', groupdn) | ||||
|         return True | ||||
|     return False | ||||
|         retval['deleted'] = True | ||||
|     return retval | ||||
|  |  | |||
|  | @ -18,6 +18,7 @@ from ldaptasks.tasks import ( | |||
|     delete_ldap_group, | ||||
|     delete_ldap_group_if_empty, | ||||
|     delete_ldap_user, | ||||
|     delete_ldap_user_chained, | ||||
|     remove_ldap_user_from_group, | ||||
|     set_ldap_user_password, | ||||
| ) | ||||
|  | @ -73,33 +74,49 @@ class LdapTaskTestCase(TestCase): | |||
|         del self.ldapobj | ||||
| 
 | ||||
|     def test_create_ldap_group(self): | ||||
|         dn = create_ldap_group('test', 5000, 'test group') | ||||
|         self.assertEqual('cn=test,%s' % settings.GROUP_BASE_DN, dn) | ||||
|         result = create_ldap_group('test', 5000, 'test group') | ||||
|         self.assertEqual({ | ||||
|             'groupname': 'test', 'gid': 5000, 'description': 'test group', | ||||
|             'group_dn': 'cn=test,%s' % settings.GROUP_BASE_DN | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'add_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_create_ldap_group_existing(self): | ||||
|         dn = create_ldap_group('existing', 4711, 'existing test group') | ||||
|         self.assertEqual('cn=existing,%s' % settings.GROUP_BASE_DN, dn) | ||||
|         result = create_ldap_group('existing', 4711, 'existing test group') | ||||
|         self.assertEqual({ | ||||
|             'groupname': 'existing', 'gid': 4711, | ||||
|             'description': 'existing test group', | ||||
|             'group_dn': 'cn=existing,%s' % settings.GROUP_BASE_DN | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'search_s', 'search_s', | ||||
|             'search_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_create_ldap_group_existing_modify(self): | ||||
|         dn = create_ldap_group('existing', 4711, 'change existing test group') | ||||
|         self.assertEqual('cn=existing,%s' % settings.GROUP_BASE_DN, dn) | ||||
|         result = create_ldap_group( | ||||
|             'existing', 4711, 'change existing test group') | ||||
|         self.assertEqual({ | ||||
|             'groupname': 'existing', 'gid': 4711, | ||||
|             'description': 'change existing test group', | ||||
|             'group_dn': 'cn=existing,%s' % settings.GROUP_BASE_DN | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'search_s', | ||||
|             'search_s', 'search_s', 'modify_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_create_ldap_user(self): | ||||
|         dn = create_ldap_user( | ||||
|         result = create_ldap_user( | ||||
|             'test', 5000, 4711, 'Test User', '/home/test', '/bin/bash', | ||||
|             'secret') | ||||
|         self.assertEqual('uid=test,%s' % settings.USER_BASE_DN, dn) | ||||
|         self.assertEqual({ | ||||
|             'username': 'test', 'uid': 5000, 'gid': 4711, 'gecos': 'Test User', | ||||
|             'homedir': '/home/test', 'shell': '/bin/bash', | ||||
|             'user_dn': 'uid=test,%s' % settings.USER_BASE_DN | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'search_s', 'search_s', | ||||
|             'add_s', 'search_s', 'search_s', 'modify_s' | ||||
|  | @ -115,44 +132,59 @@ class LdapTaskTestCase(TestCase): | |||
|         ]) | ||||
| 
 | ||||
|     def test_create_ldap_user_no_password(self): | ||||
|         dn = create_ldap_user( | ||||
|         result = create_ldap_user( | ||||
|             'test', 5000, 4711, 'Test User', '/home/test', '/bin/bash', | ||||
|             None) | ||||
|         self.assertEqual('uid=test,%s' % settings.USER_BASE_DN, dn) | ||||
|         self.assertEqual({ | ||||
|             'username': 'test', 'uid': 5000, 'gid': 4711, 'gecos': 'Test User', | ||||
|             'homedir': '/home/test', 'shell': '/bin/bash', | ||||
|             'user_dn': 'uid=test,%s' % settings.USER_BASE_DN | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'search_s', 'search_s', | ||||
|             'add_s', 'search_s', 'search_s', 'modify_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_create_ldap_user_existing(self): | ||||
|         dn = create_ldap_user( | ||||
|         result = create_ldap_user( | ||||
|             'existing', 815, 4711, 'existing test user', '/home/existing', | ||||
|             '/bin/bash', 'secret' | ||||
|         ) | ||||
|         self.assertEqual('uid=existing,%s' % settings.USER_BASE_DN, dn) | ||||
|         self.assertEqual({ | ||||
|             'username': 'existing',  'uid': 815, 'gid': 4711, | ||||
|             'gecos': 'existing test user', 'homedir': '/home/existing', | ||||
|             'shell': '/bin/bash', | ||||
|             'user_dn': u'uid=existing,%s' % settings.USER_BASE_DN | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'search_s', 'search_s', | ||||
|             'search_s', 'search_s', 'search_s', 'modify_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_set_ldap_user_password_existing(self): | ||||
|         res = set_ldap_user_password('existing', 'newpassword') | ||||
|         self.assertTrue(res) | ||||
|         result = set_ldap_user_password('existing', 'newpassword') | ||||
|         self.assertEqual({ | ||||
|             'username': 'existing', 'password_set': True | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'search_s', 'search_s', | ||||
|             'search_s', 'modify_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_set_ldap_user_password_missing(self): | ||||
|         res = set_ldap_user_password('missing', 'newpassword') | ||||
|         self.assertFalse(res) | ||||
|         result = set_ldap_user_password('missing', 'newpassword') | ||||
|         self.assertEqual({ | ||||
|             'username': 'missing', 'password_set': False | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_add_ldap_user_to_group_existing(self): | ||||
|         res = add_ldap_user_to_group('existing', 'existing') | ||||
|         self.assertTrue(res) | ||||
|         result = add_ldap_user_to_group('existing', 'existing') | ||||
|         self.assertEqual({ | ||||
|             'username': 'existing', 'groupname': 'existing', 'added': True | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'search_s', 'search_s', | ||||
|             'search_s' | ||||
|  | @ -160,8 +192,10 @@ class LdapTaskTestCase(TestCase): | |||
| 
 | ||||
|     def test_add_ldap_user_to_group_new_user(self): | ||||
|         create_ldap_group('test', 5000, 'test group') | ||||
|         res = add_ldap_user_to_group('existing', 'test') | ||||
|         self.assertTrue(res) | ||||
|         result = add_ldap_user_to_group('existing', 'test') | ||||
|         self.assertEqual({ | ||||
|             'username': 'existing', 'groupname': 'test', 'added': True | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'add_s', 'search_s', | ||||
|             'search_s', 'search_s', 'search_s', 'search_s', 'search_s', | ||||
|  | @ -169,8 +203,10 @@ class LdapTaskTestCase(TestCase): | |||
|         ]) | ||||
| 
 | ||||
|     def test_add_ldap_user_to_group_no_group(self): | ||||
|         res = add_ldap_user_to_group('existing', 'test') | ||||
|         self.assertFalse(res) | ||||
|         result = add_ldap_user_to_group('existing', 'test') | ||||
|         self.assertEqual({ | ||||
|             'username': 'existing', 'groupname': 'test', 'added': False | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s' | ||||
|         ]) | ||||
|  | @ -183,8 +219,10 @@ class LdapTaskTestCase(TestCase): | |||
|         ]) | ||||
| 
 | ||||
|     def test_remove_ldap_user_from_group_existing(self): | ||||
|         res = remove_ldap_user_from_group('existing', 'existing') | ||||
|         self.assertTrue(res) | ||||
|         result = remove_ldap_user_from_group('existing', 'existing') | ||||
|         self.assertEqual({ | ||||
|             'username': 'existing', 'groupname': 'existing', 'removed': True | ||||
|         }, result) | ||||
|         self.assertNotIn('memberUid', self.ldapobj.directory[ | ||||
|             'cn=existing,' + settings.GROUP_BASE_DN]) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|  | @ -194,30 +232,36 @@ class LdapTaskTestCase(TestCase): | |||
| 
 | ||||
|     def test_remove_ldap_user_from_group_not_in_group(self): | ||||
|         create_ldap_group('test', 5000, 'test group') | ||||
|         res = remove_ldap_user_from_group('existing', 'test') | ||||
|         self.assertFalse(res) | ||||
|         result = remove_ldap_user_from_group('existing', 'test') | ||||
|         self.assertEqual({ | ||||
|             'username': 'existing', 'groupname': 'test', 'removed': False | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'add_s', 'search_s', | ||||
|             'search_s', 'search_s', 'search_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_remove_ldap_user_from_group_no_group(self): | ||||
|         res = remove_ldap_user_from_group('existing', 'test') | ||||
|         self.assertFalse(res) | ||||
|         result = remove_ldap_user_from_group('existing', 'test') | ||||
|         self.assertEqual({ | ||||
|             'username': 'existing', 'groupname': 'test', 'removed': False | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_remove_ldap_user_from_group_no_user(self): | ||||
|         res = remove_ldap_user_from_group('test', 'existing') | ||||
|         self.assertFalse(res) | ||||
|         result = remove_ldap_user_from_group('test', 'existing') | ||||
|         self.assertEqual({ | ||||
|             'username': 'test', 'groupname': 'existing', 'removed': False | ||||
|         }, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'search_s', 'search_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_delete_ldap_user_existing(self): | ||||
|         res = delete_ldap_user('existing') | ||||
|         self.assertTrue(res) | ||||
|         result = delete_ldap_user('existing') | ||||
|         self.assertEqual({'username': 'existing', 'deleted': True}, result) | ||||
|         self.assertNotIn( | ||||
|             'uid=existing,' + settings.USER_BASE_DN, self.ldapobj.directory) | ||||
|         self.assertNotIn('memberUid', self.ldapobj.directory[ | ||||
|  | @ -228,8 +272,8 @@ class LdapTaskTestCase(TestCase): | |||
|         ]) | ||||
| 
 | ||||
|     def test_delete_ldap_user_missing(self): | ||||
|         res = delete_ldap_user('missing') | ||||
|         self.assertFalse(res) | ||||
|         result = delete_ldap_user('missing') | ||||
|         self.assertEqual({'username': 'missing', 'deleted': False}, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s' | ||||
|         ]) | ||||
|  | @ -237,8 +281,8 @@ class LdapTaskTestCase(TestCase): | |||
|     def test_delete_ldap_user_no_group(self): | ||||
|         self.ldapobj.directory[ | ||||
|             'uid=existing,' + settings.USER_BASE_DN]['gidNumber'] = ['5000'] | ||||
|         res = delete_ldap_user('existing') | ||||
|         self.assertTrue(res) | ||||
|         result = delete_ldap_user('existing') | ||||
|         self.assertEqual({'username': 'existing', 'deleted': True}, result) | ||||
|         self.assertNotIn( | ||||
|             'uid=existing,' + settings.USER_BASE_DN, self.ldapobj.directory) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|  | @ -249,8 +293,8 @@ class LdapTaskTestCase(TestCase): | |||
|     def test_delete_ldap_user_not_in_group(self): | ||||
|         del self.ldapobj.directory[ | ||||
|              'cn=existing,' + settings.GROUP_BASE_DN]['memberUid'] | ||||
|         res = delete_ldap_user('existing') | ||||
|         self.assertTrue(res) | ||||
|         result = delete_ldap_user('existing') | ||||
|         self.assertEqual({'username': 'existing', 'deleted': True}, result) | ||||
|         self.assertNotIn( | ||||
|             'uid=existing,' + settings.USER_BASE_DN, self.ldapobj.directory) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|  | @ -258,9 +302,21 @@ class LdapTaskTestCase(TestCase): | |||
|             'search_s', 'delete_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_delete_ldap_user_chained_exsting(self): | ||||
|         result = delete_ldap_user_chained({'username': 'existing'}) | ||||
|         self.assertEqual({'username': 'existing', 'deleted': True}, result) | ||||
|         self.assertNotIn( | ||||
|             'uid=existing,' + settings.USER_BASE_DN, self.ldapobj.directory) | ||||
|         self.assertNotIn('memberUid', self.ldapobj.directory[ | ||||
|             'cn=existing,' + settings.GROUP_BASE_DN]) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s', 'search_s', 'search_s', | ||||
|             'search_s', 'search_s', 'search_s', 'modify_s', 'delete_s' | ||||
|         ]) | ||||
| 
 | ||||
|     def test_delete_ldap_group_if_empty_nonempty(self): | ||||
|         res = delete_ldap_group_if_empty('existing') | ||||
|         self.assertFalse(res) | ||||
|         result = delete_ldap_group_if_empty('existing') | ||||
|         self.assertEqual({'groupname': 'existing', 'deleted': False}, result) | ||||
|         self.assertIn( | ||||
|             'cn=existing,' + settings.GROUP_BASE_DN, self.ldapobj.directory) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|  | @ -268,8 +324,8 @@ class LdapTaskTestCase(TestCase): | |||
|         ]) | ||||
| 
 | ||||
|     def test_delete_ldap_group_if_empty_missing(self): | ||||
|         res = delete_ldap_group_if_empty('missing') | ||||
|         self.assertFalse(res) | ||||
|         result = delete_ldap_group_if_empty('missing') | ||||
|         self.assertEqual({'groupname': 'missing', 'deleted': False}, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s' | ||||
|         ]) | ||||
|  | @ -277,8 +333,8 @@ class LdapTaskTestCase(TestCase): | |||
|     def test_delete_ldap_group_if_empty_empty(self): | ||||
|         del self.ldapobj.directory[ | ||||
|              'cn=existing,' + settings.GROUP_BASE_DN]['memberUid'] | ||||
|         res = delete_ldap_group_if_empty('existing') | ||||
|         self.assertTrue(res) | ||||
|         result = delete_ldap_group_if_empty('existing') | ||||
|         self.assertEqual({'groupname': 'existing', 'deleted': True}, result) | ||||
|         self.assertNotIn( | ||||
|             'cn=existing,' + settings.GROUP_BASE_DN, self.ldapobj.directory) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|  | @ -286,8 +342,8 @@ class LdapTaskTestCase(TestCase): | |||
|         ]) | ||||
| 
 | ||||
|     def test_delete_ldap_group_existing(self): | ||||
|         res = delete_ldap_group('existing') | ||||
|         self.assertTrue(res) | ||||
|         result = delete_ldap_group('existing') | ||||
|         self.assertEqual({'groupname': 'existing', 'deleted': True}, result) | ||||
|         self.assertNotIn( | ||||
|             'cn=existing,' + settings.GROUP_BASE_DN, self.ldapobj.directory) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|  | @ -295,8 +351,8 @@ class LdapTaskTestCase(TestCase): | |||
|         ]) | ||||
| 
 | ||||
|     def test_delete_ldap_group_missing(self): | ||||
|         res = delete_ldap_group('missing') | ||||
|         self.assertFalse(res) | ||||
|         result = delete_ldap_group('missing') | ||||
|         self.assertEqual({'groupname': 'missing', 'deleted': False}, result) | ||||
|         self.assertEqual(self.ldapobj.methods_called(), [ | ||||
|             'initialize', 'simple_bind_s', 'search_s' | ||||
|         ]) | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue