Commits

Matt Joiner  committed d6a6eac

Basics with working examples

  • Participants

Comments (0)

Files changed (5)

File examples/child_proc.py

+from fcntl import fcntl, F_SETFL
+from gthread import Event, spawn, run
+from os import O_NONBLOCK
+from signal import SIGCHLD
+from subprocess import Popen, PIPE
+
+def find_var():
+    sigchld = Event('signal', SIGCHLD)
+    proc = Popen(['find', '/var'], stdout=PIPE, stderr=PIPE)
+    stdout = Event('read', proc.stdout)
+    stderr = Event('read', proc.stderr)
+    fcntl(proc.stdout, F_SETFL, O_NONBLOCK)
+    lines = 0
+    stderr_buf = b''
+    while any([sigchld, stdout, stderr]):
+        event = yield
+        if event is sigchld:
+            rc = proc.poll()
+            if rc is not None:
+                print('process terminated with returncode', rc)
+                sigchld.remove()
+        elif event is stdout:
+            buf = proc.stdout.read(100)
+            #~ print(buf)
+            if buf:
+                lines += buf.count(b'\n')
+            else:
+                stdout.remove()
+                print('got', lines, 'lines')
+        elif event is stderr:
+            buf = proc.stderr.read(1)
+            if not buf:
+                stderr.remove()
+            stderr_buf += buf
+
+spawn(find_var)
+run()

File examples/http_server.py

+import gthread
+import socket
+
+def handler(sock, addr):
+    print('Handling connection from', addr)
+    header = yield from sock.until_recv_term(b'\r\n\r\n')
+    print('Received header:', header)
+    yield from sock.until_sendall('Hello {}!\n'.format(addr).encode())
+    sock.close()
+
+def serve():
+    sock = gthread.socket()
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
+    sock.bind(('', 8080))
+    sock.listen(socket.SOMAXCONN)
+    while True:
+        new_sock, addr = yield from sock.until_accept()
+        gthread.spawn(handler, new_sock, addr)
+
+gthread.spawn(serve)
+
+gthread.run()

File examples/prime_sieve.py

+from gthread import *
+
+def generate(ch, n):
+    for i in range(2, n+1):
+        yield from ch.put(i)
+
+def filter(p, in_, out):
+    while True:
+        n = yield from in_.get()
+        if n % p:
+            yield from out.put(n)
+
+def sieve(n):
+    ch = Channel()
+    spawn(generate, ch, n)
+    while True:
+        p = yield from ch.get()
+        print(p)
+        ch1 = Channel()
+        spawn(filter, p, ch, ch1)
+        ch = ch1
+
+spawn(sieve, 1000)
+run()
+try:
+    # interestingly it's not faster to use the python api version
+    # but i'll use it for maximum correctness
+    import ctypes
+    fileno = ctypes.pythonapi.PyObject_AsFileDescriptor
+    fileno.argtypes = [ctypes.py_object]
+except (ImportError, AttributeError):
+    # see fileobject.c:PyObject_AsFileDescriptor
+    def fileno(obj):
+        if isinstance(obj, int):
+            fd = obj
+        elif hasattr(obj, 'fileno'):
+            fd = obj.fileno()
+        else:
+            raise TypeError('argument must be an int, or have a fileno() method.')
+        if fd < 0:
+            raise ValueError('file descriptor cannot be a negative integer (%i)' % fd)
+        return fd
+
+from time import time
+import select as _select
+
+import socket as _socket
+
+
+class socket(_socket.socket):
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.setblocking(False)
+
+    def until_recv_term(self, term):
+        with scheduler.event('read', self):
+            buf = b''
+            while not buf.endswith(term):
+                yield
+                buf1 = self.recv(0x10000, _socket.MSG_PEEK)
+                if not buf1:
+                    break
+                index = (buf + buf1).find(term)
+                buf2 = self.recv(len(buf1) if index == -1 else index - len(buf) + len(term))
+                buf += buf2
+            return buf
+
+    def until_recv(self, count):
+        yield
+        return self.recv(term)
+
+    def until_sendall(self, buf):
+        while buf:
+            sent = yield from self.until_send(buf)
+            if not sent:
+                break
+            buf = buf[sent:]
+
+    def until_send(self, buf):
+        with Event('write', self) as can_send:
+            event = yield
+            if event is can_send:
+                return self.send(buf)
+            yield event
+
+    def until_accept(self):
+        with event('read', self):
+            yield
+            return self.accept()
+
+    def accept(self):
+        sock, addr = super().accept()
+        return self.__class__(fileno=sock.detach()), addr
+
+
+import collections
+
+class Channel:
+
+    def __init__(self):
+        #~ self._items = collections.deque()
+        self._receivers = set()
+        self._senders = set()
+
+    def get(self):
+        #~ if self._items:
+            #~ return self._items.popleft()
+        if self._senders:
+            item, sender = self._senders.pop()
+            schedule(sender)
+            return item
+        self._receivers.add(current)
+        return (yield)
+
+    def put(self, item):
+        if self._receivers:
+            schedule(self._receivers.pop(), item)
+        else:
+            self._senders.add((item, current))
+            yield
+
+def spawn(*args, **kwargs):
+    scheduler.spawn(*args, **kwargs)
+
+
+class timeout:
+
+    def __init__(self, timeout):
+        self.deadline = timeout + time()
+
+    def __enter__(self):
+        scheduler().add_deadline(self.deadline)
+
+    def __exit__(self):
+        scheduler().remove_deadline(self.deadline)
+
+import os as _os
+
+from fileno import fileno
+
+
+class _Event:
+
+    def __init__(self, scheduler, filter, data):
+        self.scheduler = scheduler
+        self.filter = filter
+        self.data = data
+        self.fiber = current
+        self.add()
+
+    def __enter__(self):
+        self.add()
+        return self
+
+    def __exit__(self, *args):
+        self.remove()
+
+    def add(self):
+        self.scheduler.add_event(self)
+        self.active = True
+
+    def remove(self):
+        self.scheduler.remove_event(self)
+        self.active = False
+
+    def __bool__(self):
+        return self.active
+
+import signal
+import pdb
+import os
+
+class _Scheduler:
+
+    _close_mask = _select.EPOLLHUP|_select.EPOLLERR
+    _read_mask = _select.EPOLLIN|_select.EPOLLPRI|_close_mask
+    _read_mask |= getattr(_select, 'EPOLLRDHUP', 0)
+    _write_mask = _select.EPOLLOUT|_close_mask
+
+    def __init__(self):
+        self._poll_obj = _select.epoll()
+        open_max = _os.sysconf(_os.sysconf_names['SC_OPEN_MAX'])
+        self._readers = [set() for _ in range(open_max)]
+        self._writers = [set() for _ in range(open_max)]
+        from collections import defaultdict
+        self._signals = defaultdict(set)
+        self._registered = set()
+        self._deadlines = []
+        import os
+        self._signalfd, self._wakeup_fd = os.pipe()
+        from fcntl import fcntl, F_SETFL
+        from os import O_NONBLOCK
+        fcntl(self._wakeup_fd, F_SETFL, O_NONBLOCK)
+        signal.set_wakeup_fd(self._wakeup_fd)
+        self._ready = []
+        self._original_signal_handlers = {}
+        self.spawn(self._signalfd_reader)
+
+    def run_fiber(self, fiber, arg=None):
+        global current
+        current = fiber
+        try:
+            return fiber.send(arg)
+        except StopIteration:
+            return arg
+        finally:
+            current = None
+
+    def handle_event(self, event):
+        event = self.run_fiber(event.fiber, event)
+        if event:
+            event.remove()
+
+    def _signalfd_reader(self):
+        self.event('read', self._signalfd)
+        while True:
+            yield
+            for signo in os.read(self._signalfd, 0x100):
+                for event in self._signals[signo]:
+                    self.handle_event(event)
+
+    def run(self):
+        while any([self._registered - {self._signalfd}, self._ready, self._deadlines]):
+            if self._registered - {self._signalfd}:
+                if self._ready:
+                    timeout = 0
+                elif self._deadlines:
+                    timeout = max(0, self._deadlines[0][0])
+                else:
+                    timeout = -1
+                for fd, mask in self._poll_obj.poll(timeout):
+                    #~ if mask & self._close_mask:
+                        #~ self._registered.remove(fd)
+                    if mask & self._read_mask:
+                        readers = self._readers[fd]
+                        for event in readers.copy():
+                            if event in readers:
+                                self.handle_event(event)
+                    if mask & self._write_mask:
+                        writers = self._writers[fd]
+                        for event in writers.copy():
+                            if event in writers:
+                                self.handle_event(event)
+            ready = self._ready
+            self._ready = []
+            for gen, arg in ready:
+                self.run_fiber(gen, arg)
+
+    def spawn(self, func, *args, **kwargs):
+        self.schedule(func(*args, **kwargs))
+
+    def schedule(self, fiber, arg=None):
+        self._ready.append((fiber, arg))
+
+    def event(self, filter, data):
+        return _Event(self, filter, data)
+
+    def _update_poll(self, fd):
+        mask = _select.EPOLLIN if self._readers[fd] else 0
+        mask |= _select.EPOLLOUT if self._writers[fd] else 0
+        if fd in self._registered:
+            if mask:
+                self._poll_obj.modify(fd, mask)
+            else:
+                self._poll_obj.unregister(fd)
+                self._registered.remove(fd)
+        else:
+            if mask:
+                self._poll_obj.register(fd, mask)
+                self._registered.add(fd)
+
+    def _signal_handler(self, signum, frame):
+        pass
+
+    def add_event(self, event):
+        if event.filter == 'read':
+            fd = fileno(event.data)
+            self._readers[fd].add(event)
+            self._update_poll(fd)
+        elif event.filter == 'write':
+            fd = fileno(event.data)
+            self._writers[fd].add(event)
+            self._update_poll(fd)
+        elif event.filter == 'signal':
+            signum = event.data
+            self._signals[event.data].add(event)
+            handler = signal.getsignal(signum)
+            def meh(signum, frame):
+                pass
+            if handler in (signal.SIG_IGN, signal.SIG_DFL, meh):
+                signal.signal(signum, meh)
+            else:
+                assert False, (signum, handler)
+        else:
+            raise ValueError('Unknown filter', event.filter)
+
+    def remove_event(self, event):
+        if event.filter == 'read':
+            fd = fileno(event.data)
+            self._readers[fd].remove(event)
+            self._update_poll(fd)
+        elif event.filter == 'write':
+            fd = fileno(event.data)
+            self._writers[fd].remove(event)
+            self._update_poll(fd)
+        elif event.filter == 'signal':
+            signum = event.data
+            if signal.getsignal(signum) == self._signal_handler:
+                if signal.signal(signum, self._original_signal_handlers.pop(signum)) != self._signal_handler:
+                    raise RuntimeError('signal handler changed unexpectedly')
+        else:
+            raise ValueError('unknown filter', event.filter)
+
+scheduler = _Scheduler()
+schedule = scheduler.schedule
+
+run = scheduler.run
+
+event = scheduler.event
+Event = event