asksol / python-stomp

fork of python-stomp

stompy is a fork of the STOMP client python-stomp, with support for non-blocking, polling and multiple subscriptions per connection.

Clone this repository (size: 101.6 KB): HTTPS / SSH
$ hg clone http://bitbucket.org/asksol/python-stomp/
commit 60: 17e1a51966bd
parent 59: 3a5c85ad6ef6
parent 57: 4c35356b4980
branch: default
Merge in changes from http://bitbucket.org/asksol/python-stomp/ and http://bitbucket.org/crankycoder/python-stomp/ Thanks!
bsm...@localhost
7 months ago

Changed (Δ13.9 KB):

raw changeset »

stomp/__init__.py

stomp/frame.py

stomp/stomp.py

stompy/frame.py (224 lines added, 0 lines removed)

stompy/stomp.py (241 lines added, 0 lines removed)

Up to file-list stompy/frame.py:

1
import socket
2
import random
3
from errno import EAGAIN
4
from Queue import Queue
5
from Queue import Empty as QueueEmpty
6
7
8
class UnknownBrokerResponseError(Exception):
9
    """An unexpected response was received from the broker."""
10
11
12
class IntermediateMessageQueue(object):
13
    """Internal message queue that holds messages received by the server.
14
15
    This to make sure a message isn't received instead of a command response
16
    after issuing a receipt request.
17
18
    """
19
20
    def __init__(self):
21
        self._queue = Queue()
22
23
    def put(self, frame):
24
        """Put a new frame onto the message queue."""
25
        if "destination" not in frame.headers:
26
            return
27
        self._queue.put(frame)
28
29
    def get(self, frame, nb=False):
30
        """Get a new frame from the message queue.
31
        If no frame is available it try to get the next frame
32
        from the socket.
33
34
        :param frame: A :class:`Frame` instance.
35
        :keyword nb: Non-blocking.
36
37
        """
38
        try:
39
            return self._queue.get_nowait()
40
        except QueueEmpty:
41
            return frame.parse_frame(nb=nb)
42
43
44
class Frame(object):
45
    """Build and manage a STOMP Frame.
46
47
    :keyword sock: An open socket to the STOMP server.
48
49
    """
50
    def __init__(self, sock=None):
51
        self.command = None
52
        self.headers = {}
53
        self.body = None
54
        self.session = None
55
        self.my_name = socket.gethostbyname(socket.gethostname())
56
        self.sock = sock
57
        self.iqueue = IntermediateMessageQueue()
58
        self.rqueue = Queue()
59
60
    def connect(self, sock):
61
        """Connect to the STOMP server and get the session id."""
62
        self.sock = sock
63
        frame = self.build_frame({"command": "CONNECT", "headers": {}})
64
        self.send_frame(frame.as_string())
65
66
        # Get session from the next reply from the server.
67
        next_frame = self.get_reply()
68
        self.session = next_frame.headers
69
70
    def build_frame(self, args, want_receipt=False):
71
        """Build a frame based on a :class:`dict` of arguments.
72
73
        :param args: A :class:`dict` of arguments for the frame.
74
75
        :keyword want_receipt: Optional argument to get a receipt from
76
            the sever that the frame was received.
77
78
        Example
79
80
            >>> frame = frameobj.build_frame({"command": 'CONNECT',
81
                                              "headers": {},
82
                                              want_receipt=True)
83
        """
84
        self.command = args.get('command')
85
        self.headers = args.get('headers')
86
        self.body = args.get('body')
87
        if want_receipt:
88
            receipt_stamp = str(random.randint(0, 10000000))
89
            self.headers["receipt"] = "%s-%s" % (
90
                    self.session.get("session"), receipt_stamp)
91
        return self
92
93
    def as_string(self):
94
        """Raw string representation of this frame
95
        Suitable for passing over a socket to the STOMP server.
96
97
        Example
98
99
            >>> stomp.send(frameobj.as_string())
100
101
        """
102
        command = self.command
103
        headers = self.headers
104
        body = self.body
105
106
        bytes_message = False
107
        if 'bytes_message' in headers:
108
            bytes_message = True
109
            del headers['bytes_message']
110
            headers['content-length'] = len(body)
111
        headers['x-client'] = self.my_name
112
113
        # Convert and append any existing headers to a string as the
114
        # protocol describes.
115
        headerparts = ("%s:%s\n" % (key, value)
116
                            for key, value in headers.iteritems())
117
118
        # Frame is Command + Header + EOF marker.
119
        frame = "%s\n%s\n%s\x00" % (command, "".join(headerparts), body)
120
121
        return frame
122
123
    def get_message(self, nb=False):
124
        """Get next message frame."""
125
        while True:
126
            frame = self.iqueue.get(self, nb=nb)
127
            if not frame and nb:
128
                return None
129
            if frame.command == "MESSAGE":
130
                return frame
131
            else:
132
                self.rqueue.put(frame)
133
134
    def get_reply(self, nb=False):
135
        """Get command reply frame."""
136
        while True:
137
            try:
138
                return self.rqueue.get_nowait()
139
            except QueueEmpty:
140
                frame = self.parse_frame(nb=nb)
141
                if not frame and nb:
142
                    return None
143
                if frame.command == "MESSAGE":
144
                    self.iqueue.put(frame)
145
                else:
146
                    self.rqueue.put(frame)
147
148
    def parse_frame(self, nb=False):
149
        """Parse data from socket
150
151
        :keyword nb: Non-blocking: If this is set and there is no
152
            messages currently waiting, this functions returns ``None``
153
            instead of waiting for more data.
154
155
        Example
156
157
            >>> frameobj.parse_frame()
158
159
        """
160
        line = self._getline(nb=nb)
161
        if not line:
162
            return
163
164
        command = self.parse_command(line)
165
        line = line[len(command)+1:]
166
        headers_str, _, body = line.partition("\n\n")
167
        if not headers_str:
168
            raise UnknownBrokerResponseError(
169
                    "Received: (%s)" % line)
170
        headers = self.parse_headers(headers_str)
171
172
        if 'content-length' in headers:
173
            headers['bytes_message'] = True
174
175
        frame = Frame(self.sock)
176
        return frame.build_frame({'command': command,
177
                                  'headers': headers,
178
                                  'body': body})
179
180
    def parse_command(self, str):
181
        """Parse command received from the server."""
182
        command = str.split('\n', 1)[0]
183
        return command
184
185
    def parse_headers(self, headers_str):
186
        """Parse headers received from the servers and convert
187
        to a :class:`dict`."""
188
        # george:constanza\nelaine:benes
189
        # -> {"george": "constanza", "elaine": "benes"}
190
        return dict(line.split(":", 1) for line in headers_str.split("\n"))
191
192
    def send_frame(self, frame):
193
        """Send frame to server, get receipt if needed."""
194
        self.sock.sendall(frame)
195
196
        if 'receipt' in self.headers:
197
            return self.get_reply()
198
199
    def _getline(self, nb=False):
200
        """Get a single line from socket
201
202
        :keyword nb: Non-blocking: If this is set, and there is no
203
            messages to receive, this function returns ``None``.
204
205
        """
206
        self.sock.setblocking(not nb)
207
        try:
208
            buffer = ''
209
            partial = ''
210
            while not buffer.endswith('\x00\n'):
211
                try:
212
                    partial = self.sock.recv(1)
213
                except socket.error, exc:
214
                    if exc.errno == EAGAIN:
215
                        if not buffer:
216
                            return None
217
                        continue
218
                buffer += partial
219
        finally:
220
            self.sock.setblocking(nb)
221
        return buffer[:-2]
222
223
    def __repr__(self):
224
        return "<Frame %s>" % pformat(self.headers)

Up to file-list stompy/stomp.py:

1
import socket
2
from stompy.frame import Frame
3
from functools import wraps
4
5
6
class NotConnectedError(Exception):
7
    """No longer connected to the STOMP server."""
8
9
class ConnectionError(socket.error):
10
    """Couldn't connect to the STOMP server."""
11
12
13
class ConnectionTimeoutError(socket.timeout):
14
    """Timed-out while establishing connection to the STOMP server."""
15
16
17
class Stomp(object):
18
    """STOMP Client.
19
20
    :param hostname: Hostname of the STOMP server to connect to.
21
    :param port: The port to use. (default ``61613``)
22
23
    """
24
    ConnectionError = ConnectionError
25
    ConnectionTimeoutError = ConnectionTimeoutError
26
    NotConnectedError = NotConnectedError
27
28
    def __init__(self, hostname, port=61613):
29
        self.host = hostname
30
        self.port = port
31
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
32
        self._subscribed_to = {}
33
        self._subscribed = None
34
        self.connected = None
35
        self.frame = Frame()
36
37
    def connect(self):
38
        """Connect to STOMP server."""
39
        try:
40
            self.sock.connect((self.host, self.port))
41
            self.frame.connect(self.sock)
42
        except socket.error, exc:
43
            raise self.ConnectionError(*exc.args)
44
        except socket.timeout, exc:
45
            raise self.ConnectionTimeoutError(*exc.args)
46
        self.connected = True
47
48
    def disconnect(self, conf=None):
49
        """Disconnect from the server."""
50
        try:
51
            for destination in self._subscribed_to.keys():
52
                self.unsubscribe({"destination": destination})
53
            self._send_command("DISCONNECT", conf)
54
        except self.NotConnectedError:
55
            pass
56
        self.sock.shutdown(0)
57
        self.connected = False
58
59
    def send(self, conf=None):
60
        """Send message to STOMP server
61
62
        You'll need to pass the body and any other headers your
63
        STOMP server likes.
64
65
        destination is **required**
66
67
        In the case of ActiveMQ with persistence, you could do this:
68
69
            >>> for i in xrange(1,1000):
70
            ...     stomp.send({'destination': '/queue/foo',
71
            ...                 'body': 'Testing',
72
            ...                 'persistent': 'true'})
73
74
        """
75
        headers = dict(conf)
76
        body = headers.pop("body", "")
77
        return self._send_command("SEND", headers, extra={"body": body},
78
                                  want_receipt=True)
79
80
    def _build_frame(self, *args, **kwargs):
81
        self._connected_or_raise()
82
        return self.frame.build_frame(*args, **kwargs)
83
84
    def subscribe(self, conf=None):
85
        """Subscribe to a given destination
86
87
        You will need to pass any headers your STOMP server likes.
88
89
        destination is *required*
90
91
        In the case of ActiveMQ, you could do this:
92
93
            >>> stomp.subscribe({'destination':'/queue/foo',
94
        ...                      'ack':'client'})
95
        """
96
        destination = conf["destination"]
97
        self._send_command("SUBSCRIBE", conf)
98
        self._subscribed_to[destination] = True
99
100
    def begin(self, conf=None):
101
        """Begin transaction.
102
103
        You will need to pass any headers your STOMP server likes.
104
105
        destination is *required*
106
107
        In the case of ActiveMQ, you could do this:
108
109
            >>> stomp.begin({'transaction':'<randomish_hash_like_thing>'})
110
        """
111
        self._send_command("BEGIN", conf)
112
113
    def commit(self, conf=None):
114
        """Commit transaction.
115
116
        You will need to pass any headers your STOMP server likes.
117
118
        destination is **required**
119
120
        In the case of ActiveMQ, you could do this:
121
122
            >>> stomp.commit({'transaction':'<randomish_hash_like_thing>'})
123
124
        """
125
        self._send_command("COMMIT", conf)
126
127
128
    def abort(self, conf=None):
129
        """Abort transaction.
130
131
        In the case of ActiveMQ, you could do this:
132
133
            >>> stomp.abort({'transaction':'<randomish_hash_like_thing>'})
134
135
        """
136
        self._send_command("ABORT", conf)
137
138
139
    def unsubscribe(self, conf=None):
140
        """Unsubscribe from a given destination
141
142
        You will need to pass any headers your STOMP server likes.
143
144
        destination is *required*
145
146
        >>> stomp.unsubscribe({'destination':'/queue/foo'})
147
        """
148
        destination = conf["destination"]
149
        self._send_command("UNSUBSCRIBE", conf)
150
        self._subscribed_to.pop(destination, None)
151
152
    def ack(self, frame):
153
        """Acknowledge receipt of a message
154
155
        :param: A :class:`stompy.frame.Frame` instance.
156
157
        Example
158
159
            >>> while True:
160
            ...     frame = stomp.receive_frame()
161
            ...     stomp.ack(frame)
162
163
        """
164
        message_id = frame.headers.get('message-id')
165
        self._send_command("ACK", {"message-id": message_id})
166
167
    def receive_frame(self, nonblocking=False):
168
        """Get a frame from the STOMP server
169
170
        :keyword nonblocking: By default this function waits forever
171
            until there is a message to be received, however, in non-blocking
172
            mode it returns ``None`` if there is currently no message
173
            available.
174
175
        Note that you must be subscribed to one or more destinations.
176
        Use :meth:`subscribe` to subscribe to a topic/queue.
177
178
        Example: Blocking
179
180
            >>> while True:
181
            ...     frame = stomp.receive_frame()
182
            ...     print(frame.headers['message-id'])
183
            ...     stomp.ack(frame)
184
185
        Example: Non-blocking
186
187
            >>> frame = stomp.recieve_frame(nonblocking=True)
188
            >>> if frame:
189
            ...     process_message(frame)
190
            ... else:
191
            ...     # no messages yet.
192
193
        """
194
        self._connected_or_raise()
195
        return self.frame.get_message(nb=nonblocking)
196
197
    def poll(self):
198
        """Alias to :meth:`receive_frame` with ``nonblocking=True``."""
199
        return self.receive_frame(nonblocking=True)
200
201
    def send_frame(self, frame):
202
        """Send a custom frame to the STOMP server
203
204
        :param frame: A :class:`stompy.frame.Frame` instance.
205
206
        Example
207
208
            >>> from stompy import Frame
209
            >>> frame = Frame().build_frame({
210
            ...    "command": "DISCONNECT",
211
            ...    "headers": {},
212
            ... })
213
            >>> stomp.send_frame(frame)
214
215
        """
216
        self._connected_or_raise()
217
        frame = self.frame.send_frame(frame.as_string())
218
        return frame
219
    
220
    def _send_command(self, command, conf=None, extra=None, **kwargs):
221
        conf = conf or {}
222
        extra = extra or {}
223
        frame_conf = {"command": command, "headers": conf}
224
        frame_conf.update(extra)
225
        frame = self._build_frame(frame_conf, **kwargs)
226
        reply = self.send_frame(frame)
227
        if kwargs.get("want_receipt", False):
228
            return reply
229
        return frame
230
231
    def _connected_or_raise(self):
232
        if not self.connected:
233
            raise self.NotConnectedError("Not connected to STOMP server.")
234
235
    @property
236
    def subscribed(self):
237
        """**DEPRECATED** The queue or topic currently subscribed to."""
238
        as_list = self._subscribed_to.keys()
239
        if not as_list:
240
            return
241
        return as_list[0]