Commits

Anonymous committed 813aabe

change write res

Comments (0)

Files changed (6)

example/simple.py

 from memproto import *
-print dir()
+print "start server"
 
 class Simple(Server):
     
         #print "get:" + k
         #print "get: %s" % k
         #return self.cache[k]
+        #import time 
+        #time.sleep(10)
         return self.cache.get(k, ("",0))
         #print "get %d" % len(v)
         #return v
 
 #define BUFSIZE 8192
 
-static int
-send_writev(int fd, struct iovec *iov, int iov_cnt, size_t total)
-{
-    size_t w;
-    int i = 0;
-    w = writev(fd, iov, iov_cnt);
-    if(w == -1){
-        //error
-        if (errno == EAGAIN || errno == EWOULDBLOCK) { 
-            /* try again later */
-            printf("EAGAIN  \n");
-            usleep(1000);
-            return send_writev(fd, iov, iov_cnt, total); 
-        }else{
-            return -1;
-        }
-    }if(w == 0){
-        return total;
-    }else{
-        //printf("write = %d/%d\n", w, total);
-        if(total > w){
-            //printf("rewrite = %d/%d\n", w, total);
-            //remain buf
-            for(; i < iov_cnt;i++){
-                if(w > iov[i].iov_len){
-                    //already write
-                    w -= iov[i].iov_len;
-                    iov[i].iov_len = 0;
-                }else{
-                    iov[i].iov_base += w;
-                    iov[i].iov_len = iov[i].iov_len - w;
-                    break;
-                }
-            }
-            return send_writev(fd, iov, iov_cnt, total - w); 
-        }
-    }
-    return total;
-
-}
+//static int
+//send_retrieval(int fd, char *key, size_t key_len, char *data, size_t data_len, int flags, bool end);
 
 static int
-send_data(int fd, struct iovec *iov, int iov_cnt, size_t total)
-{
-    size_t w = 0;
-    int sended = total;
-    while(sended > 0){
-        w = writev(fd, iov, iov_cnt);
-        if(w == -1){
-            //error
-            if (errno == EAGAIN || errno == EWOULDBLOCK) { 
-                /* try again later */
-                printf("EAGAIN  \n");
-                usleep(1000);
-            }else{
-                return -1;
-            }
-        }
-        sended -= w;
-        
-    }
-    return total;
-}
+req_retrieval(Client *client, char *key, size_t key_len, char *data, size_t data_len, int flags);
 
-
-static int
-send_retrieval(int fd, char *key, size_t key_len, char *data, size_t data_len, int flags, bool end);
 int 
-write_retrieval(int fd, PyObject *env, PyObject *response, int flags, bool end);
+write_retrieval(Client *client, PyObject *env, PyObject *response, int flags);
 
 static void
 send_server_error(Client *client, char *error)
 }
 
 static int 
-send_numeric(int fd, char *data, size_t data_len)
+req_numeric(Client *client, char *data, size_t data_len)
 {
-
-    size_t write_len;
-    struct iovec iov[2];
+    int fd = client->fd;
+    size_t total;
+    struct iovec *iov;
+    iov = (struct iovec *)PyMem_Malloc(sizeof(struct iovec) * 2);
  
     iov[0].iov_base = data;
     iov[0].iov_len = data_len;
+    total += iov[0].iov_len;
 
     iov[1].iov_base = "\r\n";
     iov[1].iov_len = 2;
+    total += iov[1].iov_len;
 
-    write_len = writev(fd, iov, 2);
+    req_write_ctx(client, iov, 2, total);
 
-    return write_len;
+    return 1;
 
 }
 
+
 static int
-send_retrieval(int fd, char *key, size_t key_len, char *data, size_t data_len, int flags, bool end)
+req_retrieval(Client *client, char *key, size_t key_len, char *data, size_t data_len, int flags)
 {
+    int fd = client->fd;
     size_t total = 0;
-    struct iovec iov[7];
+    struct iovec *iov;
     int iov_cnt = 0;
+    bool end;
+    
+    if(client->key_num == 1){
+        //last
+        end = 1;
+    }
+    
+    iov = (struct iovec *)PyMem_Malloc(sizeof(struct iovec) * 7);
 
     iov[0].iov_base = "VALUE ";
     iov[0].iov_len = 6;
     char data_len_str[64];
 
     sprintf(data_len_str, "%d \r\n", data_len);
-    
-    //printf("data_len %d\n", data_len);
 
     iov[3].iov_base = data_len_str;
     iov[3].iov_len = strlen(data_len_str);
     }else{
         iov_cnt = 6;
     }
-    return send_writev(fd, iov, iov_cnt, total);
-    //return send_data(fd, iov, iov_cnt, total);
-
+    
+    req_write_ctx(client, iov, iov_cnt, total);
+    return 1;
 }
 
 static int
     memcpy(client->input_buf + client->input_len, c , l);
 
     client->input_len += (int)l;
-    /*
-    printf("input buf \n");
-    printf("****************************\n");
-    printf("%s\n", client->input_buf);
-    printf("****************************\n");
-    printf("\n");
-    */
     return (int)l;
 }
 
 }
 
 int 
-write_numeric(int fd, PyObject *response)
+write_numeric(Client *client, PyObject *response)
 {
     PyObject *str_response = NULL;
     char *data;
         //TODO raise Error
         goto error;
     }
-    ret = send_numeric(fd, data, data_len);
+    ret = req_numeric(client, data, data_len);
     Py_XDECREF(str_response);
     Py_DECREF(response);
     return ret;
 }
 
 int 
-write_retrieval_flags(int fd, PyObject *env, PyObject *response, bool end)
+write_retrieval_flags(Client *client, PyObject *env, PyObject *response)
 {
     int ret;
+    int fd;
+    
+    fd = client->fd;
+
     if(PyTuple_Check(response)){
         PyObject *data = PyTuple_GetItem(response, 0);
         PyObject *flags = PyTuple_GetItem(response, 1);
         }
         //Py_XDECREF(response);
         Py_INCREF(data);
-        ret = write_retrieval(fd, env, data, c_flags, end);
+        ret = write_retrieval(client, env, data, c_flags);
         //Py_XDECREF(flags);
         Py_XDECREF(response);
 
     }else{
-        ret = write_retrieval(fd, env, response, 0, end);
+        ret = write_retrieval(client, env, response, 0);
     }
+    client->key_num--;
     return ret;
 }
 
 int 
-write_retrieval(int fd, PyObject *env, PyObject *response, int flags, bool end)
+write_retrieval(Client *client, PyObject *env, PyObject *response, int flags)
 {
     PyObject *keyobj;
     char *key, *data;
     Py_ssize_t key_len, data_len;
     int ret;
-        
+    int fd;
+    
+    fd = client->fd;
 
     if(!PyString_Check(response)){
         //TODO raise Error
     }
     
 
-    ret = send_retrieval(fd, key, key_len, data, data_len, flags, end);
+    //ret = send_retrieval(fd, key, key_len, data, data_len, flags, end);
+    ret = req_retrieval(client, key, key_len, data, data_len, flags);
     if(ret < 0){
         //write_error
         //raise Error
     switch(cmd){
         /*retrieval*/    
         case MEMTEXT_CMD_GET:
-            ret = write_retrieval_flags(fd, env, response, true);
+            ret = write_retrieval_flags(self, env, response);
             break;
         case MEMTEXT_CMD_GETS:
             //ret = write_delete(fd, response); 
         /* numeric */
         case MEMTEXT_CMD_INCR:
         case MEMTEXT_CMD_DECR:
-            ret = write_numeric(fd, response);
+            ret = write_numeric(self, response);
             break;
         /* other */
         case MEMTEXT_CMD_VERSION:
         default:
             break;
     }
+    /*
     success = ret > 0;
     if(success){
         //self->
             Sever_add_writer(self);
         }
         //printf("add writer");
-    }
+    }*/
     return ret;
 
 }
 void 
 Client_clear(Client *self)
 {
+#ifdef DEBUG
+    printf("clear fd = %d\n", self->fd);
+#endif
     client_t *client;
     client = self->client;
     free_client_field(client);
-    client->input_buf = malloc(sizeof(char) * BUFSIZE); //input_buf
+    client->input_buf = malloc(sizeof(char) * BUFSIZE);
     client->input_buf_size = sizeof(char) * BUFSIZE;
     client->input_pos = 0;
     client->input_len = 0;
     self->status = READY;
+    self->key_num = 1;
     PyDict_Clear(self->env);
     init_parser(self, false);
 }
     PyDict_Clear(self->env);
     Py_XDECREF(self->env);
     Py_DECREF(self);
+#ifdef DEBUG
+    printf("close fd = %d\n", self->fd);
+#endif
 
-    //printf("close fd %d\n", self->fd);
 }
 
 
     ERROR,
 } client_status;
 
+typedef struct iovec iovec_t;
 
 typedef struct {
     int fd;
     int fd;
     client_status status;
     memtext_command cmd;
+    int key_num;
 } Client;
 
+typedef struct {
+    Client *client;
+    int fd;
+    iovec_t *iov;
+    int iov_cnt;
+    int total;
+} write_ctx;
+
+
 PyObject * 
 Client_New(int fd, char *remote_addr, int remote_port);
 
 {
     int ret = 0;
     Client *client = (Client *)user;
-
+    client->key_num = req->key_num;
 	if(req->key_num == 1) {
 		ret = get(client, cmd, req);
 	} else {
 };
 
 static void
+read_callback(picoev_loop* loop, int fd, int events, void* cb_arg);
+static void
 write_callback(picoev_loop* loop, int fd, int events, void* cb_arg);
-/**
- * memproto module
- *
- *
- *
- */
 
 int loop_done = 0;
 
 {
 }
 
+
+static int
+send_writev(write_ctx *ctx)
+{
+    size_t w;
+    int i = 0;
+    w = writev(ctx->fd, ctx->iov, ctx->iov_cnt);
+    if(w == -1){
+        //error
+        if (errno == EAGAIN || errno == EWOULDBLOCK) { 
+            /* try again later */
+            return 0;
+        }else{
+            return -1;
+        }
+    }if(w == 0){
+        return 1;
+    }else{
+        if(ctx->total > w){
+            for(; i < ctx->iov_cnt;i++){
+                if(w > ctx->iov[i].iov_len){
+                    //already write
+                    w -= ctx->iov[i].iov_len;
+                    ctx->iov[i].iov_len = 0;
+                }else{
+                    ctx->iov[i].iov_base += w;
+                    ctx->iov[i].iov_len = ctx->iov[i].iov_len - w;
+                    break;
+                }
+            }
+            ctx->total = ctx->total -w;
+            //resume
+            return 0;
+            //return send_writev(fd, iov, iov_cnt, total - w); 
+        }
+    }
+    return 1;
+
+}
+
+static void
+clear_write_ctx(write_ctx *ctx)
+{
+    if(ctx && ctx->iov){
+        PyMem_Free(ctx->iov);
+        ctx->iov = NULL;
+    }
+    if(ctx){
+        PyMem_Free(ctx);
+    }
+
+}
+
+
+static void
+write_req_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
+{
+    int ret;
+    write_ctx *ctx = (write_ctx *)(cb_arg);
+    Client *client = ctx->client;
+
+#ifdef DEBUG
+    printf("write callback %d\n", fd);
+#endif
+    
+    if ((events & PICOEV_TIMEOUT) != 0) {
+
+        //timeout
+#ifdef DEBUG
+    printf("write callback timeout %d\n", fd, client->key_num);
+#endif
+        write_error_response(client, "timeout"); 
+        picoev_del(loop, fd);
+        clear_write_ctx(ctx);
+        Client_close(client);
+    
+    } else if ((events & PICOEV_WRITE) != 0) {
+        ret = send_writev(ctx);
+        switch(ret){
+            case 0:
+                // try again
+                picoev_set_timeout(loop, fd, 1);
+                break;
+            case -1:
+                //send fatal error
+                picoev_del(loop, fd);
+                clear_write_ctx(ctx);
+                Client_close(client);
+                
+                break;
+            default:
+                //ok
+#ifdef DEBUG
+    printf("client fd = %d key_num %d\n", fd, client->key_num);
+#endif
+                clear_write_ctx(ctx);
+                if(client->key_num == 0){
+                    //send END
+                    Client_clear(client);
+                    picoev_del(loop, fd);
+                    picoev_add(loop, fd, PICOEV_READ, TIMEOUT_SECS, read_callback, client);
+                }
+
+        };
+
+    }
+
+}
+
+
+void
+req_write_ctx(Client *client, struct iovec *iov, int iov_cnt, size_t total)
+{    
+    picoev_loop *loop;
+    write_ctx *ctx;
+    ServerObject *server;
+    
+    server = client->server;
+    ctx = PyMem_Malloc(sizeof(write_ctx));
+    ctx->client = client;
+    ctx->fd = client->fd;
+    ctx->iov = iov;
+    ctx->iov_cnt = iov_cnt;
+    ctx->total = total;
+    loop = server->main_loop;
+    picoev_add(loop, client->fd, PICOEV_WRITE, TIMEOUT_SECS, write_req_callback, (void *)ctx);
+}
+
+
+void
+Sever_add_revent(Client *client, picoev_handler* callback, void* cb_arg)
+{
+    ServerObject *server;
+    picoev_loop *loop;
+    server = (ServerObject *)client->server;
+    loop = server->main_loop;
+    picoev_add(loop, client->fd, PICOEV_READ, TIMEOUT_SECS, callback, cb_arg);
+
+}
+
+void
+Sever_add_wevent(Client *client, picoev_handler* callback, void* cb_arg)
+{
+    ServerObject *server;
+    picoev_loop *loop;
+    server = (ServerObject *)client->server;
+    loop = server->main_loop;
+    picoev_add(loop, client->fd, PICOEV_WRITE, TIMEOUT_SECS, callback, cb_arg);
+
+}
+
 void 
 Sever_add_writer(Client *client)
 {
 read_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
 {
     Client *client = (Client *)(cb_arg);
+#ifdef DEBUG
+    printf("read callback %d\n", fd);
+#endif
     if ((events & PICOEV_TIMEOUT) != 0) {
 
         //timeout
-        //printf("read timeout! \n");
+#ifdef DEBUG
+    printf("read callback timeout %d\n", fd, client->key_num);
+#endif
         write_error_response(client, "timeout"); 
         picoev_del(loop, client->fd);
         Client_close(client);
         switch (r) {
             case 0: 
                 //printf("read over  \n");
+#ifdef DEBUG
+    printf("read callback close %d\n", fd);
+#endif
                 picoev_del(loop, client->fd);
                 Client_close(client);
 
                 break;
             case -1: /* error */
                 if (errno == EAGAIN || errno == EWOULDBLOCK) { /* try again later */
-                    //picoev_set_timeout(loop, client->fd, 1);
-                    printf("EAGAIN  \n");
+#ifdef DEBUG
+    printf("read callback EAGAIN %d\n", fd);
+#endif
 	                break;
                 } else { /* fatal error */
-                    printf("fatal error  \n");
-                    //write_error_response(client, "i/o error"); 
+#ifdef DEBUG
+    printf("read callback fatal error %d\n", fd);
+#endif
+                    write_error_response(client, "i/o error"); 
                     picoev_del(loop, client->fd);
                     Client_close(client);
                 }
                 break;
             default: 
-                //printf("Client_exec_parse  \n");
                 Client_exec_parse(client, buf, r);
                 break;
         }
+        /*
         switch(client->status){
             case CALLED:
                 //printf("Client called  \n");
                 //printf("Client sended  %d\n", client->fd);
                 //picoev_del(loop, client->fd);
                 //Client_close(client);
-                picoev_set_timeout(loop, client->fd, 1);
-                Client_clear(client);
+                //picoev_set_timeout(loop, client->fd, 1);
+                //Client_clear(client);
                 break;
             default:
                 break;
-        }
+        }*/
 
     }
 }
 #define NOT_FOUND "NOT_FOUND\r\n"
 #define DELETED "DELETED\r\n"
 
+//void
+//Sever_add_writer(Client *client);
+
+//void
+//Sever_add_wevent(Client *client, picoev_handler* callback, void* cb_arg);
+
 void
-Sever_add_writer(Client *client);
+req_write_ctx(Client *client, struct iovec *iov, int iov_cnt, size_t total);
 
 int loop_done;