]> TLD Linux GIT Repositories - tld-builder.git/blob - TLD_Builder/request_fetcher.py
9877cde771572ba4e7a23ca73fa2e2251a639ee2
[tld-builder.git] / TLD_Builder / request_fetcher.py
1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
2
3 import string
4 import signal
5 import os
6 import urllib
7 import urllib2
8 import StringIO
9 import sys
10 import gzip
11
12 import path
13 import log
14 import status
15 import lock
16 import util
17 import gpg
18 import request
19 import loop
20 import socket
21 import struct
22 from acl import acl
23 from bqueue import B_Queue
24 from config import config, init_conf
25
26 last_count = 0
27
28 def alarmalarm(signum, frame):
29     raise IOError, 'TCP connection hung'
30
31 def has_new(control_url):
32     global last_count
33     cnt_f = open(path.last_req_no_file)
34     try:
35         last_count = int(string.strip(cnt_f.readline()))
36     except ValueError, e:
37         last_count = 0
38
39     cnt_f.close()
40     f = None
41     socket.setdefaulttimeout(240)
42     signal.signal(signal.SIGALRM, alarmalarm)
43     signal.alarm(300)
44     try:
45         headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
46         req = urllib2.Request(url=control_url + "/max_req_no", headers=headers)
47         f = urllib2.urlopen(req)
48         count = int(string.strip(f.readline()))
49         signal.alarm(0)
50     except Exception, e:
51         signal.alarm(0)
52         log.error("can't fetch %s: %s" % (control_url + "/max_req_no", e))
53         sys.exit(1)
54     res = 0
55     if count != last_count:
56         res = 1
57     f.close()
58     return res
59
60 def fetch_queue(control_url):
61     signal.signal(signal.SIGALRM, alarmalarm)
62     socket.setdefaulttimeout(240)
63     signal.alarm(300)
64     try:
65         headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
66         req = urllib2.Request(url=control_url + "/queue.gz", headers=headers)
67         f = urllib2.urlopen(req)
68         signal.alarm(0)
69     except Exception, e:
70         signal.alarm(0)
71         log.error("can't fetch %s: %s" % (control_url + "/queue.gz", e))
72         sys.exit(1)
73     sio = StringIO.StringIO()
74     util.sendfile(f, sio)
75     f.close()
76     sio.seek(0)
77     f = gzip.GzipFile(fileobj = sio)
78     try:
79         fdata = f.read()
80     except struct.error, e:
81         log.alert("corrupted fetched queue.gz file")
82         sys.exit(1)
83     (signers, body) = gpg.verify_sig(fdata)
84     u = acl.user_by_email(signers)
85     if u == None:
86         log.alert("queue.gz not signed with signature of valid user: %s" % signers)
87         sys.exit(1)
88     if not u.can_do("sign_queue", "all"):
89         log.alert("user %s is not allowed to sign my queue" % u.login)
90         sys.exit(1)
91     return request.parse_requests(body)
92
93 def handle_reqs(builder, reqs):
94     qpath = path.queue_file + "-" + builder
95     if not os.access(qpath, os.F_OK):
96         util.append_to(qpath, "<queue/>\n")
97     q = B_Queue(qpath)
98     q.lock(0)
99     q.read()
100     for r in reqs:
101         if r.kind != 'group':
102             raise Exception, 'handle_reqs: fatal: huh? %s' % r.kind
103         need_it = 0
104         for b in r.batches:
105             if builder in b.builders:
106                 need_it = 1
107         if need_it:
108             log.notice("queued %s (%d) for %s" % (r.id, r.no, builder))
109             q.add(r)
110     q.write()
111     q.unlock()
112
113 def main():
114     lck = lock.lock("request_fetcher", non_block = True)
115     if lck == None:
116         sys.exit(1)
117     init_conf()
118     acl.try_reload()
119
120     status.push("fetching requests")
121     if has_new(config.control_url):
122         q = fetch_queue(config.control_url)
123         max_no = 0
124         q_new = []
125         for r in q:
126             if r.no > max_no:
127                 max_no = r.no
128             if r.no > last_count:
129                 q_new.append(r)
130         for b in config.binary_builders:
131             handle_reqs(b, q_new)
132         f = open(path.last_req_no_file, "w")
133         f.write("%d\n" % max_no)
134         f.close()
135     status.pop()
136     lck.close()
137
138 if __name__ == '__main__':
139     # http connection is established (and few bytes transferred through it)
140     # each $secs seconds.
141     loop.run_loop(main, secs = 10)