Commits

Chad Burrus committed 6c2c4a6

tutorial 2

Comments (0)

Files changed (8)

+from subprocess import call
+
+class Runner(object):
+	""" A class to simplify running external programs. """
+
+	def _run(self, command = [], stdin = None, stdout = None, shell = False):
+		"""
+			Don't call directly!  Use kickoff.
+			Wrapper around subprocess.call. See
+			http://docs.python.org/library/subprocess.html for full details.
+		"""
+		ret_code = call(command, stdin=stdin, stdout=stdout, shell=shell)
+		if ret_code:
+			self.handle_ret_code(ret_code, command, stdin, stdout, shell)
+
+	def handle_ret_code(self, ret_code, command, stdin, stdout, shell):
+		"""
+			Exception handler in case something breaks running an external command.
+			Should be overridden in child classes.  Parameters are as in self.run,
+			with the exception of the first parameter, which is the actual return code
+			from the command execution.
+		"""
+		raise RuntimeError("Command Failed!!!")
+
+	def super_me(self):
+		""" A shortcut method to call super on me. """
+		return super(type(self), self)
+
+	def kickoff(self, command = [], stdin = None, stdout = None, shell = False):
+		"""
+			Actually kick off a run of the command.  Use this for interface
+			consistency.
+		"""
+		self._run(command, stdin, stdout, shell)

tut2/Tutorial2Runner.py

+from Runner import Runner
+
+class Tutorial2Runner(Runner):
+
+	def __init__(self, count):
+		super(type(self), self)
+
+		self.count = count
+
+	def kickoff(self):
+		count = self.count
+		for i in range(1, count):
+			message = "Sending message # %s %s" % (i, "." * i)
+			command = "python durable_task.py '%s'" % message
+			print "Running command \"%s\"" % command
+			self._run(command, shell=True)
+
+runner = Tutorial2Runner(30)
+runner.kickoff()

tut2/ack_worker.py

+#!/usr/bin/env python
+import pika
+import time
+
+# make connection to RabbitMQ server
+connection = pika.BlockingConnection(pika.ConnectionParameters(
+        host='localhost'))
+channel = connection.channel()
+
+# Prepare the recipient queue -- do this in both places so it doesn't matter which one runs first
+channel.queue_declare(queue='hello')
+
+# Print a handy message
+print ' [*] Waiting for messages. To exit press CTRL+C'
+
+# The thing to do when a message is received
+def callback(ch, method, properties, body):
+    print " [x] Received %r" % (body,)
+    time.sleep(body.count("."))
+    print " [x] Done"
+    ch.basic_ack(delivery_tag = method.delivery_tag)
+
+# Set up the consumer
+channel.basic_consume(callback,
+                      queue='hello')
+
+# Start consuming
+channel.start_consuming()

tut2/durable_task.py

+#!/usr/bin/env python
+import pika
+import sys
+
+# make connection to RabbitMQ server
+connection = pika.BlockingConnection(pika.ConnectionParameters(
+                 'localhost'))
+channel = connection.channel()
+
+# Prepare the recipient queue -- do this in both places so it doesn't matter which one runs first
+# Durable means that the queue won't be forgotten on RabbitMQ restart
+channel.queue_declare(queue='durable_test', durable=True)
+
+# Send the message
+message = " ".join(sys.argv[1:]) or "Hello World!"
+channel.basic_publish(exchange='',
+                      routing_key='durable_test',
+                      body=message,
+                      # "ensure" the message is persisted to disk just in case
+                      properties=pika.BasicProperties(
+                        delivery_mode = 2, # make message persistent
+                      ))
+
+# Print a handy message
+print " [x] Sent %r" % (message, )
+
+# Close up shop, flushing buffers and the like
+connection.close()

tut2/durable_worker.py

+#!/usr/bin/env python
+import pika
+import time
+
+# make connection to RabbitMQ server
+connection = pika.BlockingConnection(pika.ConnectionParameters(
+        host='localhost'))
+channel = connection.channel()
+
+# Prepare the recipient queue -- do this in both places so it doesn't matter which one runs first
+# Durable means that the queue won't be forgotten on RabbitMQ restart
+channel.queue_declare(queue='durable_test', durable=True)
+
+# Print a handy message
+print ' [*] Waiting for messages. To exit press CTRL+C'
+
+# The thing to do when a message is received
+def callback(ch, method, properties, body):
+    print " [x] Received %r" % (body,)
+    time.sleep(body.count("."))
+    print " [x] Done"
+    ch.basic_ack(delivery_tag = method.delivery_tag)
+
+# Set up the consumer
+channel.basic_consume(callback,
+                      queue='durable_test')
+
+# Start consuming
+channel.start_consuming()

tut2/durable_worker_with_qos.py

+#!/usr/bin/env python
+import pika
+import time
+
+# make connection to RabbitMQ server
+connection = pika.BlockingConnection(pika.ConnectionParameters(
+        host='localhost'))
+channel = connection.channel()
+
+# Prepare the recipient queue -- do this in both places so it doesn't matter which one runs first
+# Durable means that the queue won't be forgotten on RabbitMQ restart
+channel.queue_declare(queue='durable_test', durable=True)
+
+# Print a handy message
+print ' [*] Waiting for messages. To exit press CTRL+C'
+
+# The thing to do when a message is received
+def callback(ch, method, properties, body):
+    print " [x] Received %r" % (body,)
+    time.sleep(body.count("."))
+    print " [x] Done"
+    ch.basic_ack(delivery_tag = method.delivery_tag)
+
+# Ensure we only pass one job at a time to the worker
+channel.basic_qos(prefetch_count=1)
+
+# Set up the consumer
+channel.basic_consume(callback,
+                      queue='durable_test')
+
+# Start consuming
+channel.start_consuming()
+#!/usr/bin/env python
+import pika
+import sys
+
+# make connection to RabbitMQ server
+connection = pika.BlockingConnection(pika.ConnectionParameters(
+                 'localhost'))
+channel = connection.channel()
+
+# Prepare the recipient queue -- do this in both places so it doesn't matter which one runs first
+channel.queue_declare(queue='hello')
+
+# Send the message
+message = " ".join(sys.argv[1:]) or "Hello World!"
+channel.basic_publish(exchange='',
+                      routing_key='hello',
+                      body=message)
+
+# Print a handy message
+print " [x] Sent %r" % (message, )
+
+# Close up shop, flushing buffers and the like
+connection.close()
+#!/usr/bin/env python
+import pika
+import time
+
+# make connection to RabbitMQ server
+connection = pika.BlockingConnection(pika.ConnectionParameters(
+        host='localhost'))
+channel = connection.channel()
+
+# Prepare the recipient queue -- do this in both places so it doesn't matter which one runs first
+channel.queue_declare(queue='hello')
+
+# Print a handy message
+print ' [*] Waiting for messages. To exit press CTRL+C'
+
+# The thing to do when a message is received
+def callback(ch, method, properties, body):
+    print " [x] Received %r" % (body,)
+    time.sleep(body.count("."))
+    print " [x] Done"
+
+# Set up the consumer
+channel.basic_consume(callback,
+                      queue='hello',
+                      no_ack=True)
+
+# Start consuming
+channel.start_consuming()