Commits

Jamie Turner  committed eefd558

Server functions working.

  • Participants
  • Parent commits 47bb7b2

Comments (0)

Files changed (4)

File diesel/__init__.py

 import logmod
 log = logmod.log
 from logmod import Logger, LOGLVL_DEBUG, LOGLVL_INFO, LOGLVL_WARN, LOGLVL_ERR, LOGLVL_CRITICAL
-#from core import until, until_eol, bytes, sleep, up, Connection, ConnectionClosed, Loop
-#from core import fire, wait, catch, thread, ClientConnectionError, ClientConnectionClosed
-#from core import LoopKeepAlive, packet
+#from core import until, until_eol, bytes, Connection, ConnectionClosed, Loop, packet
+#from core import ClientConnectionError, ClientConnectionClosed
 import events
-from core import sleep, Loop, wait, fire, thread
+from core import sleep, Loop, wait, fire, thread, until, Connection, ConnectionClosed
+from core import until_eol, send, count
 from app import Application, Service 
 from client import Client, call, message, response
 from security import TLSv1ServiceWrapper, TLSv1ClientWrapper

File diesel/app.py

 from greenlet import greenlet
 
 from diesel.hub import EventHub
-from diesel import logmod, log
+from diesel import logmod, log, Connection, Loop
 from diesel.security import ssl_async_handshake
 from diesel import runtime
 from diesel.events import WaitPool
             loop.keep_alive = True
 
         if self._run:
-            self.hub.schedule(loop.iterate)
+            self.hub.schedule(loop.wake)
         else:
             if front:
                 self._loops.insert(0, loop)
             raise
         sock.setblocking(0)
         def make_connection():
-            Connection(sock, addr, self.connection_handler).iterate()
+            c = Connection(runtime.current_app.hub, sock, addr)
+            l = Loop(self.connection_handler, addr)
+            l.connection_stack.append(c)
+            runtime.current_app.add_loop(l)
         if self.security:
             sock = self.security.wrap(sock)
             ssl_async_handshake(sock, self.application.hub, make_connection)

File diesel/core.py

 CRLF = '\r\n'
 BUFSIZ = 2 ** 14
 
-def until(sentinel):
-    pass
+def until(*args, **kw):
+    return current_loop.input_op(*args, **kw)
+
+def until_eol():
+    return until("\r\n")
+
+def count(*args, **kw):
+    return current_loop.input_op(*args, **kw)
+    
+def send(*args, **kw):
+    return current_loop.send(*args, **kw)
 
 def wait(*args, **kw):
     return current_loop.wait(*args, **kw)
 def fire(*args, **kw):
     return current_loop.fire(*args, **kw)
 
-def packet(s, priority=5):
-    pass
-
 def sleep(*args, **kw):
     return current_loop.sleep(*args, **kw)
     
         if tot == 1:
             return f
         def m_c(res):
+            if isinstance(res, Exception):
+                return f(res)
             real_arg = [None] * tot
             real_arg[pos] = res
             return f(tuple(real_arg))
 ids = id_gen()
 
 class Loop(object):
-    '''A cooperative generator that represents an arbitrary piece of
-    logic.
-    '''
     def __init__(self, loop_callable, *args, **kw):
         self.loop_callable = loop_callable
         self.args = args
         self._wakeup_timer = None
         self.fire_handlers = {}
         self.coroutine = greenlet(self.run)
+        self.connection_stack = []
 
     def run(self):
-        # XXX -- handle keep_alive
         try:
             self.loop_callable(*self.args, **self.kw)
         except:
                 self.reset()
                 self.hub.call_later(0.5, self.wake)
             self.app.runhub.throw(*sys.exc_info())
+        finally:
+            if self.connection_stack:
+                self.connection_stack.pop().shutdown()
 
     def __hash__(self):
         return self.id
             self.coroutine.switch(value)
         else:
             self.coroutine.switch()
+
+    def input_op(self, sentinel_or_count):
+        v = self._input_op(sentinel_or_count)
+        if v:
+            return v
+        else:
+            return self.dispatch()
+
+    def _input_op(self, sentinel, cb_maker=identity):
+        conn = self.check_connection()
+        cb = cb_maker(self.wake)
+        res = conn.buffer.set_term(sentinel)
+        return self.check_buffer(conn, cb)
+        
+    def check_buffer(self, conn, cb):
+        res = conn.buffer.check()
+        if res:
+            return res
+        conn.waiting_callback = cb
+        return None
+
+    def check_connection(self):
+        try:
+            conn = self.connection_stack[-1]
+        except IndexError:
+            raise RuntimeError("Cannot complete socket operation: no associated connection")
+        if conn.closed:
+            raise RuntimeError("Cannot complete socket operation: associated connection is closed")
+        return conn
+
+    def send(self, o, priority=5):
+        conn = self.check_connection()
+        conn.pipeline.add(o, priority)
+        conn.set_writable(True)
+
+class Connection(object):
+    def __init__(self, hub, sock, addr):
+        self.hub = hub
+        self.pipeline = pipeline.Pipeline()
+        self.buffer = buffer.Buffer()
+        self.sock = sock
+        self.addr = addr
+        self.hub.register(sock, self.handle_read, self.handle_write, self.handle_error)
+        self._writable = False
+        self.closed = False
+        self.waiting_callback = None
+
+    def set_writable(self, val):
+        '''Set the associated socket writable.  Called when there is
+        data on the outgoing pipeline ready to be delivered to the 
+        remote host.
+        '''
+        if self.closed:
+            return
+        if val and not self._writable:
+            self.hub.enable_write(self.sock)
+            self._writable = True
+            return
+        if not val and self._writable:
+            self.hub.disable_write(self.sock)
+            self._writable = False
+
+    def shutdown(self, remote_closed=False):
+        '''Clean up after a client disconnects or after
+        the connection_handler ends (and we disconnect).
+        '''
+        self.hub.unregister(self.sock)
+        self.closed = True
+        self.sock.close()
+
+        if remote_closed and self.waiting_callback:
+            self.waiting_callback(ConnectionClosed('Connection closed by remote host'))
+
+    def handle_write(self):
+        '''The low-level handler called by the event hub
+        when the socket is ready for writing.
+        '''
+        if not self.pipeline.empty and not self.closed:
+            try:
+                data = self.pipeline.read(BUFSIZ)
+            except pipeline.PipelineCloseRequest:
+                self.shutdown()
+            else:
+                try:
+                    bsent = self.sock.send(data)
+                except socket.error, s:
+                    code, s = e
+                    if code in (errno.EAGAIN, errno.EINTR):
+                        self.pipeline.backup(data)
+                        return True
+                    self.shutdown(True)
+                else:
+                    if bsent != len(data):
+                        self.pipeline.backup(data[bsent:])
+
+                    if not self.pipeline.empty:
+                        return True
+                    else:
+                        self.set_writable(False)
+
+    def handle_read(self):
+        '''The low-level handler called by the event hub
+        when the socket is ready for reading.
+        '''
+        if self.closed:
+            return
+        try:
+            data = self.sock.recv(BUFSIZ)
+        except socket.error, e:
+            code, s = e
+            if code in (errno.EAGAIN, errno.EINTR):
+                return
+            data = ''
+
+        if not data:
+            self.shutdown(True)
+        else:
+            res = self.buffer.feed(data)
+            if res:
+                self.waiting_callback(res)
+
+    def handle_error(self):
+        self.shutdown(True)

File examples/echo.py

 # vim:ts=4:sw=4:expandtab
 '''Simple echo server.
 '''
-from diesel import Application, Service, until_eol
+from diesel import Application, Service, until_eol, send
 
 def hi_server(addr):
     while 1:
-        inp = (yield until_eol())
+        inp = until_eol()
         if inp.strip() == "quit":
             break
-        yield "you said %s" % inp
+        send("you said %s" % inp)
 
 app = Application()
 app.add_service(Service(hi_server, 8013))