+++ /dev/null
-# vi: encoding=utf-8 ts=8 sts=4 sw=4 et
-
-import glob
-import re
-import string
-import os
-import time
-import shutil
-import sys
-import traceback
-import urllib2
-
-from config import config, init_conf
-import mailer
-import path
-import log
-import loop
-import status
-import lock
-
-retries_times = [5 * 60, 5 * 60, 10 * 60, 10 * 60, 30 * 60, 60 * 60]
-
-def read_name_val(file):
- f = open(file)
- r = {'_file': file[:-5], '_desc': file}
- rx = re.compile(r"^([^:]+)\s*:(.*)$")
- for l in f.xreadlines():
- if l == "END\n":
- f.close()
- return r
- m = rx.search(l)
- if m:
- r[m.group(1)] = string.strip(m.group(2))
- else:
- break
- f.close()
- return None
-
-def scp_file(src, target):
- global problems
- f = os.popen("scp -v -B %s %s 2>&1 < /dev/null" % (src, target))
- p = f.read()
- ret = f.close()
- if ret:
- problems[src] = p
- return ret
-
-def copy_file(src, target):
- try:
- shutil.copyfile(src, target)
- return 0
- except:
- global problems
- exctype, value = sys.exc_info()[:2]
- problems[src] = "cannot copy file: %s" % traceback.format_exception_only(exctype, value)
- return 1
-
-def rsync_file(src, target, host):
- global problems
-
- p = open(path.rsync_password_file, "r")
- password = ""
- for l in p.xreadlines():
- l = string.split(l)
- if len(l) >= 2 and l[0] == host:
- password = l[1]
- p.close()
-
- # NOTE: directing STDIN to /dev/null, does not make rsync to skip asking
- # password, it opens /dev/tty and still asks if password is needed and
- # missing, therefore we always set RSYNC_PASSWORD env var
- os.environ["RSYNC_PASSWORD"] = password
- rsync = "rsync --verbose --archive --timeout=360 --contimeout=360"
- f = os.popen("%s %s %s 2>&1" % (rsync, src, target))
- p = f.read()
- ret = f.close()
- if ret:
- problems[src] = p
- del os.environ["RSYNC_PASSWORD"];
- return ret
-
-def rsync_ssh_file(src, target):
- global problems
- rsync = "rsync --verbose --archive --timeout=360 -e ssh"
- f = os.popen("%s %s %s 2>&1 < /dev/null" % (rsync, src, target))
- p = f.read()
- ret = f.close()
- if ret:
- problems[src] = p
- return ret
-
-def post_file(src, url):
- global problems
- try:
- f = open(src, 'r')
- data = f.read()
- f.close()
- req = urllib2.Request(url, data)
- req.add_header('X-Filename', os.path.basename(src))
- f = urllib2.urlopen(req)
- f.close()
- except Exception, e:
- problems[src] = e
- return e
- return 0
-
-def send_file(src, target):
- global problems
- try:
- log.notice("sending %s to %s (size %d bytes)" % (src, target, os.stat(src).st_size))
- m = re.match('rsync://([^/]+)/.*', target)
- if m:
- return not rsync_file(src, target, host = m.group(1))
- if target != "" and target[0] == '/':
- return not copy_file(src, target)
- m = re.match('scp://([^@:]+@[^/:]+)(:|)(.*)', target)
- if m:
- return not scp_file(src, m.group(1) + ":" + m.group(3))
- m = re.match('ssh\+rsync://([^@:]+@[^/:]+)(:|)(.*)', target)
- if m:
- return not rsync_ssh_file(src, m.group(1) + ":" + m.group(3))
- m = re.match('http://.*', target)
- if m:
- return not post_file(src, target)
- log.alert("unsupported protocol: %s" % target)
- except OSError, e:
- problems[src] = e
- log.error("send_file(%s, %s): %s" % (src, target, e))
- return False
- return True
-
-def maybe_flush_queue(dir):
- retry_delay = 0
- try:
- f = open(dir + "/retry-at")
- last_retry = int(string.strip(f.readline()))
- retry_delay = int(string.strip(f.readline()))
- f.close()
- if last_retry + retry_delay > time.time():
- return
- os.unlink(dir + "/retry-at")
- except:
- pass
-
- status.push("flushing %s" % dir)
-
- if flush_queue(dir):
- f = open(dir + "/retry-at", "w")
- if retry_delay in retries_times:
- idx = retries_times.index(retry_delay)
- if idx < len(retries_times) - 1: idx += 1
- else:
- idx = 0
- f.write("%d\n%d\n" % (time.time(), retries_times[idx]))
- f.close()
-
- status.pop()
-
-def flush_queue(dir):
- q = []
- os.chdir(dir)
- for f in glob.glob(dir + "/*.desc"):
- d = read_name_val(f)
- if d != None: q.append(d)
- def mycmp(x, y):
- rc = cmp(x['Time'], y['Time'])
- if rc == 0 and x.has_key('Type') and y.has_key('Type'):
- return cmp(x['Type'], y['Type'])
- else:
- return rc
- q.sort(mycmp)
-
- error = None
- # copy of q
- remaining = q[:]
- for d in q:
- if not send_file(d['_file'], d['Target']):
- error = d
- continue
- if os.access(d['_file'] + ".info", os.F_OK):
- if not send_file(d['_file'] + ".info", d['Target'] + ".info"):
- error = d
- continue
- os.unlink(d['_file'] + ".info")
- os.unlink(d['_file'])
- os.unlink(d['_desc'])
- remaining.remove(d)
-
- if error != None:
- emails = {}
- emails[config.admin_email] = 1
- pr = ""
- for src, msg in problems.iteritems():
- pr = pr + "[src: %s]\n\n%s\n" % (src, msg)
- for d in remaining:
- if d.has_key('Requester'):
- emails[d['Requester']] = 1
- e = emails.keys()
- m = mailer.Message()
- m.set_headers(to = string.join(e, ", "),
- subject = "[%s] builder queue problem" % config.builder)
- m.write("there were problems sending files from queue %s:\n" % dir)
- m.write("problems:\n")
- m.write("%s\n" % pr)
- m.send()
- log.error("error sending files from %s:\n%s\n" % (dir, pr))
- return 1
-
- return 0
-
-problems = {}
-
-def main():
- if lock.lock("sending-files", non_block = 1) == None:
- return
- init_conf()
- maybe_flush_queue(path.notify_queue_dir)
- maybe_flush_queue(path.buildlogs_queue_dir)
- maybe_flush_queue(path.ftp_queue_dir)
-
-if __name__ == '__main__':
- loop.run_loop(main)