deftest_that_consume_on_invalid_message_does_moves_message_to_dead_letter_queue(self):# publish a messageself.channel.basic_publish(exchange='',routing_key=self.queue_name,body="dummymessage")res=self.create_queue(self.queue_name,True)self.assertEqual(res.method.message_count,1)message_callback=MagicMock(return_value=False)consumer=BasicConsumer(self.rabbitmq_url,self.queue_name,self.queue_arguments,message_callback)# start consumerconsumer.start()# wait for the consumer to be readyself.wait_for_log_contains(BasicConsumer.CONSUMER_READY)# wait for the consumer to signal that is has consumed a messageself.wait_for_log_contains(BasicConsumer.CONSUMER_PROCESSED_MESSAGE)# assert that the queue is emptyres=self.create_queue(self.queue_name,True)self.assertEqual(res.method.message_count,0)# assert that the message is not in the dead-letters-queueres=self.channel.queue_declare(queue=self.queue_name_dead_letters,passive=True)self.assertEqual(res.method.message_count,1)message_callback.assert_called_once()# stop the consumerconsumer.join()
Comments (0)
HTTPSSSH
You can clone a snippet to your computer for local editing.
Learn more.