Anonymous committed 521e7ea

Moved away from the boneheadedness of config.py in favor of specifying a custom Celery loader for Pylons.

This does have the unfortunate side-effect of clobbering the usage message spit out by paster. Need to find a workaround for that next.
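For reference, a minimal sketch of how the new loader gets picked up (the tasks module and the add task below are hypothetical, not part of this commit): importing celerypylons sets CELERY_LOADER before celery itself is imported, so celery instantiates PylonsLoader and reads its settings out of pylons.config.

    # Hypothetical tasks module inside a Pylons app.
    # Importing celerypylons must happen before any celery import so that
    # CELERY_LOADER is already set when celery chooses its loader.
    import celerypylons  # sets CELERY_LOADER=celerypylons.loader.PylonsLoader

    from celery.decorators import task

    @task
    def add(x, y):
        # Executed by a worker started through the paster command this package provides.
        return x + y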

Files changed (5)

celerypylons/__init__.py

 import os
 import warnings
 
-CELERYPYLONS_CONF = 'celerypylons.config'
-if os.environ.get('CELERY_CONFIG_MODULE', CELERYPYLONS_CONF) != CELERYPYLONS_CONF:
-    warnings.warn("'CELERY_CONFIG_MODULE' environment variable will be overridden by celery-pylons.")
-os.environ['CELERY_CONFIG_MODULE'] = CELERYPYLONS_CONF
+CELERYPYLONS_LOADER = 'celerypylons.loader.PylonsLoader'
+if os.environ.get('CELERY_LOADER', CELERYPYLONS_LOADER) != CELERYPYLONS_LOADER:
+    warnings.warn("'CELERY_LOADER' environment variable will be overridden by celery-pylons.")
+os.environ['CELERY_LOADER'] = CELERYPYLONS_LOADER

celerypylons/commands.py

 __all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand', 'CAMQPAdminCommand', 'CeleryEventCommand']
 
 
-## @@ We have to do some rather odd bootstrapping in order to get
-## celery to load correctly.
-def bootstrap_config():
-    try:
-        path_to_ini_file = os.path.realpath(sys.argv[2])
-        conf = paste.deploy.appconfig('config:' + path_to_ini_file)
-        config.init_app(conf.global_conf,conf.local_conf)
-    except ValueError:
-        ## @@ If the user just wants to see usage or docs, don't fail on configuration
-        pass
-    except IndexError:
-        ## @@ No configuration was passed in
-        pass
-bootstrap_config()
-
-## @@ We can now import celeryd since the configuration has been set.
-try:
-    ## @@ If paster is run without arguments this file is imported,
-    ## yet there is no config file passed in which causes these
-    ## imports to fail due to a configuration error.
-    from celery.bin import celeryd
-    from celery.bin import celerybeat
-    from celery.bin import camqadm
-    from celery.bin import celeryev
-except ImproperlyConfigured:
-    import dummy as celeryd
-    import dummy as celerybeat
-    import dummy as camqadm
-    import dummy as celeryev
-
-
 class CeleryCommand(Command):
     min_args = 1
     min_args_error = "Please provide a paster config file as an argument."
     takes_config_file = 1
     requires_config_file = True
 
+    def bootstrap_config(self):
+        path_to_ini_file = os.path.realpath(self.args[0])
+        conf = paste.deploy.appconfig('config:' + path_to_ini_file)
+        config.init_app(conf.global_conf,conf.local_conf)
+
 
 class CeleryDaemonCommand(CeleryCommand):
     """Start the celery worker
     description = "".join(__doc__.splitlines()[2:])
 
     parser = Command.standard_parser(quiet=True)
-    for x in celeryd.OPTION_LIST:
-        parser.add_option(x)
         
     def command(self):
-        options = celeryd.parse_options(sys.argv[3:])
+        self.bootstrap_config()
+        from celery.bin import celeryd
+        for x in celeryd.OPTION_LIST:
+            self.parser.add_option(x)
+        options = celeryd.parse_options(self.args[1:])
         return celeryd.run_worker(**vars(options))
 
 
     description = "".join(__doc__.splitlines()[2:])
 
     parser = Command.standard_parser(quiet=True)
-    for x in celerybeat.OPTION_LIST:
-        parser.add_option(x)
         
     def command(self):
-        options = celerybeat.parse_options(sys.argv[3:])
+        self.bootstrap_config()
+        from celery.bin import celerybeat
+        for x in celerybeat.OPTION_LIST:
+            self.parser.add_option(x)
+        options = celerybeat.parse_options(self.args[1:])
         return celerybeat.run_celerybeat(**vars(options))
 
 
     description = "".join(__doc__.splitlines()[2:])
 
     parser = Command.standard_parser(quiet=True)
-    for x in camqadm.OPTION_LIST:
-        parser.add_option(x)
         
     def command(self):
-        options, values = camqadm.parse_options(sys.argv[3:])
+        self.bootstrap_config()
+        from celery.bin import camqadm
+        for x in camqadm.OPTION_LIST:
+            self.parser.add_option(x)
+        options, values = camqadm.parse_options(self.args[1:])
         return camqadm.camqadm(*values, **vars(options))
 
     
     description = "".join(__doc__.splitlines()[2:])
 
     parser = Command.standard_parser(quiet=True)
-    for x in celeryev.OPTION_LIST:
-        parser.add_option(x)
         
     def command(self):
-        options = celeryev.parse_options(sys.argv[3:])
+        self.bootstrap_config()
+        from celery.bin import celeryev
+        for x in celeryev.OPTION_LIST:
+            self.parser.add_option(x)
+        options = celeryev.parse_options(self.args[1:])
         return celeryev.run_celeryev(**vars(options))

celerypylons/config.py

-from pylons import config
-import re
-import logging
-from datetime import timedelta
-
-# We'll adopt a URI style broker def:
-# amqp://user:password@host:port/vhost
-BROKER_URI_PARSER = re.compile(
-    r"amqp://(?P<user>[^:]+):(?P<password>[^@]+)@(?P<host>[^:]+)(:(?P<port>[0-9]+))?/(?P<vhost>.+)")
-parts = re.match(BROKER_URI_PARSER, config.get('celery.broker.uri', ""))
-
-BROKER_HOST = parts and parts.groupdict()['host']
-BROKER_PORT = parts and parts.groupdict().get('port', 5984)
-BROKER_USER = parts and parts.groupdict()['user']
-BROKER_PASSWORD = parts and parts.groupdict()['password']
-BROKER_VHOST = parts and parts.groupdict()['vhost']
-
-CELERY_IMPORTS = config.get('celery.imports', "").split()
-
-DEFAULT_PROCESS_LOG_FMT = """
-    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
-""".strip()
-DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
-DEFAULT_TASK_LOG_FMT = " ".join("""
-    [%(asctime)s: %(levelname)s/%(processName)s]
-    [%(task_name)s(%(task_id)s)] %(message)s
-""".strip().split())
-
-LOG_LEVELS = dict(logging._levelNames)
-LOG_LEVELS["FATAL"] = logging.FATAL
-LOG_LEVELS[logging.FATAL] = "FATAL"
-
-_DEFAULTS = {
-    "CELERY_RESULT_BACKEND": "database",
-    "CELERY_ALWAYS_EAGER": False,
-    "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
-    "CELERY_TASK_RESULT_EXPIRES": timedelta(days=1),
-    "CELERY_SEND_EVENTS": False,
-    "CELERY_IGNORE_RESULT": False,
-    "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
-    "CELERY_TASK_SERIALIZER": "pickle",
-    "CELERY_DISABLE_RATE_LIMITS": False,
-    "CELERYD_TASK_TIME_LIMIT": None,
-    "CELERYD_TASK_SOFT_TIME_LIMIT": None,
-    "CELERYD_MAX_TASKS_PER_CHILD": None,
-    "CELERY_ROUTES": None,
-    "CELERY_CREATE_MISSING_QUEUES": True,
-    "CELERY_DEFAULT_ROUTING_KEY": "celery",
-    "CELERY_DEFAULT_QUEUE": "celery",
-    "CELERY_DEFAULT_EXCHANGE": "celery",
-    "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
-    "CELERY_DEFAULT_DELIVERY_MODE": 2, # persistent
-    "BROKER_CONNECTION_TIMEOUT": 4,
-    "BROKER_CONNECTION_RETRY": True,
-    "BROKER_CONNECTION_MAX_RETRIES": 100,
-    "CELERY_ACKS_LATE": False,
-    "CELERYD_POOL_PUTLOCKS": True,
-    "CELERYD_POOL": "celery.concurrency.processes.TaskPool",
-    "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
-    "CELERYD_ETA_SCHEDULER": "celery.worker.controllers.ScheduleController",
-    "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
-    "CELERYD_CONCURRENCY": 0, # defaults to cpu count
-    "CELERYD_PREFETCH_MULTIPLIER": 4,
-    "CELERYD_LOG_FORMAT": DEFAULT_PROCESS_LOG_FMT,
-    "CELERYD_TASK_LOG_FORMAT": DEFAULT_TASK_LOG_FMT,
-    "CELERYD_LOG_COLOR": False,
-    "CELERYD_LOG_LEVEL": "WARN",
-    "CELERYD_LOG_FILE": None, # stderr
-    "CELERYD_STATE_DB": None,
-    "CELERYD_ETA_SCHEDULER_PRECISION": 1,
-    "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
-    "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
-    "CELERYBEAT_LOG_LEVEL": "INFO",
-    "CELERYBEAT_LOG_FILE": None, # stderr
-    "CELERYMON_LOG_LEVEL": "INFO",
-    "CELERYMON_LOG_FILE": None, # stderr
-    "CELERYMON_LOG_FORMAT": DEFAULT_LOG_FMT,
-    "CELERY_BROADCAST_QUEUE": "celeryctl",
-    "CELERY_BROADCAST_EXCHANGE": "celeryctl",
-    "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
-    "CELERY_EVENT_QUEUE": "celeryevent",
-    "CELERY_EVENT_EXCHANGE": "celeryevent",
-    "CELERY_EVENT_EXCHANGE_TYPE": "direct",
-    "CELERY_EVENT_ROUTING_KEY": "celeryevent",
-    "CELERY_EVENT_SERIALIZER": "json",
-    "CELERY_RESULT_EXCHANGE": "celeryresults",
-    "CELERY_RESULT_EXCHANGE_TYPE": "direct",
-    "CELERY_RESULT_SERIALIZER": "pickle",
-    "CELERY_RESULT_PERSISTENT": False,
-    "CELERY_MAX_CACHED_RESULTS": 5000,
-    "CELERY_TRACK_STARTED": False,
-
-    # Default e-mail settings.
-    "SERVER_EMAIL": "celery@localhost",
-    "EMAIL_HOST": "localhost",
-    "EMAIL_PORT": 25,
-    "ADMINS": (),
-}
-
-for key, value in _DEFAULTS.items():
-    globals()[key] = config.get(key.replace("_",".").lower(), value)

celerypylons/loader.py

+from celery.loaders.base import BaseLoader
+from pylons import config
+
+
+to_pylons = lambda x: x.replace('_','.').lower()
+to_celery = lambda x: x.replace('.', '_').upper()
+
+
+class PylonsSettingsProxy(object):
+    """
+    Proxies settings from pylons.config
+    """
+    def __getattr__(self, key):
+        pylons_key = to_pylons(key)
+        try:
+            value = config[pylons_key]
+            if len(value.split()) > 1: return value.split()
+            return value
+        except KeyError:
+            raise AttributeError(pylons_key)
+    
+    def __setattr__(self, key, value):
+        pylons_key = to_pylons(key)
+        config[pylons_key] = value
+        
+
+class PylonsLoader(BaseLoader):
+    """Pylons celery loader.
+
+    This loader provides the following:
+
+        * Maps the celery config onto pylons.config
+
+        * Wraps task execution according to pylons.config
+
+    """
+    def on_task_init(self, task_id, task):
+        """
+        Calls the functions defined by `celery.task.init` in order.
+        """
+        # @@ TODO
+        pass
+
+    def on_process_cleanup(self):
+        pass
+
+    def on_worker_init(self):
+        pass
+
+    def read_configuration(self):
+        self.configured = True
+        return PylonsSettingsProxy()
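
For context, a rough sketch (not part of this commit; the key names below are only examples) of how PylonsSettingsProxy resolves a Celery setting: the attribute name is lowercased and underscores become dots, the value is looked up in pylons.config, and whitespace-separated values come back as lists.

    # Sketch of the proxy lookup, assuming pylons.config was populated from an
    # .ini file containing e.g.:
    #   broker.host = localhost
    #   celery.imports = myapp.tasks otherapp.tasks
    from celerypylons.loader import PylonsSettingsProxy

    settings = PylonsSettingsProxy()
    settings.BROKER_HOST     # looked up as config['broker.host'] -> 'localhost'
    settings.CELERY_IMPORTS  # config['celery.imports'] -> ['myapp.tasks', 'otherapp.tasks']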
 
setup.py

 setup(name='celery-pylons',
       version=version,
-      description="Celery integration with Pylons via PasteScript.",
+      description="Celery integration with Pylons.",
       long_description="""\
 """,
-      classifiers=[], # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers
-      keywords='paste pastescript pylons celery message queue amqp job task distributed',
+      classifiers=[
+        "Development Status :: 4 - Beta",
+        "Framework :: Pylons",
+        "Intended Audience :: Developers",
+        "License :: OSI Approved :: BSD License",
+        "Operating System :: OS Independent",
+        "Programming Language :: Python",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+        "Topic :: Internet :: WWW/HTTP",
+        "Topic :: System :: Distributed Computing"
+        ],
+      keywords='paste pylons celery message queue amqp job task distributed',
       author='Ian Schenck',
       author_email='ian@tatuminteractive.com',
-      url='',
+      url='http://bitbucket.org/ianschenck/celery-pylons/',
       license='BSD',
       packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
       include_package_data=True,
       zip_safe=True,
       install_requires=[
           # -*- Extra requirements: -*-
-        "PasteScript"
+        "Pylons>=1.0",
+        "celery>=2.0",
       ],
       entry_points="""
       # -*- Entry points: -*-