X-Git-Url: https://git.tld-linux.org/?a=blobdiff_plain;f=PLD_Builder%2Ffile_sender.py;fp=PLD_Builder%2Ffile_sender.py;h=0000000000000000000000000000000000000000;hb=b999f53d4bf5d44586ecf028876e8bc20b5fd2ce;hp=1df7ea1d38af973b3a56164ce1304000b4405546;hpb=37463eaa22f48f5fecbb90e69ef67a69e0bf9788;p=tld-builder.git diff --git a/PLD_Builder/file_sender.py b/PLD_Builder/file_sender.py deleted file mode 100644 index 1df7ea1..0000000 --- a/PLD_Builder/file_sender.py +++ /dev/null @@ -1,222 +0,0 @@ -# vi: encoding=utf-8 ts=8 sts=4 sw=4 et - -import glob -import re -import string -import os -import time -import shutil -import sys -import traceback -import urllib2 - -from config import config, init_conf -import mailer -import path -import log -import loop -import status -import lock - -retries_times = [5 * 60, 5 * 60, 10 * 60, 10 * 60, 30 * 60, 60 * 60] - -def read_name_val(file): - f = open(file) - r = {'_file': file[:-5], '_desc': file} - rx = re.compile(r"^([^:]+)\s*:(.*)$") - for l in f.xreadlines(): - if l == "END\n": - f.close() - return r - m = rx.search(l) - if m: - r[m.group(1)] = string.strip(m.group(2)) - else: - break - f.close() - return None - -def scp_file(src, target): - global problems - f = os.popen("scp -P 7272 -v -B %s %s 2>&1 < /dev/null" % (src, target)) - p = f.read() - ret = f.close() - if ret: - problems[src] = p - return ret - -def copy_file(src, target): - try: - shutil.copyfile(src, target) - return 0 - except: - global problems - exctype, value = sys.exc_info()[:2] - problems[src] = "cannot copy file: %s" % traceback.format_exception_only(exctype, value) - return 1 - -def rsync_file(src, target, host): - global problems - - p = open(path.rsync_password_file, "r") - password = "" - for l in p.xreadlines(): - l = string.split(l) - if len(l) >= 2 and l[0] == host: - password = l[1] - p.close() - - # NOTE: directing STDIN to /dev/null, does not make rsync to skip asking - # password, it opens /dev/tty and still asks if password is needed and - # missing, therefore we always set RSYNC_PASSWORD env var - os.environ["RSYNC_PASSWORD"] = password - rsync = "rsync --verbose --archive --timeout=360 --contimeout=360" - f = os.popen("%s %s %s 2>&1" % (rsync, src, target)) - p = f.read() - ret = f.close() - if ret: - problems[src] = p - del os.environ["RSYNC_PASSWORD"]; - return ret - -def rsync_ssh_file(src, target): - global problems - rsync = "rsync --verbose --archive --timeout=360 -e ssh" - f = os.popen("%s %s %s 2>&1 < /dev/null" % (rsync, src, target)) - p = f.read() - ret = f.close() - if ret: - problems[src] = p - return ret - -def post_file(src, url): - global problems - try: - f = open(src, 'r') - data = f.read() - f.close() - req = urllib2.Request(url, data) - req.add_header('X-Filename', os.path.basename(src)) - f = urllib2.urlopen(req) - f.close() - except Exception, e: - problems[src] = e - return e - return 0 - -def send_file(src, target): - global problems - try: - log.notice("sending %s to %s (size %d bytes)" % (src, target, os.stat(src).st_size)) - m = re.match('rsync://([^/]+)/.*', target) - if m: - return not rsync_file(src, target, host = m.group(1)) - if target != "" and target[0] == '/': - return not copy_file(src, target) - m = re.match('scp://([^@:]+@[^/:]+)(:|)(.*)', target) - if m: - return not scp_file(src, m.group(1) + ":" + m.group(3)) - m = re.match('ssh\+rsync://([^@:]+@[^/:]+)(:|)(.*)', target) - if m: - return not rsync_ssh_file(src, m.group(1) + ":" + m.group(3)) - m = re.match('http://.*', target) - if m: - return not post_file(src, target) - log.alert("unsupported protocol: %s" % target) - except OSError, e: - problems[src] = e - log.error("send_file(%s, %s): %s" % (src, target, e)) - return False - return True - -def maybe_flush_queue(dir): - retry_delay = 0 - try: - f = open(dir + "/retry-at") - last_retry = int(string.strip(f.readline())) - retry_delay = int(string.strip(f.readline())) - f.close() - if last_retry + retry_delay > time.time(): - return - os.unlink(dir + "/retry-at") - except: - pass - - status.push("flushing %s" % dir) - - if flush_queue(dir): - f = open(dir + "/retry-at", "w") - if retry_delay in retries_times: - idx = retries_times.index(retry_delay) - if idx < len(retries_times) - 1: idx += 1 - else: - idx = 0 - f.write("%d\n%d\n" % (time.time(), retries_times[idx])) - f.close() - - status.pop() - -def flush_queue(dir): - q = [] - os.chdir(dir) - for f in glob.glob(dir + "/*.desc"): - d = read_name_val(f) - if d != None: q.append(d) - def mycmp(x, y): - rc = cmp(x['Time'], y['Time']) - if rc == 0 and x.has_key('Type') and y.has_key('Type'): - return cmp(x['Type'], y['Type']) - else: - return rc - q.sort(mycmp) - - error = None - # copy of q - remaining = q[:] - for d in q: - if not send_file(d['_file'], d['Target']): - error = d - continue - if os.access(d['_file'] + ".info", os.F_OK): - if not send_file(d['_file'] + ".info", d['Target'] + ".info"): - error = d - continue - os.unlink(d['_file'] + ".info") - os.unlink(d['_file']) - os.unlink(d['_desc']) - remaining.remove(d) - - if error != None: - emails = {} - emails[config.admin_email] = 1 - pr = "" - for src, msg in problems.iteritems(): - pr = pr + "[src: %s]\n\n%s\n" % (src, msg) - for d in remaining: - if d.has_key('Requester'): - emails[d['Requester']] = 1 - e = emails.keys() - m = mailer.Message() - m.set_headers(to = string.join(e, ", "), - subject = "[%s] builder queue problem" % config.builder) - m.write("there were problems sending files from queue %s:\n" % dir) - m.write("problems:\n") - m.write("%s\n" % pr) - m.send() - log.error("error sending files from %s:\n%s\n" % (dir, pr)) - return 1 - - return 0 - -problems = {} - -def main(): - if lock.lock("sending-files", non_block = 1) == None: - return - init_conf() - maybe_flush_queue(path.notify_queue_dir) - maybe_flush_queue(path.buildlogs_queue_dir) - maybe_flush_queue(path.ftp_queue_dir) - -if __name__ == '__main__': - loop.run_loop(main)