Commits

Ask Solem  committed 5886754

Renamed from four to stompy

  • Participants
  • Parent commits fa049e0

Comments (0)

Files changed (22)

 ==========================================
- four (4/4) - Python STOMP client library
+ stompy - Python STOMP client library
 ==========================================
 
 This is useful for connecting to and communicating with
         <serverTransport uri="stomp://localhost:61613"/>
     </connector>
 
-See http://bitbucket.org/asksol/four/ for latest code.
+See http://bitbucket.org/asksol/python-stomp/ for latest code.
 
 .. _`ActiveMQ`: http://activemq.apache.org/
 .. _`Java Message Service`: http://java.sun.com/products/jms/

File docs/conf.py

 # is relative to the documentation root, use os.path.abspath to make it
 # absolute, like shown here.
 sys.path.insert(0, "../")
-from four import distmeta
+from stompy import distmeta
 
 # General configuration
 # ---------------------

File docs/index.rst

 ===============================================
- Four 4/4 - Python STOMP Client  Documentation
+ stompy - Python STOMP Client  Documentation
 ===============================================
 
 Contents:

File docs/introduction.rst

-../README
+==========================================
+ stompy - Python STOMP client library
+==========================================
+
+This is useful for connecting to and communicating with
+Apache `ActiveMQ`_ (an open source `Java Message Service`_ (JMS)
+message broker) or other brokers with support for the `STOMP`_ protocol.
+
+The majority of the methods available take a single argument; a dictionary.
+This dictionary should contain the necessary bits you need
+to pass to the `STOMP`_ server.  It is outlined in each method
+exactly what it needs to work.
+
+For specifics on the protocol, see the `STOMP protocol specification`_.
+
+This library is basically a Python implementation of Perl's `Net::Stomp`_.
+
+To enable the `ActiveMQ`_ Broker for `STOMP`_ add the following to the
+``activemq.xml`` configuration::
+
+    <connector>
+        <serverTransport uri="stomp://localhost:61613"/>
+    </connector>
+
+See http://bitbucket.org/asksol/python-stomp/ for latest code.
+
+.. _`ActiveMQ`: http://activemq.apache.org/
+.. _`Java Message Service`: http://java.sun.com/products/jms/
+.. _`STOMP`: http://stomp.codehaus.org/
+.. _`STOMP protocol specification`: http://stomp.codehaus.org/Protocol
+.. _`Net::Stomp`: http://search.cpan.org/perldoc?Net::Stomp

File docs/reference/four.frame.rst

-=======================================
- Frames and communication - four.frame
-=======================================
-
-.. currentmodule:: four.frame
-
-.. automodule:: four.frame
-    :members:
-
-

File docs/reference/four.stomp.rst

-=====================================
-Client - four.stomp
-=====================================
-
-.. currentmodule:: four.stomp
-
-.. automodule:: four.stomp
-    :members:
-
-

File docs/reference/index.rst

 .. toctree::
     :maxdepth: 2
   
-    four.stomp
-    four.frame 
+    stompy.stomp
+    stompy.frame 

File docs/reference/stompy.frame.rst

+=======================================
+ Frames and communication - stompy.frame
+=======================================
+
+.. currentmodule:: stompy.frame
+
+.. automodule:: stompy.frame
+    :members:
+
+

File docs/stompy.stomp.rst

+=====================================
+Client - stompy.stomp
+=====================================
+
+.. currentmodule:: stompy.stomp
+
+.. automodule:: stompy.stomp
+    :members:
+
+

File examples/stomp_example.py

 #!/usr/bin/env python
 import sys
 import time
-from four import Stomp
+from stompy import Stomp
 from optparse import OptionParser
 
 

File four/__init__.py

-from four.stomp import Stomp, NotConnectedError
-from four.distmeta import __version__
-from four.distmeta import __doc__, __author__, __contact__, __homepage__

File four/distmeta.py

-"""Implementation of the STOMP protocol in Python."""
-VERSION = (0, 1, 0)
-__version__ = ".".join(map(str, VERSION))
-__author__ = "Benjamin W. Smith"
-__contact__ = "benjaminwarfield@just-another.net"
-__homepage__ = "http://github.com/ask/celery/"
-__docformat__ = "restructuredtext"
-
-
-def is_stable_release():
-    return bool(not VERSION[1] % 2)
-
-
-def version_with_meta():
-    meta = "unstable"
-    if is_stable_release():
-        meta = "stable"
-    return "%s (%s)" % (__version__, meta)

File four/frame.py

-import socket
-import random
-from errno import EAGAIN
-from Queue import Queue
-from Queue import Empty as QueueEmpty
-
-
-class IntermediateMessageQueue(object):
-    """Internal message queue that holds messages received by the server.
-
-    This to make sure a message isn't received instead of a command response
-    after issuing a receipt request.
-
-    """
-
-    def __init__(self):
-        self._queue = Queue()
-
-    def put(self, frame):
-        """Put a new frame onto the message queue."""
-        if "destination" not in frame.headers:
-            return
-        self._queue.put(frame)
-
-    def get(self, frame, nb=False):
-        """Get a new frame from the message queue.
-        If no frame is available it try to get the next frame
-        from the socket.
-
-        :param frame: A :class:`Frame` instance.
-        :keyword nb: Non-blocking.
-
-        """
-        try:
-            return self._queue.get_nowait()
-        except QueueEmpty:
-            return frame.parse_frame(nb=nb)
-
-
-class Frame(object):
-    """Build and manage a STOMP Frame.
-
-    :keyword sock: An open socket to the STOMP server.
-
-    """
-
-    def __init__(self, sock=None):
-        self.command = None
-        self.headers = {}
-        self.body = None
-        self.session = None
-        self.my_name = socket.gethostbyname(socket.gethostname())
-        self.sock = sock
-        self.iqueue = IntermediateMessageQueue()
-        self.rqueue = Queue()
-
-    def connect(self, sock):
-        """Connect to the STOMP server and get the session id."""
-        self.sock = sock
-        frame = self.build_frame({"command": "CONNECT", "headers": {}})
-        self.send_frame(frame.as_string())
-
-        # Get session from the next reply from the server.
-        next_frame = self.get_reply()
-        self.session = next_frame.headers
-
-    def build_frame(self, args, want_receipt=False):
-        """Build a frame based on a :class:`dict` of arguments.
-
-        :param args: A :class:`dict` of arguments for the frame.
-
-        :keyword want_receipt: Optional argument to get a receipt from
-            the sever that the frame was received.
-
-        Example
-
-            >>> frame = frameobj.build_frame({"command": 'CONNECT',
-                                              "headers": {},
-                                              want_receipt=True)
-        """
-        self.command = args.get('command')
-        self.headers = args.get('headers')
-        self.body = args.get('body')
-        if want_receipt:
-            receipt_stamp = str(random.randint(0, 10000000))
-            self.headers["receipt"] = "%s-%s" % (
-                    self.session.get("session"), receipt_stamp)
-        return self
-
-    def as_string(self):
-        """Raw string representation of this frame
-        Suitable for passing over a socket to the STOMP server.
-
-        Example
-
-            >>> stomp.send(frameobj.as_string())
-
-        """
-        command = self.command
-        headers = self.headers
-        body = self.body
-        frame = "%s\n" % command
-        headers['x-client'] = self.my_name
-        bytes_message = False
-        if 'bytes_message' in headers:
-            bytes_message = True
-            del headers['bytes_message']
-            headers['content-length'] = len(body)
-
-        # Convert and append any existing headers to a string as the
-        # protocol describes.
-        headerparts = ("%s:%s\n" % (key, value)
-                            for key, value in headers.iteritems())
-        frame += "".join(headerparts)
-
-        # Finally append the body with the EOF marker.
-        frame += "\n%s\x00" % body
-
-        return frame
-
-    def get_message(self, nb=False):
-        """Get next message frame."""
-        while True:
-            frame = self.iqueue.get(self, nb=nb)
-            if not frame and nb:
-                return None
-            if frame.command == "MESSAGE":
-                return frame
-            else:
-                self.rqueue.put(frame)
-
-    def get_reply(self, nb=False):
-        """Get command reply frame."""
-        while True:
-            try:
-                return self.rqueue.get_nowait()
-            except QueueEmpty:
-                frame = self.parse_frame(nb=nb)
-                if not frame and nb:
-                    return None
-                if frame.command == "MESSAGE":
-                    self.iqueue.put(frame)
-                else:
-                    self.rqueue.put(frame)
-
-    def parse_frame(self, nb=False):
-        """Parse data from socket
-
-        :keyword nb: Non-blocking: If this is set and there is no
-            messages currently waiting, this functions returns ``None``
-            instead of waiting for more data.
-
-        Example
-
-            >>> frameobj.parse_frame()
-
-        """
-        command = None
-        body = None
-        headers = {}
-
-        while True:
-            line = self._getline(nb=nb)
-            if not line:
-                return
-
-            command = self.parse_command(line)
-            line = line[len(command)+1:]
-            headers_str, body = line.split('\n\n')
-            headers = self.parse_headers(headers_str)
-
-            if 'content-length' in headers:
-                headers['bytes_message'] = True
-            break
-
-        frame = Frame(self.sock)
-        frame = frame.build_frame({'command': command,
-                                   'headers': headers,
-                                   'body': body})
-        return frame
-
-    def parse_command(self, str):
-        """Parse command received from the server."""
-        command = str.split('\n', 1)[0]
-        return command
-
-    def parse_headers(self, str):
-        """Parse headers received from the servers and convert
-        to a :class:`dict`."""
-        headers = {}
-        for line in str.split('\n'):
-            key, value = line.split(':', 1)
-            headers[key] = value
-        return headers
-
-    def send_frame(self, frame):
-        """Send frame to server, get receipt if needed."""
-        self.sock.sendall(frame)
-
-        if 'receipt' in self.headers:
-            return self.get_reply()
-
-    def _getline(self, nb=False):
-        """Get a single line from socket
-
-        :keyword nb: Non-blocking: If this is set, and there is no
-            messages to receive, this function returns ``None``.
-
-        """
-        self.sock.setblocking(not nb)
-        try:
-            buffer = ''
-            partial = ''
-            while not buffer.endswith('\x00\n'):
-                try:
-                    partial = self.sock.recv(1)
-                except socket.error, exc:
-                    if exc.errno == EAGAIN:
-                        if not buffer:
-                            return None
-                        continue
-                buffer += partial
-        finally:
-            self.sock.setblocking(nb)
-        return buffer[:-2]

File four/stomp.py

-import socket
-from four.frame import Frame
-from functools import wraps
-
-
-class NotConnectedError(Exception):
-    """No longer connected to the STOMP server."""
-
-
-class ConnectionError(socket.error):
-    """Couldn't connect to the STOMP server."""
-
-
-class ConnectionTimeoutError(socket.timeout):
-    """Timed-out while establishing connection to the STOMP server."""
-
-
-class Stomp(object):
-    """STOMP Client.
-
-    :param hostname: Hostname of the STOMP server to connect to.
-    :param port: The port to use. (default ``61613``)
-
-    """
-    ConnectionError = ConnectionError
-    ConnectionTimeoutError = ConnectionTimeoutError
-    NotConnectedError = NotConnectedError
-
-    def __init__(self, hostname, port=61613):
-        self.host = hostname
-        self.port = port
-        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._subscribed_to = {}
-        self._subscribed = None
-        self.connected = None
-        self.frame = Frame()
-
-    def connect(self):
-        """Connect to STOMP server."""
-        try:
-            self.sock.connect((self.host, self.port))
-            self.frame.connect(self.sock)
-        except socket.error, exc:
-            raise self.ConnectionError(*exc.args)
-        except socket.timeout, exc:
-            raise self.ConnectionTimeoutError(*exc.args)
-        self.connected = True
-
-    def disconnect(self, conf=None):
-        """Disconnect from the server."""
-        conf = conf or {}
-        for destination in self._subscribed_to.keys():
-            self.unsubscribe({"destination": destination})
-        self._send_command("DISCONNECT", conf)
-        self.sock.shutdown(0)
-        self.connected = False
-
-    def send(self, conf=None):
-        """Send message to STOMP server
-
-        You'll need to pass the body and any other headers your
-        STOMP server likes.
-
-        destination is **required**
-
-        In the case of ActiveMQ with persistence, you could do this:
-
-            >>> for i in xrange(1,1000):
-            ...     stomp.send({'destination': '/queue/foo',
-            ...                 'body': 'Testing',
-            ...                 'persistent': 'true'})
-
-        """
-        body = conf['body']
-        del conf['body']
-        frame = self._send_command("SEND", conf, extra={"body": body},
-                                   want_receipt=True)
-        return frame
-
-    def _build_frame(self, *args, **kwargs):
-        self._connected_or_raise()
-        return self.frame.build_frame(*args, **kwargs)
-
-    def subscribe(self, conf=None):
-        """Subscribe to a given destination
-
-        You will need to pass any headers your STOMP server likes.
-
-        destination is *required*
-
-        In the case of ActiveMQ, you could do this:
-
-            >>> stomp.subscribe({'destination':'/queue/foo',
-        ...                      'ack':'client'})
-        """
-        destination = conf["destination"]
-        self._send_command("SUBSCRIBE", conf)
-        self._subscribed_to[destination] = True
-
-    def begin(self, conf=None):
-        """Begin transaction.
-
-        You will need to pass any headers your STOMP server likes.
-
-        destination is *required*
-
-        In the case of ActiveMQ, you could do this:
-
-            >>> stomp.begin({'transaction':'<randomish_hash_like_thing>'})
-        """
-        self._send_command("BEGIN", conf)
-
-    def commit(self, conf=None):
-        """Commit transaction.
-
-        You will need to pass any headers your STOMP server likes.
-
-        destination is **required**
-
-        In the case of ActiveMQ, you could do this:
-
-            >>> stomp.commit({'transaction':'<randomish_hash_like_thing>'})
-
-        """
-        self._send_command("COMMIT", conf)
-
-
-    def abort(self, conf=None):
-        """Abort transaction.
-
-        In the case of ActiveMQ, you could do this:
-
-            >>> stomp.abort({'transaction':'<randomish_hash_like_thing>'})
-
-        """
-        self._send_command("ABORT", conf)
-
-
-    def unsubscribe(self, conf=None):
-        """Unsubscribe from a given destination
-
-        You will need to pass any headers your STOMP server likes.
-
-        destination is *required*
-
-        >>> stomp.unsubscribe({'destination':'/queue/foo'})
-        """
-        destination = conf["destination"]
-        self._send_command("UNSUBSCRIBE", conf)
-        self._subscribed_to.pop(destination, None)
-
-    def ack(self, frame):
-        """Acknowledge receipt of a message
-
-        :param: A :class:`four.frame.Frame` instance.
-
-        Example
-
-            >>> while True:
-            ...     frame = stomp.receive_frame()
-            ...     stomp.ack(frame)
-
-        """
-        message_id = frame.headers.get('message-id')
-        self._send_command("ACK", {"message-id": message_id})
-
-    def receive_frame(self, nonblocking=False):
-        """Get a frame from the STOMP server
-
-        :keyword nonblocking: By default this function waits forever
-            until there is a message to be received, however, in non-blocking
-            mode it returns ``None`` if there is currently no message
-            available.
-
-        Note that you must be subscribed to one or more destinations.
-        Use :meth:`subscribe` to subscribe to a topic/queue.
-
-        Example: Blocking
-
-            >>> while True:
-            ...     frame = stomp.receive_frame()
-            ...     print(frame.headers['message-id'])
-            ...     stomp.ack(frame)
-
-        Example: Non-blocking
-
-            >>> frame = stomp.recieve_frame(nonblocking=True)
-            >>> if frame:
-            ...     process_message(frame)
-            ... else:
-            ...     # no messages yet.
-
-        """
-        self._connected_or_raise()
-        return self.frame.get_message(nb=nonblocking)
-
-    def poll(self):
-        """Alias to :meth:`receive_frame` with ``nonblocking=True``."""
-        return self.receive_frame(nonblocking=True)
-
-    def send_frame(self, frame):
-        """Send a custom frame to the STOMP server
-
-        :param frame: A :class:`four.frame.Frame` instance.
-
-        Example
-
-            >>> from four import Frame
-            >>> frame = Frame().build_frame({
-            ...    "command": "DISCONNECT",
-            ...    "headers": {},
-            ... })
-            >>> stomp.send_frame(frame)
-
-        """
-        self._connected_or_raise()
-        frame = self.frame.send_frame(frame.as_string())
-        return frame
-    
-    def _send_command(self, command, conf=None, extra=None, **kwargs):
-        conf = conf or {}
-        extra = extra or {}
-        frame_conf = {"command": command, "headers": conf}
-        frame_conf.update(extra)
-        frame = self._build_frame(frame_conf, **kwargs)
-        self.send_frame(frame)
-        return frame
-
-    def _connected_or_raise(self):
-        if not self.connected:
-            raise self.NotConnectedError("Not connected to STOMP server.")
-
-    @property
-    def subscribed(self):
-        """**DEPRECATED** The queue or topic currently subscribed to."""
-        as_list = self._subscribed_to.keys()
-        if not as_list:
-            return
-        return as_list[0]
 detailed-errors = 1
 with-coverage = 1
 match = ((?:^|[b_.-])(:?[Tt]est|When|should))
-cover-package = four
+cover-package = stompy
 cover-erase = True
 from setuptools import setup, find_packages
-from four import distmeta
+from stompy import distmeta
 
-setup(name='four',
+setup(name='stompy',
       version=distmeta.__version__,
       description=distmeta.__doc__,
       author=distmeta.__author__,
       author_email=distmeta.__contact__,
-      packages = ['four'],
+      packages = ['stompy'],
       license='BSD',
       url=distmeta.__homepage__,
       keywords='stomp activemq jms messaging',

File stompy/__init__.py

+from stompy.stomp import Stomp, NotConnectedError
+from stompy.distmeta import __version__
+from stompy.distmeta import __doc__, __author__, __contact__, __homepage__

File stompy/distmeta.py

+"""Implementation of the STOMP protocol in Python."""
+VERSION = (0, 1, 0)
+__version__ = ".".join(map(str, VERSION))
+__author__ = "Benjamin W. Smith"
+__contact__ = "benjaminwarfield@just-another.net"
+__homepage__ = "http://github.com/ask/celery/"
+__docformat__ = "restructuredtext"
+
+
+def is_stable_release():
+    return bool(not VERSION[1] % 2)
+
+
+def version_with_meta():
+    meta = "unstable"
+    if is_stable_release():
+        meta = "stable"
+    return "%s (%s)" % (__version__, meta)

File stompy/frame.py

+import socket
+import random
+from errno import EAGAIN
+from Queue import Queue
+from Queue import Empty as QueueEmpty
+
+
+class IntermediateMessageQueue(object):
+    """Internal message queue that holds messages received by the server.
+
+    This to make sure a message isn't received instead of a command response
+    after issuing a receipt request.
+
+    """
+
+    def __init__(self):
+        self._queue = Queue()
+
+    def put(self, frame):
+        """Put a new frame onto the message queue."""
+        if "destination" not in frame.headers:
+            return
+        self._queue.put(frame)
+
+    def get(self, frame, nb=False):
+        """Get a new frame from the message queue.
+        If no frame is available it try to get the next frame
+        from the socket.
+
+        :param frame: A :class:`Frame` instance.
+        :keyword nb: Non-blocking.
+
+        """
+        try:
+            return self._queue.get_nowait()
+        except QueueEmpty:
+            return frame.parse_frame(nb=nb)
+
+
+class Frame(object):
+    """Build and manage a STOMP Frame.
+
+    :keyword sock: An open socket to the STOMP server.
+
+    """
+
+    def __init__(self, sock=None):
+        self.command = None
+        self.headers = {}
+        self.body = None
+        self.session = None
+        self.my_name = socket.gethostbyname(socket.gethostname())
+        self.sock = sock
+        self.iqueue = IntermediateMessageQueue()
+        self.rqueue = Queue()
+
+    def connect(self, sock):
+        """Connect to the STOMP server and get the session id."""
+        self.sock = sock
+        frame = self.build_frame({"command": "CONNECT", "headers": {}})
+        self.send_frame(frame.as_string())
+
+        # Get session from the next reply from the server.
+        next_frame = self.get_reply()
+        self.session = next_frame.headers
+
+    def build_frame(self, args, want_receipt=False):
+        """Build a frame based on a :class:`dict` of arguments.
+
+        :param args: A :class:`dict` of arguments for the frame.
+
+        :keyword want_receipt: Optional argument to get a receipt from
+            the sever that the frame was received.
+
+        Example
+
+            >>> frame = frameobj.build_frame({"command": 'CONNECT',
+                                              "headers": {},
+                                              want_receipt=True)
+        """
+        self.command = args.get('command')
+        self.headers = args.get('headers')
+        self.body = args.get('body')
+        if want_receipt:
+            receipt_stamp = str(random.randint(0, 10000000))
+            self.headers["receipt"] = "%s-%s" % (
+                    self.session.get("session"), receipt_stamp)
+        return self
+
+    def as_string(self):
+        """Raw string representation of this frame
+        Suitable for passing over a socket to the STOMP server.
+
+        Example
+
+            >>> stomp.send(frameobj.as_string())
+
+        """
+        command = self.command
+        headers = self.headers
+        body = self.body
+        frame = "%s\n" % command
+        headers['x-client'] = self.my_name
+        bytes_message = False
+        if 'bytes_message' in headers:
+            bytes_message = True
+            del headers['bytes_message']
+            headers['content-length'] = len(body)
+
+        # Convert and append any existing headers to a string as the
+        # protocol describes.
+        headerparts = ("%s:%s\n" % (key, value)
+                            for key, value in headers.iteritems())
+        frame += "".join(headerparts)
+
+        # Finally append the body with the EOF marker.
+        frame += "\n%s\x00" % body
+
+        return frame
+
+    def get_message(self, nb=False):
+        """Get next message frame."""
+        while True:
+            frame = self.iqueue.get(self, nb=nb)
+            if not frame and nb:
+                return None
+            if frame.command == "MESSAGE":
+                return frame
+            else:
+                self.rqueue.put(frame)
+
+    def get_reply(self, nb=False):
+        """Get command reply frame."""
+        while True:
+            try:
+                return self.rqueue.get_nowait()
+            except QueueEmpty:
+                frame = self.parse_frame(nb=nb)
+                if not frame and nb:
+                    return None
+                if frame.command == "MESSAGE":
+                    self.iqueue.put(frame)
+                else:
+                    self.rqueue.put(frame)
+
+    def parse_frame(self, nb=False):
+        """Parse data from socket
+
+        :keyword nb: Non-blocking: If this is set and there is no
+            messages currently waiting, this functions returns ``None``
+            instead of waiting for more data.
+
+        Example
+
+            >>> frameobj.parse_frame()
+
+        """
+        command = None
+        body = None
+        headers = {}
+
+        while True:
+            line = self._getline(nb=nb)
+            if not line:
+                return
+
+            command = self.parse_command(line)
+            line = line[len(command)+1:]
+            headers_str, body = line.split('\n\n')
+            headers = self.parse_headers(headers_str)
+
+            if 'content-length' in headers:
+                headers['bytes_message'] = True
+            break
+
+        frame = Frame(self.sock)
+        frame = frame.build_frame({'command': command,
+                                   'headers': headers,
+                                   'body': body})
+        return frame
+
+    def parse_command(self, str):
+        """Parse command received from the server."""
+        command = str.split('\n', 1)[0]
+        return command
+
+    def parse_headers(self, str):
+        """Parse headers received from the servers and convert
+        to a :class:`dict`."""
+        headers = {}
+        for line in str.split('\n'):
+            key, value = line.split(':', 1)
+            headers[key] = value
+        return headers
+
+    def send_frame(self, frame):
+        """Send frame to server, get receipt if needed."""
+        self.sock.sendall(frame)
+
+        if 'receipt' in self.headers:
+            return self.get_reply()
+
+    def _getline(self, nb=False):
+        """Get a single line from socket
+
+        :keyword nb: Non-blocking: If this is set, and there is no
+            messages to receive, this function returns ``None``.
+
+        """
+        self.sock.setblocking(not nb)
+        try:
+            buffer = ''
+            partial = ''
+            while not buffer.endswith('\x00\n'):
+                try:
+                    partial = self.sock.recv(1)
+                except socket.error, exc:
+                    if exc.errno == EAGAIN:
+                        if not buffer:
+                            return None
+                        continue
+                buffer += partial
+        finally:
+            self.sock.setblocking(nb)
+        return buffer[:-2]

File stompy/stomp.py

+import socket
+from stompy.frame import Frame
+from functools import wraps
+
+
+class NotConnectedError(Exception):
+    """No longer connected to the STOMP server."""
+
+
+class ConnectionError(socket.error):
+    """Couldn't connect to the STOMP server."""
+
+
+class ConnectionTimeoutError(socket.timeout):
+    """Timed-out while establishing connection to the STOMP server."""
+
+
+class Stomp(object):
+    """STOMP Client.
+
+    :param hostname: Hostname of the STOMP server to connect to.
+    :param port: The port to use. (default ``61613``)
+
+    """
+    ConnectionError = ConnectionError
+    ConnectionTimeoutError = ConnectionTimeoutError
+    NotConnectedError = NotConnectedError
+
+    def __init__(self, hostname, port=61613):
+        self.host = hostname
+        self.port = port
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self._subscribed_to = {}
+        self._subscribed = None
+        self.connected = None
+        self.frame = Frame()
+
+    def connect(self):
+        """Connect to STOMP server."""
+        try:
+            self.sock.connect((self.host, self.port))
+            self.frame.connect(self.sock)
+        except socket.error, exc:
+            raise self.ConnectionError(*exc.args)
+        except socket.timeout, exc:
+            raise self.ConnectionTimeoutError(*exc.args)
+        self.connected = True
+
+    def disconnect(self, conf=None):
+        """Disconnect from the server."""
+        conf = conf or {}
+        for destination in self._subscribed_to.keys():
+            self.unsubscribe({"destination": destination})
+        self._send_command("DISCONNECT", conf)
+        self.sock.shutdown(0)
+        self.connected = False
+
+    def send(self, conf=None):
+        """Send message to STOMP server
+
+        You'll need to pass the body and any other headers your
+        STOMP server likes.
+
+        destination is **required**
+
+        In the case of ActiveMQ with persistence, you could do this:
+
+            >>> for i in xrange(1,1000):
+            ...     stomp.send({'destination': '/queue/foo',
+            ...                 'body': 'Testing',
+            ...                 'persistent': 'true'})
+
+        """
+        body = conf['body']
+        del conf['body']
+        frame = self._send_command("SEND", conf, extra={"body": body},
+                                   want_receipt=True)
+        return frame
+
+    def _build_frame(self, *args, **kwargs):
+        self._connected_or_raise()
+        return self.frame.build_frame(*args, **kwargs)
+
+    def subscribe(self, conf=None):
+        """Subscribe to a given destination
+
+        You will need to pass any headers your STOMP server likes.
+
+        destination is *required*
+
+        In the case of ActiveMQ, you could do this:
+
+            >>> stomp.subscribe({'destination':'/queue/foo',
+        ...                      'ack':'client'})
+        """
+        destination = conf["destination"]
+        self._send_command("SUBSCRIBE", conf)
+        self._subscribed_to[destination] = True
+
+    def begin(self, conf=None):
+        """Begin transaction.
+
+        You will need to pass any headers your STOMP server likes.
+
+        destination is *required*
+
+        In the case of ActiveMQ, you could do this:
+
+            >>> stomp.begin({'transaction':'<randomish_hash_like_thing>'})
+        """
+        self._send_command("BEGIN", conf)
+
+    def commit(self, conf=None):
+        """Commit transaction.
+
+        You will need to pass any headers your STOMP server likes.
+
+        destination is **required**
+
+        In the case of ActiveMQ, you could do this:
+
+            >>> stomp.commit({'transaction':'<randomish_hash_like_thing>'})
+
+        """
+        self._send_command("COMMIT", conf)
+
+
+    def abort(self, conf=None):
+        """Abort transaction.
+
+        In the case of ActiveMQ, you could do this:
+
+            >>> stomp.abort({'transaction':'<randomish_hash_like_thing>'})
+
+        """
+        self._send_command("ABORT", conf)
+
+
+    def unsubscribe(self, conf=None):
+        """Unsubscribe from a given destination
+
+        You will need to pass any headers your STOMP server likes.
+
+        destination is *required*
+
+        >>> stomp.unsubscribe({'destination':'/queue/foo'})
+        """
+        destination = conf["destination"]
+        self._send_command("UNSUBSCRIBE", conf)
+        self._subscribed_to.pop(destination, None)
+
+    def ack(self, frame):
+        """Acknowledge receipt of a message
+
+        :param: A :class:`stompy.frame.Frame` instance.
+
+        Example
+
+            >>> while True:
+            ...     frame = stomp.receive_frame()
+            ...     stomp.ack(frame)
+
+        """
+        message_id = frame.headers.get('message-id')
+        self._send_command("ACK", {"message-id": message_id})
+
+    def receive_frame(self, nonblocking=False):
+        """Get a frame from the STOMP server
+
+        :keyword nonblocking: By default this function waits forever
+            until there is a message to be received, however, in non-blocking
+            mode it returns ``None`` if there is currently no message
+            available.
+
+        Note that you must be subscribed to one or more destinations.
+        Use :meth:`subscribe` to subscribe to a topic/queue.
+
+        Example: Blocking
+
+            >>> while True:
+            ...     frame = stomp.receive_frame()
+            ...     print(frame.headers['message-id'])
+            ...     stomp.ack(frame)
+
+        Example: Non-blocking
+
+            >>> frame = stomp.recieve_frame(nonblocking=True)
+            >>> if frame:
+            ...     process_message(frame)
+            ... else:
+            ...     # no messages yet.
+
+        """
+        self._connected_or_raise()
+        return self.frame.get_message(nb=nonblocking)
+
+    def poll(self):
+        """Alias to :meth:`receive_frame` with ``nonblocking=True``."""
+        return self.receive_frame(nonblocking=True)
+
+    def send_frame(self, frame):
+        """Send a custom frame to the STOMP server
+
+        :param frame: A :class:`stompy.frame.Frame` instance.
+
+        Example
+
+            >>> from stompy import Frame
+            >>> frame = Frame().build_frame({
+            ...    "command": "DISCONNECT",
+            ...    "headers": {},
+            ... })
+            >>> stomp.send_frame(frame)
+
+        """
+        self._connected_or_raise()
+        frame = self.frame.send_frame(frame.as_string())
+        return frame
+    
+    def _send_command(self, command, conf=None, extra=None, **kwargs):
+        conf = conf or {}
+        extra = extra or {}
+        frame_conf = {"command": command, "headers": conf}
+        frame_conf.update(extra)
+        frame = self._build_frame(frame_conf, **kwargs)
+        self.send_frame(frame)
+        return frame
+
+    def _connected_or_raise(self):
+        if not self.connected:
+            raise self.NotConnectedError("Not connected to STOMP server.")
+
+    @property
+    def subscribed(self):
+        """**DEPRECATED** The queue or topic currently subscribed to."""
+        as_list = self._subscribed_to.keys()
+        if not as_list:
+            return
+        return as_list[0]

File tests/test_frame.py

 from unittest import TestCase
 import sys
 import socket
-from four import frame
-from four.frame import Frame
+from stompy import frame
+from stompy.frame import Frame
 
 
 class WhenSettingUp(DingusTestCase(Frame)):

File tests/test_stomp.py

 from dingus import Dingus, DingusTestCase, DontCare
 from unittest import TestCase
 import sys
-import four
-from four import Stomp
+import stompy
+from stompy import Stomp
 
 
 class WhenConnecting(DingusTestCase(Stomp)):
         self.stomp.connected = True
 
     def should_set_socket_opts(self):
-        assert four.stomp.socket.calls('socket', DontCare, DontCare)
+        assert stompy.stomp.socket.calls('socket', DontCare, DontCare)
 
     def should_connect(self):
         self.stomp.connect()
 
     def should_fail_to_send(self):
         mystomp = Stomp('localhost', 99999)
-        self.failUnlessRaises(four.NotConnectedError, mystomp.send,
+        self.failUnlessRaises(stompy.NotConnectedError, mystomp.send,
                               {"body": "f"})
 
     def should_raise_nc(self):
         mystomp = Stomp('localhost', 99999)
         try:
             mystomp.send({"body": "Vandelay Industries"})
-        except four.NotConnectedError, err:
+        except stompy.NotConnectedError, err:
             assert True # Should raise not connected
             return
         assert False # Should raise not connected