]> TLD Linux GIT Repositories - tld-builder.git/blob - TLD_Builder/acl.py
- python 3.x fixes
[tld-builder.git] / TLD_Builder / acl.py
1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
2
3 import sys
4 if sys.version_info[0] == 2:
5     import ConfigParser
6 else:
7     import configparser as ConfigParser
8 import string
9 import fnmatch
10 import os
11 import stat
12
13 import path
14 import log
15 import status
16 from mailer import Message
17 from config import config
18
class User:
    """One account described by a section of acl.conf.

    Attributes:
        login: section name the account was read from.
        privs: list of privilege strings, each "what:where:branches";
            a leading "!" marks a denying entry.
        gpg_emails: whitespace-separated values of the "gpg_emails" option.
        mailto: value of the "mailto" option, or the first gpg email.
        change_requester: True when the "change_requester" option is present.
    """

    def __init__(self, p, login):
        """Load the account `login` from the config parser `p`.

        Missing "gpg_emails" or "privs" options and malformed privilege
        entries are reported through log.panic().
        """
        self.login = login
        self.privs = []
        self.gpg_emails = []
        self.mailto = ""
        self.change_requester = False

        if not p.has_option(login, "gpg_emails"):
            log.panic("acl: [%s] has no gpg_emails" % login)
        else:
            self.gpg_emails = p.get(login, "gpg_emails").split()

        # An explicit mailto wins; otherwise fall back to the first gpg email.
        if p.has_option(login, "mailto"):
            self.mailto = p.get(login, "mailto")
        elif len(self.gpg_emails) > 0:
            self.mailto = self.gpg_emails[0]

        # Only the presence of the option matters, its value is never read.
        if p.has_option(login, "change_requester"):
            self.change_requester = True

        if not p.has_option(login, "privs"):
            log.panic("acl: [%s] has no privs" % login)
            return

        for entry in p.get(login, "privs").split():
            fields = entry.split(":")
            # Two-field entries apply to every branch.
            if len(fields) == 2:
                entry += ":*"
            well_formed = (len(fields) in (2, 3)
                           and fields[0] != ""
                           and fields[1] != "")
            if well_formed:
                self.privs.append(entry)
            else:
                log.panic("acl: invalid priv format: '%s' [%s]" % (entry, login))

    def can_do(self, what, where, branch=None):
        """Return 1 if the action is allowed, 0 otherwise.

        The action "what:where:branch" (branch "N-A" when none is given) is
        matched with fnmatch against each privilege in order; the first
        matching privilege decides, and a "!"-prefixed one yields 0.
        """
        action = "%s:%s:%s" % (what, where, branch if branch else "N-A")
        for priv in self.privs:
            denies = priv[0] == "!"
            rule = priv[1:] if denies else priv
            verdict = 0 if denies else 1
            pwhat, pwhere, branch_list = rule.split(":")
            # The branch field may hold several comma-separated patterns.
            patterns = ("%s:%s:%s" % (pwhat, pwhere, one)
                        for one in branch_list.split(","))
            if any(fnmatch.fnmatch(action, pattern) for pattern in patterns):
                return verdict
        return 0

    def check_priority(self, prio, where):
        """Return `prio`, raised to the first numeric limit matching `where`.

        A privilege whose first field is an integer and whose second field
        matches `where` (fnmatch) sets that limit; privileges with a
        non-numeric first field are skipped.
        """
        for priv in self.privs:
            raw_limit, builder = priv.split(":")[0:2]
            if not fnmatch.fnmatch(where, builder):
                continue
            try:
                limit = int(raw_limit)
            except ValueError:
                continue
            return prio if prio >= limit else limit
        return prio

    def mail_to(self):
        """Return the address mail for this user is sent to."""
        return self.mailto

    def message_to(self):
        """Return a new Message addressed to this user, cc'd to config.builder_list."""
        msg = Message()
        msg.set_headers(to = self.mail_to(), cc = config.builder_list)
        return msg

    def get_login(self):
        """Return the login name of this user."""
        return self.login
class ACL_Conf:
    """In-memory view of acl.conf: the known users and the current user.

    `users` maps every login and every gpg email to its User object, so a
    user can be found by either key.
    """

    def __init__(self):
        """Read acl.conf immediately."""
        self.reload()

    def try_reload(self):
        """Reload acl.conf if its mtime changed; return True when reloaded."""
        mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
        if mtime != self.acl_conf_mtime:
            log.notice("acl.conf has changed, reloading...")
            self.reload()
            return True
        return False

    def reload(self):
        """Parse acl.conf from scratch and rebuild the user table.

        Resets the current user to None. Duplicate logins and gpg emails
        shared between users are reported through log.panic().
        """
        self.acl_conf_mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
        self.current_user = None
        status.push("reading acl.conf")
        p = ConfigParser.ConfigParser()
        # readfp() was removed in Python 3.12; read_file() is its replacement
        # but does not exist on Python 2, so pick whichever is available.
        parse = getattr(p, "read_file", None) or p.readfp
        # Close the file once parsed instead of leaking the handle.
        with open(path.acl_conf) as conf_file:
            parse(conf_file)
        self.users = {}
        for login in p.sections():
            # The table also holds email keys, so a login may clash with one.
            if login in self.users:
                log.panic("acl: duplicate login: %s" % login)
                continue
            user = User(p, login)
            for e in user.gpg_emails:
                if e in self.users:
                    log.panic("acl: user email colision %s <-> %s" % \
                              (self.users[e].login, login))
                else:
                    self.users[e] = user
            self.users[login] = user
        status.pop()

    def user_by_email(self, ems):
        """Return the user for the first known address in `ems`, or None."""
        for e in ems:
            if e in self.users:
                return self.users[e]
        return None

    def user_by_login(self, l):
        """Return the user stored under key `l`; raises KeyError if unknown."""
        return self.users[l]

    def user(self, l):
        """Return the user stored under key `l`, calling log.panic() if unknown."""
        if l not in self.users:
            log.panic("no such user: %s" % l)
        return self.users[l]

    def set_current_user(self, u):
        """Record `u` as the current user and, if not None, set status.email."""
        self.current_user = u
        if u is not None:
            status.email = u.mail_to()

    def current_user_login(self):
        """Return the current user's login, or "" when no user is set."""
        if self.current_user is not None:
            return self.current_user.login
        return ""
153
# Module-level ACL_Conf instance. Constructing it calls reload(), so acl.conf
# is read as soon as this module is imported.
acl = ACL_Conf()