1. Moritz Heidkamp
  2. zmq

Commits

Moritz Heidkamp  committed 86827c4

Get rid of socket-mutex

We just assume now that sockets are not used across different
threads (as does zmq itself).

  • Participants
  • Parent commits ad8b43e
  • Branches 3.2

Comments (0)

Files changed (1)

File zmq.scm

View file
  • Ignore whitespace
 
 (define-foreign-type message (c-pointer "zmq_msg_t"))
 
-(define-record socket pointer mutex message)
+(define-record socket pointer message)
 (define-foreign-type socket c-pointer)
 
 (define-foreign-enum-type (socket-type int)
         (zmq-error 'make-socket)
         (let ((m (context-sockets context))
               (s (%make-socket sp
-                               (make-mutex) 
                                (allocate (foreign-value "sizeof(zmq_msg_t)" int)))))
 
           (mutex-lock! m)
                                (if send-more zmq/sndmore 0)))))
     (and (= -1 result) (zmq-error 'send-message))))
 
+(define (message-size message)
+  ((foreign-lambda unsigned-integer zmq_msg_size message) message))
 
-;; TODO: ctrl-c'ing this with parley doesn't unlock the mutex, can we
-;; do that?
-(define (receive-message socket #!key non-blocking (as 'string))
-  (mutex-lock! (socket-mutex socket))
-  (let* ((message (initialize-message (socket-message socket)))
-         (result ((foreign-lambda int zmq_msg_recv message socket int)
-                  message
-                  (socket-pointer socket)
-                  (if non-blocking zmq/dontwait 0))))
+(define (message-data message type)
+  (let* ((size (message-size message))
+         (ptr ((foreign-lambda c-pointer zmq_msg_data message) message)))
 
+    (cond ((symbol? type)
+           (case type
+             ((string)
+              (let ((str (make-string size)))
+                (move-memory! ptr str size)
+                str))
+             ((blob)
+              (let ((blob (make-blob size)))
+                (move-memory! ptr blob size)
+                blob))
+             (else (error 'message-data "invalid message data type" type))))
+          ((procedure? type)
+           (type ptr size))
+          (else (error 'message-data "invalid message data type" type)))))
 
-    (if (= -1 result)
-        (begin ;; error:
-          (mutex-unlock! (socket-mutex socket))
-          (close-message message)
-          (if (memq (errno) '(again term))
-              #f
-              (zmq-error 'receive-message)))
+(define (receive-message socket #!key non-blocking (as 'string))
+  (let ((message (socket-message socket)))
+    (if ((foreign-lambda* scheme-object ((message msg) (socket sock) (int flags))
+           "int rc;
+            C_word *result;
+            rc = zmq_msg_init(msg);
+            if (rc != 0) C_return(C_SCHEME_FALSE);
+            rc = zmq_msg_recv(msg, sock, flags);
+            if (rc == -1) C_return(C_SCHEME_FALSE);
+            C_return(C_SCHEME_TRUE);")
+         message
+         (socket-pointer socket)
+         (if non-blocking zmq/dontwait 0))
         (let ((data (message-data message as)))
-          (mutex-unlock! (socket-mutex socket))
           (close-message message)
-          data))))
+          data)
+        (and (not (memq (errno) '(again term)))
+             (zmq-error 'receive-message)))))
 
 (define (receive-message* socket #!key (as 'string))
   (or (receive-message socket non-blocking: #t as: as)