X-Git-Url: https://git.tld-linux.org/?p=TLD.git;a=blobdiff_plain;f=pld-builder.new%2FPLD_Builder%2Ffile_sender.py;fp=pld-builder.new%2FPLD_Builder%2Ffile_sender.py;h=f8d41f377cea9429a50dac8197dfc17b0e11f269;hp=0000000000000000000000000000000000000000;hb=90809c8fec988489786ce00247d9a4150070748b;hpb=ab3934fab858112cd552359b18cb980ea07c310b

diff --git a/pld-builder.new/PLD_Builder/file_sender.py b/pld-builder.new/PLD_Builder/file_sender.py
new file mode 100644
index 0000000..f8d41f3
--- /dev/null
+++ b/pld-builder.new/PLD_Builder/file_sender.py
@@ -0,0 +1,222 @@
+# vi: encoding=utf-8 ts=8 sts=4 sw=4 et
+
+import glob
+import re
+import string
+import os
+import time
+import shutil
+import sys
+import traceback
+import urllib2
+
+from config import config, init_conf
+import mailer
+import path
+import log
+import loop
+import status
+import lock
+
+retries_times = [5 * 60, 5 * 60, 10 * 60, 10 * 60, 30 * 60, 60 * 60]
+
+def read_name_val(file):
+    f = open(file)
+    r = {'_file': file[:-5], '_desc': file}
+    rx = re.compile(r"^([^:]+)\s*:(.*)$")
+    for l in f.xreadlines():
+        if l == "END\n":
+            f.close()
+            return r
+        m = rx.search(l)
+        if m:
+            r[m.group(1)] = string.strip(m.group(2))
+        else:
+            break
+    f.close()
+    return None
+
+def scp_file(src, target):
+    global problems
+    f = os.popen("scp -v -B %s %s 2>&1 < /dev/null" % (src, target))
+    p = f.read()
+    ret = f.close()
+    if ret:
+        problems[src] = p
+    return ret
+
+def copy_file(src, target):
+    try:
+        shutil.copyfile(src, target)
+        return 0
+    except:
+        global problems
+        exctype, value = sys.exc_info()[:2]
+        problems[src] = "cannot copy file: %s" % traceback.format_exception_only(exctype, value)
+        return 1
+
+def rsync_file(src, target, host):
+    global problems
+
+    p = open(path.rsync_password_file, "r")
+    password = ""
+    for l in p.xreadlines():
+        l = string.split(l)
+        if len(l) >= 2 and l[0] == host:
+            password = l[1]
+    p.close()
+
+    # NOTE: directing STDIN to /dev/null does not make rsync skip asking for
+    # a password; it opens /dev/tty and still asks if a password is needed and
+    # missing, therefore we always set the RSYNC_PASSWORD env var
+    os.environ["RSYNC_PASSWORD"] = password
+    rsync = "rsync --verbose --archive --timeout=360 --contimeout=360"
+    f = os.popen("%s %s %s 2>&1" % (rsync, src, target))
+    p = f.read()
+    ret = f.close()
+    if ret:
+        problems[src] = p
+    del os.environ["RSYNC_PASSWORD"]
+    return ret
+
+def rsync_ssh_file(src, target):
+    global problems
+    rsync = "rsync --verbose --archive --timeout=360 -e ssh"
+    f = os.popen("%s %s %s 2>&1 < /dev/null" % (rsync, src, target))
+    p = f.read()
+    ret = f.close()
+    if ret:
+        problems[src] = p
+    return ret
+
+def post_file(src, url):
+    global problems
+    try:
+        f = open(src, 'r')
+        data = f.read()
+        f.close()
+        req = urllib2.Request(url, data)
+        req.add_header('X-Filename', os.path.basename(src))
+        f = urllib2.urlopen(req)
+        f.close()
+    except Exception, e:
+        problems[src] = e
+        return e
+    return 0
+
+def send_file(src, target):
+    global problems
+    try:
+        log.notice("sending %s to %s (size %d bytes)" % (src, target, os.stat(src).st_size))
+        m = re.match('rsync://([^/]+)/.*', target)
+        if m:
+            return not rsync_file(src, target, host = m.group(1))
+        if target != "" and target[0] == '/':
+            return not copy_file(src, target)
+        m = re.match('scp://([^@:]+@[^/:]+)(:|)(.*)', target)
+        if m:
+            return not scp_file(src, m.group(1) + ":" + m.group(3))
+        m = re.match('ssh\+rsync://([^@:]+@[^/:]+)(:|)(.*)', target)
+        if m:
+            return not rsync_ssh_file(src, m.group(1) + ":" + m.group(3))
+        m = re.match('http://.*', target)
+        if m:
+            return not post_file(src, target)
+        log.alert("unsupported protocol: %s" % target)
+    except OSError, e:
+        problems[src] = e
+        log.error("send_file(%s, %s): %s" % (src, target, e))
+        return False
+    return True
+
+def maybe_flush_queue(dir):
+    retry_delay = 0
+    try:
+        f = open(dir + "/retry-at")
+        last_retry = int(string.strip(f.readline()))
+        retry_delay = int(string.strip(f.readline()))
+        f.close()
+        if last_retry + retry_delay > time.time():
+            return
+        os.unlink(dir + "/retry-at")
+    except:
+        pass
+
+    status.push("flushing %s" % dir)
+
+    if flush_queue(dir):
+        f = open(dir + "/retry-at", "w")
+        if retry_delay in retries_times:
+            idx = retries_times.index(retry_delay)
+            if idx < len(retries_times) - 1: idx += 1
+        else:
+            idx = 0
+        f.write("%d\n%d\n" % (time.time(), retries_times[idx]))
+        f.close()
+
+    status.pop()
+
+def flush_queue(dir):
+    q = []
+    os.chdir(dir)
+    for f in glob.glob(dir + "/*.desc"):
+        d = read_name_val(f)
+        if d != None: q.append(d)
+    def mycmp(x, y):
+        rc = cmp(x['Time'], y['Time'])
+        if rc == 0 and x.has_key('Type') and y.has_key('Type'):
+            return cmp(x['Type'], y['Type'])
+        else:
+            return rc
+    q.sort(mycmp)
+
+    error = None
+    # copy of q
+    remaining = q[:]
+    for d in q:
+        if not send_file(d['_file'], d['Target']):
+            error = d
+            continue
+        if os.access(d['_file'] + ".info", os.F_OK):
+            if not send_file(d['_file'] + ".info", d['Target'] + ".info"):
+                error = d
+                continue
+            os.unlink(d['_file'] + ".info")
+        os.unlink(d['_file'])
+        os.unlink(d['_desc'])
+        remaining.remove(d)
+
+    if error != None:
+        emails = {}
+        emails[config.admin_email] = 1
+        pr = ""
+        for src, msg in problems.iteritems():
+            pr = pr + "[src: %s]\n\n%s\n" % (src, msg)
+        for d in remaining:
+            if d.has_key('Requester'):
+                emails[d['Requester']] = 1
+        e = emails.keys()
+        m = mailer.Message()
+        m.set_headers(to = string.join(e, ", "),
+                      subject = "[%s] builder queue problem" % config.builder)
+        m.write("there were problems sending files from queue %s:\n" % dir)
+        m.write("problems:\n")
+        m.write("%s\n" % pr)
+        m.send()
+        log.error("error sending files from %s:\n%s\n" % (dir, pr))
+        return 1
+
+    return 0
+
+problems = {}
+
+def main():
+    if lock.lock("sending-files", non_block = 1) == None:
+        return
+    init_conf()
+    maybe_flush_queue(path.notify_queue_dir)
+    maybe_flush_queue(path.buildlogs_queue_dir)
+    maybe_flush_queue(path.ftp_queue_dir)
+
+if __name__ == '__main__':
+    loop.run_loop(main)
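Queue layout implied by read_name_val() and flush_queue(): each pending transfer in a queue directory is described by a "*.desc" file made of "Key: value" lines terminated by an "END" line; the payload to send is the same name with the ".desc" suffix stripped (file[:-5]), plus an optional companion ".info" file. The keys Time, Type, Target and Requester are the ones the code reads; the values below are purely illustrative, not taken from a real queue:

    Time: 1285612800
    Type: buildlog
    Requester: user@example.org
    Target: rsync://buildlogs.example.org/buildlogs/
    END

Entries are sorted by Time (then Type) before sending, and a "retry-at" file in the queue directory records the last attempt time and the next delay from retries_times when a flush fails.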