Commits

Anonymous committed 3c90f64

a wild experiment with async calls

  • Participants
  • Parent commits 4f5c5c6

Comments (0)

Files changed (1)

 static PyObject *ConnectionFailure;
 static PyObject *Failure;
 static PyObject *KeyExists;
+static PyObject *AsyncLimit;
+
+#define ASYNC_GUARD() if (context->async_mode) {	\
+  if (context->async_count >= context->async_limit) {	\
+    PyErr_SetString(AsyncLimit, "async limit reached");	\
+    return 0;						\
+  }
+
+#define ASYNC_EXIT(ticket) if(context->async_mode) { \
+    ++context->async_count;			     \
+    return Py_BuildValue("i", ticket[0]);	     \
+  }
 
 #define CB_EXCEPTION(type, msg) { \
   PyErr_SetString(type, msg); \
   return 0; \
   }
 
+#define INTERNAL_EXCEPTION_HANDLER(x) { \
+  if (context->internal_exception == 1) { \
+    x; \
+  } \
+  }
+
 /* allocators for commonly held objects */
 
 #define SLAB_SIZE 256
   }
 }
 
+/* async structures */
+
+typedef struct t_async_result {
+  int ticket;
+  PyObject *value;
+} async_result;
+
+typedef struct t_async_results {
+  async_result *buffer;
+  int size;
+  int count;
+  int start;
+  int end;
+} async_results;
+
+
+int async_size(async_results *x, int size) {
+  if (size == x->size)
+    return 0;
+
+  async_result *new = malloc(sizeof(async_result) * size);
+  if (!new) {
+    PyErr_SetString(OutOfMemory, "failed to resize async result buffer");
+    return -1;
+  }
+
+  int s = x->start, c = x->count, o = 0;
+  while (c--) {
+    new[o++] = x->buffer[s];
+    s = (s + 1) % x->size;
+  }
+  
+  free(x->buffer);
+  x->buffer = new;
+  x->size = size;
+  x->count = o;
+  x->start = 0;
+  x->end = o;
+
+  return 0;
+}
+
+void async_push(async_results *x, int ticket, PyObject *value) {
+  x->count++;
+  x->buffer[x->end].ticket = ticket;
+  x->buffer[x->end].value = value;
+  x->end = (x->end + 1) % x->size;
+  return 0;
+}
+
+int async_results(async_results *x, int (*f)(async_result *)) {
+  while (x->count) {
+    if (!f(&x->buffer[x->start]))
+      return -1;
+    x->start = (x->start + 1) % x->size;
+    x->count--;
+  } return 0;
+}
+
+PyObject *_async_rval;
+
+int begin_async_results() {
+  _async_rval = PyList_New(0);
+  if (!_async_rval) {
+    PyErr_SetString(Failure, "begin_async_results");
+    return -1;
+  } return 0;
+}
+
+int process_async_result(async_result *x) {
+  PyObject *t = PyTuplePack(2, PyInt_FromLong(x->ticket), x->result);
+  if (!t) {
+    PyErr_SetString(Failure, "process_async_result");
+    return -1;
+  }
+
+  return PyList_Append(_async_rval, t);
+}
+
 /* structure holding all state for python client */
 
 typedef struct t_pylibcb_instance {
   ticket_slab *ticket_slabs;
   event_list *event_pool;
   event_slab *event_slabs;
+  int async_mode;
+  async_results async;
+  int async_count;
+  int async_limit;
   int succeeded;
   int timed_out;
   int exception;
 static char *default_error_string = "internal exception";
 static PyObject *default_exception;
 
+PyObject *get_async_results() {
+  if (!begin_async_results())
+    return 0;
+  if (!async_results(&context->async, process_async_result))
+    return 0;
+  return _async_rval;
+}
+
 void *error_callback(libcouchbase_t instance,
 		     libcouchbase_error_t error,
 		     const char *errinfo) {
   return 0;
 }
 
-#define INTERNAL_EXCEPTION_HANDLER(x) { \
-  if (context->internal_exception == 1) { \
-    x; \
-  } \
-  }
-
 void *get_callback(libcouchbase_t instance,
 		   const void *cookie,
 		   libcouchbase_error_t error,
   default_error_string = "internal exception";
   default_exception = Failure;
 
+  z->async_limit = 20; /* maximum number of async events that may be queued up on the event loop */
+
   return PyCObject_FromVoidPtrAndDesc(z, pylibcb_instance_desc, pylibcb_instance_dest);
 
  free_event_base:
   } return t;
 }
 
+static PyObject *get_async_limit(PyObject *self, PyObject *args) {
+
+
+}
+
+static PyObject *set_async_limit(PyObject *self, PyObject *args) {
+
+}
+
+static PyObject *enable_async(PyObject *self, PyObject *args) {
+
+}
+
+static PyObject *disable_async(PyObject *self, PyObject *args) {
+
+}
 
 static PyObject *set(PyObject *self, PyObject *args) {
   PyObject *cb;
 
   if (!PyArg_ParseTuple(args, "Os#s#|kk", &cb, &key, &nkey, &val, &nval, &_expiry, &cas))
     return 0;
-  if (!pyobject_is_pylibcb_instance(cb))
-    return 0;
   set_context(cb);
 
+  ASYNC_GUARD();
+
   time_t expiry = _expiry;
   int *ticket = new_ticket();
   if (!ticket)
 
   libcouchbase_store_by_key(context->cb, hand_out_ticket(ticket), LIBCOUCHBASE_SET, 0, 0, 
 			    key, nkey, val, nval, 0, expiry, cas);
+  ASYNC_EXIT(ticket);
+
   libcouchbase_wait(context->cb);
   INTERNAL_EXCEPTION_HANDLER(return 0);
 
 
   if (!PyArg_ParseTuple(args, "Os#|k", &cb, &key, &nkey, &cas))
     return 0;
-  if (!pyobject_is_pylibcb_instance(cb))
-    return 0;
   set_context(cb);
 
+  ASYNC_GUARD();
+
   int *ticket = new_ticket();
   if (!ticket)
     return 0;
 
   libcouchbase_remove_by_key(context->cb, hand_out_ticket(ticket), 0, 0, key, nkey, cas);
+  ASYNC_EXIT(ticket);
+
   libcouchbase_wait(context->cb);
   INTERNAL_EXCEPTION_HANDLER(return 0);
 
   
   if (!PyArg_ParseTuple(args, "Os#|iki", &cb, &key, &_nkey, &usec, &_expiry, &return_cas))
     return 0;
-  if (!pyobject_is_pylibcb_instance(cb))
-    return 0;
   set_context(cb);
 
+  ASYNC_GUARD();
+
   libcouchbase_size_t nkey = _nkey;
   time_t expiry = _expiry;
   int *ticket = new_ticket();
   if (!ticket)
     return 0;
 
-  if (usec)
+  if (usec && !context->async_mode)
     if (!create_timeout(usec, hand_out_ticket(ticket))) {
       rip_ticket(ticket);
       return 0;
     }
+
   libcouchbase_mget_by_key(context->cb, hand_out_ticket(ticket), 0, 0, 1, &key, &nkey, _expiry ? &expiry : 0);
+  ASYNC_EXIT(ticket);
 
   while (!context->timed_out && !context->succeeded && !context->exception && !context->internal_exception)
     libcouchbase_wait(context->cb);
     "Set a value by key" },
   { "remove", _remove, METH_VARARGS,
     "Remove a value by key" },
+  { "get_async_limit", get_async_limit, METH_VARARGS,
+    "Get the limit for the number of requests allowed before one is required to complete" },
+  { "set_async_limit", set_async_limit, METH_VARARGS,
+    "Set the limit for the number of requests allowed before one is required to complete" },
+  { "enable_async", enable_async, METH_VARARGS,
+    "Enable asynchronous behavior" },
+  { "disable_async", disable_async, METH_VARARGS,
+    "Disable asynchronous behavior" },
+  { "run_loop", 
+
   { 0, 0, 0, 0 }
 };
 
   m = Py_InitModule("_pylibcb", PylibcbMethods);
   if (!m)
     return;
+  
+  struct exception_init {
+    char *name;
+    PyObject **exception;
+  } exceptions[] = {
+    { "Timeout", &Timeout },
+    { "OutOfMemory", &OutOfMemory },
+    { "ConnectionFailure", &ConnectionFailure },
+    { "Failure", &Failure },
+    { "KeyExists", &KeyExists },
+    { "AsyncLimit", &AsyncLimit },
+    { 0, 0 }
+  };
 
-  Timeout = PyErr_NewException("_pylibcb.Timeout", 0, 0);
-  Py_INCREF(Timeout);
-  PyModule_AddObject(m, "Timeout", Timeout);
-  
-  OutOfMemory = PyErr_NewException("_pylibcb.OutOfMemory", 0, 0);
-  Py_INCREF(OutOfMemory);
-  PyModule_AddObject(m, "OutOfMemory", OutOfMemory);
-
-  ConnectionFailure = PyErr_NewException("_pylibcb.ConnectionFailure", 0, 0);
-  Py_INCREF(ConnectionFailure);
-  PyModule_AddObject(m, "ConnectionFailure", ConnectionFailure);
-  
-  Failure = PyErr_NewException("_pylibcb.Failure", 0, 0);
-  Py_INCREF(Failure);
-  PyModule_AddObject(m, "Failure", Failure);
-
-  KeyExists = PyErr_NewException("_pylibcb.KeyExists", 0, 0);
-  Py_INCREF(KeyExists);
-  PyModule_AddObject(m, "KeyExists", KeyExists);
+  int i = 0;
+  while (exceptions[i].name) {
+    char name[256];
+    sprintf(name, "_pylibcb.%s", exceptions[i].name);
+    *exceptions[i].exception = PyErr_NewException(name, 0, 0);
+    Py_INCREF(*exceptions[i].exception);
+    PyModule_AddObject(m, exceptions[i].name, *exceptions[i].exception);
+    ++i;
+  }
 
   default_exception = Failure;
 }