Source

celery-pylons / celerypylons / commands.py

import os
import sys
import warnings
from paste.script.command import Command, BadCommand
import paste.deploy
from pylons import config
from celery.exceptions import ImproperlyConfigured


# Names exported by ``from ... import *``; the CeleryCommand base class is
# deliberately not part of this list.
__all__ = [
    'CeleryDaemonCommand',
    'CeleryBeatCommand',
    'CAMQPAdminCommand',
    'CeleryEventCommand',
]


class CeleryCommand(Command):
    """Shared base class for the celery commands in this module.

    Declares that a config file argument is required and provides
    ``bootstrap_config`` for loading that file into the Pylons ``config``
    object.
    """
    # At least one argument is required; ``bootstrap_config`` reads the
    # config file path from ``self.args[0]``.
    requires_config_file = True
    takes_config_file = 1
    min_args = 1
    min_args_error = "Please provide a paster config file as an argument."

    def bootstrap_config(self):
        """Initialise the Pylons ``config`` from the file in ``self.args[0]``.

        The path is resolved with ``os.path.realpath``, read through
        ``paste.deploy.appconfig`` using a ``config:`` URI, and the
        resulting ``global_conf`` / ``local_conf`` are handed to
        ``config.init_app``.
        """
        ini_path = os.path.realpath(self.args[0])
        config_uri = 'config:' + ini_path
        app_conf = paste.deploy.appconfig(config_uri)
        config.init_app(app_conf.global_conf, app_conf.local_conf)


class CeleryDaemonCommand(CeleryCommand):
    """Start the celery worker

    Starts the celery worker that uses a paste.deploy configuration
    file.
    """
    # ``summary`` is the first docstring line and ``description`` is every
    # line after the blank separator.  The description lines are joined
    # with no separator, so only their own indentation keeps words apart.
    _doc_lines = __doc__.splitlines()
    summary = _doc_lines[0]
    description = "".join(_doc_lines[2:])
    del _doc_lines  # scratch name only; keep it out of the class namespace

    usage = 'CONFIG_FILE [celeryd options...]'
    parser = Command.standard_parser(quiet=True)

    def command(self):
        """Load the Pylons config, then start the celery worker.

        Returns whatever ``celeryd.run_worker`` returns.
        """
        # The config is loaded before ``celery.bin.celeryd`` is imported.
        # NOTE(review): presumably celery reads its settings on import, so
        # this order matters -- confirm before reordering.
        self.bootstrap_config()
        from celery.bin import celeryd

        # NOTE(review): these options are registered only once command()
        # is running -- confirm whether the parser has already consumed
        # the command line by then.
        for option in celeryd.OPTION_LIST:
            self.parser.add_option(option)

        # NOTE(review): the sibling commands in this file parse
        # ``self.args[1:]``; this one parses ``sys.argv[1:]`` -- confirm
        # the difference is intentional.
        worker_options = celeryd.parse_options(sys.argv[1:])
        worker_kwargs = vars(worker_options)
        return celeryd.run_worker(**worker_kwargs)


class CeleryBeatCommand(CeleryCommand):
    """Start the celery beat server

    Starts the celery beat server using a paste.deploy configuration
    file.
    """
    # ``summary`` is the first docstring line and ``description`` is every
    # line after the blank separator.  The description lines are joined
    # with no separator, so only their own indentation keeps words apart.
    _doc_lines = __doc__.splitlines()
    summary = _doc_lines[0]
    description = "".join(_doc_lines[2:])
    del _doc_lines  # scratch name only; keep it out of the class namespace

    usage = 'CONFIG_FILE [celerybeat options...]'
    parser = Command.standard_parser(quiet=True)

    def command(self):
        """Load the Pylons config, then start the celery beat server.

        Returns whatever ``celerybeat.run_celerybeat`` returns.
        """
        # The config is loaded before ``celery.bin.celerybeat`` is imported.
        # NOTE(review): presumably celery reads its settings on import, so
        # this order matters -- confirm before reordering.
        self.bootstrap_config()
        from celery.bin import celerybeat

        # NOTE(review): these options are registered only once command()
        # is running -- confirm whether the parser has already consumed
        # the command line by then.
        for option in celerybeat.OPTION_LIST:
            self.parser.add_option(option)

        # Everything after the config file argument goes to celerybeat.
        beat_args = self.args[1:]
        beat_options = celerybeat.parse_options(beat_args)
        beat_kwargs = vars(beat_options)
        return celerybeat.run_celerybeat(**beat_kwargs)


class CAMQPAdminCommand(CeleryCommand):
    """CAMQP Admin

    CAMQP celery admin tool.
    """
    # ``summary`` is the first docstring line and ``description`` is every
    # line after the blank separator, joined with no separator.
    _doc_lines = __doc__.splitlines()
    summary = _doc_lines[0]
    description = "".join(_doc_lines[2:])
    del _doc_lines  # scratch name only; keep it out of the class namespace

    usage = 'CONFIG_FILE [camqadm options...]'
    parser = Command.standard_parser(quiet=True)

    def command(self):
        """Load the Pylons config, then run the camqadm tool.

        Returns whatever ``camqadm.camqadm`` returns.
        """
        # The config is loaded before ``celery.bin.camqadm`` is imported.
        # NOTE(review): presumably celery reads its settings on import, so
        # this order matters -- confirm before reordering.
        self.bootstrap_config()
        from celery.bin import camqadm

        # NOTE(review): these options are registered only once command()
        # is running -- confirm whether the parser has already consumed
        # the command line by then.
        for option in camqadm.OPTION_LIST:
            self.parser.add_option(option)

        # ``parse_options`` returns a pair here: the parsed options and the
        # remaining values, which are forwarded as positional arguments.
        admin_args = self.args[1:]
        admin_options, admin_values = camqadm.parse_options(admin_args)
        admin_kwargs = vars(admin_options)
        return camqadm.camqadm(*admin_values, **admin_kwargs)

    
class CeleryEventCommand(CeleryCommand):
    """Celery event command.

    Capture celery events.
    """
    # NOTE: the docstring above is runtime data -- its first line becomes
    # ``summary`` and the lines after the blank separator become
    # ``description`` (joined with no separator).
    usage = 'CONFIG_FILE [celeryev options...]'
    summary = __doc__.splitlines()[0]
    description = "".join(__doc__.splitlines()[2:])

    parser = Command.standard_parser(quiet=True)

    def command(self):
        """Load the Pylons config, then run the celery event capture tool.

        Returns whatever ``celeryev.run_celeryev`` returns.
        """
        # The config is loaded before ``celery.bin.celeryev`` is imported.
        # NOTE(review): presumably celery reads its settings on import, so
        # this order matters -- confirm before reordering.
        self.bootstrap_config()
        from celery.bin import celeryev

        # NOTE(review): these options are registered only once command()
        # is running -- confirm whether the parser has already consumed
        # the command line by then.
        for x in celeryev.OPTION_LIST:
            self.parser.add_option(x)

        # Everything after the config file argument goes to celeryev.
        options = celeryev.parse_options(self.args[1:])
        return celeryev.run_celeryev(**vars(options))