+# vi: encoding=utf-8 ts=8 sts=4 sw=4 et
+
+import glob
+import re
+import string
+import os
+import time
+import shutil
+import sys
+import traceback
+import urllib2
+
+from config import config, init_conf
+import mailer
+import path
+import log
+import loop
+import status
+import lock
+
+retries_times = [5 * 60, 5 * 60, 10 * 60, 10 * 60, 30 * 60, 60 * 60]
+
+def read_name_val(file):
+ f = open(file)
+ r = {'_file': file[:-5], '_desc': file}
+ rx = re.compile(r"^([^:]+)\s*:(.*)$")
+ for l in f.xreadlines():
+ if l == "END\n":
+ f.close()
+ return r
+ m = rx.search(l)
+ if m:
+ r[m.group(1)] = string.strip(m.group(2))
+ else:
+ break
+ f.close()
+ return None
+
+def scp_file(src, target):
+ global problems
+ f = os.popen("scp -v -B %s %s 2>&1 < /dev/null" % (src, target))
+ p = f.read()
+ ret = f.close()
+ if ret:
+ problems[src] = p
+ return ret
+
+def copy_file(src, target):
+ try:
+ shutil.copyfile(src, target)
+ return 0
+ except:
+ global problems
+ exctype, value = sys.exc_info()[:2]
+ problems[src] = "cannot copy file: %s" % traceback.format_exception_only(exctype, value)
+ return 1
+
+def rsync_file(src, target, host):
+ global problems
+
+ p = open(path.rsync_password_file, "r")
+ password = ""
+ for l in p.xreadlines():
+ l = string.split(l)
+ if len(l) >= 2 and l[0] == host:
+ password = l[1]
+ p.close()
+
+ # NOTE: directing STDIN to /dev/null, does not make rsync to skip asking
+ # password, it opens /dev/tty and still asks if password is needed and
+ # missing, therefore we always set RSYNC_PASSWORD env var
+ os.environ["RSYNC_PASSWORD"] = password
+ rsync = "rsync --verbose --archive --timeout=360 --contimeout=360"
+ f = os.popen("%s %s %s 2>&1" % (rsync, src, target))
+ p = f.read()
+ ret = f.close()
+ if ret:
+ problems[src] = p
+ del os.environ["RSYNC_PASSWORD"];
+ return ret
+
+def rsync_ssh_file(src, target):
+ global problems
+ rsync = "rsync --verbose --archive --timeout=360 -e ssh"
+ f = os.popen("%s %s %s 2>&1 < /dev/null" % (rsync, src, target))
+ p = f.read()
+ ret = f.close()
+ if ret:
+ problems[src] = p
+ return ret
+
+def post_file(src, url):
+ global problems
+ try:
+ f = open(src, 'r')
+ data = f.read()
+ f.close()
+ req = urllib2.Request(url, data)
+ req.add_header('X-Filename', os.path.basename(src))
+ f = urllib2.urlopen(req)
+ f.close()
+ except Exception, e:
+ problems[src] = e
+ return e
+ return 0
+
+def send_file(src, target):
+ global problems
+ try:
+ log.notice("sending %s to %s (size %d bytes)" % (src, target, os.stat(src).st_size))
+ m = re.match('rsync://([^/]+)/.*', target)
+ if m:
+ return not rsync_file(src, target, host = m.group(1))
+ if target != "" and target[0] == '/':
+ return not copy_file(src, target)
+ m = re.match('scp://([^@:]+@[^/:]+)(:|)(.*)', target)
+ if m:
+ return not scp_file(src, m.group(1) + ":" + m.group(3))
+ m = re.match('ssh\+rsync://([^@:]+@[^/:]+)(:|)(.*)', target)
+ if m:
+ return not rsync_ssh_file(src, m.group(1) + ":" + m.group(3))
+ m = re.match('http://.*', target)
+ if m:
+ return not post_file(src, target)
+ log.alert("unsupported protocol: %s" % target)
+ except OSError, e:
+ problems[src] = e
+ log.error("send_file(%s, %s): %s" % (src, target, e))
+ return False
+ return True
+
+def maybe_flush_queue(dir):
+ retry_delay = 0
+ try:
+ f = open(dir + "/retry-at")
+ last_retry = int(string.strip(f.readline()))
+ retry_delay = int(string.strip(f.readline()))
+ f.close()
+ if last_retry + retry_delay > time.time():
+ return
+ os.unlink(dir + "/retry-at")
+ except:
+ pass
+
+ status.push("flushing %s" % dir)
+
+ if flush_queue(dir):
+ f = open(dir + "/retry-at", "w")
+ if retry_delay in retries_times:
+ idx = retries_times.index(retry_delay)
+ if idx < len(retries_times) - 1: idx += 1
+ else:
+ idx = 0
+ f.write("%d\n%d\n" % (time.time(), retries_times[idx]))
+ f.close()
+
+ status.pop()
+
+def flush_queue(dir):
+ q = []
+ os.chdir(dir)
+ for f in glob.glob(dir + "/*.desc"):
+ d = read_name_val(f)
+ if d != None: q.append(d)
+ def mycmp(x, y):
+ rc = cmp(x['Time'], y['Time'])
+ if rc == 0 and x.has_key('Type') and y.has_key('Type'):
+ return cmp(x['Type'], y['Type'])
+ else:
+ return rc
+ q.sort(mycmp)
+
+ error = None
+ # copy of q
+ remaining = q[:]
+ for d in q:
+ if not send_file(d['_file'], d['Target']):
+ error = d
+ continue
+ if os.access(d['_file'] + ".info", os.F_OK):
+ if not send_file(d['_file'] + ".info", d['Target'] + ".info"):
+ error = d
+ continue
+ os.unlink(d['_file'] + ".info")
+ os.unlink(d['_file'])
+ os.unlink(d['_desc'])
+ remaining.remove(d)
+
+ if error != None:
+ emails = {}
+ emails[config.admin_email] = 1
+ pr = ""
+ for src, msg in problems.iteritems():
+ pr = pr + "[src: %s]\n\n%s\n" % (src, msg)
+ for d in remaining:
+ if d.has_key('Requester'):
+ emails[d['Requester']] = 1
+ e = emails.keys()
+ m = mailer.Message()
+ m.set_headers(to = string.join(e, ", "),
+ subject = "[%s] builder queue problem" % config.builder)
+ m.write("there were problems sending files from queue %s:\n" % dir)
+ m.write("problems:\n")
+ m.write("%s\n" % pr)
+ m.send()
+ log.error("error sending files from %s:\n%s\n" % (dir, pr))
+ return 1
+
+ return 0
+
+problems = {}
+
+def main():
+ if lock.lock("sending-files", non_block = 1) == None:
+ return
+ init_conf()
+ maybe_flush_queue(path.notify_queue_dir)
+ maybe_flush_queue(path.buildlogs_queue_dir)
+ maybe_flush_queue(path.ftp_queue_dir)
+
+if __name__ == '__main__':
+ loop.run_loop(main)