Source

DailyPromptBot / minibot / eventscheduler.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# The Daily Prompt Mini-Bot - A Shut Up and Write Project
# Author: Marc-Alexandre Chan <laogeodritt at arenthil.net>
#-------------------------------------------------------------------------------
#
# Copyright (c) 2012 Marc-Alexandre Chan. Licensed under the GNU GPL version 3
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#-------------------------------------------------------------------------------

from minibot.util import classname, log_exc_info
from minibot.errors import ResourceError, AttachedEventError, InvalidUserPass,\
    LoginRequired, NotLoggedIn, RateLimitExceeded, EventNotFoundError,\
    HTTPError, MinibotError, SchedulerStoppedError

from praw import Reddit

import time
import logging
from os.path import realpath

class EventScheduler(object):
    """ Event scheduler. Manages and runs events at specified times and
    repetition intervals. This class acts as the kernel/event scheduler to the
    bot's main processes.

    The scheduler checks the event queue at the ``refresh_rate`` specified
    in the config, and executes all tasks that are due or overdue at the current
    time. This occurs synchronously. If an event is queued but misses its
    execution time due to other events taking too long to run, it is executed
    at the next iteration; more overdue tasks are prioritised. When
    calculating the time a periodic (repeating) event should next execute,
    it is scheduled relative to the current iteration time, not relative to
    the time it was originally due. As such, exact timing is never guaranteed.

    The scheduler is NOT designed for threading, although it probably should!

    Events can be queued in the scheduler in any order. You are not restricted
    to adding events to the end of the queue, like with Python's ``sched``
    module.

    Events will always have an ``owner`` attribute set by the scheduler before
    starting or running. This is a reference back to the EventScheduler object,
    and can be used to modify events in the queue.

    Call ``queue_event()`` to queue event objects. Call ``run()`` to run the
    scheduler; this method returns when no events remain in the queue or when
    a stop/exit was requested.

    Logging: the scheduler logs to the ``scheduler`` child of the provided
    logger; the ``logger`` resource handed to events is the
    ``events.<event class name>`` child of that scheduler logger.

    Implementation details:

    * Queue entry format: (exec_time, priority, event_status, event)
    * The queue is kept sorted by (exec_time, priority, event_status); entries
      with equal keys keep their insertion order.

    """

    # Queued event statuses
    STATUS_QUEUED = 0   # queued, not started
    STATUS_RUNNING = 1  # started, on periodic (interval) runs
    STATUS_ONESHOT = 2  # queued, not started, one-shot event
    STATUS_ENDING  = 3  # queued, runs ending at the given time
    STATUS_STRING = {STATUS_QUEUED : 'Queued',
                     STATUS_RUNNING : 'Running',
                     STATUS_ONESHOT : 'Queued (one-shot)',
                     STATUS_ENDING : 'Ending'}

    # Exception handling return values
    EXC_UNHANDLED = 0 # exception could not be handled
    EXC_HANDLED_RETRY = 1 # exception handled, safe to requeue same event
    EXC_HANDLED_CONTINUE = 2 # exception handled, requeue next iteration
    EXC_HANDLED_FINAL = 3 # exception handled, do not requeue

    # Minimum retry delays, added to the iteration timestamp (epoch seconds)
    # when an event is retried after the corresponding error.
    DELAY_RATE_ERROR = 2
    DELAY_HTTPERROR  = 30

    def __init__(self, config, reddit, database, logger, **res):
        """ Initialise the scheduler.

        ``config`` should be an instance of the ``minibot.Config`` class with
        a loaded configuration file, or an instance of a class with the same
        accessible attributes. ``reddit`` is the Reddit API object handed to
        events as the 'reddit' resource; it is logged in during validation if
        it has no user. ``database`` must provide ``get_new_session()``, used
        for the 'dbsession' resource. ``logger`` should be the parent logger
        to use (EventScheduler will always use the ``scheduler`` child of this
        logger); if None, ``logging.getLogger('scheduler')`` is used.

        kwargs should be other resource objects that can be made available to
        events. Note that reserved names 'config', 'dbsession', 'reddit',
        'approved' and 'logger' cannot be used.

        May raise whatever ``__init_validate()`` raises. """

        # resources
        # users whose configured level is at least 100
        self._approved = [u for u in config.users.list()
                          if config.users.get_level(u) >= 100]
        self._config = config
        self._reddit = reddit
        self._db     = database
        self._res    = res

        # state
        self._stop_flag = False
        self._exit_flag = False
        self._queue = []
        self._next_run = 0

        # components
        self.logger = logger.getChild('scheduler') if logger is not None\
                        else logging.getLogger('scheduler')

        self.__init_validate()

    def __init_validate(self):
        """ Validates resources on initialisation. Logs the Reddit object in
        if it has no user, then checks that the configured target subreddit
        resolves. Raises an exception if issues are found. """
        # logged in test
        if not self._reddit.user:
            # may raise reddit.errors.InvalidUserPass
            self._reddit.login(
                self._config.reddit.user,
                self._config.reddit.password)

        # valid target reddit test
        try:
            self._reddit.get_subreddit(self._config.reddit.target).content_id
        except AttributeError:
            # This name is not imported at module level, so referencing it
            # bare would raise NameError instead of the intended error.
            # NOTE(review): assumed to be defined in minibot.errors alongside
            # the other error classes -- confirm.
            from minibot.errors import InvalidRedditError
            raise InvalidRedditError("{} is not a valid reddit".format(
                self._config.reddit.target))

    @property
    def queue(self):
        """ Return a generator for the current queue. Entries are returned in
        sorted order of execution. """
        for exec_time, priority, event_type, event in self._queue:
            yield event

    def run(self):
        """ Run the event scheduler. The event scheduler will run until an
        exit or stop request is made by an event, or no more events exist in
        queue. On an exit request, ``exit()`` is called before returning. """
        self.logger.info("%s: Starting event scheduler...", classname(self))
        self._next_run = time.time()
        self._exit_flag = self._stop_flag = False
        while not self._exit_flag and not self._stop_flag and self._queue:
            # if not yet the next run time, sleep until then; the remaining
            # time is computed once so the sleep argument can never be negative
            wait = self._next_run - time.time()
            if wait > 0:
                try:
                    time.sleep(wait)
                except IOError: # signal interrupts sleep
                    continue # then we want to reevaluate exit status

            # execute an iteration
            self._execute_iteration()

            # update the next run time
            self._next_run += self._config.minibot.refresh_rate
            # if execution took longer than the refresh rate, do next iter now
            now = time.time()
            if now > self._next_run:
                self._next_run = now

        if self._exit_flag:
            self.exit()
        elif self._stop_flag:
            self.logger.info("%s: Stopped the event scheduler.",classname(self))
        else:
            self.logger.info("%s: Queue empty. Stopped the event scheduler.",
                classname(self))

    def _execute_iteration(self):
        """ Execute an iteration. Pops events off the queue and executes them
        until the queue is empty or an event is found which is scheduled for
        the future. Exceptions raised by an event are passed to
        ``_handle_exception()``; events that run cleanly are passed to
        ``_reschedule_event()``. """
        now = self._next_run # get the timestamp for the current iteration
        while self._queue and self._queue[0][0] <= now:
            evtuple = self._queue.pop(0)
            try:
                self._execute_event(*evtuple)
            except Exception as e:
                self._handle_exception(*(evtuple + (e,)))
            else: # if no exception occurs in the exec
                self._reschedule_event(*evtuple)

    def _execute_event(self, evtime, priority, status, event):
        """ Call the event method(s) matching the queue entry's status:
        ``start()`` for queued events, ``start()`` then ``end()`` for one-shot
        events, ``run()`` for running events and ``end()`` for ending events.
        Exceptions from the event propagate to the caller. """
        if status == self.STATUS_QUEUED:
            self.logger.info("%s: Starting event: %s",
                             classname(self), repr(event))
            event.start()
        elif status == self.STATUS_ONESHOT:
            self.logger.info("%s: Running one-shot event: %s",
                             classname(self), repr(event))
            event.start()
            event.end()
        elif status == self.STATUS_RUNNING:
            self.logger.debug("%s: Running event: %s",
                classname(self), repr(event))
            event.run()
        elif status == self.STATUS_ENDING:
            self.logger.info("%s: Terminating event: %s",
                             classname(self), repr(event))
            event.end()

    @staticmethod
    def _ensure_min_delay(event, minimum):
        """ Raise ``event.delay`` to at least ``minimum``; a missing or None
        delay counts as no delay. """
        current = getattr(event, 'delay', None)
        if current is None or current < minimum:
            event.delay = minimum

    def _handle_exception(self, evtime, priority, status, event, exc):
        """ Handle an exception that occurred in an event.

        The exception is first offered to ``event.handle_exception(exc)``,
        which returns one of the ``EXC_*`` values. If the event does not
        handle it, the scheduler itself recovers from login errors, rate
        limiting and HTTP/JSON errors by queueing a retry. Anything else
        causes the event to be dropped.

        Returns True if successfully handled, False otherwise. """
        # get exception info
        basic, src, trace = log_exc_info()
        self.logger.error("%s: A '%s' was raised by %s %s",
            classname(self), classname(exc), classname(event), repr(event))
        self.logger.warning(*basic)
        self.logger.debug(*src)
        self.logger.debug(*trace)

        # pass exception to event exception handler
        try: # just in case
            handler_status = event.handle_exception(exc)
        except Exception as ehe: # the handler itself failed
            handler_status = self.EXC_UNHANDLED
            basic2, src2, trace2 = log_exc_info()
            self.logger.error(
                "%s: %s was raised in the exception handler for %s",
                classname(self), classname(ehe), repr(event))
            self.logger.error(*basic2)
            self.logger.debug(*src2)
            self.logger.debug(*trace2)

        # original exception
        if handler_status > self.EXC_UNHANDLED: # event recovered from error
            self.logger.warning(
                "%s: '%s' was raised and handled by %s %s",
                classname(self), classname(exc), classname(event), repr(event))
            if handler_status == self.EXC_HANDLED_CONTINUE:
                self.logger.info("%s: Resuming %s %s",
                    classname(self), classname(event), repr(event))
                self._reschedule_event(evtime, priority, status, event)
            elif handler_status == self.EXC_HANDLED_RETRY:
                self.logger.info("%s: Scheduling retry for %s %s",
                    classname(self), classname(event), repr(event))
                self._retry_event(evtime, priority, status, event)
            else:
                self.logger.info("%s: Dropping %s %s",
                    classname(self), classname(event), repr(event))
            return True

        # TODO: any other recoverable cases?
        # errors that can be handled by the scheduler
        elif isinstance(exc, (InvalidUserPass, LoginRequired, NotLoggedIn)):
            self.__init_validate() # recheck Reddit state
            self._retry_event(evtime, priority, status, event)
            return True

        elif isinstance(exc, RateLimitExceeded):
            # give it a bit of breathing room
            self._ensure_min_delay(event, self.DELAY_RATE_ERROR)
            self._retry_event(evtime, priority, status, event)
            return True

        elif isinstance(exc, HTTPError) or\
                (isinstance(exc, ValueError) and "No JSON object" in str(exc)):
            # oh, is Reddit having some issues?
            # (the substring test must be on the message text: ``in`` on the
            # exception object itself is not a substring search)
            self._ensure_min_delay(event, self.DELAY_HTTPERROR)
            self._retry_event(evtime, priority, status, event)
            return True

        else: # unrecoverable error in event
            self.logger.critical("%s: Dropped event due to unhandled "
                "exception: %s. This operation is unsafe and may leave "
                "open handles or cause errors or instability.",
                classname(self), repr(event))

            # clean up resources that need it explicitly
            resources = getattr(event, 'res', None) or {}
            session = resources.get('dbsession', None)
            if session is not None:
                session.rollback()
                session.close()
                del resources['dbsession']
            # and don't reschedule/retry event - just let it drop
            return False

    def exit(self):
        """ Clean up events and exit the scheduler. This method should be called
        before destroying the EventScheduler or exiting the application.

        Started events (running or ending) get their ``end()`` called so they
        can clean up; events that never started are simply dropped. """
        self.logger.info("%s: Exiting the event scheduler.", classname(self))
        self.logger.info("%s: Stopping all events.", classname(self))

        # Force the exit flag on while draining so that nothing (e.g. a retry
        # scheduled by the exception handler) can be re-inserted into the
        # queue; otherwise an event whose end() keeps failing would be
        # requeued and this loop would never terminate. The previous flag
        # value is restored afterwards.
        was_exiting = self._exit_flag
        self._exit_flag = True
        try:
            while self._queue:
                evtime, evprio, evstatus, event = self._queue.pop(0)
                if evstatus == self.STATUS_RUNNING or\
                   evstatus == self.STATUS_ENDING:
                    self.logger.debug("%s: Stopping event: %s",
                                      classname(self), repr(event))
                    try:
                        event.end() # allow it to do cleanup
                    except Exception as e:
                        self._handle_exception(
                            evtime, evprio, evstatus, event, e)
                elif evstatus == self.STATUS_QUEUED or\
                     evstatus == self.STATUS_ONESHOT:
                    self.logger.debug("%s: Dropping queued event: %s",
                                      classname(self), repr(event))
        finally:
            self._exit_flag = was_exiting

    def queue_event(self, event):
        """ Queue a new event object for execution. The event object should
        have been newly constructed; an event object that has already been
        queued or executed may not have a consistent internal state for queueing
        and starting.

        AttachedEventError is raised if the event already has an owner.
        ResourceError is raised if the event requests an invalid resource in
        ``required_res``. SchedulerStoppedError is raised if the scheduler's
        exit flag is set. """
        if hasattr(event, 'owner') and event.owner is not None:
            raise AttachedEventError(("Cannot queue Event '{}': "
                "Event is already attached to an EventScheduler.").\
                format(classname(event)))

        self.prepare_event(event)
        # no repetition interval or zero duration: start and end in one go
        if event.interval <= 0 or event.duration == 0:
            status = self.STATUS_ONESHOT
        else:
            status = self.STATUS_QUEUED

        self._insert_event(event.start_time, event.priority, status, event)
        self.logger.info(
            "%s: Queued event: time=%s, priority=%d, status=%s, event=%s",
            classname(self),
            time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.start_time)),
            event.priority, self.STATUS_STRING[status], repr(event))

    def remove_event(self, event):
        """ Remove a particular event object from the queue. The event is
        matched by identity. Raises EventNotFoundError if it is not queued. """
        for q_i, q_evtuple in enumerate(self._queue):
            if q_evtuple[3] is event:
                self.logger.info(
                    "%s: Removed event: "
                    "time=%s, priority=%d, status=%s, event=%s",
                    classname(self),
                    time.strftime('%Y-%m-%d %H:%M:%S',
                                  time.localtime(q_evtuple[0])),
                    q_evtuple[1], self.STATUS_STRING[q_evtuple[2]],
                    repr(event))
                del self._queue[q_i]
                return
        raise EventNotFoundError(event)

    def _insert_event(self, evtime, priority, status, event):
        """ Insert an event into the appropriate location in the queue.
        Raises SchedulerStoppedError if the exit flag is set. """
        if self._exit_flag:
            raise SchedulerStoppedError()

        # Order on (time, priority, status) only. Comparing whole tuples
        # would fall through to comparing the event objects themselves on a
        # tie, which is arbitrary at best and a TypeError for objects that
        # define no ordering. Ties keep insertion order.
        sort_key = (evtime, priority, status)
        insert_at = len(self._queue) # insert at end by default
        for q_i, q_evtuple in enumerate(self._queue):
            if q_evtuple[:3] > sort_key:
                insert_at = q_i
                break
        self._queue.insert(insert_at, (evtime, priority, status, event))

    def _reschedule_event(self, evtime, priority, status, event):
        """ Check whether the event needs to be rescheduled and reschedules it.
        Called during the main loop after an event ran. Queued and running
        events are requeued as running (or as ending, at their end time, once
        the next run would fall past it); ending and one-shot events are not
        requeued. If the event's resources cannot be refreshed, the event is
        dropped. If you want to reschedule the SAME run (i.e. not advance the
        status, if applicable), use ``_retry_event()`` instead. """
        # refresh resources in the event
        try:
            self.prepare_event(event)
        except ResourceError:
            # error already logged in prepare_event - just take care of cleanup
            self.logger.warning("%s: Event dropped: %s",
                classname(self), repr(event))
            try: # if dbsession assigned, roll it back
                event.res['dbsession'].rollback()
            except (KeyError, AttributeError):
                # no session entry, or the entry is None
                pass
            return

        # STATUS_ENDING or STATUS_ONESHOT have now ended and cleaned up
        # so nothing to do there
        if status != self.STATUS_QUEUED and status != self.STATUS_RUNNING:
            return

        # using _next_run calibrates to the actual iteration runtime, not the
        # event's expected runtime, and _next_run is reset in run() if it lags
        # behind, so no worries about the reschedules lagging behind with
        # long-runtime events
        now = self._next_run
        r_priority = event.priority
        r_time = now + event.interval + event.delay
        event.delay = None # the delay applies to one reschedule only
        end_time = event.start_time + event.duration

        # if infinite duration or end time is later than next run
        if event.duration < 0 or end_time > r_time:
            r_status = self.STATUS_RUNNING
        else: # ending
            r_status = self.STATUS_ENDING
            r_time   = end_time

        try:
            self._insert_event(r_time, r_priority, r_status, event)
        except SchedulerStoppedError:
            self.logger.debug( # debug since called in exc handling
                "%s: Cannot requeue event during shutdown: "
                "time=%s, priority=%d, status=%s, event=%s",
                classname(self),
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(r_time)),
                r_priority, self.STATUS_STRING[r_status], repr(event))
        else:
            self.logger.debug(
                "%s: Requeued event: "
                "time=%s, priority=%d, status=%s, event=%s",
                classname(self),
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(r_time)),
                r_priority, self.STATUS_STRING[r_status], repr(event))

    def _retry_event(self, evtime, priority, status, event):
        """ Reschedule the event for re-attempt after an exception was handled.
        The event keeps its priority and status and is queued for the current
        iteration time plus ``event.delay``. ResourceError from
        ``prepare_event()`` propagates to the caller. """
        self.prepare_event(event)
        r_time = self._next_run + event.delay
        try:
            self._insert_event(r_time, priority, status, event)
        except SchedulerStoppedError:
            self.logger.debug( # debug since called in exc handling
                "%s: Cannot queue retry during shutdown: "
                "time=%s, priority=%d, status=%s, event=%s",
                classname(self),
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(r_time)),
                priority, self.STATUS_STRING[status], repr(event))
        else:
            self.logger.debug(
                "%s: Queued retry: "
                "time=%s, priority=%d, status=%s, event=%s",
                classname(self),
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(r_time)),
                priority, self.STATUS_STRING[status], repr(event))
        event.delay = None # the delay applies to one retry only

    def prepare_event(self, event):
        """ Prepare a event for entry into the queue. This fills the resources
        requested by the event (including the ``owner`` parameter) and checks
        the parameters. This method is intended for public use by events that
        want to chain other events to run under this queue. This method is safe
        for use in refreshing events that were already run.

        Raises ResourceError (after logging it) if the event requests an
        unknown resource. """
        # populate resources
        event.owner = self

        self._set_timing(event)

        if not hasattr(event, 'res') or not isinstance(event.res, dict):
            event.res = dict()
        try:
            for res_name in event.required_res:
                self._set_resource(res_name, event)
        except ResourceError as e:
            self.logger.warning("%s: %s raised in %s: %s",
                classname(self), classname(e), classname(event), e.args[0])
            raise

    def _set_resource(self, res_name, event):
        """ Set resources to an event's resource dict. Built-in names are
        'reddit', 'approved', 'logger', 'config', 'config.<attribute>' and
        'dbsession'; any other name is looked up in the extra resources given
        to the constructor. If the resource is not known, raise a
        ResourceError. """
        if res_name == 'reddit':
            event.res['reddit'] = self._reddit
        elif res_name == 'approved':
            event.res['approved'] = self._approved
        elif res_name == 'logger':
            event.res['logger'] = self.logger.getChild('events').\
                                    getChild(classname(event))
        elif res_name == 'config':
            event.res['config'] = self._config
        elif res_name.startswith('config.') and\
                hasattr(self._config, res_name[7:]):
            # 'config.<attr>': res_name[7:] strips the 'config.' prefix
            event.res[res_name] = getattr(self._config, res_name[7:])
        elif res_name == 'dbsession':
            # only set a new session if no existing session
            if event.res.get('dbsession') is None:
                event.res['dbsession'] = self._db.get_new_session()
                self.logger.debug('%s: Starting new database session for %s',
                    classname(self), repr(event))
        elif res_name in self._res:
            event.res[res_name] = self._res[res_name]
        else:
            raise ResourceError("Unknown resource '{}' requested by {}".
                format(res_name, repr(event)))

    def _set_timing(self, event):
        """ Set default event timing values if necessary. A missing, None or
        out-of-range attribute is replaced by its default. """
        if getattr(event, 'start_time', None) is None or event.start_time < 0:
            event.start_time = 0 # on next iteration
        if getattr(event, 'interval', None) is None or event.interval < 0:
            event.interval = 0 # single-run
        if getattr(event, 'duration', None) is None or event.duration < -1:
            event.duration = -1 # infinite run if interval is nonzero
        if getattr(event, 'priority', None) is None:
            event.priority = 100
        if getattr(event, 'delay', None) is None:
            event.delay = 0

    def get_events(self, cls=None, subclasses=True):
        """ Return a generator of event objects currently in the queue. If
        ``cls`` is specified and is a class, gets only events of the specified
        class and its subclasses. If ``cls`` is None, this is equivalent to the
        ``queue`` property. If ``subclasses`` is False, does not return events
        that are subclasses of ``cls``.
        """
        for event in self.queue:
            if cls is None:
                yield event
            elif subclasses:
                if isinstance(event, cls):
                    yield event
            elif event.__class__ is cls:
                yield event

    def request_exit(self):
        """ Request that the event scheduler cleans up all events and exits
        on the next iteration. """
        self._exit_flag = True
        self.logger.info("%s: Exit requested", classname(self))

    def request_stop(self):
        """ Requests that the event scheduler stop running on the next
        iteration. """
        self._stop_flag = True
        self.logger.info("%s: Stop requested", classname(self))
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.