]> TLD Linux GIT Repositories - TLD.git/blob - pld-builder.new/PLD_Builder/acl.py
a769d0694570dc3e85d6503f28deaef09055b086
[TLD.git] / pld-builder.new / PLD_Builder / acl.py
1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
2
3 import ConfigParser
4 import string
5 import fnmatch
6 import os
7 import stat
8
9 import path
10 import log
11 import status
12 from mailer import Message
13 from config import config
14
15 class User:
16     def __init__(self, p, login):
17         self.login = login
18         self.privs = []
19         self.gpg_emails = []
20         self.mailto = ""
21
22         if p.has_option(login, "gpg_emails"):
23             self.gpg_emails = string.split(p.get(login, "gpg_emails"))
24         else:
25             log.panic("acl: [%s] has no gpg_emails" % login)
26
27         if p.has_option(login, "mailto"):
28             self.mailto = p.get(login, "mailto")
29         else:
30             if len(self.gpg_emails) > 0:
31                 self.mailto = self.gpg_emails[0]
32
33         if p.has_option(login, "privs"):
34             for p in string.split(p.get(login, "privs")):
35                 l = string.split(p, ":")
36                 if len(l) == 2:
37                     p+=":*"
38                 if len(l) not in (2,3) or l[0] == "" or l[1] == "":
39                     log.panic("acl: invalid priv format: '%s' [%s]" % (p, login))
40                 else:
41                     self.privs.append(p)
42         else:
43             log.panic("acl: [%s] has no privs" % login)
44
45     def can_do(self, what, where, branch=None):
46         if branch:
47             action = "%s:%s:%s" % (what, where, branch)
48         else:
49             action = "%s:%s:N-A" % (what, where)
50         for priv in self.privs:
51             if priv[0] == "!":
52                 ret = 0
53                 priv = priv[1:]
54             else:
55                 ret = 1
56             pwhat,pwhere,pbranch=priv.split(":")
57             for pbranch in pbranch.split(","):
58                 priv="%s:%s:%s" % (pwhat,pwhere,pbranch)
59                 if fnmatch.fnmatch(action, priv):
60                     return ret
61         return 0
62
63     def check_priority(self, prio, where):
64         for priv in self.privs:
65             val,builder=priv.split(":")[0:2]
66             if fnmatch.fnmatch(where, builder):
67                 try:
68                     val=int(val)
69                 except ValueError:
70                     continue
71                 if prio>=val:
72                     return prio
73                 else:
74                     return val
75         return prio
76
77     def mail_to(self):
78         return self.mailto
79
80     def message_to(self):
81         m = Message()
82         m.set_headers(to = self.mail_to(), cc = config.builder_list)
83         return m
84
85     def get_login(self):
86         return self.login
87
88 class ACL_Conf:
89     def __init__(self):
90         self.reload()
91
92     def try_reload(self):
93         mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
94         if mtime != self.acl_conf_mtime:
95             log.notice("acl.conf has changed, reloading...")
96             self.reload()
97             return True
98         return False
99
100     def reload(self):
101         self.acl_conf_mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
102         self.current_user = None
103         status.push("reading acl.conf")
104         p = ConfigParser.ConfigParser()
105         p.readfp(open(path.acl_conf))
106         self.users = {}
107         for login in p.sections():
108             if self.users.has_key(login):
109                 log.panic("acl: duplicate login: %s" % login)
110                 continue
111             user = User(p, login)
112             for e in user.gpg_emails:
113                 if self.users.has_key(e):
114                     log.panic("acl: user email colision %s <-> %s" % \
115                               (self.users[e].login, login))
116                 else:
117                     self.users[e] = user
118             self.users[login] = user
119         status.pop()
120
121     def user_by_email(self, ems):
122         for e in ems:
123             if self.users.has_key(e):
124                 return self.users[e]
125         return None
126
127     def user(self, l):
128         if not self.users.has_key(l):
129             log.panic("no such user: %s" % l)
130         return self.users[l]
131
132     def set_current_user(self, u):
133         self.current_user = u
134         if u != None:
135             status.email = u.mail_to()
136
137     def current_user_login(self):
138         if self.current_user != None:
139             return self.current_user.login
140         else:
141             return ""
142
143 acl = ACL_Conf()