Commits

Matt Joiner committed e710f48

Rename project to green380, and move a bunch of stuff around

  • Parent commits b0cc020

Files changed (10)

+ * Implement timeouts.
+ * Check with the POSIX or equivalent standard for which signals are ignored by default, and hook those when the scheduler starts.
+ * Test for functionality on Windows, and with select() and poll().
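
For the first item above, the scheduler in green380/_core.py already understands ('timeout', seconds) events, and ('signal', signum) events work the same way once the signal is hooked (SIGCHLD is hooked when the scheduler starts). The sketch below shows how a fiber might use that; sleep() and ticker() are hypothetical helpers for illustration only, not part of this commit, and it is Linux-only for now since the scheduler is built on epoll:

    from green380 import spawn, run

    def sleep(seconds):
        # Suspend the current fiber until the scheduler's deadline fires.
        yield 'timeout', seconds

    @spawn
    def ticker():
        for i in range(3):
            yield from sleep(1)
            print('tick', i)

    run()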

File examples/child_proc.py

+#!/usr/bin/env python3.3
+
 from fcntl import fcntl, F_SETFL
-from gthread import spawn, run
+from green380 import spawn, run
 from os import O_NONBLOCK
 from pdb import set_trace
 from signal import SIGCHLD

File examples/http_server.py

+#!/usr/bin/env python3.3
+
 import socket
-import gthread
+#
+import green380
 
 def handler(sock, addr):
     print('Handling connection from', addr)
     yield from sock.until_sendall('Hello {}!\n'.format(addr).encode())
     sock.close()
 
-@gthread.spawn
+@green380.spawn
 def server():
-    sock = gthread.socket()
+    sock = green380.socket()
     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
     sock.bind(('', 8080))
     sock.listen(socket.SOMAXCONN)
     while True:
         new_sock, addr = yield from sock.until_accept()
-        gthread.spawn(handler, new_sock, addr)
+        green380.spawn(handler, new_sock, addr)
 
-gthread.run()
+green380.run()

File examples/prime_sieve.py

-from gthread import *
+#!/usr/bin/env python3.3
+
+from green380 import *
 
 def generate(ch, n):
     for i in range(2, n+1):

File fileno.py

-try:
-    # interestingly it's not faster to use the python api version
-    # but i'll use it for maximum correctness
-    import ctypes
-    fileno = ctypes.pythonapi.PyObject_AsFileDescriptor
-    fileno.argtypes = [ctypes.py_object]
-except (ImportError, AttributeError):
-    # see fileobject.c:PyObject_AsFileDescriptor
-    def fileno(obj):
-        if isinstance(obj, int):
-            fd = obj
-        elif hasattr(obj, 'fileno'):
-            fd = obj.fileno()
-        else:
-            raise TypeError('argument must be an int, or have a fileno() method.')
-        if fd < 0:
-            raise ValueError('file descriptor cannot be a negative integer (%i)' % fd)
-        return fd

File green380/__init__.py

+from ._core import *

File green380/_core.py

+__all__ = 'schedule', 'spawn', 'run', 'Channel', 'socket'
+
+import collections
+import errno
+import fcntl
+import heapq
+import logging
+import os
+import pdb
+import select
+import signal
+import socket as _socket
+import time
+
+from .fileno import fileno
+
+#~ logging.root.setLevel(logging.NOTSET)
+
+
+class Lock:
+
+    def __init__(self):
+        self._locked = False
+        self._waiters = set()
+
+    def __enter__(self):
+        self.acquire()
+        return self
+
+    def __exit__(self, *args):
+        self.release()
+
+    def acquire(self, timeout=None):
+        if not self._locked:
+            self._locked = True
+            return True
+        self._waiters.add(current)
+        acquired = yield from timeout_multiplex(None, timeout)
+        yield
+        return acquired
+
+    def release(self):
+        if not self._locked:
+            raise RuntimeError("can't released unlocked lock")
+        if self._waiters:
+            w = self._waiters.pop()
+            w.send(True)
+            schedule(w)
+        else:
+            self._locked = False
+
+
+class RLock:
+
+    def __init__(self):
+        self._lock = Lock()
+        self._owner = None
+        self._depth = 0
+
+    def __enter__(self):
+        self.acquire()
+        return self
+
+    def __exit__(self, *args):
+        self.release()
+
+    def acquire(self, timeout=None):
+        if current == self._owner:
+            self._depth += 1
+            return True
+        if not (yield from self._lock.acquire(timeout)):
+            return False
+        self._owner = current
+        return True
+
+    def release(self):
+        if current != self._owner:
+            raise RuntimeError('not the owner')
+        if self._depth:
+            self._depth -= 1
+        else:
+            self._lock.release()
+            self._owner = None
+
+
+class Event:
+
+    def __init__(self):
+        self._value = False
+        self._waiters = set()
+
+    def is_set(self):
+        return self._value
+
+    def set(self):
+        self._value = True
+        for w in self._waiters:
+            w.send(True)
+            schedule(w)
+        self._waiters.clear()
+
+    def clear(self):
+        self._value = False
+
+    def wait(self, timeout=None):
+        if self._value:
+            return True
+        waiter = current
+        self._waiters.add(waiter)
+        @spawn
+        def timeout_():
+            yield 'timeout', timeout
+            try:
+                self._waiters.remove(waiter)
+            except KeyError:
+                return
+            waiter.send(False)
+            schedule(waiter)
+        value = yield
+        yield
+        return value
+
+
+class Condition:
+
+    def __init__(self, lock=None):
+        if lock is None:
+            lock = RLock()
+        self._lock = lock
+        self._waiters = []
+
+    def notify(self, n=1):
+        for w in self._waiters[:n]:
+            w.send(True)
+            schedule(w)
+        self._waiters = self._waiters[n:]
+
+    def notify_all(self):
+        self.notify(len(self._waiters))
+
+    def wait(self, timeout=None):
+        self._waiters.append(current)
+        notified = yield from timeout_multiplex(None, timeout)
+        yield
+        return notified
+
+
+class Queue:
+
+    class Timeout(Exception): pass
+
+    def __init__(self, capacity=None):
+        self._capacity = capacity
+        self._items = collections.deque()
+        self._lock = Lock()
+        self._not_full = Condition(self._lock)
+        self._not_empty = Condition(self._lock)
+
+    def put(self, item, timeout=None):
+        with self._lock:
+            if self._capacity:
+                while len(self._items) >= self._capacity:
+                    if not self._not_full.wait(timeout):
+                        raise self.Timeout
+            self._items.append(item)
+            self._not_empty.notify()
+
+    def get(self, timeout=None):
+        with self._lock:
+            while not self._items:
+                if not self._not_empty.wait(timeout):
+                    raise self.Timeout
+            item = self._items.popleft()
+            if len(self._items) < self._capacity:
+                self._not_full.notify()
+            return item
+
+
+class socket(_socket.socket):
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        super().setblocking(False)
+        self.__timeout = None
+
+    def until_recv_term(self, term):
+        buf = b''
+        while not buf.endswith(term):
+            yield 'read', self
+            buf1 = self.recv(0x10000, _socket.MSG_PEEK)
+            if not buf1:
+                break
+            index = (buf + buf1).find(term)
+            buf2 = self.recv(len(buf1) if index == -1 else index - len(buf) + len(term))
+            buf += buf2
+        return buf
+
+    def until_recv(self, count):
+        yield 'read', self
+        return self.recv(count)
+
+    def until_sendall(self, buf):
+        while buf:
+            sent = yield from self.until_send(buf)
+            if not sent:
+                break
+            buf = buf[sent:]
+
+    def until_send(self, buf):
+        yield 'write', self
+        return self.send(buf)
+
+    def until_accept(self):
+        # accept() is itself a coroutine here, so just delegate to it.
+        return (yield from self.accept())
+
+    def accept(self):
+        yield 'read', self
+        sock, addr = super().accept()
+        return self.__class__(fileno=sock.detach()), addr
+
+    def settimeout(self, timeout):
+        self.__timeout = timeout
+
+    def setblocking(self, blocking):
+        self.settimeout(None if blocking else 0)
+
+    def connect(self, addr):
+        # TODO switch this to use connect_ex, should be faster
+        try:
+            return super().connect(addr)
+        except _socket.error as exc:
+            if exc.errno != errno.EINPROGRESS:
+                raise
+        @spawn
+        def timeout_fiber():
+            yield 'timeout', self.__timeout
+            main_fiber.send(False)
+            event_fiber.close()
+            schedule(main_fiber)
+        @spawn
+        def event_fiber():
+            yield 'write', self
+            err = self.getsockopt(_socket.SOL_SOCKET, _socket.SO_ERROR)
+            if err == 0:
+                main_fiber.send(True)
+            else:
+                main_fiber.throw(_socket.error(err, os.strerror(err)))
+            timeout_fiber.close()
+            schedule(main_fiber)
+        main_fiber = current
+        try:
+            # Resumed by whichever helper fiber fires first.
+            if not (yield):
+                raise _socket.timeout('connect timed out')
+        finally:
+            # The extra yield lets the fiber that called send() finish its turn.
+            yield
+
+def timeout_multiplex(event, timeout):
+    @spawn
+    def timeout_fiber():
+        yield 'timeout', timeout
+        main_fiber.send(False)
+    @spawn
+    def event_fiber():
+        yield event
+        main_fiber.send(True)
+    main_fiber = current
+    try:
+        return (yield)
+    finally:
+        timeout_fiber.close()
+        event_fiber.close()
+
+
+class Channel:
+
+    def __init__(self):
+        # serious black magic; deque is fastest
+        self._receivers = collections.deque()
+        self._senders = collections.deque()
+
+    def get(self):
+        if self._senders:
+            sender = self._senders.popleft()
+            item = next(sender)
+            schedule(sender)
+            return item
+        else:
+            self._receivers.append(current)
+            item = yield
+            yield
+            return item
+
+    def put(self, item):
+        if self._receivers:
+            receiver = self._receivers.popleft()
+            receiver.send(item)
+            schedule(receiver)
+        else:
+            self._senders.append(current)
+            yield
+            yield item
+
+
+class _Scheduler:
+
+    _close_mask = select.EPOLLHUP|select.EPOLLERR
+    _read_mask = select.EPOLLIN|select.EPOLLPRI|_close_mask
+    _read_mask |= getattr(select, 'EPOLLRDHUP', 0)
+    _write_mask = select.EPOLLOUT|_close_mask
+
+    def __init__(self):
+        self._poll_obj = select.epoll()
+        open_max = os.sysconf(os.sysconf_names['SC_OPEN_MAX'])
+        self._readers = [set() for _ in range(open_max)]
+        self._writers = [set() for _ in range(open_max)]
+        self._signals = collections.defaultdict(set)
+        self._registered = set()
+        self._deadlines = []
+        self._signalfd, self._wakeup_fd = os.pipe()
+        fcntl.fcntl(self._wakeup_fd, fcntl.F_SETFL, os.O_NONBLOCK)
+        signal.set_wakeup_fd(self._wakeup_fd)
+        self._ready = []
+        self._original_signal_handlers = {}
+        self.spawn(self._signalfd_reader)
+        self._hook_default_signals()
+
+    def _hook_default_signals(self):
+        if signal.getsignal(signal.SIGCHLD) == signal.SIG_DFL:
+            logging.debug('Hooked signal SIGCHLD')
+            if signal.signal(signal.SIGCHLD, self._handle_signal) != signal.SIG_DFL:
+                raise RuntimeError
+
+    def run_fiber(self, fiber, arg=None):
+        global current
+        current = fiber
+        try:
+            if __debug__:
+                logging.debug('running %s', fiber)
+            return fiber.send(arg)
+        except StopIteration:
+            pass
+        finally:
+            current = None
+
+    def handle_event(self, fiber, event):
+        new_event = self.run_fiber(fiber)
+        if new_event != event:
+            if event is not None:
+                self.remove_event(fiber, *event)
+            if new_event is not None:
+                self.add_event(fiber, *new_event)
+
+    def _signalfd_reader(self):
+        while True:
+            yield 'read', self._signalfd
+            buf = os.read(self._signalfd, 0x100)
+            if not buf:
+                break
+            for signum in buf:
+                logging.debug('Got signal: %s', signum)
+                for fiber in self._signals[signum].copy():
+                    self.handle_event(fiber, ('signal', signum))
+
+    def run(self):
+        while any([self._registered - {self._signalfd}, self._ready, self._deadlines, any(self._signals.values())]):
+            if self._registered:
+                if self._ready:
+                    timeout = 0
+                elif self._deadlines:
+                    timeout = max(0, self._deadlines[0][0] - time.time())
+                else:
+                    timeout = -1
+                while True:
+                    try:
+                        fd_masks = self._poll_obj.poll(timeout)
+                    except InterruptedError:
+                        pass
+                    else:
+                        break
+                for fd, mask in fd_masks:
+                    #~ if mask & self._close_mask:
+                        #~ self._registered.remove(fd)
+                    if mask & self._read_mask:
+                        readers = self._readers[fd]
+                        for fiber in readers.copy():
+                            if fiber in readers:
+                                self.handle_event(fiber, ('read', fd))
+                    if mask & self._write_mask:
+                        writers = self._writers[fd]
+                        for fiber in writers.copy():
+                            if fiber in writers:
+                                self.handle_event(fiber, ('write', fd))
+            while self._deadlines and self._deadlines[0][0] < time.time():
+                deadline, fiber = heapq.heappop(self._deadlines)
+                self.handle_event(fiber, None)
+            ready = self._ready
+            self._ready = []
+            for fiber in ready:
+                self.handle_event(fiber, None)
+
+    def spawn(self, func, *args, **kwargs):
+        return self.schedule(func(*args, **kwargs))
+
+    def schedule(self, fiber):
+        if fiber is None:
+            assert False
+        self._ready.append(fiber)
+        return fiber
+
+    def _update_poll(self, fd):
+        mask = select.EPOLLIN if self._readers[fd] else 0
+        mask |= select.EPOLLOUT if self._writers[fd] else 0
+        if fd in self._registered:
+            if mask:
+                self._poll_obj.modify(fd, mask)
+            else:
+                self._poll_obj.unregister(fd)
+                self._registered.remove(fd)
+        else:
+            if mask:
+                self._poll_obj.register(fd, mask)
+                self._registered.add(fd)
+
+    def _handle_signal(self, signum, frame):
+        #~ logging.critical('signal handler: %s', signum)
+        pass
+
+    def add_event(self, fiber, filter, data):
+        if filter in {'read', 'write'}:
+            fd = fileno(data)
+            watchers = {'read': self._readers, 'write': self._writers}[filter][fd]
+            assert fiber not in watchers
+            watchers.add(fiber)
+            self._update_poll(fd)
+        elif filter == 'signal':
+            self._signals[data].add(fiber)
+            assert signal.getsignal(data) not in {signal.SIG_IGN, signal.SIG_DFL, None}
+        elif filter == 'timeout':
+            if data is not None:
+                assert data >= 0, data
+                heapq.heappush(self._deadlines, (time.time() + data, fiber))
+        else:
+            raise ValueError('Unknown filter', filter)
+
+    def remove_event(self, fiber, filter, data):
+        if filter in {'read', 'write'}:
+            fd = fileno(data)
+            watchers = {'read': self._readers, 'write': self._writers}[filter][fd]
+            watchers.remove(fiber)
+            self._update_poll(fd)
+        elif filter == 'signal':
+            self._signals[data].remove(fiber)
+        elif filter == 'timeout':
+            # The heap stores absolute deadlines, not the relative value in
+            # data, so drop the entry by fiber instead.
+            self._deadlines = [d for d in self._deadlines if d[1] is not fiber]
+            heapq.heapify(self._deadlines)
+        else:
+            raise ValueError('unknown filter', filter)
+
+scheduler = _Scheduler()
+schedule = scheduler.schedule
+spawn = scheduler.spawn
+run = scheduler.run

File green380/fileno.py

+try:
+    # interestingly it's not faster to use the python api version
+    # but i'll use it for maximum correctness
+    import ctypes
+    fileno = ctypes.pythonapi.PyObject_AsFileDescriptor
+    fileno.argtypes = [ctypes.py_object]
+except (ImportError, AttributeError):
+    # see fileobject.c:PyObject_AsFileDescriptor
+    def fileno(obj):
+        if isinstance(obj, int):
+            fd = obj
+        elif hasattr(obj, 'fileno'):
+            fd = obj.fileno()
+        else:
+            raise TypeError('argument must be an int, or have a fileno() method.')
+        if fd < 0:
+            raise ValueError('file descriptor cannot be a negative integer (%i)' % fd)
+        return fd
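
A quick note on the fileno() helper above: both variants accept what PyObject_AsFileDescriptor accepts, i.e. a plain int file descriptor or any object with a fileno() method. A hypothetical check, just for illustration:

    import socket
    from green380.fileno import fileno

    s = socket.socket()
    assert fileno(s) == s.fileno()   # object with a fileno() method
    assert fileno(7) == 7            # plain file descriptor
    s.close()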

File gthread.py

-
-from time import time
-import select as _select
-
-import socket as _socket
-
-
-class socket(_socket.socket):
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.setblocking(False)
-
-    def until_recv_term(self, term):
-        buf = b''
-        while not buf.endswith(term):
-            yield 'read', self
-            buf1 = self.recv(0x10000, _socket.MSG_PEEK)
-            if not buf1:
-                break
-            index = (buf + buf1).find(term)
-            buf2 = self.recv(len(buf1) if index == -1 else index - len(buf) + len(term))
-            buf += buf2
-        return buf
-
-    def until_recv(self, count):
-        yield
-        return self.recv(term)
-
-    def until_sendall(self, buf):
-        while buf:
-            sent = yield from self.until_send(buf)
-            if not sent:
-                break
-            buf = buf[sent:]
-
-    def until_send(self, buf):
-        yield 'write', self
-        return self.send(buf)
-
-    def until_accept(self):
-        yield 'read', self
-        return self.accept()
-
-    def accept(self):
-        sock, addr = super().accept()
-        return self.__class__(fileno=sock.detach()), addr
-
-
-import collections
-
-class Channel:
-
-    def __init__(self):
-        #~ self._items = collections.deque()
-        self._receivers = set()
-        self._senders = set()
-
-    def get(self):
-        if self._senders:
-            sender = self._senders.pop()
-            item = next(sender)
-            schedule(sender)
-            return item
-        else:
-            self._receivers.add(current)
-            item = yield
-            yield
-            return item
-
-    def put(self, item):
-        if self._receivers:
-            receiver = self._receivers.pop()
-            receiver.send(item)
-            schedule(receiver)
-        else:
-            self._senders.add(current)
-            yield
-            yield item
-
-def spawn(*args, **kwargs):
-    return scheduler.spawn(*args, **kwargs)
-
-import fcntl
-import logging
-import os
-import pdb
-import signal
-
-#~ logging.root.setLevel(logging.NOTSET)
-
-from fileno import fileno
-
-class _Scheduler:
-
-    _close_mask = _select.EPOLLHUP|_select.EPOLLERR
-    _read_mask = _select.EPOLLIN|_select.EPOLLPRI|_close_mask
-    _read_mask |= getattr(_select, 'EPOLLRDHUP', 0)
-    _write_mask = _select.EPOLLOUT|_close_mask
-
-    def __init__(self):
-        self._poll_obj = _select.epoll()
-        open_max = os.sysconf(os.sysconf_names['SC_OPEN_MAX'])
-        self._readers = [set() for _ in range(open_max)]
-        self._writers = [set() for _ in range(open_max)]
-        self._signals = collections.defaultdict(set)
-        self._registered = set()
-        self._deadlines = []
-        self._signalfd, self._wakeup_fd = os.pipe()
-        fcntl.fcntl(self._wakeup_fd, fcntl.F_SETFL, os.O_NONBLOCK)
-        signal.set_wakeup_fd(self._wakeup_fd)
-        self._ready = []
-        self._original_signal_handlers = {}
-        self.spawn(self._signalfd_reader)
-        self._hook_default_signals()
-
-    def _hook_default_signals(self):
-        if signal.getsignal(signal.SIGCHLD) == signal.SIG_DFL:
-            logging.debug('Hooked signal SIGCHLD')
-            if signal.signal(signal.SIGCHLD, self._handle_signal) != signal.SIG_DFL:
-                raise RuntimeError
-
-    def run_fiber(self, fiber, arg=None):
-        global current
-        current = fiber
-        try:
-            if __debug__:
-                logging.debug('running %s', fiber)
-            return fiber.send(arg)
-        except StopIteration:
-            pass
-        finally:
-            current = None
-
-    def handle_event(self, fiber, event):
-        new_event = self.run_fiber(fiber)
-        if new_event != event:
-            if event is not None:
-                self.remove_event(fiber, *event)
-            if new_event is not None:
-                self.add_event(fiber, *new_event)
-
-    def _signalfd_reader(self):
-        while True:
-            yield 'read', self._signalfd
-            buf = os.read(self._signalfd, 0x100)
-            if not buf:
-                break
-            for signum in buf:
-                logging.debug('Got signal: %s', signum)
-                for fiber in self._signals[signum].copy():
-                    self.handle_event(fiber, ('signal', signum))
-
-    def run(self):
-        while any([self._registered - {self._signalfd}, self._ready, self._deadlines, any(self._signals.values())]):
-            if self._registered:
-                if self._ready:
-                    timeout = 0
-                elif self._deadlines:
-                    timeout = max(0, self._deadlines[0][0])
-                else:
-                    timeout = -1
-                while True:
-                    try:
-                        fd_masks = self._poll_obj.poll(timeout)
-                    except InterruptedError:
-                        pass
-                    else:
-                        break
-                for fd, mask in fd_masks:
-                    #~ if mask & self._close_mask:
-                        #~ self._registered.remove(fd)
-                    if mask & self._read_mask:
-                        readers = self._readers[fd]
-                        for fiber in readers.copy():
-                            if fiber in readers:
-                                self.handle_event(fiber, ('read', fd))
-                    if mask & self._write_mask:
-                        writers = self._writers[fd]
-                        for fiber in writers.copy():
-                            if fiber in writers:
-                                self.handle_event(fiber, ('write', fd))
-            ready = self._ready
-            self._ready = []
-            for fiber in ready:
-                self.handle_event(fiber, None)
-
-    def spawn(self, func, *args, **kwargs):
-        return self.schedule(func(*args, **kwargs))
-
-    def schedule(self, fiber):
-        if fiber is None:
-            assert False
-        self._ready.append(fiber)
-        return fiber
-
-    def _update_poll(self, fd):
-        mask = _select.EPOLLIN if self._readers[fd] else 0
-        mask |= _select.EPOLLOUT if self._writers[fd] else 0
-        if fd in self._registered:
-            if mask:
-                self._poll_obj.modify(fd, mask)
-            else:
-                self._poll_obj.unregister(fd)
-                self._registered.remove(fd)
-        else:
-            if mask:
-                self._poll_obj.register(fd, mask)
-                self._registered.add(fd)
-
-    def _handle_signal(self, signum, frame):
-        #~ logging.critical('signal handler: %s', signum)
-        pass
-
-    def add_event(self, fiber, filter, data):
-        if filter in {'read', 'write'}:
-            fd = fileno(data)
-            watchers = {'read': self._readers, 'write': self._writers}[filter][fd]
-            assert fiber not in watchers
-            watchers.add(fiber)
-            self._update_poll(fd)
-        elif filter == 'signal':
-            self._signals[data].add(fiber)
-            assert signal.getsignal(data) not in {signal.SIG_IGN, signal.SIG_DFL, None}
-        else:
-            raise ValueError('Unknown filter', event.filter)
-
-    def remove_event(self, fiber, filter, data):
-        if filter in {'read', 'write'}:
-            fd = fileno(data)
-            watchers = {'read': self._readers, 'write': self._writers}[filter][fd]
-            watchers.remove(fiber)
-            self._update_poll(fd)
-        elif filter == 'signal':
-            self._signals[data].remove(fiber)
-        else:
-            raise ValueError('unknown filter', event.filter)
-
-scheduler = _Scheduler()
-schedule = scheduler.schedule
-
-run = scheduler.run