Created by
Frederik Banke
| def test_that_consume_on_invalid_message_does_moves_message_to_dead_letter_queue(self):
# publish a message
self.channel.basic_publish(
exchange='',
routing_key=self.queue_name,
body="dummymessage"
)
res = self.create_queue(self.queue_name, True)
self.assertEqual(res.method.message_count, 1)
message_callback = MagicMock(return_value=False)
consumer = BasicConsumer(self.rabbitmq_url,
self.queue_name,
self.queue_arguments,
message_callback
)
# start consumer
consumer.start()
# wait for the consumer to be ready
self.wait_for_log_contains(BasicConsumer.CONSUMER_READY)
# wait for the consumer to signal that is has consumed a message
self.wait_for_log_contains(BasicConsumer.CONSUMER_PROCESSED_MESSAGE)
# assert that the queue is empty
res = self.create_queue(self.queue_name, True)
self.assertEqual(res.method.message_count, 0)
# assert that the message is not in the dead-letters-queue
res = self.channel.queue_declare(queue=self.queue_name_dead_letters, passive=True)
self.assertEqual(res.method.message_count, 1)
message_callback.assert_called_once()
# stop the consumer
consumer.join()
|