Commits

Anonymous committed 7d37cd5

compiles and contains the bare minimum needed functions for an async client - now must implement callback side of the async deal

Comments (0)

Files changed (1)

 static PyObject *KeyExists;
 static PyObject *AsyncLimit;
 
-#define ASYNC_GUARD() if (context->async_mode) {	\
-  if (context->async_count >= context->async_limit) {	\
-    PyErr_SetString(AsyncLimit, "async limit reached");	\
-    return 0;						\
+#define ASYNC_GUARD() if (context->async_mode) {		\
+    if (context->async_count >= context->async_limit) {		\
+      PyErr_SetString(AsyncLimit, "async limit reached");	\
+      return 0;							\
+    }								\
   }
 
 #define ASYNC_EXIT(ticket) if(context->async_mode) { \
     return Py_BuildValue("i", ticket[0]);	     \
   }
 
-#define CB_EXCEPTION(type, msg) { \
-  PyErr_SetString(type, msg); \
-  context->exception = 1; \
-  return 0; \
+#define CB_EXCEPTION(type, msg) {		\
+    PyErr_SetString(type, msg);			\
+    context->exception = 1;			\
+    return 0;					\
   }
 
-#define INTERNAL_EXCEPTION_HANDLER(x) { \
-  if (context->internal_exception == 1) { \
-    x; \
-  } \
+#define INTERNAL_EXCEPTION_HANDLER(x) {		\
+    if (context->internal_exception == 1) {	\
+      x;					\
+    }						\
   }
 
 /* allocators for commonly held objects */
   x->buffer[x->end].ticket = ticket;
   x->buffer[x->end].value = value;
   x->end = (x->end + 1) % x->size;
-  return 0;
 }
 
-int async_results(async_results *x, int (*f)(async_result *)) {
+int process_async_results(async_results *x, int (*f)(async_result *)) {
   while (x->count) {
-    if (!f(&x->buffer[x->start]))
+    if (f(&x->buffer[x->start]))
       return -1;
     x->start = (x->start + 1) % x->size;
     x->count--;
 }
 
 int process_async_result(async_result *x) {
-  PyObject *t = PyTuplePack(2, PyInt_FromLong(x->ticket), x->result);
+  PyObject *t = PyTuple_Pack(2, PyInt_FromLong(x->ticket), x->value);
   if (!t) {
     PyErr_SetString(Failure, "process_async_result");
-    return -1;
+    return 0;
   }
 
   return PyList_Append(_async_rval, t);
 static PyObject *default_exception;
 
 PyObject *get_async_results() {
-  if (!begin_async_results())
+  if (begin_async_results())
     return 0;
-  if (!async_results(&context->async, process_async_result))
+  if (process_async_results(&context->async, process_async_result))
     return 0;
   return _async_rval;
 }
 }
 
 static PyObject *get_async_limit(PyObject *self, PyObject *args) {
+  PyObject *cb;
+  
+  if (!PyArg_ParseTuple(args, "O", &cb))
+    return 0;
 
+  set_context(cb);
 
+  return PyInt_FromLong(context->async_limit);
 }
 
 static PyObject *set_async_limit(PyObject *self, PyObject *args) {
+  PyObject *cb;
+  int limit;
 
+  if (!PyArg_ParseTuple(args, "Oi", &cb, &limit))
+    return 0;
+
+  if (limit < 1) {
+    PyErr_SetString(Failure, "limit must be an integer value greater than 0");
+    return 0;
+  } else if (limit > 16384) {
+    PyErr_SetString(Failure, "there is an arbitrary upper limit of 16384 simultaneous requests as that is enough for anyone");
+    return 0;
+  }
+
+  set_context(cb);
+
+  if (context->async.count < limit)
+    async_size(&context->async, limit);
+  context->async_limit = limit;
+
+  Py_RETURN_NONE;
+}
+
+static PyObject *get_async_count(PyObject *self, PyObject *args) {
+  PyObject *cb;
+
+  if (!PyArg_ParseTuple(args, "O", &cb))
+    return 0;
+
+  set_context(cb);
+  return PyInt_FromLong(context->async_count);
 }
 
 static PyObject *enable_async(PyObject *self, PyObject *args) {
+  PyObject *cb;
 
+  if (!PyArg_ParseTuple(args, "O", &cb))
+      return 0;
+  set_context(cb);
+
+  context->async_mode = 1;
+  context->async_count = 0;
+  if (context->async.count < context->async_limit)
+    async_size(&context->async, context->async_limit);
+
+  Py_RETURN_NONE;
 }
 
 static PyObject *disable_async(PyObject *self, PyObject *args) {
+  PyObject *cb;
 
+  if (!PyArg_ParseTuple(args, "O", &cb))
+      return 0;
+  set_context(cb);
+
+  context->async_mode = 0;
+
+  Py_RETURN_NONE;
 }
 
 static PyObject *set(PyObject *self, PyObject *args) {
   if (context->exception)
     return 0;
 
-  Py_INCREF(Py_None);
-  return Py_None;
+  Py_RETURN_NONE;
 }
 
 static PyObject *_remove(PyObject *self, PyObject *args) {
   if (context->exception)
     return 0;
 
-  Py_INCREF(Py_None);
-  return Py_None;
+  Py_RETURN_NONE;
 }
 
 static PyObject *get(PyObject *self, PyObject *args) {
     return 0;
   }
 
-  if (context->result == LIBCOUCHBASE_KEY_ENOENT) {
-    Py_INCREF(Py_None);
-    return Py_None;
-  }
+  if (context->result == LIBCOUCHBASE_KEY_ENOENT)
+    Py_RETURN_NONE;
 
   if (return_cas)
     return Py_BuildValue("Ok", context->returned_value, (unsigned long) context->returned_cas);
   return context->returned_value;
 }
 
+static PyObject *async_wait(PyObject *self, PyObject *args) {
+  PyObject *cb;
+  int usec = 0;
+
+  if (!PyArg_ParseTuple(args, "Oi", &cb, &usec))
+    return 0;
+  set_context(cb);
+
+  if (!context->async_mode) {
+    PyErr_SetString(Failure, "async mode is not enabled");
+    return 0;
+  }
+
+  int *ticket = new_ticket();
+  if (!ticket)
+    return 0;
+
+  if (!create_timeout(usec, hand_out_ticket(ticket))) {
+    rip_ticket(ticket);
+    return 0;
+  }
+
+  while (!context->timed_out && context->async_count && !context->internal_exception)
+    libcouchbase_wait(context->cb);
+  INTERNAL_EXCEPTION_HANDLER(return 0);
+
+  return get_async_results();
+}
+
 static PyMethodDef PylibcbMethods[] = {
   { "open", open, METH_VARARGS,
     "Open connection to couchbase server" },
     "Get the limit for the number of requests allowed before one is required to complete" },
   { "set_async_limit", set_async_limit, METH_VARARGS,
     "Set the limit for the number of requests allowed before one is required to complete" },
+  { "get_async_count", get_async_count, METH_VARARGS,
+    "Get the number of incomplete asynchronous requests waiting" },
   { "enable_async", enable_async, METH_VARARGS,
     "Enable asynchronous behavior" },
   { "disable_async", disable_async, METH_VARARGS,
     "Disable asynchronous behavior" },
-  { "run_loop", 
-
+  { "async_wait", async_wait, METH_VARARGS,
+  "Execute eventloop for a given number of microseconds" },
   { 0, 0, 0, 0 }
 };