paulc avatar paulc committed aec29a1

Initial zmq_poll implementation

Comments (0)

Files changed (2)

Binary file removed.

 #include "sds.h"
 #include "sdsutils.h"
 
-#define USAGE           "Usage: zmqcat -t [REQ|REP] <transport>"
+#define USAGE           "Usage: zmqcat -t [REQ|REP] [-e <cmd>] [-v] <transport>"
+
+#define USAGE_FULL      USAGE "\n\n" \
+                        "   -t [REQ|REP]        Socket type (default: REQ)\n" \
+                        "   -e <cmd>            Exec <cmd> on connect and pipe to socket\n" \
+                        "   -v                  Verbose\n" \
+                        "\n" \
+                        "   <transport>         Socket endpoint\n"
 
 void usage_error(int code,char *msg) {
     fprintf(stderr,"%s\n",USAGE); 
     int i = 1;
     char *transport = NULL;
     int socket_type = ZMQ_REQ;
+    int verbose = 0;
+    int timeout = -1;
+    char *exec = NULL;
 
     while (i < argc && argv[i][0] == '-') {
         if (strcmp(argv[i],"-t")==0) {
             } else {
                 usage_error(EX_USAGE,"Invalid socket type");
             }
+        } else if (strcmp(argv[i],"-e")==0) {
+            if (++i >= argc) usage_error(EX_USAGE,"-e <cmd>");
+            exec = argv[i];
+        } else if (strcmp(argv[i],"-v")==0) {
+            verbose++;
         } else if (strcmp(argv[i],"-h")==0) {
-            usage_error(EX_USAGE,"Invalid socket type");
+            fprintf(stderr,"%s\n",USAGE_FULL); 
+            exit(EX_USAGE);
         }
         i++;
     }
 
     transport = argv[i];
 
-    if (!(starts_with(transport,"ipc:") || 
-          starts_with(transport,"tcp:"))) { 
-        usage_error(EX_USAGE,"Invalid transport - must be ipc: or tcp:");
+    if (!(starts_with(transport,"ipc://") || 
+          starts_with(transport,"tcp://"))) { 
+        usage_error(EX_USAGE,"Invalid transport - must be ipc:// or tcp://");
     }
 
     void *context;
     void *socket;
+    int local = 0;
+    int done = 0;
 
     check(context = zmq_init(1),"zmq_init",EX_UNAVAILABLE);
     check(socket = zmq_socket(context,socket_type),"zmq_socket",EX_UNAVAILABLE);
 
     if (socket_type == ZMQ_REQ) {
         check_zero(zmq_connect(socket,transport),"zmq_connect",EX_UNAVAILABLE);
-        sds local = sdsreadfile(stdin);
-        send(socket,local);
-        sds net = recv(socket);
-        printf("Received: %d\n",(int) sdslen(net));
-        sdsfree(local);
-        sdsfree(net);
     } else if (socket_type == ZMQ_REP) {
         check_zero(zmq_bind(socket,transport),"zmq_bind",EX_UNAVAILABLE);
-        sds net = recv(socket);
-        printf("Received: %d\n",(int) sdslen(net));
-        sds local = sdsreadfile(stdin);
-        send(socket,local);
-        sdsfree(local);
-        sdsfree(net);
+    } else {
+        usage_error(EX_USAGE,"Invalid socket type");
     }
 
+    zmq_pollitem_t items [] = {
+        { socket, 0, ZMQ_POLLIN, 0 },
+        { NULL, local, ZMQ_POLLIN, 0 }
+    };
+
+    while (!done) {
+        zmq_poll(items,2,timeout);
+        if (items[0].revents & ZMQ_POLLIN) {
+            // read from local to send-buffer
+            // check if complete (eof or delim)
+        }
+        if (items[1].revents & ZMQ_POLLIN) {
+            // read from socket to recv-buffer
+            // if not loop then complete
+        }
+        if (items[0].revents & ZMQ_POLLOUT) {
+            // write recv-buffer to socket
+        }
+        if (items[1].revents & ZMQ_POLLOUT) {
+            // write send-buffer to socket if complete
+        }
+    }
+
+
+
+//    if (socket_type == ZMQ_REQ) {
+//        check_zero(zmq_connect(socket,transport),"zmq_connect",EX_UNAVAILABLE);
+//        sds local = sdsreadfile(stdin);
+//        send(socket,local);
+//        sds net = recv(socket);
+//        if (verbose) {
+//            fprintf(stderr,"Received: %d\n",(int) sdslen(net));
+//        }
+//        sdsfree(local);
+//        sdsfree(net);
+//    } else if (socket_type == ZMQ_REP) {
+//        check_zero(zmq_bind(socket,transport),"zmq_bind",EX_UNAVAILABLE);
+//        sds net = recv(socket);
+//        if (verbose) {
+//            fprintf(stderr,"Received: %d\n",(int) sdslen(net));
+//        }
+//        sds local = sdsreadfile(stdin);
+//        send(socket,local);
+//        sdsfree(local);
+//        sdsfree(net);
+//    }
+
     zmq_close(socket);
     zmq_term(context);
     exit(0);
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.