importpikaclassGenericConsumer(object):def__init__(self,channel,consume_queue_name,callback):self.channel=channelself.callback=callbackself.properties=pika.BasicProperties(delivery_mode=2,# make message persistent)self.channel.queue_declare(queue=consume_queue_name,durable=True)self.channel.basic_consume(self.callback,queue=consume_queue_name,no_ack=False)self.channel.start_consuming()defcallback(self,ch:pika.adapters.blocking_connection.BlockingChannel,method,properties,body):ifself.callback(ch,method,properties,body):ch.basic_ack(delivery_tag=method.delivery_tag)else:# failed to handle messagepassif__name__=='__main__':connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))defcallback(ch,method,properties,body):print(body)returnTrueconsumer=GenericConsumer(connection.channel(),'queue_name',callback)
HTTPSSSH
You can clone a snippet to your computer for local editing.
Learn more.