Source

nsocket / nsocket / core / engine / base.py

Full commit
from nsocket.core import engineselector, cothread
import bisect
import time


class Task(object):
    """A deferred call to ``cb(*args, **kwargs)`` that can be cancelled.

    Calling the instance invokes the stored callback and returns its result,
    unless the task has been cancelled, in which case nothing is invoked and
    ``None`` is returned.
    """

    def __init__(self, cb, *args, **kwargs):
        """Store the callback and the arguments it will be invoked with."""
        self._func = cb, args, kwargs
        # Public boolean flag; True once the task has been cancelled.
        self.cancel = False

    def cacel(self):
        """Mark the task as cancelled so a later call becomes a no-op.

        NOTE: the name is misspelled, but it is kept as-is for existing
        callers.  A correctly spelled ``cancel()`` method cannot be added
        because ``cancel`` is already the public boolean attribute.
        """
        self.cancel = True

    def __call__(self):
        """Invoke the callback unless cancelled; return its result."""
        if not self.cancel:
            cb, arg, kw = self._func
            return cb(*arg, **kw)


class AbstractEngine(engineselector.Installer):
    """Base event loop: runs timed tasks and dispatches I/O events.

    The polling backend is supplied by subclasses, which must override
    ``wait_event``, ``do_read_event``, ``do_write_event`` and the
    ``add_*`` / ``remove_*`` hooks; all of these raise
    ``NotImplementedError`` here.
    """

    def __init__(self):
        # (scheduled_time, Task) pairs kept sorted by scheduled_time.
        self.tasks = []
        # Pairs queued by schedule() that prepare_task() has not merged yet.
        self.next_tasks = []
        self.running = False
        self.thread = None
        # Not used by this base class; presumably reader/writer registries
        # maintained by subclasses -- TODO confirm.
        self._reads = {}
        self._writes = {}
        self.selectable = {}

    def schedule(self, seconds, func, args=(), kwargs=None):
        """Queue ``func(*args, **kwargs)`` to run ``seconds`` from now.

        The task is only appended to ``next_tasks``; it becomes eligible to
        run after the next ``prepare_task()`` call.

        :param seconds: delay, added to ``time.time()``.
        :param func: callable to invoke.
        :param args: positional arguments for ``func`` (default: none).
        :param kwargs: keyword arguments for ``func`` (default: none).
        :returns: the created ``Task``.
        """
        scheduled_time = time.time() + seconds
        task = Task(func, *args, **(kwargs or {}))
        self.next_tasks.append((scheduled_time, task))
        return task

    def prepare_task(self):
        """Merge newly scheduled tasks into the time-ordered ``tasks`` list."""
        pending = self.next_tasks
        if not pending:
            return
        tasks = self.tasks
        tasks.extend(pending)
        # Sort on the timestamp only: the sort is stable, so tasks scheduled
        # for the same instant keep their scheduling order, and Task objects
        # (which define no ordering) are never compared with each other.
        tasks.sort(key=lambda entry: entry[0])
        del pending[:]

    def execute_task(self):
        """Run every task whose scheduled time has been reached, then drop it."""
        now = time.time()
        tasks = self.tasks
        # tasks is sorted by time, so the due entries form a prefix.
        due = 0
        for scheduled_time, _task in tasks:
            if scheduled_time > now:
                break
            due += 1
        for _scheduled_time, timer_task in tasks[:due]:
            timer_task()
        del tasks[:due]

    def get_event_timeout(self):
        """Return the deadline for the next event wait, or ``None``.

        ``do_event`` subtracts ``time.time()`` from a non-``None`` value, so
        an override is expected to return an absolute timestamp.  This base
        implementation always returns ``None``.
        """
        return None

    def run(self):
        """Run the loop until ``self.running`` becomes false.

        ``self.close()`` is always called on exit.  It is not defined in this
        class; presumably the base class or a subclass provides it -- TODO
        confirm.
        """
        self.running = True
        try:
            while self.running:
                self.prepare_task()
                self.execute_task()
                # Tasks that just ran may have scheduled new ones.
                self.prepare_task()
                self.do_event()
        finally:
            self.close()

    def do_event(self, timeout=None):
        """Wait for I/O events once and dispatch the results.

        :param timeout: ignored; the wait time is always derived from
            ``get_event_timeout()``.  The parameter is kept (now optional)
            for backward compatibility.
        """
        deadline = self.get_event_timeout()
        if deadline is None:
            timeout = 60
        else:
            # A deadline already in the past means "poll without blocking";
            # never hand a negative timeout to the backend.
            timeout = max(0, deadline - time.time())
        self.prepare_event()
        results = self.wait_event(timeout)
        for result in results:
            self.do_read_event(result)
            self.do_write_event(result)
        self.finish_event()

    def switch(self):
        """Currently a no-op that returns ``None``.

        The cothread-based implementation below is disabled in the source.
        """
        # Disabled implementation, kept for reference:
        #
        # if not self.thread:
        #     self.thread = cothread.create()
        #     args = ((self.run,),)
        # else:
        #     args = ()
        #
        # current = cothread.getcurrent()
        # try:
        #     current.parent = self.thread
        # except ValueError :
        #     pass
        # return cothread.switch(self.thread, *args)

    def set_selectable(self, fd):
        """Record ``fd`` in ``self.selectable`` (mapped to itself)."""
        self.selectable[fd] = fd

    def del_selectable(self, fd):
        """Remove ``fd`` from ``self.selectable``; raises ``KeyError`` if absent."""
        del self.selectable[fd]

    def add_reader(self, fd, cb):
        """Backend hook; must be overridden by subclasses."""
        raise NotImplementedError("Not Implemented")

    def add_writer(self, fd, cb):
        """Backend hook; must be overridden by subclasses."""
        raise NotImplementedError("Not Implemented")

    def remove_reader(self, fd):
        """Backend hook; must be overridden by subclasses."""
        raise NotImplementedError("Not Implemented")

    def remove_writer(self, fd):
        """Backend hook; must be overridden by subclasses."""
        raise NotImplementedError("Not Implemented")

    def prepare_event(self):
        """Hook called by ``do_event`` before waiting; does nothing here."""
        pass

    def wait_event(self, timeout):
        """Backend hook: wait up to ``timeout`` and return an iterable of results.

        Must be overridden by subclasses.
        """
        raise NotImplementedError("Not Implemented")

    def do_read_event(self, result):
        """Backend hook called for each result of ``wait_event``; must be overridden."""
        raise NotImplementedError("Not Implemented")

    def do_write_event(self, results):
        """Backend hook called for each result of ``wait_event``; must be overridden."""
        raise NotImplementedError("Not Implemented")

    def finish_event(self):
        """Hook called by ``do_event`` after dispatching; does nothing here."""
        pass