]> TLD Linux GIT Repositories - tld-builder.git/blob - TLD_Builder/request_fetcher.py
- fix urllib imports
[tld-builder.git] / TLD_Builder / request_fetcher.py
1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
2
3 import string
4 import signal
5 import os
6 import urllib.request
7 import sys
8 from io import StringIO, BytesIO
9 import gzip
10 import path
11 import log
12 import status
13 import lock
14 import util
15 import shutil
16 import gpg
17 import request
18 import loop
19 import socket
20 import struct
21 from acl import acl
22 from bqueue import B_Queue
23 from config import config, init_conf
24
25 last_count = 0
26
27 def alarmalarm(signum, frame):
28     raise IOError('TCP connection hung')
29
30 def has_new(control_url):
31     global last_count
32     cnt_f = open(path.last_req_no_file)
33     try:
34         last_count = int(cnt_f.readline().strip())
35     except ValueError as e:
36         last_count = 0
37
38     cnt_f.close()
39     f = None
40     socket.setdefaulttimeout(240)
41     signal.signal(signal.SIGALRM, alarmalarm)
42     signal.alarm(300)
43     try:
44         headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
45         req = urllib.request.Request(url=control_url + "/max_req_no", headers=headers)
46         f = urllib.request.urlopen(req)
47         count = int(f.readline().strip())
48         signal.alarm(0)
49     except Exception as e:
50         signal.alarm(0)
51         log.error("can't fetch %s: %s" % (control_url + "/max_req_no", e))
52         sys.exit(1)
53     res = 0
54     if count != last_count:
55         res = 1
56     f.close()
57     return res
58
59 def fetch_queue(control_url):
60     signal.signal(signal.SIGALRM, alarmalarm)
61     socket.setdefaulttimeout(240)
62     signal.alarm(300)
63     try:
64         headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
65         req = urllib.request.Request(url=control_url + "/queue.gz", headers=headers)
66         f = urllib.request.urlopen(req)
67         signal.alarm(0)
68     except Exception as e:
69         signal.alarm(0)
70         log.error("can't fetch %s: %s" % (control_url + "/queue.gz", e))
71         sys.exit(1)
72     sio = BytesIO()
73     shutil.copyfileobj(f, sio)
74     f.close()
75     sio.seek(0)
76     f = gzip.GzipFile(fileobj = sio)
77     try:
78         fdata = f.read()
79     except struct.error as e:
80         log.alert("corrupted fetched queue.gz file")
81         sys.exit(1)
82     (signers, body) = gpg.verify_sig(fdata)
83     u = acl.user_by_email(signers)
84     if u == None:
85         log.alert("queue.gz not signed with signature of valid user: %s" % signers)
86         sys.exit(1)
87     if not u.can_do("sign_queue", "all"):
88         log.alert("user %s is not allowed to sign my queue" % u.login)
89         sys.exit(1)
90     return request.parse_requests(body)
91
92 def handle_reqs(builder, reqs):
93     qpath = path.queue_file + "-" + builder
94     if not os.access(qpath, os.F_OK):
95         util.append_to(qpath, "<queue/>\n")
96     q = B_Queue(qpath)
97     q.lock(0)
98     q.read()
99     for r in reqs:
100         if r.kind != 'group':
101             raise Exception('handle_reqs: fatal: huh? %s' % r.kind)
102         need_it = 0
103         for b in r.batches:
104             if builder in b.builders:
105                 need_it = 1
106         if need_it:
107             log.notice("queued %s (%d) for %s" % (r.id, r.no, builder))
108             q.add(r)
109     q.write()
110     q.unlock()
111
112 def main():
113     lck = lock.lock("request_fetcher", non_block = True)
114     if lck == None:
115         sys.exit(1)
116     init_conf()
117     acl.try_reload()
118
119     status.push("fetching requests")
120     if has_new(config.control_url):
121         q = fetch_queue(config.control_url)
122         max_no = 0
123         q_new = []
124         for r in q:
125             if r.no > max_no:
126                 max_no = r.no
127             if r.no > last_count:
128                 q_new.append(r)
129         for b in config.binary_builders:
130             handle_reqs(b, q_new)
131         f = open(path.last_req_no_file, "w")
132         f.write("%d\n" % max_no)
133         f.close()
134     status.pop()
135     lck.close()
136
137 if __name__ == '__main__':
138     # http connection is established (and few bytes transferred through it)
139     # each $secs seconds.
140     loop.run_loop(main, secs = 10)