Commits

Sebastian Hubbard  committed 29b231f

some changes to make distutils config work on my machine. consistent callback ticketing

  • Participants
  • Parent commits 682f5aa

Comments (0)

Files changed (3)

 #include <sys/types.h>
 #include <sys/socket.h>
 
-
-#ifndef HAVE_LIBEVENT2
-
-static int
-event_assign(struct event *ev,
-             struct event_base *base,
-             int fd,
-             short events,
-             void (*callback)(int, short, void *),
-             void *arg)
-{
-    event_base_set(base, ev);
-    ev->ev_callback = callback;
-    ev->ev_arg = arg;
-    ev->ev_fd = fd;
-    ev->ev_events = events;
-    ev->ev_res = 0;
-    ev->ev_flags = EVLIST_INIT;
-    ev->ev_ncalls = 0;
-    ev->ev_pncalls = NULL;
-
-    return 0;
-}
-
-static struct event *event_new(struct event_base *base,
-			       int fd,
-			       short events,
-			       void (*cb)(int, short, void *),
-			       void *arg)
-{
-    struct event *ev;
-    ev = malloc(sizeof(struct event));
-    if (ev == NULL) {
-        return NULL;
-    }
-    if (event_assign(ev, base, fd, events, cb, arg) < 0) {
-        free(ev);
-        return NULL;
-    }
-    return ev;
-}
-
-#endif
-
 char *asciiz(const void *data, size_t nbytes) {
   char *z = malloc(nbytes+1);
   memcpy(z, data, nbytes);
   libcouchbase_error_t e = LIBCOUCHBASE_SUCCESS;
   libcouchbase_io_opt_t *cb_base = libcouchbase_create_io_ops(LIBCOUCHBASE_IO_OPS_LIBEVENT, base, &e);
 
+  printf("%d\n", e);
+
   if (e == LIBCOUCHBASE_NOT_SUPPORTED) {
     printf("libcouchbase does not support libevent on this platform\n");
     exit(0);
 static PyObject *Failure;
 
 typedef struct t_pylibcb_instance {
-  int callback_counter;
+  int callback_ticket;
   int succeeded;
   int timed_out;
-  char *current_key;
-  int current_nkey;
   char returned_value[16384];
   int returned_value_nbytes;
   libcouchbase_cas_t returned_cas;
 
 static pylibcb_instance *context = 0;
 
-int operation_key_is_current(const void *key, libcouchbase_size_t nbytes) {
-  if (nbytes != context->current_nkey)
-    return 0;
-  if (memcmp(context->current_key, key, nbytes))
-    return 0;
-  return 1;
+int *new_ticket() {
+  int *boxed = malloc(sizeof(int) *2);
+  boxed[0] = ++context->callback_ticket;
+  boxed[1] = 0;
+  return boxed;
+}
+
+int *punch_ticket(int *t) {
+  ++t[1];
+  return t;
+}
+
+int rip_ticket(int *t) {
+  int r = t[0];
+  if (!--t[1])
+    free(t);
+  return r;
 }
 
 void *get_callback(libcouchbase_t instance,
 		   libcouchbase_size_t nbytes,
 		   libcouchbase_uint32_t flags,
 		   libcouchbase_cas_t cas) {
-  char *k = asciiz(key, nkey);
-  char *v = asciiz(bytes, nbytes);
-  printf("get_callback for key %s returned value %s\n", k, v);
-  free(k);
-  free(v);
-
-  /* ensure this callback matches up with the one expected by the current library invocation */
-  if (!operation_key_is_current(key, nbytes))
+  if (rip_ticket(cookie) != context->callback_ticket)
     return 0;
   
   /* flag the operation as a success */
 		   const void *key,
 		   libcouchbase_size_t nkey,
 		   libcouchbase_cas_t cas) {
-  
-  if (!operation_key_is_current(key, nkey))
+  if (rip_ticket(cookie) != context->callback_ticket)
     return 0;
 
   /* something here to pass on successes/failures up the chain */
 		      libcouchbase_error_t error,
 		      const void *key,
 		      libcouchbase_size_t nkey) {
-
-  if (!operation_key_is_current(key, nkey))
+  if (rip_ticket(cookie) != context->callback_ticket)
     return 0;
-
+  
   /* something here to pass on successes/failures up the chain */
 
   return 0;
 }
 
 void timeout_callback(libcouchbase_socket_t sock, short which, void *cb_data) {
-  int operation_number = *((int *) cb_data);
-  free(cb_data);
-  if (operation_number != context->callback_counter)
-    return;
+  if (rip_ticket(cb_data) != context->callback_ticket)
+    return 0;
 
   /* mark current operation as timed out and break the event loop */
  
   event_base_loopbreak(context->base);
 }
 
-void create_timeout(unsigned int usec, int callback_counter) {
+void create_timeout(unsigned int usec, int *ticket) {
   struct event *timeout = event_new(context->base, -1, 0, 0, 0);
   struct timeval tmo;
-  int *callback_box = malloc(sizeof(int));
-  *callback_box = callback_counter;
-  event_assign(timeout, context->base, -1, EV_TIMEOUT | EV_PERSIST, timeout_callback, callback_box);
+  event_assign(timeout, context->base, -1, EV_TIMEOUT, timeout_callback, ticket);
   tmo.tv_sec = usec / 1000000;
   tmo.tv_usec = usec % 1000000;
   event_add(timeout, &tmo);
     PyErr_SetString(OutOfMemory, "ran out of memory while allocating pylibcb instance");
     return 0;
   }
-  z->callback_counter = 0;
+  z->callback_ticket = 0;
 
   z->base = event_base_new();
   if (!z) {
-    free(z);
     PyErr_SetString(OutOfMemory, "could not create event base for new instance");
-    return 0;
+    goto free_instance;
   }
 
   libcouchbase_error_t e = LIBCOUCHBASE_SUCCESS;
   libcouchbase_io_opt_t *cb_base = libcouchbase_create_io_ops(LIBCOUCHBASE_IO_OPS_LIBEVENT, z->base, &e);
   if (e == LIBCOUCHBASE_NOT_SUPPORTED) {
-    event_base_free(z->base);
-    free(z);
     PyErr_SetString(Failure, "libcouchbase as built on this platform does not support libevent ?");
+    goto free_event_base;
+  } else if (e == LIBCOUCHBASE_ENOMEM) {
+    PyErr_SetString(OutOfMemory, "ran out of memory while setting up libcouchbase event loop");
+    goto free_event_base;
+  } else if (e == LIBCOUCHBASE_ERROR) {
+    PyErr_SetString(Failure, "a failure occurred while setting up libcouchbase event loop (probably couldn't locate libcouchbase_event shared library)");
+    goto free_event_base;
     return 0;
-  } else if (e == LIBCOUCHBASE_ENOMEM) {
-    event_base_free(z->base);
-    free(z);
-    PyErr_SetString(OutOfMemory, "ran out of memory while setting up libcouchbase event loop");
-    return 0;
-  }
-  if (e != LIBCOUCHBASE_SUCCESS) {
-    event_base_free(z->base);
-    free(z);
-    PyErr_SetString(Failure, "a mysterious failure occurred while setting up libcouchbase event loop");
-    return 0;
+  } else if (e != LIBCOUCHBASE_SUCCESS) {
+    PyErr_SetString(Failure, "an unhandled error condition occurred when calling libcouchbase_create_io_ops");
+    goto free_event_base;
   }
   
   z->cb = libcouchbase_create(host, user, passwd, bucket, cb_base);
   if (!z->cb) {
     /* unsure what to do with libcouchbase_io_ops_t created in last step */
-    event_base_free(z->base);
-    free(z);
     PyErr_SetString(OutOfMemory, "could not create libcouchbase instance");
-    return 0;
+    goto free_event_base;
   }
 
   libcouchbase_set_storage_callback(z->cb, (libcouchbase_storage_callback) set_callback);
   libcouchbase_set_remove_callback(z->cb, (libcouchbase_remove_callback) remove_callback);
 
   if (libcouchbase_connect(z->cb) != LIBCOUCHBASE_SUCCESS) {
-    event_base_free(z->base);
-    free(z);
     PyErr_SetString(ConnectionFailure, "libcouchbase_connect");
-    return 0;
+    goto free_event_base;
   }
 
   /* establish connection */
   libcouchbase_wait(z->cb);
 
   return PyCObject_FromVoidPtrAndDesc(z, pylibcb_instance_desc, pylibcb_instance_dest);
+
+ free_event_base:
+  free(z->base);
+ free_instance:
+  free(z);
+
+  return 0;
 }
 
 int pyobject_is_pylibcb_instance(PyObject *x) {
     return 0;
 
   context = (pylibcb_instance *) PyCObject_AsVoidPtr(cb);
-  context->current_key = key;
-  context->current_nkey = nkey;
-
-  libcouchbase_store_by_key(context->cb, 0, LIBCOUCHBASE_SET, 0, 0, key, nkey, val, nval, 0, 0, 0);
+  libcouchbase_store_by_key(context->cb, punch_ticket(new_ticket()), LIBCOUCHBASE_SET, 0, 0, key, nkey, val, nval, 0, 0, 0);
   libcouchbase_wait(context->cb);
 
   Py_INCREF(Py_None);
   return Py_None;
 }
 
-static PyObject *del(PyObject *self, PyObject *args) {
+static PyObject *_remove(PyObject *self, PyObject *args) {
   PyObject *cb;
   void *key;
   int nkey;
 
-  if (!PyArg_ParseTuple(args, "O#s", &cb, &key, &nkey))
+  if (!PyArg_ParseTuple(args, "Os#", &cb, &key, &nkey))
     return 0;
   if (!pyobject_is_pylibcb_instance(cb))
     return 0;
 
   context = (pylibcb_instance *) PyCObject_AsVoidPtr(cb);
-  context->current_key = key;
-  context->current_nkey = nkey;
-  
-  libcouchbase_remove_by_key(context->cb, 0, 0, 0, key, nkey, 0);
+  libcouchbase_remove_by_key(context->cb, punch_ticket(new_ticket()), 0, 0, key, nkey, 0);
   libcouchbase_wait(context->cb);
 
   Py_INCREF(Py_None);
   int nkey;
   int usec = 0;
 
-  if (!PyArg_ParseTuple(args, "O#s|i", &cb, &key, &nkey, &usec))
+  if (!PyArg_ParseTuple(args, "Os#|i", &cb, &key, &nkey, &usec))
     return 0;
   if (!pyobject_is_pylibcb_instance(cb))
     return 0;
   
   context = (pylibcb_instance *) PyCObject_AsVoidPtr(cb);
-  context->current_key = key;
-  context->current_nkey = nkey;
   context->succeeded = 0;
   context->timed_out = 0;
 
+  int *ticket = new_ticket();
+
   if (usec)
-    create_timeout(usec, ++context->callback_counter);
-  libcouchbase_mget_by_key(context->cb, 0, 0, 0, 1, &key, &nkey, 0);
+    create_timeout(usec, punch_ticket(ticket));
+  libcouchbase_mget_by_key(context->cb, punch_ticket(ticket), 0, 0, 1, &key, &nkey, 0);
 
   while (!context->timed_out && !context->succeeded)
     libcouchbase_wait(context->cb);
     "Get a value by key. Optionally specify timeout in usecs" },
   { "set", set, METH_VARARGS,
     "Set a value by key" },
-  { "del", del, METH_VARARGS,
+  { "remove", _remove, METH_VARARGS,
     "Remove a value by key" },
   { 0, 0, 0, 0 }
 };
 
 
 pylibcb = Extension('pylibcb',
-                    include_dirs = ['/opt/couchbase/include'],
+                    #include_dirs = ['/opt/couchbase/include'],
                     libraries = ['event', 'couchbase'],
                     library_dirs = ['/opt/couchbase/lib'],
                     sources = ['pylibcb.c'])