Commits

Jeremy Rossi committed aa42250

more playing with brawndo

Comments (0)

Files changed (2)

src/os_kvd/kv_main.c

 typedef struct {
     TCHDB *hdb;
     void *ctx;
+    char *name;
 } db_plus_ctx;
 
-void *key_error(void *request_queue, char *error_msg)
+typedef struct {
+    zmq_msg_t *address;
+    zmq_msg_t *empty;
+    zmq_msg_t *message;
+    char *db_name;
+} db_req;
+
+
+
+void *key_error(void *request_queue, zmq_msg_t *address, zmq_msg_t *empty, char *error_msg)
 {
     bson b;
     bson_buffer *bb; 
     free(bb);
     zmq_msg_init_size(&reply, bson_size(&b));
     memcpy(zmq_msg_data(&reply), b.data, bson_size(&b));
+    zmq_send(request_queue, address, 0);
+    zmq_send(request_queue, empty, 0);
     zmq_send(request_queue, &reply, 0);
     return NULL;
 }
     return(NULL);
 }
 
+/*
+bson_type os_bson_find(bson_iterator* it, const bson* obj, const char* name)
+{
+    bson_iterator_init(it, obj->data);
+
+}
+
+*/
+
+char *db_name_parse(char *data)
+{
+    bson_iterator i, ii;
+    //printf("data: %s\n", data);
+    char *db_name; 
+    bson b; 
+    bson_iterator_init(&i, data);
+    bson_init(&b, data, 0);
+    bson_print(&b);
+    printf("db_name_parser: start bson_iterator_next\n");
+    do
+    {
+        switch( bson_iterator_type(&i)) 
+        {
+            case bson_object:
+                printf("l1 key: %s\n", bson_iterator_key(&i));
+                if(strcmp(bson_iterator_key(&i), "db") == 0)
+                {
+                    printf("db_name_parser: start bson_iterator_init\n");
+                    bson_iterator_init(&ii, bson_iterator_value(&i));
+                    do 
+                    {
+                        printf("db_name_parser: in loop\n");
+                        switch( bson_iterator_type(&ii)) 
+                        {
+                            case bson_string:
+                                printf("hit string\n");
+                                if (strcmp(bson_iterator_key(&ii), "name") == 0)
+                                {
+                                    printf("in name\n");
+                                    db_name = malloc(strlen(bson_iterator_string(&ii))+1);
+                                    strcpy(db_name, bson_iterator_string(&ii));
+                                    printf("matching db_name ->%s<-\n", db_name);
+                                    return db_name;
+                                }
+                            default:
+                                printf("l2 hit default\n");
+                                break;
+                        }
+                    } while(bson_iterator_next(&ii));
+                }
+            default:
+                printf("l1 hit default\n");
+                break;
+        }
+    } while(bson_iterator_next(&i));
+    return NULL;
+}
+
 kv_op *request_parse (char *data)
 {
     bson_iterator i, ii;
     int ecode;
     db_plus_ctx *details = d; 
     kv_op *op;
+    char tmp[1024];
+    snprintf(tmp, 1023, "inproc://db_actor_%s", details->name);
 
 
+    printf("binding to queue %s\n", tmp);
     void *request_queue = zmq_socket(details->ctx, ZMQ_REP);
     zmq_connect(request_queue, "inproc://db_actor");
     int run = 1;
     {
         found = 0;
         zmq_msg_t query;
+        zmq_msg_t empty;
+        zmq_msg_t address;
+
+        zmq_msg_init(&address);
+        zmq_msg_init(&empty);
         zmq_msg_init(&query);
 
 
+
+        zmq_recv(request_queue, &address, 0);
+        zmq_recv(request_queue, &empty, 0);
         zmq_recv(request_queue, &query, 0);
+
         char *string = malloc (zmq_msg_size(&query) + 1);
         memcpy(string, zmq_msg_data(&query), zmq_msg_size(&query));
         zmq_msg_close(&query);
         if((op = request_parse(string)) == NULL)
         {
-            key_error(request_queue, "BSON Format not understoof");
+            key_error(request_queue, &address, &empty, "BSON Format not understoof");
         }
         else
         {
 
                     zmq_msg_init_size(&responce, bson_size(b));
                     memcpy(zmq_msg_data(&responce), b->data, bson_size(b));
+                    zmq_send(request_queue, &address, ZMQ_SNDMORE);
+                    zmq_send(request_queue, &empty, ZMQ_SNDMORE);
                     zmq_send(request_queue, &responce, 0);
                     zmq_msg_close(&responce);
                     bson_destroy(b);
                 }
                 else
                 {
-                    key_error(request_queue, "Key not found");
+                    key_error(request_queue, &address, &empty, "Key not found");
                 }
                 free_kv_op(op);
             }
                     ecode = tchdbecode(details->hdb);
                     char emsg[1024];
                     snprintf(emsg, 1023, "%s", tchdberrmsg(ecode));
-                    key_error(request_queue, emsg);
+                    key_error(request_queue, &address, &empty, emsg);
                 }
                 else
                 {
                     zmq_msg_init_size(&responce, bson_size(b));
                     memcpy(zmq_msg_data(&responce), b->data, bson_size(b));
                     //printf("db_actor: sending confirm: %s\n",key);
+                    zmq_send(request_queue, &address, ZMQ_SNDMORE);
+                    zmq_send(request_queue, &empty, ZMQ_SNDMORE);
                     zmq_send(request_queue, &responce, 0);
                     zmq_msg_close(&responce);
                     bson_destroy(b);
             }
 
         }
+        zmq_msg_close(&address);
+        zmq_msg_close(&empty);
+        zmq_msg_close(&query);
     }
     return NULL;
 }
 
+/*
+void *db_ctx_del(db_plus_ctx)
+{
+    //
+}
+*/
+
+db_plus_ctx *db_ctx_new(void *ctx, char *name)
+{
+    char path[1024];
+    db_plus_ctx *new_db = (db_plus_ctx*)calloc(1,sizeof(db_plus_ctx));
+    new_db->name = strdup(name);
+    new_db->hdb = tchdbnew();
+    snprintf(path, 1023, "./%s.tch", new_db->name);
+    if(!tchdbopen(new_db->hdb, path, HDBOWRITER | HDBOCREAT ))
+    {
+
+        //ecode = tchdbecode(new_db->hdb);
+        //fprintf(stderr, "open error: %s\n", tchdberrmsg(ecode));
+        //XXX clean up on error
+        return NULL;
+    }
+}
+
+int os_recv_db_req(void *s, db_req *msg)
+{
+    zmq_recv(s, msg->address, 0);
+    zmq_recv(s, msg->empty, 0);
+    zmq_recv(s, msg->message, 0);
+    msg->db_name = db_name_parse(zmq_msg_data(msg->message));
+}
+
+int os_send_db_req(void *s, db_req *msg)
+{
+    printf("os_send_db_req: started\n");
+    printf("os_send_db_req: msg->address\n");
+    zmq_send(s, msg->address, ZMQ_SNDMORE);
+
+    printf("os_send_db_req: msg->empty\n");
+    zmq_send(s, msg->empty, ZMQ_SNDMORE);
+
+    printf("os_send_db_req: msg->message\n");
+    zmq_send(s, msg->message, 0);
+
+    printf("os_send_db_req: finsihed\n");
+}
+
+db_req *db_req_new(){
+    db_req *new_req = malloc(sizeof(db_req));
+    if(new_req == NULL)
+    {
+        printf("malloc failed\n");
+        return NULL;
+    }
+    else
+        printf("malloc completed\n");
+    new_req->address = malloc(sizeof(zmq_msg_t));
+    new_req->empty = malloc(sizeof(zmq_msg_t));
+    new_req->message = malloc(sizeof(zmq_msg_t));
+    zmq_msg_init(new_req->address);
+    zmq_msg_init(new_req->empty);
+    zmq_msg_init(new_req->message);
+    printf("finished init\n");
+    return new_req;
+}
+
+void *db_req_delete(db_req *msg)
+{
+    zmq_msg_close(msg->address);
+    zmq_msg_close(msg->empty);
+    zmq_msg_close(msg->message);
+    free(msg->db_name);
+    free(msg);
+}
+
+
+void *os_message_muxer()
+{
+    zmq_pollitem_t items[128];
+    db_plus_ctx db_ctx[128];
+
+    int items_count = 1;
+    int items_current = 1;
+    int db_count = 1;
+    int db_current = 1;
+    int found = 0;
+    int tcount = 0;
+
+    void *context = zmq_init(1);
+    void *clients = zmq_socket (context, ZMQ_XREP);
+    void *thread_sync = zmq_socket (context, ZMQ_PULL);
+    zmq_bind (clients, "tcp://*:1337");
+    zmq_bind (clients, "ipc:///tmp/kvd");
+    
+
+    items[0].socket = clients;
+    items[0].fd = 0;
+    items[0].events = ZMQ_POLLIN;
+    items[0].revents = 0;
+
+    while (1)
+    {
+        db_req *incoming_msg;
+
+        zmq_poll (items, items_count, -1);
+        if(items[0].revents & ZMQ_POLLIN)
+        {
+            printf("Got external packet\n");
+
+            incoming_msg = db_req_new();
+            os_recv_db_req(items[0].socket, incoming_msg);
+
+            found = 0;
+            for(db_current=1;db_count > db_current;db_current++)
+            {
+                printf("searching for database\n");
+                if(strcmp(db_ctx[db_current].name, incoming_msg->db_name) == 0)
+                {
+                    os_send_db_req(items[db_current].socket, incoming_msg);
+                    found = 1;
+                    break;
+                }
+            }
+            printf("no datapases\n");
+            if(!found)
+            {
+                printf("no database found creating one\n");
+                char tmp[1024];
+                db_count++;
+                items_count++;
+                items[items_count].socket = zmq_socket(context, ZMQ_XREQ);
+                items[items_count].fd = 0;
+                items[items_count].events = ZMQ_POLLIN;
+                items[items_count].revents = 0;
+                snprintf(tmp, 1023, "inproc://db_actor_%s", incoming_msg->db_name);
+                printf("binding new db threadins\n");
+                printf("tmp ->%s<-\n\n", tmp);
+                zmq_bind (items[items_count].socket, tmp);
+                printf("binding complete\n");
+                db_ctx[db_count].hdb = tchdbnew();
+                db_ctx[db_count].ctx = context;
+                printf("db_name: %s\n", incoming_msg->db_name);
+                snprintf(tmp, 1023, "./%s.tch", incoming_msg->db_name);
+                db_ctx[db_count].name = strdup(incoming_msg->db_name);
+                tchdbopen(db_ctx[db_count].hdb, tmp, HDBOWRITER | HDBOCREAT );
+                printf("starting threads\n");
+                for(tcount=0;tcount<4;tcount++)
+                {
+                    pthread_t worker;
+                    pthread_create (&worker, NULL, db_actor, &db_ctx[db_count]);
+                }
+                printf("sending to workers\n");
+                os_send_db_req(items[items_count].socket, incoming_msg);
+                printf("finished sending\n");
+            }
+            db_req_delete(incoming_msg);
+            printf("done\n");
+        }
+        for(items_current=1;items_current > items_count; items_current++)
+        {
+            if(items[items_current].revents & ZMQ_POLLIN)
+            {
+                
+                incoming_msg = db_req_new();
+                os_recv_db_req(items[items_current].socket, incoming_msg);
+                os_send_db_req(items[0].socket, incoming_msg);
+                db_req_delete(incoming_msg);
+            }
+        }
+    }
+    zmq_term(context);
+    return(0);
+
+}
+
 int main()
 {
-    int ecode;
-    int tcount = 0;
-
-
-    printf("- starting tokyo cabinet: syscheck.tch\n");
-    db_plus_ctx details;
-    details.hdb = tchdbnew();
-    if(!tchdbopen(details.hdb, "./syscheck.tch", HDBOWRITER | HDBOCREAT )){
-        ecode = tchdbecode(details.hdb);
-        fprintf(stderr, "open error: %s\n", tchdberrmsg(ecode));
-    }
-    else
-    {
-        printf("- starting zmq\n");
-        details.ctx = zmq_init (1);
-        void *clients = zmq_socket (details.ctx, ZMQ_XREP);
-        zmq_bind (clients, "tcp://*:1337");
-        zmq_bind (clients, "ipc:///tmp/kvd");
-        void *workers = zmq_socket (details.ctx, ZMQ_XREQ);
-        zmq_bind (workers, "inproc://db_actor");
-
-
-        printf("- starting threads\n");
-        for(tcount=0;tcount<10;tcount++)
-        {
-            printf("- starting thread %d\n", tcount);
-            pthread_t worker;
-            pthread_create (&worker, NULL, db_actor, &details);
-        }
-        zmq_device (ZMQ_QUEUE, clients, workers);
-        zmq_term(details.ctx);
-    }
-    return 0;
+    os_message_muxer();
+    return(0);
 }
     bson_append_string(bb, "key", key);
     bson_append_string(bb, "value",value);
     bson_from_buffer(&b, bb);
+    bson_print(&b);
     free(bb);
     zmq_msg_t query;
     //zmq_msg_init_data(&query, b.data, bson_size(&b), bson_destroy, NULL);
     bson_append_string(bb, "action", "get");
     bson_append_string(bb, "key", key);
     bson_from_buffer(&b, bb);
+    bson_print(&b);
     free(bb);
     zmq_msg_t query;
     zmq_msg_init_size(&query, bson_size(&b));
     printf("-connection to kvd");
     //zmq_connect(requestor, "tcp://127.0.0.1:1337");
     zmq_connect(requestor, "ipc:///tmp/kvd");
-    /*
     ks_put(requestor, "syscheck", "Jeremy", "Rossi");
     for (i=1;i<10000;i++)
     {
         snprintf(theKey,1000,"key-%d", i);
         ks_put(requestor, "syscheck", theKey, "Jeremy Rossi");
     }
-    */
     for (i=1;i<10000;i++)
     {
         snprintf(theKey,1000,"key-%d", i);