]> TLD Linux GIT Repositories - TLD.git/blobdiff - pld-builder.new/PLD_Builder/file_sender.py
- from https://github.com/pld-linux/pld-builder.new
[TLD.git] / pld-builder.new / PLD_Builder / file_sender.py
diff --git a/pld-builder.new/PLD_Builder/file_sender.py b/pld-builder.new/PLD_Builder/file_sender.py
new file mode 100644 (file)
index 0000000..f8d41f3
--- /dev/null
@@ -0,0 +1,222 @@
+# vi: encoding=utf-8 ts=8 sts=4 sw=4 et
+
+import glob
+import re
+import string
+import os
+import time
+import shutil
+import sys
+import traceback
+import urllib2
+
+from config import config, init_conf
+import mailer
+import path
+import log
+import loop
+import status
+import lock
+
+retries_times = [5 * 60, 5 * 60, 10 * 60, 10 * 60, 30 * 60, 60 * 60]
+
+def read_name_val(file):
+    f = open(file)
+    r = {'_file': file[:-5], '_desc': file}
+    rx = re.compile(r"^([^:]+)\s*:(.*)$")
+    for l in f.xreadlines():
+        if l == "END\n":
+            f.close()
+            return r
+        m = rx.search(l)
+        if m:
+            r[m.group(1)] = string.strip(m.group(2))
+        else:
+            break
+    f.close()
+    return None
+
+def scp_file(src, target):
+    global problems
+    f = os.popen("scp -v -B %s %s 2>&1 < /dev/null" % (src, target))
+    p = f.read()
+    ret = f.close()
+    if ret:
+        problems[src] = p
+    return ret
+
+def copy_file(src, target):
+    try:
+        shutil.copyfile(src, target)
+        return 0
+    except:
+        global problems
+        exctype, value = sys.exc_info()[:2]
+        problems[src] = "cannot copy file: %s" % traceback.format_exception_only(exctype, value)
+        return 1
+
+def rsync_file(src, target, host):
+    global problems
+
+    p = open(path.rsync_password_file, "r")
+    password = ""
+    for l in p.xreadlines():
+        l = string.split(l)
+        if len(l) >= 2 and l[0] == host:
+            password = l[1]
+    p.close()
+
+    # NOTE: directing STDIN to /dev/null, does not make rsync to skip asking
+    # password, it opens /dev/tty and still asks if password is needed and
+    # missing, therefore we always set RSYNC_PASSWORD env var
+    os.environ["RSYNC_PASSWORD"] = password
+    rsync = "rsync --verbose --archive --timeout=360 --contimeout=360"
+    f = os.popen("%s %s %s 2>&1" % (rsync, src, target))
+    p = f.read()
+    ret = f.close()
+    if ret:
+        problems[src] = p
+    del os.environ["RSYNC_PASSWORD"];
+    return ret
+
+def rsync_ssh_file(src, target):
+    global problems
+    rsync = "rsync --verbose --archive --timeout=360 -e ssh"
+    f = os.popen("%s %s %s 2>&1 < /dev/null" % (rsync, src, target))
+    p = f.read()
+    ret = f.close()
+    if ret:
+        problems[src] = p
+    return ret
+
+def post_file(src, url):
+    global problems
+    try:
+        f = open(src, 'r')
+        data = f.read()
+        f.close()
+        req = urllib2.Request(url, data)
+        req.add_header('X-Filename', os.path.basename(src))
+        f = urllib2.urlopen(req)
+        f.close()
+    except Exception, e:
+        problems[src] = e
+        return e
+    return 0
+
+def send_file(src, target):
+    global problems
+    try:
+        log.notice("sending %s to %s (size %d bytes)" % (src, target, os.stat(src).st_size))
+        m = re.match('rsync://([^/]+)/.*', target)
+        if m:
+            return not rsync_file(src, target, host = m.group(1))
+        if target != "" and target[0] == '/':
+            return not copy_file(src, target)
+        m = re.match('scp://([^@:]+@[^/:]+)(:|)(.*)', target)
+        if m:
+            return not scp_file(src, m.group(1) + ":" + m.group(3))
+        m = re.match('ssh\+rsync://([^@:]+@[^/:]+)(:|)(.*)', target)
+        if m:
+            return not rsync_ssh_file(src, m.group(1) + ":" + m.group(3))
+        m = re.match('http://.*', target)
+        if m:
+            return not post_file(src, target)
+        log.alert("unsupported protocol: %s" % target)
+    except OSError, e:
+        problems[src] = e
+        log.error("send_file(%s, %s): %s" % (src, target, e))
+        return False
+    return True
+
+def maybe_flush_queue(dir):
+    retry_delay = 0
+    try:
+        f = open(dir + "/retry-at")
+        last_retry = int(string.strip(f.readline()))
+        retry_delay = int(string.strip(f.readline()))
+        f.close()
+        if last_retry + retry_delay > time.time():
+            return
+        os.unlink(dir + "/retry-at")
+    except:
+        pass
+
+    status.push("flushing %s" % dir)
+
+    if flush_queue(dir):
+        f = open(dir + "/retry-at", "w")
+        if retry_delay in retries_times:
+            idx = retries_times.index(retry_delay)
+            if idx < len(retries_times) - 1: idx += 1
+        else:
+            idx = 0
+        f.write("%d\n%d\n" % (time.time(), retries_times[idx]))
+        f.close()
+
+    status.pop()
+
+def flush_queue(dir):
+    q = []
+    os.chdir(dir)
+    for f in glob.glob(dir + "/*.desc"):
+        d = read_name_val(f)
+        if d != None: q.append(d)
+    def mycmp(x, y):
+        rc = cmp(x['Time'], y['Time'])
+        if rc == 0 and x.has_key('Type') and y.has_key('Type'):
+            return cmp(x['Type'], y['Type'])
+        else:
+            return rc
+    q.sort(mycmp)
+
+    error = None
+    # copy of q
+    remaining = q[:]
+    for d in q:
+        if not send_file(d['_file'], d['Target']):
+            error = d
+            continue
+        if os.access(d['_file'] + ".info", os.F_OK):
+            if not send_file(d['_file'] + ".info", d['Target'] + ".info"):
+                error = d
+                continue
+            os.unlink(d['_file'] + ".info")
+        os.unlink(d['_file'])
+        os.unlink(d['_desc'])
+        remaining.remove(d)
+
+    if error != None:
+        emails = {}
+        emails[config.admin_email] = 1
+        pr = ""
+        for src, msg in problems.iteritems():
+            pr = pr + "[src: %s]\n\n%s\n" % (src, msg)
+        for d in remaining:
+            if d.has_key('Requester'):
+                emails[d['Requester']] = 1
+        e = emails.keys()
+        m = mailer.Message()
+        m.set_headers(to = string.join(e, ", "),
+                      subject = "[%s] builder queue problem" % config.builder)
+        m.write("there were problems sending files from queue %s:\n" % dir)
+        m.write("problems:\n")
+        m.write("%s\n" % pr)
+        m.send()
+        log.error("error sending files from %s:\n%s\n" % (dir, pr))
+        return 1
+
+    return 0
+
+problems = {}
+
+def main():
+    if lock.lock("sending-files", non_block = 1) == None:
+        return
+    init_conf()
+    maybe_flush_queue(path.notify_queue_dir)
+    maybe_flush_queue(path.buildlogs_queue_dir)
+    maybe_flush_queue(path.ftp_queue_dir)
+
+if __name__ == '__main__':
+    loop.run_loop(main)