""" This module defines Celery_ tasks to manage PostgreSQL users and databases. """ from celery import shared_task from celery.utils.log import get_task_logger from psycopg2 import connect from psycopg2.extensions import ISQLQuote from pgsqltasks import settings _LOGGER = get_task_logger(__name__) class Ident(object): """ Wrap a PostgreSQL identifier. Inspired by http://osdir.com/ml/python-db-psycopg-devel/2009-03/msg00019.html. Note that this adapter needs to make an SQL SELECT to correctly quote the identifier (that is then cached) so it can be a good idea to pre-allocate identifiers by calling getquoted() on the, before entering critical sections of code. """ def __init__(self, ident): self.ident = ident self.ident_quoted = None def prepare(self, conn): self._conn = conn def getquoted(self): if not self.ident_quoted: with self._conn.cursor() as curs: curs.execute("SELECT quote_ident(%s)", (self.ident,)) self.ident_quoted = curs.fetchone()[0] return self.ident_quoted def __str__(self): return str(self.ident_quoted) def __conform__(self, proto): if proto == ISQLQuote: return self def _get_connection(): return connect( host=settings.GVAPGSQL_DBADMIN_HOST, port=settings.GVAPGSQL_DBADMIN_PORT, user=settings.GVAPGSQL_DBADMIN_USER, password=settings.GVAPGSQL_DBADMIN_PASSWORD, database="postgres", ) @shared_task def create_pgsql_user(username, password): """ This task creates a new PostgreSQL user. :param str username: the user name :param str password: the password :return: the created user's name :rtype: str """ with _get_connection() as conn: with conn.cursor() as curs: curs.execute( """ CREATE USER %(username)s WITH PASSWORD %(password)s """, {"username": Ident(username), "password": password}, ) @shared_task def set_pgsql_userpassword(username, password): """ Set a new password for an existing PostgreSQL user. :param str username: the user name :param str password: the password :return: True if the password could be set, False otherwise :rtype: boolean """ with _get_connection() as conn: with conn.cursor() as curs: curs.execute( """ ALTER ROLE %(username)s WITH PASSWORD %(password)s """, {"username": Ident(username), "password": password}, ) @shared_task def delete_pgsql_user(username): """ This task deletes an existing PostgreSQL user. :param str username: the user name :return: True if the user has been deleted, False otherwise :rtype: boolean """ with _get_connection() as conn: with conn.cursor() as curs: curs.execute( """ DROP ROLE %(username)s """, {"username": Ident(username)}, ) @shared_task def create_pgsql_database(dbname, username): """ This task creates a new PostgreSQL database for the given PostgreSQL user. :param str dbname: database name :param str username: the user name of an existing PostgreSQL user :return: the database name :rtype: str """ with _get_connection() as conn: conn.autocommit = True curs = conn.cursor() curs.execute( """ CREATE DATABASE %(dbname)s OWNER %(username)s TEMPLATE template0 ENCODING 'UTF8' """, {"dbname": Ident(dbname), "username": Ident(username)}, ) @shared_task def delete_pgsql_database(dbname): """ This task deletes an existing PostgreSQL database. :param str dbname: database name :return: True if the database has been deleted, False otherwise :rtype: boolean """ with _get_connection() as conn: conn.autocommit = True curs = conn.cursor() curs.execute( """ DROP DATABASE %(dbname)s """, {"dbname": Ident(dbname)}, )