1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
8 from io import StringIO
21 from bqueue import B_Queue
22 from config import config, init_conf
def alarmalarm(signum, frame):
    """Signal handler that aborts a stalled network operation.

    The fetch functions in this file register it for ``SIGALRM`` via
    ``signal.signal``. When invoked it unconditionally raises, so a hung
    transfer surfaces as an ordinary exception in the interrupted code.

    Args:
        signum: Number of the delivered signal (unused).
        frame: Stack frame that was interrupted by the signal (unused).

    Raises:
        IOError: Always, with the message ``'TCP connection hung'``.
    """
    raise IOError('TCP connection hung')
# has_new(control_url): compare the request number stored locally in
# path.last_req_no_file with the number served at <control_url>/max_req_no.
# NOTE(review): several lines of this function are not visible in this view
# (the embedded original line numbers skip), including the `try:` headers,
# the handler bodies and every return statement. Presumably the result tells
# the caller whether the two numbers differ -- confirm against the full file.
29 def has_new(control_url):
# Read the last processed request number from the first line of the local file.
# NOTE(review): no close() for cnt_f is visible here -- confirm it is released.
31 cnt_f = open(path.last_req_no_file)
33 last_count = int(cnt_f.readline().strip())
# Reached when that first line is not a valid integer; the handler body is
# not visible in this view.
34 except ValueError as e:
# Set the default timeout for newly created sockets to 240 seconds.
39 socket.setdefaulttimeout(240)
# Install alarmalarm as the SIGALRM handler. The call that would arm the
# alarm is not visible here -- TODO confirm.
40 signal.signal(signal.SIGALRM, alarmalarm)
# Request headers asking caches not to answer with a stored copy.
43 headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
44 req = urllib.request.Request(url=control_url + "/max_req_no", headers=headers)
45 f = urllib.request.urlopen(req)
# The first line of the response is parsed as the remote request number.
46 count = int(f.readline().strip())
# Any failure while fetching or parsing is logged; what happens after the
# log call is not visible in this view.
48 except Exception as e:
50 log.error("can't fetch %s: %s" % (control_url + "/max_req_no", e))
# Remote and local numbers differ; the branch body is not visible here.
53 if count != last_count:
# fetch_queue(control_url): download <control_url>/queue.gz, open it as gzip
# data, pass the data to gpg.verify_sig, check that the signer may sign the
# queue, and return request.parse_requests(body).
# NOTE(review): several lines of this function are not visible in this view
# (the embedded original line numbers skip), including the `try:` headers,
# the bindings of `sio` and `fdata`, and whatever follows each log call.
58 def fetch_queue(control_url):
# Install alarmalarm as the SIGALRM handler and set the default timeout for
# newly created sockets to 240 seconds. The call that would arm the alarm is
# not visible here -- TODO confirm.
59 signal.signal(signal.SIGALRM, alarmalarm)
60 socket.setdefaulttimeout(240)
# Request headers asking caches not to answer with a stored copy.
63 headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
64 req = urllib.request.Request(url=control_url + "/queue.gz", headers=headers)
65 f = urllib.request.urlopen(req)
# Any failure while fetching is logged; what happens after the log call is
# not visible in this view.
67 except Exception as e:
69 log.error("can't fetch %s: %s" % (control_url + "/queue.gz", e))
# Wrap `sio` in a gzip reader. `sio` is bound on a line not visible here;
# presumably it is an in-memory buffer holding the downloaded bytes -- confirm.
75 f = gzip.GzipFile(fileobj = sio)
# A struct.error here is reported as a corrupted download.
78 except struct.error as e:
79 log.alert("corrupted fetched queue.gz file")
# `fdata` is bound on a line not visible here; presumably the decompressed
# content -- confirm. verify_sig yields the signers and the body.
81 (signers, body) = gpg.verify_sig(fdata)
# Look up the user for the signers via the ACL.
82 u = acl.user_by_email(signers)
# The condition guarding this alert is not visible; presumably it fires when
# no user was found for the signers -- confirm.
84 log.alert("queue.gz not signed with signature of valid user: %s" % signers)
# The user must hold the "sign_queue" permission for "all".
86 if not u.can_do("sign_queue", "all"):
87 log.alert("user %s is not allowed to sign my queue" % u.login)
# Hand the body to the request parser and return its result.
89 return request.parse_requests(body)
# handle_reqs(builder, reqs): work on the queue file of one builder for the
# given requests.
# NOTE(review): several lines of this function are not visible in this view
# (the embedded original line numbers skip), including the loop over `reqs`
# and the bindings of `r` and `b`; the comments below cover only what is shown.
91 def handle_reqs(builder, reqs):
# The queue file path is path.queue_file suffixed with "-<builder>".
92 qpath = path.queue_file + "-" + builder
# If the file does not exist yet, start it with an empty <queue/> element.
93 if not os.access(qpath, os.F_OK):
94 util.append_to(qpath, "<queue/>\n")
# The guarding condition is not visible; the message suggests it is raised
# for a request whose `kind` is not handled -- confirm.
100 raise Exception('handle_reqs: fatal: huh? %s' % r.kind)
# `b` is bound on a line not visible here. Only objects that list this
# builder in `b.builders` pass this check.
103 if builder in b.builders:
# Log the request id and number together with the builder name.
106 log.notice("queued %s (%d) for %s" % (r.id, r.no, builder))
# NOTE(review): the `def` header of this block is not visible in this view;
# presumably these statements are the body of `main`, the name passed to
# loop.run_loop at the bottom of the file -- confirm. Several interior lines
# are missing as well (the embedded original line numbers skip).
# Take the "request_fetcher" lock, requesting non-blocking behaviour.
112 lck = lock.lock("request_fetcher", non_block = True)
118 status.push("fetching requests")
# Fetch the queue only when has_new gives a true result for the control URL.
119 if has_new(config.control_url):
120 q = fetch_queue(config.control_url)
# `r` and `last_count` are bound on lines not visible here; presumably this
# selects requests numbered above the last processed one -- confirm.
126 if r.no > last_count:
# Pass `q_new` (bound on a line not visible here) to handle_reqs once per
# configured binary builder.
128 for b in config.binary_builders:
129 handle_reqs(b, q_new)
# Write `max_no` (bound on a line not visible here) to the last-request file.
# NOTE(review): no close() for f is visible here -- confirm it is released.
130 f = open(path.last_req_no_file, "w")
131 f.write("%d\n" % max_no)
# Script entry point: hand `main` to loop.run_loop with secs = 10.
# NOTE(review): presumably run_loop calls `main` repeatedly at that interval;
# its implementation is not visible here -- confirm.
136 if __name__ == '__main__':
137 # An HTTP connection is established (and a few bytes are transferred over it)
138 # every `secs` seconds.
139 loop.run_loop(main, secs = 10)