]> TLD Linux GIT Repositories - tld-builder.git/blob - TLD_Builder/acl.py
- more python3 fixes, dropped python2 support
[tld-builder.git] / TLD_Builder / acl.py
1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
2
3 import sys
4 import configparser as ConfigParser
5 import string
6 import fnmatch
7 import os
8 import stat
9
10 import path
11 import log
12 import status
13 from mailer import Message
14 from config import config
15
16 class User:
17     def __init__(self, p, login):
18         self.login = login
19         self.privs = []
20         self.gpg_emails = []
21         self.mailto = ""
22         self.change_requester = False
23
24         if p.has_option(login, "gpg_emails"):
25             self.gpg_emails = p.get(login, "gpg_emails").split()
26         else:
27             log.panic("acl: [%s] has no gpg_emails" % login)
28
29         if p.has_option(login, "mailto"):
30             self.mailto = p.get(login, "mailto")
31         else:
32             if len(self.gpg_emails) > 0:
33                 self.mailto = self.gpg_emails[0]
34
35         if p.has_option(login, "change_requester"):
36             self.change_requester = True
37
38         if p.has_option(login, "privs"):
39             for p in p.get(login, "privs").split():
40                 l = p.split(":")
41                 if len(l) == 2:
42                     p+=":*"
43                 if len(l) not in (2,3) or l[0] == "" or l[1] == "":
44                     log.panic("acl: invalid priv format: '%s' [%s]" % (p, login))
45                 else:
46                     self.privs.append(p)
47         else:
48             log.panic("acl: [%s] has no privs" % login)
49
50     def can_do(self, what, where, branch=None):
51         if branch:
52             action = "%s:%s:%s" % (what, where, branch)
53         else:
54             action = "%s:%s:N-A" % (what, where)
55         for priv in self.privs:
56             if priv[0] == "!":
57                 ret = 0
58                 priv = priv[1:]
59             else:
60                 ret = 1
61             pwhat,pwhere,pbranch=priv.split(":")
62             for pbranch in pbranch.split(","):
63                 priv="%s:%s:%s" % (pwhat,pwhere,pbranch)
64                 if fnmatch.fnmatch(action, priv):
65                     return ret
66         return 0
67
68     def check_priority(self, prio, where):
69         for priv in self.privs:
70             val,builder=priv.split(":")[0:2]
71             if fnmatch.fnmatch(where, builder):
72                 try:
73                     val=int(val)
74                 except ValueError:
75                     continue
76                 if prio>=val:
77                     return prio
78                 else:
79                     return val
80         return prio
81
82     def mail_to(self):
83         return self.mailto
84
85     def message_to(self):
86         m = Message()
87         m.set_headers(to = self.mail_to(), cc = config.builder_list)
88         return m
89
90     def get_login(self):
91         return self.login
92
93 class ACL_Conf:
94     def __init__(self):
95         self.reload()
96
97     def try_reload(self):
98         mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
99         if mtime != self.acl_conf_mtime:
100             log.notice("acl.conf has changed, reloading...")
101             self.reload()
102             return True
103         return False
104
105     def reload(self):
106         self.acl_conf_mtime = os.stat(path.acl_conf)[stat.ST_MTIME]
107         self.current_user = None
108         status.push("reading acl.conf")
109         p = ConfigParser.ConfigParser()
110         p.readfp(open(path.acl_conf))
111         self.users = {}
112         for login in p.sections():
113             if login in self.users:
114                 log.panic("acl: duplicate login: %s" % login)
115                 continue
116             user = User(p, login)
117             for e in user.gpg_emails:
118                 if e in self.users:
119                     log.panic("acl: user email colision %s <-> %s" % \
120                               (self.users[e].login, login))
121                 else:
122                     self.users[e] = user
123             self.users[login] = user
124         status.pop()
125
126     def user_by_email(self, ems):
127         for e in ems:
128             if e in self.users:
129                 return self.users[e]
130         return None
131
132     def user_by_login(self, l):
133         return self.users[l]
134
135     def user(self, l):
136         if not l in self.users:
137             log.panic("no such user: %s" % l)
138         return self.users[l]
139
140     def set_current_user(self, u):
141         self.current_user = u
142         if u != None:
143             status.email = u.mail_to()
144
145     def current_user_login(self):
146         if self.current_user != None:
147             return self.current_user.login
148         else:
149             return ""
150
151 acl = ACL_Conf()