1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
9 if sys.version_info[0] == 2:
12 from io import StringIO
26 from bqueue import B_Queue
27 from config import config, init_conf
# Signal handler (standard `(signum, frame)` signature) that raises IOError
# with the message 'TCP connection hung'. has_new() and fetch_queue() install
# it for SIGALRM; the call that actually arms the alarm is not among the
# lines shown in this listing.
31 def alarmalarm(signum, frame):
32 raise IOError('TCP connection hung')
# Compare the request number stored in path.last_req_no_file with the number
# served at control_url + "/max_req_no".
# NOTE(review): this listing omits lines (the `try:` openers, the body of the
# ValueError handler and the return statements), so the comments below only
# describe what is visible.
34 def has_new(control_url):
# The first line of the local counter file is parsed as an integer.
36 cnt_f = open(path.last_req_no_file)
38 last_count = int(cnt_f.readline().strip())
# Reached when that first line is not a valid integer; handler body not shown.
39 except ValueError as e:
# Default timeout of 240 seconds for sockets created from here on, and
# alarmalarm() as the SIGALRM handler.
44 socket.setdefaulttimeout(240)
45 signal.signal(signal.SIGALRM, alarmalarm)
# The no-cache headers ask HTTP caches not to answer with a stored copy.
48 headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
49 req = urllib2.Request(url=control_url + "/max_req_no", headers=headers)
50 f = urllib2.urlopen(req)
# The first line of the response is parsed as the current request number.
51 count = int(f.readline().strip())
# Presumably guards the fetch/parse above (the `try:` line is not shown);
# the failure is logged together with the URL.
53 except Exception as e:
55 log.error("can't fetch %s: %s" % (control_url + "/max_req_no", e))
# A remote count that differs from the stored one takes this branch;
# the branch body and the return values are not shown.
58 if count != last_count:
# Download control_url + "/queue.gz", decompress it, verify its signature and
# the signer's permission, and return request.parse_requests(body).
# NOTE(review): this listing omits lines (the `try:` openers, the code that
# fills `sio` and reads `fdata`, and the condition guarding the first alert),
# so the comments below only describe what is visible.
63 def fetch_queue(control_url):
# alarmalarm() as the SIGALRM handler and a 240-second default socket timeout.
64 signal.signal(signal.SIGALRM, alarmalarm)
65 socket.setdefaulttimeout(240)
# The no-cache headers ask HTTP caches not to answer with a stored copy.
68 headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
69 req = urllib2.Request(url=control_url + "/queue.gz", headers=headers)
70 f = urllib2.urlopen(req)
# Presumably guards the fetch above (the `try:` line is not shown);
# the failure is logged together with the URL.
72 except Exception as e:
74 log.error("can't fetch %s: %s" % (control_url + "/queue.gz", e))
# In-memory buffer handed to GzipFile below; the lines that copy the
# downloaded data into it are not shown.
# NOTE(review): line 12 binds the name via `from io import StringIO`; with
# that binding `StringIO.StringIO()` is an attribute lookup on the class
# rather than a constructor call -- confirm which import is active here.
76 sio = StringIO.StringIO()
# `f` is rebound from the HTTP response to the gzip reader over the buffer.
80 f = gzip.GzipFile(fileobj = sio)
# struct.error is reported as a corrupted download.
83 except struct.error as e:
84 log.alert("corrupted fetched queue.gz file")
# verify_sig yields the signers and the message body; the signers are then
# looked up as a user in the ACL.
86 (signers, body) = gpg.verify_sig(fdata)
87 u = acl.user_by_email(signers)
# Alert for a signature that matches no valid user (the guarding condition
# is on a line not shown).
89 log.alert("queue.gz not signed with signature of valid user: %s" % signers)
# The signer must hold the "sign_queue" privilege for "all".
91 if not u.can_do("sign_queue", "all"):
92 log.alert("user %s is not allowed to sign my queue" % u.login)
# The body returned by verify_sig is parsed into requests and returned.
94 return request.parse_requests(body)
# Process the requests in `reqs` for one builder, using that builder's own
# queue file.
# NOTE(review): this listing omits lines (the loop headers that bind `r` and
# `b`, and the code that opens and updates the queue), so the comments below
# only describe what is visible.
96 def handle_reqs(builder, reqs):
# The queue file path is path.queue_file suffixed with "-<builder>".
97 qpath = path.queue_file + "-" + builder
# A queue file that does not exist yet is created holding an empty
# <queue/> element.
98 if not os.access(qpath, os.F_OK):
99 util.append_to(qpath, "<queue/>\n")
# Only requests of kind 'group' are accepted; any other kind is fatal.
104 if r.kind != 'group':
105 raise Exception('handle_reqs: fatal: huh? %s' % r.kind)
# Checks whether this builder is listed in `b.builders`; `b` is bound on a
# line not shown.
108 if builder in b.builders:
# Logs the request id and number queued for this builder.
111 log.notice("queued %s (%d) for %s" % (r.id, r.no, builder))
# NOTE(review): these lines presumably form the body of main(), which line
# 144 passes to loop.run_loop; the `def` header and several other lines are
# not shown in this listing.
# Take the "request_fetcher" lock in non-blocking mode.
117 lck = lock.lock("request_fetcher", non_block = True)
123 status.push("fetching requests")
# The queue is downloaded only when has_new() reports a change.
124 if has_new(config.control_url):
125 q = fetch_queue(config.control_url)
# Selects requests numbered above last_count. `last_count` is not assigned in
# the lines shown here -- presumably module-level state set by has_new();
# confirm.
131 if r.no > last_count:
# Every configured binary builder is handed `q_new` (built on lines not
# shown).
133 for b in config.binary_builders:
134 handle_reqs(b, q_new)
# Store `max_no` (computed on lines not shown) in the counter file.
# NOTE(review): no close() for `f` is visible in this listing -- confirm the
# handle is closed.
135 f = open(path.last_req_no_file, "w")
136 f.write("%d\n" % max_no)
141 if __name__ == '__main__':
142 # An HTTP connection is established (and a few bytes are transferred
143 # through it) every $secs seconds.
144 loop.run_loop(main, secs = 10)