gva/gnuviechadmin/osusers/models.py

606 lines
20 KiB
Python

"""
This module defines the database models of operating system users.
"""
from __future__ import unicode_literals
import base64
from datetime import date
import logging
import os
import six
from django.db import models, transaction
from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext as _
from model_utils.models import TimeStampedModel
from passlib.hash import sha512_crypt
from passlib.utils import generate_password
from taskresults.models import TaskResult
from ldaptasks.tasks import (
add_ldap_user_to_group,
create_ldap_group,
create_ldap_user,
delete_ldap_group,
delete_ldap_user,
remove_ldap_user_from_group,
set_ldap_user_password,
)
from fileservertasks.tasks import (
delete_file_mail_userdir,
delete_file_sftp_userdir,
set_file_ssh_authorized_keys,
setup_file_mail_userdir,
setup_file_sftp_userdir,
)
_LOGGER = logging.getLogger(__name__)
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL = _(
"You can not use a user's primary group.")
class GroupManager(models.Manager):
"""
Manager class for :py:class:`osusers.models.Group`.
"""
def get_next_gid(self):
"""
Get the next available group id.
:returns: group id
:rtype: int
"""
q = self.aggregate(models.Max('gid'))
if q['gid__max'] is None:
return settings.OSUSER_MINGID
return max(settings.OSUSER_MINGID, q['gid__max'] + 1)
@python_2_unicode_compatible
class Group(TimeStampedModel, models.Model):
"""
This entity class corresponds to an operating system group.
"""
groupname = models.CharField(
_('Group name'), max_length=16, unique=True)
gid = models.PositiveSmallIntegerField(
_('Group ID'), unique=True, primary_key=True)
descr = models.TextField(_('Description'), blank=True)
passwd = models.CharField(
_('Group password'), max_length=128, blank=True)
objects = GroupManager()
class Meta:
verbose_name = _('Group')
verbose_name_plural = _('Groups')
def __str__(self):
return '{0} ({1})'.format(self.groupname, self.gid)
@transaction.atomic
def save(self, *args, **kwargs):
"""
Save the group to the database and synchronizes group information to
LDAP.
:param args: positional arguments to be passed on to
:py:meth:`django.db.Model.save`
:param kwargs: keyword arguments to be passed on to
:py:meth:`django.db.Model.save`
:return: self
:rtype: :py:class:`osusers.models.Group`
"""
super(Group, self).save(*args, **kwargs)
dn = create_ldap_group.delay(
self.groupname, self.gid, self.descr).get()
_LOGGER.info("created LDAP group with dn %s", dn)
return self
@transaction.atomic
def delete(self, *args, **kwargs):
"""
Delete the group from LDAP and the database.
:param args: positional arguments to be passed on to
:py:meth:`django.db.Model.delete`
:param kwargs: keyword arguments to be passed on to
:py:meth:`django.db.Model.delete`
"""
TaskResult.objects.create_task_result(
delete_ldap_group.delay(self.groupname),
'delete_ldap_group'
)
super(Group, self).delete(*args, **kwargs)
class UserManager(models.Manager):
"""
Manager class for :py:class:`osusers.models.User`.
"""
def get_next_uid(self):
"""
Get the next available user id.
:return: user id
:rtype: int
"""
q = self.aggregate(models.Max('uid'))
if q['uid__max'] is None:
return settings.OSUSER_MINUID
return max(settings.OSUSER_MINUID, q['uid__max'] + 1)
def get_next_username(self):
"""
Get the next available user name.
:return: user name
:rtype: str
"""
count = 1
usernameformat = "{0}{1:02d}"
nextuser = usernameformat.format(settings.OSUSER_USERNAME_PREFIX,
count)
for user in self.values('username').filter(
username__startswith=settings.OSUSER_USERNAME_PREFIX
).order_by('username'):
if user['username'] == nextuser:
count += 1
nextuser = usernameformat.format(
settings.OSUSER_USERNAME_PREFIX, count)
else:
break
return nextuser
@transaction.atomic
def create_user(
self, customer, username=None, password=None, commit=False
):
"""
Create a new user with a primary group named the same as the user and
an initial password.
If username is None the result of :py:meth:`get_next_username` is used.
If password is None a new password will be generated using passlib's
:py:func:`generate_password`.
:param customer: Django User instance this user is associated to
:param str username: the username or None
:param str password: the password or None
:param boolean commit: whether to commit the user data to the database
or not
:return: new user
:rtype: :py:class:`osusers.models.User`
"""
uid = self.get_next_uid()
gid = Group.objects.get_next_gid()
if username is None:
username = self.get_next_username()
if password is None:
password = generate_password()
homedir = os.path.join(settings.OSUSER_HOME_BASEPATH, username)
group = Group.objects.create(groupname=username, gid=gid)
user = self.create(username=username, group=group, uid=uid,
homedir=homedir, customer=customer,
shell=settings.OSUSER_DEFAULT_SHELL)
user.set_password(password)
if commit:
user.save()
return user
@python_2_unicode_compatible
class User(TimeStampedModel, models.Model):
"""
This entity class corresponds to an operating system user.
"""
username = models.CharField(
_('User name'), max_length=64, unique=True)
uid = models.PositiveSmallIntegerField(
_('User ID'), unique=True, primary_key=True)
group = models.ForeignKey(Group, verbose_name=_('Group'))
gecos = models.CharField(_('Gecos field'), max_length=128, blank=True)
homedir = models.CharField(_('Home directory'), max_length=256)
shell = models.CharField(_('Login shell'), max_length=64)
customer = models.ForeignKey(settings.AUTH_USER_MODEL)
objects = UserManager()
class Meta:
verbose_name = _('User')
verbose_name_plural = _('Users')
def __str__(self):
return '{0} ({1})'.format(self.username, self.uid)
@transaction.atomic
def set_password(self, password):
"""
Set the password of the user.
The password is set to the user's
:py:class:`Shadow <osusers.models.Shadow>` instance and to LDAP.
:param str password: the new password
"""
if hasattr(self, 'shadow'):
self.shadow.set_password(password)
success = set_ldap_user_password.delay(
self.username, password).get()
if success:
_LOGGER.info(
"successfully set LDAP password for %s", self.username)
else:
_LOGGER.error(
"setting the LDAP password for %s failed", self.username)
return success
else:
self.shadow = Shadow.objects.create_shadow(
user=self, password=password
)
dn = create_ldap_user.delay(
self.username, self.uid, self.group.gid, self.gecos,
self.homedir, self.shell, password
).get()
_LOGGER.info("set LDAP password for %s", dn)
return True
def is_sftp_user(self):
return self.additionalgroup_set.filter(
group__groupname=settings.OSUSER_SFTP_GROUP
).exists()
@transaction.atomic
def save(self, *args, **kwargs):
"""
Save the user to the database, create user directories and synchronize
user information to LDAP.
:param args: positional arguments to be passed on to
:py:meth:`django.db.Model.save`
:param kwargs: keyword arguments to be passed on to
:py:meth:`django.db.Model.save`
:return: self
:rtype: :py:class:`osusers.models.User`
"""
dn = create_ldap_user.delay(
self.username, self.uid, self.group.gid, self.gecos,
self.homedir, self.shell, password=None).get()
TaskResult.objects.create_task_result(
setup_file_sftp_userdir.delay(self.username),
'setup_file_sftp_userdir'
)
TaskResult.objects.create_task_result(
setup_file_mail_userdir.delay(self.username),
'setup_file_mail_userdir'
)
_LOGGER.info(
"created user %(user)s with LDAP dn %(dn)s, scheduled home "
"directory and mail base directory creation.", {
'user': self, 'dn': dn,
})
return super(User, self).save(*args, **kwargs)
@transaction.atomic
def delete(self, *args, **kwargs):
"""
Delete the user and its groups from LDAP and the database and remove
the user's directories.
:param args: positional arguments to be passed on to
:py:meth:`django.db.Model.delete`
:param kwargs: keyword arguments to be passed on to
:py:meth:`django.db.Model.delete`
"""
TaskResult.objects.create_task_result(
delete_file_mail_userdir.delay(self.username),
'delete_file_mail_userdir'
)
TaskResult.objects.create_task_result(
delete_file_sftp_userdir.delay(self.username),
'delete_file_sftp_userdir'
)
for group in [ag.group for ag in self.additionalgroup_set.all()]:
remove_ldap_user_from_group.delay(
self.username, group.groupname).get()
delete_ldap_user.delay(self.username).get()
self.group.delete()
super(User, self).delete(*args, **kwargs)
class ShadowManager(models.Manager):
"""
Manager class for :py:class:`osusers.models.Shadow`.
"""
@transaction.atomic
def create_shadow(self, user, password):
"""
Create a new shadow instance with typical Linux settings for the given
user with the given password.
:param user: :py:class:`User <osusers.models.User>` instance
:param str password: the password
:return: new Shadow instance
:rtype: :py:class:`osusers.models.Shadow` instance
"""
changedays = (timezone.now().date() - date(1970, 1, 1)).days
shadow = self.create(
user=user, changedays=changedays,
minage=0, maxage=None, gracedays=7,
inactdays=30, expiredays=None
)
shadow.set_password(password)
shadow.save()
return shadow
@python_2_unicode_compatible
class Shadow(TimeStampedModel, models.Model):
"""
This entity class corresponds to an operating system user's shadow file
entry.
"""
user = models.OneToOneField(User, primary_key=True, verbose_name=_('User'))
passwd = models.CharField(_('Encrypted password'), max_length=128)
changedays = models.PositiveSmallIntegerField(
_('Date of last change'),
help_text=_('This is expressed in days since Jan 1, 1970'),
blank=True, null=True)
minage = models.PositiveSmallIntegerField(
_('Minimum age'),
help_text=_('Minimum number of days before the password can be'
' changed'),
blank=True, null=True)
maxage = models.PositiveSmallIntegerField(
_('Maximum age'),
help_text=_('Maximum number of days after which the password has to'
' be changed'),
blank=True, null=True)
gracedays = models.PositiveSmallIntegerField(
_('Grace period'),
help_text=_('The number of days before the password is going to'
' expire'),
blank=True, null=True)
inactdays = models.PositiveSmallIntegerField(
_('Inactivity period'),
help_text=_('The number of days after the password has expired during'
' which the password should still be accepted'),
blank=True, null=True)
expiredays = models.PositiveSmallIntegerField(
_('Account expiration date'),
help_text=_('The date of expiration of the account, expressed as'
' number of days since Jan 1, 1970'),
blank=True, null=True, default=None)
objects = ShadowManager()
class Meta:
verbose_name = _('Shadow password')
verbose_name_plural = _('Shadow passwords')
def __str__(self):
return 'for user {0}'.format(self.user)
def set_password(self, password):
"""
Set and encrypt the password.
:param str password: the password
"""
self.passwd = sha512_crypt.encrypt(password)
@python_2_unicode_compatible
class AdditionalGroup(TimeStampedModel, models.Model):
"""
This entity class corresponds to additional group assignments for an
:py:class:`operating system user <osusers.models.User>`.
"""
user = models.ForeignKey(User)
group = models.ForeignKey(Group)
class Meta:
unique_together = ('user', 'group')
verbose_name = _('Additional group')
verbose_name_plural = _('Additional groups')
def __str__(self):
return '{0} in {1}'.format(self.user, self.group)
def clean(self):
"""
Ensure that the assigned group is different from the user's primary
group.
"""
if self.user.group == self.group:
raise ValidationError(CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)
@transaction.atomic
def save(self, *args, **kwargs):
"""
Persists the group assignment to LDAP and the database.
:param args: positional arguments to be passed on to
:py:meth:`django.db.Model.save`
:param kwargs: keyword arguments to be passed on to
:py:meth:`django.db.Model.save`
:return: this instance
:rtype: :py:class:`AdditionalGroup <osusers.models.AdditionalGroup>`
"""
add_ldap_user_to_group.delay(
self.user.username, self.group.groupname).get()
return super(AdditionalGroup, self).save(*args, **kwargs)
@transaction.atomic
def delete(self, *args, **kwargs):
"""
Delete the group assignment from LDAP and the database.
:param args: positional arguments to be passed on to
:py:meth:`django.db.Model.delete`
:param kwargs: keyword arguments to be passed on to
:py:meth:`django.db.Model.delete`
"""
TaskResult.objects.create_task_result(
remove_ldap_user_from_group.delay(
self.user.username, self.group.groupname),
'remove_ldap_user_from_group'
)
super(AdditionalGroup, self).delete(*args, **kwargs)
class SshPublicKeyManager(models.Manager):
"""
Default manager for :py:class:`SSH public key
<osusers.models.SshPublicKey>` instances.
"""
def parse_keytext(self, keytext):
"""
Parse a SSH public key in OpenSSH or :rfc:`4716` format into its
components algorithm, key data and comment.
:param str keytext: key text
:return: triple of algorithm name, key data and comment
:rtype: triple of str
"""
if keytext.startswith('---- BEGIN SSH2 PUBLIC KEY ----'):
comment = ''
data = ''
continued = ''
headers = {}
for line in keytext.splitlines():
if line == '---- BEGIN SSH2 PUBLIC KEY ----':
continue
elif ':' in line: # a header line
header_tag, header_value = [
item.strip() for item in line.split(':', 1)]
if header_value.endswith('\\'):
continued = header_value[:-1]
else:
headers[header_tag.lower()] = header_value
elif continued:
if line.endswith('\\'):
continued += line[:-1]
continue
header_value = continued + line
headers[header_tag.lower()] = header_value
continued = ''
elif line == '---- END SSH2 PUBLIC KEY ----':
break
elif line: # ignore empty lines
data += line
if 'comment' in headers:
comment = headers['comment']
else:
parts = keytext.split(None, 2)
if len(parts) < 2:
raise ValueError('invalid SSH public key')
data = parts[1]
comment = len(parts) == 3 and parts[2] or ""
try:
keybytes = base64.b64decode(data)
except TypeError:
raise ValueError('invalid SSH public key')
parts = keybytes.split(b'\x00' * 3)
alglength = six.byte2int(parts[1])
algname = parts[1][1:1+alglength]
return algname, data, comment
def create_ssh_public_key(self, user, keytext):
"""
Create a new :py:class:`SSH public key <osusers.models.SshPublicKey>`
for a user from the given key text representation. The text can be
either in openssh format or :rfc:`4716` format.
:param user: :py:class:`operating system user <osusers.models.User>`
:param str keytext: key text
:return: public key
:retype: :py:class:`osusers.models.SshPublicKey`
"""
algorithm, data, comment = self.parse_keytext(keytext)
return self.create(
user=user, algorithm=algorithm, data=data, comment=comment)
@python_2_unicode_compatible
class SshPublicKey(TimeStampedModel):
"""
This entity class represents single SSH keys for an :py:class:`operating
system user <osusers.models.User>`.
"""
user = models.ForeignKey(User, verbose_name=_('User'))
algorithm = models.CharField(_('Algorithm'), max_length=20)
data = models.TextField(_('Key bytes'),
help_text=_('Base64 encoded key bytes'))
comment = models.TextField(_('Comment'), blank=True)
objects = SshPublicKeyManager()
class Meta:
verbose_name = _('SSH public key')
verbose_name_plural = _('SSH public keys')
unique_together = [('user', 'algorithm', 'data')]
def __str__(self):
return "{algorithm} {data} {comment}".format(
algorithm=self.algorithm, data=self.data, comment=self.comment
).strip()
def save(self, **kwargs):
key = super(SshPublicKey, self).save(**kwargs)
TaskResult.objects.create_task_result(
set_file_ssh_authorized_keys.delay(
self.user.username, [
str(key) for key in
SshPublicKey.objects.filter(user=self.user)]),
'set_file_ssh_authorized_keys'
)
return key
def delete(self, **kwargs):
super(SshPublicKey, self).delete(**kwargs)
TaskResult.objects.create_task_result(
set_file_ssh_authorized_keys.delay(
self.user.username, [
str(key) for key in
SshPublicKey.objects.filter(user=self.user)]),
'set_file_ssh_authorized_keys'
)