Commits

Ask Solem  committed e7c5a60

Added idea for a new shortcut API with automatic transaction handling.

  • Participants
  • Parent commits 1d0966b

Comments (0)

Files changed (3)

File stompy/__init__.py

 from stompy.stomp import Stomp, NotConnectedError
+from stompy.simple import Client
 from stompy.distmeta import __version__
 from stompy.distmeta import __doc__, __author__, __contact__, __homepage__

File stompy/frame.py

         command = self.parse_command(line)
         line = line[len(command)+1:]
         headers_str, _, body = line.partition("\n\n")
-        if not headers_str or not body:
+        if not headers_str:
             raise UnknownBrokerResponseError(
-                    "Received: (%s)\nTraceback:\n%s" % line)
+                    "Received: (%s)" % line)
         headers = self.parse_headers(headers_str)
 
         if 'content-length' in headers:

File stompy/simple.py

+from stompy.stomp import Stomp
+from Queue import Empty
+from uuid import uuid4
+
+
+class TransactionError(Exception):
+    """Transaction related error."""
+
+
+class Client(object):
+    """Simple STOMP client.
+
+    :keyword host: Hostname of the server to connect to (default:
+        ``localhost``)
+    :keyword port: Port of the server to connect to (default: ``61613``)
+
+    Example
+
+        >>> from stompy.simple import Client
+        >>> stomp = Client()
+        >>> stomp.connect()
+        >>> stomp.send("The quick brown fox...", destination="/queue/test")
+        >>> stomp.subscribe("/queue/test")
+        >>> message = stomp.get_nowait()
+        >>> message.body
+        'The quick brown fox...'
+        >>> stomp.ack(message)
+        >>> stomp.unsubscribe("/queue/test")
+        >>> stomp.disconnect()
+
+    """
+    Empty = Empty
+
+    def __init__(self, host="localhost", port=61613):
+        self.stomp = Stomp(host, port)
+        self._current_transaction = None
+
+    def get(self, block=True):
+        """Get message.
+
+        :keyword block: Block if necessary until an item is available.
+            If this is ``False``, return an item if one is immediately
+            available, else raise the :exc:`Empty` exception.
+
+        :raises Empty: If ``block`` is off and no message was receied.
+
+        """
+        frame = self.stomp.receive_frame(nonblocking=not block)
+        if frame is None and not block:
+            raise self.Empty()
+        return frame
+    
+    def get_nowait(self):
+        """Remove and return an item from the queue without blocking.
+    
+        Only get an item if one is immediately available. Otherwise
+        raise the :exc:`Empty` exception.
+
+        See :meth:`get`.
+
+        """
+        return self.get(block=False)
+
+    def put(self, item, destination, persistent=True, conf=None):
+        """Put an item into the queue.
+
+        :param item: Body of the message.
+        :param destination: Destination queue.
+        :keyword persistent: Is message persistent? (store on disk).
+        :keyword conf: Extra headers to send to the broker.
+
+        :returns: The resulting :class:`stompy.frame.Frame` instance.
+
+        """
+        conf = self._make_conf(conf, body=item, destination=destination,
+                               persistent=persistent)
+                  
+        return self.stomp.send(conf)
+    
+    def connect(self):
+        """Connect to the broker.
+
+        :raises :exc:`stompy.stomp.ConnectionError`: if the connection
+            was unsuccessful.
+        :raises :exc:`stompy.stomp.ConnectionTimeoutError`: if the connection
+            timed out.
+
+        """
+        self.stomp.connect()
+
+    def disconnect(self):
+        """Disconnect from the broker."""
+        self.stomp.disconnect()
+
+    def subscribe(self, destination, ack="auto", conf=None):
+        """Subscribe to topic/queue.
+
+        :param destination: The destination queue/topic to subscribe to.
+        :keyword ack: How to handle acknowledgment, either
+            ``auto`` - ack is handled by the server automatically, or
+            ``client`` - ack is handled by the client itself by calling
+            :meth:`ack`.
+        :keyword conf: Additional headers to send with the subscribe request.
+
+        """
+        conf = self._make_conf(conf, destination=destination, ack=ack)
+        return self.stomp.subscribe(conf)
+
+    def unsubscribe(self, destination, conf=None):
+        """Unsubscribe from topic/queue previously subscribed to.
+       
+        :param destination: The destination queue/topic to unsubscribe from.
+        :keyword conf: Additional headers to send with the unsubscribe
+            request.
+
+        """
+        conf = self._make_conf(conf, destination=destination)
+        return self.stomp.unsubscribe(conf)
+    
+    def begin(self, transaction):
+        """Begin transaction.
+       
+        Every :meth:`ack` and :meth:`send` will be affected by this
+        transaction and won't be real until a :meth:`commit` is issued.
+        To roll-back any changes since the transaction started use
+        :meth:`abort`.
+
+        """
+        if self._current_transaction:
+            raise TransactionError(
+                "Already in transaction. Please commit or abort first!")
+        self._current_transaction = str(uuid4())
+        return self.stomp.begin({"transaction": self._current_transaction})
+
+    def commit(self, transaction):
+        """Commit current transaction."""
+        if not self._current_transaction:
+            raise TransactionError("Not in transaction")
+        self.stomp.commit({"transaction": self._current_transaction})
+        self._current_transaction = None
+    
+
+    def abort(self):
+        """Roll-back current transaction."""
+        if not self._current_transaction:
+            raise TransactionError("Not in transaction")
+        self.stomp.commit({"transaction": self._current_transaction})
+        self._current_transaction = None
+
+    def ack(self, message):
+        """Acknowledge message.
+        
+        :param message: The message to acknowledge.
+        
+        """
+        return self.stomp.ack(frame)
+
+    def _make_conf(self, conf, **kwargs):
+        kwargs.update(dict(conf or {}))
+        if self._current_transaction:
+            conf["transaction"] = self._current_transaction
+        return kwargs