deftest_that_consume_on_failure_message_does_reschedule_the_message(self):#publishamessageself.channel.basic_publish(exchange='',routing_key=self.queue_name,body="dummymessage")res=self.create_queue(self.queue_name,True)self.assertEqual(res.method.message_count,1)message_callback=MagicMock(side_effect=Exception("Failed to process message"))consumer=BasicConsumer(self.rabbitmq_url,self.queue_name,self.queue_arguments,message_callback)#startconsumerconsumer.start()#waitfortheconsumertobereadyself.wait_for_log_contains(BasicConsumer.CONSUMER_READY)#waitfortheconsumertosignalthatishasconsumedamessageself.wait_for_log_contains(BasicConsumer.CONSUMER_PROCESSED_MESSAGE)#stoptheconsumerconsumer.join()#assertthatthemessageisstillinthequeueres=self.channel.queue_declare(queue=self.queue_name,passive=True)self.assertEqual(res.method.message_count,1)#sincethemessageisrescheduleditmightbeprocessedmanytimesbeforewestoptheconsumermessage_callback.assert_called()
Comments (0)
HTTPSSSH
You can clone a snippet to your computer for local editing.
Learn more.