# Source
#
# aurum / python / aurum / repeatedcmd.py

from aurum.utils import readsystem
import time

# Threads do not run in vim on ARM linux. Have to use processes instead.
# Backend switch: when true, the threading-based primitives are imported under
# the same names as the multiprocessing ones (Thread is aliased to Process),
# so the rest of the module is written against a single set of names.
use_threads=False

if use_threads:
    from multiprocessing import Value
    # NOTE: "Queue" is the Python 2 module name (it is "queue" in Python 3).
    from Queue import Queue
    from threading import Lock
    from threading import Thread as Process
    # Stand-ins for the signal API so that the worker's
    # signal(SIGTERM, SIG_DFL) call is a no-op when it runs as a thread.
    def signal(*_):
        pass
    SIGTERM=None
    SIG_DFL=None
else:
    from multiprocessing import Process, Queue, Value, Lock
    from signal import signal, SIGTERM, SIG_DFL

class ProcessHaltedError(Exception):
    """Raised when an operation needs a running worker but it has halted."""

# Message passed to ProcessHaltedError by the RepeatedCmd methods that require
# a live worker.
notalive = 'Child process is not alive or is exiting'

class RepeatedCmd(object):
    """Run ``func(*args, **kwargs)`` periodically in a background worker.

    The worker (a process, or a thread when ``use_threads`` is set) calls the
    function roughly once per interval and publishes the most recent result
    through a queue. The owner reads it with getvalue() or forces a
    synchronous call with getcurrentvalue().

    Attributes:
        curinterval: interval requested by the owner, in seconds.
        interval:    shared float read by the worker; a value <= 0 makes the
                     worker loop exit.
        paused:      shared flag; non-zero makes the worker skip calling func.
        queue:       channel carrying results from the worker to the owner.
        qlock:       lock guarding the queue.
        vlock:       lock serialising calls to func between owner and worker.
        value:       last result seen by the owner (None until one exists).
        process:     the worker itself.
    """
    __slots__ = ('interval', 'queue', 'qlock', 'vlock', 'func', 'process',
                 'value', 'args', 'kwargs', 'paused', 'curinterval')

    def __init__(self, interval, func, *args, **kwargs):
        """Create the shared state and start the worker immediately.

        interval is the period in seconds (anything float() accepts); func,
        args and kwargs describe the call that is repeated.
        """
        self.curinterval = float(interval)
        self.interval    = Value('f', self.curinterval)
        self.queue       = Queue()
        self.vlock       = Lock()
        self.qlock       = Lock()
        self.func        = func
        self.args        = args
        self.kwargs      = kwargs
        self.paused      = Value('b', 0)
        self.value       = None

        def procfunc(queue, qlock, vlock, interval, paused, func, args,
                     kwargs):
            # Override default signal vim handler with system default behavior
            # (i.e. just terminate). Without this hack process.terminate() may
            # leave the process alive. Bad, I do not know why it leaves it
            # alive only in some limited set of cases.
            signal(SIGTERM, SIG_DFL)
            while interval.value > 0:
                # Wall-clock time: the period has to include the time func
                # spent blocked. time.clock() counted CPU time on Unix and
                # was removed in Python 3.8.
                starttime = time.time()
                if not paused.value:
                    # "with" guarantees the locks are released even if func
                    # raises, so the owner can never be left deadlocked.
                    with vlock:
                        value = func(*args, **kwargs)
                    with qlock:
                        # If I only knew the size of func() output I would
                        # have used Value(). Keep just the newest result.
                        while not queue.empty():
                            queue.get()
                        queue.put(value)
                delay = interval.value - (time.time() - starttime)
                # The call may have taken longer than the interval, or the
                # interval may have been set negative by stop(): do not sleep.
                if delay > 0:
                    time.sleep(delay)

        self.process = Process(target=procfunc,
                               args=(self.queue, self.qlock, self.vlock,
                                     self.interval, self.paused,
                                     func, args, kwargs))
        self.process.start()

    def setinterval(self, interval):
        """Change the period (seconds) used by the running worker.

        Raises ProcessHaltedError if the worker is not alive.
        """
        if not self.alive():
            raise ProcessHaltedError(notalive)
        self.curinterval    = float(interval)
        self.interval.value = self.curinterval

    def getvalue(self):
        """Return the newest result published by the worker.

        Returns the previously seen value (None if there never was one) when
        nothing new is queued. Raises ProcessHaltedError if the worker is not
        alive.
        """
        if not self.alive():
            raise ProcessHaltedError(notalive)
        with self.qlock:
            # Drain the queue; the last item read is the most recent result.
            while not self.queue.empty():
                self.value = self.queue.get()
        return self.value

    def getcurrentvalue(self):
        """Call func synchronously in the caller and return its result.

        Any queued results are discarded first. vlock is held for the whole
        call so the worker does not run func at the same time; both locks are
        released even if func raises.
        """
        with self.vlock:
            with self.qlock:
                while not self.queue.empty():
                    self.queue.get()
            self.value = self.func(*self.args, **self.kwargs)
        return self.value

    def alive(self):
        """Return True while the worker runs and has not been told to exit."""
        return self.process.is_alive() and self.interval.value > 0

    def stop(self):
        """Tell the worker to exit, then terminate and join it if possible.

        Raises ProcessHaltedError if the worker is not alive.
        """
        if not self.alive():
            raise ProcessHaltedError(notalive)
        # For some reason process.terminate() sometimes may not seem to work.
        # This is a workaround. Seems it is no longer needed with
        # self.process.join(), just keeping it here for multiprocessing.

        # For threading this is the only way to terminate thread (use old
        # implementation based on sequence of timers?)
        self.interval.value = -1.0
        self.paused.value = 1
        try:
            terminate = self.process.terminate
        except AttributeError:
            # Threads do not have .terminate method
            return
        terminate()
        # Anti-zombie code
        self.process.join()

    def pause(self):
        """Make the worker skip calling func until resume() is called.

        While paused the worker wakes up once a second to re-check the flag.
        Raises ProcessHaltedError if the worker is not alive.
        """
        if not self.alive():
            raise ProcessHaltedError(notalive)
        if not self.paused.value:
            self.paused.value   = 1
            self.interval.value = 1.0

    def resume(self):
        """Undo pause() and restore the interval requested by the owner.

        Raises ProcessHaltedError if the worker is not alive.
        """
        if not self.alive():
            raise ProcessHaltedError(notalive)
        if self.paused.value:
            self.paused.value   = 0
            self.interval.value = self.curinterval

    def __del__(self):
        """Stop the worker when the owner object is collected."""
        try:
            running = self.alive()
        except AttributeError:
            # __init__ failed before the worker was created: nothing to stop.
            return
        if running:
            self.stop()

# Registry of RepeatedCmd instances created by new(), keyed by integer id.
processes={}

# Next id handed out by new(); it is only ever incremented in this module.
thcount=0

def new(*args, **kwargs):
    """Create a RepeatedCmd from the given arguments and register it.

    Returns the integer id under which the new instance is stored in the
    module-level registry. The id counter is advanced before the instance is
    constructed.
    """
    global thcount
    rcid = thcount
    thcount = rcid + 1
    processes[rcid] = RepeatedCmd(*args, **kwargs)
    return rcid

def get(rcid, now=False):
    """Return a value from the worker registered under rcid.

    With a true ``now`` the function is evaluated synchronously via
    getcurrentvalue(); otherwise the latest published result is returned via
    getvalue(). Raises KeyError for an unknown id.
    """
    worker = processes[rcid]
    fetch = worker.getcurrentvalue if now else worker.getvalue
    return fetch()

def remove(rcid):
    """Unregister the worker stored under rcid and stop it.

    The entry is removed from the registry before stop() is called, so it is
    gone even if stop() raises. Raises KeyError for an unknown id.
    """
    processes.pop(rcid).stop()

def finish():
    """Stop every registered worker and empty the registry.

    Cleanup is best-effort: a worker that has already halted (its stop()
    raises ProcessHaltedError) is skipped so that the remaining workers are
    still stopped and the registry is always left empty.
    """
    global processes
    # Swap the registry out first so it ends up empty whatever happens below.
    pending, processes = processes, {}
    for process in pending.values():
        try:
            process.stop()
        except ProcessHaltedError:
            # Worker already exited on its own; nothing left to stop.
            pass

def pause(rcid):
    """Pause the worker registered under rcid.

    Raises KeyError for an unknown id; errors from the worker's own pause()
    propagate unchanged.
    """
    worker = processes[rcid]
    worker.pause()

def resume(rcid):
    """Resume the worker registered under rcid.

    Raises KeyError for an unknown id; errors from the worker's own resume()
    propagate unchanged.
    """
    worker = processes[rcid]
    worker.resume()