Jan Dittberner
865f54ab67
- modify osusers.tasks and osusers.models to avoid serialization of full models for celery tasks and use the required fields only
237 lines
8.1 KiB
Python
237 lines
8.1 KiB
Python
from datetime import date
|
|
import os
|
|
|
|
from django.db import models
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils import timezone
|
|
from django.utils.encoding import python_2_unicode_compatible
|
|
from django.utils.translation import ugettext as _
|
|
|
|
from model_utils.models import TimeStampedModel
|
|
|
|
from passlib.hash import sha512_crypt
|
|
from passlib.utils import generate_password
|
|
|
|
from .tasks import (
|
|
add_ldap_user_to_group,
|
|
create_ldap_group,
|
|
create_ldap_user,
|
|
delete_ldap_group_if_empty,
|
|
delete_ldap_user,
|
|
remove_ldap_user_from_group,
|
|
)
|
|
|
|
|
|
class GroupManager(models.Manager):
    """Manager for :class:`Group` offering group id allocation."""

    def get_next_gid(self):
        """Return the group id to use for a newly created group.

        The value is one above the highest stored ``gid`` but never lower
        than ``settings.OSUSER_MINGID``.  When no group exists yet,
        ``settings.OSUSER_MINGID`` itself is returned.
        """
        highest = self.aggregate(models.Max('gid'))['gid__max']
        if highest is not None:
            return max(settings.OSUSER_MINGID, highest + 1)
        return settings.OSUSER_MINGID
|
|
|
|
|
|
@python_2_unicode_compatible
class Group(TimeStampedModel, models.Model):
    """Operating system group.

    Saving and deleting an instance queues the ``create_ldap_group`` and
    ``delete_ldap_group_if_empty`` tasks respectively.
    """

    groupname = models.CharField(
        verbose_name=_('Group name'), max_length=16, unique=True)
    # The numeric group id doubles as the primary key, so this model has
    # no automatic ``id`` field.
    gid = models.PositiveSmallIntegerField(
        verbose_name=_('Group ID'), unique=True, primary_key=True)
    descr = models.TextField(verbose_name=_('Description'), blank=True)
    passwd = models.CharField(
        verbose_name=_('Group password'), max_length=128, blank=True)

    objects = GroupManager()

    class Meta:
        verbose_name = _('Group')
        verbose_name_plural = _('Groups')

    def __str__(self):
        """Return the group name followed by the gid in parentheses."""
        return '{0} ({1})'.format(self.groupname, self.gid)

    def save(self, *args, **kwargs):
        """Store the group and queue the ``create_ldap_group`` task.

        The task receives plain field values (name, gid, description)
        instead of the model instance.

        :return: this instance
        """
        super(Group, self).save(*args, **kwargs)
        task_args = (self.groupname, self.gid, self.descr)
        create_ldap_group.delay(*task_args)
        return self

    def delete(self, *args, **kwargs):
        """Queue ``delete_ldap_group_if_empty`` and remove the group."""
        groupname = self.groupname
        delete_ldap_group_if_empty.delay(groupname)
        super(Group, self).delete(*args, **kwargs)
|
|
|
|
|
|
class UserManager(models.Manager):
    """Manager for :class:`User` handling id/name allocation and creation."""

    def get_next_uid(self):
        """Return the user id to use for a newly created user.

        The value is one above the highest stored ``uid`` but never lower
        than ``settings.OSUSER_MINUID``.  When no user exists yet,
        ``settings.OSUSER_MINUID`` itself is returned.
        """
        q = self.aggregate(models.Max('uid'))
        if q['uid__max'] is None:
            return settings.OSUSER_MINUID
        return max(settings.OSUSER_MINUID, q['uid__max'] + 1)

    def get_next_username(self):
        """Return the first unused user name of the form ``<prefix><NN>``.

        ``<prefix>`` is ``settings.OSUSER_USERNAME_PREFIX`` and ``<NN>`` is
        a counter starting at 1, zero padded to at least two digits.

        The existing names are collected into a set and probed by
        membership.  Walking the rows in lexicographic order is not
        reliable: "usr100" sorts before "usr11", and unrelated names that
        share the prefix would end the scan early, so an already taken
        name could be returned.
        """
        prefix = settings.OSUSER_USERNAME_PREFIX
        taken = set(
            self.filter(username__startswith=prefix).values_list(
                'username', flat=True))
        usernameformat = "{0}{1:02d}"
        count = 1
        nextuser = usernameformat.format(prefix, count)
        while nextuser in taken:
            count += 1
            nextuser = usernameformat.format(prefix, count)
        return nextuser

    def create_user(self, username=None, password=None, commit=False):
        """Create a user together with its own group and shadow entry.

        :param username: login name; the result of
            :meth:`get_next_username` is used when ``None``
        :param password: clear text password; a generated one is used
            when ``None``
        :param commit: when true ``user.save()`` is called once more
            before returning (the user row itself is already stored by
            ``self.create()``)
        :return: the newly created user
        """
        uid = self.get_next_uid()
        gid = Group.objects.get_next_gid()
        if username is None:
            username = self.get_next_username()
        if password is None:
            password = generate_password()
        homedir = os.path.join(settings.OSUSER_HOME_BASEPATH, username)
        # Group.objects.create() runs Group.save(), which already queues
        # create_ldap_group, so the task is not dispatched a second time.
        group = Group.objects.create(groupname=username, gid=gid)
        user = self.create(username=username, group=group, uid=uid,
                           homedir=homedir,
                           shell=settings.OSUSER_DEFAULT_SHELL)
        Shadow.objects.create_shadow(user=user, password=password)
        user.set_password(password)
        if commit:
            user.save()
        return user
|
|
|
|
|
|
@python_2_unicode_compatible
class User(TimeStampedModel, models.Model):
    """Operating system user.

    Saving, changing the password of and deleting an instance queue the
    corresponding LDAP tasks with plain field values.
    """

    username = models.CharField(
        _('User name'), max_length=64, unique=True)
    # The numeric user id doubles as the primary key.
    uid = models.PositiveSmallIntegerField(
        _('User ID'), unique=True, primary_key=True)
    group = models.ForeignKey(Group, verbose_name=_('Group'))
    gecos = models.CharField(_('Gecos field'), max_length=128, blank=True)
    homedir = models.CharField(_('Home directory'), max_length=256)
    shell = models.CharField(_('Login shell'), max_length=64)

    objects = UserManager()

    class Meta:
        verbose_name = _('User')
        verbose_name_plural = _('Users')

    def __str__(self):
        """Return the user name followed by the uid in parentheses."""
        return '{0} ({1})'.format(self.username, self.uid)

    def set_password(self, password):
        """Queue ``create_ldap_user`` with the given clear text password.

        :param password: the new clear text password
        """
        # Group uses ``gid`` as primary key and therefore has no ``id``
        # attribute; ``self.group.id`` would raise AttributeError.
        create_ldap_user.delay(
            self.username, self.uid, self.group.gid, self.gecos,
            self.homedir, self.shell, password
        )

    def save(self, *args, **kwargs):
        """Store the user and queue ``create_ldap_user`` without password.

        The task is queued only after the database write succeeded (the
        same order ``Group.save`` uses), so a failing save, e.g. because
        of a duplicate user name, does not push data to LDAP.

        :return: the result of the parent ``save()``
        """
        result = super(User, self).save(*args, **kwargs)
        create_ldap_user.delay(
            self.username, self.uid, self.group.gid, self.gecos,
            self.homedir, self.shell, password=None
        )
        return result

    def delete(self, *args, **kwargs):
        """Queue the LDAP removal tasks and delete the user and its group.

        The user is removed from every additional group first, then the
        LDAP user itself is deleted and finally the primary group.
        """
        memberships = AdditionalGroup.objects.filter(
            user=self).select_related('group')
        for membership in memberships:
            remove_ldap_user_from_group.delay(
                self.username, membership.group.groupname)
        delete_ldap_user.delay(self.username)
        # Group.delete() queues delete_ldap_group_if_empty itself, so the
        # task is not dispatched a second time from here.
        self.group.delete()
        super(User, self).delete(*args, **kwargs)
|
|
|
|
|
|
class ShadowManager(models.Manager):
    """Manager for :class:`Shadow` that creates shadow password entries."""

    def create_shadow(self, user, password):
        """Create and return the shadow entry for ``user``.

        The password is stored as a ``sha512_crypt`` hash and the change
        date is set to the current date expressed in days since
        Jan 1, 1970.  Minimum age is 0, grace period 7 and inactivity
        period 30 days; maximum age and expiry date are left unset.

        :param user: the user the entry belongs to
        :param password: the clear text password
        :return: the stored shadow instance
        """
        changedays = (timezone.now().date() - date(1970, 1, 1)).days
        pwhash = sha512_crypt.encrypt(password)
        # create() already writes the row; an additional save() would only
        # issue a redundant UPDATE.
        return self.create(
            user=user, changedays=changedays,
            minage=0, maxage=None, gracedays=7,
            inactdays=30, expiredays=None, passwd=pwhash
        )
|
|
|
|
|
|
@python_2_unicode_compatible
class Shadow(TimeStampedModel, models.Model):
    """Shadow password data attached one-to-one to a :class:`User`."""

    # The user relation is the primary key of this model.
    user = models.OneToOneField(
        User, primary_key=True, verbose_name=_('User'))
    passwd = models.CharField(
        verbose_name=_('Encrypted password'), max_length=128)
    changedays = models.PositiveSmallIntegerField(
        verbose_name=_('Date of last change'),
        help_text=_('This is expressed in days since Jan 1, 1970'),
        blank=True, null=True)
    minage = models.PositiveSmallIntegerField(
        verbose_name=_('Minimum age'),
        help_text=_(
            'Minimum number of days before the password can be changed'),
        blank=True, null=True)
    maxage = models.PositiveSmallIntegerField(
        verbose_name=_('Maximum age'),
        help_text=_(
            'Maximum number of days after which the password has to be'
            ' changed'),
        blank=True, null=True)
    gracedays = models.PositiveSmallIntegerField(
        verbose_name=_('Grace period'),
        help_text=_(
            'The number of days before the password is going to expire'),
        blank=True, null=True)
    inactdays = models.PositiveSmallIntegerField(
        verbose_name=_('Inactivity period'),
        help_text=_(
            'The number of days after the password has expired during which'
            ' the password should still be accepted'),
        blank=True, null=True)
    expiredays = models.PositiveSmallIntegerField(
        verbose_name=_('Account expiration date'),
        help_text=_(
            'The date of expiration of the account, expressed as number of'
            ' days since Jan 1, 1970'),
        blank=True, null=True, default=None)

    objects = ShadowManager()

    class Meta:
        verbose_name = _('Shadow password')
        verbose_name_plural = _('Shadow passwords')

    def __str__(self):
        """Return a short text naming the user this entry belongs to."""
        owner = self.user
        return 'for user {0}'.format(owner)
|
|
|
|
|
|
@python_2_unicode_compatible
class AdditionalGroup(TimeStampedModel, models.Model):
    """Membership of a :class:`User` in a group besides the primary one.

    Saving and deleting an instance queue the ``add_ldap_user_to_group``
    and ``remove_ldap_user_from_group`` tasks respectively.
    """

    user = models.ForeignKey(User)
    group = models.ForeignKey(Group)

    class Meta:
        verbose_name = _('Additional group')
        verbose_name_plural = _('Additional groups')
        # A user can be assigned to a given group only once.
        unique_together = ('user', 'group')

    def clean(self):
        """Reject the user's primary group as additional group.

        :raises ValidationError: if ``group`` is the primary group of
            ``user``
        """
        primary_group = self.user.group
        if primary_group == self.group:
            raise ValidationError(
                _("You can not use a user's primary group."))

    def save(self, *args, **kwargs):
        """Queue ``add_ldap_user_to_group`` and store the membership."""
        username = self.user.username
        groupname = self.group.groupname
        add_ldap_user_to_group.delay(username, groupname)
        super(AdditionalGroup, self).save(*args, **kwargs)

    def delete(self, *args, **kwargs):
        """Queue ``remove_ldap_user_from_group`` and delete the membership."""
        username = self.user.username
        groupname = self.group.groupname
        remove_ldap_user_from_group.delay(username, groupname)
        super(AdditionalGroup, self).delete(*args, **kwargs)

    def __str__(self):
        """Return a text of the form ``<user> in <group>``."""
        return '{0} in {1}'.format(self.user, self.group)
|