gva/gnuviechadmin/userdbs/models.py

277 lines
8.8 KiB
Python
Raw Normal View History

2015-01-04 17:57:51 +01:00
from __future__ import unicode_literals
from django.db import models
from django.db import transaction
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext as _
from model_utils import Choices
from model_utils.models import TimeStampedModel
from passlib.utils import generate_password
from osusers.models import User as OsUser
from mysqltasks.tasks import (
create_mysql_database,
create_mysql_user,
delete_mysql_database,
delete_mysql_user,
set_mysql_userpassword,
)
from pgsqltasks.tasks import (
create_pgsql_database,
create_pgsql_user,
delete_pgsql_database,
delete_pgsql_user,
set_pgsql_userpassword,
)
DB_TYPES = Choices(
(0, 'pgsql', _('PostgreSQL')),
(1, 'mysql', _('MySQL')),
)
"""
Database type choice enumeration.
"""
class DatabaseUserManager(models.Manager):
"""
Default Manager for :py:class:`userdbs.models.DatabaseUser`.
"""
def _get_next_dbuser_name(self, osuser, db_type):
2015-01-04 17:57:51 +01:00
"""
Get the next available database user name.
:param osuser: :py:class:`osusers.models.User` instance
:param db_type: value from :py:data:`DB_TYPES`
:return: database user name
:rtype: str
"""
count = 1
dbuser_name_format = "{0}db{{0:02d}}".format(osuser.username)
nextname = dbuser_name_format.format(count)
2015-01-04 17:57:51 +01:00
for user in self.values('name').filter(
osuser=osuser, db_type=db_type
).order_by('name'):
if user['name'] == nextname:
count += 1
nextname = dbuser_name_format.format(count)
2015-01-04 17:57:51 +01:00
else:
break
return nextname
@transaction.atomic
def create_database_user(
self, osuser, db_type, username=None, password=None, commit=True
):
"""
Create a database user of the given type for the given OS user.
If username or password are not specified they are generated.
:param osuser: the :py:class:`osusers.models.User` instance
:param db_type: value from :py:data:`DB_TYPES`
:param str username: database user name
:param str password: initial password or None
2015-01-04 17:57:51 +01:00
:param boolean commit: whether the user should be persisted
:return: :py:class:`userdbs.models.DatabaseUser` instance
.. note::
The password is not persisted it is only used to set the password
on the database side.
"""
if username is None:
username = self._get_next_dbuser_name(osuser, db_type)
2015-01-04 17:57:51 +01:00
db_user = DatabaseUser(
osuser=osuser, db_type=db_type, name=username, password=password)
2015-01-04 17:57:51 +01:00
if commit:
db_user.create_in_database()
2015-01-04 17:57:51 +01:00
db_user.save()
return db_user
@python_2_unicode_compatible
class DatabaseUser(TimeStampedModel, models.Model):
osuser = models.ForeignKey(OsUser)
name = models.CharField(
_('username'), max_length=63)
db_type = models.PositiveSmallIntegerField(
_('database type'), choices=DB_TYPES)
objects = DatabaseUserManager()
class Meta:
unique_together = ['name', 'db_type']
verbose_name = _('database user')
verbose_name_plural = _('database users')
def __str__(self):
return "%(name)s (%(db_type)s for %(osuser)s)" % {
'name': self.name,
'db_type': self.get_db_type_display(),
2015-01-04 17:57:51 +01:00
'osuser': self.osuser.username,
}
def create_in_database(self, password=None):
2015-01-04 17:57:51 +01:00
"""
Create this user in the target database.
:param str password: initial password for the database user
"""
if password is None:
password = generate_password()
# TODO: send GPG encrypted mail with this information
2015-01-04 17:57:51 +01:00
if self.db_type == DB_TYPES.pgsql:
create_pgsql_user.delay(self.name, password).get()
elif self.db_type == DB_TYPES.mysql:
create_mysql_user.delay(self.name, password).get()
else:
raise ValueError('Unknown database type %d' % self.db_type)
def set_password(self, password):
"""
Set an existing user's password.
:param str password: new password for the database user
"""
if self.db_type == DB_TYPES.pgsql:
set_pgsql_userpassword.delay(self.name, password).get(timeout=5)
2015-01-04 17:57:51 +01:00
elif self.db_type == DB_TYPES.mysql:
set_mysql_userpassword.delay(self.name, password).get(timeout=5)
2015-01-04 17:57:51 +01:00
else:
raise ValueError('Unknown database type %d' % self.db_type)
@transaction.atomic
def delete(self, *args, **kwargs):
"""
Delete the database user from the target database and the Django
database.
:param args: positional arguments for
:py:meth:`django.db.models.Model.delete`
:param kwargs: keyword arguments for
:py:meth:`django.db.models.Model.delete`
"""
for database in self.userdatabase_set.all():
database.delete()
2015-01-04 17:57:51 +01:00
if self.db_type == DB_TYPES.pgsql:
delete_pgsql_user.delay(self.name).get(propagate=False, timeout=5)
2015-01-04 17:57:51 +01:00
elif self.db_type == DB_TYPES.mysql:
delete_mysql_user.delay(self.name).get(propagate=False, timeout=5)
2015-01-04 17:57:51 +01:00
else:
raise ValueError('Unknown database type %d' % self.db_type)
super(DatabaseUser, self).delete(*args, **kwargs)
class UserDatabaseManager(models.Manager):
"""
Default manager for :py:class:`userdbs.models.UserDatabase` instances.
"""
def _get_next_dbname(self, db_user):
"""
Get the next available database name for the given database user.
:param db_user: :py:class:`userdbs.models.DatabaseUser` instance
:return: database name
:rtype: str
"""
count = 1
db_name_format = "{0}_{{0:02d}}".format(db_user.name)
2015-01-04 17:57:51 +01:00
# first db is named the same as the user
nextname = db_user.name
for name in self.values('db_name').filter(db_user=db_user).order_by(
'db_name'
):
if name['db_name'] == nextname:
count += 1
nextname = db_name_format.format(count)
else:
break
return nextname
@transaction.atomic
def create_userdatabase(self, db_user, db_name=None, commit=True):
"""
Creates a new user database.
:param db_user: :py:class:`userdbs.models.DatabaseUser` instance
:param str db_name: database name
:param boolean commit: whether the database should be persisted
:return: :py:class:`userdbs.models.UserDatabase` instance
"""
if db_name is None:
db_name = self._get_next_dbname(db_user)
database = UserDatabase(db_user=db_user, db_name=db_name)
if commit:
database.create_in_database()
database.save()
return database
@python_2_unicode_compatible
class UserDatabase(TimeStampedModel, models.Model):
# MySQL limits to 64, PostgreSQL to 63 characters
db_name = models.CharField(
_('database name'), max_length=63)
db_user = models.ForeignKey(DatabaseUser, verbose_name=_('database user'))
objects = UserDatabaseManager()
2015-01-04 17:57:51 +01:00
class Meta:
unique_together = ['db_name', 'db_user']
verbose_name = _('user database')
verbose_name_plural = _('user specific database')
def __str__(self):
return "%(db_name)s (%(db_user)s)" % {
'db_name': self.db_name,
'db_user': self.db_user,
}
def create_in_database(self):
"""
Create this database (schema) in the target database.
"""
# TODO: send GPG encrypted mail with this information
2015-01-04 17:57:51 +01:00
if self.db_user.db_type == DB_TYPES.pgsql:
create_pgsql_database.delay(self.db_name, self.db_user.name).get()
elif self.db_user.db_type == DB_TYPES.mysql:
create_mysql_database.delay(self.db_name, self.db_user.name).get()
else:
raise ValueError('Unknown database type %d' % self.db_type)
@transaction.atomic
def delete(self, *args, **kwargs):
"""
Delete the database (schema) from the target database and the Django
database.
:param args: positional arguments for
:py:meth:`django.db.models.Model.delete`
:param kwargs: keyword arguments for
:py:meth:`django.db.models.Model.delete`
"""
if self.db_user.db_type == DB_TYPES.pgsql:
2015-01-04 17:57:51 +01:00
delete_pgsql_database.delay(self.db_name, self.db_user.name).get()
elif self.db_user.db_type == DB_TYPES.mysql:
2015-01-04 17:57:51 +01:00
delete_mysql_database.delay(self.db_name, self.db_user.name).get()
else:
raise ValueError('Unknown database type %d' % self.db_type)
super(UserDatabase, self).delete(*args, **kwargs)