Source

django-ztask / django_ztask / decorators.py

Full commit
# -*- coding: utf-8 -*-

import types
from functools import wraps

from django.utils.log import getLogger
from django_ztask.utils import setting

try:
    from zmq import PUSH
except:
    from zmq import DOWNSTREAM as PUSH

from django_ztask.context import shared_context as context
    
log = getLogger(setting('ZTASKD_LOGGER', 'django_ztaskd'))

def task():
    def wrapper(func):
        function_name = '%s.%s' % (func.__module__, func.__name__)
        log.info('Registered task: %s' % function_name)
        socket = context.socket(PUSH)
        socket.connect(setting('ZTASKD_URL', 'tcp://127.0.0.1:5555'))
        
        @wraps(func)
        def _func(*args, **kw):
            after = kw.pop('__ztask_after', 0)
            if setting('ZTASKD_DISABLED', False):
                try:
                    socket.send_pyobj(('ztask_log', (
                        'Would have called but ZTASKD_DISABLED is True', 
                         function_name,
                    ), None, 0))
                except:
                    log.info(
                        'Would have sent %s but ZTASKD_DISABLED is'
                        'True' % function_name
                    )
                return
            elif setting('ZTASKD_ALWAYS_EAGER', False):
                log.info(
                    'Running %s in ZTASKD_ALWAYS_EAGER mode' % function_name
                )
                if after > 0:
                    log.info(
                        'Ignoring timeout of %d seconds because '
                        'ZTASKD_ALWAYS_EAGER is set' % after
                    )
                func(*args, **kw)
            else:
                try:
                    socket.send_pyobj((function_name, args, kw, after))
                except:
                    if after > 0:
                        log.info(
                            'Ignoring timeout of %s seconds because function is'
                            ' being run in-process' % after
                        )
                    func(*args, **kw)

        def _func_delay(*args, **kw):
            try:
                socket.send_pyobj(('ztask_log', (
                    '.delay is deprecated... use.async instead', function_name
                ), None, 0))
            except:
                pass
            _func(*args, **kw)
            
        def _func_after(*args, **kw):
            try:
                after = args[0]
                if type(after) != types.IntType:
                    raise TypeError(
                        'The first argument of .after must be an integer '
                        'representing seconds to wait'
                    )
                kw['__ztask_after'] = after
                _func(*args[1:], **kw)
            except:
                log.exception('Error adding delayed task:\n%s')
        setattr(func, 'async', _func)
        setattr(func, 'delay', _func_delay)
        setattr(func, 'after', _func_after)
        return func
    return wrapper