1
0
Fork 0
gnuviechadminweb-historic/gnuviechadminweb/model/GVAUsers.py

201 rivejä
6.6 KiB
Python

# -*- coding: utf-8 -*-
from authkit.users import Users
from gnuviechadminweb.model.user import Group, Role, User
from paste.util.import_string import eval_import
import logging
log = logging.getLogger(__name__)
def needsConnection(func):
def wrapper(*__args, **__kw):
from sqlalchemy.orm import create_session
engine = __args[0].meta.engine
conn = engine.contextual_connect()
if conn.closed:
conn = engine.connect()
__args[0].session = create_session(bind=conn)
try:
return func(*__args, **__kw)
finally:
conn.close()
else:
__args[0].session = create_session(bind=conn)
return func(*__args, **__kw)
return wrapper
class GVAUsers(Users):
def __init__(self, data, encrypt = None):
Users.__init__(self, data, encrypt)
log.debug("in __init__")
self.meta = eval_import(self.data)
self.session = self.meta.Session
@needsConnection
def _getSession(self):
return self.session
def _get_group(self, groupname):
return self._getSession().query(Group).filter_by(name=groupname).one()
def _get_user(self, username):
return self._getSession().query(User).filter_by(name=username).one()
def _get_role(self, rolename):
return self._getSession().query(Role).filter_by(name=rolename).one()
# Create Methods
def user_create(self, username, password, group=None):
n_user = User()
n_user.name = username
n_user.password = self.encrypt(password)
if group:
n_user.group = self._get_group(group)
self._getSession().save(n_user)
self._getSession().commit()
def role_create(self, role):
n_role = Role()
n_role.name = role
self._getSession().save(n_role)
self._getSession().commit()
def group_create(self, group):
n_group = Group()
n_group.name = group
self._getSession().save(n_group)
self._getSession().commit()
# Delete Methods
def user_delete(self, username):
self._getSession().delete(self._get_user(username))
self._getSession().commit()
def role_delete(self, role):
self._getSession().delete(self._get_role(role))
self._getSession().commit()
def group_delete(self, group):
self._getSession().delete(self._get_group())
self._getSession().commit()
# Delete Cascade Methods
def role_delete_cascade(self, role):
n_role = self._get_role(role)
for user in self._getSession().query(User).roles.any(name=role).all():
del user.roles[n_role.id]
self._getSession().delete(n_role)
self._getSession().commit()
def group_delete_cascade(self, group):
n_group = self._get_group(group)
self._getSession().delete(self._getSession().query(User).filter_by(
User.group==n_group))
self._getSession().delete(n_group)
self._getSession().commit()
# Existence Methods
def user_exists(self, username):
return self._getSession().query(User).filter_by(
name=username).count() == 1
def role_exists(self, role):
return self._getSession().query(Role).filter_by(
name=role).count() == 1
def group_exists(self, group):
return self._getSession().query(Group).filter_by(
name=group).count() == 1
# List Methods
def list_roles(self):
return [role.name.lower() for role in self._getSession().query(
Role).all()]
def list_users(self):
return [user.name.lower() for user in self._getSession().query(
User).all()]
def list_groups(self):
return [group.name.lower() for group in self._getSession().query(
Group).all()]
# User Methods
def user(self, username):
user = self._get_user(username)
roles = [role.name.lower() for role in user.roles]
roles.sort()
return {
'username' : user.name,
'group' : None if user.group is None else user.group.name,
'password' : user.password,
'roles' : roles
}
def user_roles(self, username):
user = self._get_user(username)
roles = [role.name.lower() for role in user.roles]
roles.sort()
return roles
def user_group(self, username):
user = self._get_user(username)
return None if user.group is None else user.group.name
def user_password(self, username):
user = self._get_user(username)
return user.password
def user_has_role(self, username, role):
user = self._get_user(username)
return role in [role.name for role in user.roles]
def user_has_group(self, username, group):
user = self._get_user(username)
return (group is None and user.group is None) or \
(group is not None and user.group is not None and \
group == user.group.name)
def user_has_password(self, username, password):
user = self._get_user(username)
return user.password == self.encrypt(password)
def user_set_username(self, username, new_username):
user = self._get_user(username)
user.name = new_username
self._getSession().update(user)
self._getSession().commit()
def user_set_group(self, username, group, add_if_necessary=False):
if add_if_necessary and self._getSession().query(Group).filter_by(
name=group).count() == 0:
self.group_create(group)
groupobj = self._get_group(group)
user = self._get_user(user)
user.group = groupobj
self._getSession().update(user)
self._getSession().commit()
def user_add_role(self, username, role, add_if_necessary=False):
if add_if_necessary and self._getSession().query(Role).filter_by(
name=role).count() == 0:
self.role_create(role)
roleobj = self._get_role(role)
user = self._get_user(username)
if not roleobj in user.roles:
user.roles.append(roleobj)
self._getSession().update(user)
self._getSession().commit()
def user_remove_role(self, username, role):
roleobj = self._get_role(role)
user = self._get_user(username)
if roleobj in user.roles:
del user.roles[roleobj.id]
self._getSession().commit()
def user_remove_group(self, username):
user = self._get_user(username)
user.group = None
self._getSession().update(user)
self._getSession().commit()