X-Git-Url: https://git.tld-linux.org/?p=TLD.git;a=blobdiff_plain;f=pld-builder.new%2FPLD_Builder%2Facl.py;fp=pld-builder.new%2FPLD_Builder%2Facl.py;h=a769d0694570dc3e85d6503f28deaef09055b086;hp=0000000000000000000000000000000000000000;hb=90809c8fec988489786ce00247d9a4150070748b;hpb=ab3934fab858112cd552359b18cb980ea07c310b diff --git a/pld-builder.new/PLD_Builder/acl.py b/pld-builder.new/PLD_Builder/acl.py new file mode 100644 index 0000000..a769d06 --- /dev/null +++ b/pld-builder.new/PLD_Builder/acl.py @@ -0,0 +1,143 @@ +# vi: encoding=utf-8 ts=8 sts=4 sw=4 et + +import ConfigParser +import string +import fnmatch +import os +import stat + +import path +import log +import status +from mailer import Message +from config import config + +class User: + def __init__(self, p, login): + self.login = login + self.privs = [] + self.gpg_emails = [] + self.mailto = "" + + if p.has_option(login, "gpg_emails"): + self.gpg_emails = string.split(p.get(login, "gpg_emails")) + else: + log.panic("acl: [%s] has no gpg_emails" % login) + + if p.has_option(login, "mailto"): + self.mailto = p.get(login, "mailto") + else: + if len(self.gpg_emails) > 0: + self.mailto = self.gpg_emails[0] + + if p.has_option(login, "privs"): + for p in string.split(p.get(login, "privs")): + l = string.split(p, ":") + if len(l) == 2: + p+=":*" + if len(l) not in (2,3) or l[0] == "" or l[1] == "": + log.panic("acl: invalid priv format: '%s' [%s]" % (p, login)) + else: + self.privs.append(p) + else: + log.panic("acl: [%s] has no privs" % login) + + def can_do(self, what, where, branch=None): + if branch: + action = "%s:%s:%s" % (what, where, branch) + else: + action = "%s:%s:N-A" % (what, where) + for priv in self.privs: + if priv[0] == "!": + ret = 0 + priv = priv[1:] + else: + ret = 1 + pwhat,pwhere,pbranch=priv.split(":") + for pbranch in pbranch.split(","): + priv="%s:%s:%s" % (pwhat,pwhere,pbranch) + if fnmatch.fnmatch(action, priv): + return ret + return 0 + + def check_priority(self, prio, where): + for priv in self.privs: + val,builder=priv.split(":")[0:2] + if fnmatch.fnmatch(where, builder): + try: + val=int(val) + except ValueError: + continue + if prio>=val: + return prio + else: + return val + return prio + + def mail_to(self): + return self.mailto + + def message_to(self): + m = Message() + m.set_headers(to = self.mail_to(), cc = config.builder_list) + return m + + def get_login(self): + return self.login + +class ACL_Conf: + def __init__(self): + self.reload() + + def try_reload(self): + mtime = os.stat(path.acl_conf)[stat.ST_MTIME] + if mtime != self.acl_conf_mtime: + log.notice("acl.conf has changed, reloading...") + self.reload() + return True + return False + + def reload(self): + self.acl_conf_mtime = os.stat(path.acl_conf)[stat.ST_MTIME] + self.current_user = None + status.push("reading acl.conf") + p = ConfigParser.ConfigParser() + p.readfp(open(path.acl_conf)) + self.users = {} + for login in p.sections(): + if self.users.has_key(login): + log.panic("acl: duplicate login: %s" % login) + continue + user = User(p, login) + for e in user.gpg_emails: + if self.users.has_key(e): + log.panic("acl: user email colision %s <-> %s" % \ + (self.users[e].login, login)) + else: + self.users[e] = user + self.users[login] = user + status.pop() + + def user_by_email(self, ems): + for e in ems: + if self.users.has_key(e): + return self.users[e] + return None + + def user(self, l): + if not self.users.has_key(l): + log.panic("no such user: %s" % l) + return self.users[l] + + def set_current_user(self, u): + self.current_user = u + if u != None: + status.email = u.mail_to() + + def current_user_login(self): + if self.current_user != None: + return self.current_user.login + else: + return "" + +acl = ACL_Conf()