1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
12 from config import config, init_conf
# Retry delays in seconds: 5, 5, 10, 10, 30 and 60 minutes.
# maybe_flush_queue writes one of these values, together with the current
# time, into a queue directory's "retry-at" file.
21 retries_times = [5 * 60, 5 * 60, 10 * 60, 10 * 60, 30 * 60, 60 * 60]
# Build a dict from "name: value" lines.
# NOTE(review): the statements that open the file and loop over its lines are
# not visible here; the comments below describe only the statements shown.
23 def read_name_val(file):
# '_file' is the given path without its last five characters (presumably a
# ".desc" suffix -- confirm against the caller); '_desc' is the path itself.
25 r = {'_file': file[:-5], '_desc': file}
# Group 1 captures everything before the first ':' and group 2 the rest of
# the line.
# NOTE(review): [^:]+ is greedy and also matches whitespace, so \s* never
# consumes anything and blanks before the ':' remain part of the key.
26 rx = re.compile(r"^([^:]+)\s*:(.*)$")
# Store the value, with surrounding whitespace removed, under the captured name.
33 r[m.group(1)] = m.group(2).strip()
# Copy src to target by running scp through the shell.
39 def scp_file(src, target):
# scp runs against port 7272 in verbose (-v) batch (-B) mode, with stderr
# merged into stdout and stdin taken from /dev/null; f is the pipe carrying
# the command's output.
# NOTE(review): src and target are interpolated into the shell command
# unquoted -- confirm they can never contain shell metacharacters.
41 f = os.popen("scp -P 7272 -v -B %s %s 2>&1 < /dev/null" % (src, target))
# Copy src to target on the local filesystem.
48 def copy_file(src, target):
50 shutil.copyfile(src, target)
# Fetch the type and value of the exception currently being handled
# (presumably inside an except clause that is not visible here).
54 exctype, value = sys.exc_info()[:2]
# Record the failure message in problems, keyed by the source path.
# NOTE(review): traceback.format_exception_only returns a list of strings, so
# "%s" renders the list's repr (brackets and quotes) -- confirm this is the
# intended message format.
55 problems[src] = "cannot copy file: %s" % traceback.format_exception_only(exctype, value)
# Send src to target with rsync, passing the password for host through the
# RSYNC_PASSWORD environment variable.
58 def rsync_file(src, target, host):
# Open the password file named by path.rsync_password_file for reading.
61 p = open(path.rsync_password_file, "r")
# Match an entry that has at least two fields and whose first field equals
# host (presumably "host password" pairs -- the parsing is not visible here).
65 if len(l) >= 2 and l[0] == host:
69 # NOTE: directing STDIN to /dev/null, does not make rsync to skip asking
70 # password, it opens /dev/tty and still asks if password is needed and
71 # missing, therefore we always set RSYNC_PASSWORD env var
72 os.environ["RSYNC_PASSWORD"] = password
# Both the I/O timeout and the connection timeout are set to 360 seconds.
73 rsync = "rsync --verbose --archive --timeout=360 --contimeout=360"
# Run rsync through the shell with stderr merged into stdout; f is the pipe
# carrying the command's output.
74 f = os.popen("%s %s %s 2>&1" % (rsync, src, target))
# Remove the password from the process environment again.
# NOTE(review): it is not visible whether this runs on error paths as well
# (e.g. via try/finally) -- confirm.
79 del os.environ["RSYNC_PASSWORD"];
# Send src to target with rsync, using ssh as the remote shell.
82 def rsync_ssh_file(src, target):
# The rsync I/O timeout is 360 seconds.
84 rsync = "rsync --verbose --archive --timeout=360 -e ssh"
# Run rsync through the shell with stderr merged into stdout and stdin taken
# from /dev/null; f is the pipe carrying the command's output.
85 f = os.popen("%s %s %s 2>&1 < /dev/null" % (rsync, src, target))
# Upload the contents of src to url in an HTTP request body.
92 def post_file(src, url):
# NOTE(review): .encode() is called on the result of f.read(), so the file
# must have been opened in text mode; confirm that only text files that
# decode with the default encoding are ever posted.
96 data = f.read().encode('utf-8')
# The file's base name is passed to the server in the X-Filename header.
98 headers = { 'X-Filename' : os.path.basename(src) }
# Supplying data makes urllib issue a POST request.
99 req = urllib.request.Request(url, data=data, headers=headers)
# NOTE(review): f is rebound to the HTTP response here; whether the file
# object previously held in f was closed is not visible.
100 f = urllib.request.urlopen(req)
102 except Exception as e:
# Send src to target, choosing the transport from the form of target:
#   rsync://host/...            -> rsync_file
#   /absolute/path              -> copy_file
#   scp://user@host[:]path      -> scp_file
#   ssh+rsync://user@host[:]path -> rsync_ssh_file
#   http://... or https://...   -> post_file
# Each visible return yields "not <helper result>", i.e. True when the helper
# returned a false value.
107 def send_file(src, target):
110 log.notice("sending %s to %s (size %d bytes)" % (src, target, os.stat(src).st_size))
111 m = re.match('rsync://([^/]+)/.*', target)
# Group 1 is the host part of the rsync URL.
113 return not rsync_file(src, target, host = m.group(1))
# A target starting with '/' is treated as a local path.
114 if target != "" and target[0] == '/':
115 return not copy_file(src, target)
116 m = re.match('scp://([^@:]+@[^/:]+)(:|)(.*)', target)
# Rebuild the "user@host:path" form from the URL; the ':' in the URL is optional.
118 return not scp_file(src, m.group(1) + ":" + m.group(3))
# NOTE(review): this pattern is not a raw string, so '\+' is an invalid
# string escape (DeprecationWarning since Python 3.6, SyntaxWarning since
# 3.12); it should be written as r'ssh\+rsync://...'.
119 m = re.match('ssh\+rsync://([^@:]+@[^/:]+)(:|)(.*)', target)
121 return not rsync_ssh_file(src, m.group(1) + ":" + m.group(3))
122 m = re.match('(http|https)://.*', target)
124 return not post_file(src, target)
# None of the known target forms matched.
125 log.alert("unsupported protocol: %s" % target)
# Log the exception e (presumably bound by an except clause that is not
# visible here).
128 log.error("send_file(%s, %s): %s" % (src, target, e))
# Flush the queue in dir, honouring the retry time stored in dir/retry-at.
132 def maybe_flush_queue(dir):
# The retry-at file holds two integers: the time of the last attempt on the
# first line and the delay in seconds on the second.
135 f = open(dir + "/retry-at")
136 last_retry = int(f.readline().strip())
137 retry_delay = int(f.readline().strip())
# True while the retry delay has not yet elapsed (the branch body is not
# visible here).
139 if last_retry + retry_delay > time.time():
141 os.unlink(dir + "/retry-at")
145 status.push("flushing %s" % dir)
# Write a new retry-at file with the current time and the next delay.
148 f = open(dir + "/retry-at", "w")
# Move one step along retries_times from the previous delay, stopping at the
# last entry.
# NOTE(review): list.index returns the FIRST matching position and
# retries_times contains repeated values (5*60 and 10*60 each appear twice).
# A delay of 300 is therefore always found at index 0 and advanced to index 1,
# which is 300 again, so the back-off appears never to grow beyond 5 minutes
# -- confirm and consider storing the index instead of the delay.
149 if retry_delay in retries_times:
150 idx = retries_times.index(retry_delay)
151 if idx < len(retries_times) - 1: idx += 1
# %d truncates the float returned by time.time() to whole seconds.
154 f.write("%d\n%d\n" % (time.time(), retries_times[idx]))
# Send every queued file described by a *.desc file in dir, delete what was
# sent, and report problems by mail and in the log.
159 def flush_queue(dir):
# Collect the description of every *.desc file in the queue directory.
162 for f in glob.glob(dir + "/*.desc"):
# NOTE(review): "d is not None" is the idiomatic form of this comparison.
164 if d != None: q.append(d)
# Order entries by their 'Time' field, breaking ties on 'Type' when both
# entries have one.
166 rc = util.cmp(x['Time'], y['Time'])
167 if rc == 0 and 'Type' in x and 'Type' in y:
168 return util.cmp(x['Type'], y['Type'])
171 q.sort(key=util.cmp_to_key(mycmp))
# send_file returns a true value on success, so this branch handles a failed
# send (the branch body is not visible here).
177 if not send_file(d['_file'], d['Target']):
# An optional companion "<file>.info" is sent to "<Target>.info".
180 if os.access(d['_file'] + ".info", os.F_OK):
181 if not send_file(d['_file'] + ".info", d['Target'] + ".info"):
# Remove the .info file, the payload and its description from the queue.
184 os.unlink(d['_file'] + ".info")
185 os.unlink(d['_file'])
186 os.unlink(d['_desc'])
# emails is a dict whose keys are the recipient addresses; the admin address
# is added here.
191 emails[config.admin_email] = 1
# Build one text report from all recorded problems.
193 for src, msg in problems.items():
194 pr = pr + "[src: %s]\n\n%s\n" % (src, msg)
# The entry's requester is added as a recipient too.
197 emails[d['Requester']] = 1
# e is presumably the list of recipient addresses (its assignment is not
# visible here).
200 m.set_headers(to = ", ".join(e),
201 subject = "[%s] builder queue problem" % config.builder)
202 m.write("there were problems sending files from queue %s:\n" % dir)
203 m.write("problems:\n")
206 log.error("error sending files from %s:\n%s\n" % (dir, pr))
# NOTE(review): the def line of the function enclosing the next four
# statements is not visible here.
# Try to take the "sending-files" lock without blocking; a None result
# presumably means another instance already holds it -- confirm in lock.lock.
214 if lock.lock("sending-files", non_block = 1) == None:
# Process the notify, buildlogs and ftp queues, in that order.
217 maybe_flush_queue(path.notify_queue_dir)
218 maybe_flush_queue(path.buildlogs_queue_dir)
219 maybe_flush_queue(path.ftp_queue_dir)
# Script entry point (the guarded body is not visible here).
221 if __name__ == '__main__':