diff --git a/.gitignore b/.gitignore index 5f8ae64..f25647b 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,4 @@ Desktop.ini .ropeproject/ htmlcov/ tags +_build/ diff --git a/docs/changelog.rst b/docs/changelog.rst new file mode 100644 index 0000000..099a067 --- /dev/null +++ b/docs/changelog.rst @@ -0,0 +1,14 @@ +Changelog +========= + +* :release:`0.2.0 <2014-06-01>` +* :feature:`-` full test suite for osusers +* :feature:`-` full test suite for managemails app +* :feature:`-` full test suite for domains app +* :feature:`-` `Celery `_ integration for ldap + synchronization + +* :release:`0.1 <2014-05-25>` +* :feature:`-` initial model code for os users +* :feature:`-` initial model code for mail address and mailbox management +* :feature:`-` initial model code for domains diff --git a/docs/conf.py b/docs/conf.py index 72bbeb9..3a6ad3a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +# pymode:lint_ignore=E501 # # gnuviechadmin documentation build configuration file, created by # sphinx-quickstart on Sun May 18, 2014. @@ -12,7 +13,8 @@ # All configuration values have a default; values that are commented out # serve to show the default. -import sys, os +#import sys +#import os # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the @@ -26,7 +28,11 @@ import sys, os # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. -extensions = [] +extensions = ['releases', 'sphinx.ext.autodoc', 'celery.contrib.sphinx'] + +# configuration for releases extension +releases_issue_uri = 'https://dev.gnuviech-server.de/gva/ticket/%s' +releases_release_uri = 'https://dev.gnuviech-server.de/gva/milestone/%s' # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] @@ -49,9 +55,9 @@ copyright = u'2014, Jan Dittberner' # built documents. # # The short X.Y version. -version = '0.1' +version = '0.2.0' # The full version, including alpha/beta/rc tags. -release = '0.1' +release = '0.2.0' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. @@ -171,21 +177,21 @@ htmlhelp_basename = 'gnuviechadmindoc' # -- Options for LaTeX output -------------------------------------------------- latex_elements = { -# The paper size ('letterpaper' or 'a4paper'). -#'papersize': 'letterpaper', + # The paper size ('letterpaper' or 'a4paper'). + #'papersize': 'letterpaper', -# The font size ('10pt', '11pt' or '12pt'). -#'pointsize': '10pt', + # The font size ('10pt', '11pt' or '12pt'). + #'pointsize': '10pt', -# Additional stuff for the LaTeX preamble. -#'preamble': '', + # Additional stuff for the LaTeX preamble. + #'preamble': '', } # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, documentclass [howto/manual]). latex_documents = [ - ('index', 'gnuviechadmin.tex', u'gnuviechadmin Documentation', - u'Jan Dittberner', 'manual'), + ('index', 'gnuviechadmin.tex', u'gnuviechadmin Documentation', + u'Jan Dittberner', 'manual'), ] # The name of an image file (relative to this directory) to place at the top of @@ -228,9 +234,9 @@ man_pages = [ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - ('index', 'gnuviechadmin', u'gnuviechadmin Documentation', - u'Jan Dittberner', 'gnuviechadmin', 'Customer center for gnuviech servers.', - 'Miscellaneous'), + ('index', 'gnuviechadmin', u'gnuviechadmin Documentation', + u'Jan Dittberner', 'gnuviechadmin', 'Customer center for gnuviech servers.', + 'Miscellaneous'), ] # Documents to append as an appendix to all manuals. diff --git a/docs/index.rst b/docs/index.rst index 99ec616..0d9e0c2 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -4,7 +4,7 @@ contain the root `toctree` directive. Welcome to gnuviechadmin's documentation! -==================================== +========================================= Contents: @@ -14,7 +14,7 @@ Contents: install deploy tests - + changelog Indices and tables diff --git a/docs/tests.rst b/docs/tests.rst new file mode 100644 index 0000000..b56edd1 --- /dev/null +++ b/docs/tests.rst @@ -0,0 +1,26 @@ +Tests +===== + +To run the tests you can just use the :program:`manage.py` script: + +.. code-block:: sh + + $ python manage.py test + +Coverage +-------- + +To capture test coverage information you can run: + +.. code-block:: sh + + $ coverage run --branch manage.py test + +To view the coverage data use: + +.. code-block:: sh + + $ coverage report -m + +The coverage configuration is in :file:`.coveragerc`. Add new apps to the +`source` configuration in the `[run]` section of that configuration file. diff --git a/gnuviechadmin/domains/tests/test_admin.py b/gnuviechadmin/domains/tests/test_admin.py new file mode 100644 index 0000000..aa717a9 --- /dev/null +++ b/gnuviechadmin/domains/tests/test_admin.py @@ -0,0 +1,8 @@ +from django.test import TestCase +from django.core.urlresolvers import reverse + + +class TestMailDomainAdmin(TestCase): + def test_admin_for_maildomain(self): + admin_url = reverse('admin:domains_maildomain_changelist') + self.assertIsNotNone(admin_url) diff --git a/gnuviechadmin/domains/views.py b/gnuviechadmin/domains/views.py deleted file mode 100644 index 91ea44a..0000000 --- a/gnuviechadmin/domains/views.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.shortcuts import render - -# Create your views here. diff --git a/gnuviechadmin/gnuviechadmin/__init__.py b/gnuviechadmin/gnuviechadmin/__init__.py index e69de29..bd9391f 100644 --- a/gnuviechadmin/gnuviechadmin/__init__.py +++ b/gnuviechadmin/gnuviechadmin/__init__.py @@ -0,0 +1 @@ +from gnuviechadmin.celery import app as celery_app diff --git a/gnuviechadmin/gnuviechadmin/celery.py b/gnuviechadmin/gnuviechadmin/celery.py new file mode 100644 index 0000000..b8be2e5 --- /dev/null +++ b/gnuviechadmin/gnuviechadmin/celery.py @@ -0,0 +1,16 @@ +from __future__ import absolute_import + +import os + +from celery import Celery + +from django.conf import settings + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', + 'gnuviechadmin.settings.production') + + +app = Celery('gnuviechadmin') + +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) diff --git a/gnuviechadmin/gnuviechadmin/settings/base.py b/gnuviechadmin/gnuviechadmin/settings/base.py index 0a1b858..2b1322f 100644 --- a/gnuviechadmin/gnuviechadmin/settings/base.py +++ b/gnuviechadmin/gnuviechadmin/settings/base.py @@ -279,6 +279,20 @@ SOUTH_TESTS_MIGRATE = False ########## END SOUTH CONFIGURATION +########## CELERY CONFIGURATION +BROKER_URL = get_env_variable('GVA_BROKER_URL') +CELERY_RESULT_BACKEND = 'amqp' +CELERY_RESULT_PERSISTENT = True +CELERY_TASK_RESULT_EXPIRES = None +CELERY_ROUTES = ( + 'osusers.tasks.LdapRouter', +) +CELERY_ACCEPT_CONTENT = ['yaml'] +CELERY_TASK_SERIALIZER = 'yaml' +CELERY_RESULT_SERIALIZER = 'yaml' +########## END CELERY CONFIGURATION + + ########## CUSTOM APP CONFIGURATION OSUSER_MINUID = int(get_env_variable('GVA_MIN_OS_UID')) OSUSER_MINGID = int(get_env_variable('GVA_MIN_OS_GID')) diff --git a/gnuviechadmin/managemails/tests/test_admin.py b/gnuviechadmin/managemails/tests/test_admin.py new file mode 100644 index 0000000..58e99b7 --- /dev/null +++ b/gnuviechadmin/managemails/tests/test_admin.py @@ -0,0 +1,189 @@ +from django import forms +from django.core.urlresolvers import reverse +from django.test import TestCase +from django.test.utils import override_settings +from django.utils.html import format_html +from django.utils.translation import ugettext as _ + +from django.contrib.admin import AdminSite + +from mock import Mock + +from osusers.models import User + +from managemails.admin import ( + ActivationChangeMixin, + MailboxAdmin, + MailboxChangeForm, + MailboxCreationForm, + PASSWORD_MISMATCH_ERROR, + ReadOnlyPasswordHashField, + ReadOnlyPasswordHashWidget, +) +from managemails.models import ( + Mailbox, +) + + +class ReadOnlyPasswordHashWidgetTest(TestCase): + def test_render(self): + widget = ReadOnlyPasswordHashWidget() + rendered = widget.render('password', 'secret', {'class': 'test'}) + self.assertEqual( + rendered, + format_html( + '
{0}
', + format_html('{0}: secret ', + _('Hash')) + )) + + +class ReadOnlyPasswordHashFieldTest(TestCase): + def test___init__(self): + field = ReadOnlyPasswordHashField() + self.assertFalse(field.required) + + def test_bound_data(self): + field = ReadOnlyPasswordHashField() + self.assertEqual(field.bound_data('new', 'old'), 'old') + + def test__has_changed(self): + field = ReadOnlyPasswordHashField() + self.assertFalse(field._has_changed('new', 'old')) + + +class MailboxCreationFormTest(TestCase): + def test_clean_password2_same(self): + form = MailboxCreationForm() + form.cleaned_data = {'password1': 'secret', 'password2': 'secret'} + self.assertEqual(form.clean_password2(), 'secret') + + def test_clean_password2_empty(self): + form = MailboxCreationForm() + form.cleaned_data = {} + self.assertIsNone(form.clean_password2()) + + def test_clean_password2_mismatch(self): + form = MailboxCreationForm() + form.cleaned_data = {'password1': 'secretx', 'password2': 'secrety'} + with self.assertRaises(forms.ValidationError) as cm: + form.clean_password2() + self.assertEqual(cm.exception.message, PASSWORD_MISMATCH_ERROR) + + @override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' + ) + def test_save_commit(self): + user = User.objects.create_user() + form = MailboxCreationForm(data={ + 'osuser': user.uid, + 'password1': 'secret', + 'password2': 'secret', + }) + mailbox = form.save() + self.assertIsNotNone(mailbox) + self.assertEqual( + len(Mailbox.objects.filter(osuser=user)), 1) + + @override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' + ) + def test_save_no_commit(self): + user = User.objects.create_user() + form = MailboxCreationForm(data={ + 'osuser': user.uid, + 'password1': 'secret', + 'password2': 'secret', + }) + mailbox = form.save(commit=False) + self.assertIsNotNone(mailbox) + self.assertEqual( + len(Mailbox.objects.filter(osuser=user)), 0) + + +class MailboxChangeFormTest(TestCase): + @override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' + ) + def test_clean_password(self): + mailbox = Mailbox(username='test', osuser=User.objects.create_user()) + mailbox.set_password('test') + mailbox.save() + form = MailboxChangeForm(instance=mailbox, data={'password': 'blub'}) + self.assertEqual(form.clean_password(), mailbox.password) + + +class ActivationChangeMixinTest(TestCase): + def test_activate(self): + querysetmock = Mock() + activationchange = ActivationChangeMixin() + activationchange.activate(Mock(), querysetmock) + querysetmock.update.called_with(active=True) + + def test_deactivate(self): + querysetmock = Mock() + activationchange = ActivationChangeMixin() + activationchange.deactivate(Mock(), querysetmock) + querysetmock.update.called_with(active=False) + + +class MailBoxAdminTest(TestCase): + def setUp(self): + site = AdminSite() + self.mbadmin = MailboxAdmin(Mailbox, site) + + def test_get_fieldsets_without_object(self): + self.assertEqual( + self.mbadmin.get_fieldsets(Mock()), + self.mbadmin.add_fieldsets) + + @override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' + ) + def test_get_fieldsets_with_object(self): + mailbox = Mailbox(username='test', osuser=User.objects.create_user()) + mailbox.set_password('test') + mailbox.save() + self.assertEqual( + self.mbadmin.get_fieldsets(Mock(), mailbox), + self.mbadmin.fieldsets) + + def test_get_form_without_object(self): + form = self.mbadmin.get_form(Mock) + self.assertEqual( + form.Meta.fields, + ['username', 'password1', 'password2'] + ) + + @override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' + ) + def test_get_form_with_object(self): + mailbox = Mailbox(username='test', osuser=User.objects.create_user()) + mailbox.set_password('test') + mailbox.save() + form = self.mbadmin.get_form(Mock, mailbox) + self.assertEqual( + form.Meta.fields, + ['username', 'password', 'osuser', 'active'] + ) + + def test_admin_for_mailbox(self): + admin_url = reverse('admin:managemails_mailaddress_changelist') + self.assertIsNotNone(admin_url) + + +class MailAddressAdminTest(TestCase): + def test_admin_for_mailaddress(self): + admin_url = reverse('admin:managemails_mailaddress_changelist') + self.assertIsNotNone(admin_url) diff --git a/gnuviechadmin/managemails/tests/test_models.py b/gnuviechadmin/managemails/tests/test_models.py index a731ae0..4df1793 100644 --- a/gnuviechadmin/managemails/tests/test_models.py +++ b/gnuviechadmin/managemails/tests/test_models.py @@ -1,14 +1,22 @@ from django.test import TestCase +from django.test.utils import override_settings + from passlib.hash import sha512_crypt from domains.models import MailDomain from osusers.models import User + from managemails.models import ( MailAddress, Mailbox, ) +@override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' +) class MailboxTest(TestCase): def test_set_password(self): user = User.objects.create_user() @@ -16,6 +24,12 @@ class MailboxTest(TestCase): mb.set_password('test') self.assertTrue(sha512_crypt.verify('test', mb.password)) + def test___str__(self): + user = User.objects.create_user() + mb = Mailbox.objects.create(username='test', osuser=user) + mb.set_password('test') + self.assertEqual(str(mb), 'test') + class MailAddressTest(TestCase): def test__str__(self): diff --git a/gnuviechadmin/managemails/views.py b/gnuviechadmin/managemails/views.py deleted file mode 100644 index 91ea44a..0000000 --- a/gnuviechadmin/managemails/views.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.shortcuts import render - -# Create your views here. diff --git a/gnuviechadmin/osusers/admin.py b/gnuviechadmin/osusers/admin.py index e0e9adc..fb21576 100644 --- a/gnuviechadmin/osusers/admin.py +++ b/gnuviechadmin/osusers/admin.py @@ -1,12 +1,19 @@ +from django import forms +from django.utils.translation import ugettext as _ from django.contrib import admin from .models import ( AdditionalGroup, + DeleteTaskResult, Group, + GroupTaskResult, Shadow, User, + UserTaskResult, ) +PASSWORD_MISMATCH_ERROR = _("Passwords don't match") + class AdditionalGroupInline(admin.TabularInline): model = AdditionalGroup @@ -18,10 +25,127 @@ class ShadowInline(admin.TabularInline): can_delete = False +class TaskResultInline(admin.TabularInline): + can_delete = False + extra = 0 + readonly_fields = ['task_uuid', 'task_name', 'is_finished', 'is_success', + 'state', 'result_body'] + + def get_queryset(self, request): + qs = super(TaskResultInline, self).get_queryset(request) + for entry in qs: + entry.update_taskstatus() + return qs + + def has_add_permission(self, request, obj=None): + return False + + +class UserTaskResultInline(TaskResultInline): + model = UserTaskResult + + +class GroupTaskResultInline(TaskResultInline): + model = GroupTaskResult + + +class UserCreationForm(forms.ModelForm): + """ + A form for creating system users. + + """ + password1 = forms.CharField(label=_('Password'), + widget=forms.PasswordInput) + password2 = forms.CharField(label=_('Password (again)'), + widget=forms.PasswordInput) + + class Meta: + model = User + fields = [] + + def clean_password2(self): + """ + Check that the two password entries match. + + """ + password1 = self.cleaned_data.get('password1') + password2 = self.cleaned_data.get('password2') + if password1 and password2 and password1 != password2: + raise forms.ValidationError(PASSWORD_MISMATCH_ERROR) + return password2 + + def save(self, commit=True): + """ + Save the provided password in hashed format. + + """ + user = User.objects.create_user( + password=self.cleaned_data['password1'], commit=commit) + return user + + def save_m2m(self): + """ + No additional groups are created when this form is saved, so this + method just does nothing. + """ + + class UserAdmin(admin.ModelAdmin): - inlines = [AdditionalGroupInline, ShadowInline] + inlines = [AdditionalGroupInline, ShadowInline, UserTaskResultInline] readonly_fields = ['uid'] + add_form = UserCreationForm + + add_fieldsets = ( + (None, { + 'classes': ('wide',), + 'fields': ('password1', 'password2')}), + ) + + def get_form(self, request, obj=None, **kwargs): + """ + Use special form during user creation. + + """ + defaults = {} + if obj is None: + defaults.update({ + 'form': self.add_form, + 'fields': admin.util.flatten_fieldsets(self.add_fieldsets), + }) + defaults.update(kwargs) + return super(UserAdmin, self).get_form(request, obj, **defaults) + + def get_inline_instances(self, request, obj=None): + if obj is None: + return [] + return super(UserAdmin, self).get_inline_instances(request, obj) -admin.site.register(Group) +class GroupAdmin(admin.ModelAdmin): + inlines = [GroupTaskResultInline] + + def get_inline_instances(self, request, obj=None): + if obj is None: + return [] + return super(GroupAdmin, self).get_inline_instances(request, obj) + + +class DeleteTaskResultAdmin(admin.ModelAdmin): + readonly_fields = ['task_uuid', 'task_name', 'modeltype', 'modelname', + 'is_finished', 'is_success', 'state', 'result_body'] + list_display = ('task_uuid', 'task_name', 'modeltype', 'modelname', + 'is_finished', 'state') + + def has_add_permission(self, request, obj=None): + return False + + def get_queryset(self, request): + qs = super(DeleteTaskResultAdmin, self).get_queryset(request) + for entry in qs: + entry.update_taskstatus() + return qs + + +admin.site.register(Group, GroupAdmin) admin.site.register(User, UserAdmin) +admin.site.register(DeleteTaskResult, DeleteTaskResultAdmin) diff --git a/gnuviechadmin/osusers/migrations/0003_auto__add_grouptaskresult__add_usertaskresult__add_deletetaskresult.py b/gnuviechadmin/osusers/migrations/0003_auto__add_grouptaskresult__add_usertaskresult__add_deletetaskresult.py new file mode 100644 index 0000000..c23b290 --- /dev/null +++ b/gnuviechadmin/osusers/migrations/0003_auto__add_grouptaskresult__add_usertaskresult__add_deletetaskresult.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +from south.utils import datetime_utils as datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding model 'GroupTaskResult' + db.create_table(u'osusers_grouptaskresult', ( + ('created', self.gf('model_utils.fields.AutoCreatedField')(default=datetime.datetime.now)), + ('modified', self.gf('model_utils.fields.AutoLastModifiedField')(default=datetime.datetime.now)), + ('task_uuid', self.gf('django.db.models.fields.CharField')(max_length=64, primary_key=True)), + ('task_name', self.gf('django.db.models.fields.CharField')(max_length=255)), + ('is_finished', self.gf('django.db.models.fields.BooleanField')(default=False)), + ('is_success', self.gf('django.db.models.fields.BooleanField')(default=False)), + ('state', self.gf('django.db.models.fields.CharField')(max_length=10)), + ('result_body', self.gf('django.db.models.fields.TextField')(blank=True)), + ('group', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['osusers.Group'])), + )) + db.send_create_signal(u'osusers', ['GroupTaskResult']) + + # Adding model 'UserTaskResult' + db.create_table(u'osusers_usertaskresult', ( + ('created', self.gf('model_utils.fields.AutoCreatedField')(default=datetime.datetime.now)), + ('modified', self.gf('model_utils.fields.AutoLastModifiedField')(default=datetime.datetime.now)), + ('task_uuid', self.gf('django.db.models.fields.CharField')(max_length=64, primary_key=True)), + ('task_name', self.gf('django.db.models.fields.CharField')(max_length=255)), + ('is_finished', self.gf('django.db.models.fields.BooleanField')(default=False)), + ('is_success', self.gf('django.db.models.fields.BooleanField')(default=False)), + ('state', self.gf('django.db.models.fields.CharField')(max_length=10)), + ('result_body', self.gf('django.db.models.fields.TextField')(blank=True)), + ('user', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['osusers.User'])), + )) + db.send_create_signal(u'osusers', ['UserTaskResult']) + + # Adding model 'DeleteTaskResult' + db.create_table(u'osusers_deletetaskresult', ( + ('created', self.gf('model_utils.fields.AutoCreatedField')(default=datetime.datetime.now)), + ('modified', self.gf('model_utils.fields.AutoLastModifiedField')(default=datetime.datetime.now)), + ('task_uuid', self.gf('django.db.models.fields.CharField')(max_length=64, primary_key=True)), + ('task_name', self.gf('django.db.models.fields.CharField')(max_length=255)), + ('is_finished', self.gf('django.db.models.fields.BooleanField')(default=False)), + ('is_success', self.gf('django.db.models.fields.BooleanField')(default=False)), + ('state', self.gf('django.db.models.fields.CharField')(max_length=10)), + ('result_body', self.gf('django.db.models.fields.TextField')(blank=True)), + ('modeltype', self.gf('django.db.models.fields.CharField')(max_length=20, db_index=True)), + ('modelname', self.gf('django.db.models.fields.CharField')(max_length=255)), + )) + db.send_create_signal(u'osusers', ['DeleteTaskResult']) + + + def backwards(self, orm): + # Deleting model 'GroupTaskResult' + db.delete_table(u'osusers_grouptaskresult') + + # Deleting model 'UserTaskResult' + db.delete_table(u'osusers_usertaskresult') + + # Deleting model 'DeleteTaskResult' + db.delete_table(u'osusers_deletetaskresult') + + + models = { + u'osusers.additionalgroup': { + 'Meta': {'unique_together': "(('user', 'group'),)", 'object_name': 'AdditionalGroup'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.User']"}) + }, + u'osusers.deletetaskresult': { + 'Meta': {'object_name': 'DeleteTaskResult'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'modelname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'modeltype': ('django.db.models.fields.CharField', [], {'max_length': '20', 'db_index': 'True'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}), + 'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'}) + }, + u'osusers.group': { + 'Meta': {'object_name': 'Group'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'descr': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'gid': ('django.db.models.fields.PositiveSmallIntegerField', [], {'unique': 'True', 'primary_key': 'True'}), + 'groupname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '16'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'passwd': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}) + }, + u'osusers.grouptaskresult': { + 'Meta': {'object_name': 'GroupTaskResult'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}), + 'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}), + 'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'}) + }, + u'osusers.shadow': { + 'Meta': {'object_name': 'Shadow'}, + 'changedays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'expiredays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'default': 'None', 'null': 'True', 'blank': 'True'}), + 'gracedays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'inactdays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'maxage': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'minage': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'passwd': ('django.db.models.fields.CharField', [], {'max_length': '128'}), + 'user': ('django.db.models.fields.related.OneToOneField', [], {'to': u"orm['osusers.User']", 'unique': 'True', 'primary_key': 'True'}) + }, + u'osusers.user': { + 'Meta': {'object_name': 'User'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'gecos': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}), + 'homedir': ('django.db.models.fields.CharField', [], {'max_length': '256'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'shell': ('django.db.models.fields.CharField', [], {'max_length': '64'}), + 'uid': ('django.db.models.fields.PositiveSmallIntegerField', [], {'unique': 'True', 'primary_key': 'True'}), + 'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64'}) + }, + u'osusers.usertaskresult': { + 'Meta': {'object_name': 'UserTaskResult'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}), + 'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'}), + 'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.User']"}) + } + } + + complete_apps = ['osusers'] \ No newline at end of file diff --git a/gnuviechadmin/osusers/migrations/0004_auto__add_index_grouptaskresult_task_name__add_index_usertaskresult_ta.py b/gnuviechadmin/osusers/migrations/0004_auto__add_index_grouptaskresult_task_name__add_index_usertaskresult_ta.py new file mode 100644 index 0000000..1657e0d --- /dev/null +++ b/gnuviechadmin/osusers/migrations/0004_auto__add_index_grouptaskresult_task_name__add_index_usertaskresult_ta.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +from south.utils import datetime_utils as datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding index on 'GroupTaskResult', fields ['task_name'] + db.create_index(u'osusers_grouptaskresult', ['task_name']) + + # Adding index on 'UserTaskResult', fields ['task_name'] + db.create_index(u'osusers_usertaskresult', ['task_name']) + + # Adding index on 'DeleteTaskResult', fields ['task_name'] + db.create_index(u'osusers_deletetaskresult', ['task_name']) + + + def backwards(self, orm): + # Removing index on 'DeleteTaskResult', fields ['task_name'] + db.delete_index(u'osusers_deletetaskresult', ['task_name']) + + # Removing index on 'UserTaskResult', fields ['task_name'] + db.delete_index(u'osusers_usertaskresult', ['task_name']) + + # Removing index on 'GroupTaskResult', fields ['task_name'] + db.delete_index(u'osusers_grouptaskresult', ['task_name']) + + + models = { + u'osusers.additionalgroup': { + 'Meta': {'unique_together': "(('user', 'group'),)", 'object_name': 'AdditionalGroup'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.User']"}) + }, + u'osusers.deletetaskresult': { + 'Meta': {'object_name': 'DeleteTaskResult'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'modelname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'modeltype': ('django.db.models.fields.CharField', [], {'max_length': '20', 'db_index': 'True'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}), + 'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}), + 'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'}) + }, + u'osusers.group': { + 'Meta': {'object_name': 'Group'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'descr': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'gid': ('django.db.models.fields.PositiveSmallIntegerField', [], {'unique': 'True', 'primary_key': 'True'}), + 'groupname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '16'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'passwd': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}) + }, + u'osusers.grouptaskresult': { + 'Meta': {'object_name': 'GroupTaskResult'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}), + 'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}), + 'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}), + 'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'}) + }, + u'osusers.shadow': { + 'Meta': {'object_name': 'Shadow'}, + 'changedays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'expiredays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'default': 'None', 'null': 'True', 'blank': 'True'}), + 'gracedays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'inactdays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'maxage': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'minage': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'passwd': ('django.db.models.fields.CharField', [], {'max_length': '128'}), + 'user': ('django.db.models.fields.related.OneToOneField', [], {'to': u"orm['osusers.User']", 'unique': 'True', 'primary_key': 'True'}) + }, + u'osusers.user': { + 'Meta': {'object_name': 'User'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'gecos': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}), + 'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}), + 'homedir': ('django.db.models.fields.CharField', [], {'max_length': '256'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'shell': ('django.db.models.fields.CharField', [], {'max_length': '64'}), + 'uid': ('django.db.models.fields.PositiveSmallIntegerField', [], {'unique': 'True', 'primary_key': 'True'}), + 'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64'}) + }, + u'osusers.usertaskresult': { + 'Meta': {'object_name': 'UserTaskResult'}, + 'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}), + 'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}), + 'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}), + 'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}), + 'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'}), + 'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.User']"}) + } + } + + complete_apps = ['osusers'] \ No newline at end of file diff --git a/gnuviechadmin/osusers/models.py b/gnuviechadmin/osusers/models.py index ecb9beb..6ad01b9 100644 --- a/gnuviechadmin/osusers/models.py +++ b/gnuviechadmin/osusers/models.py @@ -1,7 +1,7 @@ from datetime import date import os -from django.db import models, transaction +from django.db import models from django.conf import settings from django.core.exceptions import ValidationError from django.utils import timezone @@ -10,9 +10,51 @@ from django.utils.translation import ugettext as _ from model_utils.models import TimeStampedModel +from celery.result import AsyncResult + from passlib.hash import sha512_crypt from passlib.utils import generate_password +from .tasks import ( + add_ldap_user_to_group, + create_ldap_group, + create_ldap_user, + delete_ldap_group_if_empty, + delete_ldap_user, + remove_ldap_user_from_group, +) + + +CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL = _( + "You can not use a user's primary group.") + + +class TaskResult(TimeStampedModel, models.Model): + + task_uuid = models.CharField(primary_key=True, max_length=64, blank=False) + task_name = models.CharField(max_length=255, blank=False, db_index=True) + is_finished = models.BooleanField(default=False) + is_success = models.BooleanField(default=False) + state = models.CharField(max_length=10) + result_body = models.TextField(blank=True) + + class Meta: + abstract = True + + def _set_result_fields(self, asyncresult): + if asyncresult.ready(): + self.is_finished = True + self.is_success = asyncresult.state == 'SUCCESS' + self.result_body = str(asyncresult.result) + self.state = asyncresult.state + asyncresult.get(no_ack=False) + + def update_taskstatus(self): + if not self.is_finished: + asyncresult = AsyncResult(self.task_uuid) + self._set_result_fields(asyncresult) + self.save() + class GroupManager(models.Manager): @@ -42,6 +84,73 @@ class Group(TimeStampedModel, models.Model): def __str__(self): return '{0} ({1})'.format(self.groupname, self.gid) + def save(self, *args, **kwargs): + super(Group, self).save(*args, **kwargs) + GroupTaskResult.objects.create_grouptaskresult( + self, + create_ldap_group.delay(self.groupname, self.gid, self.descr), + 'create_ldap_group' + ) + return self + + def delete(self, *args, **kwargs): + DeleteTaskResult.objects.create_deletetaskresult( + 'group', self.groupname, + delete_ldap_group_if_empty.delay(self.groupname), + 'delete_ldap_group_if_empty' + ) + super(Group, self).delete(*args, **kwargs) + + +class TaskResultManager(models.Manager): + + def create(self, asyncresult, task_name): + result = self.model( + task_uuid=asyncresult.task_id, task_name=task_name + ) + result._set_result_fields(asyncresult) + return result + + +class DeleteTaskResultManager(TaskResultManager): + + def create_deletetaskresult( + self, modeltype, modelname, asyncresult, task_name + ): + taskresult = super(DeleteTaskResultManager, self).create( + asyncresult, task_name) + taskresult.modeltype = modeltype + taskresult.modelname = modelname + taskresult.save() + return taskresult + + +class DeleteTaskResult(TaskResult): + + modeltype = models.CharField(max_length=20, db_index=True) + modelname = models.CharField(max_length=255) + + objects = DeleteTaskResultManager() + + +class GroupTaskResultManager(TaskResultManager): + + def create_grouptaskresult( + self, group, asyncresult, task_name, commit=False + ): + taskresult = super(GroupTaskResultManager, self).create( + asyncresult, task_name) + taskresult.group = group + taskresult.save() + return taskresult + + +class GroupTaskResult(TaskResult): + + group = models.ForeignKey(Group) + + objects = GroupTaskResultManager() + class UserManager(models.Manager): @@ -59,7 +168,7 @@ class UserManager(models.Manager): for user in self.values('username').filter( username__startswith=settings.OSUSER_USERNAME_PREFIX).order_by( 'username'): - if user == nextuser: + if user['username'] == nextuser: count += 1 nextuser = usernameformat.format( settings.OSUSER_USERNAME_PREFIX, count) @@ -67,7 +176,7 @@ class UserManager(models.Manager): break return nextuser - def create_user(self, username=None, password=None): + def create_user(self, username=None, password=None, commit=False): uid = self.get_next_uid() gid = Group.objects.get_next_gid() if username is None: @@ -75,19 +184,13 @@ class UserManager(models.Manager): if password is None: password = generate_password() homedir = os.path.join(settings.OSUSER_HOME_BASEPATH, username) - autocommit = transaction.get_autocommit() - if autocommit: - transaction.set_autocommit(False) group = Group.objects.create(groupname=username, gid=gid) user = self.create(username=username, group=group, uid=uid, homedir=homedir, shell=settings.OSUSER_DEFAULT_SHELL) - shadow = Shadow.objects.create_shadow(user=user, password=password) - user.save() - shadow.save() - transaction.commit() - if autocommit: - transaction.set_autocommit(True) + user.set_password(password) + if commit: + user.save() return user @@ -111,17 +214,82 @@ class User(TimeStampedModel, models.Model): def __str__(self): return '{0} ({1})'.format(self.username, self.uid) + def set_password(self, password): + if hasattr(self, 'shadow'): + self.shadow.set_password(password) + else: + self.shadow = Shadow.objects.create_shadow( + user=self, password=password + ) + UserTaskResult.objects.create_usertaskresult( + self, + create_ldap_user.delay( + self.username, self.uid, self.group.gid, self.gecos, + self.homedir, self.shell, password + ), + 'create_ldap_user', + commit=True + ) + + def save(self, *args, **kwargs): + UserTaskResult.objects.create_usertaskresult( + self, + create_ldap_user.delay( + self.username, self.uid, self.group.gid, self.gecos, + self.homedir, self.shell, password=None + ), + 'create_ldap_user' + ) + return super(User, self).save(*args, **kwargs) + + def delete(self, *args, **kwargs): + for group in [ + ag.group for ag in AdditionalGroup.objects.filter(user=self) + ]: + DeleteTaskResult.objects.create_deletetaskresult( + 'usergroup', + '{0} in {1}'.format(self.username, group.groupname), + remove_ldap_user_from_group.delay( + self.username, group.groupname), + 'remove_ldap_user_from_group', + ) + DeleteTaskResult.objects.create_deletetaskresult( + 'user', self.username, + delete_ldap_user.delay(self.username), + 'delete_ldap_user' + ) + self.group.delete() + super(User, self).delete(*args, **kwargs) + + +class UserTaskResultManager(TaskResultManager): + + def create_usertaskresult( + self, user, asyncresult, task_name, commit=False + ): + taskresult = self.create(asyncresult, task_name) + taskresult.user = user + taskresult.save() + return taskresult + + +class UserTaskResult(TaskResult): + + user = models.ForeignKey(User) + + objects = UserTaskResultManager() + class ShadowManager(models.Manager): def create_shadow(self, user, password): changedays = (timezone.now().date() - date(1970, 1, 1)).days - pwhash = sha512_crypt.encrypt(password) shadow = self.create( user=user, changedays=changedays, minage=0, maxage=None, gracedays=7, - inactdays=30, expiredays=None, passwd=pwhash + inactdays=30, expiredays=None ) + shadow.set_password(password) shadow.save() return shadow @@ -169,6 +337,9 @@ class Shadow(TimeStampedModel, models.Model): def __str__(self): return 'for user {0}'.format(self.user) + def set_password(self, password): + self.passwd = sha512_crypt.encrypt(password) + @python_2_unicode_compatible class AdditionalGroup(TimeStampedModel, models.Model): @@ -182,8 +353,26 @@ class AdditionalGroup(TimeStampedModel, models.Model): def clean(self): if self.user.group == self.group: - raise ValidationError(_( - "You can not use a user's primary group.")) + raise ValidationError(CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL) + + def save(self, *args, **kwargs): + GroupTaskResult.objects.create_grouptaskresult( + self.group, + add_ldap_user_to_group.delay( + self.user.username, self.group.groupname), + 'add_ldap_user_to_group' + ) + super(AdditionalGroup, self).save(*args, **kwargs) + + def delete(self, *args, **kwargs): + DeleteTaskResult.objects.create_deletetaskresult( + 'usergroup', + str(self), + remove_ldap_user_from_group.delay( + self.user.username, self.group.groupname), + 'remove_ldap_user_from_group' + ) + super(AdditionalGroup, self).delete(*args, **kwargs) def __str__(self): return '{0} in {1}'.format(self.user, self.group) diff --git a/gnuviechadmin/osusers/tasks.py b/gnuviechadmin/osusers/tasks.py new file mode 100644 index 0000000..049eb65 --- /dev/null +++ b/gnuviechadmin/osusers/tasks.py @@ -0,0 +1,43 @@ +from __future__ import absolute_import + +from celery import shared_task + + +class LdapRouter(object): + + def route_for_task(self, task, args=None, kwargs=None): + if 'ldap' in task: + return {'exchange': 'ldap', + 'exchange_type': 'direct', + 'queue': 'ldap'} + return None + + +@shared_task +def create_ldap_group(groupname, gid, descr): + pass + + +@shared_task +def create_ldap_user(username, uid, gid, gecos, homedir, shell, password): + pass + + +@shared_task +def add_ldap_user_to_group(username, groupname): + pass + + +@shared_task +def remove_ldap_user_from_group(username, groupname): + pass + + +@shared_task +def delete_ldap_user(username): + pass + + +@shared_task +def delete_ldap_group_if_empty(groupname): + pass diff --git a/gnuviechadmin/osusers/tests.py b/gnuviechadmin/osusers/tests.py deleted file mode 100644 index 7ce503c..0000000 --- a/gnuviechadmin/osusers/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. diff --git a/gnuviechadmin/osusers/tests/__init__.py b/gnuviechadmin/osusers/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gnuviechadmin/osusers/tests/test_admin.py b/gnuviechadmin/osusers/tests/test_admin.py new file mode 100644 index 0000000..cf75a5a --- /dev/null +++ b/gnuviechadmin/osusers/tests/test_admin.py @@ -0,0 +1,167 @@ +from django import forms +from django.contrib.admin import AdminSite +from django.test import TestCase +from django.test.utils import override_settings + +from mock import patch, Mock + +from osusers.models import ( + DeleteTaskResult, + Group, + User, +) +from osusers.admin import ( + DeleteTaskResultAdmin, + GroupAdmin, + PASSWORD_MISMATCH_ERROR, + UserAdmin, + UserCreationForm, + UserTaskResultInline, +) + + +class TaskResultInlineTest(TestCase): + def setUp(self): + self.site = AdminSite() + super(TaskResultInlineTest, self).setUp() + + def test_get_queryset_calls_update_taskstatus(self): + with patch('osusers.admin.admin.TabularInline.get_queryset') as mock: + entrymock = Mock(name='entry') + mock.return_value = [entrymock] + requestmock = Mock(name='request') + UserTaskResultInline(User, self.site).get_queryset(requestmock) + entrymock.update_taskstatus.assert_calledwith() + + def test_has_add_permissions_returns_false(self): + self.assertFalse( + UserTaskResultInline(User, self.site).has_add_permission( + self, Mock(name='request')) + ) + + +class UserCreationFormTest(TestCase): + def test_clean_password2_same(self): + form = UserCreationForm() + form.cleaned_data = {'password1': 'secret', 'password2': 'secret'} + self.assertEqual(form.clean_password2(), 'secret') + + def test_clean_password2_empty(self): + form = UserCreationForm() + form.cleaned_data = {} + self.assertIsNone(form.clean_password2()) + + def test_clean_password2_mismatch(self): + form = UserCreationForm() + form.cleaned_data = {'password1': 'secretx', 'password2': 'secrety'} + with self.assertRaises(forms.ValidationError) as cm: + form.clean_password2() + self.assertEqual(cm.exception.message, PASSWORD_MISMATCH_ERROR) + + @override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' + ) + def test_save_commit(self): + form = UserCreationForm() + form.cleaned_data = {'password1': 'secret', 'password2': 'secret'} + user = form.save() + self.assertIsNotNone(user) + self.assertEqual(User.objects.get(pk=user.uid), user) + + def test_save_m2m_returns_none(self): + form = UserCreationForm() + self.assertIsNone(form.save_m2m()) + + +class UserAdminTest(TestCase): + def setUp(self): + site = AdminSite() + self.uadmin = UserAdmin(User, site) + super(UserAdminTest, self).setUp() + + def test_get_form_without_object(self): + form = self.uadmin.get_form(Mock(name='request')) + self.assertEqual( + form.Meta.fields, + ['password1', 'password2'] + ) + + @override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' + ) + def test_get_form_with_object(self): + user = User.objects.create_user() + form = self.uadmin.get_form(Mock(name='request'), user) + self.assertEqual( + form.Meta.fields, + ['username', 'group', 'gecos', 'homedir', 'shell', 'uid'] + ) + + def test_get_inline_instances_without_object(self): + inlines = self.uadmin.get_inline_instances(Mock(name='request')) + self.assertEqual(inlines, []) + + @override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' + ) + def test_get_inline_instances_with_object(self): + user = User.objects.create_user() + inlines = self.uadmin.get_inline_instances( + Mock(name='request'), user) + self.assertEqual(len(inlines), len(UserAdmin.inlines)) + for index in range(len(inlines)): + self.assertIsInstance(inlines[index], UserAdmin.inlines[index]) + + +class GroupAdminTest(TestCase): + def setUp(self): + site = AdminSite() + self.gadmin = GroupAdmin(Group, site) + super(GroupAdminTest, self).setUp() + + def test_get_inline_instances_without_object(self): + inlines = self.gadmin.get_inline_instances(Mock(name='request')) + self.assertEqual(inlines, []) + + @override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' + ) + def test_get_inline_instances_with_object(self): + group = Group.objects.create(gid=1000, groupname='test') + inlines = self.gadmin.get_inline_instances( + Mock(name='request'), group) + self.assertEqual(len(inlines), len(GroupAdmin.inlines)) + for index in range(len(inlines)): + self.assertIsInstance(inlines[index], GroupAdmin.inlines[index]) + + +class DeleteTaskResultAdminTest(TestCase): + def setUp(self): + site = AdminSite() + self.dtradmin = DeleteTaskResultAdmin(DeleteTaskResult, site) + super(DeleteTaskResultAdminTest, self).setUp() + + def test_has_add_permission_returns_false_without_object(self): + self.assertFalse( + self.dtradmin.has_add_permission(Mock(name='request'))) + + def test_has_add_permission_returns_false_with_object(self): + self.assertFalse( + self.dtradmin.has_add_permission(Mock(name='request'), + Mock(name='test'))) + + def test_get_queryset_calls_update_taskstatus(self): + with patch('osusers.admin.admin.ModelAdmin.get_queryset') as mock: + entrymock = Mock(name='entry') + mock.return_value = [entrymock] + requestmock = Mock(name='request') + self.dtradmin.get_queryset(requestmock) + entrymock.update_taskstatus.assert_calledwith() diff --git a/gnuviechadmin/osusers/tests/test_models.py b/gnuviechadmin/osusers/tests/test_models.py new file mode 100644 index 0000000..7fe2736 --- /dev/null +++ b/gnuviechadmin/osusers/tests/test_models.py @@ -0,0 +1,392 @@ +from datetime import date + +from django.core.exceptions import ValidationError +from django.test import TestCase +from django.test.utils import override_settings +from django.utils import timezone + +from mock import patch, MagicMock + +from passlib.hash import sha512_crypt + +from osusers.models import ( + CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL, + AdditionalGroup, + DeleteTaskResult, + Group, + GroupTaskResult, + Shadow, + User, + UserTaskResult, +) + + +@override_settings( + CELERY_ALWAYS_EAGER=True, + CELERY_CACHE_BACKEND='memory', + BROKER_BACKEND='memory' +) +class TestCaseWithCeleryTasks(TestCase): + pass + + +class AdditionalGroupTest(TestCaseWithCeleryTasks): + def setUp(self): + self.group1 = Group.objects.create(groupname='test1', gid=1000) + self.user = User.objects.create( + username='test', uid=1000, group=self.group1, + homedir='/home/test', shell='/bin/bash') + + def test_clean_primary_group(self): + testsubj = AdditionalGroup(user=self.user, group=self.group1) + with self.assertRaises(ValidationError) as cm: + testsubj.clean() + self.assertEqual( + cm.exception.message, CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL) + + def test_clean_other_group(self): + group2 = Group.objects.create(groupname='test2', gid=1001) + testsubj = AdditionalGroup(user=self.user, group=group2) + testsubj.clean() + + def test_save(self): + group2 = Group.objects.create(groupname='test2', gid=1001) + GroupTaskResult.objects.all().delete() + addgroup = AdditionalGroup(user=self.user, group=group2) + addgroup.save() + taskres = GroupTaskResult.objects.all() + self.assertTrue(len(taskres), 1) + self.assertEqual(taskres[0].task_name, 'add_ldap_user_to_group') + self.assertEqual(taskres[0].group, group2) + + def test_delete(self): + group2 = Group.objects.create(groupname='test2', gid=1001) + addgroup = AdditionalGroup.objects.create(user=self.user, group=group2) + DeleteTaskResult.objects.all().delete() + addgroup.delete() + taskres = DeleteTaskResult.objects.all() + self.assertTrue(len(taskres), 1) + self.assertEqual(taskres[0].task_name, 'remove_ldap_user_from_group') + self.assertEqual(taskres[0].modeltype, 'usergroup') + self.assertEqual(taskres[0].modelname, 'test (1000) in test2 (1001)') + self.assertEqual(len(AdditionalGroup.objects.all()), 0) + + def test___str__(self): + group2 = Group.objects.create(groupname='test2', gid=1001) + addgroup = AdditionalGroup.objects.create(user=self.user, group=group2) + self.assertEqual(str(addgroup), 'test (1000) in test2 (1001)') + + +@override_settings(OSUSER_MINGID=10000) +class GroupManagerTest(TestCaseWithCeleryTasks): + def test_get_next_gid_first(self): + self.assertEqual(Group.objects.get_next_gid(), 10000) + + def test_get_next_gid_second(self): + Group.objects.create(gid=10010, groupname='test') + self.assertEqual(Group.objects.get_next_gid(), 10011) + + +class GroupTest(TestCaseWithCeleryTasks): + def test___str__(self): + group = Group.objects.create(gid=10000, groupname='test') + self.assertEqual(str(group), 'test (10000)') + + def test_save(self): + group = Group(gid=10000, groupname='test') + self.assertIs(group.save(), group) + taskres = GroupTaskResult.objects.all() + self.assertEqual(len(taskres), 1) + self.assertEqual(taskres[0].group, group) + self.assertEqual(taskres[0].task_name, 'create_ldap_group') + + def test_delete(self): + group = Group.objects.create(gid=10000, groupname='test') + self.assertEqual(len(Group.objects.all()), 1) + self.assertEqual(len(GroupTaskResult.objects.all()), 1) + group.delete() + self.assertEqual(len(Group.objects.all()), 0) + self.assertEqual(len(GroupTaskResult.objects.all()), 0) + taskres = DeleteTaskResult.objects.all() + self.assertEqual(len(taskres), 1) + self.assertEqual(taskres[0].task_name, + 'delete_ldap_group_if_empty') + self.assertEqual(taskres[0].modeltype, 'group') + self.assertEqual(taskres[0].modelname, 'test') + + +class ShadowManagerTest(TestCaseWithCeleryTasks): + def test_create_shadow(self): + user = User( + username='test', uid=1000, + group=Group(gid=1000, groupname='test'), + homedir='/home/test', shell='/bin/fooshell') + shadow = Shadow.objects.create_shadow(user, 'test') + self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) + self.assertEqual(shadow.changedays, + (timezone.now().date() - date(1970, 1, 1)).days) + self.assertEqual(shadow.user, user) + self.assertEqual(shadow.minage, 0) + self.assertIsNone(shadow.maxage) + self.assertEqual(shadow.gracedays, 7) + self.assertEqual(shadow.inactdays, 30) + self.assertIsNone(shadow.expiredays) + + +class ShadowTest(TestCaseWithCeleryTasks): + def test___str__(self): + group = Group.objects.create( + groupname='test', gid=1000) + user = User.objects.create( + username='test', uid=1000, group=group, homedir='/home/test', + shell='/bin/bash') + shadow = Shadow(user=user) + self.assertEqual(str(shadow), 'for user test (1000)') + + def test_set_password(self): + group = Group.objects.create( + groupname='test', gid=1000) + user = User.objects.create( + username='test', uid=1000, group=group, homedir='/home/test', + shell='/bin/bash') + shadow = Shadow(user=user) + shadow.set_password('test') + self.assertTrue(sha512_crypt.verify('test', shadow.passwd)) + + +TEST_TASK_UUID = '3120f6a8-2665-4fa3-a785-79efd28bfe92' +TEST_TASK_NAME = 'test.task' +TEST_TASK_RESULT = '4ll y0ur b453 4r3 b3l0ng t0 u5' + + +class TaskResultTest(TestCase): + def test__set_result_fields_not_ready(self): + mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME) + mock.ready.return_value = False + tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) + self.assertFalse(tr.is_finished) + self.assertFalse(tr.is_success) + self.assertEqual(tr.state, '') + self.assertEqual(tr.result_body, '') + + def test__set_result_fields_ready(self): + mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME, + state='SUCCESS', result=TEST_TASK_RESULT) + mock.ready.return_value = True + tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) + self.assertTrue(tr.is_finished) + self.assertTrue(tr.is_success) + self.assertEqual(tr.state, 'SUCCESS') + self.assertEqual(tr.result_body, TEST_TASK_RESULT) + + def test__set_result_fields_exception(self): + mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME, + state='FAILURE', result=Exception('Fail')) + mock.ready.return_value = True + tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) + self.assertTrue(tr.is_finished) + self.assertFalse(tr.is_success) + self.assertEqual(tr.state, 'FAILURE') + self.assertEqual(tr.result_body, 'Fail') + + @patch('osusers.models.AsyncResult') + def test_update_taskstatus_unfinished(self, asyncres): + mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME) + mock.ready.return_value = False + tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) + self.assertFalse(tr.is_finished) + mymock = asyncres(TEST_TASK_UUID) + mymock.ready.return_value = True + mymock.state = 'SUCCESS' + mymock.result = TEST_RESULT + tr.update_taskstatus() + mymock.ready.assert_called_with() + self.assertTrue(tr.is_finished) + + @patch('osusers.models.AsyncResult') + def test_update_taskstatus_finished(self, asyncres): + mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME) + mock.ready.return_value = True + mock.state = 'SUCCESS' + mock.result = TEST_RESULT + tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME) + self.assertTrue(tr.is_finished) + mymock = asyncres(TEST_TASK_UUID) + tr.update_taskstatus() + self.assertFalse(mymock.ready.called) + self.assertTrue(tr.is_finished) + + +TEST_RESULT = MagicMock() +TEST_RESULT.task_id = TEST_TASK_UUID +TEST_RESULT.task_name = TEST_TASK_NAME +TEST_RESULT.ready.return_value = False + + +class TaskResultManagerTest(TestCase): + def test_create(self): + tr = DeleteTaskResult.objects.create(TEST_RESULT, TEST_TASK_NAME) + self.assertIsInstance(tr, DeleteTaskResult) + self.assertEqual(tr.task_uuid, TEST_TASK_UUID) + self.assertEqual(tr.task_name, TEST_TASK_NAME) + + +@override_settings( + OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test', + OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' +) +class UserManagerTest(TestCaseWithCeleryTasks): + def _create_group(self): + return Group.objects.create(gid=10000, groupname='foo') + + def test_get_next_uid_first(self): + self.assertEqual(User.objects.get_next_uid(), 10000) + + def test_get_next_uid_second(self): + User.objects.create( + uid=10010, username='foo', group=self._create_group(), + homedir='/home/foo', shell='/bin/fooshell') + self.assertEqual(User.objects.get_next_uid(), 10011) + + def test_get_next_username_first(self): + self.assertEqual(User.objects.get_next_username(), 'test01') + + def test_get_next_username_second(self): + User.objects.create( + uid=10000, username='test01', group=self._create_group(), + homedir='/home/foo', shell='/bin/fooshell') + self.assertEqual(User.objects.get_next_username(), 'test02') + + def test_get_next_username_gaps(self): + group = self._create_group() + User.objects.create( + uid=10000, username='test01', group=group, + homedir='/home/foo', shell='/bin/fooshell') + User.objects.create( + uid=10002, username='test03', group=group, + homedir='/home/foo', shell='/bin/fooshell') + self.assertEqual(User.objects.get_next_username(), 'test02') + + def test_create_user_first(self): + user = User.objects.create_user() + self.assertIsInstance(user, User) + self.assertEqual(user.uid, 10000) + self.assertEqual(user.group.gid, 10000) + self.assertEqual(user.group.groupname, 'test01') + self.assertEqual(user.username, 'test01') + self.assertEqual(user.homedir, '/home/test01') + self.assertEqual(user.shell, '/bin/fooshell') + self.assertIsNotNone(user.shadow) + + def test_create_user_tasks(self): + user = User.objects.create_user() + gtaskres = GroupTaskResult.objects.all() + self.assertEqual(len(gtaskres), 1) + self.assertEqual(gtaskres[0].task_name, 'create_ldap_group') + self.assertEqual(gtaskres[0].group, user.group) + + def test_create_user_second(self): + User.objects.create_user() + user = User.objects.create_user() + self.assertIsInstance(user, User) + self.assertEqual(user.uid, 10001) + self.assertEqual(user.group.gid, 10001) + self.assertEqual(user.group.groupname, 'test02') + self.assertEqual(user.username, 'test02') + self.assertEqual(user.homedir, '/home/test02') + self.assertEqual(user.shell, '/bin/fooshell') + self.assertIsNotNone(user.shadow) + self.assertEqual(len(User.objects.all()), 2) + + def test_create_user_known_password(self): + user = User.objects.create_user(password='foobar') + self.assertIsInstance(user, User) + self.assertEqual(user.uid, 10000) + self.assertEqual(user.group.gid, 10000) + self.assertEqual(user.group.groupname, 'test01') + self.assertEqual(user.username, 'test01') + self.assertEqual(user.homedir, '/home/test01') + self.assertEqual(user.shell, '/bin/fooshell') + self.assertIsNotNone(user.shadow) + self.assertTrue(sha512_crypt.verify('foobar', user.shadow.passwd)) + + def test_create_user_predefined_username(self): + user = User.objects.create_user(username='tester') + self.assertIsInstance(user, User) + self.assertEqual(user.uid, 10000) + self.assertEqual(user.group.gid, 10000) + self.assertEqual(user.group.groupname, 'tester') + self.assertEqual(user.username, 'tester') + self.assertEqual(user.homedir, '/home/tester') + self.assertEqual(user.shell, '/bin/fooshell') + self.assertIsNotNone(user.shadow) + + def test_create_user_commit(self): + user = User.objects.create_user(commit=True) + self.assertIsInstance(user, User) + self.assertEqual(user.uid, 10000) + self.assertEqual(user.group.gid, 10000) + self.assertEqual(user.group.groupname, 'test01') + self.assertEqual(user.username, 'test01') + self.assertEqual(user.homedir, '/home/test01') + self.assertEqual(user.shell, '/bin/fooshell') + self.assertIsNotNone(user.shadow) + + +@override_settings( + OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test', + OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell' +) +class UserTest(TestCaseWithCeleryTasks): + + def test___str__(self): + user = User.objects.create_user() + self.assertEqual(str(user), 'test01 (10000)') + + def test_set_password(self): + user = User.objects.create_user() + self.assertFalse(sha512_crypt.verify('test', user.shadow.passwd)) + UserTaskResult.objects.all().delete() + user.set_password('test') + self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd)) + taskres = UserTaskResult.objects.all() + self.assertEqual(len(taskres), 1) + self.assertEqual(taskres[0].user, user) + self.assertEqual(taskres[0].task_name, 'create_ldap_user') + + def test_save(self): + user = User.objects.create_user() + UserTaskResult.objects.all().delete() + user.save() + taskres = UserTaskResult.objects.all() + self.assertEqual(len(taskres), 1) + self.assertEqual(taskres[0].user, user) + self.assertEqual(taskres[0].task_name, 'create_ldap_user') + + def test_delete_only_user(self): + user = User.objects.create_user() + user.delete() + taskres = DeleteTaskResult.objects.all() + self.assertEqual(len(taskres), 2) + self.assertIn('delete_ldap_user', + [r.task_name for r in taskres]) + self.assertIn('delete_ldap_group_if_empty', + [r.task_name for r in taskres]) + self.assertEqual(len(User.objects.all()), 0) + + def test_delete_additional_groups(self): + group1 = Group.objects.create(gid=2000, groupname='group1') + group2 = Group.objects.create(gid=2001, groupname='group2') + user = User.objects.create_user() + for group in [group1, group2]: + user.additionalgroup_set.add( + AdditionalGroup.objects.create(user=user, group=group)) + user.delete() + taskres = DeleteTaskResult.objects.all() + self.assertEqual(len(taskres), 4) + tasknames = [t.task_name for t in taskres] + self.assertEqual(tasknames.count('remove_ldap_user_from_group'), 2) + self.assertEqual(tasknames.count('delete_ldap_user'), 1) + self.assertEqual(tasknames.count('delete_ldap_group_if_empty'), 1) + self.assertEqual(len(User.objects.all()), 0) + self.assertEqual(len(AdditionalGroup.objects.all()), 0) diff --git a/gnuviechadmin/osusers/tests/test_tasks.py b/gnuviechadmin/osusers/tests/test_tasks.py new file mode 100644 index 0000000..e3b409f --- /dev/null +++ b/gnuviechadmin/osusers/tests/test_tasks.py @@ -0,0 +1,22 @@ +from django.test import TestCase + +from osusers.tasks import LdapRouter + + +class LdapRouterTest(TestCase): + def setUp(self): + self.router = LdapRouter() + super(LdapRouterTest, self).setUp() + + def test_ldap_tasks_are_routed_to_ldap_queue(self): + route = self.router.route_for_task( + 'some_ldap_task') + self.assertEqual( + route, + {'exchange': 'ldap', + 'exchange_type': 'direct', + 'queue': 'ldap'}) + + def test_non_ldap_tasks_are_routed_to_default(self): + self.assertIsNone( + self.router.route_for_task('other')) diff --git a/gnuviechadmin/osusers/views.py b/gnuviechadmin/osusers/views.py deleted file mode 100644 index 91ea44a..0000000 --- a/gnuviechadmin/osusers/views.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.shortcuts import render - -# Create your views here. diff --git a/requirements/base.txt b/requirements/base.txt index 2cedcab..f3b8dd8 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -6,3 +6,8 @@ logutils==0.3.3 South==0.8.4 psycopg2==2.5.3 passlib==1.6.2 +celery==3.1.11 +billiard==3.3.0.17 +kombu==3.0.16 +pytz==2014.3 +pyaml==14.05.7 diff --git a/requirements/local.txt b/requirements/local.txt index 644abf3..df01fa9 100644 --- a/requirements/local.txt +++ b/requirements/local.txt @@ -1,5 +1,7 @@ # Local development dependencies go here -r base.txt coverage==3.7.1 +mock==1.0.1 django-debug-toolbar==1.2.1 Sphinx==1.2.2 +releases==0.6.1 diff --git a/requirements/test.txt b/requirements/test.txt index bb78fde..8bf1098 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -1,3 +1,4 @@ # Test dependencies go here. -r base.txt coverage==3.7.1 +mock==1.0.1