# Source
#
# green380 / gthread.py
#
# Full commit

from time import time
import select as _select

import socket as _socket


class socket(_socket.socket):
    """Non-blocking socket whose ``until_*`` methods are fiber generators.

    Each ``until_*`` method is a generator meant to be driven with
    ``yield from`` inside a fiber.  It registers an event with the module
    level ``scheduler``, suspends with a bare ``yield`` and performs the
    socket call once it is resumed.
    """

    def __init__(self, *args, **kwargs):
        """Create the socket and switch it to non-blocking mode."""
        super().__init__(*args, **kwargs)
        self.setblocking(False)

    def until_recv_term(self, term):
        """Receive until the accumulated data ends with ``term``.

        Incoming data is peeked first so that nothing past the terminator is
        consumed from the socket.

        Returns the bytes received, terminator included.  If ``recv`` reports
        end of stream (empty result) first, whatever was collected so far is
        returned.
        """
        with scheduler.event('read', self):
            buf = b''
            while not buf.endswith(term):
                yield
                buf1 = self.recv(0x10000, _socket.MSG_PEEK)
                if not buf1:
                    break
                # Search across the old/new boundary: the terminator may
                # straddle two reads.
                index = (buf + buf1).find(term)
                buf2 = self.recv(len(buf1) if index == -1 else index - len(buf) + len(term))
                buf += buf2
            return buf

    def until_recv(self, count):
        """Wait for one wake-up on a read event, then ``recv`` up to ``count`` bytes.

        Returns whatever ``recv(count)`` returns.
        """
        # Register a read event (as until_accept does) so the scheduler has
        # something to resume this fiber on.
        with scheduler.event('read', self):
            yield
            return self.recv(count)

    def until_sendall(self, buf):
        """Send ``buf`` in pieces via :meth:`until_send` until it is exhausted.

        Stops early, without raising, as soon as :meth:`until_send` returns a
        falsy value.
        """
        while buf:
            sent = yield from self.until_send(buf)
            if not sent:
                break
            buf = buf[sent:]

    def until_send(self, buf):
        """Wait for one wake-up, then send as much of ``buf`` as ``send`` accepts.

        Returns the byte count from ``send`` when the fiber was resumed with
        this call's own write event.  When it was resumed with anything else,
        that value is yielded back to the resumer and the generator returns
        ``None`` on the following resume.
        """
        with scheduler.event('write', self) as can_send:
            event = yield
            if event is can_send:
                return self.send(buf)
            yield event

    def until_accept(self):
        """Wait for one wake-up on a read event, then accept a connection.

        Returns the ``(socket, address)`` pair produced by :meth:`accept`.
        """
        with scheduler.event('read', self):
            yield
            return self.accept()

    def accept(self):
        """Accept a connection and re-wrap the new socket in this class.

        The descriptor is detached from the plain socket returned by the base
        class and handed to ``self.__class__``, whose constructor makes it
        non-blocking.
        """
        sock, addr = super().accept()
        return self.__class__(fileno=sock.detach()), addr


import collections

class Channel:
    """Unbuffered hand-off point between fibers.

    ``get`` and ``put`` are generators to be driven with ``yield from``.
    Whichever side arrives first parks itself in a queue and suspends; the
    other side hands the item over through the module-level ``schedule``.
    """

    def __init__(self):
        # Deques rather than sets: waiters are served in arrival order, and
        # queued items do not have to be hashable.
        self._receivers = collections.deque()
        self._senders = collections.deque()

    def get(self):
        """Return the next item, suspending until a sender provides one.

        If a sender is already parked, its item is taken and the sender is
        rescheduled.  Otherwise the current fiber is queued as a receiver and
        the value it is resumed with is returned.
        """
        if self._senders:
            item, sender = self._senders.popleft()
            schedule(sender)
            return item
        self._receivers.append(current)
        return (yield)

    def put(self, item):
        """Hand ``item`` to a receiver, suspending if none is waiting.

        With a parked receiver the item is scheduled straight to it and this
        generator finishes without suspending.  Otherwise the item is queued
        together with the current fiber, which suspends until a ``get``
        reschedules it.
        """
        if self._receivers:
            schedule(self._receivers.popleft(), item)
        else:
            self._senders.append((item, current))
            yield

def spawn(*args, **kwargs):
    """Forward all arguments to ``scheduler.spawn``; returns ``None``."""
    scheduler.spawn(*args, **kwargs)


class timeout:
    """Context manager that registers a deadline with the scheduler.

    NOTE(review): the deadline is handed to ``scheduler.add_deadline`` and
    ``scheduler.remove_deadline``; confirm the scheduler in use provides
    those methods.
    """

    def __init__(self, timeout):
        """Compute the absolute deadline ``timeout`` seconds from now."""
        self.deadline = timeout + time()

    def __enter__(self):
        """Register the deadline and return this object."""
        # `scheduler` is an instance, not a factory, so it must not be called.
        scheduler.add_deadline(self.deadline)
        return self

    def __exit__(self, *exc_info):
        """Unregister the deadline; any in-flight exception propagates."""
        # The with statement always passes (type, value, traceback).
        scheduler.remove_deadline(self.deadline)

import os as _os

from fileno import fileno


class _Event:
    """A registration of interest in one scheduler event.

    The event records the fiber found in the module-level ``current`` at
    construction time and registers itself with the scheduler immediately.
    It is truthy while registered and can be used as a context manager that
    unregisters on exit.
    """

    def __init__(self, scheduler, filter, data):
        self.scheduler, self.filter, self.data = scheduler, filter, data
        self.fiber = current
        # Registered right away so the event also works without ``with``.
        self.add()

    def __enter__(self):
        """Register (again) and hand back the event itself."""
        self.add()
        return self

    def __exit__(self, *exc_info):
        """Unregister on leaving the ``with`` block."""
        self.remove()

    def add(self):
        """Register with the scheduler, then mark the event active."""
        self.scheduler.add_event(self)
        self.active = True

    def remove(self):
        """Unregister from the scheduler, then mark the event inactive."""
        self.scheduler.remove_event(self)
        self.active = False

    def __bool__(self):
        """Report whether the event is currently marked active."""
        return self.active

import signal
import pdb
import os

class _Scheduler:
    """Run generator-based fibers, resuming them on I/O, signals or hand-offs.

    Fibers are generators.  They are resumed with ``send`` either because
    they were put on the ready queue with :meth:`schedule`, or because an
    event they registered (``'read'``, ``'write'`` or ``'signal'``) fired, in
    which case the event object itself is sent into the fiber.
    """

    _close_mask = _select.EPOLLHUP|_select.EPOLLERR
    _read_mask = _select.EPOLLIN|_select.EPOLLPRI|_close_mask
    _read_mask |= getattr(_select, 'EPOLLRDHUP', 0)
    _write_mask = _select.EPOLLOUT|_close_mask

    def __init__(self):
        """Create the epoll object and the signal wakeup pipe.

        The write end of a pipe is installed with ``signal.set_wakeup_fd``
        and a fiber reading the other end is spawned to dispatch signal
        events.
        """
        from collections import defaultdict
        from fcntl import fcntl, F_SETFL
        self._poll_obj = _select.epoll()
        # Keyed by file descriptor.  Dictionaries keep memory proportional
        # to the descriptors actually used instead of the process limit.
        self._readers = defaultdict(set)
        self._writers = defaultdict(set)
        # Keyed by signal number.
        self._signals = defaultdict(set)
        self._registered = set()
        self._deadlines = []
        self._signalfd, self._wakeup_fd = os.pipe()
        fcntl(self._wakeup_fd, F_SETFL, os.O_NONBLOCK)
        signal.set_wakeup_fd(self._wakeup_fd)
        self._ready = []
        # signum -> handler that was installed before this scheduler took over.
        self._original_signal_handlers = {}
        self.spawn(self._signalfd_reader)

    def run_fiber(self, fiber, arg=None):
        """Resume ``fiber`` with ``arg`` and return what it yields.

        While the fiber runs, the module-level ``current`` names it.  If the
        fiber finishes instead of yielding, ``arg`` is returned.
        """
        global current
        # May be called from inside another fiber (signal dispatch), so put
        # back whatever was current before instead of clearing it.
        previous = globals().get('current')
        current = fiber
        try:
            return fiber.send(arg)
        except StopIteration:
            return arg
        finally:
            current = previous

    def handle_event(self, event):
        """Deliver ``event`` to its fiber.

        A truthy value coming back from the fiber (including the event itself
        when the fiber finished) is unregistered via its ``remove`` method.
        """
        event = self.run_fiber(event.fiber, event)
        if event:
            event.remove()

    def _signalfd_reader(self):
        """Fiber body: turn bytes from the wakeup pipe into signal events."""
        self.event('read', self._signalfd)
        while True:
            yield
            for signo in os.read(self._signalfd, 0x100):
                # .get() so that unrelated signals do not create entries.
                listeners = self._signals.get(signo)
                if not listeners:
                    continue
                # Handlers may unregister events; iterate over a snapshot
                # and skip the ones removed meanwhile.
                for event in listeners.copy():
                    if event in listeners:
                        self.handle_event(event)

    def run(self):
        """Run until no watched descriptor, ready fiber or deadline remains.

        The signal pipe's own descriptor does not count as watched.
        """
        while True:
            watched = self._registered - {self._signalfd}
            if not (watched or self._ready or self._deadlines):
                break
            if watched:
                if self._ready:
                    # Work is pending: only collect what is ready right now.
                    timeout = 0
                elif self._deadlines:
                    # NOTE(review): nothing in this class fills _deadlines,
                    # and the head entry's first element is used as the poll
                    # timeout as-is -- confirm the intended entry layout and
                    # whether the value is relative or absolute.
                    timeout = max(0, self._deadlines[0][0])
                else:
                    timeout = -1
                for fd, mask in self._poll_obj.poll(timeout):
                    if mask & self._read_mask:
                        readers = self._readers[fd]
                        for event in readers.copy():
                            if event in readers:
                                self.handle_event(event)
                    if mask & self._write_mask:
                        writers = self._writers[fd]
                        for event in writers.copy():
                            if event in writers:
                                self.handle_event(event)
            # Swap the queue first so fibers scheduled while running are
            # picked up on the next pass.
            ready, self._ready = self._ready, []
            for fiber, arg in ready:
                self.run_fiber(fiber, arg)

    def spawn(self, func, *args, **kwargs):
        """Call ``func`` to create a fiber and put it on the ready queue."""
        self.schedule(func(*args, **kwargs))

    def schedule(self, fiber, arg=None):
        """Queue ``fiber`` to be resumed with ``arg`` by :meth:`run`."""
        self._ready.append((fiber, arg))

    def event(self, filter, data):
        """Create an ``_Event`` bound to this scheduler."""
        return _Event(self, filter, data)

    def _update_poll(self, fd):
        """Bring the epoll registration of ``fd`` in line with its listeners."""
        mask = _select.EPOLLIN if self._readers[fd] else 0
        mask |= _select.EPOLLOUT if self._writers[fd] else 0
        if fd in self._registered:
            if mask:
                self._poll_obj.modify(fd, mask)
            else:
                self._poll_obj.unregister(fd)
                self._registered.remove(fd)
        else:
            if mask:
                self._poll_obj.register(fd, mask)
                self._registered.add(fd)

    def _signal_handler(self, signum, frame):
        """Do nothing; delivery happens through the wakeup pipe."""
        pass

    def add_event(self, event):
        """Register ``event`` according to its ``filter``.

        ``'read'`` and ``'write'`` events watch the descriptor of
        ``event.data``; ``'signal'`` events listen for signal number
        ``event.data``.  Registering the same event twice is harmless.

        Raises ``ValueError`` for any other filter.
        """
        if event.filter == 'read':
            fd = fileno(event.data)
            self._readers[fd].add(event)
            self._update_poll(fd)
        elif event.filter == 'write':
            fd = fileno(event.data)
            self._writers[fd].add(event)
            self._update_poll(fd)
        elif event.filter == 'signal':
            signum = event.data
            # Take over the signal once, remembering what to restore later.
            if signum not in self._original_signal_handlers:
                self._original_signal_handlers[signum] = signal.signal(
                    signum, self._signal_handler)
            self._signals[signum].add(event)
        else:
            raise ValueError('Unknown filter', event.filter)

    def remove_event(self, event):
        """Undo :meth:`add_event` for ``event``.

        When the last listener for a signal goes away, the handler that was
        in place before is restored, provided this scheduler's handler is
        still the installed one.

        Raises ``ValueError`` for an unknown filter and ``KeyError`` when a
        read/write event is not registered.
        """
        if event.filter == 'read':
            fd = fileno(event.data)
            self._readers[fd].remove(event)
            self._update_poll(fd)
        elif event.filter == 'write':
            fd = fileno(event.data)
            self._writers[fd].remove(event)
            self._update_poll(fd)
        elif event.filter == 'signal':
            signum = event.data
            listeners = self._signals[signum]
            listeners.discard(event)
            if not listeners and signum in self._original_signal_handlers:
                original = self._original_signal_handlers.pop(signum)
                if original is None:
                    # The previous handler was not installed from Python and
                    # cannot be reinstated; fall back to the default.
                    original = signal.SIG_DFL
                if signal.getsignal(signum) == self._signal_handler:
                    if signal.signal(signum, original) != self._signal_handler:
                        raise RuntimeError('signal handler changed unexpectedly')
        else:
            raise ValueError('unknown filter', event.filter)

# Module-wide scheduler instance.  Constructing it creates the epoll object
# and installs the signal wakeup pipe (see _Scheduler.__init__).
scheduler = _Scheduler()
# Module-level shortcut for scheduler.schedule.
schedule = scheduler.schedule

# Module-level shortcut for scheduler.run.
run = scheduler.run

# Both spellings refer to scheduler.event.
event = scheduler.event
Event = event