Merge branch 'release/0.2.0' into production

This commit is contained in:
Jan Dittberner 2014-06-01 23:33:31 +02:00
commit f6e7519c51
27 changed files with 1529 additions and 47 deletions

1
.gitignore vendored
View file

@ -42,3 +42,4 @@ Desktop.ini
.ropeproject/
htmlcov/
tags
_build/

14
docs/changelog.rst Normal file
View file

@ -0,0 +1,14 @@
Changelog
=========
* :release:`0.2.0 <2014-06-01>`
* :feature:`-` full test suite for osusers
* :feature:`-` full test suite for managemails app
* :feature:`-` full test suite for domains app
* :feature:`-` `Celery <http://www.celeryproject.com/>`_ integration for ldap
synchronization
* :release:`0.1 <2014-05-25>`
* :feature:`-` initial model code for os users
* :feature:`-` initial model code for mail address and mailbox management
* :feature:`-` initial model code for domains

View file

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
# pymode:lint_ignore=E501
#
# gnuviechadmin documentation build configuration file, created by
# sphinx-quickstart on Sun May 18, 2014.
@ -12,7 +13,8 @@
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
#import sys
#import os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
@ -26,7 +28,11 @@ import sys, os
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = []
extensions = ['releases', 'sphinx.ext.autodoc', 'celery.contrib.sphinx']
# configuration for releases extension
releases_issue_uri = 'https://dev.gnuviech-server.de/gva/ticket/%s'
releases_release_uri = 'https://dev.gnuviech-server.de/gva/milestone/%s'
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
@ -49,9 +55,9 @@ copyright = u'2014, Jan Dittberner'
# built documents.
#
# The short X.Y version.
version = '0.1'
version = '0.2.0'
# The full version, including alpha/beta/rc tags.
release = '0.1'
release = '0.2.0'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
@ -171,21 +177,21 @@ htmlhelp_basename = 'gnuviechadmindoc'
# -- Options for LaTeX output --------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt',
# The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#'preamble': '',
# Additional stuff for the LaTeX preamble.
#'preamble': '',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
('index', 'gnuviechadmin.tex', u'gnuviechadmin Documentation',
u'Jan Dittberner', 'manual'),
('index', 'gnuviechadmin.tex', u'gnuviechadmin Documentation',
u'Jan Dittberner', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
@ -228,9 +234,9 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'gnuviechadmin', u'gnuviechadmin Documentation',
u'Jan Dittberner', 'gnuviechadmin', 'Customer center for gnuviech servers.',
'Miscellaneous'),
('index', 'gnuviechadmin', u'gnuviechadmin Documentation',
u'Jan Dittberner', 'gnuviechadmin', 'Customer center for gnuviech servers.',
'Miscellaneous'),
]
# Documents to append as an appendix to all manuals.

View file

@ -4,7 +4,7 @@
contain the root `toctree` directive.
Welcome to gnuviechadmin's documentation!
====================================
=========================================
Contents:
@ -14,7 +14,7 @@ Contents:
install
deploy
tests
changelog
Indices and tables

26
docs/tests.rst Normal file
View file

@ -0,0 +1,26 @@
Tests
=====
To run the tests you can just use the :program:`manage.py` script:
.. code-block:: sh
$ python manage.py test
Coverage
--------
To capture test coverage information you can run:
.. code-block:: sh
$ coverage run --branch manage.py test
To view the coverage data use:
.. code-block:: sh
$ coverage report -m
The coverage configuration is in :file:`.coveragerc`. Add new apps to the
`source` configuration in the `[run]` section of that configuration file.

View file

@ -0,0 +1,8 @@
from django.test import TestCase
from django.core.urlresolvers import reverse
class TestMailDomainAdmin(TestCase):
def test_admin_for_maildomain(self):
admin_url = reverse('admin:domains_maildomain_changelist')
self.assertIsNotNone(admin_url)

View file

@ -1,3 +0,0 @@
from django.shortcuts import render
# Create your views here.

View file

@ -0,0 +1 @@
from gnuviechadmin.celery import app as celery_app

View file

@ -0,0 +1,16 @@
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE',
'gnuviechadmin.settings.production')
app = Celery('gnuviechadmin')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

View file

@ -279,6 +279,20 @@ SOUTH_TESTS_MIGRATE = False
########## END SOUTH CONFIGURATION
########## CELERY CONFIGURATION
BROKER_URL = get_env_variable('GVA_BROKER_URL')
CELERY_RESULT_BACKEND = 'amqp'
CELERY_RESULT_PERSISTENT = True
CELERY_TASK_RESULT_EXPIRES = None
CELERY_ROUTES = (
'osusers.tasks.LdapRouter',
)
CELERY_ACCEPT_CONTENT = ['yaml']
CELERY_TASK_SERIALIZER = 'yaml'
CELERY_RESULT_SERIALIZER = 'yaml'
########## END CELERY CONFIGURATION
########## CUSTOM APP CONFIGURATION
OSUSER_MINUID = int(get_env_variable('GVA_MIN_OS_UID'))
OSUSER_MINGID = int(get_env_variable('GVA_MIN_OS_GID'))

View file

@ -0,0 +1,189 @@
from django import forms
from django.core.urlresolvers import reverse
from django.test import TestCase
from django.test.utils import override_settings
from django.utils.html import format_html
from django.utils.translation import ugettext as _
from django.contrib.admin import AdminSite
from mock import Mock
from osusers.models import User
from managemails.admin import (
ActivationChangeMixin,
MailboxAdmin,
MailboxChangeForm,
MailboxCreationForm,
PASSWORD_MISMATCH_ERROR,
ReadOnlyPasswordHashField,
ReadOnlyPasswordHashWidget,
)
from managemails.models import (
Mailbox,
)
class ReadOnlyPasswordHashWidgetTest(TestCase):
def test_render(self):
widget = ReadOnlyPasswordHashWidget()
rendered = widget.render('password', 'secret', {'class': 'test'})
self.assertEqual(
rendered,
format_html(
'<div class="test">{0}</div>',
format_html('<strong>{0}</strong>: secret ',
_('Hash'))
))
class ReadOnlyPasswordHashFieldTest(TestCase):
def test___init__(self):
field = ReadOnlyPasswordHashField()
self.assertFalse(field.required)
def test_bound_data(self):
field = ReadOnlyPasswordHashField()
self.assertEqual(field.bound_data('new', 'old'), 'old')
def test__has_changed(self):
field = ReadOnlyPasswordHashField()
self.assertFalse(field._has_changed('new', 'old'))
class MailboxCreationFormTest(TestCase):
def test_clean_password2_same(self):
form = MailboxCreationForm()
form.cleaned_data = {'password1': 'secret', 'password2': 'secret'}
self.assertEqual(form.clean_password2(), 'secret')
def test_clean_password2_empty(self):
form = MailboxCreationForm()
form.cleaned_data = {}
self.assertIsNone(form.clean_password2())
def test_clean_password2_mismatch(self):
form = MailboxCreationForm()
form.cleaned_data = {'password1': 'secretx', 'password2': 'secrety'}
with self.assertRaises(forms.ValidationError) as cm:
form.clean_password2()
self.assertEqual(cm.exception.message, PASSWORD_MISMATCH_ERROR)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
def test_save_commit(self):
user = User.objects.create_user()
form = MailboxCreationForm(data={
'osuser': user.uid,
'password1': 'secret',
'password2': 'secret',
})
mailbox = form.save()
self.assertIsNotNone(mailbox)
self.assertEqual(
len(Mailbox.objects.filter(osuser=user)), 1)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
def test_save_no_commit(self):
user = User.objects.create_user()
form = MailboxCreationForm(data={
'osuser': user.uid,
'password1': 'secret',
'password2': 'secret',
})
mailbox = form.save(commit=False)
self.assertIsNotNone(mailbox)
self.assertEqual(
len(Mailbox.objects.filter(osuser=user)), 0)
class MailboxChangeFormTest(TestCase):
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
def test_clean_password(self):
mailbox = Mailbox(username='test', osuser=User.objects.create_user())
mailbox.set_password('test')
mailbox.save()
form = MailboxChangeForm(instance=mailbox, data={'password': 'blub'})
self.assertEqual(form.clean_password(), mailbox.password)
class ActivationChangeMixinTest(TestCase):
def test_activate(self):
querysetmock = Mock()
activationchange = ActivationChangeMixin()
activationchange.activate(Mock(), querysetmock)
querysetmock.update.called_with(active=True)
def test_deactivate(self):
querysetmock = Mock()
activationchange = ActivationChangeMixin()
activationchange.deactivate(Mock(), querysetmock)
querysetmock.update.called_with(active=False)
class MailBoxAdminTest(TestCase):
def setUp(self):
site = AdminSite()
self.mbadmin = MailboxAdmin(Mailbox, site)
def test_get_fieldsets_without_object(self):
self.assertEqual(
self.mbadmin.get_fieldsets(Mock()),
self.mbadmin.add_fieldsets)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
def test_get_fieldsets_with_object(self):
mailbox = Mailbox(username='test', osuser=User.objects.create_user())
mailbox.set_password('test')
mailbox.save()
self.assertEqual(
self.mbadmin.get_fieldsets(Mock(), mailbox),
self.mbadmin.fieldsets)
def test_get_form_without_object(self):
form = self.mbadmin.get_form(Mock)
self.assertEqual(
form.Meta.fields,
['username', 'password1', 'password2']
)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
def test_get_form_with_object(self):
mailbox = Mailbox(username='test', osuser=User.objects.create_user())
mailbox.set_password('test')
mailbox.save()
form = self.mbadmin.get_form(Mock, mailbox)
self.assertEqual(
form.Meta.fields,
['username', 'password', 'osuser', 'active']
)
def test_admin_for_mailbox(self):
admin_url = reverse('admin:managemails_mailaddress_changelist')
self.assertIsNotNone(admin_url)
class MailAddressAdminTest(TestCase):
def test_admin_for_mailaddress(self):
admin_url = reverse('admin:managemails_mailaddress_changelist')
self.assertIsNotNone(admin_url)

View file

@ -1,14 +1,22 @@
from django.test import TestCase
from django.test.utils import override_settings
from passlib.hash import sha512_crypt
from domains.models import MailDomain
from osusers.models import User
from managemails.models import (
MailAddress,
Mailbox,
)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
class MailboxTest(TestCase):
def test_set_password(self):
user = User.objects.create_user()
@ -16,6 +24,12 @@ class MailboxTest(TestCase):
mb.set_password('test')
self.assertTrue(sha512_crypt.verify('test', mb.password))
def test___str__(self):
user = User.objects.create_user()
mb = Mailbox.objects.create(username='test', osuser=user)
mb.set_password('test')
self.assertEqual(str(mb), 'test')
class MailAddressTest(TestCase):
def test__str__(self):

View file

@ -1,3 +0,0 @@
from django.shortcuts import render
# Create your views here.

View file

@ -1,12 +1,19 @@
from django import forms
from django.utils.translation import ugettext as _
from django.contrib import admin
from .models import (
AdditionalGroup,
DeleteTaskResult,
Group,
GroupTaskResult,
Shadow,
User,
UserTaskResult,
)
PASSWORD_MISMATCH_ERROR = _("Passwords don't match")
class AdditionalGroupInline(admin.TabularInline):
model = AdditionalGroup
@ -18,10 +25,127 @@ class ShadowInline(admin.TabularInline):
can_delete = False
class TaskResultInline(admin.TabularInline):
can_delete = False
extra = 0
readonly_fields = ['task_uuid', 'task_name', 'is_finished', 'is_success',
'state', 'result_body']
def get_queryset(self, request):
qs = super(TaskResultInline, self).get_queryset(request)
for entry in qs:
entry.update_taskstatus()
return qs
def has_add_permission(self, request, obj=None):
return False
class UserTaskResultInline(TaskResultInline):
model = UserTaskResult
class GroupTaskResultInline(TaskResultInline):
model = GroupTaskResult
class UserCreationForm(forms.ModelForm):
"""
A form for creating system users.
"""
password1 = forms.CharField(label=_('Password'),
widget=forms.PasswordInput)
password2 = forms.CharField(label=_('Password (again)'),
widget=forms.PasswordInput)
class Meta:
model = User
fields = []
def clean_password2(self):
"""
Check that the two password entries match.
"""
password1 = self.cleaned_data.get('password1')
password2 = self.cleaned_data.get('password2')
if password1 and password2 and password1 != password2:
raise forms.ValidationError(PASSWORD_MISMATCH_ERROR)
return password2
def save(self, commit=True):
"""
Save the provided password in hashed format.
"""
user = User.objects.create_user(
password=self.cleaned_data['password1'], commit=commit)
return user
def save_m2m(self):
"""
No additional groups are created when this form is saved, so this
method just does nothing.
"""
class UserAdmin(admin.ModelAdmin):
inlines = [AdditionalGroupInline, ShadowInline]
inlines = [AdditionalGroupInline, ShadowInline, UserTaskResultInline]
readonly_fields = ['uid']
add_form = UserCreationForm
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('password1', 'password2')}),
)
def get_form(self, request, obj=None, **kwargs):
"""
Use special form during user creation.
"""
defaults = {}
if obj is None:
defaults.update({
'form': self.add_form,
'fields': admin.util.flatten_fieldsets(self.add_fieldsets),
})
defaults.update(kwargs)
return super(UserAdmin, self).get_form(request, obj, **defaults)
def get_inline_instances(self, request, obj=None):
if obj is None:
return []
return super(UserAdmin, self).get_inline_instances(request, obj)
admin.site.register(Group)
class GroupAdmin(admin.ModelAdmin):
inlines = [GroupTaskResultInline]
def get_inline_instances(self, request, obj=None):
if obj is None:
return []
return super(GroupAdmin, self).get_inline_instances(request, obj)
class DeleteTaskResultAdmin(admin.ModelAdmin):
readonly_fields = ['task_uuid', 'task_name', 'modeltype', 'modelname',
'is_finished', 'is_success', 'state', 'result_body']
list_display = ('task_uuid', 'task_name', 'modeltype', 'modelname',
'is_finished', 'state')
def has_add_permission(self, request, obj=None):
return False
def get_queryset(self, request):
qs = super(DeleteTaskResultAdmin, self).get_queryset(request)
for entry in qs:
entry.update_taskstatus()
return qs
admin.site.register(Group, GroupAdmin)
admin.site.register(User, UserAdmin)
admin.site.register(DeleteTaskResult, DeleteTaskResultAdmin)

View file

@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
from south.utils import datetime_utils as datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding model 'GroupTaskResult'
db.create_table(u'osusers_grouptaskresult', (
('created', self.gf('model_utils.fields.AutoCreatedField')(default=datetime.datetime.now)),
('modified', self.gf('model_utils.fields.AutoLastModifiedField')(default=datetime.datetime.now)),
('task_uuid', self.gf('django.db.models.fields.CharField')(max_length=64, primary_key=True)),
('task_name', self.gf('django.db.models.fields.CharField')(max_length=255)),
('is_finished', self.gf('django.db.models.fields.BooleanField')(default=False)),
('is_success', self.gf('django.db.models.fields.BooleanField')(default=False)),
('state', self.gf('django.db.models.fields.CharField')(max_length=10)),
('result_body', self.gf('django.db.models.fields.TextField')(blank=True)),
('group', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['osusers.Group'])),
))
db.send_create_signal(u'osusers', ['GroupTaskResult'])
# Adding model 'UserTaskResult'
db.create_table(u'osusers_usertaskresult', (
('created', self.gf('model_utils.fields.AutoCreatedField')(default=datetime.datetime.now)),
('modified', self.gf('model_utils.fields.AutoLastModifiedField')(default=datetime.datetime.now)),
('task_uuid', self.gf('django.db.models.fields.CharField')(max_length=64, primary_key=True)),
('task_name', self.gf('django.db.models.fields.CharField')(max_length=255)),
('is_finished', self.gf('django.db.models.fields.BooleanField')(default=False)),
('is_success', self.gf('django.db.models.fields.BooleanField')(default=False)),
('state', self.gf('django.db.models.fields.CharField')(max_length=10)),
('result_body', self.gf('django.db.models.fields.TextField')(blank=True)),
('user', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['osusers.User'])),
))
db.send_create_signal(u'osusers', ['UserTaskResult'])
# Adding model 'DeleteTaskResult'
db.create_table(u'osusers_deletetaskresult', (
('created', self.gf('model_utils.fields.AutoCreatedField')(default=datetime.datetime.now)),
('modified', self.gf('model_utils.fields.AutoLastModifiedField')(default=datetime.datetime.now)),
('task_uuid', self.gf('django.db.models.fields.CharField')(max_length=64, primary_key=True)),
('task_name', self.gf('django.db.models.fields.CharField')(max_length=255)),
('is_finished', self.gf('django.db.models.fields.BooleanField')(default=False)),
('is_success', self.gf('django.db.models.fields.BooleanField')(default=False)),
('state', self.gf('django.db.models.fields.CharField')(max_length=10)),
('result_body', self.gf('django.db.models.fields.TextField')(blank=True)),
('modeltype', self.gf('django.db.models.fields.CharField')(max_length=20, db_index=True)),
('modelname', self.gf('django.db.models.fields.CharField')(max_length=255)),
))
db.send_create_signal(u'osusers', ['DeleteTaskResult'])
def backwards(self, orm):
# Deleting model 'GroupTaskResult'
db.delete_table(u'osusers_grouptaskresult')
# Deleting model 'UserTaskResult'
db.delete_table(u'osusers_usertaskresult')
# Deleting model 'DeleteTaskResult'
db.delete_table(u'osusers_deletetaskresult')
models = {
u'osusers.additionalgroup': {
'Meta': {'unique_together': "(('user', 'group'),)", 'object_name': 'AdditionalGroup'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.User']"})
},
u'osusers.deletetaskresult': {
'Meta': {'object_name': 'DeleteTaskResult'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'modelname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'modeltype': ('django.db.models.fields.CharField', [], {'max_length': '20', 'db_index': 'True'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}),
'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'})
},
u'osusers.group': {
'Meta': {'object_name': 'Group'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'descr': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'gid': ('django.db.models.fields.PositiveSmallIntegerField', [], {'unique': 'True', 'primary_key': 'True'}),
'groupname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '16'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'passwd': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'})
},
u'osusers.grouptaskresult': {
'Meta': {'object_name': 'GroupTaskResult'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}),
'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}),
'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'})
},
u'osusers.shadow': {
'Meta': {'object_name': 'Shadow'},
'changedays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'expiredays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'default': 'None', 'null': 'True', 'blank': 'True'}),
'gracedays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'inactdays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'maxage': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'minage': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'passwd': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
'user': ('django.db.models.fields.related.OneToOneField', [], {'to': u"orm['osusers.User']", 'unique': 'True', 'primary_key': 'True'})
},
u'osusers.user': {
'Meta': {'object_name': 'User'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'gecos': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}),
'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}),
'homedir': ('django.db.models.fields.CharField', [], {'max_length': '256'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'shell': ('django.db.models.fields.CharField', [], {'max_length': '64'}),
'uid': ('django.db.models.fields.PositiveSmallIntegerField', [], {'unique': 'True', 'primary_key': 'True'}),
'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64'})
},
u'osusers.usertaskresult': {
'Meta': {'object_name': 'UserTaskResult'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}),
'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'}),
'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.User']"})
}
}
complete_apps = ['osusers']

View file

@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
from south.utils import datetime_utils as datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding index on 'GroupTaskResult', fields ['task_name']
db.create_index(u'osusers_grouptaskresult', ['task_name'])
# Adding index on 'UserTaskResult', fields ['task_name']
db.create_index(u'osusers_usertaskresult', ['task_name'])
# Adding index on 'DeleteTaskResult', fields ['task_name']
db.create_index(u'osusers_deletetaskresult', ['task_name'])
def backwards(self, orm):
# Removing index on 'DeleteTaskResult', fields ['task_name']
db.delete_index(u'osusers_deletetaskresult', ['task_name'])
# Removing index on 'UserTaskResult', fields ['task_name']
db.delete_index(u'osusers_usertaskresult', ['task_name'])
# Removing index on 'GroupTaskResult', fields ['task_name']
db.delete_index(u'osusers_grouptaskresult', ['task_name'])
models = {
u'osusers.additionalgroup': {
'Meta': {'unique_together': "(('user', 'group'),)", 'object_name': 'AdditionalGroup'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.User']"})
},
u'osusers.deletetaskresult': {
'Meta': {'object_name': 'DeleteTaskResult'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'modelname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'modeltype': ('django.db.models.fields.CharField', [], {'max_length': '20', 'db_index': 'True'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}),
'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}),
'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'})
},
u'osusers.group': {
'Meta': {'object_name': 'Group'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'descr': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'gid': ('django.db.models.fields.PositiveSmallIntegerField', [], {'unique': 'True', 'primary_key': 'True'}),
'groupname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '16'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'passwd': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'})
},
u'osusers.grouptaskresult': {
'Meta': {'object_name': 'GroupTaskResult'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}),
'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}),
'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}),
'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'})
},
u'osusers.shadow': {
'Meta': {'object_name': 'Shadow'},
'changedays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'expiredays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'default': 'None', 'null': 'True', 'blank': 'True'}),
'gracedays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'inactdays': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'maxage': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'minage': ('django.db.models.fields.PositiveSmallIntegerField', [], {'null': 'True', 'blank': 'True'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'passwd': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
'user': ('django.db.models.fields.related.OneToOneField', [], {'to': u"orm['osusers.User']", 'unique': 'True', 'primary_key': 'True'})
},
u'osusers.user': {
'Meta': {'object_name': 'User'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'gecos': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}),
'group': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.Group']"}),
'homedir': ('django.db.models.fields.CharField', [], {'max_length': '256'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'shell': ('django.db.models.fields.CharField', [], {'max_length': '64'}),
'uid': ('django.db.models.fields.PositiveSmallIntegerField', [], {'unique': 'True', 'primary_key': 'True'}),
'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64'})
},
u'osusers.usertaskresult': {
'Meta': {'object_name': 'UserTaskResult'},
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'is_finished': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_success': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'result_body': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'max_length': '10'}),
'task_name': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}),
'task_uuid': ('django.db.models.fields.CharField', [], {'max_length': '64', 'primary_key': 'True'}),
'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['osusers.User']"})
}
}
complete_apps = ['osusers']

View file

@ -1,7 +1,7 @@
from datetime import date
import os
from django.db import models, transaction
from django.db import models
from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils import timezone
@ -10,9 +10,51 @@ from django.utils.translation import ugettext as _
from model_utils.models import TimeStampedModel
from celery.result import AsyncResult
from passlib.hash import sha512_crypt
from passlib.utils import generate_password
from .tasks import (
add_ldap_user_to_group,
create_ldap_group,
create_ldap_user,
delete_ldap_group_if_empty,
delete_ldap_user,
remove_ldap_user_from_group,
)
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL = _(
"You can not use a user's primary group.")
class TaskResult(TimeStampedModel, models.Model):
task_uuid = models.CharField(primary_key=True, max_length=64, blank=False)
task_name = models.CharField(max_length=255, blank=False, db_index=True)
is_finished = models.BooleanField(default=False)
is_success = models.BooleanField(default=False)
state = models.CharField(max_length=10)
result_body = models.TextField(blank=True)
class Meta:
abstract = True
def _set_result_fields(self, asyncresult):
if asyncresult.ready():
self.is_finished = True
self.is_success = asyncresult.state == 'SUCCESS'
self.result_body = str(asyncresult.result)
self.state = asyncresult.state
asyncresult.get(no_ack=False)
def update_taskstatus(self):
if not self.is_finished:
asyncresult = AsyncResult(self.task_uuid)
self._set_result_fields(asyncresult)
self.save()
class GroupManager(models.Manager):
@ -42,6 +84,73 @@ class Group(TimeStampedModel, models.Model):
def __str__(self):
return '{0} ({1})'.format(self.groupname, self.gid)
def save(self, *args, **kwargs):
super(Group, self).save(*args, **kwargs)
GroupTaskResult.objects.create_grouptaskresult(
self,
create_ldap_group.delay(self.groupname, self.gid, self.descr),
'create_ldap_group'
)
return self
def delete(self, *args, **kwargs):
DeleteTaskResult.objects.create_deletetaskresult(
'group', self.groupname,
delete_ldap_group_if_empty.delay(self.groupname),
'delete_ldap_group_if_empty'
)
super(Group, self).delete(*args, **kwargs)
class TaskResultManager(models.Manager):
def create(self, asyncresult, task_name):
result = self.model(
task_uuid=asyncresult.task_id, task_name=task_name
)
result._set_result_fields(asyncresult)
return result
class DeleteTaskResultManager(TaskResultManager):
def create_deletetaskresult(
self, modeltype, modelname, asyncresult, task_name
):
taskresult = super(DeleteTaskResultManager, self).create(
asyncresult, task_name)
taskresult.modeltype = modeltype
taskresult.modelname = modelname
taskresult.save()
return taskresult
class DeleteTaskResult(TaskResult):
modeltype = models.CharField(max_length=20, db_index=True)
modelname = models.CharField(max_length=255)
objects = DeleteTaskResultManager()
class GroupTaskResultManager(TaskResultManager):
def create_grouptaskresult(
self, group, asyncresult, task_name, commit=False
):
taskresult = super(GroupTaskResultManager, self).create(
asyncresult, task_name)
taskresult.group = group
taskresult.save()
return taskresult
class GroupTaskResult(TaskResult):
group = models.ForeignKey(Group)
objects = GroupTaskResultManager()
class UserManager(models.Manager):
@ -59,7 +168,7 @@ class UserManager(models.Manager):
for user in self.values('username').filter(
username__startswith=settings.OSUSER_USERNAME_PREFIX).order_by(
'username'):
if user == nextuser:
if user['username'] == nextuser:
count += 1
nextuser = usernameformat.format(
settings.OSUSER_USERNAME_PREFIX, count)
@ -67,7 +176,7 @@ class UserManager(models.Manager):
break
return nextuser
def create_user(self, username=None, password=None):
def create_user(self, username=None, password=None, commit=False):
uid = self.get_next_uid()
gid = Group.objects.get_next_gid()
if username is None:
@ -75,19 +184,13 @@ class UserManager(models.Manager):
if password is None:
password = generate_password()
homedir = os.path.join(settings.OSUSER_HOME_BASEPATH, username)
autocommit = transaction.get_autocommit()
if autocommit:
transaction.set_autocommit(False)
group = Group.objects.create(groupname=username, gid=gid)
user = self.create(username=username, group=group, uid=uid,
homedir=homedir,
shell=settings.OSUSER_DEFAULT_SHELL)
shadow = Shadow.objects.create_shadow(user=user, password=password)
user.save()
shadow.save()
transaction.commit()
if autocommit:
transaction.set_autocommit(True)
user.set_password(password)
if commit:
user.save()
return user
@ -111,17 +214,82 @@ class User(TimeStampedModel, models.Model):
def __str__(self):
return '{0} ({1})'.format(self.username, self.uid)
def set_password(self, password):
if hasattr(self, 'shadow'):
self.shadow.set_password(password)
else:
self.shadow = Shadow.objects.create_shadow(
user=self, password=password
)
UserTaskResult.objects.create_usertaskresult(
self,
create_ldap_user.delay(
self.username, self.uid, self.group.gid, self.gecos,
self.homedir, self.shell, password
),
'create_ldap_user',
commit=True
)
def save(self, *args, **kwargs):
UserTaskResult.objects.create_usertaskresult(
self,
create_ldap_user.delay(
self.username, self.uid, self.group.gid, self.gecos,
self.homedir, self.shell, password=None
),
'create_ldap_user'
)
return super(User, self).save(*args, **kwargs)
def delete(self, *args, **kwargs):
for group in [
ag.group for ag in AdditionalGroup.objects.filter(user=self)
]:
DeleteTaskResult.objects.create_deletetaskresult(
'usergroup',
'{0} in {1}'.format(self.username, group.groupname),
remove_ldap_user_from_group.delay(
self.username, group.groupname),
'remove_ldap_user_from_group',
)
DeleteTaskResult.objects.create_deletetaskresult(
'user', self.username,
delete_ldap_user.delay(self.username),
'delete_ldap_user'
)
self.group.delete()
super(User, self).delete(*args, **kwargs)
class UserTaskResultManager(TaskResultManager):
def create_usertaskresult(
self, user, asyncresult, task_name, commit=False
):
taskresult = self.create(asyncresult, task_name)
taskresult.user = user
taskresult.save()
return taskresult
class UserTaskResult(TaskResult):
user = models.ForeignKey(User)
objects = UserTaskResultManager()
class ShadowManager(models.Manager):
def create_shadow(self, user, password):
changedays = (timezone.now().date() - date(1970, 1, 1)).days
pwhash = sha512_crypt.encrypt(password)
shadow = self.create(
user=user, changedays=changedays,
minage=0, maxage=None, gracedays=7,
inactdays=30, expiredays=None, passwd=pwhash
inactdays=30, expiredays=None
)
shadow.set_password(password)
shadow.save()
return shadow
@ -169,6 +337,9 @@ class Shadow(TimeStampedModel, models.Model):
def __str__(self):
return 'for user {0}'.format(self.user)
def set_password(self, password):
self.passwd = sha512_crypt.encrypt(password)
@python_2_unicode_compatible
class AdditionalGroup(TimeStampedModel, models.Model):
@ -182,8 +353,26 @@ class AdditionalGroup(TimeStampedModel, models.Model):
def clean(self):
if self.user.group == self.group:
raise ValidationError(_(
"You can not use a user's primary group."))
raise ValidationError(CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)
def save(self, *args, **kwargs):
GroupTaskResult.objects.create_grouptaskresult(
self.group,
add_ldap_user_to_group.delay(
self.user.username, self.group.groupname),
'add_ldap_user_to_group'
)
super(AdditionalGroup, self).save(*args, **kwargs)
def delete(self, *args, **kwargs):
DeleteTaskResult.objects.create_deletetaskresult(
'usergroup',
str(self),
remove_ldap_user_from_group.delay(
self.user.username, self.group.groupname),
'remove_ldap_user_from_group'
)
super(AdditionalGroup, self).delete(*args, **kwargs)
def __str__(self):
return '{0} in {1}'.format(self.user, self.group)

View file

@ -0,0 +1,43 @@
from __future__ import absolute_import
from celery import shared_task
class LdapRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if 'ldap' in task:
return {'exchange': 'ldap',
'exchange_type': 'direct',
'queue': 'ldap'}
return None
@shared_task
def create_ldap_group(groupname, gid, descr):
pass
@shared_task
def create_ldap_user(username, uid, gid, gecos, homedir, shell, password):
pass
@shared_task
def add_ldap_user_to_group(username, groupname):
pass
@shared_task
def remove_ldap_user_from_group(username, groupname):
pass
@shared_task
def delete_ldap_user(username):
pass
@shared_task
def delete_ldap_group_if_empty(groupname):
pass

View file

@ -1,3 +0,0 @@
from django.test import TestCase
# Create your tests here.

View file

View file

@ -0,0 +1,167 @@
from django import forms
from django.contrib.admin import AdminSite
from django.test import TestCase
from django.test.utils import override_settings
from mock import patch, Mock
from osusers.models import (
DeleteTaskResult,
Group,
User,
)
from osusers.admin import (
DeleteTaskResultAdmin,
GroupAdmin,
PASSWORD_MISMATCH_ERROR,
UserAdmin,
UserCreationForm,
UserTaskResultInline,
)
class TaskResultInlineTest(TestCase):
def setUp(self):
self.site = AdminSite()
super(TaskResultInlineTest, self).setUp()
def test_get_queryset_calls_update_taskstatus(self):
with patch('osusers.admin.admin.TabularInline.get_queryset') as mock:
entrymock = Mock(name='entry')
mock.return_value = [entrymock]
requestmock = Mock(name='request')
UserTaskResultInline(User, self.site).get_queryset(requestmock)
entrymock.update_taskstatus.assert_calledwith()
def test_has_add_permissions_returns_false(self):
self.assertFalse(
UserTaskResultInline(User, self.site).has_add_permission(
self, Mock(name='request'))
)
class UserCreationFormTest(TestCase):
def test_clean_password2_same(self):
form = UserCreationForm()
form.cleaned_data = {'password1': 'secret', 'password2': 'secret'}
self.assertEqual(form.clean_password2(), 'secret')
def test_clean_password2_empty(self):
form = UserCreationForm()
form.cleaned_data = {}
self.assertIsNone(form.clean_password2())
def test_clean_password2_mismatch(self):
form = UserCreationForm()
form.cleaned_data = {'password1': 'secretx', 'password2': 'secrety'}
with self.assertRaises(forms.ValidationError) as cm:
form.clean_password2()
self.assertEqual(cm.exception.message, PASSWORD_MISMATCH_ERROR)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
def test_save_commit(self):
form = UserCreationForm()
form.cleaned_data = {'password1': 'secret', 'password2': 'secret'}
user = form.save()
self.assertIsNotNone(user)
self.assertEqual(User.objects.get(pk=user.uid), user)
def test_save_m2m_returns_none(self):
form = UserCreationForm()
self.assertIsNone(form.save_m2m())
class UserAdminTest(TestCase):
def setUp(self):
site = AdminSite()
self.uadmin = UserAdmin(User, site)
super(UserAdminTest, self).setUp()
def test_get_form_without_object(self):
form = self.uadmin.get_form(Mock(name='request'))
self.assertEqual(
form.Meta.fields,
['password1', 'password2']
)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
def test_get_form_with_object(self):
user = User.objects.create_user()
form = self.uadmin.get_form(Mock(name='request'), user)
self.assertEqual(
form.Meta.fields,
['username', 'group', 'gecos', 'homedir', 'shell', 'uid']
)
def test_get_inline_instances_without_object(self):
inlines = self.uadmin.get_inline_instances(Mock(name='request'))
self.assertEqual(inlines, [])
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
def test_get_inline_instances_with_object(self):
user = User.objects.create_user()
inlines = self.uadmin.get_inline_instances(
Mock(name='request'), user)
self.assertEqual(len(inlines), len(UserAdmin.inlines))
for index in range(len(inlines)):
self.assertIsInstance(inlines[index], UserAdmin.inlines[index])
class GroupAdminTest(TestCase):
def setUp(self):
site = AdminSite()
self.gadmin = GroupAdmin(Group, site)
super(GroupAdminTest, self).setUp()
def test_get_inline_instances_without_object(self):
inlines = self.gadmin.get_inline_instances(Mock(name='request'))
self.assertEqual(inlines, [])
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
def test_get_inline_instances_with_object(self):
group = Group.objects.create(gid=1000, groupname='test')
inlines = self.gadmin.get_inline_instances(
Mock(name='request'), group)
self.assertEqual(len(inlines), len(GroupAdmin.inlines))
for index in range(len(inlines)):
self.assertIsInstance(inlines[index], GroupAdmin.inlines[index])
class DeleteTaskResultAdminTest(TestCase):
def setUp(self):
site = AdminSite()
self.dtradmin = DeleteTaskResultAdmin(DeleteTaskResult, site)
super(DeleteTaskResultAdminTest, self).setUp()
def test_has_add_permission_returns_false_without_object(self):
self.assertFalse(
self.dtradmin.has_add_permission(Mock(name='request')))
def test_has_add_permission_returns_false_with_object(self):
self.assertFalse(
self.dtradmin.has_add_permission(Mock(name='request'),
Mock(name='test')))
def test_get_queryset_calls_update_taskstatus(self):
with patch('osusers.admin.admin.ModelAdmin.get_queryset') as mock:
entrymock = Mock(name='entry')
mock.return_value = [entrymock]
requestmock = Mock(name='request')
self.dtradmin.get_queryset(requestmock)
entrymock.update_taskstatus.assert_calledwith()

View file

@ -0,0 +1,392 @@
from datetime import date
from django.core.exceptions import ValidationError
from django.test import TestCase
from django.test.utils import override_settings
from django.utils import timezone
from mock import patch, MagicMock
from passlib.hash import sha512_crypt
from osusers.models import (
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL,
AdditionalGroup,
DeleteTaskResult,
Group,
GroupTaskResult,
Shadow,
User,
UserTaskResult,
)
@override_settings(
CELERY_ALWAYS_EAGER=True,
CELERY_CACHE_BACKEND='memory',
BROKER_BACKEND='memory'
)
class TestCaseWithCeleryTasks(TestCase):
pass
class AdditionalGroupTest(TestCaseWithCeleryTasks):
def setUp(self):
self.group1 = Group.objects.create(groupname='test1', gid=1000)
self.user = User.objects.create(
username='test', uid=1000, group=self.group1,
homedir='/home/test', shell='/bin/bash')
def test_clean_primary_group(self):
testsubj = AdditionalGroup(user=self.user, group=self.group1)
with self.assertRaises(ValidationError) as cm:
testsubj.clean()
self.assertEqual(
cm.exception.message, CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)
def test_clean_other_group(self):
group2 = Group.objects.create(groupname='test2', gid=1001)
testsubj = AdditionalGroup(user=self.user, group=group2)
testsubj.clean()
def test_save(self):
group2 = Group.objects.create(groupname='test2', gid=1001)
GroupTaskResult.objects.all().delete()
addgroup = AdditionalGroup(user=self.user, group=group2)
addgroup.save()
taskres = GroupTaskResult.objects.all()
self.assertTrue(len(taskres), 1)
self.assertEqual(taskres[0].task_name, 'add_ldap_user_to_group')
self.assertEqual(taskres[0].group, group2)
def test_delete(self):
group2 = Group.objects.create(groupname='test2', gid=1001)
addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
DeleteTaskResult.objects.all().delete()
addgroup.delete()
taskres = DeleteTaskResult.objects.all()
self.assertTrue(len(taskres), 1)
self.assertEqual(taskres[0].task_name, 'remove_ldap_user_from_group')
self.assertEqual(taskres[0].modeltype, 'usergroup')
self.assertEqual(taskres[0].modelname, 'test (1000) in test2 (1001)')
self.assertEqual(len(AdditionalGroup.objects.all()), 0)
def test___str__(self):
group2 = Group.objects.create(groupname='test2', gid=1001)
addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
self.assertEqual(str(addgroup), 'test (1000) in test2 (1001)')
@override_settings(OSUSER_MINGID=10000)
class GroupManagerTest(TestCaseWithCeleryTasks):
def test_get_next_gid_first(self):
self.assertEqual(Group.objects.get_next_gid(), 10000)
def test_get_next_gid_second(self):
Group.objects.create(gid=10010, groupname='test')
self.assertEqual(Group.objects.get_next_gid(), 10011)
class GroupTest(TestCaseWithCeleryTasks):
def test___str__(self):
group = Group.objects.create(gid=10000, groupname='test')
self.assertEqual(str(group), 'test (10000)')
def test_save(self):
group = Group(gid=10000, groupname='test')
self.assertIs(group.save(), group)
taskres = GroupTaskResult.objects.all()
self.assertEqual(len(taskres), 1)
self.assertEqual(taskres[0].group, group)
self.assertEqual(taskres[0].task_name, 'create_ldap_group')
def test_delete(self):
group = Group.objects.create(gid=10000, groupname='test')
self.assertEqual(len(Group.objects.all()), 1)
self.assertEqual(len(GroupTaskResult.objects.all()), 1)
group.delete()
self.assertEqual(len(Group.objects.all()), 0)
self.assertEqual(len(GroupTaskResult.objects.all()), 0)
taskres = DeleteTaskResult.objects.all()
self.assertEqual(len(taskres), 1)
self.assertEqual(taskres[0].task_name,
'delete_ldap_group_if_empty')
self.assertEqual(taskres[0].modeltype, 'group')
self.assertEqual(taskres[0].modelname, 'test')
class ShadowManagerTest(TestCaseWithCeleryTasks):
def test_create_shadow(self):
user = User(
username='test', uid=1000,
group=Group(gid=1000, groupname='test'),
homedir='/home/test', shell='/bin/fooshell')
shadow = Shadow.objects.create_shadow(user, 'test')
self.assertTrue(sha512_crypt.verify('test', shadow.passwd))
self.assertEqual(shadow.changedays,
(timezone.now().date() - date(1970, 1, 1)).days)
self.assertEqual(shadow.user, user)
self.assertEqual(shadow.minage, 0)
self.assertIsNone(shadow.maxage)
self.assertEqual(shadow.gracedays, 7)
self.assertEqual(shadow.inactdays, 30)
self.assertIsNone(shadow.expiredays)
class ShadowTest(TestCaseWithCeleryTasks):
def test___str__(self):
group = Group.objects.create(
groupname='test', gid=1000)
user = User.objects.create(
username='test', uid=1000, group=group, homedir='/home/test',
shell='/bin/bash')
shadow = Shadow(user=user)
self.assertEqual(str(shadow), 'for user test (1000)')
def test_set_password(self):
group = Group.objects.create(
groupname='test', gid=1000)
user = User.objects.create(
username='test', uid=1000, group=group, homedir='/home/test',
shell='/bin/bash')
shadow = Shadow(user=user)
shadow.set_password('test')
self.assertTrue(sha512_crypt.verify('test', shadow.passwd))
TEST_TASK_UUID = '3120f6a8-2665-4fa3-a785-79efd28bfe92'
TEST_TASK_NAME = 'test.task'
TEST_TASK_RESULT = '4ll y0ur b453 4r3 b3l0ng t0 u5'
class TaskResultTest(TestCase):
def test__set_result_fields_not_ready(self):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
mock.ready.return_value = False
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertFalse(tr.is_finished)
self.assertFalse(tr.is_success)
self.assertEqual(tr.state, '')
self.assertEqual(tr.result_body, '')
def test__set_result_fields_ready(self):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME,
state='SUCCESS', result=TEST_TASK_RESULT)
mock.ready.return_value = True
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertTrue(tr.is_finished)
self.assertTrue(tr.is_success)
self.assertEqual(tr.state, 'SUCCESS')
self.assertEqual(tr.result_body, TEST_TASK_RESULT)
def test__set_result_fields_exception(self):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME,
state='FAILURE', result=Exception('Fail'))
mock.ready.return_value = True
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertTrue(tr.is_finished)
self.assertFalse(tr.is_success)
self.assertEqual(tr.state, 'FAILURE')
self.assertEqual(tr.result_body, 'Fail')
@patch('osusers.models.AsyncResult')
def test_update_taskstatus_unfinished(self, asyncres):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
mock.ready.return_value = False
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertFalse(tr.is_finished)
mymock = asyncres(TEST_TASK_UUID)
mymock.ready.return_value = True
mymock.state = 'SUCCESS'
mymock.result = TEST_RESULT
tr.update_taskstatus()
mymock.ready.assert_called_with()
self.assertTrue(tr.is_finished)
@patch('osusers.models.AsyncResult')
def test_update_taskstatus_finished(self, asyncres):
mock = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
mock.ready.return_value = True
mock.state = 'SUCCESS'
mock.result = TEST_RESULT
tr = DeleteTaskResult.objects.create(mock, TEST_TASK_NAME)
self.assertTrue(tr.is_finished)
mymock = asyncres(TEST_TASK_UUID)
tr.update_taskstatus()
self.assertFalse(mymock.ready.called)
self.assertTrue(tr.is_finished)
TEST_RESULT = MagicMock()
TEST_RESULT.task_id = TEST_TASK_UUID
TEST_RESULT.task_name = TEST_TASK_NAME
TEST_RESULT.ready.return_value = False
class TaskResultManagerTest(TestCase):
def test_create(self):
tr = DeleteTaskResult.objects.create(TEST_RESULT, TEST_TASK_NAME)
self.assertIsInstance(tr, DeleteTaskResult)
self.assertEqual(tr.task_uuid, TEST_TASK_UUID)
self.assertEqual(tr.task_name, TEST_TASK_NAME)
@override_settings(
OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test',
OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell'
)
class UserManagerTest(TestCaseWithCeleryTasks):
def _create_group(self):
return Group.objects.create(gid=10000, groupname='foo')
def test_get_next_uid_first(self):
self.assertEqual(User.objects.get_next_uid(), 10000)
def test_get_next_uid_second(self):
User.objects.create(
uid=10010, username='foo', group=self._create_group(),
homedir='/home/foo', shell='/bin/fooshell')
self.assertEqual(User.objects.get_next_uid(), 10011)
def test_get_next_username_first(self):
self.assertEqual(User.objects.get_next_username(), 'test01')
def test_get_next_username_second(self):
User.objects.create(
uid=10000, username='test01', group=self._create_group(),
homedir='/home/foo', shell='/bin/fooshell')
self.assertEqual(User.objects.get_next_username(), 'test02')
def test_get_next_username_gaps(self):
group = self._create_group()
User.objects.create(
uid=10000, username='test01', group=group,
homedir='/home/foo', shell='/bin/fooshell')
User.objects.create(
uid=10002, username='test03', group=group,
homedir='/home/foo', shell='/bin/fooshell')
self.assertEqual(User.objects.get_next_username(), 'test02')
def test_create_user_first(self):
user = User.objects.create_user()
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000)
self.assertEqual(user.group.groupname, 'test01')
self.assertEqual(user.username, 'test01')
self.assertEqual(user.homedir, '/home/test01')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertIsNotNone(user.shadow)
def test_create_user_tasks(self):
user = User.objects.create_user()
gtaskres = GroupTaskResult.objects.all()
self.assertEqual(len(gtaskres), 1)
self.assertEqual(gtaskres[0].task_name, 'create_ldap_group')
self.assertEqual(gtaskres[0].group, user.group)
def test_create_user_second(self):
User.objects.create_user()
user = User.objects.create_user()
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10001)
self.assertEqual(user.group.gid, 10001)
self.assertEqual(user.group.groupname, 'test02')
self.assertEqual(user.username, 'test02')
self.assertEqual(user.homedir, '/home/test02')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertIsNotNone(user.shadow)
self.assertEqual(len(User.objects.all()), 2)
def test_create_user_known_password(self):
user = User.objects.create_user(password='foobar')
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000)
self.assertEqual(user.group.groupname, 'test01')
self.assertEqual(user.username, 'test01')
self.assertEqual(user.homedir, '/home/test01')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertIsNotNone(user.shadow)
self.assertTrue(sha512_crypt.verify('foobar', user.shadow.passwd))
def test_create_user_predefined_username(self):
user = User.objects.create_user(username='tester')
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000)
self.assertEqual(user.group.groupname, 'tester')
self.assertEqual(user.username, 'tester')
self.assertEqual(user.homedir, '/home/tester')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertIsNotNone(user.shadow)
def test_create_user_commit(self):
user = User.objects.create_user(commit=True)
self.assertIsInstance(user, User)
self.assertEqual(user.uid, 10000)
self.assertEqual(user.group.gid, 10000)
self.assertEqual(user.group.groupname, 'test01')
self.assertEqual(user.username, 'test01')
self.assertEqual(user.homedir, '/home/test01')
self.assertEqual(user.shell, '/bin/fooshell')
self.assertIsNotNone(user.shadow)
@override_settings(
OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test',
OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell'
)
class UserTest(TestCaseWithCeleryTasks):
def test___str__(self):
user = User.objects.create_user()
self.assertEqual(str(user), 'test01 (10000)')
def test_set_password(self):
user = User.objects.create_user()
self.assertFalse(sha512_crypt.verify('test', user.shadow.passwd))
UserTaskResult.objects.all().delete()
user.set_password('test')
self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd))
taskres = UserTaskResult.objects.all()
self.assertEqual(len(taskres), 1)
self.assertEqual(taskres[0].user, user)
self.assertEqual(taskres[0].task_name, 'create_ldap_user')
def test_save(self):
user = User.objects.create_user()
UserTaskResult.objects.all().delete()
user.save()
taskres = UserTaskResult.objects.all()
self.assertEqual(len(taskres), 1)
self.assertEqual(taskres[0].user, user)
self.assertEqual(taskres[0].task_name, 'create_ldap_user')
def test_delete_only_user(self):
user = User.objects.create_user()
user.delete()
taskres = DeleteTaskResult.objects.all()
self.assertEqual(len(taskres), 2)
self.assertIn('delete_ldap_user',
[r.task_name for r in taskres])
self.assertIn('delete_ldap_group_if_empty',
[r.task_name for r in taskres])
self.assertEqual(len(User.objects.all()), 0)
def test_delete_additional_groups(self):
group1 = Group.objects.create(gid=2000, groupname='group1')
group2 = Group.objects.create(gid=2001, groupname='group2')
user = User.objects.create_user()
for group in [group1, group2]:
user.additionalgroup_set.add(
AdditionalGroup.objects.create(user=user, group=group))
user.delete()
taskres = DeleteTaskResult.objects.all()
self.assertEqual(len(taskres), 4)
tasknames = [t.task_name for t in taskres]
self.assertEqual(tasknames.count('remove_ldap_user_from_group'), 2)
self.assertEqual(tasknames.count('delete_ldap_user'), 1)
self.assertEqual(tasknames.count('delete_ldap_group_if_empty'), 1)
self.assertEqual(len(User.objects.all()), 0)
self.assertEqual(len(AdditionalGroup.objects.all()), 0)

View file

@ -0,0 +1,22 @@
from django.test import TestCase
from osusers.tasks import LdapRouter
class LdapRouterTest(TestCase):
def setUp(self):
self.router = LdapRouter()
super(LdapRouterTest, self).setUp()
def test_ldap_tasks_are_routed_to_ldap_queue(self):
route = self.router.route_for_task(
'some_ldap_task')
self.assertEqual(
route,
{'exchange': 'ldap',
'exchange_type': 'direct',
'queue': 'ldap'})
def test_non_ldap_tasks_are_routed_to_default(self):
self.assertIsNone(
self.router.route_for_task('other'))

View file

@ -1,3 +0,0 @@
from django.shortcuts import render
# Create your views here.

View file

@ -6,3 +6,8 @@ logutils==0.3.3
South==0.8.4
psycopg2==2.5.3
passlib==1.6.2
celery==3.1.11
billiard==3.3.0.17
kombu==3.0.16
pytz==2014.3
pyaml==14.05.7

View file

@ -1,5 +1,7 @@
# Local development dependencies go here
-r base.txt
coverage==3.7.1
mock==1.0.1
django-debug-toolbar==1.2.1
Sphinx==1.2.2
releases==0.6.1

View file

@ -1,3 +1,4 @@
# Test dependencies go here.
-r base.txt
coverage==3.7.1
mock==1.0.1