Jan Dittberner
4af1a39ca4
- update dependencies - fix deprecation warnings - fix tests - skip some tests that need more work - reformat changed code with isort and black
529 lines
16 KiB
Python
529 lines
16 KiB
Python
"""
|
|
This module defines the database models of operating system users.
|
|
|
|
"""
|
|
import base64
|
|
import logging
|
|
import os
|
|
from datetime import date
|
|
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
from django.db import models, transaction
|
|
from django.dispatch import Signal
|
|
from django.utils import timezone
|
|
from django.utils.translation import gettext as _
|
|
from model_utils.models import TimeStampedModel
|
|
from passlib.hash import sha512_crypt
|
|
from passlib.pwd import genword
|
|
|
|
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
# Signal sent by ``User.set_password`` once the password has been handed to the
# user's shadow entry; it is sent with the keyword arguments ``password`` (the
# clear text password) and ``instance`` (the user).
password_set = Signal()
|
|
|
|
|
|
# Message of the ValidationError raised by ``AdditionalGroup.clean`` when the
# additional group equals the user's primary group.
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL = _("You can not use a user's primary group.")
|
|
|
|
|
|
class GroupManager(models.Manager):
    """
    Manager for :py:class:`osusers.models.Group` that allocates group ids.

    """

    def get_next_gid(self):
        """
        Determine the group id to use for the next new group.

        The result is one more than the highest stored ``gid`` but never
        lower than ``settings.OSUSER_MINGID``. If no group exists yet,
        ``settings.OSUSER_MINGID`` itself is returned.

        :returns: group id
        :rtype: int

        """
        highest_gid = self.aggregate(models.Max("gid"))["gid__max"]
        if highest_gid is not None:
            return max(settings.OSUSER_MINGID, highest_gid + 1)
        return settings.OSUSER_MINGID
|
|
|
|
|
|
class Group(TimeStampedModel, models.Model):
    """
    Database representation of an operating system group.

    The numeric group id ``gid`` is the primary key of the model.

    """

    groupname = models.CharField(_("Group name"), max_length=16, unique=True)
    gid = models.PositiveSmallIntegerField(_("Group ID"), unique=True, primary_key=True)
    descr = models.TextField(_("Description"), blank=True)
    passwd = models.CharField(_("Group password"), max_length=128, blank=True)

    objects = GroupManager()

    class Meta:
        verbose_name = _("Group")
        verbose_name_plural = _("Groups")

    def __str__(self):
        return f"{self.groupname} ({self.gid})"

    @transaction.atomic
    def save(self, *args, **kwargs):
        """
        Store the group in the database inside a transaction.

        NOTE(review): this method only delegates to the parent ``save``; any
        synchronization to external systems (e.g. LDAP) is presumably
        triggered elsewhere, for instance by signal receivers -- confirm.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :return: self
        :rtype: :py:class:`osusers.models.Group`

        """
        super().save(*args, **kwargs)
        return self

    @transaction.atomic
    def delete(self, *args, **kwargs):
        """
        Remove the group from the database inside a transaction.

        The result of the parent ``delete`` is not returned.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.delete`

        """
        super().delete(*args, **kwargs)
|
|
|
|
|
|
class UserManager(models.Manager):
    """
    Manager class for :py:class:`osusers.models.User`.

    """

    def get_next_uid(self):
        """
        Get the next available user id.

        The result is one more than the highest stored ``uid`` but never
        lower than ``settings.OSUSER_MINUID``. If no user exists yet,
        ``settings.OSUSER_MINUID`` itself is returned.

        :return: user id
        :rtype: int

        """
        highest_uid = self.aggregate(models.Max("uid"))["uid__max"]
        if highest_uid is None:
            return settings.OSUSER_MINUID
        return max(settings.OSUSER_MINUID, highest_uid + 1)

    def get_next_username(self):
        """
        Get the next available user name.

        User names are built from ``settings.OSUSER_USERNAME_PREFIX`` followed
        by a counter that starts at 1 and is zero padded to at least two
        digits. The first candidate that is not used yet is returned.

        :return: user name
        :rtype: str

        """
        prefix = settings.OSUSER_USERNAME_PREFIX
        # Candidates are checked by membership instead of walking the rows in
        # database order: lexicographic ordering places e.g. "<prefix>100"
        # before "<prefix>11" and names such as "<prefix>01x" between two
        # generated names, either of which would end an ordered walk too early
        # and yield a name that is already taken.
        taken = set(
            self.filter(username__startswith=prefix).values_list(
                "username", flat=True
            )
        )
        count = 1
        while True:
            candidate = "{0}{1:02d}".format(prefix, count)
            if candidate not in taken:
                return candidate
            count += 1

    @transaction.atomic
    def create_user(self, customer, username=None, password=None, commit=False):
        """
        Create a new user with a primary group named the same as the user and
        an initial password.

        If username is None the result of :py:meth:`get_next_username` is used.
        If password is None a new password will be generated using passlib's
        :py:func:`genword`.

        The group and the user are created through their managers' ``create``
        methods, i.e. they are written to the database regardless of
        ``commit``; ``commit`` only controls whether ``save`` is called once
        more on the user after the password has been set.

        :param customer: Django User instance this user is associated to
        :param str username: the username or None
        :param str password: the password or None
        :param boolean commit: whether to save the user again after the
            password has been set
        :return: new user
        :rtype: :py:class:`osusers.models.User`

        """
        uid = self.get_next_uid()
        gid = Group.objects.get_next_gid()
        if username is None:
            username = self.get_next_username()
        if password is None:
            password = genword(entropy=128)
        homedir = os.path.join(settings.OSUSER_HOME_BASEPATH, username)
        # The primary group carries the same name as the user.
        group = Group.objects.create(groupname=username, gid=gid)
        user = self.create(
            username=username,
            group=group,
            uid=uid,
            homedir=homedir,
            customer=customer,
            shell=settings.OSUSER_DEFAULT_SHELL,
        )
        user.set_password(password)
        if commit:
            user.save()
        return user
|
|
|
|
|
|
class User(TimeStampedModel, models.Model):
    """
    This entity class corresponds to an operating system user.

    The numeric user id ``uid`` is the primary key of the model. Every user
    references a primary :py:class:`Group <osusers.models.Group>` and the
    customer (``settings.AUTH_USER_MODEL``) it belongs to.

    """

    username = models.CharField(_("User name"), max_length=64, unique=True)
    uid = models.PositiveSmallIntegerField(_("User ID"), unique=True, primary_key=True)
    group = models.ForeignKey(Group, verbose_name=_("Group"), on_delete=models.CASCADE)
    gecos = models.CharField(_("Gecos field"), max_length=128, blank=True)
    homedir = models.CharField(_("Home directory"), max_length=256)
    shell = models.CharField(_("Login shell"), max_length=64)
    customer = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)

    objects = UserManager()

    class Meta:
        verbose_name = _("User")
        verbose_name_plural = _("Users")

    def __str__(self):
        return "{0} ({1})".format(self.username, self.uid)

    @transaction.atomic
    def set_password(self, password):
        """
        Set the password of the user.

        If the user already has a related
        :py:class:`Shadow <osusers.models.Shadow>` instance its password is
        replaced, otherwise a new shadow entry is created. Afterwards the
        ``password_set`` signal is sent with the clear text password and this
        user as ``instance``.

        :param str password: the new password
        :return: always ``True``
        :rtype: bool

        """
        if hasattr(self, "shadow"):
            # NOTE(review): the existing shadow entry is only changed in
            # memory here, no save() is issued -- confirm that a caller or a
            # receiver of password_set persists it.
            self.shadow.set_password(password)
        else:
            self.shadow = Shadow.objects.create_shadow(user=self, password=password)
        password_set.send(sender=self.__class__, password=password, instance=self)
        return True

    def is_sftp_user(self):
        """
        Tell whether the user is assigned to the SFTP group.

        :return: ``True`` if one of the user's additional groups is named
            ``settings.OSUSER_SFTP_GROUP``
        :rtype: bool

        """
        # noinspection PyUnresolvedReferences
        return self.additionalgroup_set.filter(
            group__groupname=settings.OSUSER_SFTP_GROUP
        ).exists()

    @transaction.atomic
    def save(self, *args, **kwargs):
        """
        Save the user to the database inside a transaction.

        NOTE(review): this method only delegates to the parent ``save``;
        directory creation and synchronization to external systems are
        presumably triggered elsewhere, e.g. by signal receivers -- confirm.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :return: self
        :rtype: :py:class:`osusers.models.User`

        """
        super().save(*args, **kwargs)
        # Model.save() returns None; return the instance as documented and as
        # Group.save() does.
        return self

    @transaction.atomic
    def delete(self, *args, **kwargs):
        """
        Delete the user's primary group and the user from the database.

        The group is deleted first. The foreign key from the user to its
        group is declared with ``on_delete=models.CASCADE``.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.delete`

        """
        # noinspection PyUnresolvedReferences
        self.group.delete()
        super().delete(*args, **kwargs)
|
|
|
|
|
|
class ShadowManager(models.Manager):
    """
    Manager for :py:class:`osusers.models.Shadow` that creates shadow entries
    with preset aging values.

    """

    @transaction.atomic
    def create_shadow(self, user, password):
        """
        Create and store the shadow entry of a user.

        The entry is created with the current day as date of last change, a
        minimum age of 0, a grace period of 7 days, an inactivity period of
        30 days and neither a maximum age nor an expiration date. The hashed
        password is set afterwards and the entry is saved again.

        :param user: :py:class:`User <osusers.models.User>` instance
        :param str password: the password
        :return: new Shadow instance
        :rtype: :py:class:`osusers.models.Shadow` instance

        """
        unix_epoch = date(1970, 1, 1)
        days_since_epoch = (timezone.now().date() - unix_epoch).days
        entry = self.create(
            user=user,
            changedays=days_since_epoch,
            minage=0,
            maxage=None,
            gracedays=7,
            inactdays=30,
            expiredays=None,
        )
        entry.set_password(password)
        entry.save()
        return entry
|
|
|
|
|
|
class Shadow(TimeStampedModel, models.Model):
    """
    Database representation of the shadow file entry of an operating system
    user.

    The entry is bound one-to-one to its
    :py:class:`User <osusers.models.User>`, which also serves as primary key.

    """

    user = models.OneToOneField(
        User,
        primary_key=True,
        verbose_name=_("User"),
        on_delete=models.CASCADE,
    )
    passwd = models.CharField(_("Encrypted password"), max_length=128)
    changedays = models.PositiveSmallIntegerField(
        _("Date of last change"),
        help_text=_("This is expressed in days since Jan 1, 1970"),
        blank=True,
        null=True,
    )
    minage = models.PositiveSmallIntegerField(
        _("Minimum age"),
        help_text=_("Minimum number of days before the password can be changed"),
        blank=True,
        null=True,
    )
    maxage = models.PositiveSmallIntegerField(
        _("Maximum age"),
        help_text=_(
            "Maximum number of days after which the password has to be changed"
        ),
        blank=True,
        null=True,
    )
    gracedays = models.PositiveSmallIntegerField(
        _("Grace period"),
        help_text=_("The number of days before the password is going to expire"),
        blank=True,
        null=True,
    )
    inactdays = models.PositiveSmallIntegerField(
        _("Inactivity period"),
        help_text=_(
            "The number of days after the password has expired"
            " during which the password should still be accepted"
        ),
        blank=True,
        null=True,
    )
    expiredays = models.PositiveSmallIntegerField(
        _("Account expiration date"),
        help_text=_(
            "The date of expiration of the account,"
            " expressed as number of days since Jan 1, 1970"
        ),
        blank=True,
        null=True,
        default=None,
    )

    objects = ShadowManager()

    class Meta:
        verbose_name = _("Shadow password")
        verbose_name_plural = _("Shadow passwords")

    def __str__(self):
        return f"for user {self.user}"

    def set_password(self, password):
        """
        Hash the given password with ``sha512_crypt`` and keep the hash in
        ``passwd``.

        The instance is not saved by this method.

        :param str password: the password
        """
        hashed_password = sha512_crypt.hash(password)
        self.passwd = hashed_password
|
|
|
|
|
|
class AdditionalGroup(TimeStampedModel, models.Model):
    """
    This entity class corresponds to additional group assignments for an
    :py:class:`operating system user <osusers.models.User>`.

    Each combination of user and group can exist only once.

    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    group = models.ForeignKey(Group, on_delete=models.CASCADE)

    class Meta:
        unique_together = ("user", "group")
        verbose_name = _("Additional group")
        verbose_name_plural = _("Additional groups")

    def __str__(self):
        return "{0} in {1}".format(self.user, self.group)

    def clean(self):
        """
        Ensure that the assigned group is different from the user's primary
        group.

        :raises django.core.exceptions.ValidationError: if the group is the
            user's primary group

        """
        # noinspection PyUnresolvedReferences
        if self.user.group == self.group:
            raise ValidationError(CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)

    @transaction.atomic
    def save(self, *args, **kwargs):
        """
        Persist the group assignment to the database inside a transaction.

        NOTE(review): this method only delegates to the parent ``save``;
        synchronization to external systems (e.g. LDAP) is presumably
        triggered elsewhere, for instance by signal receivers -- confirm.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.save`
        :return: this instance
        :rtype: :py:class:`AdditionalGroup <osusers.models.AdditionalGroup>`

        """
        super().save(*args, **kwargs)
        # Model.save() returns None; return the instance as documented.
        return self

    @transaction.atomic
    def delete(self, *args, **kwargs):
        """
        Delete the group assignment from the database inside a transaction.

        The result of the parent ``delete`` is not returned.

        :param args: positional arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        :param kwargs: keyword arguments to be passed on to
            :py:meth:`django.db.Model.delete`
        """
        super().delete(*args, **kwargs)
|
|
|
|
|
|
class SshPublicKeyManager(models.Manager):
    """
    Default manager for :py:class:`SSH public key
    <osusers.models.SshPublicKey>` instances.

    """

    _RFC4716_BEGIN = "---- BEGIN SSH2 PUBLIC KEY ----"
    _RFC4716_END = "---- END SSH2 PUBLIC KEY ----"
    _INVALID_KEY = "invalid SSH public key"

    def parse_key_text(self, key_text: str):
        """
        Parse a SSH public key in OpenSSH or :rfc:`4716` format into its
        components algorithm, key data and comment.

        The algorithm name is read from the decoded key data, not from the
        first field of an OpenSSH formatted line.

        :param str key_text: key text
        :return: triple of algorithm name, key data and comment
        :rtype: triple of str
        :raises ValueError: if the text does not contain parsable key data

        """
        if key_text.startswith(self._RFC4716_BEGIN):
            data, comment = self._parse_rfc4716(key_text)
        else:
            # OpenSSH format: "<algorithm> <base64 data> [<comment>]"
            parts = key_text.split(None, 2)
            if len(parts) < 2:
                raise ValueError(self._INVALID_KEY)
            data = parts[1]
            comment = parts[2] if len(parts) == 3 else ""
        return self._algorithm_from_key_data(data), data, comment

    @classmethod
    def _parse_rfc4716(cls, key_text):
        """Split a :rfc:`4716` formatted key into its data and comment."""
        headers = {}
        data_lines = []
        # Tag and partial value of a header whose value is continued on the
        # following line(s); pending_tag is None outside of a continuation.
        pending_tag = None
        pending_value = ""
        for line in key_text.splitlines():
            if line == cls._RFC4716_BEGIN:
                continue
            if pending_tag is not None:
                # Continuations are handled before header detection so that a
                # continued value containing ":" is not taken for a new header
                # and an empty first fragment does not end the continuation.
                if line.endswith("\\"):
                    pending_value += line[:-1]
                    continue
                headers[pending_tag.lower()] = pending_value + line
                pending_tag = None
                pending_value = ""
            elif ":" in line:  # a header line
                tag, value = (item.strip() for item in line.split(":", 1))
                if value.endswith("\\"):
                    pending_tag = tag
                    pending_value = value[:-1]
                else:
                    headers[tag.lower()] = value
            elif line == cls._RFC4716_END:
                break
            elif line:  # ignore empty lines
                data_lines.append(line)
        return "".join(data_lines), headers.get("comment", "")

    @classmethod
    def _algorithm_from_key_data(cls, data):
        """Read the algorithm name from base64 encoded SSH public key data."""
        try:
            keybytes = base64.b64decode(data)
        except (TypeError, ValueError) as error:
            # binascii.Error, raised for malformed base64, is a ValueError
            raise ValueError(cls._INVALID_KEY) from error
        # The key data starts with the algorithm name encoded as an SSH
        # string: a 32 bit big endian length followed by that many bytes.
        if len(keybytes) < 4:
            raise ValueError(cls._INVALID_KEY)
        name_length = int.from_bytes(keybytes[:4], byteorder="big")
        name = keybytes[4 : 4 + name_length]
        if name_length == 0 or len(name) != name_length:
            raise ValueError(cls._INVALID_KEY)
        try:
            return name.decode("utf-8")
        except UnicodeDecodeError as error:
            raise ValueError(cls._INVALID_KEY) from error

    def create_ssh_public_key(self, user, keytext):
        """
        Create a new :py:class:`SSH public key <osusers.models.SshPublicKey>`
        for a user from the given key text representation. The text can be
        either in openssh format or :rfc:`4716` format.

        :param user: :py:class:`operating system user <osusers.models.User>`
        :param str keytext: key text
        :return: public key
        :rtype: :py:class:`osusers.models.SshPublicKey`
        :raises ValueError: if the key text cannot be parsed

        """
        algorithm, data, comment = self.parse_key_text(keytext)
        return self.create(user=user, algorithm=algorithm, data=data, comment=comment)
|
|
|
|
|
|
class SshPublicKey(TimeStampedModel):
    """
    A single SSH public key of an :py:class:`operating system user
    <osusers.models.User>`.

    The combination of user, algorithm and key data is unique.

    """

    user = models.ForeignKey(User, verbose_name=_("User"), on_delete=models.CASCADE)
    algorithm = models.CharField(_("Algorithm"), max_length=20)
    data = models.TextField(_("Key bytes"), help_text=_("Base64 encoded key bytes"))
    comment = models.TextField(_("Comment"), blank=True)

    objects = SshPublicKeyManager()

    class Meta:
        verbose_name = _("SSH public key")
        verbose_name_plural = _("SSH public keys")
        unique_together = [("user", "algorithm", "data")]

    def __str__(self):
        # Algorithm, key data and comment separated by single spaces;
        # surrounding whitespace (e.g. from an empty comment) is stripped.
        key_line = f"{self.algorithm} {self.data} {self.comment}"
        return key_line.strip()
|