Commits

Anonymous committed f0d26f8

add base code

Comments (0)

Files changed (13)

+^MANIFEST$
+^(build|dist|nsocket\.egg-info)/
+\.py[co]$
+^env$
+^docs/_buil
+
+from nsocket.core.engine import engine
+print engine

nsocket/core/__init__.py

Empty file added.

nsocket/core/cothread.py

+import greenlet
+import threading 
+import itertools
+
+_local = threading.local
+
+
+def _execute(func, args, kwargs):
+    
+    threads = getattr(_local, 'threads', None)
+    if not threads:
+        threads = _local.threads = {}
+        _local.counter = itertools.count(1)
+    
+    current = greenlet.getcurrent()
+    id = _local.counter.next()
+    threads[current] = {'id':id}
+    try:
+        return func(*args, **kwargs)
+    finally:
+        threads.pop(current, None)
+
+def create():
+    return greenlet.greenlet(_execute)
+
+def getcurrent():
+    return greenlet.getcurrent()
+
+def switch(other=None, value=None):
+
+    current = greenlet.getcurrent()
+    if other is None:
+        other = current.parent
+    if other is None:
+        other = current
+    try:
+        ret = other.switch(value)
+        res, exc = ret, None
+    except:
+        res, exc = None, sys.exc_info()
+    
+    if isinstance(exc, tuple):
+        typ, exc, tb = exc
+        raise typ, exc, tb
+    elif exc is not None:
+        raise exc
+    
+    return ret
+
+def _startup_engine(cb, args, kw, cancel=None):
+    try:
+        switch(greenlet.getcurrent().parent)
+    finally:
+        if not cancel:
+            return  cb(*args, **kwargs)
+
+def _spawn(g):
+    greenlib.switch(g)
+
+def start_thread(function, *args, **kwargs):
+    from mopenio.engine import engine
+    g = create()
+    
+    g.parent = engine.thread
+    t = engine.schedule(0, _spawn, g)
+    switch(g, (_startup_engine, function, args, kwargs, t.cancel))
+    return g
+    
+def threaded(func, *args, **kwargs):
+    """
+    co-thread decorator
+    """
+    def call(*arg, **kw):
+        start_thread(func, *arg, **kw)
+    return call
+

nsocket/core/engine/__init__.py

Empty file added.

nsocket/core/engine/base.py

+from nsocket.core import engineselector, cothread
+import bisect
+import time
+
+
+class Task(object):
+    
+    def __init__(self, cb, *args, **kwargs):
+        self._func = cb, args, kwargs
+        self.cancel = false
+
+    def cacel(self):
+        self.cancel = True
+
+    def __call__(self):
+        if not self.cancel:
+            cb, arg, kw = self._func
+            return cb(*arg, **kw)
+
+
+class AbstractEngine(engineselector.Installer):
+
+    def __init__(self):
+        self.tasks = []
+        self.next_tasks = []
+        self.running = False
+        self.thread = None
+        self._reads = {}
+        self._writes = {}
+        self.selectable = {}
+    
+    def schedule(self, seconds, func, args, kwargs):
+        scheduled_time = time.time() + seconds
+        task = Task(func, *args, **kwargs)
+        self.next_tasks.append((scheduled_time, task))
+        return task
+    
+    def prepare_task(self):
+        next_tasks = self.next_tasks
+        tasks = self.tasks
+        insert = bisect.insort
+        for item in next_tasks:
+            insert(tasks, item)
+
+        del self.next_tasks[:]
+
+    def execute_task(self):
+        now = time.time()
+        task = self.tasks
+        last = bisect_right(task, (now, None))
+        for i in xrange(last):
+            timer_task = task[i][1]
+            timer_task()
+        del self.tasks[:last]
+    
+    def get_event_timeout(self):
+        return None
+
+    def run(self):
+        self.running = True
+        try:
+            while self.running:
+                self.prepare_task()
+                self.execute_task()
+                self.prepare_task()
+                self.do_event()
+        finally:
+            self.close()
+    
+    def do_event(self, timeout):
+        timeout = self.get_event_timeout()
+        if timeout is None:
+            timeout = 60
+        else:
+            timeout = timeout - time.time()
+        self.prepare_event()
+        results = self.wait_event(timeout)
+        for result in results:
+            self.do_read_event(result)
+            self.do_write_event(result)
+        self.finish_event()
+    
+    def switch(self):
+        """
+        if not self.thread:
+            self.thread = cothread.create()
+            args = ((self.run,),)
+        else:
+            args = ()
+        
+        current = cothread.getcurrent()
+        try:
+            current.parent = self.thread
+        except ValueError :
+            pass
+        return cothread.switch(self.thread, *args)
+        """
+    
+    def set_selectable(self, fd):
+        self.selectable[fd] = fd
+
+    def del_selectable(self, fd):
+        del self.selectable[fd]
+
+    def add_reader(self, fd, cb):
+        raise NotImplementedError("Not Implemented")
+
+    def add_writer(self, fd, cb):
+        raise NotImplementedError("Not Implemented")
+
+    def remove_reader(self, fd):
+        raise NotImplementedError("Not Implemented")
+   
+    def remove_writer(self, fd):
+        raise NotImplementedError("Not Implemented")
+
+    def prepare_event(self):
+        pass
+
+    def wait_event(self, timeout):
+        raise NotImplementedError("Not Implemented")
+
+    def do_read_event(self, result):
+        raise NotImplementedError("Not Implemented")
+    
+    def do_write_event(self, results):
+        raise NotImplementedError("Not Implemented")
+    
+    def finish_event(self):
+        pass
+   
+

nsocket/core/engine/engine.py

+from nsocket.core import engineselector
+imports=['nsocket.core.engine.epoll.Engine']
+engineselector.install(imports)
+

nsocket/core/engine/epoll.py

+from nsocket.core.engine import base
+
+try:
+    from select import epoll, EPOLLIN, EPOLLOUT, EPOLLET
+except:
+    from select26 import epoll, EPOLLIN, EPOLLOUT, EPOLLET
+
+class Engine(base.AbstractEngine):
+
+    def __init__(self):
+        super(Engine, self).__init__()
+        self._epoll = None
+
+    def event_init(self):
+        if not self._epoll:
+            self._epoll = epoll()
+    
+    def add_reader(self, fd, cb):
+        self.event_init()
+        if fd not in self._reads:
+            self._reads[fd] = cb
+            self.selectable[fd] = fd
+            self._epoll.register(fd, EPOLLIN|EPOLLET)
+    
+    def add_writer(self, fd, cb):
+        self.event_init()
+        if fd not in self._writes:
+            self._writes[fd] = cb
+            self.selectable[fd] = fd
+            self._epoll.register(fd, EPOLLOUT|EPOLLET)
+
+    def remove_reader(self, fd):
+        if fd in self._reads:
+            del self._reads[fd]
+            if fd not in self._writes:
+                del self.selectable[fd]
+            self._epoll.unregister(fd)
+#            print("remove reader %s" % fd)
+
+    def remove_writer(self, fd):
+        if fd in self._writes:
+            del self._writes[fd]
+            if fd not in self._reads:
+                del self.selectable[fd]
+            self._epoll.unregister(fd)
+#            print("remove writer %s" % fd)
+
+    def wait_event(self, timeout=60):
+        epoll = self._epoll;
+        max_event = len(self.selectable.values())
+        #print("waiting.... %d %s" % (max_event,self.selectable))
+        events = epoll.poll(timeout, max_event)
+        return events
+    
+    def do_read_event(self, event):
+        fd ,ev = event
+        if ev & EPOLLIN:
+            reads = self._reads
+            if fd in reads:
+                reads[fd](fd)
+
+    def do_write_event(self, event):
+        fd, ev = event
+        if ev & EPOLLOUT:
+            writes = self._writes
+            if fd in writes:
+                writes[fd](fd)
+

nsocket/core/engineselector.py

+import sys
+
+def install(imports, *args, **kwargs):
+    global _mod_name, _imports
+    local = sys._getframe(1).f_locals
+    _mod_name = local['__name__']
+    _imports = imports
+    _search(*args, **kwargs)
+
+class Installer(object):
+    
+
+    @classmethod
+    def install(cls, *args, **kwargs):
+        i = cls(*args, **kwargs)
+        _install(i)
+
+
+def _install(obj):
+    names = _mod_name.split('.')
+    if sys.modules.get(_mod_name, None):
+        del sys.modules[_mod_name]
+    mod = __import__('.'.join(names[:-1]))
+    mod.__dict__[names[-1]] = obj
+    sys.modules[_mod_name] = obj
+
+def _search(*args, **kwargs):
+    for name in _imports:
+        try:
+            names = name.split('.')
+            if not names:
+                return 
+            imp_name = '.'.join(names[:-1])
+            __import__(imp_name, {}, {})
+            mod_name = imp_name.split('.')[-1]
+
+            mod = sys.modules[imp_name]
+            if mod:
+                obj = getattr(mod ,names[-1], None)
+                if obj and issubclass(obj, Installer):
+                    return obj.install(*args, **kwargs)
+        except ImportError:
+            pass
+    raise
+
+

nsocket/core/socket.py

+from nsocket.core.engine import engine
+from nsocket.core import cothread
+import socket
+import errno
+import fcntl
+
+BUFSIZE = 4096
+
+_socket = socket.socket
+
+CONNECT_ERROR = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)
+CONNECT_SUCCESS = (0, errno.EISCONN)
+
+def get_fd(fd):
+    return getattr(fd, 'fileno', lambda:fd)()
+
+def set_nonblocking(fd):
+    if hasattr(fd, 'setblocking'):
+        fd.setblocking(0)
+    else:
+        flags = fcntl.fcntl(fd, FCNTL.F_GETFL)
+        flags = flags | os.O_NONBLOCK
+        fcntl.fcntl(fd, FCNTL.F_SETFL, flags)
+
+def set_blocking(fd):
+    if hasattr(fd, 'setblocking'):
+        fd.setblocking(1)
+    else:
+        flags = fcntl.fcntl(fd, FCNTL.F_GETFL)
+        flags = flags & ~os.O_NONBLOCK
+        fcntl.fcntl(fd, FCNTL.F_SETFL, flags)
+
+def nonblock_recv(func):
+
+    def recv(self, size):
+        #if self.act_non_blocking:
+        #    return self.fd.recv(size)
+        buf = self.recvbuf
+        if buf:
+            chunk, self.recvbuf = buf[:size], buf[size:]
+            return chunk
+        socket = self._socket
+        bytes = func(socket, size)
+        #if self.gettimeout():
+        #    end = time.time()+self.gettimeout()
+        #else:
+        #    end = None
+        timeout = None
+        while bytes is None:
+            try:
+                if end:
+                    timeout = end - time.time()
+                io_switch(socket, read=True, timeout=timeout)
+            except socket.timeout:
+                raise
+            except socket.error, e:
+                if e[0] == errno.EPIPE:
+                    bytes = ''
+                else:
+                    raise
+            else:
+                bytes = func(fd, size)
+        self.recvcount += len(bytes)
+        return bytes
+    return recv
+
+def nonblock_send(func):
+
+    def send(self, data):
+        #if self.act_non_blocking:
+        #    return self.fd.send(data)
+        count = func(self._fd, data)
+        if not count:
+            return 0
+        self.sendcount += count
+        return count
+    return send
+
+def io_switch(fd, read=None, write=None, timeout=None,remove=True):
+    
+    current = threadlib.getcurrent()
+    fileno = get_fd(fd)
+
+    def callback(_fd):
+        if remove:
+            if read:
+                engine.remove_reader(fileno)
+            if write:
+                engine.remove_writer(fileno)
+        
+        cothread.switch(self, fd)
+
+    if read:
+        engine.add_reader(fileno, callback)
+    if write:
+        engine.add_writer(fileno, callback)
+    return engine.switch()
+
+
+def _accept(sock):
+    try:
+        return sock.accept()
+    except socket.error, e:
+        if e[0] == errno.EWOULDBLOCK:
+            return None
+        raise
+
+def _connect(sock, addr):
+    ret = sock.connect_ex(addr)
+    if ret in CONNECT_ERROR:
+        return None
+    elif ret not in CONNECT_SUCCESS:
+        raise socket.error(ret, errno.errocode[ret])
+    return sock
+
+def _nb_recv(sock, bufsize):
+    try:
+        return sock.recv(bufsize)
+    except socket.error, e:
+        if e[0] == errno.EWOULDBLOCK:
+            return None
+        raise
+
+def _nb_send(sock, data):
+    try:
+        return sock.send(data)
+    except socket.error, e:
+        if e[0] == errno.EWOULDBLOCK:
+            return None
+    raise
+
+class NioSocket(object):
+
+    def __init__(self, *args, **kwargs):
+        sock = kwargs.pop('socket', None)
+        if sock is None:
+            sock = _socket(*args)
+        set_nonblocking(sock)
+        self._socket = sock
+        self._fd = sock.fileno()
+        self.recvbuf = None
+        self.recvcount = 0
+        self.sendcount = 0
+        self._nb = True
+
+    def setblocking(self, flag):
+        self._socket.setblocking(flag)
+        if flag:
+            self._nb = False
+        else:
+            self._nb = True
+
+    def accept(self):
+        sock = self._socket
+        while True:
+            res = _accept(sock)
+            if res is not None:
+                client, addr = res
+#                print("accepted")
+                return type(self)(socket=client), addr
+#            print("accept loop")
+
+            io_switch(sock, read=True, remove=False)
+    
+    def bind(self, *args, **kwargs):
+        try:
+            sock = self._socket
+            sock.setsockopt(
+                socket.SOL_SOCKET,
+                socket.SO_REUSEADDR,
+                sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1,
+            )
+        except socket.error:
+            pass
+        fn = self.bind = self._socket.bind
+        fn(*args, **kwargs)
+
+    def close(self):
+        try:
+            self._socket.close()
+        finally:
+            engine.remove_reader(self._fd)
+            engine.remove_writer(self._fd)
+    
+    def connect(self, *args, **kwargs):
+        sock = self._socket
+        while not _connect(sock, address):
+            io_switch(sock, write=True)
+
+    def connect_ex(self, *args, **kwargs):
+        fn = self.connect_ex = self._socket.connect_ex
+        return fn(*args, **kw)
+    
+    def dup(self, *args, **kw):
+        sock = self._socket.dup(*args, **kw)
+        set_nonblocking(sock)
+        return type(self)(socket=sock)
+    
+    def fileno(self):
+        return self._fd
+    
+    def getpeername(self):
+        fn = self.getpeername = self._socket.getpeername()
+        return fn()
+    
+    def getsockname(self):
+        fn = self.getsockname = self._socket.getsockname
+        return fn()
+
+    def getsockopt(self, *args, **kwargs):
+        fn = self.getsockopt = self._socket.getsockopt
+        return fn(*args, **kwargs)
+
+    def listen(self, *args, **kwargs):
+        fn = self.listen = self._socket.listen
+        return fn(*args, **kwargs)
+    
+    def makefile(self, mode = None, bufsize = None):
+        return NioFile(self._socket)
+    
+    @nonblock_recv
+    def recv(self, bufsize):
+        sock = self._socket
+        return _nb_recv(sock, bufsize)
+
+    def recv_from(self, *args, **kwargs):
+        pass
+    
+    def recvfrom_into(self, *args, **kwargs):
+        pass
+
+    def recv_into(self, *args, **kwargs):
+        pass
+
+    @nonblock_send
+    def send(self, data):
+        sock = self._socket
+        return _nb_send(sock, data)
+    
+    def send_all(self, *args, **kwargs):
+        pass
+    
+    def sendto(self, *args, **kwargs):
+        pass
+
+    def setblocking(self, flag):
+        fn = self.setblocking = self._socket.setblocking
+        return fn(flag)
+
+    def settimeout(self, value):
+        fn = self.settimeout = self._socket.settimeout
+        return fn(value)
+
+
+
+class NioFile(object):
+    
+    def __init__(self, sock):
+        self._socket = sock
+
+
+
+
+def install_nsocket():
+    import socket
+    import sys
+    if sys.modules.get('socket.socket', None):
+        del sys.modules['socket.socket']
+    socket.socket = NioSocket
+    sys.modules['socket.socket'] = NioSocket
+
+
+
+
+

nsocket/cothread.py

Empty file removed.

nsocket/engine/__init__.py

Empty file removed.

nsocket/socket.py

Empty file removed.