Commits

paulc committed f55d7e7

Add looping

Comments (0)

Files changed (5)

 *.a
 .DS_Store
 zmqcat
+zmqsend
 
 CC = gcc
 AR = ar
-CFLAGS = -Wall -O2 -std=gnu99 -I/usr/local/include
+CFLAGS = -Wall -O0 -std=gnu99 -I/usr/local/include
 LDFLAGS = -L/usr/local/lib -lzmq
 DEBUG ?= -g -rdynamic -ggdb
 
 zmqcat.o: zmqcat.c
 zmqsend.o: zmqsend.c
 
-
 # Targets
 zmqcat : zmqcat.o $(OBJ)
 	$(CC) -o zmqcat $(LDFLAGS) $(DEBUG) zmqcat.o $(OBJ)

zmq.c

-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sysexits.h>
-
-#include <zmq.h>
-
-#include "sds.h"
-#include "sdsutils.h"
-
-#define USAGE           "Usage: zmqcat -t [REQ|REP] <transport>"
-
-void usage_error(int code,char *msg) {
-    fprintf(stderr,"%s\n",USAGE); 
-    if (strlen(msg) > 0) {
-        fprintf(stderr,"\nError: %s\n",msg);
-    }
-    exit(code);
-}
-
-void check(void *p,char *s,int code) {
-    if (p == NULL) {
-        perror(s);
-        exit(code);
-    }
-}
-
-void check_zero(int val,char *s, int code) {
-    if (val != 0) {
-        perror(s);
-        exit(code);
-    }
-}
-
-int starts_with(char *s, char *prefix) {
-    int l1 = strlen(s);
-    int l2 = strlen(prefix);
-    if (l1 < l2) return 0;
-    return strncmp(s,prefix,l2) == 0;
-}
-
-void send(void *socket,sds data) {
-    zmq_msg_t msg;
-    check_zero(zmq_msg_init_data(&msg,data,sdslen(data),NULL,NULL),
-                   "zmq_msg_init_data",EX_UNAVAILABLE);
-    check_zero(zmq_send(socket,&msg,0),"zmq_send",EX_UNAVAILABLE);
-    check_zero(zmq_msg_close(&msg),"zmq_msg_close",EX_UNAVAILABLE);
-}
-
-sds recv(void *socket) {
-    zmq_msg_t msg;
-    zmq_msg_init(&msg);
-    zmq_recv(socket,&msg,0);
-    sds data = sdsnewlen(zmq_msg_data(&msg),zmq_msg_size(&msg));
-    zmq_msg_close(&msg);
-    return data;
-}
-
-int main(int argc, char **argv) {
-
-    int i = 1;
-    char *transport = NULL;
-    int socket_type = ZMQ_REQ;
-
-    while (i < argc && argv[i][0] == '-') {
-        if (strcmp(argv[i],"-t")==0) {
-            if (++i >= argc) usage_error(EX_USAGE,"-t <socket_type>");
-            if (strcmp(argv[i],"REQ")==0) {
-                socket_type = ZMQ_REQ;
-            } else if (strcmp(argv[i],"REP") == 0) {
-                socket_type = ZMQ_REP;
-            } else {
-                usage_error(EX_USAGE,"Invalid socket type");
-            }
-        } else if (strcmp(argv[i],"-h")==0) {
-            usage_error(EX_USAGE,"Invalid socket type");
-        }
-        i++;
-    }
-
-    if (i >= argc) usage_error(EX_USAGE,"No transport specified");
-
-    transport = argv[i];
-
-    if (!(starts_with(transport,"ipc:") || 
-          starts_with(transport,"tcp:"))) { 
-        usage_error(EX_USAGE,"Invalid transport - must be ipc: or tcp:");
-    }
-
-    void *context;
-    void *socket;
-
-    check(context = zmq_init(1),"zmq_init",EX_UNAVAILABLE);
-    check(socket = zmq_socket(context,socket_type),"zmq_socket",EX_UNAVAILABLE);
-
-    if (socket_type == ZMQ_REQ) {
-        check_zero(zmq_connect(socket,transport),"zmq_connect",EX_UNAVAILABLE);
-        sds local = sdsreadfile(stdin);
-        send(socket,local);
-        sds net = recv(socket);
-        printf("Received: %d\n",(int) sdslen(net));
-        sdsfree(local);
-        sdsfree(net);
-    } else if (socket_type == ZMQ_REP) {
-        check_zero(zmq_bind(socket,transport),"zmq_bind",EX_UNAVAILABLE);
-        sds net = recv(socket);
-        printf("Received: %d\n",(int) sdslen(net));
-        sds local = sdsreadfile(stdin);
-        send(socket,local);
-        sdsfree(local);
-        sdsfree(net);
-    }
-
-    zmq_close(socket);
-    zmq_term(context);
-    exit(0);
-}
 #include "sds.h"
 #include "sdsutils.h"
 
-#define USAGE           "Usage: zmqcat -t [REQ|REP] [-e <cmd>] [-n] [-v] <transport>"
+#define USAGE           "Usage: zmqcat -t [REQ|REP] [-e <cmd>] [-n] [-l] [-v] <transport>"
 
 #define USAGE_FULL      USAGE "\n\n" \
                         "   -t [REQ|REP]        Socket type (default: REQ)\n" \
                         "   -e <cmd>            Exec <cmd> on connect and pipe to socket\n" \
+                        "   -l                  Loop\n" \
                         "   -n                  Close stdin\n" \
                         "   -v                  Verbose\n" \
                         "\n" \
     int verbose = 0;
     int timeout = -1;
     int no_stdin = 0;
+    int loop = 0;
     char *exec = NULL;
 
     int i = 1;
             no_stdin = 1;
         } else if (strcmp(argv[i],"-n")==0) {
             no_stdin = 1;
+        } else if (strcmp(argv[i],"-l")==0) {
+            loop = 1;
         } else if (strcmp(argv[i],"-v")==0) {
             verbose++;
         } else if (strcmp(argv[i],"-h")==0) {
 
     sds rx_buffer = sdsempty();
     sds tx_buffer = sdsempty();
-    int local_done = 0, remote_done = 0, local_eof = 0;
+    int local_done = 0, 
+        remote_done = 0, 
+        local_eof = 0;
     
-    if (no_stdin && socket_type == ZMQ_REQ) {
-        if (exec != NULL) {
-            sdsfree(tx_buffer);
-            tx_buffer = sdsexec(exec);
-            printf("EXEC: %s\n",tx_buffer);
+
+    if (no_stdin) {
+        if (socket_type == ZMQ_REQ) {
+            if (exec != NULL) {
+                sdsfree(tx_buffer);
+                tx_buffer = sdsexec(exec);
+            }
+            send(remote,tx_buffer);
+            local_done = 1;
+            local_eof = 1;
+        } else if (socket_type == ZMQ_REP) {
+            local_eof = 1;
         }
-        send(remote,tx_buffer);
-        local_eof = 1;
-        local_done = 1;
     }
 
     int count = 0;
 
         int ready = zmq_poll(items,2,timeout);
 
-        printf("[%d] Poll...(%d): %d-%d\n",++count,ready,items[0].revents & ZMQ_POLLIN,items[1].revents & ZMQ_POLLIN);
+        if (verbose) 
+            fprintf(stderr,"[%d] Poll...(%d): %d-%d / local_done=%d, remote_done=%d\n",
+                ++count,ready,items[0].revents,items[1].revents,local_done,remote_done);
 
         if (items[0].revents & ZMQ_POLLIN) {
             int n = recv(remote,&rx_buffer);
-            printf("Remote: Read %d bytes\n",n);
+            if (verbose) printf("Remote: Read %d bytes\n",n);
             fwrite(rx_buffer,1,sdslen(rx_buffer),stdout);
-            remote_done = 1;
+            sdsfree(rx_buffer);
+            rx_buffer = sdsempty();
+            if (!loop) remote_done = 1;
         }
         if (items[0].revents & ZMQ_POLLOUT) {
-            if (local_eof) {
-                printf("Remote: Sent %d bytes\n",sdslen(tx_buffer));
+            if (exec != NULL) {
+                sdsfree(tx_buffer);
+                tx_buffer = sdsexec(exec);
                 send(remote,tx_buffer);
+                if (verbose) fprintf(stderr,"Remote: Sent %d bytes\n",(int) sdslen(tx_buffer));
+                if (!loop) local_done = 1;
+            } else if (local_eof) {
+                send(remote,tx_buffer);
+                if (verbose) fprintf(stderr,"Remote: Sent %d bytes\n",(int) sdslen(tx_buffer));
                 local_done = 1;
             }
         }
                     perror("Error reading from local fd");
                     exit(EX_IOERR);
                 } else if (n == 0) {
-                    printf("+++ Local EOF\n");
+                    if (verbose) fprintf(stderr,"+++ Local EOF\n");
                     items[1].events = 0;
                     local_eof = 1;
                     if (socket_type == ZMQ_REQ) {
                         local_done = 1;
                     }
                 } else {
-                    printf("Local: Read %d bytes\n",n);
+                    if (verbose) fprintf(stderr,"Local: Read %d bytes\n",n);
                     tx_buffer = sdscatlen(tx_buffer,buf,n);
                 }
             }
 #include <stdlib.h>
 #include <string.h>
 #include <sysexits.h>
-#include <unistd.h>
 
 #include <zmq.h>
 
     check(context = zmq_init(1),"zmq_init",EX_UNAVAILABLE);
     check(socket = zmq_socket(context,socket_type),"zmq_socket",EX_UNAVAILABLE);
 
-	sds local = sdsreadfile(stdin);
-
     if (socket_type == ZMQ_REQ) {
         check_zero(zmq_connect(socket,transport),"zmq_connect",EX_UNAVAILABLE);
+        sds local = sdsreadfile(stdin);
+        send(socket,local);
+        sds net = recv(socket);
+        printf("Received: %d\n",(int) sdslen(net));
+        sdsfree(local);
+        sdsfree(net);
     } else if (socket_type == ZMQ_REP) {
         check_zero(zmq_bind(socket,transport),"zmq_bind",EX_UNAVAILABLE);
-        zmq_pollitem_t items[1];
-        items[0].socket = socket;
-        items[0].events = ZMQ_POLLIN;
-        int ready = zmq_poll(items,1,10);
-        printf("Poll: %d\n",ready);
+        sds net = recv(socket);
+        printf("Received: %d\n",(int) sdslen(net));
+        sds local = sdsreadfile(stdin);
+        send(socket,local);
+        sdsfree(local);
+        sdsfree(net);
     }
-	send(socket,local);
-	sdsfree(local);
 
     zmq_close(socket);
     zmq_term(context);