]> TLD Linux GIT Repositories - tld-builder.git/blob - TLD_Builder/file_sender.py
77a9848bc2d97c214d0764e261eddb9ecfc48e01
[tld-builder.git] / TLD_Builder / file_sender.py
1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
2
3 import glob
4 import re
5 import string
6 import os
7 import time
8 import shutil
9 import sys
10 import traceback
11 import urllib2
12
13 from config import config, init_conf
14 import mailer
15 import path
16 import log
17 import loop
18 import status
19 import lock
20
21 retries_times = [5 * 60, 5 * 60, 10 * 60, 10 * 60, 30 * 60, 60 * 60]
22
23 def read_name_val(file):
24     f = open(file)
25     r = {'_file': file[:-5], '_desc': file}
26     rx = re.compile(r"^([^:]+)\s*:(.*)$")
27     for l in f:
28         if l == "END\n":
29             f.close()
30             return r
31         m = rx.search(l)
32         if m:
33             r[m.group(1)] = m.group(2).strip()
34         else:
35             break
36     f.close()
37     return None
38
39 def scp_file(src, target):
40     global problems
41     f = os.popen("scp -P 7272 -v -B %s %s 2>&1 < /dev/null" % (src, target))
42     p = f.read()
43     ret = f.close()
44     if ret:
45         problems[src] = p
46     return ret
47
48 def copy_file(src, target):
49     try:
50         shutil.copyfile(src, target)
51         return 0
52     except:
53         global problems
54         exctype, value = sys.exc_info()[:2]
55         problems[src] = "cannot copy file: %s" % traceback.format_exception_only(exctype, value)
56         return 1
57
58 def rsync_file(src, target, host):
59     global problems
60
61     p = open(path.rsync_password_file, "r")
62     password = ""
63     for l in p:
64         l = l.split()
65         if len(l) >= 2 and l[0] == host:
66             password = l[1]
67     p.close()
68
69     # NOTE: directing STDIN to /dev/null, does not make rsync to skip asking
70     # password, it opens /dev/tty and still asks if password is needed and
71     # missing, therefore we always set RSYNC_PASSWORD env var
72     os.environ["RSYNC_PASSWORD"] = password
73     rsync = "rsync --verbose --archive --timeout=360 --contimeout=360"
74     f = os.popen("%s %s %s 2>&1" % (rsync, src, target))
75     p = f.read()
76     ret = f.close()
77     if ret:
78         problems[src] = p
79     del os.environ["RSYNC_PASSWORD"];
80     return ret
81
82 def rsync_ssh_file(src, target):
83     global problems
84     rsync = "rsync --verbose --archive --timeout=360 -e ssh"
85     f = os.popen("%s %s %s 2>&1 < /dev/null" % (rsync, src, target))
86     p = f.read()
87     ret = f.close()
88     if ret:
89         problems[src] = p
90     return ret
91
92 def post_file(src, url):
93     global problems
94     try:
95         f = open(src, 'r')
96         data = f.read()
97         f.close()
98         req = urllib2.Request(url, data)
99         req.add_header('X-Filename', os.path.basename(src))
100         f = urllib2.urlopen(req)
101         f.close()
102     except Exception as e:
103         problems[src] = e
104         return e
105     return 0
106
107 def send_file(src, target):
108     global problems
109     try:
110         log.notice("sending %s to %s (size %d bytes)" % (src, target, os.stat(src).st_size))
111         m = re.match('rsync://([^/]+)/.*', target)
112         if m:
113             return not rsync_file(src, target, host = m.group(1))
114         if target != "" and target[0] == '/':
115             return not copy_file(src, target)
116         m = re.match('scp://([^@:]+@[^/:]+)(:|)(.*)', target)
117         if m:
118             return not scp_file(src, m.group(1) + ":" + m.group(3))
119         m = re.match('ssh\+rsync://([^@:]+@[^/:]+)(:|)(.*)', target)
120         if m:
121             return not rsync_ssh_file(src, m.group(1) + ":" + m.group(3))
122         m = re.match('(http|https)://.*', target)
123         if m:
124             return not post_file(src, target)
125         log.alert("unsupported protocol: %s" % target)
126     except OSError as e:
127         problems[src] = e
128         log.error("send_file(%s, %s): %s" % (src, target, e))
129         return False
130     return True
131
132 def maybe_flush_queue(dir):
133     retry_delay = 0
134     try:
135         f = open(dir + "/retry-at")
136         last_retry = int(f.readline().strip())
137         retry_delay = int(f.readline().strip())
138         f.close()
139         if last_retry + retry_delay > time.time():
140             return
141         os.unlink(dir + "/retry-at")
142     except:
143         pass
144
145     status.push("flushing %s" % dir)
146
147     if flush_queue(dir):
148         f = open(dir + "/retry-at", "w")
149         if retry_delay in retries_times:
150             idx = retries_times.index(retry_delay)
151             if idx < len(retries_times) - 1: idx += 1
152         else:
153             idx = 0
154         f.write("%d\n%d\n" % (time.time(), retries_times[idx]))
155         f.close()
156
157     status.pop()
158
159 def flush_queue(dir):
160     q = []
161     os.chdir(dir)
162     for f in glob.glob(dir + "/*.desc"):
163         d = read_name_val(f)
164         if d != None: q.append(d)
165     def mycmp(x, y):
166         rc = cmp(x['Time'], y['Time'])
167         if rc == 0 and 'Type' in x and 'Type' in y:
168             return cmp(x['Type'], y['Type'])
169         else:
170             return rc
171     q.sort(mycmp)
172
173     error = None
174     # copy of q
175     remaining = q[:]
176     for d in q:
177         if not send_file(d['_file'], d['Target']):
178             error = d
179             continue
180         if os.access(d['_file'] + ".info", os.F_OK):
181             if not send_file(d['_file'] + ".info", d['Target'] + ".info"):
182                 error = d
183                 continue
184             os.unlink(d['_file'] + ".info")
185         os.unlink(d['_file'])
186         os.unlink(d['_desc'])
187         remaining.remove(d)
188
189     if error != None:
190         emails = {}
191         emails[config.admin_email] = 1
192         pr = ""
193         for src, msg in problems.items():
194             pr = pr + "[src: %s]\n\n%s\n" % (src, msg)
195         for d in remaining:
196             if 'Requester' in d:
197                 emails[d['Requester']] = 1
198         e = emails.keys()
199         m = mailer.Message()
200         m.set_headers(to = string.join(e, ", "),
201                       subject = "[%s] builder queue problem" % config.builder)
202         m.write("there were problems sending files from queue %s:\n" % dir)
203         m.write("problems:\n")
204         m.write("%s\n" % pr)
205         m.send()
206         log.error("error sending files from %s:\n%s\n" % (dir, pr))
207         return 1
208
209     return 0
210
211 problems = {}
212
213 def main():
214     if lock.lock("sending-files", non_block = 1) == None:
215         return
216     init_conf()
217     maybe_flush_queue(path.notify_queue_dir)
218     maybe_flush_queue(path.buildlogs_queue_dir)
219     maybe_flush_queue(path.ftp_queue_dir)
220
221 if __name__ == '__main__':
222     loop.run_loop(main)