1. Brendan McCollam
  2. apscheduler

Commits

Brendan McCollam  committed 086f0b5

Tests for timezone-naive next_run_time

  • Participants
  • Parent commits f16131a
  • Branches error_on_offset_naive_datetime

Comments (0)

Files changed (1)

File tests/test_schedulers.py

View file
 from logging import StreamHandler, getLogger, INFO
-from datetime import timedelta
+from datetime import datetime, timedelta
 from threading import Thread
 
 from pytz import utc
 
 
 @pytest.fixture
-def scheduler():
+def scheduler(monkeypatch, timezone):
+    monkeypatch.setattr('apscheduler.schedulers.base.get_localzone', MagicMock(return_value=timezone))
     return DummyScheduler()
 
 
         assert scheduler._listeners == [(func2, EVENT_JOBSTORE_ADDED)]
 
     @pytest.mark.parametrize('stopped', [True, False], ids=['stopped=True', 'stopped=False'])
-    def test_add_job(self, scheduler, stopped):
+    def test_add_job(self, scheduler, stopped, timezone):
         func = lambda x, y: None
         scheduler._stopped = stopped
         scheduler._real_add_job = MagicMock()
-        job = scheduler.add_job(func, 'date', [1], {'y': 2}, 'my-id', 'dummy', run_date='2014-06-01 08:41:00')
+        job = scheduler.add_job(func, 'date', [1], {'y': 2}, 'my-id', 'dummy',
+                                next_run_time=datetime(2014, 05, 23, 10),
+                                run_date='2014-06-01 08:41:00')
 
         assert isinstance(job, Job)
         assert job.id == 'my-id'
         assert not hasattr(job, 'misfire_grace_time')
         assert not hasattr(job, 'coalesce')
         assert not hasattr(job, 'max_instances')
+        assert job.next_run_time.tzinfo.zone == timezone.zone
         assert len(scheduler._pending_jobs) == (1 if stopped else 0)
         assert scheduler._real_add_job.call_count == (0 if stopped else 1)
 
+
     def test_scheduled_job(self, scheduler):
         func = lambda x, y: None
         scheduler.add_job = MagicMock()
                                                   run_date='2014-06-01 08:41:00')
 
     @pytest.mark.parametrize('pending', [True, False], ids=['pending job', 'scheduled job'])
-    def test_modify_job(self, scheduler, pending):
+    def test_modify_job(self, scheduler, pending, timezone):
         job = MagicMock()
         scheduler._dispatch_event = MagicMock()
         scheduler._lookup_job = MagicMock(return_value=(job, None if pending else 'default'))
         if not pending:
             jobstore = MagicMock()
             scheduler._lookup_jobstore = lambda alias: jobstore if alias == 'default' else None
-        scheduler.modify_job('blah', misfire_grace_time=5, max_instances=2)
+        scheduler.modify_job('blah', misfire_grace_time=5, max_instances=2, next_run_time=datetime(2014, 10, 17))
 
-        job._modify.assert_called_once_with(misfire_grace_time=5, max_instances=2)
+        job._modify.assert_called_once_with(misfire_grace_time=5, max_instances=2,
+                                            next_run_time=timezone.localize(datetime(2014, 10, 17)))
         if not pending:
             jobstore.update_job.assert_called_once(job)