gva/gnuviechadmin/osusers/models.py

528 lines
16 KiB
Python

"""
This module defines the database models of operating system users.
"""
import base64
import logging
import os
from datetime import date
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models, transaction
from django.dispatch import Signal
from django.utils import timezone
from django.utils.translation import gettext as _
from model_utils.models import TimeStampedModel
from passlib.handlers.sha2_crypt import sha512_crypt
from passlib.pwd import genword
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)
# Signal sent by User.set_password() with the keyword arguments ``password``
# and ``instance``.
password_set = Signal()
# Validation message raised by AdditionalGroup.clean().
# NOTE(review): ``_`` is bound to the eager ``gettext`` in this module, so this
# text is translated once, at import time -- confirm whether ``gettext_lazy``
# is what is intended here.
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL = _("You can not use a user's primary group.")
class GroupManager(models.Manager):
    """
    Manager for :py:class:`osusers.models.Group` that allocates group ids.
    """

    def get_next_gid(self):
        """
        Determine the group id to use for the next new group.

        The result is one above the highest stored ``gid`` but never lower
        than ``settings.OSUSER_MINGID``. The minimum itself is returned when
        no group exists yet.

        :returns: group id
        :rtype: int
        """
        highest = self.aggregate(models.Max("gid"))["gid__max"]
        if highest is not None:
            return max(settings.OSUSER_MINGID, highest + 1)
        # Empty table: start at the configured lower bound.
        return settings.OSUSER_MINGID
class Group(TimeStampedModel, models.Model):
    """
    Database representation of an operating system group.
    """

    groupname = models.CharField(_("Group name"), max_length=16, unique=True)
    gid = models.PositiveSmallIntegerField(_("Group ID"), unique=True, primary_key=True)
    descr = models.TextField(_("Description"), blank=True)
    passwd = models.CharField(_("Group password"), max_length=128, blank=True)

    objects = GroupManager()

    class Meta:
        verbose_name = _("Group")
        verbose_name_plural = _("Groups")

    def __str__(self):
        return "{name} ({gid})".format(name=self.groupname, gid=self.gid)

    @transaction.atomic
    def save(self, *args, **kwargs):
        """
        Store the group inside a database transaction.

        This method only delegates to the parent ``save``.
        NOTE(review): any LDAP synchronisation presumably happens in signal
        handlers outside this module -- confirm.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :return: self (unlike Django's own ``save``, which returns ``None``)
        :rtype: :py:class:`osusers.models.Group`
        """
        super().save(*args, **kwargs)
        return self

    @transaction.atomic
    def delete(self, *args, **kwargs):
        """
        Remove the group inside a database transaction.

        This method only delegates to the parent ``delete`` and discards its
        return value.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        """
        super().delete(*args, **kwargs)
class UserManager(models.Manager):
    """
    Manager class for :py:class:`osusers.models.User`.
    """

    def get_next_uid(self):
        """
        Get the next available user id.

        This is one above the highest stored ``uid``, but never lower than
        ``settings.OSUSER_MINUID``.

        :return: user id
        :rtype: int
        """
        q = self.aggregate(models.Max("uid"))
        if q["uid__max"] is None:
            return settings.OSUSER_MINUID
        return max(settings.OSUSER_MINUID, q["uid__max"] + 1)

    def get_next_username(self):
        """
        Get the next available user name.

        Candidates are built as ``<OSUSER_USERNAME_PREFIX><NN>`` with a
        counter starting at 1 (zero padded to at least two digits). The first
        candidate that is not already taken is returned, so gaps in the
        sequence are reused.

        :return: user name
        :rtype: str
        """
        prefix = settings.OSUSER_USERNAME_PREFIX
        usernameformat = "{0}{1:02d}"
        # Collect the existing names into a set instead of walking them in
        # database order: lexicographic ordering places e.g. "<prefix>100"
        # before "<prefix>11", and unrelated names such as "<prefix>00" may
        # sort first. Either case made an ordered scan stop early and hand
        # out a name that already exists.
        taken = set(
            self.filter(username__startswith=prefix).values_list(
                "username", flat=True
            )
        )
        count = 1
        nextuser = usernameformat.format(prefix, count)
        while nextuser in taken:
            count += 1
            nextuser = usernameformat.format(prefix, count)
        return nextuser

    @transaction.atomic
    def create_user(self, customer, username=None, password=None, commit=False):
        """
        Create a new user with a primary group named the same as the user and
        an initial password.

        If username is None the result of :py:meth:`get_next_username` is used.
        If password is None a new password will be generated using passlib's
        :py:func:`genword`.

        The group and the user are created, and the user is saved,
        regardless of ``commit``.

        :param customer: Django User instance this user is associated to
        :param str username: the username or None
        :param str password: the password or None
        :param boolean commit: currently ignored; kept for interface
            compatibility
        :return: new user
        :rtype: :py:class:`osusers.models.User`
        """
        uid = self.get_next_uid()
        gid = Group.objects.get_next_gid()
        if username is None:
            username = self.get_next_username()
        if password is None:
            password = genword(entropy=128)
        homedir = os.path.join(settings.OSUSER_HOME_BASEPATH, username)
        # The primary group carries the same name as the user.
        group = Group.objects.create(groupname=username, gid=gid)
        user = self.create(
            username=username,
            group=group,
            uid=uid,
            homedir=homedir,
            customer=customer,
            shell=settings.OSUSER_DEFAULT_SHELL,
        )
        # The user row must exist before set_password() can attach a shadow
        # entry to it.
        user.set_password(password)
        user.save()
        return user
class User(TimeStampedModel, models.Model):
    """
    Database representation of an operating system user.
    """

    username = models.CharField(_("User name"), max_length=64, unique=True)
    uid = models.PositiveSmallIntegerField(_("User ID"), unique=True, primary_key=True)
    group = models.ForeignKey(Group, verbose_name=_("Group"), on_delete=models.CASCADE)
    gecos = models.CharField(_("Gecos field"), max_length=128, blank=True)
    homedir = models.CharField(_("Home directory"), max_length=256)
    shell = models.CharField(_("Login shell"), max_length=64)
    customer = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)

    objects = UserManager()

    class Meta:
        verbose_name = _("User")
        verbose_name_plural = _("Users")

    def __str__(self):
        return "{name} ({uid})".format(name=self.username, uid=self.uid)

    @transaction.atomic
    def set_password(self, password):
        """
        Set the password of the user.

        An existing :py:class:`Shadow <osusers.models.Shadow>` entry gets the
        new password assigned; otherwise a new entry is created through the
        shadow manager. Afterwards the ``password_set`` signal is sent with
        the clear text password and this instance.

        NOTE(review): for an existing shadow entry only the in-memory hash is
        changed here, no ``save()`` is issued -- confirm that a caller or a
        signal receiver persists it.

        :param str password: the new password
        :return: always ``True``
        """
        if not hasattr(self, "shadow"):
            self.shadow = Shadow.objects.create_shadow(user=self, password=password)
        else:
            self.shadow.set_password(password)
        password_set.send(sender=self.__class__, password=password, instance=self)
        return True

    def is_sftp_user(self):
        """
        Tell whether the user has the group named
        ``settings.OSUSER_SFTP_GROUP`` among its additional groups.

        :rtype: bool
        """
        # noinspection PyUnresolvedReferences
        sftp_memberships = self.additionalgroup_set.filter(
            group__groupname=settings.OSUSER_SFTP_GROUP
        )
        return sftp_memberships.exists()

    @transaction.atomic
    def save(self, *args, **kwargs):
        """
        Store the user inside a database transaction.

        This method only delegates to the parent ``save``.
        NOTE(review): directory creation and LDAP synchronisation presumably
        happen in signal handlers outside this module -- confirm.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :return: whatever the parent ``save`` returns
        """
        result = super().save(*args, **kwargs)
        return result

    @transaction.atomic
    def delete(self, *args, **kwargs):
        """
        Delete the user's primary group and then the user itself inside a
        database transaction.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        """
        # The primary group goes first, then the user row itself.
        # noinspection PyUnresolvedReferences
        primary_group = self.group
        primary_group.delete()
        super().delete(*args, **kwargs)
class ShadowManager(models.Manager):
    """
    Manager for :py:class:`osusers.models.Shadow` entries.
    """

    @transaction.atomic
    def create_shadow(self, user, password):
        """
        Create and store a shadow entry for ``user`` with fixed defaults.

        The entry is created with today's date as day of last change, a
        minimum age of 0, no maximum age, 7 grace days, 30 inactivity days
        and no expiry date. The password hash is set afterwards and the entry
        is saved a second time.

        :param user: :py:class:`User <osusers.models.User>` instance
        :param str password: the password
        :return: new Shadow instance
        :rtype: :py:class:`osusers.models.Shadow` instance
        """
        epoch = date(1970, 1, 1)
        today = timezone.now().date()
        entry = self.create(
            user=user,
            changedays=(today - epoch).days,  # days since the Unix epoch
            minage=0,
            maxage=None,
            gracedays=7,
            inactdays=30,
            expiredays=None,
        )
        entry.set_password(password)
        entry.save()
        return entry
class Shadow(TimeStampedModel, models.Model):
    """
    Database representation of the shadow file entry of an operating system
    user.
    """

    user = models.OneToOneField(
        User,
        on_delete=models.CASCADE,
        primary_key=True,
        verbose_name=_("User"),
    )
    passwd = models.CharField(_("Encrypted password"), max_length=128)
    changedays = models.PositiveSmallIntegerField(
        _("Date of last change"),
        blank=True,
        null=True,
        help_text=_("This is expressed in days since Jan 1, 1970"),
    )
    minage = models.PositiveSmallIntegerField(
        _("Minimum age"),
        blank=True,
        null=True,
        help_text=_("Minimum number of days before the password can be changed"),
    )
    maxage = models.PositiveSmallIntegerField(
        _("Maximum age"),
        blank=True,
        null=True,
        help_text=_(
            "Maximum number of days after which the password has to be changed"
        ),
    )
    gracedays = models.PositiveSmallIntegerField(
        _("Grace period"),
        blank=True,
        null=True,
        help_text=_("The number of days before the password is going to expire"),
    )
    inactdays = models.PositiveSmallIntegerField(
        _("Inactivity period"),
        blank=True,
        null=True,
        help_text=_(
            "The number of days after the password has expired "
            "during which the password should still be accepted"
        ),
    )
    expiredays = models.PositiveSmallIntegerField(
        _("Account expiration date"),
        blank=True,
        null=True,
        default=None,
        help_text=_(
            "The date of expiration of the account, "
            "expressed as number of days since Jan 1, 1970"
        ),
    )

    objects = ShadowManager()

    class Meta:
        verbose_name = _("Shadow password")
        verbose_name_plural = _("Shadow passwords")

    def __str__(self):
        return "for user {user}".format(user=self.user)

    def set_password(self, password):
        """
        Hash the password with SHA-512 crypt and keep the hash in ``passwd``.

        The instance is only changed in memory; saving is left to the caller.

        :param str password: the password
        """
        hashed = sha512_crypt.hash(password)
        self.passwd = hashed
class AdditionalGroup(TimeStampedModel, models.Model):
    """
    Assignment of an additional group to an
    :py:class:`operating system user <osusers.models.User>`.
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    group = models.ForeignKey(Group, on_delete=models.CASCADE)

    class Meta:
        unique_together = ("user", "group")
        verbose_name = _("Additional group")
        verbose_name_plural = _("Additional groups")

    def __str__(self):
        return "{user} in {group}".format(user=self.user, group=self.group)

    def clean(self):
        """
        Reject assignments where the group is the user's primary group.

        :raises django.core.exceptions.ValidationError: if the assigned group
            equals the user's primary group
        """
        # noinspection PyUnresolvedReferences
        primary_group = self.user.group
        if primary_group == self.group:
            raise ValidationError(CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)

    @transaction.atomic
    def save(self, *args, **kwargs):
        """
        Store the group assignment inside a database transaction.

        This method only delegates to the parent ``save``.
        NOTE(review): LDAP synchronisation presumably happens in signal
        handlers outside this module -- confirm.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :return: whatever the parent ``save`` returns
        """
        result = super().save(*args, **kwargs)
        return result

    @transaction.atomic
    def delete(self, *args, **kwargs):
        """
        Remove the group assignment inside a database transaction.

        This method only delegates to the parent ``delete`` and discards its
        return value.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        """
        super().delete(*args, **kwargs)
class SshPublicKeyManager(models.Manager):
    """
    Default manager for :py:class:`SSH public key
    <osusers.models.SshPublicKey>` instances.
    """

    _RFC4716_BEGIN = "---- BEGIN SSH2 PUBLIC KEY ----"
    _RFC4716_END = "---- END SSH2 PUBLIC KEY ----"
    _INVALID_KEY_MESSAGE = "invalid SSH public key"

    @classmethod
    def _parse_rfc4716(cls, key_text):
        """
        Split a :rfc:`4716` public key file into base64 data and comment.

        :param str key_text: key text starting with the RFC 4716 begin marker
        :return: pair of base64 key data and comment (empty if absent)
        :rtype: pair of str
        """
        headers = {}
        data_lines = []
        # Tag and partial value of a header whose value continues on the
        # following line(s); ``pending_tag`` is None when no header is open.
        pending_tag = None
        pending_value = ""
        for line in key_text.splitlines():
            # A pending continuation takes precedence over everything else:
            # the continued text may itself contain ":" and must not be
            # mistaken for a new header.
            if pending_tag is not None:
                if line.endswith("\\"):
                    pending_value += line[:-1]
                else:
                    headers[pending_tag] = pending_value + line
                    pending_tag = None
                continue
            if line == cls._RFC4716_BEGIN:
                continue
            if line == cls._RFC4716_END:
                break
            if ":" in line:  # a header line
                header_tag, header_value = [
                    item.strip() for item in line.split(":", 1)
                ]
                header_tag = header_tag.lower()
                if header_value.endswith("\\"):
                    pending_tag = header_tag
                    pending_value = header_value[:-1]
                else:
                    headers[header_tag] = header_value
            elif line:  # ignore empty lines
                data_lines.append(line)
        return "".join(data_lines), headers.get("comment", "")

    @classmethod
    def _read_algorithm(cls, data):
        """
        Read the algorithm name from base64 encoded SSH public key data.

        The decoded key blob starts with a 4 byte big endian length followed
        by that many bytes of algorithm name.

        :param str data: base64 encoded key blob
        :return: algorithm name
        :rtype: str
        :raises ValueError: if the data is not valid base64 or does not start
            with a complete, non-empty, UTF-8 decodable algorithm name
        """
        try:
            # binascii.Error, raised for malformed base64, is a ValueError.
            keybytes = base64.b64decode(data)
        except (TypeError, ValueError) as err:
            raise ValueError(cls._INVALID_KEY_MESSAGE) from err
        if len(keybytes) < 4:
            raise ValueError(cls._INVALID_KEY_MESSAGE)
        name_length = int.from_bytes(keybytes[:4], byteorder="big")
        name = keybytes[4 : 4 + name_length]
        if name_length == 0 or len(name) != name_length:
            raise ValueError(cls._INVALID_KEY_MESSAGE)
        try:
            return name.decode("utf-8")
        except UnicodeDecodeError as err:
            raise ValueError(cls._INVALID_KEY_MESSAGE) from err

    def parse_key_text(self, key_text: str):
        """
        Parse a SSH public key in OpenSSH or :rfc:`4716` format into its
        components algorithm, key data and comment.

        The algorithm is always taken from the decoded key data, not from the
        leading word of an OpenSSH formatted line.

        :param str key_text: key text
        :return: triple of algorithm name, key data and comment
        :rtype: triple of str
        :raises ValueError: if the text cannot be parsed as a public key
        """
        if key_text.startswith(self._RFC4716_BEGIN):
            data, comment = self._parse_rfc4716(key_text)
        else:
            # OpenSSH format: "<algorithm> <base64 data> [<comment>]"
            parts = key_text.split(None, 2)
            if len(parts) < 2:
                raise ValueError(self._INVALID_KEY_MESSAGE)
            data = parts[1]
            comment = parts[2] if len(parts) == 3 else ""
        key_algorithm = self._read_algorithm(data)
        return key_algorithm, data, comment

    def create_ssh_public_key(self, user, keytext):
        """
        Create a new :py:class:`SSH public key <osusers.models.SshPublicKey>`
        for a user from the given key text representation. The text can be
        either in openssh format or :rfc:`4716` format.

        :param user: :py:class:`operating system user <osusers.models.User>`
        :param str keytext: key text
        :return: public key
        :rtype: :py:class:`osusers.models.SshPublicKey`
        :raises ValueError: if the key text cannot be parsed
        """
        algorithm, data, comment = self.parse_key_text(keytext)
        return self.create(user=user, algorithm=algorithm, data=data, comment=comment)
class SshPublicKey(TimeStampedModel):
    """
    A single SSH public key of an :py:class:`operating system user
    <osusers.models.User>`.
    """

    user = models.ForeignKey(User, verbose_name=_("User"), on_delete=models.CASCADE)
    algorithm = models.CharField(_("Algorithm"), max_length=20)
    data = models.TextField(_("Key bytes"), help_text=_("Base64 encoded key bytes"))
    comment = models.TextField(_("Comment"), blank=True)

    objects = SshPublicKeyManager()

    class Meta:
        verbose_name = _("SSH public key")
        verbose_name_plural = _("SSH public keys")
        unique_together = [("user", "algorithm", "data")]

    def __str__(self):
        # Stripping removes the trailing blank left by an empty comment.
        rendered = "%s %s %s" % (self.algorithm, self.data, self.comment)
        return rendered.strip()