gva/gnuviechadmin/osusers/models.py

362 lines
12 KiB
Python
Raw Normal View History

from datetime import date
import os
from django.db import models
from django.conf import settings
2014-05-24 21:53:49 +02:00
from django.core.exceptions import ValidationError
from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
2014-05-24 21:53:49 +02:00
from django.utils.translation import ugettext as _
2014-05-24 21:28:33 +02:00
from model_utils.models import TimeStampedModel
from celery.result import AsyncResult
from passlib.hash import sha512_crypt
from passlib.utils import generate_password
from .tasks import (
add_ldap_user_to_group,
create_ldap_group,
create_ldap_user,
delete_ldap_group_if_empty,
delete_ldap_user,
remove_ldap_user_from_group,
)
class TaskResult(TimeStampedModel, models.Model):
    """
    Abstract base model that records the outcome of an asynchronous task.

    Concrete subclasses add the reference to whatever object the task
    operated on.
    """
    # id of the asynchronous task, also used as the primary key
    task_uuid = models.CharField(primary_key=True, max_length=64, blank=False)
    task_name = models.CharField(max_length=255, blank=False, db_index=True)
    is_finished = models.BooleanField(default=False)
    is_success = models.BooleanField(default=False)
    # last task state seen on the async result (e.g. 'SUCCESS')
    state = models.CharField(max_length=10)
    result_body = models.TextField(blank=True)

    class Meta:
        abstract = True

    def _set_result_fields(self, asyncresult):
        """
        Copy the current status of ``asyncresult`` into this instance.

        The instance is *not* saved by this method.

        :param asyncresult: result handle of the task (an object offering
            ``ready()``, ``state``, ``result`` and ``get()``)
        """
        if asyncresult.ready():
            self.is_finished = True
            self.is_success = asyncresult.state == 'SUCCESS'
            self.result_body = str(asyncresult.result)
            # Fetch the result only once the task is ready so this never
            # blocks.  propagate=False keeps a failed task from re-raising
            # its exception here, which would otherwise prevent the failure
            # from being recorded by the caller.
            asyncresult.get(no_ack=False, propagate=False)
        self.state = asyncresult.state

    def update_taskstatus(self):
        """
        Refresh and save this record unless it is already marked finished.
        """
        if self.is_finished:
            return
        asyncresult = AsyncResult(self.task_uuid, task_name=self.task_name)
        self._set_result_fields(asyncresult)
        self.save()
class GroupManager(models.Manager):
    """Manager for groups that knows how to pick the next group id."""

    def get_next_gid(self):
        """
        Return the group id to use for the next new group.

        This is one more than the highest stored ``gid`` but never less than
        ``settings.OSUSER_MINGID``; with no stored groups it is exactly
        ``settings.OSUSER_MINGID``.
        """
        highest = self.aggregate(models.Max('gid'))['gid__max']
        if highest is not None:
            return max(settings.OSUSER_MINGID, highest + 1)
        return settings.OSUSER_MINGID
2014-05-24 21:28:33 +02:00
@python_2_unicode_compatible
class Group(TimeStampedModel, models.Model):
    """
    An operating system group.

    Saving and deleting a group also dispatches the matching LDAP task and
    records a task result for it.
    """
    groupname = models.CharField(
        _('Group name'),
        max_length=16,
        unique=True,
    )
    gid = models.PositiveSmallIntegerField(
        _('Group ID'),
        unique=True,
        primary_key=True,
    )
    descr = models.TextField(_('Description'), blank=True)
    passwd = models.CharField(
        _('Group password'),
        max_length=128,
        blank=True,
    )

    objects = GroupManager()

    class Meta:
        verbose_name = _('Group')
        verbose_name_plural = _('Groups')

    def __str__(self):
        return '{0} ({1})'.format(self.groupname, self.gid)

    def save(self, *args, **kwargs):
        """
        Store the group, then dispatch ``create_ldap_group`` for it.

        :return: this group instance
        """
        super(Group, self).save(*args, **kwargs)
        manager = GroupTaskResult.objects
        pending = create_ldap_group.delay(
            self.groupname, self.gid, self.descr)
        manager.create_grouptaskresult(self, pending)
        return self

    def delete(self, *args, **kwargs):
        """
        Dispatch ``delete_ldap_group_if_empty`` and then remove the group.
        """
        manager = DeleteTaskResult.objects
        pending = delete_ldap_group_if_empty.delay(self.groupname)
        manager.create_deletetaskresult('group', self.groupname, pending)
        super(Group, self).delete(*args, **kwargs)
2014-05-24 21:28:33 +02:00
class TaskResultManager(models.Manager):
    """Base manager for the task result models."""

    def create(self, asyncresult):
        """
        Build an *unsaved* task result instance from ``asyncresult``.

        Note that this replaces the keyword based ``create()`` of the
        default manager: it takes the async result only and does not save.

        :param asyncresult: result handle providing ``task_id`` and
            ``task_name``
        :return: the new model instance, populated but not saved
        """
        instance = self.model(
            task_uuid=asyncresult.task_id,
            task_name=asyncresult.task_name,
        )
        instance._set_result_fields(asyncresult)
        return instance
class DeleteTaskResultManager(TaskResultManager):
    """Manager creating task results for delete operations."""

    def create_deletetaskresult(self, modeltype, modelname, asyncresult):
        """
        Create and save a task result describing a deleted object.

        :param modeltype: short label for the kind of object deleted
        :param modelname: name of the deleted object
        :param asyncresult: result handle of the dispatched task
        :return: the saved task result
        """
        parent = super(DeleteTaskResultManager, self)
        record = parent.create(asyncresult)
        record.modeltype = modeltype
        record.modelname = modelname
        record.save()
        return record
class DeleteTaskResult(TaskResult):
    """
    Task result for a delete task.

    The target is stored by type label and name instead of a foreign key.
    """
    modeltype = models.CharField(
        max_length=20,
        db_index=True,
    )
    modelname = models.CharField(
        max_length=255,
    )

    objects = DeleteTaskResultManager()
class GroupTaskResultManager(TaskResultManager):
    """Manager creating task results that belong to a group."""

    def create_grouptaskresult(self, group, asyncresult, commit=False):
        """
        Create and save a task result linked to ``group``.

        :param group: the group the task operates on
        :param asyncresult: result handle of the dispatched task
        :param commit: accepted but not evaluated; the result is always
            saved
        :return: the saved task result
        """
        parent = super(GroupTaskResultManager, self)
        record = parent.create(asyncresult)
        record.group = group
        record.save()
        return record
class GroupTaskResult(TaskResult):
    """Task result of a task that was dispatched for a group."""
    group = models.ForeignKey(
        Group,
    )

    objects = GroupTaskResultManager()
class UserManager(models.Manager):
    """Manager for users: allocates ids and user names, creates users."""

    def get_next_uid(self):
        """
        Return the user id to use for the next new user.

        This is one more than the highest stored ``uid`` but never less than
        ``settings.OSUSER_MINUID``.
        """
        q = self.aggregate(models.Max('uid'))
        if q['uid__max'] is None:
            return settings.OSUSER_MINUID
        return max(settings.OSUSER_MINUID, q['uid__max'] + 1)

    def get_next_username(self):
        """
        Return the first unused user name of the form ``<prefix><NN>``.

        The prefix is ``settings.OSUSER_USERNAME_PREFIX`` and the number
        starts at 1, zero padded to at least two digits.
        """
        prefix = settings.OSUSER_USERNAME_PREFIX
        usernameformat = "{0}{1:02d}"
        # Test membership in a set instead of walking the names in database
        # order: lexicographic ordering puts e.g. 'usr100' before 'usr11'
        # (and 'usr0' before 'usr01'), which made the sequential comparison
        # stop early and hand out a name that is already taken.
        taken = set(
            self.filter(username__startswith=prefix).values_list(
                'username', flat=True))
        count = 1
        nextuser = usernameformat.format(prefix, count)
        while nextuser in taken:
            count += 1
            nextuser = usernameformat.format(prefix, count)
        return nextuser

    def create_user(self, username=None, password=None, commit=False):
        """
        Create a user together with a group of the same name and a shadow
        password entry.

        :param username: user name, generated via
            :py:meth:`get_next_username` when ``None``
        :param password: clear text password, generated when ``None``
        :param commit: when true the user is saved once more after the
            password has been set
        :return: the new user
        """
        uid = self.get_next_uid()
        gid = Group.objects.get_next_gid()
        if username is None:
            username = self.get_next_username()
        if password is None:
            password = generate_password()
        homedir = os.path.join(settings.OSUSER_HOME_BASEPATH, username)
        # Group.save() (run by create()) already dispatches create_ldap_group
        # and records its task result.  The former extra call to
        # GroupTaskResult.objects.create(group, ...) passed two arguments to
        # a method accepting one and raised TypeError, so it is gone.
        group = Group.objects.create(groupname=username, gid=gid)
        user = self.create(username=username, group=group, uid=uid,
                           homedir=homedir,
                           shell=settings.OSUSER_DEFAULT_SHELL)
        Shadow.objects.create_shadow(user=user, password=password)
        user.set_password(password)
        if commit:
            user.save()
        return user
@python_2_unicode_compatible
class User(TimeStampedModel, models.Model):
    """
    An operating system user.

    Saving, deleting and setting the password dispatch the matching LDAP
    tasks and record a task result for each of them.
    """
    username = models.CharField(
        _('User name'), max_length=64, unique=True)
    uid = models.PositiveSmallIntegerField(
        _('User ID'), unique=True, primary_key=True)
    group = models.ForeignKey(Group, verbose_name=_('Group'))
    gecos = models.CharField(_('Gecos field'), max_length=128, blank=True)
    homedir = models.CharField(_('Home directory'), max_length=256)
    shell = models.CharField(_('Login shell'), max_length=64)

    objects = UserManager()

    class Meta:
        verbose_name = _('User')
        verbose_name_plural = _('Users')

    def __str__(self):
        return '{0} ({1})'.format(self.username, self.uid)

    def set_password(self, password):
        """
        Dispatch ``create_ldap_user`` with the given clear text password.

        :param password: the new clear text password
        """
        UserTaskResult.objects.create_usertaskresult(
            self,
            create_ldap_user.delay(
                # Group's primary key field is ``gid``; it has no ``id``
                self.username, self.uid, self.group.gid, self.gecos,
                self.homedir, self.shell, password
            ),
            commit=True
        )

    def save(self, *args, **kwargs):
        """
        Store the user, then dispatch ``create_ldap_user`` without password.

        The row is written first because the task result has a foreign key
        to this user, which must exist before the result can be stored.
        """
        result = super(User, self).save(*args, **kwargs)
        UserTaskResult.objects.create_usertaskresult(
            self,
            create_ldap_user.delay(
                self.username, self.uid, self.group.gid, self.gecos,
                self.homedir, self.shell, password=None
            )
        )
        return result

    def delete(self, *args, **kwargs):
        """
        Dispatch the LDAP removal tasks, then delete the user's group and
        the user itself.
        """
        for additional in AdditionalGroup.objects.filter(user=self):
            group = additional.group
            DeleteTaskResult.objects.create_deletetaskresult(
                'usergroup',
                '{0} in {1}'.format(self.username, group.groupname),
                remove_ldap_user_from_group.delay(
                    self.username, group.groupname)
            )
        DeleteTaskResult.objects.create_deletetaskresult(
            'user', self.username,
            delete_ldap_user.delay(self.username)
        )
        # NOTE(review): Group.delete() below dispatches
        # delete_ldap_group_if_empty as well, so that task is sent twice.
        DeleteTaskResult.objects.create_deletetaskresult(
            'group', self.group.groupname,
            delete_ldap_group_if_empty.delay(self.group.groupname)
        )
        self.group.delete()
        super(User, self).delete(*args, **kwargs)
class UserTaskResultManager(TaskResultManager):
    """Manager creating task results that belong to a user."""

    def create_usertaskresult(self, user, asyncresult, commit=False):
        """
        Create and save a task result linked to ``user``.

        :param user: the user the task operates on
        :param asyncresult: result handle of the dispatched task
        :param commit: accepted but not evaluated; the result is always
            saved
        :return: the saved task result
        """
        record = self.create(asyncresult)
        record.user = user
        record.save()
        return record
class UserTaskResult(TaskResult):
    """Task result of a task that was dispatched for a user."""
    user = models.ForeignKey(
        User,
    )

    objects = UserTaskResultManager()
class ShadowManager(models.Manager):
    """Manager creating shadow password entries."""

    def create_shadow(self, user, password):
        """
        Create and store the shadow password entry for ``user``.

        :param user: the user the entry belongs to
        :param password: clear text password, stored as SHA-512 crypt hash
        :return: the stored shadow entry
        """
        # date of last change, expressed in days since Jan 1, 1970
        changedays = (timezone.now().date() - date(1970, 1, 1)).days
        pwhash = sha512_crypt.encrypt(password)
        # create() already saves the new row; the former additional
        # shadow.save() only caused a redundant second write.
        return self.create(
            user=user, changedays=changedays,
            minage=0, maxage=None, gracedays=7,
            inactdays=30, expiredays=None, passwd=pwhash
        )
@python_2_unicode_compatible
class Shadow(TimeStampedModel, models.Model):
    """
    Shadow password entry of a user.

    All day based fields are optional (``blank=True, null=True``).
    """
    user = models.OneToOneField(
        User,
        primary_key=True,
        verbose_name=_('User'),
    )
    passwd = models.CharField(
        _('Encrypted password'),
        max_length=128,
    )
    changedays = models.PositiveSmallIntegerField(
        _('Date of last change'),
        help_text=_('This is expressed in days since Jan 1, 1970'),
        blank=True,
        null=True,
    )
    minage = models.PositiveSmallIntegerField(
        _('Minimum age'),
        help_text=_(
            'Minimum number of days before the password can be'
            ' changed'),
        blank=True,
        null=True,
    )
    maxage = models.PositiveSmallIntegerField(
        _('Maximum age'),
        help_text=_(
            'Maximum number of days after which the password has to'
            ' be changed'),
        blank=True,
        null=True,
    )
    gracedays = models.PositiveSmallIntegerField(
        _('Grace period'),
        help_text=_(
            'The number of days before the password is going to'
            ' expire'),
        blank=True,
        null=True,
    )
    inactdays = models.PositiveSmallIntegerField(
        _('Inactivity period'),
        help_text=_(
            'The number of days after the password has expired during'
            ' which the password should still be accepted'),
        blank=True,
        null=True,
    )
    expiredays = models.PositiveSmallIntegerField(
        _('Account expiration date'),
        help_text=_(
            'The date of expiration of the account, expressed as'
            ' number of days since Jan 1, 1970'),
        blank=True,
        null=True,
        default=None,
    )

    objects = ShadowManager()

    class Meta:
        verbose_name = _('Shadow password')
        verbose_name_plural = _('Shadow passwords')

    def __str__(self):
        return 'for user {0}'.format(self.user)
2014-05-24 21:53:49 +02:00
@python_2_unicode_compatible
class AdditionalGroup(TimeStampedModel, models.Model):
    """
    Membership of a user in a group other than the user's primary group.

    Saving and deleting a membership dispatches the matching LDAP task and
    records a task result for it.
    """
    user = models.ForeignKey(User)
    group = models.ForeignKey(Group)

    class Meta:
        unique_together = ('user', 'group')
        verbose_name = _('Additional group')
        verbose_name_plural = _('Additional groups')

    def clean(self):
        """
        Reject the user's primary group as additional group.

        :raises ValidationError: if ``group`` is the user's primary group
        """
        if self.user.group == self.group:
            raise ValidationError(_(
                "You can not use a user's primary group."))

    def save(self, *args, **kwargs):
        """
        Store the membership, then dispatch ``add_ldap_user_to_group``.

        The row is stored first (as ``Group.save`` does) so that no LDAP
        change is requested for a membership that fails to persist.
        """
        super(AdditionalGroup, self).save(*args, **kwargs)
        # TaskResultManager.create() only accepts the async result, so the
        # former keyword based create(group=..., task_uuid=..., ...) call
        # raised TypeError; use the dedicated helper instead.
        GroupTaskResult.objects.create_grouptaskresult(
            self.group,
            add_ldap_user_to_group.delay(
                self.user.username, self.group.groupname)
        )

    def delete(self, *args, **kwargs):
        """
        Dispatch ``remove_ldap_user_from_group`` and delete the membership.
        """
        DeleteTaskResult.objects.create_deletetaskresult(
            'usergroup',
            str(self),
            remove_ldap_user_from_group.delay(
                self.user.username, self.group.groupname)
        )
        super(AdditionalGroup, self).delete(*args, **kwargs)

    def __str__(self):
        return '{0} in {1}'.format(self.user, self.group)