Commits

John Mitchell committed b201382

lots of rearranging, "make test" is 100%

  • Participants
  • Parent commits bfa83d7

Comments (0)

Files changed (21)

 all:
 
-test:	messageq.html
+# TEST_VALUE := $(shell fortune -s)
+TEST_VALUE := $(shell date +%N | cut -c6-)
+STOP := pkill -f qlisten.py
 
-test-fast:
-	./qecho.py beer
+test:	mq-is-running
+	@echo "\n**** Stopping old consumers, if any\n"
+	-$(STOP)
+	@echo "\n**** Starting new consumer\n"
+	python src/qlisten.py &
+	@sleep 1
+	@echo "\n**** Sending four-digit test message '$(TEST_VALUE)'\n"
+	python src/qecho.py '$(TEST_VALUE)'
+	@echo "\n**** Stopping consumer\n"
+	$(STOP)
+	@sleep 1
+	@echo "\n**** Done!\n"
 
-status:
+stop:
+	$(STOP)
+
+mq-is-running:
+	@pgrep -P1 -f rabbitmq >/dev/null \
+	|| (echo "RabbitMQ not running:					\n\
+- is it installed? 	(sudo apt-get install rabbitmq-server)		\n\
+- running?		(sudo /etc/init.d/rabbitmq-server start)	\n\
+- something in the logs? (grep . /var/log/rabbitmq/*log)		\n\
+" ; false) 
+
+mq-status:
 	sudo rabbitmqctl list_queues
 	sudo rabbitmqctl list_bindings
 	sudo rabbitmqctl list_exchanges
 
-%.html:	%.rst
-	rst2s5 $< $@
-
-sync:
-	cp ~/Documents/messageq* .
-	rsync -av ./ inkblot:src/amqp-ex/
-	test -d /media/BLACKBERRY \
-	&& rsync -av ./ /media/BLACKBERRY/amqp-ex/
-
-# http://johnmacfarlane.net/pandoc/examples.html
-# %.html:	%.rst
-# 	pandoc -s -m -i --from rst --write s5 $< -o $@
-# %.html:	%.md
-# 	pandoc -s -m -i  --write s5 $< -o $@

File OLD/amqp_consumer.py

+#!/usr/bin/python
+
+from amqplib import client_0_8 as amqp
+
+conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
+chan = conn.channel()
+
+chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)
+chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False,)
+
+chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="jason")
+
+def recv_callback(msg):
+    print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
+
+chan.basic_consume(queue='po_box', no_ack=True, callback=recv_callback, consumer_tag="testtag")
+while True:
+    chan.wait()
+chan.basic_cancel("testtag")
+
+
+chan.close()
+conn.close()

File OLD/amqp_publisher.py

+#!/usr/bin/python
+
+from amqplib import client_0_8 as amqp
+import sys
+
+conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
+chan = conn.channel()
+
+msg = amqp.Message(sys.argv[1])
+msg.properties["delivery_mode"] = 2
+chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")
+
+chan.close()
+conn.close()

File amqp_consumer.py

-#!/usr/bin/python
-
-from amqplib import client_0_8 as amqp
-
-conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
-chan = conn.channel()
-
-chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)
-chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False,)
-
-chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="jason")
-
-def recv_callback(msg):
-    print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
-
-chan.basic_consume(queue='po_box', no_ack=True, callback=recv_callback, consumer_tag="testtag")
-while True:
-    chan.wait()
-chan.basic_cancel("testtag")
-
-
-chan.close()
-conn.close()

File amqp_publisher.py

-#!/usr/bin/python
-
-from amqplib import client_0_8 as amqp
-import sys
-
-conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
-chan = conn.channel()
-
-msg = amqp.Message(sys.argv[1])
-msg.properties["delivery_mode"] = 2
-chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")
-
-chan.close()
-conn.close()

File dbus-onsleep.py

-#!/usr/bin/env python
-import dbus, gobject, time
-from dbus.mainloop.glib import DBusGMainLoop
-
-def screensaver_act(sleeping):
-    print '%s sleeping: %s' % (time.time(),sleeping)
-
-dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
-bus = dbus.SessionBus()
-bus.add_signal_receiver(
-    screensaver_act,
-    dbus_interface="org.gnome.ScreenSaver",
-    signal_name="ActiveChanged")
-
-loop = gobject.MainLoop()
-loop.run()

File hello.py

-#!/usr/bin/env python
-
-'''
-'''
-
-import amqplib.client_0_8 as amqp
-
-class MessageIter(object):
-    def __init__(self, chan):
-        self._chan = chan
-    def _gotmessage_cb(self, msg):
-        self._message = msg
-    def __iter__(self):
-        self._chan.basic_consume(
-            no_ack=True,
-            callback=self._gotmessage_cb)
-        while True:
-            self._chan.wait()
-            yield self._message
-
-
-conn = amqp.Connection()
-chan = conn.channel()
-
-chan.queue_declare('hello', nowait=True)
-chan.exchange_declare(exchange="amq.fanout", type="fanout")
-
-chan.queue_bind(queue="hello", exchange="amq.fanout")
-
-print 'start'
-for msg in MessageIter(chan):
-    print 'ding',msg
-print 'done'        
-
-chan.close()
-conn.close()

File qecho-dir.py

-#!/usr/bin/env python
-
-'''
-qecho.py -- send words to AMQP routing key
-'''
-import sys, time
-from amqplib import client_0_8 as amqp
-
-conn = amqp.Connection()
-chan = conn.channel()
-
-args = sys.argv[1:]
-queuename = args.pop(0)
-msg = amqp.Message( ' '.join(args) )
-chan.basic_publish(msg, exchange="amq.direct", routing_key=queuename)
-
-chan.close()
-conn.close()

File qecho.py

-#!/usr/bin/env python
-
-'''
-qecho.py -- broadcast words to AMQP
-'''
-import sys, time
-from amqplib import client_0_8 as amqp
-
-conn = amqp.Connection()
-chan = conn.channel()
-
-msg = amqp.Message( ' '.join(sys.argv[1:]) )
-try:
-    chan.basic_publish(msg, exchange="amq.fanout")
-    chan.close()
-except amqp.AMQPChannelException as exc:
-    print sys.argv[0],exc[1]
-    sys.exit(1)
-finally:
-    conn.close()

File qlisten.py

-#!/usr/bin/env python
-
-'''
-qlisten.py -- listen to broadcast AMQP messages
-'''
-
-import os, sys, time
-import amqplib.client_0_8 as amqp
-
-
-def LOG(text):
-    print '%s %5d %s' % (time.ctime(), os.getpid(), text)
-
-def recv(msg):
-    LOG('C: %s %s' % (msg.properties, msg.body))
-
-
-conn = amqp.Connection()
-chan = conn.channel()
-
-# configure my exchange to receive 'myfanout' broadcast messages
-chan.exchange_declare('amq.fanout', 'fanout')
-# declare new, private queue
-chan.queue_declare(exclusive=True)
-# route broadcast messages to it
-chan.queue_bind(queue='', exchange='amq.fanout')
-# call recv(msg) when messages arrive
-chan.basic_consume(callback=recv)
-
-LOG('listening')
-try:
-    while True:
-        chan.wait()
-except KeyboardInterrupt:
-    pass
-LOG('done')
-
-chan.close()
-conn.close()

File qtail.py

-#!/usr/bin/env python
-
-'''
-qtail.py -- print messages incoming from a AMQP queue
-'''
-
-import logging, sys, time
-import amqplib.client_0_8 as amqp
-
-
-class MessageIter(object):
-    def __init__(self, chan):
-        self._chan = chan
-    def _gotmessage_cb(self, msg):
-        self._message = msg
-    def __iter__(self):
-        self._chan.basic_consume(
-            no_ack=True, callback=self._gotmessage_cb)
-        while True:
-            self._chan.wait()
-            yield self._message
-
-queuename = sys.argv[1]
-
-conn = amqp.Connection()
-chan = conn.channel()
-
-chan.queue_declare(queuename, nowait=True)
-
-# X: automatic routing: key "x" goes to queue "x"
-chan.queue_bind(queue=queuename, routing_key=queuename, exchange="amq.direct")
-
-print time.ctime(), '%s: listening to queue' % queuename
-try:
-    for msg in MessageIter(chan):
-        print time.ctime(), msg.body
-except KeyboardInterrupt:
-    pass
-print time.ctime(), '%s: done' % queuename
-
-chan.close()
-conn.close()

File src/dbus-onsleep.py

+#!/usr/bin/env python
+import dbus, gobject, time
+from dbus.mainloop.glib import DBusGMainLoop
+
+def screensaver_act(sleeping):
+    print '%s sleeping: %s' % (time.time(),sleeping)
+
+dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+bus = dbus.SessionBus()
+bus.add_signal_receiver(
+    screensaver_act,
+    dbus_interface="org.gnome.ScreenSaver",
+    signal_name="ActiveChanged")
+
+loop = gobject.MainLoop()
+loop.run()

File src/hello.py

+#!/usr/bin/env python
+
+'''
+'''
+
+import amqplib.client_0_8 as amqp
+
+class MessageIter(object):
+    def __init__(self, chan):
+        self._chan = chan
+    def _gotmessage_cb(self, msg):
+        self._message = msg
+    def __iter__(self):
+        self._chan.basic_consume(
+            no_ack=True,
+            callback=self._gotmessage_cb)
+        while True:
+            self._chan.wait()
+            yield self._message
+
+
+conn = amqp.Connection()
+chan = conn.channel()
+
+chan.queue_declare('hello', nowait=True)
+chan.exchange_declare(exchange="amq.fanout", type="fanout")
+
+chan.queue_bind(queue="hello", exchange="amq.fanout")
+
+print 'start'
+for msg in MessageIter(chan):
+    print 'ding',msg
+print 'done'        
+
+chan.close()
+conn.close()

File src/qecho-dir.py

+#!/usr/bin/env python
+
+'''
+qecho.py -- send words to AMQP routing key
+'''
+import sys, time
+from amqplib import client_0_8 as amqp
+
+conn = amqp.Connection()
+chan = conn.channel()
+
+args = sys.argv[1:]
+queuename = args.pop(0)
+msg = amqp.Message( ' '.join(args) )
+chan.basic_publish(msg, exchange="amq.direct", routing_key=queuename)
+
+chan.close()
+conn.close()

File src/qecho.py

+#!/usr/bin/env python
+
+'''
+qecho.py -- broadcast words to AMQP
+'''
+import sys, time
+from amqplib import client_0_8 as amqp
+
+conn = amqp.Connection()
+chan = conn.channel()
+
+msg = amqp.Message( ' '.join(sys.argv[1:]) )
+try:
+    chan.basic_publish(msg, exchange="amq.fanout")
+    chan.close()
+except amqp.AMQPChannelException as exc:
+    print sys.argv[0],exc[1]
+    sys.exit(1)
+finally:
+    conn.close()

File src/qlisten.py

+#!/usr/bin/env python
+
+'''
+qlisten.py -- listen to broadcast AMQP messages
+'''
+
+import os, signal, sys, time
+import amqplib.client_0_8 as amqp
+
+
+def LOG(text):
+    print '%s %5d %s' % (time.ctime(), os.getpid(), text)
+
+def recv(msg):
+    LOG('C: %s %s' % (msg.properties, msg.body))
+
+# ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
+
+
+# exit cleanly if this proc is pkill'd
+def sigterm(*_):
+    raise KeyboardInterrupt()   # X: close enough :)
+signal.signal(signal.SIGTERM, sigterm)
+
+conn = amqp.Connection()
+chan = conn.channel()
+
+# configure my exchange to receive broadcast messages
+# from the standard predefined exchange
+chan.exchange_declare('amq.fanout', 'fanout')
+# declare new, private queue
+chan.queue_declare(exclusive=True)
+# route broadcast messages to it
+chan.queue_bind(queue='', exchange='amq.fanout')
+# call recv(msg) when messages arrive
+chan.basic_consume(callback=recv)
+
+LOG('listening')
+try:
+    while True:
+        chan.wait()
+except KeyboardInterrupt:
+    pass
+finally:
+    chan.close()
+    conn.close()
+LOG('done')
+

File src/qtail.py

+#!/usr/bin/env python
+
+'''
+qtail.py -- print messages incoming from a AMQP queue
+'''
+
+import logging, sys, time
+import amqplib.client_0_8 as amqp
+
+
+class MessageIter(object):
+    def __init__(self, chan):
+        self._chan = chan
+    def _gotmessage_cb(self, msg):
+        self._message = msg
+    def __iter__(self):
+        self._chan.basic_consume(
+            no_ack=True, callback=self._gotmessage_cb)
+        while True:
+            self._chan.wait()
+            yield self._message
+
+queuename = sys.argv[1]
+
+conn = amqp.Connection()
+chan = conn.channel()
+
+chan.queue_declare(queuename, nowait=True)
+
+# X: automatic routing: key "x" goes to queue "x"
+chan.queue_bind(queue=queuename, routing_key=queuename, exchange="amq.direct")
+
+print time.ctime(), '%s: listening to queue' % queuename
+try:
+    for msg in MessageIter(chan):
+        print time.ctime(), msg.body
+except KeyboardInterrupt:
+    pass
+print time.ctime(), '%s: done' % queuename
+
+chan.close()
+conn.close()

File src/test_qtail.py

+#!/usr/bin/env python
+
+import random, os, time
+from subprocess import *
+import nose
+
+def test_me():
+    qname = 'test%04d' % random.randint(100,999)
+    pipe = Popen('./qtail.py %s' % qname, shell=True, stdout=PIPE)
+    time.sleep(1)
+    
+    os.system('./qecho.py %s haha' % qname)
+    pipe.terminate()
+    print 'done'
+    pipe.wait()
+    print pipe.stdout
+    
+#!/usr/bin/python
+
+from amqplib import client_0_8 as amqp
+import sys
+
+conn = amqp.Connection()
+chan = conn.channel()
+chan.queue_declare('hello', nowait=True)
+msg = amqp.Message('beer')
+chan.basic_publish(msg, exchange="amq.fanout")
+
+
+# conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
+# chan = conn.channel()
+
+# msg = amqp.Message(sys.argv[1])
+# msg.properties["delivery_mode"] = 2
+# chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")
+
+chan.close()
+conn.close()

File test_qtail.py

-#!/usr/bin/env python
-
-import random, os, time
-from subprocess import *
-import nose
-
-def test_me():
-    qname = 'test%04d' % random.randint(100,999)
-    pipe = Popen('./qtail.py %s' % qname, shell=True, stdout=PIPE)
-    time.sleep(1)
-    
-    os.system('./qecho.py %s haha' % qname)
-    pipe.terminate()
-    print 'done'
-    pipe.wait()
-    print pipe.stdout
-    

File z.py

-#!/usr/bin/python
-
-from amqplib import client_0_8 as amqp
-import sys
-
-conn = amqp.Connection()
-chan = conn.channel()
-chan.queue_declare('hello', nowait=True)
-msg = amqp.Message('beer')
-chan.basic_publish(msg, exchange="amq.fanout")
-
-
-# conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
-# chan = conn.channel()
-
-# msg = amqp.Message(sys.argv[1])
-# msg.properties["delivery_mode"] = 2
-# chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")
-
-chan.close()
-conn.close()