gvapgsql/gvapgsql/pgsqltasks/tasks.py

129 lines
3.1 KiB
Python

"""
This module defines Celery_ tasks to manage PostgreSQL users and databases.
"""
from __future__ import absolute_import
from celery import shared_task
from celery.utils.log import get_task_logger
from gvapgsql import settings
from psycopg2 import connect
_LOGGER = get_task_logger(__name__)
def _get_connection():
return connect(
host=settings.GVAPGSQL_DBADMIN_HOST,
port=settings.GVAPGSQL_DBADMIN_PORT,
user=settings.GVAPGSQL_DBADMIN_USER,
password=settings.GVAPGSQL_DBADMIN_PASSWORD,
database='postgres',
)
@shared_task
def create_pgsql_user(username, password):
"""
This task creates a new PostgreSQL user.
:param str username: the user name
:param str password: the password
:return: the created user's name
:rtype: str
"""
with _get_connection() as conn:
with conn.cursor() as curs:
curs.execute(
"""
CREATE USER %(username)s WITH PASSWORD %(password)s
""",
{'username': username, 'password': password}
)
@shared_task
def set_pgsql_userpassword(username, password):
"""
Set a new password for an existing PostgreSQL user.
:param str username: the user name
:param str password: the password
:return: True if the password could be set, False otherwise
:rtype: boolean
"""
with _get_connection() as conn:
with conn.cursor() as curs:
curs.execute(
"""
ALTER ROLE %(username)s WITH PASSWORD %(password)s
""",
{'username': username, 'password': password}
)
@shared_task
def delete_pgsql_user(username):
"""
This task deletes an existing PostgreSQL user.
:param str username: the user name
:return: True if the user has been deleted, False otherwise
:rtype: boolean
"""
with _get_connection() as conn:
with conn.cursor() as curs:
curs.execute(
"""
DROP ROLE %(username)s
""",
{'username': username}
)
@shared_task
def create_pgsql_database(dbname, username):
"""
This task creates a new PostgreSQL database for the given PostgreSQL user.
:param str dbname: database name
:param str username: the user name of an existing PostgreSQL user
:return: the database name
:rtype: str
"""
with _get_connection() as conn:
with conn.cursor() as curs:
curs.execute(
"""
CREATE DATABASE %(dbname)s OWNER %(username)s
""",
{'dbname': dbname, 'username': username}
)
@shared_task
def delete_pgsql_database(dbname):
"""
This task deletes an existing PostgreSQL database.
:param str dbname: database name
:return: True if the database has been deleted, False otherwise
:rtype: boolean
"""
with _get_connection() as conn:
with conn.cursor() as curs:
curs.execute(
"""
DROP DATABASE %(dbname)s
""",
{'dbname': dbname}
)