Snippets

Frederik Banke Python GenericConsumer

Updated by Frederik Banke

File snippet.BUILD Modified

  • Ignore whitespace
  • Hide word diff
 
         self.channel.queue_declare(queue=consume_queue_name, durable=True)
 
-        self.channel.basic_consume(self.callback,
+        self.channel.basic_consume(self.on_message,
                                    queue=consume_queue_name,
                                    no_ack=False)
 
         self.channel.start_consuming()
 
-    def callback(self, ch: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
+    def on_message(self, ch: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
         if self.callback(ch, method, properties, body):
             ch.basic_ack(delivery_tag=method.delivery_tag)
         else:
Updated by Frederik Banke

File snippet.BUILD Modified

  • Ignore whitespace
  • Hide word diff
     def callback(self, ch: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
         if self.callback(ch, method, properties, body):
             ch.basic_ack(delivery_tag=method.delivery_tag)
-            return True
         else:
             # failed to handle message
-            return False
+            pass
 
 
 if __name__ == '__main__':
Updated by Frederik Banke

File snippet.BUILD Modified

  • Ignore whitespace
  • Hide word diff
 
     def callback(ch, method, properties, body):
         print(body)
+        return True
 
     consumer = GenericConsumer(connection.channel(), 'queue_name', callback)
Updated by Frederik Banke

File snippet.BUILD Modified

  • Ignore whitespace
  • Hide word diff
     def callback(self, ch: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
         if self.callback(ch, method, properties, body):
             ch.basic_ack(delivery_tag=method.delivery_tag)
+            return True
         else:
             # failed to handle message
-            pass
+            return False
 
 
 if __name__ == '__main__':
Created by Frederik Banke

File snippet.BUILD Added

  • Ignore whitespace
  • Hide word diff
+import pika
+
+
+class GenericConsumer(object):
+    def __init__(self, channel, consume_queue_name, callback):
+        self.channel = channel
+        self.callback = callback
+
+        self.properties = pika.BasicProperties(
+            delivery_mode=2,  # make message persistent
+        )
+
+        self.channel.queue_declare(queue=consume_queue_name, durable=True)
+
+        self.channel.basic_consume(self.callback,
+                                   queue=consume_queue_name,
+                                   no_ack=False)
+
+        self.channel.start_consuming()
+
+    def callback(self, ch: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
+        if self.callback(ch, method, properties, body):
+            ch.basic_ack(delivery_tag=method.delivery_tag)
+        else:
+            # failed to handle message
+            pass
+
+
+if __name__ == '__main__':
+    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
+
+    def callback(ch, method, properties, body):
+        print(body)
+
+    consumer = GenericConsumer(connection.channel(), 'queue_name', callback)
HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.