gva/gnuviechadmin/osusers/models.py

380 lines
12 KiB
Python

from datetime import date
import os
from django.db import models, transaction
from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext as _
from model_utils.models import TimeStampedModel
from celery.result import AsyncResult
from passlib.hash import sha512_crypt
from passlib.utils import generate_password
from .tasks import (
add_ldap_user_to_group,
create_ldap_group,
create_ldap_user,
delete_ldap_group_if_empty,
delete_ldap_user,
remove_ldap_user_from_group,
)
# Error message raised as a ValidationError by AdditionalGroup.clean() when
# the additional group is the same as the user's primary group.
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL = _(
"You can not use a user's primary group.")
class TaskResult(TimeStampedModel, models.Model):
    """
    Abstract base model that stores the outcome of an asynchronous task.

    The primary key ``task_uuid`` holds the id of the task's result object,
    which is used to look the result up again in ``update_taskstatus``.

    """
    task_uuid = models.CharField(primary_key=True, max_length=64, blank=False)
    task_name = models.CharField(max_length=255, blank=False, db_index=True)
    is_finished = models.BooleanField(default=False)
    is_success = models.BooleanField(default=False)
    state = models.CharField(max_length=10)
    result_body = models.TextField(blank=True)

    class Meta:
        abstract = True

    def _set_result_fields(self, asyncresult):
        """
        Copy the current status of ``asyncresult`` into this instance.

        The finished/success flags and the result body are only filled in
        once the result reports itself as ready; the state is always copied.
        The instance is not saved by this method.

        :param asyncresult: result object of the task to inspect

        """
        task_is_done = asyncresult.ready()
        if task_is_done:
            self.is_finished = True
            self.is_success = asyncresult.state == 'SUCCESS'
            self.result_body = str(asyncresult.result)
        self.state = asyncresult.state
        # NOTE(review): the result is fetched even when the task was not
        # ready above; presumably this waits for the task and acknowledges
        # the result message -- confirm against the result backend in use.
        asyncresult.get(no_ack=False)

    def update_taskstatus(self):
        """
        Refresh and save this instance unless it is already marked finished.

        """
        if self.is_finished:
            return
        self._set_result_fields(AsyncResult(self.task_uuid))
        self.save()
class GroupManager(models.Manager):
    """
    Manager for :py:class:`Group` instances.

    """

    def get_next_gid(self):
        """
        Return the group id to use for the next group.

        The value is one above the highest stored gid but never lower than
        ``settings.OSUSER_MINGID``. If no group exists yet the configured
        minimum is returned.

        :return: group id
        :rtype: int

        """
        highest_gid = self.aggregate(models.Max('gid'))['gid__max']
        if highest_gid is not None:
            return max(settings.OSUSER_MINGID, highest_gid + 1)
        return settings.OSUSER_MINGID
@python_2_unicode_compatible
class Group(TimeStampedModel, models.Model):
    """
    Operating system group.

    Saving a group dispatches the ``create_ldap_group`` task, deleting it
    dispatches ``delete_ldap_group_if_empty``. Both operations record a task
    result entry for the dispatched task.

    """
    groupname = models.CharField(
        _('Group name'), max_length=16, unique=True)
    gid = models.PositiveSmallIntegerField(
        _('Group ID'), unique=True, primary_key=True)
    descr = models.TextField(_('Description'), blank=True)
    passwd = models.CharField(
        _('Group password'), max_length=128, blank=True)

    objects = GroupManager()

    class Meta:
        verbose_name = _('Group')
        verbose_name_plural = _('Groups')

    def __str__(self):
        return '{0} ({1})'.format(self.groupname, self.gid)

    def save(self, *args, **kwargs):
        """
        Save the group, then dispatch the LDAP creation task for it.

        :return: this group instance

        """
        super(Group, self).save(*args, **kwargs)
        ldap_task = create_ldap_group.delay(
            self.groupname, self.gid, self.descr)
        GroupTaskResult.objects.create_grouptaskresult(
            self, ldap_task, 'create_ldap_group')
        return self

    def delete(self, *args, **kwargs):
        """
        Dispatch the LDAP group removal task, then delete the group.

        The task result is stored as a :py:class:`DeleteTaskResult` because
        it has to outlive the group row itself.

        """
        ldap_task = delete_ldap_group_if_empty.delay(self.groupname)
        DeleteTaskResult.objects.create_deletetaskresult(
            'group', self.groupname, ldap_task,
            'delete_ldap_group_if_empty')
        super(Group, self).delete(*args, **kwargs)
class TaskResultManager(models.Manager):
    """
    Base manager for the concrete task result models.

    """

    def create(self, asyncresult, task_name):
        """
        Build an unsaved task result instance for the given task.

        Unlike the default manager ``create`` this does not write to the
        database; callers complete the instance and save it themselves.

        :param asyncresult: result object of the dispatched task
        :param str task_name: name of the dispatched task
        :return: unsaved task result instance of the manager's model

        """
        taskresult = self.model(
            task_uuid=asyncresult.task_id,
            task_name=task_name,
        )
        taskresult._set_result_fields(asyncresult)
        return taskresult
class DeleteTaskResultManager(TaskResultManager):
    """
    Manager for :py:class:`DeleteTaskResult` instances.

    """

    def create_deletetaskresult(
        self, modeltype, modelname, asyncresult, task_name
    ):
        """
        Create and save the task result of a delete operation.

        :param str modeltype: kind of object that is deleted
        :param str modelname: textual identification of the deleted object
        :param asyncresult: result object of the dispatched task
        :param str task_name: name of the dispatched task
        :return: the saved task result

        """
        delete_result = super(DeleteTaskResultManager, self).create(
            asyncresult, task_name)
        delete_result.modeltype = modeltype
        delete_result.modelname = modelname
        delete_result.save()
        return delete_result
class DeleteTaskResult(TaskResult):
    """
    Task result of a delete operation.

    The deleted object is referenced by type and name only, there is no
    foreign key to it.

    """
    # kind of the deleted object; callers in this module pass 'group',
    # 'user' or 'usergroup'
    modeltype = models.CharField(max_length=20, db_index=True)
    # textual identification of the deleted object
    modelname = models.CharField(max_length=255)

    objects = DeleteTaskResultManager()
class GroupTaskResultManager(TaskResultManager):
    """
    Manager for :py:class:`GroupTaskResult` instances.

    """

    def create_grouptaskresult(
        self, group, asyncresult, task_name, commit=False
    ):
        """
        Create and save the result of a task that operates on a group.

        :param group: the group the task refers to
        :param asyncresult: result object of the dispatched task
        :param str task_name: name of the dispatched task
        :param bool commit: accepted but not evaluated by this method, the
            result is always saved
        :return: the saved task result

        """
        group_result = super(GroupTaskResultManager, self).create(
            asyncresult, task_name)
        group_result.group = group
        group_result.save()
        return group_result
class GroupTaskResult(TaskResult):
    """
    Result of a task that was dispatched for a specific group.

    """
    # the group the task refers to
    group = models.ForeignKey(Group)

    objects = GroupTaskResultManager()
class UserManager(models.Manager):
    """
    Manager for :py:class:`User` instances.

    """

    def get_next_uid(self):
        """
        Return the user id to use for the next user.

        The value is one above the highest stored uid but never lower than
        ``settings.OSUSER_MINUID``. If no user exists yet the configured
        minimum is returned.

        :return: user id
        :rtype: int

        """
        q = self.aggregate(models.Max('uid'))
        if q['uid__max'] is None:
            return settings.OSUSER_MINUID
        return max(settings.OSUSER_MINUID, q['uid__max'] + 1)

    def get_next_username(self):
        """
        Return the first unused user name of the form ``<prefix><number>``.

        The prefix is ``settings.OSUSER_USERNAME_PREFIX``; the number starts
        at 1 and is zero padded to at least two digits. Gaps in the existing
        sequence are filled first.

        :return: unused user name
        :rtype: str

        """
        prefix = settings.OSUSER_USERNAME_PREFIX
        # Test membership instead of walking the names in database order:
        # string ordering puts e.g. "<prefix>100" before "<prefix>11" and
        # unrelated names sharing the prefix may sort first, both of which
        # would make an ordered walk stop at a name that is already taken.
        taken = set(
            self.filter(username__startswith=prefix).values_list(
                'username', flat=True))
        usernameformat = "{0}{1:02d}"
        count = 1
        nextuser = usernameformat.format(prefix, count)
        while nextuser in taken:
            count += 1
            nextuser = usernameformat.format(prefix, count)
        return nextuser

    @transaction.atomic
    def create_user(self, username=None, password=None, commit=False):
        """
        Create a new user together with a primary group of the same name.

        The home directory is placed below ``settings.OSUSER_HOME_BASEPATH``
        and the login shell is ``settings.OSUSER_DEFAULT_SHELL``. The whole
        operation runs in a database transaction.

        :param str username: user name, the next free generated name is
            used if this is ``None``
        :param str password: clear text password, a generated password is
            used if this is ``None``
        :param bool commit: save the user once more after the password has
            been set
        :return: the new user

        """
        uid = self.get_next_uid()
        gid = Group.objects.get_next_gid()
        if username is None:
            username = self.get_next_username()
        if password is None:
            password = generate_password()
        homedir = os.path.join(settings.OSUSER_HOME_BASEPATH, username)
        group = Group.objects.create(groupname=username, gid=gid)
        user = self.create(username=username, group=group, uid=uid,
                           homedir=homedir,
                           shell=settings.OSUSER_DEFAULT_SHELL)
        user.set_password(password)
        if commit:
            user.save()
        return user
@python_2_unicode_compatible
class User(TimeStampedModel, models.Model):
    """
    Operating system user.

    Saving a user or setting its password dispatches the
    ``create_ldap_user`` task; deleting a user dispatches the tasks that
    remove the user from its additional groups and from LDAP. Every
    dispatched task is recorded as a task result.

    """
    username = models.CharField(
        _('User name'), max_length=64, unique=True)
    uid = models.PositiveSmallIntegerField(
        _('User ID'), unique=True, primary_key=True)
    group = models.ForeignKey(Group, verbose_name=_('Group'))
    gecos = models.CharField(_('Gecos field'), max_length=128, blank=True)
    homedir = models.CharField(_('Home directory'), max_length=256)
    shell = models.CharField(_('Login shell'), max_length=64)

    objects = UserManager()

    class Meta:
        verbose_name = _('User')
        verbose_name_plural = _('Users')

    def __str__(self):
        return '{0} ({1})'.format(self.username, self.uid)

    def set_password(self, password):
        """
        Set the password of the user and pass it on to LDAP.

        An existing shadow entry gets the new password hash and is saved; if
        the user has no shadow entry yet, one is created.

        :param str password: the new clear text password

        """
        if hasattr(self, 'shadow'):
            self.shadow.set_password(password)
            # Shadow.set_password only changes the in-memory hash, persist
            # it so that the database matches what is sent to LDAP.
            self.shadow.save()
        else:
            self.shadow = Shadow.objects.create_shadow(
                user=self, password=password
            )
        UserTaskResult.objects.create_usertaskresult(
            self,
            create_ldap_user.delay(
                self.username, self.uid, self.group.gid, self.gecos,
                self.homedir, self.shell, password
            ),
            'create_ldap_user',
            commit=True
        )

    def save(self, *args, **kwargs):
        """
        Save the user, then dispatch the LDAP task for it.

        The task is dispatched without a password. The user row is written
        first because the task result references it by foreign key.

        """
        saved = super(User, self).save(*args, **kwargs)
        UserTaskResult.objects.create_usertaskresult(
            self,
            create_ldap_user.delay(
                self.username, self.uid, self.group.gid, self.gecos,
                self.homedir, self.shell, password=None
            ),
            'create_ldap_user'
        )
        return saved

    def delete(self, *args, **kwargs):
        """
        Delete the user and its primary group.

        Before the rows are removed, tasks are dispatched that remove the
        user from all of its additional groups and from LDAP.

        """
        additional_groups = [
            ag.group for ag in AdditionalGroup.objects.filter(user=self)
        ]
        for group in additional_groups:
            DeleteTaskResult.objects.create_deletetaskresult(
                'usergroup',
                '{0} in {1}'.format(self.username, group.groupname),
                remove_ldap_user_from_group.delay(
                    self.username, group.groupname),
                'remove_ldap_user_from_group',
            )
        DeleteTaskResult.objects.create_deletetaskresult(
            'user', self.username,
            delete_ldap_user.delay(self.username),
            'delete_ldap_user'
        )
        self.group.delete()
        super(User, self).delete(*args, **kwargs)
class UserTaskResultManager(TaskResultManager):
    """
    Manager for :py:class:`UserTaskResult` instances.

    """

    def create_usertaskresult(
        self, user, asyncresult, task_name, commit=False
    ):
        """
        Create and save the result of a task that operates on a user.

        :param user: the user the task refers to
        :param asyncresult: result object of the dispatched task
        :param str task_name: name of the dispatched task
        :param bool commit: accepted but not evaluated by this method, the
            result is always saved
        :return: the saved task result

        """
        user_result = self.create(asyncresult, task_name)
        user_result.user = user
        user_result.save()
        return user_result
class UserTaskResult(TaskResult):
    """
    Result of a task that was dispatched for a specific user.

    """
    # the user the task refers to
    user = models.ForeignKey(User)

    objects = UserTaskResultManager()
class ShadowManager(models.Manager):
    """
    Manager for :py:class:`Shadow` instances.

    """

    def create_shadow(self, user, password):
        """
        Create and save the shadow entry for a user.

        The date of the last change is set to today, expressed in days since
        Jan 1, 1970. The entry is created with a minimum age of 0, a grace
        period of 7, an inactivity period of 30 and neither a maximum age
        nor an expiration date.

        :param user: the user the shadow entry belongs to
        :param str password: the clear text password
        :return: the saved shadow entry

        """
        changedays = (timezone.now().date() - date(1970, 1, 1)).days
        shadow = self.model(
            user=user, changedays=changedays,
            minage=0, maxage=None, gracedays=7,
            inactdays=30, expiredays=None
        )
        # Hash the password before the first write so that no row with an
        # empty password field is ever stored, and only one INSERT is
        # needed instead of an INSERT followed by an UPDATE.
        shadow.set_password(password)
        shadow.save(force_insert=True, using=self._db)
        return shadow
@python_2_unicode_compatible
class Shadow(TimeStampedModel, models.Model):
    """
    Shadow password entry of a :py:class:`User`.

    The entry shares its primary key with the user it belongs to. All day
    based fields are optional.

    """
    user = models.OneToOneField(
        User, primary_key=True, verbose_name=_('User'))
    passwd = models.CharField(
        _('Encrypted password'), max_length=128)
    changedays = models.PositiveSmallIntegerField(
        _('Date of last change'),
        help_text=_('This is expressed in days since Jan 1, 1970'),
        blank=True,
        null=True,
    )
    minage = models.PositiveSmallIntegerField(
        _('Minimum age'),
        help_text=_('Minimum number of days before the password can be'
                    ' changed'),
        blank=True,
        null=True,
    )
    maxage = models.PositiveSmallIntegerField(
        _('Maximum age'),
        help_text=_('Maximum number of days after which the password has to'
                    ' be changed'),
        blank=True,
        null=True,
    )
    gracedays = models.PositiveSmallIntegerField(
        _('Grace period'),
        help_text=_('The number of days before the password is going to'
                    ' expire'),
        blank=True,
        null=True,
    )
    inactdays = models.PositiveSmallIntegerField(
        _('Inactivity period'),
        help_text=_('The number of days after the password has expired during'
                    ' which the password should still be accepted'),
        blank=True,
        null=True,
    )
    expiredays = models.PositiveSmallIntegerField(
        _('Account expiration date'),
        help_text=_('The date of expiration of the account, expressed as'
                    ' number of days since Jan 1, 1970'),
        blank=True,
        null=True,
        default=None,
    )

    objects = ShadowManager()

    class Meta:
        verbose_name = _('Shadow password')
        verbose_name_plural = _('Shadow passwords')

    def __str__(self):
        return 'for user {0}'.format(self.user)

    def set_password(self, password):
        """
        Store the SHA-512 crypt hash of the given password on this instance.

        The instance is not saved by this method.

        :param str password: the clear text password

        """
        hashed = sha512_crypt.encrypt(password)
        self.passwd = hashed
@python_2_unicode_compatible
class AdditionalGroup(TimeStampedModel, models.Model):
    """
    Membership of a :py:class:`User` in a :py:class:`Group` other than its
    primary group.

    Saving or deleting a membership dispatches the matching LDAP task and
    records a task result for it.

    """
    user = models.ForeignKey(User)
    group = models.ForeignKey(Group)

    class Meta:
        unique_together = ('user', 'group')
        verbose_name = _('Additional group')
        verbose_name_plural = _('Additional groups')

    def __str__(self):
        return '{0} in {1}'.format(self.user, self.group)

    def clean(self):
        """
        Reject the user's primary group as additional group.

        :raises ValidationError: if the group is the user's primary group

        """
        primary_group = self.user.group
        if primary_group == self.group:
            raise ValidationError(CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)

    def save(self, *args, **kwargs):
        """
        Dispatch the task adding the user to the LDAP group, then save.

        """
        ldap_task = add_ldap_user_to_group.delay(
            self.user.username, self.group.groupname)
        GroupTaskResult.objects.create_grouptaskresult(
            self.group, ldap_task, 'add_ldap_user_to_group')
        super(AdditionalGroup, self).save(*args, **kwargs)

    def delete(self, *args, **kwargs):
        """
        Dispatch the task removing the user from the LDAP group, then
        delete the membership.

        """
        description = str(self)
        ldap_task = remove_ldap_user_from_group.delay(
            self.user.username, self.group.groupname)
        DeleteTaskResult.objects.create_deletetaskresult(
            'usergroup', description, ldap_task,
            'remove_ldap_user_from_group')
        super(AdditionalGroup, self).delete(*args, **kwargs)