1
0
Fork 0
gnuviechadmin-historic/backend/gvadm/DomainTools.py
Jan Dittberner 61569aa807 - non destructive default behaviour
git-svn-id: file:///home/www/usr01/svn/gnuviechadmin/gnuviech.info/gnuviechadmin/trunk@151 a67ec6bc-e5d5-0310-a910-815c51eb3124
2006-01-31 09:04:53 +00:00

175 lines
5.6 KiB
Python

#!/usr/bin/env python
import psycopg, pwd, grp, smtplib, os
from email.MIMEText import MIMEText
import Settings, PasswordTools
class InvalidDomain(Exception):
    """Raised when an invalid domain is used.

    ``domain`` is whatever the raiser passed in (a domain name or an object
    whose string form is the domain name); it is kept on the instance so
    that handlers can inspect it.
    """

    def __init__(self, domain):
        self.domain = domain

    def __str__(self):
        # Plain message without repr(): repr() added literal quotes around
        # the whole text. The one-element tuple keeps the formatting correct
        # even if ``domain`` itself happens to be a tuple.
        return "Invalid domain %s" % (self.domain,)
class NoSysuserForDomain(Exception):
    """Raised when no system user is associated with a domain.

    ``domain`` is whatever the raiser passed in (a domain name or an object
    whose string form is the domain name); it is kept on the instance so
    that handlers can inspect it.
    """

    def __init__(self, domain):
        self.domain = domain

    def __str__(self):
        # Plain message without repr(): repr() added literal quotes around
        # the whole text. The one-element tuple keeps the formatting correct
        # even if ``domain`` itself happens to be a tuple.
        return "No system user for domain %s" % (self.domain,)
class InvalidPopUser(Exception):
    """Signals that the named POP3/IMAP user is not valid for the domain.

    Both constructor arguments are stored unchanged as the ``domain`` and
    ``username`` attributes.
    """

    def __init__(self, domain, username):
        self.username = username
        self.domain = domain

    def __str__(self):
        # the user comes first in the message, then the domain
        details = (self.username, self.domain)
        return "Invalid POP3/IMAP user %s in domain %s." % details
class Domain:
    """A domain representation object with service methods.

    An instance owns one database connection (``self.cnx``) and offers
    helpers to look up the system user of the domain and to list, create
    and update the POP3/IMAP accounts stored in the ``mailpasswd`` table.
    """

    def __init__(self, domain):
        """Connect to the database and validate ``domain``.

        Raises InvalidDomain if the domain is not registered.
        """
        self.cnx = psycopg.connect("user=%(dbuser)s password=%(dbpassword)s dbname=%(dbname)s" % Settings.dbsettings)
        self.domain = domain
        try:
            self.validate_domain()
        except Exception:
            # The object is unusable; do not leak the connection just opened.
            self.cnx.close()
            raise

    def _select(self, query, params):
        """Run a parameterized SELECT and return all of its rows."""
        cr = self.cnx.cursor()
        # Values are handed to the driver separately so that it does the
        # quoting; nothing is interpolated into the SQL text here.
        cr.execute(query, params)
        rows = cr.fetchall()
        # Read the rows first, then commit; presumably the commit is there
        # to end the transaction the driver opened for the query.
        self.cnx.commit()
        return rows

    def _pop_ids(self, sysuser):
        """Return all mailpasswd ids that start with the name ``sysuser``."""
        rows = self._select(
            "SELECT id FROM mailpasswd WHERE id LIKE %(user)s",
            {'user': sysuser + '%'})
        return [row[0] for row in rows]

    def _send_account_mail(self, popaccount, password):
        """Mail the data of a newly created account to Settings.mailreceiver."""
        text = ("A new POP3/IMAP account has been created\n"
                "Domain: %(domain)s\n"
                "User: %(user)s\n"
                "Password: %(password)s") % {'domain': self.domain,
                                             'user': popaccount,
                                             'password': password}
        themail = MIMEText(text)
        themail['Subject'] = "A new POP3/IMAP account has been created"
        themail['From'] = Settings.mailsender
        themail['To'] = Settings.mailreceiver
        s = smtplib.SMTP()
        s.connect()
        try:
            s.sendmail(Settings.mailsender, [Settings.mailreceiver], themail.as_string())
        finally:
            # close the SMTP connection even when sending fails
            s.close()

    def validate_domain(self):
        """Validate that the domain is allowed.

        A domain is allowed when it has a row in the ``domain`` table.
        Raises InvalidDomain otherwise.
        """
        rows = self._select(
            "SELECT * FROM domain WHERE domainname=%(name)s",
            {'name': self.domain})
        if not rows:
            raise InvalidDomain(self)

    def __str__(self):
        return str(self.domain)

    def getSysuser(self):
        """Return the name of the system user the domain belongs to.

        Raises NoSysuserForDomain if the join of ``domain`` and ``sysuser``
        yields no row for this domain.
        """
        rows = self._select(
            """SELECT sysuser.name FROM domain, sysuser
            WHERE domain.domainname=%(name)s
            AND domain.sysuserid=sysuser.sysuserid""",
            {'name': self.domain})
        if not rows:
            raise NoSysuserForDomain(self)
        # row 0, field 0
        return rows[0][0]

    def getNextPopUser(self):
        """Return the id of the next free POP3/IMAP user of the domain.

        Ids have the form ``<sysuser>p<number>``. The highest number in use
        is determined numerically; a textual SQL max() would rank "p9" above
        "p10" and hand out an id that already exists. Ids that do not match
        the pattern (for example those of a system user with a longer name)
        are ignored. Without any matching id the result is ``<sysuser>p1``.
        """
        sysuser = self.getSysuser()
        prefix = sysuser + "p"
        highest = 0
        for account in self._pop_ids(sysuser):
            number = account[len(prefix):]
            if account.startswith(prefix) and number.isdigit():
                highest = max(highest, int(number))
        return "%sp%d" % (sysuser, highest + 1)

    def makePopUser(self, password):
        """Create a new POP3/IMAP user for the domain with ``password``.

        Creates the home directory with a Maildir below Settings.pophome,
        inserts the account into ``mailpasswd`` and sends a notification
        mail. Returns the id of the new account.
        """
        import stat  # only needed for the permission bits below

        sysuser = self.getSysuser()
        popaccount = self.getNextPopUser()
        crypted = PasswordTools.md5_crypt_password(password)
        uid = pwd.getpwnam(sysuser)[2]
        gid = grp.getgrnam(Settings.popgroup)[2]
        homedir = Settings.pophome + popaccount
        # rwxr-xr-x (octal 755), spelled with stat constants because the
        # bare "0755" literal is a syntax error on Python 3.
        mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
                stat.S_IROTH | stat.S_IXOTH)
        os.mkdir(homedir, mode)
        # NOTE(review): both commands go through the shell and their exit
        # status is ignored; this assumes the Settings values and system
        # user names contain no shell metacharacters -- confirm.
        os.system("maildirmake \"%s/Maildir\"" % (homedir))
        os.system("chown -R %s.%s %s" % (sysuser, Settings.popgroup, homedir))
        cr = self.cnx.cursor()
        # NOTE(review): the clear text password is stored next to the hash.
        cr.execute(
            """INSERT INTO mailpasswd (id, crypt, clear, uid, gid, home)
            VALUES (%(id)s, %(crypt)s, %(clear)s, %(uid)s, %(gid)s, %(home)s)""",
            {'id': popaccount,
             'crypt': crypted,
             'clear': password,
             'uid': uid, 'gid': gid,
             'home': homedir})
        self.cnx.commit()
        self._send_account_mail(popaccount, password)
        return popaccount

    def listPopUsers(self):
        """Return the mailpasswd ids that start with the domain's system user name."""
        return self._pop_ids(self.getSysuser())

    def hasPopUser(self, username):
        """Return True if ``username`` is one of the ids from listPopUsers()."""
        return username in self.listPopUsers()

    def updatePopPassword(self, username, password=None):
        """Update the password of the given POP3/IMAP user.

        Without ``password`` a new one is generated for this call. (A
        generator call in the signature would run only once, at definition
        time, and give every caller the same password.)
        Raises InvalidPopUser if ``username`` does not belong to the domain.
        """
        if not self.hasPopUser(username):
            raise InvalidPopUser(self, username)
        if password is None:
            password = PasswordTools.generate_password()
        crypted = PasswordTools.md5_crypt_password(password)
        cr = self.cnx.cursor()
        cr.execute(
            "UPDATE mailpasswd SET clear=%(clear)s, crypt=%(crypt)s WHERE id=%(user)s",
            {'clear': password,
             'crypt': crypted,
             'user': username})
        self.cnx.commit()
        print("updated password of user %s to %s" % (username, password))
if __name__ == '__main__':
    # Manual smoke test: the enabled statements only read from the database.
    domain = Domain('centrum-warenhaus-dresden.de')
    # list pop users (call form of print, which also parses on Python 3)
    print(", ".join(domain.listPopUsers()))
    # The examples below change passwords and are therefore kept disabled.
    # check for not existing user
    #try:
    #    domain.updatePopPassword('usr03p2', 'test')
    #except InvalidPopUser, ipu:
    #    print ipu
    #domain.updatePopPassword('usr05p2')