]> TLD Linux GIT Repositories - tld-builder.git/blob - TLD_Builder/acl.py
5f0621974187df32d6db3bb5d81b0329fecc438b
[tld-builder.git] / TLD_Builder / acl.py
1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
2
3 import ConfigParser
4 import string
5 import fnmatch
6 import os
7 import stat
8
9 import path
10 import log
11 import status
12 from mailer import Message
13 from config import config
14
class User:
    """One account described by a section of acl.conf.

    Attributes:
        login: name of the section the account was read from.
        privs: privilege specs, each stored as "what:where:branch"
            (a two-field spec gets the branch "*").
        gpg_emails: whitespace-separated entries of the gpg_emails option.
        mailto: value of the mailto option, or the first gpg email when
            that option is absent.
        change_requester: True when the change_requester option exists.
    """

    def __init__(self, p, login):
        """Read the section named `login` from the config parser `p`.

        `p` only needs has_option(section, option) and get(section, option).
        Missing gpg_emails or privs options, and malformed priv specs, are
        reported through log.panic().
        """
        self.login = login
        self.privs = []
        self.gpg_emails = []
        self.mailto = ""
        self.change_requester = False

        if p.has_option(login, "gpg_emails"):
            self.gpg_emails = p.get(login, "gpg_emails").split()
        else:
            log.panic("acl: [%s] has no gpg_emails" % login)

        if p.has_option(login, "mailto"):
            self.mailto = p.get(login, "mailto")
        elif self.gpg_emails:
            # No explicit address: fall back to the first gpg email.
            self.mailto = self.gpg_emails[0]

        # Only the presence of the option is checked, never its value.
        if p.has_option(login, "change_requester"):
            self.change_requester = True

        if p.has_option(login, "privs"):
            for spec in p.get(login, "privs").split():
                fields = spec.split(":")
                if len(fields) not in (2, 3) or fields[0] == "" or fields[1] == "":
                    # Validate before normalising so the message quotes the
                    # spec exactly as it appears in acl.conf.
                    log.panic("acl: invalid priv format: '%s' [%s]" % (spec, login))
                    continue
                if len(fields) == 2:
                    # "what:where" is shorthand for "what:where:*".
                    spec += ":*"
                self.privs.append(spec)
        else:
            log.panic("acl: [%s] has no privs" % login)

    def can_do(self, what, where, branch=None):
        """Tell whether this user may perform `what` on `where` for `branch`.

        The request is rendered as "what:where:branch" ("N-A" stands in for
        a missing or empty branch) and compared with fnmatch against every
        priv in order.  The branch field of a priv may hold several
        comma-separated alternatives.  The first matching priv decides: a
        priv starting with "!" denies, any other grants.

        Returns 1 when granted, 0 when denied or when nothing matches.
        """
        if not branch:
            branch = "N-A"
        action = "%s:%s:%s" % (what, where, branch)
        for priv in self.privs:
            if priv.startswith("!"):
                verdict = 0
                priv = priv[1:]
            else:
                verdict = 1
            pwhat, pwhere, pbranches = priv.split(":")
            for pbranch in pbranches.split(","):
                pattern = "%s:%s:%s" % (pwhat, pwhere, pbranch)
                if fnmatch.fnmatch(action, pattern):
                    return verdict
        return 0

    def check_priority(self, prio, where):
        """Return the priority to use for a request with `prio` on `where`.

        Privs whose first field is not an integer are skipped.  The first
        remaining priv whose second field glob-matches `where` decides: the
        result is the larger of `prio` and that integer.  When no such priv
        exists, `prio` is returned unchanged.
        """
        for priv in self.privs:
            val, builder = priv.split(":")[0:2]
            if not fnmatch.fnmatch(where, builder):
                continue
            try:
                limit = int(val)
            except ValueError:
                # Not a priority entry (e.g. an ordinary "what:where" priv).
                continue
            if prio >= limit:
                return prio
            return limit
        return prio

    def mail_to(self):
        """Return the address mail for this user is sent to."""
        return self.mailto

    def message_to(self):
        """Return a new Message to this user, with config.builder_list in cc."""
        m = Message()
        m.set_headers(to = self.mail_to(), cc = config.builder_list)
        return m

    def get_login(self):
        """Return the login (acl.conf section name) of this user."""
        return self.login
91
class ACL_Conf:
    """Users loaded from acl.conf, indexed by login and by gpg email.

    Attributes:
        users: dict mapping every login and every gpg email to its User.
        current_user: the User set by set_current_user(), or None.
        acl_conf_mtime: mtime of acl.conf recorded by the last reload().
    """

    def __init__(self):
        """Load acl.conf immediately."""
        self.reload()

    def try_reload(self):
        """Reload acl.conf if its mtime differs from the one last recorded.

        Returns True when a reload was performed, False otherwise.
        """
        mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
        if mtime == self.acl_conf_mtime:
            return False
        log.notice("acl.conf has changed, reloading...")
        self.reload()
        return True

    def reload(self):
        """Re-read acl.conf, rebuild the users table, clear the current user."""
        self.acl_conf_mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
        self.current_user = None
        status.push("reading acl.conf")
        p = ConfigParser.ConfigParser()
        conf_file = open(path.acl_conf)
        try:
            p.readfp(conf_file)
        finally:
            # Close the handle even when parsing fails; reload() may run
            # many times over the life of the process.
            conf_file.close()
        self.users = {}
        for login in p.sections():
            # Logins and emails share one namespace, so a login can clash
            # with an email registered by an earlier section.
            if login in self.users:
                log.panic("acl: duplicate login: %s" % login)
                continue
            user = User(p, login)
            for e in user.gpg_emails:
                if e in self.users:
                    log.panic("acl: user email colision %s <-> %s" % \
                              (self.users[e].login, login))
                else:
                    self.users[e] = user
            self.users[login] = user
        status.pop()

    def user_by_email(self, ems):
        """Return the User for the first known address in `ems`, else None."""
        for e in ems:
            if e in self.users:
                return self.users[e]
        return None

    def user_by_login(self, l):
        """Return the User stored under key `l`; raises KeyError if absent."""
        return self.users[l]

    def user(self, l):
        """Return the User stored under `l`, calling log.panic() if absent."""
        if l not in self.users:
            log.panic("no such user: %s" % l)
        return self.users[l]

    def set_current_user(self, u):
        """Record `u` as the current user; `u` may be None.

        For a non-None user, status.email is set to the user's mail address.
        """
        self.current_user = u
        if u is not None:
            status.email = u.mail_to()

    def current_user_login(self):
        """Return the current user's login, or "" when no user is set."""
        if self.current_user is not None:
            return self.current_user.login
        return ""
149
# Module-level ACL_Conf instance, created when this module is imported;
# its constructor calls reload(), so acl.conf is read at import time.
acl = ACL_Conf()