-# vi: encoding=utf-8 ts=8 sts=4 sw=4 et
-
-import ConfigParser
-import string
-import fnmatch
-import os
-import stat
-
-import path
-import log
-import status
-from mailer import Message
-from config import config
-
-class User:
- def __init__(self, p, login):
- self.login = login
- self.privs = []
- self.gpg_emails = []
- self.mailto = ""
-
- if p.has_option(login, "gpg_emails"):
- self.gpg_emails = string.split(p.get(login, "gpg_emails"))
- else:
- log.panic("acl: [%s] has no gpg_emails" % login)
-
- if p.has_option(login, "mailto"):
- self.mailto = p.get(login, "mailto")
- else:
- if len(self.gpg_emails) > 0:
- self.mailto = self.gpg_emails[0]
-
- if p.has_option(login, "privs"):
- for p in string.split(p.get(login, "privs")):
- l = string.split(p, ":")
- if len(l) == 2:
- p+=":*"
- if len(l) not in (2,3) or l[0] == "" or l[1] == "":
- log.panic("acl: invalid priv format: '%s' [%s]" % (p, login))
- else:
- self.privs.append(p)
- else:
- log.panic("acl: [%s] has no privs" % login)
-
- def can_do(self, what, where, branch=None):
- if branch:
- action = "%s:%s:%s" % (what, where, branch)
- else:
- action = "%s:%s:N-A" % (what, where)
- for priv in self.privs:
- if priv[0] == "!":
- ret = 0
- priv = priv[1:]
- else:
- ret = 1
- pwhat,pwhere,pbranch=priv.split(":")
- for pbranch in pbranch.split(","):
- priv="%s:%s:%s" % (pwhat,pwhere,pbranch)
- if fnmatch.fnmatch(action, priv):
- return ret
- return 0
-
- def check_priority(self, prio, where):
- for priv in self.privs:
- val,builder=priv.split(":")[0:2]
- if fnmatch.fnmatch(where, builder):
- try:
- val=int(val)
- except ValueError:
- continue
- if prio>=val:
- return prio
- else:
- return val
- return prio
-
- def mail_to(self):
- return self.mailto
-
- def message_to(self):
- m = Message()
- m.set_headers(to = self.mail_to(), cc = config.builder_list)
- return m
-
- def get_login(self):
- return self.login
-
-class ACL_Conf:
- def __init__(self):
- self.reload()
-
- def try_reload(self):
- mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
- if mtime != self.acl_conf_mtime:
- log.notice("acl.conf has changed, reloading...")
- self.reload()
- return True
- return False
-
- def reload(self):
- self.acl_conf_mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
- self.current_user = None
- status.push("reading acl.conf")
- p = ConfigParser.ConfigParser()
- p.readfp(open(path.acl_conf))
- self.users = {}
- for login in p.sections():
- if self.users.has_key(login):
- log.panic("acl: duplicate login: %s" % login)
- continue
- user = User(p, login)
- for e in user.gpg_emails:
- if self.users.has_key(e):
- log.panic("acl: user email colision %s <-> %s" % \
- (self.users[e].login, login))
- else:
- self.users[e] = user
- self.users[login] = user
- status.pop()
-
- def user_by_email(self, ems):
- for e in ems:
- if self.users.has_key(e):
- return self.users[e]
- return None
-
- def user(self, l):
- if not self.users.has_key(l):
- log.panic("no such user: %s" % l)
- return self.users[l]
-
- def set_current_user(self, u):
- self.current_user = u
- if u != None:
- status.email = u.mail_to()
-
- def current_user_login(self):
- if self.current_user != None:
- return self.current_user.login
- else:
- return ""
-
-acl = ACL_Conf()