Commits

Ross Lagerwall committed 0ba10d6

Initial commit. A multi-threaded attempt using socketserver/ThreadedTCPServer and locking.

Comments (0)

Files changed (2)

+import socketserver, select, os, threading, time, random
+import socket
+
+STOPPED = 0
+BUILDING = 1
+
+BUILD = 0
+FORCE = 1
+STOP = 2
+IDENTIFY = 3
+
+class SlaveHandler(socketserver.StreamRequestHandler):
+
+    def setup(self):
+        self.rbufsize = 0
+        self.wbufsize = 0
+        socketserver.StreamRequestHandler.setup(self)
+        self.queue = []
+        self.state = STOPPED
+
+    def handle_receive_result(self, length):
+        type = self.rfile.read(2).decode()
+        print(type)
+        if length > 0:
+            data = self.rfile.read(length)
+            print(data)
+        if type == "E ":
+            if len(self.queue) > 0:
+                self.send_command()
+            else:
+                print("Moved to STae stopped")
+                self.state = STOPPED
+
+    def handle_receive_command(self):
+        command = os.read(self.reader, 41).decode()
+        if command[0] == "B":
+            self.queue.append((BUILD, command[1:]))
+        elif command[0] == "F":
+            self.queue.append((FORCE, command[1:]))
+        elif command[0] == "S":
+            self.wfile.write(b"S                                        ")
+            self.state = STOPPED
+        elif command[0] == "I":
+            self.queue.append((IDENTIFY, command[1:]))
+        if self.state == STOPPED:
+            self.send_command()
+
+    def send_command(self):
+        command, data = self.queue[0]
+        self.queue = self.queue[1:]
+
+        if command == BUILD:
+            self.wfile.write(b"B" + data.encode())
+            self.state = BUILDING
+        elif command == FORCE:
+            self.wfile.write(b"F" + data.encode())
+            self.state = BUILDING
+        elif command == STOP:
+            self.wfile.write(b"S" + data.encode())
+            self.state = STOPPED
+        elif command == IDENTIFY:
+            self.wfile.write(b"I" + data.encode())
+        print("Sent command")
+
+    def handle(self):
+
+        self.reader, self.writer = os.pipe()
+        el.register(self.writer)
+
+        try:
+            while True:
+                res = select.select([self.rfile, self.reader], [], [])[0]
+                if self.rfile in res:
+                    raw = self.rfile.read(10)
+                    if not raw:
+                        break
+                    print(raw)
+                    self.handle_receive_result(int(raw))
+                    
+                if self.reader in res:
+                    self.handle_receive_command()
+        finally:
+            el.unregister(self.writer)
+            os.close(self.reader)
+            os.close(self.writer)
+
+
+class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
+    allow_reuse_address = True
+
+
+class EventListener:
+
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.writer_list = []
+
+    def run(self):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        s.bind(('', 9998))
+        s.listen(5)
+        while True:
+            conn, addr = s.accept()
+            rfile = conn.makefile("rb")
+            data = rfile.read(41)
+            self.lock.acquire()
+            for w in self.writer_list:
+                print("Rev")
+                os.write(w, data)
+            self.lock.release()
+            conn.close()
+
+    def register(self, writer):
+        self.lock.acquire()
+        self.writer_list.append(writer)
+        self.lock.release()
+
+    def unregister(self, writer):
+        self.lock.acquire()
+        self.writer_list.remove(writer)
+        self.lock.release()
+
+
+if __name__ == "__main__":
+    HOST, PORT = "localhost", 9999
+    server = ThreadedTCPServer((HOST, PORT), SlaveHandler)
+
+    el = EventListener()
+    event_thread = threading.Thread(target=el.run)
+    event_thread.daemon = True
+    event_thread.start()
+
+    server.serve_forever()
+import socket, threading, os, select
+import sys, time, random, subprocess
+
+HOST, PORT = "localhost", 9999
+
+# Create a socket (SOCK_STREAM means a TCP socket)
+sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+# Connect to server and send data
+sock.connect((HOST, PORT))
+rfile = sock.makefile("rb", 0)
+wfile = sock.makefile("wb", 0)
+
+def output(cmd, data):
+    wfile.write('{:>10}'.format(len(data)).encode() + cmd.encode() + data)
+
+def run(cmd, letter):
+    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False)
+    tolisten = [p.stdout, p.stderr, rfile]
+    stdoutbuffer = b""
+    stderrbuffer = b""
+    while len(tolisten) > 1:
+        res = select.select(tolisten, [], [])[0]
+        if p.stdout in res:
+            data = os.read(p.stdout.fileno(), 512)
+            if not data:
+                tolisten.remove(p.stdout)
+            else:
+                stdoutbuffer += data
+                if len(stdoutbuffer) > 4096:
+                    output(letter + "S", stdoutbuffer)
+                    stdoutbuffer = b""
+        if p.stderr in res:
+            data = os.read(p.stderr.fileno(), 512)
+            if not data:
+                tolisten.remove(p.stderr)
+            else:
+                stderrbuffer += data
+                if len(stderrbuffer) > 4096:
+                    output(letter + "E", stderrbuffer)
+                    stderrbuffer = b""
+        if rfile in res:
+            command = rfile.read(41).decode()
+            print("'" + command + "'")
+            if command[0] == "S":
+                return None
+    p.wait()
+
+    if len(stdoutbuffer) > 0:
+        output(letter + "S", stdoutbuffer)
+    if len(stderrbuffer) > 0:
+        output(letter + "E", stderrbuffer)
+
+    return p.returncode
+
+while True:
+    print("About to receive command")
+    command = rfile.read(41).decode()
+    print("'" + command + "'")
+
+    if command[0] == "B" or command[0] == "F":
+        # build
+
+        rev = command[1:].strip()
+
+        rc = run(["hg", "pull"], "P")
+        if rc == None:
+            continue
+
+        rc = run(["hg", "update", "-C", rev], "U")
+        if rc == None:
+            continue
+
+        rc = run([os.path.join(os.getcwd(), "configure")], "C")
+        if rc == None:
+            continue
+
+        rc = run(["make"], "M")
+        if rc == None:
+            continue
+
+        rc = run(["make", "clean"], "K")
+        if rc == None:
+            continue
+
+        output("E ", b"")