# vi: encoding=utf-8 ts=8 sts=4 sw=4 et import string import signal import os import urllib import sys from io import StringIO, BytesIO import gzip import path import log import status import lock import util import gpg import request import loop import socket import struct from acl import acl from bqueue import B_Queue from config import config, init_conf last_count = 0 def alarmalarm(signum, frame): raise IOError('TCP connection hung') def has_new(control_url): global last_count cnt_f = open(path.last_req_no_file) try: last_count = int(cnt_f.readline().strip()) except ValueError as e: last_count = 0 cnt_f.close() f = None socket.setdefaulttimeout(240) signal.signal(signal.SIGALRM, alarmalarm) signal.alarm(300) try: headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' } req = urllib.request.Request(url=control_url + "/max_req_no", headers=headers) f = urllib.request.urlopen(req) count = int(f.readline().strip()) signal.alarm(0) except Exception as e: signal.alarm(0) log.error("can't fetch %s: %s" % (control_url + "/max_req_no", e)) sys.exit(1) res = 0 if count != last_count: res = 1 f.close() return res def fetch_queue(control_url): signal.signal(signal.SIGALRM, alarmalarm) socket.setdefaulttimeout(240) signal.alarm(300) try: headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' } req = urllib.request.Request(url=control_url + "/queue.gz", headers=headers) f = urllib.request.urlopen(req) signal.alarm(0) except Exception as e: signal.alarm(0) log.error("can't fetch %s: %s" % (control_url + "/queue.gz", e)) sys.exit(1) sio = BytesIO() util.sendfile(f, sio) f.close() sio.seek(0) f = gzip.GzipFile(fileobj = sio) try: fdata = f.read() except struct.error as e: log.alert("corrupted fetched queue.gz file") sys.exit(1) (signers, body) = gpg.verify_sig(fdata) u = acl.user_by_email(signers) if u == None: log.alert("queue.gz not signed with signature of valid user: %s" % signers) sys.exit(1) if not u.can_do("sign_queue", "all"): log.alert("user %s is not allowed to sign my queue" % u.login) sys.exit(1) return request.parse_requests(body) def handle_reqs(builder, reqs): qpath = path.queue_file + "-" + builder if not os.access(qpath, os.F_OK): util.append_to(qpath, "\n") q = B_Queue(qpath) q.lock(0) q.read() for r in reqs: if r.kind != 'group': raise Exception('handle_reqs: fatal: huh? %s' % r.kind) need_it = 0 for b in r.batches: if builder in b.builders: need_it = 1 if need_it: log.notice("queued %s (%d) for %s" % (r.id, r.no, builder)) q.add(r) q.write() q.unlock() def main(): lck = lock.lock("request_fetcher", non_block = True) if lck == None: sys.exit(1) init_conf() acl.try_reload() status.push("fetching requests") if has_new(config.control_url): q = fetch_queue(config.control_url) max_no = 0 q_new = [] for r in q: if r.no > max_no: max_no = r.no if r.no > last_count: q_new.append(r) for b in config.binary_builders: handle_reqs(b, q_new) f = open(path.last_req_no_file, "w") f.write("%d\n" % max_no) f.close() status.pop() lck.close() if __name__ == '__main__': # http connection is established (and few bytes transferred through it) # each $secs seconds. loop.run_loop(main, secs = 10)