Commits

a...@0x61736b.net  committed 7b31ccd

Can't run threaded worker! Sadly, the message listener doesn't work well then,
must run as main process. But doing .apply() should test the registry well
enough.

  • Participants
  • Parent commits 3a82e7c

Comments (0)

Files changed (1)

File tests/test_run.py

+import sys
+
 from celerypaste import set_loader
-from threading import Thread
-import sys
 
 CELERY_CONFIG ={"BROKER_HOST": "localhost",
                 "BROKER_PORT": 5672,
                 "CELERY_BACKEND": "database",
                 "DATABASE_ENGINE": "sqlite3",
                 "DATABASE_NAME": "test.db",
-                }
+                "CELERY_IMPORTS": ("mytasks", )}
 
 
 set_loader(CELERY_CONFIG)
 
-from celery.registry import tasks
 from celery.task import Task
 
 
     def run(self, x, y):
         return x+y
 
-tasks.register(AddTask)
-
-
-class CeleryWorker(Thread):
-
-    def run(self):
-        from celery.bin.celeryd import run_worker
-        run_worker(**vars())
-
 
 def setup_db():
     from celery.loaders import current_loader
 
 def test_run():
     setup_db()
-    w = CeleryWorker()
-    w.start()
-    t = AddTask()
-    result = t.apply_async(args=[10, 10], countdown=3)
+    result = AddTask.apply(args=[10, 10])
     assert result.get()==20, "didn't get expected result"