Source

asyncthreads / asyncthreads / reactor.py

Full commit
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
"""
Reactor pattern to call queued functions in a separate thread.

Functions can be queued to be called by the reactor main thread, or by a thread
in the reactor's thread pool.  Functions can be queued to be called
immediately (as soon as calling thread is available), or queued to be called at
a specified later time.

"""
__author__ = "Andrew Gillis"
__license__ = "http://www.opensource.org/licenses/mit-license.php"
__maintainer__ = "Andrew Gillis"
__email__ = "gillis.andrewj@gmail.com"

import heapq
import threading
import time
import traceback
try:
    # The ABC aliases in ``collections`` were removed in Python 3.10; the
    # canonical location is ``collections.abc``.
    from collections.abc import Callable
except ImportError:
    from collections import Callable
try:
    import queue
except ImportError:
    import Queue as queue

import threadpool


class Result(object):

    """
    A Result object is returned by a Reactor's call_() methods.

    The Result object allows the caller to wait for and retrieve the result of
    a function called by the Reactor.  This not only includes any return from
    the called function, but any exception as well.

    A Result object is also used to identify a scheduled function to cancel,
    and it can be used to check if a scheduled call has been canceled.

    """

    # The attribute must be spelled __slots__ to have any effect; a plain
    # "slots" class attribute is ignored and every instance still gets a
    # __dict__.
    __slots__ = ('_call_done', '_rsp', '_exception', '_traceback_str',
                 '_task_id', '_result_q')

    def __init__(self, result_q):
        """
        Create an incomplete Result.

        Arguments:
        result_q -- Optional queue to put this Result on when completed, or
                    None.

        """
        # Set once the call has completed (or the scheduled call is canceled).
        self._call_done = threading.Event()
        self._rsp = None
        self._exception = None
        self._traceback_str = None

        # For canceling scheduled tasks.  None until the reactor schedules the
        # task, False once the task has been canceled.
        self._task_id = None

        # Optional queue to put this Result on when completed.
        self._result_q = result_q


    def get_result(self, timeout=None):
        """
        Wait for the call to complete and return its outcome.

        Arguments:
        timeout -- Seconds to wait for completion.  None waits indefinitely.

        Return:
        Value returned by the called function, or None if the wait timed out.
        Use has_result() to tell a timeout apart from a None return value.

        Raises:
        Whatever exception was recorded for the call.

        """
        if not self._call_done.wait(timeout):
            return None

        # Compare against None so that an exception object which happens to
        # evaluate as false is still raised.
        if self._exception is not None:
            raise self._exception

        return self._rsp


    def has_result(self):
        """Return True if the call has completed or was canceled."""
        return self._call_done.is_set()


    def get_traceback(self):
        """Return the formatted traceback of a recorded exception, or None."""
        return self._traceback_str


    def is_canceled(self):
        """Return True if this scheduled call has been canceled."""
        return self._task_id is False


class _Deferred(object):
    # Internal object used to transition from execution in main thread to
    # execution in pool thread.  A function run by the reactor returns one of
    # these to have the reactor run another function in a separate thread,
    # reusing the same Result object.
    #
    # The attribute must be spelled __slots__ to have any effect; a plain
    # "slots" class attribute is ignored.
    __slots__ = ('action_tuple',)

    def __init__(self, func, args=None, callback=None):
        # Stored as a (func, args, callback) tuple, which is how the reactor
        # unpacks it.
        self.action_tuple = (func, args, callback)


class Reactor(threading.Thread):

    """
    The Reactor executes queued functions in a single thread.  Functions can be
    executed immediately, in the order they are enqueued, or they can be
    executed at some later time.  Functions may also be executed, immediately
    or later, in threads separate from the reactor's main thread.

    All functions executed by the reactor have an associated Result object.
    A Result object allows the caller to wait for and retrieve the result of a
    function called by the reactor.

    The reason to use a Reactor is to execute functions in a separate thread,
    asynchronously, from the thread of the caller.

    """

    # How long to wait for thread to shutdown.
    SHUTDOWN_FAILURE_TIMEOUT_SEC = 10


    def __init__(self, thread_pool_size=0, queue_size=None):
        """
        Initialize Reactor thread object and internal thread pool (if used).

        Arguments:
        pool_size     -- Number of threads to have in pool.  If this is set to
                         0 or None, then the reactor will not use a thread pool
                         and will start threads directly.
        queue_size    -- Maximum items allowed in task queue.  None means no
                         limit.  This is ignored if thread pool not used.

        """
        threading.Thread.__init__(self, None, None, None)
        self._call_queue = queue.Queue()
        self.idle_wake_sec = None
        if thread_pool_size:
            self._thread_pool = threadpool.ThreadPool(thread_pool_size,
                                                      queue_size)
        else:
            self._thread_pool = None

        self._task_heap = []


    def call(self, func, args=None, callback=None, result_q=None):
        """
        Enqueue a function for the reactor to run in its main thread.

        The caller can wait on the response using the returned Result object.
        Running a function in the reactor's main thread means that no other
        functions will be running in the reactor's main thread at the same
        time.  This can be used in place of other synchronization.

        Arguments:
        func     -- Function to execute.
        args     -- Argument tuple to pass to function.
        callback -- Optional callback that takes a Result as its argument.
        result_q -- Optional response queue to place complete Result on.

        Return:
        Result object.

        """
        assert(self.is_alive()), 'reactor is not started'
        assert(isinstance(func, Callable)), 'function is not callable'
        assert(callback is None or
               isinstance(callback, Callable)),'callback is not callable'
        assert(result_q is None or
               isinstance(result_q, queue.Queue)), 'result_q is not Queue'
        result = Result(result_q)
        self._call_queue.put((func, args, result, callback))
        return result


    def call_later(self, time_until_call, func, args=None, callback=None,
                   result_q=None):
        """
        Run a function in the reactor's main thread at a later time.

        The caller can wait on the response using the returned Result object.

        Arguments:
        time_until_call -- Seconds remaining until call.
        func            -- Function to execute.
        args            -- Argument tuple to pass to function.
        callback        -- Optional callback that takes a Result as its
                           argument.
        result_q        -- Optional response queue to place complete Result on.

        Return:
        Result object.

        """
        assert(self.is_alive()), 'reactor is not started'
        assert(isinstance(func, Callable)), 'function is not callable'
        assert(callback is None or
               isinstance(callback, Callable)), 'callback is not callable'
        if time_until_call:
            action_time = time.time() + time_until_call
        else:
            action_time = 0

        result = Result(result_q)
        d_data = (func, args, result, callback)
        self._call_queue.put((None, d_data, action_time, False))
        return result


    def call_in_thread(self, func, args=None, callback=None, result_q=None):
        """
        Run the given function in a separate thread.

        The caller can wait on the response using the returned Result object.
        A thread from the reactor's internal thread pool is used to avoid the
        overhead of creating a new thread.

        Arguments:
        func     -- Function to execute.
        args     -- Argument tuple to pass to function.
        callback -- Optional callback that takes a Result as its argument.
        result_q -- Optional response queue to place complete Result on.

        Return:
        Result object.

        """
        assert(self.is_alive()), 'reactor is not started'
        assert(isinstance(func, Callable)), 'function is not callable'
        assert(callback is None or
               isinstance(callback, Callable)), 'callback is not callable'
        result = Result(result_q)
        if self._thread_pool:
            self._thread_pool.queue_task(self._thread_wrapper,
                                         (func, args, result, callback))
        else:
            th = threading.Thread(target=self._thread_wrapper,
                                  args=(func, args, result, callback))
            th.daemon = True
            th.start()
        return result


    def call_in_thread_later(self, time_until_call, func, args=None,
                             callback=None, result_q=None):
        """
        Run the given function in a separate thread, at a later time.

        The caller can wait on the response using the returned Result object.
        A thread from the reactor's internal thread pool is used to avoid the
        overhead of creating a new thread.

        Arguments:
        time_until_call -- Seconds remaining until call.
        func            -- Function to execute.
        args            -- Argument tuple to pass to function.
        callback        -- Optional callback that takes a Result as its
                           argument.
        result_q        -- Optional response queue to place complete Result on.

        Return:
        Result object.

        """
        assert(self.is_alive()), 'reactor is not started'
        assert(isinstance(func, Callable)), 'function is not callable'
        assert(callback is None or
               isinstance(callback, Callable)), 'callback is not callable'
        if time_until_call:
            action_time = time.time() + time_until_call
        else:
            action_time = 0

        result = Result(result_q)
        d_data = (func, args, result, callback)
        self._call_queue.put((None, d_data, action_time, True))
        return result


    def cancel_scheduled(self, result):
        """
        Cancel a call scheduled to be called at a later time.

        Arguments:
        result -- Result object returned from call_later() or from
                  call_in_thread_later().

        Return:
        True if scheduled call was successfully canceled.  False if scheduled
        call was not found (already executed) or already canceled.

        """
        # The result's task_id cannot be checked in this method.  The request
        # must be placed on the reactor's call queue to ensure that a scheduled
        # task request is received by the reactor before this request to cancel
        # it is received.
        tmp_result = self.call(self._cancel_scheduled, result)
        # Wait for reactor to remove task, and return whether or not the task
        # was removed.
        return tmp_result.get_result()


    def shutdown(self):
        """
        Cause the reactor thread to exit gracefully.

        """
        if not self.is_alive():
            #print "thread is not started"
            return False

        self._call_queue.put(None)
        if self._thread_pool is not None:
            self._thread_pool.shutdown()

        # Wait for thread to exit.
        self.join(Reactor.SHUTDOWN_FAILURE_TIMEOUT_SEC)
        if self.is_alive():
            #print '===> timed out waiting to join:', self.getName()
            return False

        #print '===> thread exited:', self.getName()
        return True


    def qsize(self):
        """Return number of items in the message queue."""
        return self._call_queue.qsize()


    def empty(self):
        """Return True is no items in the message queue."""
        return self._call_queue.empty()


    def defer_to_thread(self, func, args=None, callback=None):
        """
        Build the marker object that hands execution off to another thread.

        A function being run by the reactor returns this object to have func
        run in a separate thread (the thread pool if there is one).  The
        Result of the original call is reused, so the original caller gets
        the outcome of func.

        Arguments:
        func     -- Function to execute in the separate thread.
        args     -- Arguments to pass to func.
        callback -- Optional callable invoked with the Result of calling func.
                    If None, the callback of the original call is kept.

        Returns:
        Special object recognized by Reactor to set up deferred call.

        """
        deferred = _Deferred(func, args=args, callback=callback)
        return deferred


    #==========================================================================
    # Private methods follow:
    #==========================================================================

    def _thread_wrapper(self, func, args, result, callback):
        """
        Call func, record the outcome in result, and notify interested parties.

        Arguments:
        func     -- Function to call.
        args     -- None (no arguments), a tuple/list/set (positional
                    arguments), a dict (keyword arguments), or any other
                    object (passed as the only argument).
        result   -- Result object that receives the return value or exception.
        callback -- Optional callable invoked with result after completion.

        If func returns a _Deferred, nothing is recorded yet.  The deferred
        function is instead run in a separate thread with the same Result.

        """
        outcome = None
        try:
            if args is None:
                outcome = func()
            elif isinstance(args, dict):
                outcome = func(**args)
            elif isinstance(args, (tuple, list, set)):
                outcome = func(*args)
            else:
                outcome = func(args)
        except Exception as err:
            # Keep the failure so that Result.get_result() can re-raise it.
            result._exception = err
            result._traceback_str = traceback.format_exc()

        if isinstance(outcome, _Deferred):
            next_func, next_args, next_callback = outcome.action_tuple
            # The deferred call's own callback, if given, replaces the
            # original one.
            if next_callback is None:
                next_callback = callback
            job = (next_func, next_args, result, next_callback)
            pool = self._thread_pool
            if pool:
                pool.queue_task(self._thread_wrapper, job)
            else:
                worker = threading.Thread(target=self._thread_wrapper,
                                          args=job)
                worker.daemon = True
                worker.start()
            return

        # Publish the outcome before notifying, so that the callback and any
        # waiters see a completed Result.
        result._rsp = outcome
        result._call_done.set()

        if callback:
            try:
                callback(result)
            except Exception as err:
                # A failing callback replaces any exception recorded above.
                result._exception = err
                result._traceback_str = traceback.format_exc()

        result_q = result._result_q
        if result_q:
            # Drop the queue reference before handing the Result over.
            result._result_q = None
            try:
                result_q.put(result)
            except queue.Full:
                pass


    def _process_scheduled_queue(self):
        """
        Process all scheduled tasks that are currently due.

        """
        task_heap = self._task_heap
        # Execute the first task without checking the time, because this method
        # is only called when a scheduled task is due.
        task_time, task_id, task_data, call_in_thread = heapq.heappop(
            task_heap)

        # If this task has been removed, then do nothing.
        if task_data is None:
            return

        def do_sched_task(func, args, result, callback):
            if call_in_thread:
                if self._thread_pool is not None:
                    # Call thread pool to execute scheduled function.
                    self._thread_pool.queue_task(
                        self._thread_wrapper, (func, args, result, callback))
                else:
                    # Create new thread to execute scheduled function.
                    th = threading.Thread(
                        target=self._thread_wrapper,
                        args=(func, args, result, callback))
                    th.daemon = True
                    th.start()
            else:
                #print '===> calling scheduled function'
                self._thread_wrapper(func, args, result, callback)


        do_sched_task(*task_data)
        if task_heap:
            now = time.time()
            next_time = task_heap[0][0]
            while task_heap and (not next_time or next_time <= now):
                task_time, task_id, task_data, call_in_thread = heapq.heappop(
                    task_heap)
                do_sched_task(*task_data)
                if task_heap:
                    next_time = task_heap[0][0]


    def _cancel_scheduled(self, result):
        """
        Mark a scheduled call, in scheduled task heap, as canceled.

        """
        task_id = result._task_id
        if task_id is None or task_id is False:
            # Task is not scheduled or is already canceled.
            return False
        task_heap = self._task_heap
        for idx, heap_item in enumerate(task_heap):
            task_time, heap_task_id, task_data, call_in_thread = heap_item
            if heap_task_id == task_id:
                func, args, result, callback = task_data
                result._rsp = None
                result._task_id = False
                result._call_done.set()
                # Need to keep task_id to preserve heap invariant.
                task_heap[idx] = (task_time, heap_task_id, None, None)
                return True
        return False


    def run(self):
        """
        Function run in thread, started by the start() method.

        """
        call_queue = self._call_queue
        task_heap = self._task_heap
        next_task_id = 0
        while True:
            sleep_time = None
            # If there are items on the scheduled task heap, then sleep until
            # next scheduled task is due.
            if task_heap:
                now = time.time()
                # Look at timer at top of heap.
                action_time = task_heap[0][0]
                if not action_time or action_time <= now:
                    sleep_time = 0
                else:
                    sleep_time = action_time - now

            try:
                #print '===> sleeping for %s seconds, or until next call put on queue' % (sleep_time,)
                # Get the next function from the call queue.
                action_tuple = call_queue.get(True, sleep_time)
                if action_tuple is None:
                    break
                func, args, result, callback = action_tuple

                # If this is a new scheduled task.
                if func is None:
                    time_to_call = result
                    call_in_thread = callback # callback is really thread flag
                    f,a,r,c = args
                    r._task_id = next_task_id
                    heapq.heappush(task_heap,
                                   (time_to_call, next_task_id, args,
                                    call_in_thread))
                    next_task_id += 1
                else:
                    self._thread_wrapper(func, args, result, callback)

            except queue.Empty:
                # Woke up because it is time do next scheduled task.
                #print '===> woke up to do next scheduled task'
                self._process_scheduled_queue()
                continue

            except Exception:
                print('ERROR: '+str(traceback.format_exc()))

        return True