--- /dev/null
+# vi: encoding=utf-8 ts=8 sts=4 sw=4 et
+
+import ConfigParser
+import string
+import fnmatch
+import os
+import stat
+
+import path
+import log
+import status
+from mailer import Message
+from config import config
+
+class User:
+ def __init__(self, p, login):
+ self.login = login
+ self.privs = []
+ self.gpg_emails = []
+ self.mailto = ""
+
+ if p.has_option(login, "gpg_emails"):
+ self.gpg_emails = string.split(p.get(login, "gpg_emails"))
+ else:
+ log.panic("acl: [%s] has no gpg_emails" % login)
+
+ if p.has_option(login, "mailto"):
+ self.mailto = p.get(login, "mailto")
+ else:
+ if len(self.gpg_emails) > 0:
+ self.mailto = self.gpg_emails[0]
+
+ if p.has_option(login, "privs"):
+ for p in string.split(p.get(login, "privs")):
+ l = string.split(p, ":")
+ if len(l) == 2:
+ p+=":*"
+ if len(l) not in (2,3) or l[0] == "" or l[1] == "":
+ log.panic("acl: invalid priv format: '%s' [%s]" % (p, login))
+ else:
+ self.privs.append(p)
+ else:
+ log.panic("acl: [%s] has no privs" % login)
+
+ def can_do(self, what, where, branch=None):
+ if branch:
+ action = "%s:%s:%s" % (what, where, branch)
+ else:
+ action = "%s:%s:N-A" % (what, where)
+ for priv in self.privs:
+ if priv[0] == "!":
+ ret = 0
+ priv = priv[1:]
+ else:
+ ret = 1
+ pwhat,pwhere,pbranch=priv.split(":")
+ for pbranch in pbranch.split(","):
+ priv="%s:%s:%s" % (pwhat,pwhere,pbranch)
+ if fnmatch.fnmatch(action, priv):
+ return ret
+ return 0
+
+ def check_priority(self, prio, where):
+ for priv in self.privs:
+ val,builder=priv.split(":")[0:2]
+ if fnmatch.fnmatch(where, builder):
+ try:
+ val=int(val)
+ except ValueError:
+ continue
+ if prio>=val:
+ return prio
+ else:
+ return val
+ return prio
+
+ def mail_to(self):
+ return self.mailto
+
+ def message_to(self):
+ m = Message()
+ m.set_headers(to = self.mail_to(), cc = config.builder_list)
+ return m
+
+ def get_login(self):
+ return self.login
+
+class ACL_Conf:
+ def __init__(self):
+ self.reload()
+
+ def try_reload(self):
+ mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
+ if mtime != self.acl_conf_mtime:
+ log.notice("acl.conf has changed, reloading...")
+ self.reload()
+ return True
+ return False
+
+ def reload(self):
+ self.acl_conf_mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
+ self.current_user = None
+ status.push("reading acl.conf")
+ p = ConfigParser.ConfigParser()
+ p.readfp(open(path.acl_conf))
+ self.users = {}
+ for login in p.sections():
+ if self.users.has_key(login):
+ log.panic("acl: duplicate login: %s" % login)
+ continue
+ user = User(p, login)
+ for e in user.gpg_emails:
+ if self.users.has_key(e):
+ log.panic("acl: user email colision %s <-> %s" % \
+ (self.users[e].login, login))
+ else:
+ self.users[e] = user
+ self.users[login] = user
+ status.pop()
+
+ def user_by_email(self, ems):
+ for e in ems:
+ if self.users.has_key(e):
+ return self.users[e]
+ return None
+
+ def user(self, l):
+ if not self.users.has_key(l):
+ log.panic("no such user: %s" % l)
+ return self.users[l]
+
+ def set_current_user(self, u):
+ self.current_user = u
+ if u != None:
+ status.email = u.mail_to()
+
+ def current_user_login(self):
+ if self.current_user != None:
+ return self.current_user.login
+ else:
+ return ""
+
+acl = ACL_Conf()