Commits

Lynn Rees committed 32aff3a

- pep8
- djangoize

  • Participants
  • Parent commits b251891

Comments (0)

Files changed (7)

 *.pyc
 *.egg-info
+.p*
+*~

File django_ztask/__init__.py

-"""Django ZTask."""
+'''Django ZTask'''
+
 import os
 
 VERSION = (0, 1, 4)
 
-__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
-__author__ = "Jason Allum and Dave Martorana"
-__contact__ = "jason@dmgctrl.com"
-__homepage__ = "http://github.com/dmgctrl/django-ztask"
-__docformat__ = "markdown"
-__license__ = "BSD (3 clause)"
+__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
+__author__ = 'Jason Allum and Dave Martorana'
+__contact__ = 'jason@dmgctrl.com'
+__homepage__ = 'http://github.com/dmgctrl/django-ztask'
+__docformat__ = 'markdown'
+__license__ = 'BSD (3 clause)'

File django_ztask/conf/settings.py

 ZTASKD_DISABLED = getattr(settings, 'ZTASKD_DISABLED', False)
 ZTASKD_RETRY_COUNT = getattr(settings, 'ZTASKD_RETRY_COUNT', 5)
 ZTASKD_RETRY_AFTER = getattr(settings, 'ZTASKD_RETRY_AFTER', 5)
-
+ZTASKD_LOGGER = getattr(settings, 'ZTASKD_LOGGER', 'django_ztaskd')
 ZTASKD_ON_LOAD = getattr(settings, 'ZTASKD_ON_LOAD', ())
-#ZTASKD_ON_CALL_COMPLETE = getattr(settings, 'ZTASKD_ON_COMPLETE', ())
+#ZTASKD_ON_CALL_COMPLETE = getattr(settings, 'ZTASKD_ON_COMPLETE', ())

File django_ztask/decorators.py

-from django.utils.decorators import available_attrs
+import types
 from functools import wraps
 
-import logging
-import types
+from django.utils.log import getLogger
+from django_ztask.conf import settings
+try:
+    from zmq import PUSH
+except:
+    from zmq import DOWNSTREAM as PUSH
+from django_ztask.context import shared_context as context
+    
+log = getLogger(settings.ZTASKD_LOGGER)
 
 def task():
-    from django_ztask.conf import settings
-    try:
-        from zmq import PUSH
-    except:
-        from zmq import DOWNSTREAM as PUSH
     def wrapper(func):
         function_name = '%s.%s' % (func.__module__, func.__name__)
-        
-        logger = logging.getLogger('ztaskd')
-        logger.info('Registered task: %s' % function_name)
-        
-        from django_ztask.context import shared_context as context
+        log.info('Registered task: %s' % function_name)
         socket = context.socket(PUSH)
         socket.connect(settings.ZTASKD_URL)
+        
         @wraps(func)
-        def _func(*args, **kwargs):
-            after = kwargs.pop('__ztask_after', 0)
+        def _func(*args, **kw):
+            after = kw.pop('__ztask_after', 0)
             if settings.ZTASKD_DISABLED:
                 try:
-                    socket.send_pyobj(('ztask_log', ('Would have called but ZTASKD_DISABLED is True', function_name), None, 0))
+                    socket.send_pyobj(('ztask_log', (
+                        'Would have called but ZTASKD_DISABLED is True', 
+                         function_name,
+                    ), None, 0))
                 except:
-                    logger.info('Would have sent %s but ZTASKD_DISABLED is True' % function_name)
+                    log.info(
+                        'Would have sent %s but ZTASKD_DISABLED is'
+                        'True' % function_name
+                    )
                 return
             elif settings.ZTASKD_ALWAYS_EAGER:
-                logger.info('Running %s in ZTASKD_ALWAYS_EAGER mode' % function_name)
+                log.info(
+                    'Running %s in ZTASKD_ALWAYS_EAGER mode' % function_name
+                )
                 if after > 0:
-                    logger.info('Ignoring timeout of %d seconds because ZTASKD_ALWAYS_EAGER is set' % after)
-                func(*args, **kwargs)
+                    log.info(
+                        'Ignoring timeout of %d seconds because '
+                        'ZTASKD_ALWAYS_EAGER is set' % after
+                    )
+                func(*args, **kw)
             else:
                 try:
-                    socket.send_pyobj((function_name, args, kwargs, after))
-                except Exception, e:
+                    socket.send_pyobj((function_name, args, kw, after))
+                except:
                     if after > 0:
-                        logger.info('Ignoring timeout of %s seconds because function is being run in-process' % after)
-                    func(*args, **kwargs)
+                        log.info(
+                            'Ignoring timeout of %s seconds because function is'
+                            ' being run in-process' % after
+                        )
+                    func(*args, **kw)
 
-        def _func_delay(*args, **kwargs):
+        def _func_delay(*args, **kw):
             try:
-                socket.send_pyobj(('ztask_log', ('.delay is depricated... use.async instead', function_name), None, 0))
+                socket.send_pyobj(('ztask_log', (
+                    '.delay is deprecated... use.async instead', function_name
+                ), None, 0))
             except:
                 pass
-            _func(*args, **kwargs)
+            _func(*args, **kw)
             
-        def _func_after(*args, **kwargs):
+        def _func_after(*args, **kw):
             try:
                 after = args[0]
                 if type(after) != types.IntType:
-                    raise TypeError('The first argument of .after must be an integer representing seconds to wait')
-                kwargs['__ztask_after'] = after
-                _func(*args[1:], **kwargs)
+                    raise TypeError(
+                        'The first argument of .after must be an integer '
+                        'representing seconds to wait'
+                    )
+                kw['__ztask_after'] = after
+                _func(*args[1:], **kw)
             except Exception, e:
-                logger.info('Error adding delayed task:\n%s' % e)
-        
+                log.info('Error adding delayed task:\n%s' % e)
         setattr(func, 'async', _func)
         setattr(func, 'delay', _func_delay)
         setattr(func, 'after', _func_after)
         return func
-    
     return wrapper

File django_ztask/management/commands/ztaskd.py

-from django.core.management.base import BaseCommand
-from django.utils import autoreload
-#
-from django_ztask.models import *
-#
-from django_ztask.conf import settings
-from django_ztask.context import shared_context as context
-#
-import zmq
-from zmq.eventloop import ioloop
+import sys
+import time
+import pickle
+import datetime
+from optparse import make_option
+
 try:
     from zmq import PULL
 except:
     from zmq import UPSTREAM as PULL
-#
-from optparse import make_option
-import sys
-import traceback
+from zmq.eventloop import ioloop
+from django.utils.log import getLogger
+from django.utils import autoreload, importlib
+from django.core.management.base import BaseCommand
+
+from django_ztask.models import Task
+from django_ztask.conf import settings
+from django_ztask.context import shared_context as context
+
+log = getLogger(settings.ZTASKD_LOGGER)
 
-import logging
-import pickle
-import datetime, time
  
 class Command(BaseCommand):
+    
     option_list = BaseCommand.option_list + (
-        make_option('--noreload', action='store_false', dest='use_reloader', default=True, help='Tells Django to NOT use the auto-reloader.'),
-        make_option('-f', '--logfile', action='store', dest='logfile', default=None, help='Tells ztaskd where to log information. Leaving this blank logs to stderr'),
-        make_option('-l', '--loglevel', action='store', dest='loglevel', default='info', help='Tells ztaskd what level of information to log'),
-        make_option('--replayfailed', action='store_true', dest='replay_failed', default=False, help='Replays all failed calls in the db'),
+        make_option(
+            '--noreload', 
+            action='store_false', 
+            dest='use_reloader', 
+            default=True, 
+            help='Tells Django to NOT use the auto-reloader',
+        ),
+        make_option(
+            '--replayfailed', 
+            action='store_true', 
+            dest='replay_failed', 
+            default=False, 
+            help='Replays all failed calls in the DB',
+        ),
     )
     args = ''
     help = 'Start the ztaskd server'
     io_loop = None
     
     def handle(self, *args, **options):
-        self._setup_logger(options.get('logfile', None), options.get('loglevel', 'info'))
         use_reloader = options.get('use_reloader', True)
         replay_failed = options.get('replay_failed', False)
         if use_reloader:
             self._handle(use_reloader, replay_failed)
     
     def _handle(self, use_reloader, replay_failed):
-        self.logger.info("%sServer starting on %s." % ('Development ' if use_reloader else '', settings.ZTASKD_URL))
+        log.info("%sServer starting on %s." % (
+            'Development ' if use_reloader else '', settings.ZTASKD_URL)
+        )
         self._on_load()
-        
         socket = context.socket(PULL)
         socket.bind(settings.ZTASKD_URL)
-        
-        def _queue_handler(socket, *args, **kwargs):
+        def _queue_handler(socket, *args, **kw):
             try:
-                function_name, args, kwargs, after = socket.recv_pyobj()
+                function_name, args, kw, after = socket.recv_pyobj()
                 if function_name == 'ztask_log':
-                    self.logger.warn('%s: %s' % (args[0], args[1]))
+                    log.warn('%s: %s' % (args[0], args[1]))
                     return
                 task = Task.objects.create(
                     function_name=function_name, 
                     args=pickle.dumps(args), 
-                    kwargs=pickle.dumps(kwargs), 
+                    kwargs=pickle.dumps(kw), 
                     retry_count=settings.ZTASKD_RETRY_COUNT,
                     next_attempt=time.time() + after
                 )
-                
                 if after:
-                    ioloop.DelayedCallback(lambda: self._call_function(task.pk, function_name=function_name, args=args, kwargs=kwargs), after * 1000, io_loop=self.io_loop).start()
+                    ioloop.DelayedCallback(lambda: self._call_function(
+                        task.pk, 
+                        function_name=function_name, 
+                        args=args, 
+                        kwargs=kw),
+                        after*1000, 
+                        io_loop=self.io_loop
+                    ).start()
                 else:
-                    self._call_function(task.pk, function_name=function_name, args=args, kwargs=kwargs)
-            except Exception, e:
-                self.logger.error('Error setting up function. Details:\n%s' % e)
-                traceback.print_exc(e)
-        
+                    self._call_function(
+                        task.pk, 
+                        function_name=function_name, 
+                        args=args, 
+                        kwargs=kw,
+                    )
+            except:
+                log.exception('Error setting up function')
         # Reload tasks if necessary
         if replay_failed:
             replay_tasks = Task.objects.all().order_by('created')
         else:
-            replay_tasks = Task.objects.filter(retry_count__gt=0).order_by('created')
+            replay_tasks = Task.objects.filter(
+                retry_count__gt=0
+            ).order_by('created')
         for task in replay_tasks:
             if task.next_attempt < time.time():
-                ioloop.DelayedCallback(lambda: self._call_function(task.pk), 5000, io_loop=self.io_loop).start()
+                ioloop.DelayedCallback(
+                    lambda: self._call_function(task.pk), 
+                    5000, 
+                    io_loop=self.io_loop,
+                ).start()
             else:
                 after = task.next_attempt - time.time()
-                ioloop.DelayedCallback(lambda: self._call_function(task.pk), after * 1000, io_loop=self.io_loop).start()
-        
+                ioloop.DelayedCallback(
+                    lambda: self._call_function(task.pk), 
+                    after * 1000, 
+                    io_loop=self.io_loop
+                ).start()
         self.io_loop = ioloop.IOLoop.instance()
         self.io_loop.add_handler(socket, _queue_handler, self.io_loop.READ)
         self.io_loop.start()
     def p(self, txt):
         print txt
     
-    def _call_function(self, task_id, function_name=None, args=None, kwargs=None):
+    def _call_function(self, task_id, function_name=None, args=None, kw=None):
         try:
             if not function_name:
                 try:
                     task = Task.objects.get(pk=task_id)
                     function_name = task.function_name
                     args = pickle.loads(str(task.args))
-                    kwargs = pickle.loads(str(task.kwargs))
-                except Exception, e:
-                    self.logger.info('Count not get task with id %s:\n%s' % (task_id, e))
-                    return
-                
-            self.logger.info('Calling %s' % function_name)
-            #self.logger.info('Task ID: %s' % task_id)
+                    kw = pickle.loads(str(task.kwargs))
+                except:
+                    log.exception('Count not get task id %s' % task_id)
+                    return None
+            log.info('Calling %s' % function_name)
             try:
                 function = self.func_cache[function_name]
             except KeyError:
                 module_name = '.'.join(parts[:-1])
                 member_name = parts[-1]
                 if not module_name in sys.modules:
-                    __import__(module_name)
+                    importlib.import_module(module_name)
                 function = getattr(sys.modules[module_name], member_name)
                 self.func_cache[function_name] = function
-            function(*args, **kwargs)
-            self.logger.info('Called %s successfully' % function_name)
+            function(*args, **kw)
+            log.info('Called %s successfully' % function_name)
             Task.objects.get(pk=task_id).delete()
         except Exception, e:
-            self.logger.error('Error calling %s. Details:\n%s' % (function_name, e))
+            log.exception('Error calling %s' % function_name)
             try:
                 task = Task.objects.get(pk=task_id)
                 if task.retry_count > 0:
                     task.retry_count = task.retry_count - 1
                     task.next_attempt = time.time() + settings.ZTASKD_RETRY_AFTER
-                    ioloop.DelayedCallback(lambda: self._call_function(task.pk), settings.ZTASKD_RETRY_AFTER * 1000, io_loop=self.io_loop).start()
+                    ioloop.DelayedCallback(
+                        lambda: self._call_function(task.pk), 
+                        settings.ZTASKD_RETRY_AFTER * 1000, 
+                        io_loop=self.io_loop
+                    ).start()
                 task.failed = datetime.datetime.utcnow()
                 task.last_exception = '%s' % e
                 task.save()
-            except Exception, e2:
-                self.logger.error('Error capturing exception in _call_function. Details:\n%s' % e2)
-            traceback.print_exc(e)
-    
-    def _setup_logger(self, logfile, loglevel):
-        LEVELS = {
-            'debug': logging.DEBUG,
-            'info': logging.INFO,
-            'warning': logging.WARNING,
-            'error': logging.ERROR,
-            'critical': logging.CRITICAL
-        }
-        
-        self.logger = logging.getLogger('ztaskd')
-        self.logger.setLevel(LEVELS[loglevel.lower()])
-        if logfile:
-            handler = logging.FileHandler(logfile, delay=True)
-        else:
-            handler = logging.StreamHandler()
-        
-        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-        handler.setFormatter(formatter)
-        self.logger.addHandler(handler)
+            except:
+                log.exception('Error capturing exception in _call_function')
         
     def _on_load(self):
         for callable_name in settings.ZTASKD_ON_LOAD:
-            self.logger.info("ON_LOAD calling %s" % callable_name)
+            log.info("ON_LOAD calling %s" % callable_name)
             parts = callable_name.split('.')
             module_name = '.'.join(parts[:-1])
             member_name = parts[-1]
             if not module_name in sys.modules:
-                __import__(module_name)
+                importlib.import_module(module_name)
             callable_fn = getattr(sys.modules[module_name], member_name)
             callable_fn()
-            
-    
-
-
-

File django_ztask/models.py

-from django.db.models import *
-
 import uuid
-import datetime
+from datetime import datetime
+
+from django.db import models
+
 
-class QuerySetManager(Manager):
+class QuerySetManager(models.Manager):
+    
     def __getattr__(self, attr, *args):
         try:
             return getattr(self.__class__, attr, *args)
         return self.model.QuerySet(self.model)
     
 
-#
-#
-class Task(Model):
-    uuid = CharField(max_length=36, primary_key=True)
-    function_name = CharField(max_length=255)
-    args = TextField()
-    kwargs = TextField()
-    retry_count = IntegerField(default=0)
-    last_exception = TextField(blank=True, null=True)
-    next_attempt = FloatField(blank=True, null=True)
-    created = DateTimeField(blank=True, null=True)
-    failed = DateTimeField(blank=True, null=True)
+class Task(models.Model):
+    
+    uuid = models.CharField(max_length=36, primary_key=True)
+    function_name = models.CharField(max_length=255)
+    args = models.TextField()
+    kwargs = models.TextField()
+    retry_count = models.IntegerField(default=0)
+    last_exception = models.TextField(blank=True, null=True)
+    next_attempt = models.FloatField(blank=True, null=True)
+    created = models.DateTimeField(blank=True, null=True)
+    failed = models.DateTimeField(blank=True, null=True)
     
     def save(self, *args, **kwargs):
         if not self.uuid:
-            self.created = datetime.datetime.utcnow()
+            self.created = datetime.utcnow()
             self.uuid = uuid.uuid4()
         super(Task, self).save(*args, **kwargs)
     
 #!/usr/bin/env python
 try:
-    from setuptools import setup
+    from setuptools import setup, find_packages
 except:
-    from distutils.core import setup
+    from distutils.core import setup, find_packages
+    
 import django_ztask as distmeta
 
 setup(
     author=distmeta.__author__,
     author_email=distmeta.__contact__,
     url=distmeta.__homepage__,
-    #
     name='django-ztask',
+    include_package_data=True,
     packages=['django_ztask'],
-    install_requires=[
-        'pyzmq',
-    ]
+    install_requires=['django>=1.3', 'pyzmq'],
 )