Commits

Anonymous committed c9216fe

0.8 branch: proxy; content-length instead of chunked for long-polling

  • Participants
  • Parent commits 10d37ba
  • Branches 0.8

Comments (0)

Files changed (5)

File daemon/csp/port.py

     """
     implements(interfaces.IListeningPort)
 
-    def __init__(self, port=None, factory=None, backlog=50, interface='', reactor=None, resource=None, childName=None):
+    def __init__(self, port=None, factory=None, backlog=50, interface='', reactor=None, resource=None, childName=None, killTimeout=10):
         self.port = port
         self.factory = factory
         self.backlog = backlog
         self.interface = interface
         self.resource = resource
         self.childName = childName
+        self.killTimeout = killTimeout
         self.cspTcpPort = None
         self.listening = False
 
     def startListening(self):
         if not self.listening:
             self.listening = True
-            csp = CSPRootResource()
+            csp = CSPRootResource(self.killTimeout)
             csp.setConnectCb(self.connectionMade)
             if self.port:
                 self.cspTcpPort = reactor.listenTCP(8050, server.Site(csp), self.backlog, self.interface)

File daemon/csp/resource.py

 from session import CSPSession
 
 class CSPRootResource(resource.Resource):
-    def __init__(self):
+    def __init__(self, killTimeout):
         resource.Resource.__init__(self)
         logic = CSPLogicResource(self)
         self.putChild("comet", logic)
         self.putChild("send", logic)
         self.putChild("reflect", logic)
         self.putChild("static", static.File(os.path.join(os.path.dirname(__file__), 'static')))
+        self.killTimeout = killTimeout
         self.sessions = {}
 
     def setConnectCb(self, cb):
         print "%s connected"%(session,)
 
     def disconnectCb(self, session):
+        print 'destroying session',session.key
         del self.sessions[session.key]
 
 class CSPLogicResource(resource.Resource):
 
     def render_handshake(self, session, request):
         key = "a"#str(uuid.uuid4()).replace('-', '')
-        session = CSPSession(key, request, self.root.disconnectCb)
+        session = CSPSession(key, request, self.root.disconnectCb, self.root.killTimeout)
         self.root.sessions[key] = session
         self.root.connectCb(session)
         return session.renderRequest(key, request)

File daemon/csp/session.py

 from util import json, compress
 
 class CSPSession(object):
-    def __init__(self, key, request, destroySessionCb):
+    # SPEC NOTE: added killTimeout (defaults to 10 seconds)
+    def __init__(self, key, request, destroySessionCb, killTimeout):
         self.peer = request.client
         self.host = request.host
         self.destroySessionCb = destroySessionCb
+        self.killTimeout = killTimeout
         self.key = key
         self.request = None
         self.prebuffer = ""
         self.buffer = []
         self.sendId = 0
-        self.durationTimeout = None
-        self.intervalTimeout = None
+        self.durationTimer = None
+        self.intervalTimer = None
+        self.killTimer = None
         self.isClosed = False
         self.permVars = {
             "rp":"",
         while self.buffer and ack >= self.buffer[0][0]:
             self.buffer.pop(0)
         if self.isClosed and not self.buffer:
-            self.destroySessionCb(self)
+            self.teardown()
 
-    def resetDurationTimeout(self):
-        if self.durationTimeout:
-            self.durationTimeout.cancel()
-            self.durationTimeout = None
+    def teardown(self):
+        self.resetKillTimer()
         if self.request:
-            self.durationTimeout = reactor.callLater(self.permVars["du"], self.durationCb)
+            self.endStream()
+        self.destroySessionCb(self)
 
-    def resetIntervalTimeout(self):
-        if self.intervalTimeout:
-            self.intervalTimeout.cancel()
-            self.intervalTimeout = None
+    def resetKillTimer(self, cb=None):
+        if self.killTimer:
+            if self.killTimer.active():
+                self.killTimer.cancel()
+            self.killTimer = None
+        if cb:
+            self.killTimer = reactor.callLater(self.killTimeout, cb)
+
+    def resetDurationTimer(self):
+        if self.durationTimer:
+            self.durationTimer.cancel()
+            self.durationTimer = None
+        if self.request:
+            self.durationTimer = reactor.callLater(self.permVars["du"], self.durationCb)
+
+    def resetIntervalTimer(self):
+        if self.intervalTimer:
+            self.intervalTimer.cancel()
+            self.intervalTimer = None
         if self.request and self.permVars["i"] > 0 and self.permVars["is"]:
-            self.intervalTimeout = reactor.callLater(self.permVars["i"], self.intervalCb)
+            self.intervalTimer = reactor.callLater(self.permVars["i"], self.intervalCb)
 
     def durationCb(self):
-        self.durationTimeout = None
+        self.durationTimer = None
+        self.resetKillTimer(self.close)
         self.endStream()
 
     def intervalCb(self):
-        self.intervalTimeout = None
+        self.intervalTimer = None
         self.sendPackets([])
 
     def endStream(self):
         self.request.finish()
         self.request = None
-        self.resetIntervalTimeout()
-        self.resetDurationTimeout()
+        self.resetIntervalTimer()
+        self.resetDurationTimer()
 
     def tryCompress(self, data, request=None):
         if request is None:
         return s
 
     def setCometRequest(self, request):
+        self.resetKillTimer()
+
         self.request = request
 
         request.setHeader("Content-Type", self.permVars["ct"])
 
         # streaming/long-polling, no immediate response
         if not self.buffer:
-            self.resetDurationTimeout()
-            self.resetIntervalTimeout()
+            self.resetDurationTimer()
+            self.resetIntervalTimer()
             return server.NOT_DONE_YET
-        
+
         # streaming
         if self.permVars["is"]:
             request.write(self.tryCompress(self.renderPrebuffer()))
             self.sendPackets()
-            self.resetDurationTimeout()
+            self.resetDurationTimer()
             return server.NOT_DONE_YET
         # long-polling
         else:
             self.write(None) # SPEC NOTE: close packet is now data packet with data=None
             self.protocol.connectionLost()
             self.isClosed = True
-            if not self.buffer:
-                self.destroySessionCb(self)
+            self.resetKillTimer(self.teardown)
 
     def write(self, data):
         self.sendId += 1
             if self.permVars["is"]:
                 self.sendPackets([[self.sendId, data]])
             else:
-                self.sendPackets()
-                # XXX: This may cause transfer-encoding chunked in long polling mode. FIX
-                self.request.finish()
-                self.request = None
-                self.resetDurationTimeout()
+                self.sendPackets(finish=True)
+                self.resetDurationTimer()
 
     def writeSequence(self, data):
         for datum in data:
         return self.peer
 
     def read(self, data):
-        # TODO: parse packets, throw out duplicates, forward new ones to protocol
+        # TODO: parse packets, throw out duplicates, forward to protocol
         self.protocol.dataReceived(data)
 
-    def sendPackets(self, packets=None):
-        self.resetIntervalTimeout()
-        self.request.write(self.tryCompress(self.renderPackets(packets)))
+    def sendPackets(self, packets=None, finish=False):
+        self.resetIntervalTimer()
+        data = self.tryCompress(self.renderPackets(packets))
+        if finish:
+            self.request.setHeader('Content-Length', len(data))
+        self.request.write(data)
+        if finish:
+            self.request.finish()
+            self.request = None
 
     def renderPrebuffer(self):
         return "%s%s"%(self.prebuffer, self.permVars["p"])

File daemon/echo.py

     protocol = Echo
 
 if __name__ == "__main__":
-    print "listening CSP@8050"
+    print "echo listening on CSP@8050"
     reactor.listenWith(CometPort, port=8050, factory=EchoFactory())
     reactor.run()

File daemon/proxy.py

+from twisted.internet import reactor, protocol
+from csp.port import CometPort
+from csp.util import json
+
+FRAME_OPEN  = 0
+FRAME_CLOSE = 1
+FRAME_DATA  = 2
+
+CODES = {
+    'InvalidHandshake': 102,
+    'UserConnectionReset': 103,
+    'RemoteConnectionTimeout': 104,
+    'Unauthorized': 106,
+    'RemoteConnectionFailed': 108,
+    'ProtocolError': 110
+}
+
+class Outgoing(protocol.Protocol):
+    def __init__(self, incoming, socketId):
+        self.incoming = incoming
+        self.socketId = socketId
+
+    def connectionMade(self):
+        self.incoming.newOutgoing(self)
+
+    def dataReceived(self, data):
+        self.incoming.write([FRAME_DATA, self.socketId, data])
+
+    def connectionLost(self, reason):
+        self.incoming.closeStream(self.socketId, 'RemoteConnectionFailed')
+
+class Incoming(protocol.Protocol):
+    def connectionMade(self):
+        self.buffer = ""
+        self.buffers = {}
+        self.sockets = {}
+        self.active = True
+
+    def write(self, rawdata):
+        data = json.dumps(rawdata)
+        self.transport.write("%s,%s"%(len(data), data))
+
+    def connectionLost(self):
+        print "connectionLost"
+        self.active = False
+        self.buffer = ""
+        for key in self.buffers.keys():
+            del self.buffers[key]
+        for key, sock in self.sockets.items():
+            del self.sockets[key]
+            sock.transport.loseConnection()
+
+    def fatalError(self, msg):
+        print "fatal error:",msg
+        self.transport.loseConnection()
+
+    def closeStream(self, socketId, code):
+        if self.active:
+            if socketId in self.sockets:
+                self.sockets[socketId].transport.loseConnection()
+                del self.sockets[socketId]
+            if socketId in self.buffers:
+                del self.buffers[socketId]
+            self.write([FRAME_CLOSE, socketId, CODES[code]])
+
+    def newOutgoing(self, outgoing):
+        key = outgoing.socketId
+        self.sockets[key] = outgoing
+        for frame in self.buffers[key]:
+            self.processFrame(*frame)
+        del self.buffers[key]
+
+    def processFrame(self, frameType, socketId, *data):
+        if frameType == FRAME_CLOSE:
+            return self.closeStream(socketId, 'UserConnectionReset')
+        if frameType != FRAME_DATA:
+            return self.closeStream(socketId, 'ProtocolError')
+        self.sockets[socketId].transport.write(data)
+
+    def dataReceived(self, rawdata=""):
+        print 'dataReceived:',rawdata
+        # extract first frame
+        self.buffer += rawdata
+        comma = self.buffer.find(',')
+        if comma == -1:
+            return # wait for more bytes
+        size = self.buffer[:comma]
+        try:
+            size = int(size)
+        except:
+            return self.fatalError("non-integer frame size")
+        end = comma + 1 + size
+        if len(self.buffer) < end:
+            return # wait for more bytes
+        frame, self.buffer = self.buffer[comma+1:end], self.buffer[end:]
+        try:
+            frame = json.loads(frame)
+        except:
+            return self.fatalError("cannot parse frame")
+
+        # extract basic frame info
+        if len(frame) < 3:
+            return self.fatalError("illegal frame")
+        frameType, socketId, data = frame[0], frame[1], frame[2:]
+
+        # established stream
+        if socketId in self.sockets:
+            processFrame(*frame)
+
+        # handshake
+        else:
+            if socketId in self.buffers:
+                return self.buffers[socketId].append(frame)
+            if frameType != FRAME_OPEN:
+                return self.closeStream(socketId, 'ProtocolError')
+            try:
+                host, port = data
+            except:
+                return self.closeStream(socketId, 'InvalidHandshake')
+            if False: # here, make sure this connection is allowed
+                return self.closeStream(socketId, 'Unauthorized')
+            out = protocol.ClientCreator(reactor, Outgoing, self, socketId)
+            out.connectTCP(host, port).addErrback(self.closeStream, socketId, 'RemoteConnectionFailed')
+            self.buffers[socketId] = []
+
+        # repeat
+        self.dataReceived()
+
+class ProxyFactory(protocol.Factory):
+    protocol = Incoming
+
+if __name__ == "__main__":
+    print "proxy listening on CSP@8050"
+    reactor.listenWith(CometPort, port=8050, factory=ProxyFactory())
+    reactor.run()