marnold avatar marnold committed 73971f3

Added current (mostly working) pycron module to version control system

Comments (0)

Files changed (1)

+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2009-2010 Max Arnold <lwarxx@gmail.com>.
+# All rights reserved.
+
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+
+# 3. Neither the name of copyright holder nor the names of contributors may
+# be used to endorse or promote products derived from this software without
+# specific prior written permission.
+
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+"""Python module for programmatic event scheduling using cron-like syntax with
+seconds precision.
+
+get_matched_jobs() method should be called periodically and will return list of
+tasks to execute if their time has arrived. Latency can be specified if strict
+calling periodicity (at least once per second) can not be guaranteed. Protection
+against time jitter also implemented.
+
+Each job is described using extended cron-like syntax with six fields:
+
+Field            Allowed values
+------------     --------------
+Second           0 - 59
+Minute           0 - 59
+Hour             0 - 23
+Day of month     1 - 31
+Month            1 - 12 (jan - dec)
+Day of week      1 - 7  (mon - sun)
+
+Fields are separated with space and can contain ranges, comma separated lists
+or asterisk symbol (*) which is expanded as full range. Ranges also can be used
+with step value, for example 0-59/10 or */2. For more examples refer to crontab
+manual page.
+
+Cron record is matched when all fields match current date/time (logical AND).
+
+Job itself is specified using arbitrary python object or data structure. Its
+interpretation and execution is up to calling program.
+"""
+
+import re
+import time
+from datetime import datetime
+
+class pycron(object):
+    """Main cron class
+    """
+
+    # enums are declared as lists in order to use .index method
+    month_enum = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
+                              'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
+    wday_enum = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
+    f_ranges = [
+            { 'name': 'Second',        'min': 0, 'max': 59 },
+            { 'name': 'Minute',        'min': 0, 'max': 59 },
+            { 'name': 'Hour',          'min': 0, 'max': 23 },
+            { 'name': 'Day of month',  'min': 1, 'max': 31 },
+            { 'name': 'Month',         'min': 1, 'max': 12, 'enum': month_enum },
+            { 'name': 'Day of week',   'min': 1, 'max': 7,  'enum': wday_enum } # TODO: 0 - sunday
+    ]
+
+    cron_split_re = re.compile('(?<![/,\-\s])\s+(?![/,\-\s])')
+    cron_int_re = re.compile('^0*\d{1,2}$')
+
+    def __init__(self, latency=10):
+        self.tlast = None
+        self.latency = abs(int(latency))
+        self.jobs = []
+        self.jobrecs = {}
+        self.taskhist = set()
+
+    def add_job(self, cronrecord, job, jobinfo="", priority=0):
+        """Add cron job
+
+        cronrecord - string in form of 'SS MM HH DD MO DW', with crontab-like syntax
+        job - job identifier, string or callable (calling is not performed)
+        jobinfo - optional string (for debugging purposes)
+        priority - job priority (if same job added twice, wins job with higher priority)
+
+        Return False if error, True if job already exists, total job count if success
+        """
+
+        j = pycron.parse_record(cronrecord)
+        if j is False: return False
+        j = (j, job, priority)
+        if j in self.jobs: return True
+        self.jobs.append(j)
+        self.jobrecs[j] = (cronrecord, jobinfo)
+
+        return len(self.jobs)
+
+    def print_jobs(self):
+        for j in self.jobs:
+            print self.jobrecs[j]
+
+    def del_job(self):
+        # Do something with task history!
+        pass
+
+    def get_matched_jobs(self, tnow=None):
+        """Return tuple of matched task identifiers
+        """
+        tasks = {}
+        if tnow is None: tnow = int(time.mktime(datetime.today().timetuple()))
+        if self.tlast is None:
+            self.tlast = tnow-1
+            firstrun = True
+        else:
+            firstrun = False
+        tdiff = tnow-self.tlast
+        if tdiff == 0:
+            # time not changed
+            return None
+        elif firstrun or abs(tdiff) > self.latency:
+        # time jumped back or forward too much
+            self.clear_taskhist()
+            for j in self.jobs:
+                th = (j[0], j[1], tnow)
+                if pycron.match_job(j[0], tnow):
+                    self.taskhist.add(th)
+                    p = tasks.get(j[1], 0)
+                    if j[2] >= p:
+                        # job with higher or equal priority replaces lower priority
+                        tasks[j[1]] = j[2]
+        else:
+            # time jumped back or forward, but still within latency window
+            self.purge_taskhist(tnow)
+            for ts in xrange(self.tlast, tnow+1):
+                for j in self.jobs:
+                    th = (j[0], j[1], ts)
+                    if pycron.match_job(j[0], ts) and th not in self.taskhist:
+                        self.taskhist.add(th)
+                        p = tasks.get(j[1], 0)
+                        if j[2] >= p:
+                            # job with higher or equal priority replaces lower priority
+                            tasks[j[1]] = j[2]
+
+        self.tlast = tnow
+        return tuple(sorted(tasks.keys()))
+
+    def clear_taskhist(self):
+        self.taskhist = set()
+
+    def purge_taskhist(self, tnow=None):
+        """Iterate over task history (previously matched jobs) and drop everything
+        not within latency window
+        """
+        if tnow is None: tnow = int(time.mktime(datetime.today().timetuple()))
+        thist = self.taskhist
+        # tuple is necessary since we are modifying set in place:
+        for th in tuple(self.taskhist):
+            if th[2] < tnow - self.latency or th[2] >= tnow:
+                self.taskhist.remove(th)
+
+    @staticmethod
+    def parse_fnum(nfield, num):
+        """Parse and validate field value (also accepts weekday/month enums)
+
+        nfield - field number (0-5)
+        num - field value (string)
+
+        Return integer value or False if error
+        """
+        if pycron.cron_int_re.match(num):
+            n = int(num)
+            if pycron.f_ranges[nfield]['min'] <= n <= pycron.f_ranges[nfield]['max']: return n
+        elif 'enum' in pycron.f_ranges[nfield] and num in pycron.f_ranges[nfield]['enum']:
+            return pycron.f_ranges[nfield]['enum'].index(num)+pycron.f_ranges[nfield]['min']
+        return False
+
+    @staticmethod
+    def parse_step(step):
+        """Parse and validate step value
+
+        step - step value (string)
+
+        Return integer value or False if error
+        """
+        if pycron.cron_int_re.match(step):
+            n = int(step)
+            if n > 0: return n
+        return False
+
+    @staticmethod
+    def parse_range(nfield, frange):
+        """Parse and validate range/step value
+
+        nfield - field number (0-5)
+        frange - field value
+
+        Return tuple (range_start, range_end, step) or False if error
+        """
+        # split range into value and step
+        frange = frange.split('/')
+        if len(frange) > 2:
+            # too many "/" symbols
+            return False
+
+        # split value into range start and end
+        rparts = frange[0].split('-')
+        if len(rparts) > 2:
+            # too many "-" symbols
+            return False
+
+        # parse step
+        if len(frange) == 2:
+            step = pycron.parse_step(frange[1])
+            if step is False: return False
+        else:
+            # default step value
+            step = 1
+
+        # parse range
+        if len(rparts) == 2:
+            r1 = pycron.parse_fnum(nfield, rparts[0])
+            r2 = pycron.parse_fnum(nfield, rparts[1])
+            if False in (r1, r2) or r1 > r2: return False
+        else:
+            if rparts[0] == '*':
+                r1 = pycron.f_ranges[nfield]['min']
+                r2 = pycron.f_ranges[nfield]['max']
+            else:
+                r1 = pycron.parse_fnum(nfield, rparts[0])
+                if r1 is False: return False
+                r2 = r1
+
+        return (r1, r2, step)
+
+    @staticmethod
+    def parse_field(nfield, field):
+        """Parse and validate field
+
+        nfield - field number (0-5)
+        field - field value
+
+        Return tuple with unrolled range values or False if error
+        """
+        # remove all whitespaces
+        field = ''.join(field.split())
+        rangeset = []
+        for frange in field.split(','):
+            r = pycron.parse_range(nfield, frange)
+            if r is False: return False
+
+            for i in xrange(r[0], r[1]+1, r[2]): # TODO: optimize
+                # uroll all range values
+                rangeset.append(i)
+
+        # return only unique values
+        return tuple(sorted(set(rangeset)))
+
+    @staticmethod
+    def parse_record(record):
+        """Parse and validate cron record
+
+        Return six unrolled ranges as a tuple, or False if error
+        """
+        # split record into six fields
+        fields = pycron.cron_split_re.split(record.strip())
+        if len(fields) != 6: return False
+
+        # parse and validate each field and build unrolled tuple
+        rec = []
+        for nfield, field in enumerate(fields):
+            f = pycron.parse_field(nfield, field)
+            if f is False: return False
+            rec.append(f)
+
+        return tuple(rec)
+
+    @staticmethod
+    def match_job(r, ts):
+        """Check if parsed cron record equals timestamp
+        """
+        t = datetime.fromtimestamp(ts)
+        if t.second in r[0] and t.minute in r[1] \
+                and t.hour in r[2] and t.day in r[3] \
+                and t.month in r[4] and t.isoweekday() in r[5]:
+            return True
+        return False
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.