Commits

Anonymous committed 17e9a1f

Cleaned up interface and code (PEP8)

  • Participants
  • Parent commits 491d068

Comments (0)

Files changed (3)

File examples/stomp_example.py

 #!/usr/bin/env python
 import sys,time
-from stomp import Stomp
+from four import Stomp
 from optparse import OptionParser
 
 def consume(host,port,queue,num=None):

File four/stomp.py

 
 
 class NotConnectedError(Exception):
-    """Raise if not connected"""
-    def __init__(self, value):
-        self.value = value
-    
-    def __str__(self):
-        return repr(self.value)
+    """No longer connected to the STOMP server."""
+
+
+class ConnectionError(socket.error):
+    """Couldn't connect to the STOMP server."""
 
 
 class Stomp(object):
-    """Dead simple Python STOMP client library
+    """STOMP Client.
 
-    This is useful for connecting to and communicating with
-    Apache ActiveMQ, an open source Java Message Service (JMS)
-    message broker.
+    :param hostname: Hostname of the STOMP server to connect to.
+    :param port: The port to use. (default ``61613``)
 
-    The majority of the methods available take a single argument; a dictionary.
-    This dictionary should contain the necessary bits you need
-    to pass to the STOMP server.  It is outlined in each method
-    exactly what it needs to work.
-
-    For specifics on the protocol, see: http://stomp.codehaus.org/Protocol
-
-    This library is basically a Python implementation of Perl's Net::Stomp
-    See: http://search.cpan.org/dist/Net-Stomp/lib/Net/Stomp.pm
-
-    To enable the ActiveMQ Broker for Stomp add the following to the activemq.xml configuration:
-
-    <connector>
-        <serverTransport uri="stomp://localhost:61613"/>
-    </connector>
     """
-    def __init__(self,hostname,port):
-        """Initialize Stomp object
-        Also accepts arguments needed to build the TCP connection.
-        
-        >>> from stomp import Stomp
-        >>> stomp = Stomp('hostname', 61613)
-        """
-        self.host        = hostname
-        self.port        = port
-        self.sock        = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    def __init__(self, hostname, port=61613):
+        self.host = hostname
+        self.port = port
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._subscribed = None
         self._connected  = None
-        self.frame       = Frame()
+        self.frame = Frame()
 
-    def _get_subscribed(self):
-        return self._subscribed
-
-    def _set_subscribed(self,sub):
-        self._subscribed = sub
-
-    subscribed = property(_get_subscribed, _set_subscribed, 
-                          'The queue or topic we are subscribed to')
-
-    def _get_connected(self):
-        return self._connected
-    
-    def _set_connected(self,conn):
-        self._connected = conn
-
-    connected = property(_get_connected, _set_connected,
-                         'Are we connected to STOMP Server')
-
-    def _is_connected(self):
-        if not self.connected:
-            raise NotConnectedError, 'Not connected to STOMP server.'
-
-    def connect(self, conf=None):
-        """Connect to STOMP server
-        This method does not require any arguments.
-
-        >>> stomp.connect()
-        """
+    def connect(self):
+        """Connect to STOMP server."""
         try:
-            self.sock.connect((self.host,self.port))
+            self.sock.connect((self.host, self.port))
             self.frame.connect(self.sock)
             self.connected = True
-        except (socket.error,socket.timeout), err:
+        except (socket.error, socket.timeout), err:
             print "Cannot connect to %s on port %d" %(self.host,self.port)
             print "Caught error: %s" % err
             raise SystemExit
 
     def disconnect(self, conf=None):
-        """Disconnect from STOMP server
-        This method does not require any arguments.
-        
-        >>> stomp.disconnect()
-        """
+        """Disconnect from the server."""
         if self.subscribed:
-            self.unsubscribe({'destination':self.subscribed})
+            self.unsubscribe({'destination': self.subscribed})
         if conf is None:
             conf = {}
-        frame = self.frame.build_frame({'command':'DISCONNECT','headers':conf})
+        frame = self.frame.build_frame({'command': 'DISCONNECT',
+                                        'headers': conf})
         self.send_frame(frame)
         self.sock.shutdown(0)
 
-    def send(self,conf=None):
+    def send(self, conf=None):
         """Send message to STOMP server
 
-        You'll need to pass the body and any other headers your STOMP server likes.
+        You'll need to pass the body and any other headers your
+        STOMP server likes.
 
         destination is *required*
 
         In the case of ActiveMQ with persistence, you could do this:
-        >>> for i in xrange(1,1000):
-        ...     stomp.send({'destination':'/queue/foo',
-        ...                 'body':'Testing',
-        ...                 'persistent':'true'})
+
+            >>> for i in xrange(1,1000):
+            ...     stomp.send({'destination': '/queue/foo',
+            ...                 'body': 'Testing',
+            ...                 'persistent': 'true'})
         """
         self._is_connected()
         body = conf['body']
         frame = self.send_frame(frame)
         return frame
 
-    def subscribe(self,conf=None):
+    def subscribe(self, conf=None):
         """Subscribe to a given destination
 
         You will need to pass any headers your STOMP server likes.
         destination is *required*
 
         In the case of ActiveMQ, you could do this:
-        >>> stomp.subscribe({'destination':'/queue/foo',
-        ...                  'ack':'client'})
+
+            >>> stomp.subscribe({'destination':'/queue/foo',
+        ...                      'ack':'client'})
         """
         self._is_connected()
-        frame = self.frame.build_frame({'command':'SUBSCRIBE','headers':conf})
+        frame = self.frame.build_frame({'command': 'SUBSCRIBE',
+                                        'headers':conf})
         self.send_frame(frame)
         self.subscribed = conf.get('destination')
 
     def begin(self,conf=None):
-        """Subscribe to a given destination
+        """Begin transaction.
 
         You will need to pass any headers your STOMP server likes.
 
         destination is *required*
 
         In the case of ActiveMQ, you could do this:
-        >>> stomp.begin({'transaction':'<randomish_hash_like_thing>'})
+
+            >>> stomp.begin({'transaction':'<randomish_hash_like_thing>'})
         """
         self._is_connected()
-        frame = self.frame.build_frame({'command':'BEGIN','headers':conf})
+        frame = self.frame.build_frame({'command': 'BEGIN',
+                                        'headers': conf})
         self.send_frame(frame)
 
-    def commit(self,conf=None):
-        """Subscribe to a given destination
+    def commit(self, conf=None):
+        """Commit transaction.
 
         You will need to pass any headers your STOMP server likes.
 
-        destination is *required*
+        destination is **required**
 
         In the case of ActiveMQ, you could do this:
-        >>> stomp.commit({'transaction':'<randomish_hash_like_thing>'})
+
+            >>> stomp.commit({'transaction':'<randomish_hash_like_thing>'})
+
         """
         self._is_connected()
-        frame = self.frame.build_frame({'command':'COMMIT','headers':conf})
+        frame = self.frame.build_frame({'command': 'COMMIT',
+                                        'headers':conf})
         self.send_frame(frame)
 
-    def abort(self,conf=None):
-        """Subscribe to a given destination
-
-        You will need to pass any headers your STOMP server likes.
-
-        destination is *required*
+    def abort(self, conf=None):
+        """Abort transaction.
 
         In the case of ActiveMQ, you could do this:
-        >>> stomp.abort({'transaction':'<randomish_hash_like_thing>'})
+
+            >>> stomp.abort({'transaction':'<randomish_hash_like_thing>'})
+
         """
         self._is_connected()
-        frame = self.frame.build_frame({'command':'ABORT','headers':conf})
+        frame = self.frame.build_frame({'command': 'ABORT',
+                                        'headers': conf})
         self.send_frame(frame)
 
-    def unsubscribe(self,conf=None):
+    def unsubscribe(self, conf=None):
         """Unsubscribe from a given destination
 
         You will need to pass any headers your STOMP server likes.
         self._is_connected()
         if conf is None:
             conf = {}
-        frame = self.frame.build_frame({'command':'UNSUBSCRIBE','headers':conf})
+        frame = self.frame.build_frame({'command': 'UNSUBSCRIBE', 
+                                        'headers': conf})
         self.send_frame(frame)
         self.subscribed = None
 
-    def ack(self,frame):
+    def ack(self, frame):
         """Acknowledge receipt of a message
-        
-        Accepts a frame as an argument and it is *required*.
 
-        Given that you are already subscribed to a destination
-        and that the destination has messages for consumption
+        :param: A :class:`four.frame.Frame` instance.
 
-        >>> while True:
-        ...     frame = stomp.receive_frame()
-        ...     stomp.ack(frame)
+        Example
+
+            >>> while True:
+            ...     frame = stomp.receive_frame()
+            ...     stomp.ack(frame)
+
         """
         self._is_connected()
-        msgid = frame.headers.get('message-id')
-        thisframe = self.frame.build_frame({'command':'ACK','headers':{'message-id':msgid}})
-        self.send_frame(thisframe)
+        message_id = frame.headers.get('message-id')
+        self.send_action("ACK", message_id=message_id)
+
+    def send_action(self, command, **headers):
+        headers = dict((key.replace("_", "-"), value)
+                            for key, value in headers.items())
+        frame = self.frame.build_frame({"command": command,
+                                        "headers": headers or {}})
+        self.send_frame(frame)
 
     def receive_frame(self, nonblocking=False):
         """Get a frame from the STOMP server
         
         :keyword nonblocking: By default this function waits forever
             until there is a message to be received, however, in non-blocking
-            mode it returns ``None`` if there is no messages available.
+            mode it returns ``None`` if there is currently no message
+            available.
 
-        Given that you are already subscribed to a destination
-        and that the destination has messages for consumption
+        Note that you must be subscribed to one or more destinations.
+        Use :meth:`subscribe` to subscribe to a topic/queue.
 
-        >>> while True:
-        ...     frame = stomp.receive_frame()
-        ...     print frame.headers['message-id']
-        ...     stomp.ack(frame)
+        Example: Blocking
+
+            >>> while True:
+            ...     frame = stomp.receive_frame()
+            ...     print(frame.headers['message-id'])
+            ...     stomp.ack(frame)
+
+        Example: Non-blocking
+
+            >>> frame = stomp.recieve_frame(nonblocking=True)
+            >>> if frame:
+            ...     process_message(frame)
+            ... else:
+            ...     # no messages yet.
+
         """
         self._is_connected()
-        frame = self.frame.get_message(nb=nonblocking)
-        return frame
+        return self.frame.get_message(nb=nonblocking)
 
     def poll(self):
-        """See if there is a message to be fetched on the server, if there is
-        returns the frame."""
+        """Alias to :meth:`receive_frame` with ``nonblocking=True``."""
         return self.receive_frame(nonblocking=True)
 
     def send_frame(self, frame):
-        """Send frame to the STOMP server
+        """Send a custom frame to the STOMP server
 
-        Takes a frame object as argument
+        :param frame: A :class:`four.frame.Frame` instance.
 
-        You could build your own frame and bypass all that 
-        this library provides, if you wish
-       
-        >>> from stomp import Stomp, Frame
-        >>> stomp = Stomp('localhost',61613)
-        >>> frameobj = Frame()
-        >>> frame = frameobj.build_frame({'command':'DISCONNECT','headers':{}})
-        >>> stomp.send_frame(frame)
+        Example
+
+            >>> from four import Frame
+            >>> frame = Frame().build_frame({
+            ...    "command": "DISCONNECT", 
+            ...    "headers": {},
+            ... })
+            >>> stomp.send_frame(frame)
+
         """
         self._is_connected()
         frame = self.frame.send_frame(frame.as_string())
         return frame
+    
+    def _is_connected(self):
+        if not self.connected:
+            raise NotConnectedError("Not connected to STOMP server.")
+
+    def _get_subscribed(self):
+        return self._subscribed
+
+    def _set_subscribed(self,sub):
+        self._subscribed = sub
+
+    # XXX This is a problem, because we can be subscribed to more than
+    # one topic/queue at the same time.
+    subscribed = property(_get_subscribed, _set_subscribed, 
+                          "The queue or topic currently subscribed to")
+    
+    def _get_connected(self):
+        return self._connected
+    
+    def _set_connected(self,conn):
+        self._connected = conn
+
+    connected = property(_get_connected, _set_connected,
+                         "Connection status.")
+

File tests/test_stomp.py

         try:
             mystomp.send()
         except four.NotConnectedError, err:
-            assert str(err) == "'Not connected to STOMP server.'"
+            assert True # Should raise not connected
+            return
+        assert False # Should raise not connected
 
 class WhenSocketCantConnect(TestCase):
     def should_fail_connect(self):