#!/usr/bin/env python import psycopg, pwd, grp, smtplib, os from email.MIMEText import MIMEText import Settings, PasswordTools class InvalidDomain(Exception): """This exception is raised if an invalid domain is used.""" def __init__(self, domain): self.domain = domain def __str__(self): return repr("Invalid domain %s" % (self.domain)) class NoSysuserForDomain(Exception): """This exception is raised if no system user is associated with a domain.""" def __init__(self, domain): self.domain = domain def __str__(self): return repr("No system user for domain %s" % (self.domain)) class InvalidPopUser(Exception): """This exception is raised if an invalid POP3/IMAP user has been specified.""" def __init__(self, domain, username): self.domain = domain self.username = username def __str__(self): return "Invalid POP3/IMAP user %s in domain %s." % (self.username, self.domain) class Domain: """A Domain representation object with service methods.""" def __init__(self, domain): self.cnx = psycopg.connect("user=%(dbuser)s password=%(dbpassword)s dbname=%(dbname)s" % Settings.dbsettings) self.domain = domain self.validate_domain() def validate_domain(self): """This function validates whether the given domain is allowed. That means that the domain needs to be registered in the database. If the domain is invalid InvalidDomain is raised.""" cr = self.cnx.cursor() cr.execute("SELECT * FROM domain WHERE domainname=%(name)s" % {'name': psycopg.QuotedString(self.domain)}) self.cnx.commit() result = cr.fetchall() if (not result): raise InvalidDomain(self) def __str__(self): return str(self.domain) def getSysuser(self): """Gets the system user id of the domain.""" cr = self.cnx.cursor() cr.execute("""SELECT sysuser.name FROM domain, sysuser WHERE domain.domainname=%(name)s AND domain.sysuserid=sysuser.sysuserid""" % {'name': psycopg.QuotedString(self.domain)}) self.cnx.commit() result = cr.fetchall() if (not result): raise NoSysuserForDomain(self) # return row 0, field 0 return result[0][0] def getNextPopUser(self): """Gets the user id of the next available POP3/IMAP-user for the domain.""" cr = self.cnx.cursor() sysuser = self.getSysuser() cr.execute("""SELECT max(id) FROM mailpasswd WHERE id LIKE %(username)s""" % {'username': psycopg.QuotedString(sysuser+'%')}) self.cnx.commit() result = cr.fetchall() if (not result): return sysuser + "p1" maxpopuser = result[0][0] if (not maxpopuser): return sysuser + "p1" num = int(maxpopuser[len(sysuser)+1:])+1 return "%sp%d" % (sysuser, num) def makePopUser(self, password): """Creates a new POP3/IMAP-user for the domain using the given password.""" cr = self.cnx.cursor() sysuser = self.getSysuser() popaccount = self.getNextPopUser() crypted = PasswordTools.md5_crypt_password(password) uid = pwd.getpwnam(sysuser)[2] gid = grp.getgrnam(Settings.popgroup)[2] homedir = Settings.pophome + popaccount os.mkdir(homedir, 0755) os.system("maildirmake \"%s/Maildir\"" % (homedir)) os.system("chown -R %s.%s %s" % ( sysuser, Settings.popgroup, homedir )) cr = self.cnx.cursor() cr.execute("""INSERT INTO mailpasswd (id, crypt, clear, uid, gid, home) VALUES (%(id)s, %(crypt)s, %(clear)s, %(uid)d, %(gid)d, %(home)s)""" % { 'id': psycopg.QuotedString(popaccount), 'crypt': psycopg.QuotedString(crypted), 'clear': psycopg.QuotedString(password), 'uid': uid, 'gid': gid, 'home': psycopg.QuotedString(homedir)}) self.cnx.commit() text = """A new POP3/IMAP account has been created Domain: %(domain)s User: %(user)s Password: %(password)s""" % {'domain': self.domain, 'user': popaccount, 'password': password} themail = MIMEText(text) themail['Subject'] = "A new POP3/IMAP account has been created" themail['From'] = Settings.mailsender themail['To'] = Settings.mailreceiver s = smtplib.SMTP() s.connect() s.sendmail(Settings.mailsender, [Settings.mailreceiver], themail.as_string()) s.close() def listPopUsers(self): sysuser = self.getSysuser() cr = self.cnx.cursor() cr.execute("SELECT id FROM mailpasswd WHERE id LIKE %(user)s" % { 'user': psycopg.QuotedString(sysuser + '%')}) self.cnx.commit() result = cr.fetchall() return [line[0] for line in result] def hasPopUser(self, username): """Checks whether the specified POP3/IMAP user exists in the domain.""" return ([user for user in self.listPopUsers() if (user == username)]) def updatePopPassword(self, username, password=PasswordTools.generate_password()): """Updates the password of the given POP3/IMAP user.""" if self.hasPopUser(username): crypted = PasswordTools.md5_crypt_password(password) cr = self.cnx.cursor() cr.execute("UPDATE mailpasswd SET clear=%(clear)s, crypt=%(crypt)s WHERE id=%(user)s" % { 'clear': psycopg.QuotedString(password), 'crypt': psycopg.QuotedString(crypted), 'user': psycopg.QuotedString(username)}) self.cnx.commit() print("updated password of user %s to %s" % (username, password)) else: raise InvalidPopUser(self, username) if __name__ == '__main__': domain = Domain('centrum-warenhaus-dresden.de') # list pop users print ", ".join(domain.listPopUsers()) # check for not existing user #try: # domain.updatePopPassword('usr03p2', 'test') #except InvalidPopUser, ipu: # print ipu #domain.updatePopPassword('usr05p2')