# -*- coding: UTF-8 -*-
#
# Copyright (C) 2007 by Jan Dittberner.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
#
# Version: $Id$

import smtplib, os, logging, tempfile
from email.MIMEText import MIMEText
from pyme import core
from pyme.constants.sig import mode
from settings import config
from gnuviechadmin.exceptions import *
from subprocess import *
import sqlalchemy


class BackendEntity(object):
    """This is the abstract base class for all backend entity classes.

    It wraps a delegate object and provides helpers that are shared by
    the concrete entities: running commands through the configured
    suwrapper, sending signed mails, validating mandatory fields and
    writing files through the suwrapper.
    """

    def __init__(self, delegateto, verbose=False):
        """Store the delegate and set up a logger for the concrete class.

        The logger is named "<module>.<class name>" of the instantiated
        class.  ``verbose`` enables additional output on stdout in
        sucommand and is passed on to the delegate's __repr__.
        """
        self.logger = logging.getLogger("%s.%s" % (
            self.__class__.__module__, self.__class__.__name__))
        self.delegateto = delegateto
        self.verbose = verbose

    def __repr__(self):
        """Return the delegate's representation, passing the verbose flag."""
        return self.delegateto.__repr__(verbose=self.verbose)

    def sucommand(self, cmdline, pipedata=None):
        """Executes a command as root using the configured suwrapper
        command.

        The wrapper is read from the 'suwrapper' option of the 'common'
        configuration section and put in front of ``cmdline``; the
        resulting string is executed through the shell.

        If ``pipedata`` is given (and not empty) it is written, followed
        by a newline, to the stdin of the subprocess.

        Returns the raw wait status as reported by os.waitpid (this is
        not the decoded exit code).
        """
        # NOTE(review): pipedata is logged verbatim here and below --
        # confirm that callers never pipe secrets through this method.
        self.logger.debug("sucommand called: %s (pipedata=%s)", cmdline,
                          str(pipedata))
        suwrapper = config.get('common', 'suwrapper')
        toexec = "%s %s" % (suwrapper, cmdline)
        if pipedata:
            p = Popen(toexec, shell=True, stdin=PIPE)
            try:
                # Same bytes as "print >>pipe, pipedata": the value
                # followed by a newline.
                p.stdin.write("%s\n" % (pipedata,))
            finally:
                # Always close the pipe and reap the child, even if the
                # write failed, so neither a file descriptor nor a
                # zombie process is left behind.
                p.stdin.close()
                status = os.waitpid(p.pid, 0)[1]
            if self.verbose:
                # Parenthesized single-argument form prints the same
                # text on Python 2 and also parses on Python 3.
                print("%s|%s: %d" % (pipedata, toexec, status))
            self.logger.info("%s|%s: %d", pipedata, toexec, status)
        else:
            p = Popen(toexec, shell=True)
            status = os.waitpid(p.pid, 0)[1]
            if self.verbose:
                print("%s: %s" % (toexec, status))
            self.logger.info("%s: %s", toexec, status)
        return status

    def send_mail(self, subject, text):
        """This method sends a mail with the given text and subject
        and signs it using GnuPG. If a public key of the recipient is
        available the mail is encrypted.

        Sender and recipient are read from the 'mailfrom' and 'mailto'
        options of the 'common' configuration section.  A recipient key
        is used for encryption if at least one of its subkeys can
        encrypt; without such a key the text is clear-signed only.

        Raises Exception if no signing key is available for the sender.
        """
        plain = core.Data(text)
        cipher = core.Data()
        c = core.Context()
        c.set_armor(1)
        signer = config.get('common', 'mailfrom')
        rcpt = config.get('common', 'mailto')
        c.signers_clear()
        # The key listing is copied into a list before the signers of
        # the same context are modified (kept from the original code;
        # presumably so the listing is finished first -- verify against
        # the pyme documentation before changing it).
        for sigkey in list(c.op_keylist_all(signer, 1)):
            if sigkey.can_sign:
                c.signers_add(sigkey)
        if not c.signers_enum(0):
            raise Exception("No secret keys for signing available for %s." % (
                signer))
        keylist = []
        for key in c.op_keylist_all(rcpt, 0):
            # Count the subkeys that can encrypt by walking the linked
            # list of subkeys; a subkey without key id ends the walk.
            valid = 0
            subkey = key.subkeys
            while subkey:
                if subkey.keyid is None:
                    break
                valid += subkey.can_encrypt
                subkey = subkey.next
            if valid:
                keylist.append(key)
        if keylist:
            c.op_encrypt_sign(keylist, 1, plain, cipher)
        else:
            c.op_sign(plain, cipher, mode.CLEAR)
        # Rewind the result buffer before reading it into the mail body.
        cipher.seek(0, 0)
        msg = MIMEText(cipher.read())
        msg['Subject'] = subject
        msg['From'] = signer
        msg['To'] = rcpt
        s = smtplib.SMTP()
        try:
            s.connect()
            s.sendmail(signer, [rcpt], msg.as_string())
        finally:
            # Release the connection even if connecting or sending
            # fails.
            s.close()

    def validate(self):
        """Validates whether all mandatory fields of the entity have
        values.

        Mandatory fields are the columns of the delegate's mapped table
        that are neither part of the primary key nor nullable.

        Raises MissingFieldsError with the names of all mandatory
        fields whose attribute on the delegate is None.
        """
        table = sqlalchemy.object_mapper(self.delegateto).local_table
        mandatory = [col.name for col in table.columns
                     if not col.primary_key and not col.nullable]
        # getattr instead of a direct __getattribute__ call: it uses the
        # normal attribute lookup and also works for objects that do not
        # provide __getattribute__.
        missingfields = [key for key in mandatory
                         if getattr(self.delegateto, key) is None]
        if missingfields:
            raise MissingFieldsError(missingfields)

    def write_to_file(self, filename, template):
        """Write the data from template to the specified file.

        The data is written to a temporary file first, which is then
        copied to ``filename`` with "cp" through sucommand.
        """
        tmp = tempfile.NamedTemporaryFile()
        try:
            tmp.write(template)
            # Make sure the data is on disk before cp reads the file.
            tmp.flush()
            # NOTE(review): the paths are only wrapped in double quotes;
            # a file name containing '"', '$' or '`' would be
            # interpreted by the shell -- confirm that callers only pass
            # trusted file names.
            cmd = 'cp "%s" "%s"' % (tmp.name, filename)
            self.sucommand(cmd)
        finally:
            # Closing the temporary file also removes it; do this even
            # if the copy command raises.
            tmp.close()