Source

celery-paste / tests / test_run.py

Full commit
from celerypaste import set_loader
from threading import Thread
import sys

CELERY_CONFIG ={"BROKER_HOST": "localhost",
                "BROKER_PORT": 5672,
                "BROKER_VHOST": "celeryTst",
                "BROKER_USER": "celery",
                "BROKER_PASSWORD": "celery",
                "CELERY_BACKEND": "database",
                "DATABASE_ENGINE": "sqlite3",
                "DATABASE_NAME": "test.db",
                }


set_loader(CELERY_CONFIG)

from celery.registry import tasks
from celery.task import Task


class AddTask(Task):

    def run(self, x, y):
        return x+y

tasks.register(AddTask)


class CeleryWorker(Thread):

    def run(self):
        from celery.bin.celeryd import run_worker
        run_worker(**vars())


def setup_db():
    from celery.loaders import current_loader
    loader = current_loader()
    conf = loader.read_configuration()
    from django.core.management import call_command, setup_environ
    sys.stderr.write("Creating database tables...\n")
    setup_environ(conf)
    call_command("syncdb")


def test_run():
    setup_db()
    w = CeleryWorker()
    w.start()
    t = AddTask()
    result = t.apply_async(args=[10, 10], countdown=3)
    assert result.get()==20, "didn't get expected result"