Source

celery-paste / tests / test_run.py

Full commit
import sys

from celerypaste import setup_loader

CELERY_CONFIG = {"BROKER_HOST": "localhost",
                 "BROKER_PORT": 5672,
                 "BROKER_VHOST": "/",
                 "BROKER_USER": "guest",
                 "BROKER_PASSWORD": "guest",
                 "CELERY_BACKEND": "amqp",
                 "DATABASE_ENGINE": "sqlite3",
                 "DATABASE_NAME": "test.db",
                 "tasks": "mytasks",
}


setup_loader(CELERY_CONFIG)

def setup_db():
    from celery.loaders import current_loader
    loader = current_loader()
    conf = loader.read_configuration()
    from django.core.management import call_command, setup_environ
    sys.stderr.write("Creating database tables...\n")
    setup_environ(conf)
    call_command("syncdb")


def test_run():
    setup_db()
    from mytasks import AddTask
    result = AddTask.apply_async(args=[10, 10])
    print("RESULT: %s" % result.get())
    assert result.get()==20, "didn't get expected result"

if __name__ == "__main__":
    test_run()