Source

python-stomp / four / stomp.py

import socket
from four.frame import Frame
from functools import wraps


class NotConnectedError(Exception):
    """No longer connected to the STOMP server."""


class ConnectionError(socket.error):
    """Couldn't connect to the STOMP server."""


class ConnectionTimeoutError(socket.timeout):
    """Timed-out while establishing connection to the STOMP server."""


class Stomp(object):
    """STOMP Client.

    :param hostname: Hostname of the STOMP server to connect to.
    :param port: The port to use. (default ``61613``)

    """
    ConnectionError = ConnectionError
    ConnectionTimeoutError = ConnectionTimeoutError
    NotConnectedError = NotConnectedError

    def __init__(self, hostname, port=61613):
        self.host = hostname
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._subscribed_to = {}
        self._subscribed = None
        self.connected = None
        self.frame = Frame()

    def connect(self):
        """Connect to STOMP server."""
        try:
            self.sock.connect((self.host, self.port))
            self.frame.connect(self.sock)
        except socket.error, exc:
            raise self.ConnectionError(*exc.args)
        except socket.timeout, exc:
            raise self.ConnectionTimeoutError(*exc.args)
        self.connected = True

    def disconnect(self, conf=None):
        """Disconnect from the server."""
        conf = conf or {}
        for destination in self._subscribed_to.keys():
            self.unsubscribe({"destination": destination})
        self._send_command("DISCONNECT", conf)
        self.sock.shutdown(0)
        self.connected = False

    def send(self, conf=None):
        """Send message to STOMP server

        You'll need to pass the body and any other headers your
        STOMP server likes.

        destination is **required**

        In the case of ActiveMQ with persistence, you could do this:

            >>> for i in xrange(1,1000):
            ...     stomp.send({'destination': '/queue/foo',
            ...                 'body': 'Testing',
            ...                 'persistent': 'true'})

        """
        body = conf['body']
        del conf['body']
        frame = self._send_command("SEND", conf, extra={"body": body},
                                   want_receipt=True)
        return frame

    def _build_frame(self, *args, **kwargs):
        self._connected_or_raise()
        return self.frame.build_frame(*args, **kwargs)

    def subscribe(self, conf=None):
        """Subscribe to a given destination

        You will need to pass any headers your STOMP server likes.

        destination is *required*

        In the case of ActiveMQ, you could do this:

            >>> stomp.subscribe({'destination':'/queue/foo',
        ...                      'ack':'client'})
        """
        destination = conf["destination"]
        self._send_command("SUBSCRIBE", conf)
        self._subscribed_to[destination] = True

    def begin(self, conf=None):
        """Begin transaction.

        You will need to pass any headers your STOMP server likes.

        destination is *required*

        In the case of ActiveMQ, you could do this:

            >>> stomp.begin({'transaction':'<randomish_hash_like_thing>'})
        """
        self._send_command("BEGIN", conf)

    def commit(self, conf=None):
        """Commit transaction.

        You will need to pass any headers your STOMP server likes.

        destination is **required**

        In the case of ActiveMQ, you could do this:

            >>> stomp.commit({'transaction':'<randomish_hash_like_thing>'})

        """
        self._send_command("COMMIT", conf)


    def abort(self, conf=None):
        """Abort transaction.

        In the case of ActiveMQ, you could do this:

            >>> stomp.abort({'transaction':'<randomish_hash_like_thing>'})

        """
        self._send_command("ABORT", conf)


    def unsubscribe(self, conf=None):
        """Unsubscribe from a given destination

        You will need to pass any headers your STOMP server likes.

        destination is *required*

        >>> stomp.unsubscribe({'destination':'/queue/foo'})
        """
        destination = conf["destination"]
        self._send_command("UNSUBSCRIBE", conf)
        self._subscribed_to.pop(destination, None)

    def ack(self, frame):
        """Acknowledge receipt of a message

        :param: A :class:`four.frame.Frame` instance.

        Example

            >>> while True:
            ...     frame = stomp.receive_frame()
            ...     stomp.ack(frame)

        """
        message_id = frame.headers.get('message-id')
        self._send_command("ACK", {"message-id": message_id})

    def receive_frame(self, nonblocking=False):
        """Get a frame from the STOMP server

        :keyword nonblocking: By default this function waits forever
            until there is a message to be received, however, in non-blocking
            mode it returns ``None`` if there is currently no message
            available.

        Note that you must be subscribed to one or more destinations.
        Use :meth:`subscribe` to subscribe to a topic/queue.

        Example: Blocking

            >>> while True:
            ...     frame = stomp.receive_frame()
            ...     print(frame.headers['message-id'])
            ...     stomp.ack(frame)

        Example: Non-blocking

            >>> frame = stomp.recieve_frame(nonblocking=True)
            >>> if frame:
            ...     process_message(frame)
            ... else:
            ...     # no messages yet.

        """
        self._connected_or_raise()
        return self.frame.get_message(nb=nonblocking)

    def poll(self):
        """Alias to :meth:`receive_frame` with ``nonblocking=True``."""
        return self.receive_frame(nonblocking=True)

    def send_frame(self, frame):
        """Send a custom frame to the STOMP server

        :param frame: A :class:`four.frame.Frame` instance.

        Example

            >>> from four import Frame
            >>> frame = Frame().build_frame({
            ...    "command": "DISCONNECT",
            ...    "headers": {},
            ... })
            >>> stomp.send_frame(frame)

        """
        self._connected_or_raise()
        frame = self.frame.send_frame(frame.as_string())
        return frame
    
    def _send_command(self, command, conf=None, extra=None, **kwargs):
        conf = conf or {}
        extra = extra or {}
        frame_conf = {"command": command, "headers": conf}
        frame_conf.update(extra)
        frame = self._build_frame(frame_conf, **kwargs)
        self.send_frame(frame)
        return frame

    def _connected_or_raise(self):
        if not self.connected:
            raise self.NotConnectedError("Not connected to STOMP server.")

    @property
    def subscribed(self):
        """**DEPRECATED** The queue or topic currently subscribed to."""
        as_list = self._subscribed_to.keys()
        if not as_list:
            return
        return as_list[0]
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.