asksol / python-stomp
fork of python-stomp
stompy is a fork of the STOMP client python-stomp, with support for non-blocking, polling and multiple subscriptions per connection.
Clone this repository (size: 101.6 KB): HTTPS / SSH
$ hg clone http://bitbucket.org/asksol/python-stomp/
| commit 60: | 17e1a51966bd |
| parent 59: | 3a5c85ad6ef6 |
| parent 57: | 4c35356b4980 |
| branch: | default |
Merge in changes from http://bitbucket.org/asksol/python-stomp/ and
http://bitbucket.org/crankycoder/python-stomp/ Thanks!
7 months ago
Changed (Δ13.9 KB):
raw changeset »
stomp/__init__.py
stomp/frame.py
stomp/stomp.py
stompy/frame.py (224 lines added, 0 lines removed)
stompy/stomp.py (241 lines added, 0 lines removed)
Up to file-list stompy/frame.py:
1 |
import socket |
|
2 |
import random |
|
3 |
from errno import EAGAIN |
|
4 |
from Queue import Queue |
|
5 |
from Queue import Empty as QueueEmpty |
|
6 |
||
7 |
||
8 |
class UnknownBrokerResponseError(Exception): |
|
9 |
"""An unexpected response was received from the broker.""" |
|
10 |
||
11 |
||
12 |
class IntermediateMessageQueue(object): |
|
13 |
"""Internal message queue that holds messages received by the server. |
|
14 |
||
15 |
This to make sure a message isn't received instead of a command response |
|
16 |
after issuing a receipt request. |
|
17 |
||
18 |
""" |
|
19 |
||
20 |
def __init__(self): |
|
21 |
self._queue = Queue() |
|
22 |
||
23 |
def put(self, frame): |
|
24 |
"""Put a new frame onto the message queue.""" |
|
25 |
if "destination" not in frame.headers: |
|
26 |
return |
|
27 |
self._queue.put(frame) |
|
28 |
||
29 |
def get(self, frame, nb=False): |
|
30 |
"""Get a new frame from the message queue. |
|
31 |
If no frame is available it try to get the next frame |
|
32 |
from the socket. |
|
33 |
||
34 |
:param frame: A :class:`Frame` instance. |
|
35 |
:keyword nb: Non-blocking. |
|
36 |
||
37 |
""" |
|
38 |
try: |
|
39 |
return self._queue.get_nowait() |
|
40 |
except QueueEmpty: |
|
41 |
return frame.parse_frame(nb=nb) |
|
42 |
||
43 |
||
44 |
class Frame(object): |
|
45 |
"""Build and manage a STOMP Frame. |
|
46 |
||
47 |
:keyword sock: An open socket to the STOMP server. |
|
48 |
||
49 |
""" |
|
50 |
def __init__(self, sock=None): |
|
51 |
self.command = None |
|
52 |
self.headers = {} |
|
53 |
self.body = None |
|
54 |
self.session = None |
|
55 |
self.my_name = socket.gethostbyname(socket.gethostname()) |
|
56 |
self.sock = sock |
|
57 |
self.iqueue = IntermediateMessageQueue() |
|
58 |
self.rqueue = Queue() |
|
59 |
||
60 |
def connect(self, sock): |
|
61 |
"""Connect to the STOMP server and get the session id.""" |
|
62 |
self.sock = sock |
|
63 |
frame = self.build_frame({"command": "CONNECT", "headers": {}}) |
|
64 |
self.send_frame(frame.as_string()) |
|
65 |
||
66 |
# Get session from the next reply from the server. |
|
67 |
next_frame = self.get_reply() |
|
68 |
self.session = next_frame.headers |
|
69 |
||
70 |
def build_frame(self, args, want_receipt=False): |
|
71 |
"""Build a frame based on a :class:`dict` of arguments. |
|
72 |
||
73 |
:param args: A :class:`dict` of arguments for the frame. |
|
74 |
||
75 |
:keyword want_receipt: Optional argument to get a receipt from |
|
76 |
the sever that the frame was received. |
|
77 |
||
78 |
Example |
|
79 |
||
80 |
>>> frame = frameobj.build_frame({"command": 'CONNECT', |
|
81 |
"headers": {}, |
|
82 |
want_receipt=True) |
|
83 |
""" |
|
84 |
self.command = args.get('command') |
|
85 |
self.headers = args.get('headers') |
|
86 |
self.body = args.get('body') |
|
87 |
if want_receipt: |
|
88 |
receipt_stamp = str(random.randint(0, 10000000)) |
|
89 |
self.headers["receipt"] = "%s-%s" % ( |
|
90 |
self.session.get("session"), receipt_stamp) |
|
91 |
return self |
|
92 |
||
93 |
def as_string(self): |
|
94 |
"""Raw string representation of this frame |
|
95 |
Suitable for passing over a socket to the STOMP server. |
|
96 |
||
97 |
Example |
|
98 |
||
99 |
>>> stomp.send(frameobj.as_string()) |
|
100 |
||
101 |
""" |
|
102 |
command = self.command |
|
103 |
headers = self.headers |
|
104 |
body = self.body |
|
105 |
||
106 |
bytes_message = False |
|
107 |
if 'bytes_message' in headers: |
|
108 |
bytes_message = True |
|
109 |
del headers['bytes_message'] |
|
110 |
headers['content-length'] = len(body) |
|
111 |
headers['x-client'] = self.my_name |
|
112 |
||
113 |
# Convert and append any existing headers to a string as the |
|
114 |
# protocol describes. |
|
115 |
headerparts = ("%s:%s\n" % (key, value) |
|
116 |
for key, value in headers.iteritems()) |
|
117 |
||
118 |
# Frame is Command + Header + EOF marker. |
|
119 |
frame = "%s\n%s\n%s\x00" % (command, "".join(headerparts), body) |
|
120 |
||
121 |
return frame |
|
122 |
||
123 |
def get_message(self, nb=False): |
|
124 |
"""Get next message frame.""" |
|
125 |
while True: |
|
126 |
frame = self.iqueue.get(self, nb=nb) |
|
127 |
if not frame and nb: |
|
128 |
return None |
|
129 |
if frame.command == "MESSAGE": |
|
130 |
return frame |
|
131 |
else: |
|
132 |
self.rqueue.put(frame) |
|
133 |
||
134 |
def get_reply(self, nb=False): |
|
135 |
"""Get command reply frame.""" |
|
136 |
while True: |
|
137 |
try: |
|
138 |
return self.rqueue.get_nowait() |
|
139 |
except QueueEmpty: |
|
140 |
frame = self.parse_frame(nb=nb) |
|
141 |
if not frame and nb: |
|
142 |
return None |
|
143 |
if frame.command == "MESSAGE": |
|
144 |
self.iqueue.put(frame) |
|
145 |
else: |
|
146 |
self.rqueue.put(frame) |
|
147 |
||
148 |
def parse_frame(self, nb=False): |
|
149 |
"""Parse data from socket |
|
150 |
||
151 |
:keyword nb: Non-blocking: If this is set and there is no |
|
152 |
messages currently waiting, this functions returns ``None`` |
|
153 |
instead of waiting for more data. |
|
154 |
||
155 |
Example |
|
156 |
||
157 |
>>> frameobj.parse_frame() |
|
158 |
||
159 |
""" |
|
160 |
line = self._getline(nb=nb) |
|
161 |
if not line: |
|
162 |
return |
|
163 |
||
164 |
command = self.parse_command(line) |
|
165 |
line = line[len(command)+1:] |
|
166 |
headers_str, _, body = line.partition("\n\n") |
|
167 |
if not headers_str: |
|
168 |
raise UnknownBrokerResponseError( |
|
169 |
"Received: (%s)" % line) |
|
170 |
headers = self.parse_headers(headers_str) |
|
171 |
||
172 |
if 'content-length' in headers: |
|
173 |
headers['bytes_message'] = True |
|
174 |
||
175 |
frame = Frame(self.sock) |
|
176 |
return frame.build_frame({'command': command, |
|
177 |
'headers': headers, |
|
178 |
'body': body}) |
|
179 |
||
180 |
def parse_command(self, str): |
|
181 |
"""Parse command received from the server.""" |
|
182 |
command = str.split('\n', 1)[0] |
|
183 |
return command |
|
184 |
||
185 |
def parse_headers(self, headers_str): |
|
186 |
"""Parse headers received from the servers and convert |
|
187 |
to a :class:`dict`.""" |
|
188 |
# george:constanza\nelaine:benes |
|
189 |
# -> {"george": "constanza", "elaine": "benes"} |
|
190 |
return dict(line.split(":", 1) for line in headers_str.split("\n")) |
|
191 |
||
192 |
def send_frame(self, frame): |
|
193 |
"""Send frame to server, get receipt if needed.""" |
|
194 |
self.sock.sendall(frame) |
|
195 |
||
196 |
if 'receipt' in self.headers: |
|
197 |
return self.get_reply() |
|
198 |
||
199 |
def _getline(self, nb=False): |
|
200 |
"""Get a single line from socket |
|
201 |
||
202 |
:keyword nb: Non-blocking: If this is set, and there is no |
|
203 |
messages to receive, this function returns ``None``. |
|
204 |
||
205 |
""" |
|
206 |
self.sock.setblocking(not nb) |
|
207 |
try: |
|
208 |
buffer = '' |
|
209 |
partial = '' |
|
210 |
while not buffer.endswith('\x00\n'): |
|
211 |
try: |
|
212 |
partial = self.sock.recv(1) |
|
213 |
except socket.error, exc: |
|
214 |
if exc.errno == EAGAIN: |
|
215 |
if not buffer: |
|
216 |
return None |
|
217 |
continue |
|
218 |
buffer += partial |
|
219 |
finally: |
|
220 |
self.sock.setblocking(nb) |
|
221 |
return buffer[:-2] |
|
222 |
||
223 |
def __repr__(self): |
|
224 |
return "<Frame %s>" % pformat(self.headers) |
Up to file-list stompy/stomp.py:
1 |
import socket |
|
2 |
from stompy.frame import Frame |
|
3 |
from functools import wraps |
|
4 |
||
5 |
||
6 |
class NotConnectedError(Exception): |
|
7 |
"""No longer connected to the STOMP server.""" |
|
8 |
||
9 |
class ConnectionError(socket.error): |
|
10 |
"""Couldn't connect to the STOMP server.""" |
|
11 |
||
12 |
||
13 |
class ConnectionTimeoutError(socket.timeout): |
|
14 |
"""Timed-out while establishing connection to the STOMP server.""" |
|
15 |
||
16 |
||
17 |
class Stomp(object): |
|
18 |
"""STOMP Client. |
|
19 |
||
20 |
:param hostname: Hostname of the STOMP server to connect to. |
|
21 |
:param port: The port to use. (default ``61613``) |
|
22 |
||
23 |
""" |
|
24 |
ConnectionError = ConnectionError |
|
25 |
ConnectionTimeoutError = ConnectionTimeoutError |
|
26 |
NotConnectedError = NotConnectedError |
|
27 |
||
28 |
def __init__(self, hostname, port=61613): |
|
29 |
self.host = hostname |
|
30 |
self.port = port |
|
31 |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
32 |
self._subscribed_to = {} |
|
33 |
self._subscribed = None |
|
34 |
self.connected = None |
|
35 |
self.frame = Frame() |
|
36 |
||
37 |
def connect(self): |
|
38 |
"""Connect to STOMP server.""" |
|
39 |
try: |
|
40 |
self.sock.connect((self.host, self.port)) |
|
41 |
self.frame.connect(self.sock) |
|
42 |
except socket.error, exc: |
|
43 |
raise self.ConnectionError(*exc.args) |
|
44 |
except socket.timeout, exc: |
|
45 |
raise self.ConnectionTimeoutError(*exc.args) |
|
46 |
self.connected = True |
|
47 |
||
48 |
def disconnect(self, conf=None): |
|
49 |
"""Disconnect from the server.""" |
|
50 |
try: |
|
51 |
for destination in self._subscribed_to.keys(): |
|
52 |
self.unsubscribe({"destination": destination}) |
|
53 |
self._send_command("DISCONNECT", conf) |
|
54 |
except self.NotConnectedError: |
|
55 |
pass |
|
56 |
self.sock.shutdown(0) |
|
57 |
self.connected = False |
|
58 |
||
59 |
def send(self, conf=None): |
|
60 |
"""Send message to STOMP server |
|
61 |
||
62 |
You'll need to pass the body and any other headers your |
|
63 |
STOMP server likes. |
|
64 |
||
65 |
destination is **required** |
|
66 |
||
67 |
In the case of ActiveMQ with persistence, you could do this: |
|
68 |
||
69 |
>>> for i in xrange(1,1000): |
|
70 |
... stomp.send({'destination': '/queue/foo', |
|
71 |
... 'body': 'Testing', |
|
72 |
... 'persistent': 'true'}) |
|
73 |
||
74 |
""" |
|
75 |
headers = dict(conf) |
|
76 |
body = headers.pop("body", "") |
|
77 |
return self._send_command("SEND", headers, extra={"body": body}, |
|
78 |
want_receipt=True) |
|
79 |
||
80 |
def _build_frame(self, *args, **kwargs): |
|
81 |
self._connected_or_raise() |
|
82 |
return self.frame.build_frame(*args, **kwargs) |
|
83 |
||
84 |
def subscribe(self, conf=None): |
|
85 |
"""Subscribe to a given destination |
|
86 |
||
87 |
You will need to pass any headers your STOMP server likes. |
|
88 |
||
89 |
destination is *required* |
|
90 |
||
91 |
In the case of ActiveMQ, you could do this: |
|
92 |
||
93 |
>>> stomp.subscribe({'destination':'/queue/foo', |
|
94 |
... 'ack':'client'}) |
|
95 |
""" |
|
96 |
destination = conf["destination"] |
|
97 |
self._send_command("SUBSCRIBE", conf) |
|
98 |
self._subscribed_to[destination] = True |
|
99 |
||
100 |
def begin(self, conf=None): |
|
101 |
"""Begin transaction. |
|
102 |
||
103 |
You will need to pass any headers your STOMP server likes. |
|
104 |
||
105 |
destination is *required* |
|
106 |
||
107 |
In the case of ActiveMQ, you could do this: |
|
108 |
||
109 |
>>> stomp.begin({'transaction':'<randomish_hash_like_thing>'}) |
|
110 |
""" |
|
111 |
self._send_command("BEGIN", conf) |
|
112 |
||
113 |
def commit(self, conf=None): |
|
114 |
"""Commit transaction. |
|
115 |
||
116 |
You will need to pass any headers your STOMP server likes. |
|
117 |
||
118 |
destination is **required** |
|
119 |
||
120 |
In the case of ActiveMQ, you could do this: |
|
121 |
||
122 |
>>> stomp.commit({'transaction':'<randomish_hash_like_thing>'}) |
|
123 |
||
124 |
""" |
|
125 |
self._send_command("COMMIT", conf) |
|
126 |
||
127 |
||
128 |
def abort(self, conf=None): |
|
129 |
"""Abort transaction. |
|
130 |
||
131 |
In the case of ActiveMQ, you could do this: |
|
132 |
||
133 |
>>> stomp.abort({'transaction':'<randomish_hash_like_thing>'}) |
|
134 |
||
135 |
""" |
|
136 |
self._send_command("ABORT", conf) |
|
137 |
||
138 |
||
139 |
def unsubscribe(self, conf=None): |
|
140 |
"""Unsubscribe from a given destination |
|
141 |
||
142 |
You will need to pass any headers your STOMP server likes. |
|
143 |
||
144 |
destination is *required* |
|
145 |
||
146 |
>>> stomp.unsubscribe({'destination':'/queue/foo'}) |
|
147 |
""" |
|
148 |
destination = conf["destination"] |
|
149 |
self._send_command("UNSUBSCRIBE", conf) |
|
150 |
self._subscribed_to.pop(destination, None) |
|
151 |
||
152 |
def ack(self, frame): |
|
153 |
"""Acknowledge receipt of a message |
|
154 |
||
155 |
:param: A :class:`stompy.frame.Frame` instance. |
|
156 |
||
157 |
Example |
|
158 |
||
159 |
>>> while True: |
|
160 |
... frame = stomp.receive_frame() |
|
161 |
... stomp.ack(frame) |
|
162 |
||
163 |
""" |
|
164 |
message_id = frame.headers.get('message-id') |
|
165 |
self._send_command("ACK", {"message-id": message_id}) |
|
166 |
||
167 |
def receive_frame(self, nonblocking=False): |
|
168 |
"""Get a frame from the STOMP server |
|
169 |
||
170 |
:keyword nonblocking: By default this function waits forever |
|
171 |
until there is a message to be received, however, in non-blocking |
|
172 |
mode it returns ``None`` if there is currently no message |
|
173 |
available. |
|
174 |
||
175 |
Note that you must be subscribed to one or more destinations. |
|
176 |
Use :meth:`subscribe` to subscribe to a topic/queue. |
|
177 |
||
178 |
Example: Blocking |
|
179 |
||
180 |
>>> while True: |
|
181 |
... frame = stomp.receive_frame() |
|
182 |
... print(frame.headers['message-id']) |
|
183 |
... stomp.ack(frame) |
|
184 |
||
185 |
Example: Non-blocking |
|
186 |
||
187 |
>>> frame = stomp.recieve_frame(nonblocking=True) |
|
188 |
>>> if frame: |
|
189 |
... process_message(frame) |
|
190 |
... else: |
|
191 |
... # no messages yet. |
|
192 |
||
193 |
""" |
|
194 |
self._connected_or_raise() |
|
195 |
return self.frame.get_message(nb=nonblocking) |
|
196 |
||
197 |
def poll(self): |
|
198 |
"""Alias to :meth:`receive_frame` with ``nonblocking=True``.""" |
|
199 |
return self.receive_frame(nonblocking=True) |
|
200 |
||
201 |
def send_frame(self, frame): |
|
202 |
"""Send a custom frame to the STOMP server |
|
203 |
||
204 |
:param frame: A :class:`stompy.frame.Frame` instance. |
|
205 |
||
206 |
Example |
|
207 |
||
208 |
>>> from stompy import Frame |
|
209 |
>>> frame = Frame().build_frame({ |
|
210 |
... "command": "DISCONNECT", |
|
211 |
... "headers": {}, |
|
212 |
... }) |
|
213 |
>>> stomp.send_frame(frame) |
|
214 |
||
215 |
""" |
|
216 |
self._connected_or_raise() |
|
217 |
frame = self.frame.send_frame(frame.as_string()) |
|
218 |
return frame |
|
219 |
||
220 |
def _send_command(self, command, conf=None, extra=None, **kwargs): |
|
221 |
conf = conf or {} |
|
222 |
extra = extra or {} |
|
223 |
frame_conf = {"command": command, "headers": conf} |
|
224 |
frame_conf.update(extra) |
|
225 |
frame = self._build_frame(frame_conf, **kwargs) |
|
226 |
reply = self.send_frame(frame) |
|
227 |
if kwargs.get("want_receipt", False): |
|
228 |
return reply |
|
229 |
return frame |
|
230 |
||
231 |
def _connected_or_raise(self): |
|
232 |
if not self.connected: |
|
233 |
raise self.NotConnectedError("Not connected to STOMP server.") |
|
234 |
||
235 |
@property |
|
236 |
def subscribed(self): |
|
237 |
"""**DEPRECATED** The queue or topic currently subscribed to.""" |
|
238 |
as_list = self._subscribed_to.keys() |
|
239 |
if not as_list: |
|
240 |
return |
|
241 |
return as_list[0] |
