Source

tobikko / tobikko / core / event.c

#include "event.h"
#include "hub.h"
#include "greensupport.h"
#include "timer.h"

static PyObject *notify_func = NULL;

static PyObject* get_notify(void);

static int
EventObject_init(EventObject *self, PyObject *args, PyObject *kwargs)
{
    int ret = 0;
    PyObject *waiters, *links;

    waiters = PySet_New(NULL);
    if(waiters == NULL){
        return -1;
    }
    links = PySet_New(NULL);
    if(links == NULL){
        Py_DECREF(waiters);
        return -1;
    }

    if(self->waiters){
        Py_CLEAR(self->waiters);
    }
    if(self->links){
        Py_CLEAR(self->links);
    }

    self->waiters = waiters;
    self->links = links;
    self->flag = 0;
    RDEBUG("self:%p", self);
    return ret;
}

static int 
EventObject_clear(EventObject *self)
{
    DEBUG("self:%p", self);
    Py_CLEAR(self->waiters);
    Py_CLEAR(self->links);
    Py_CLEAR(self->notify_timer);
    return 0; 
}

static int
EventObject_traverse(EventObject *self, visitproc visit, void *arg)
{
    DEBUG("self:%p cnt:%d", self, (int)Py_REFCNT(self));
    DEBUG("waiters:%d links:%d", (int)PySet_GET_SIZE(self->waiters), (int)PySet_GET_SIZE(self->links));
    Py_VISIT(self->links);
    Py_VISIT(self->waiters);
    Py_VISIT(self->notify_timer);
    return 0;
}

static void
EventObject_dealloc(EventObject *self)
{
    RDEBUG("self:%p", self);
    PyObject_GC_UnTrack(self);
    Py_TRASHCAN_SAFE_BEGIN(self);
    EventObject_clear(self);
    Py_TYPE(self)->tp_free((PyObject*)self);
	Py_TRASHCAN_SAFE_END(self);
}

static int
notify_to_hub(EventObject *self, PyObject *callable, PyObject *args)
{
    TimerObject *t;

    DEBUG("self->notify_timer %p", self->notify_timer);
    if(!self->notify_timer){
        t = (TimerObject*)hub_schedule_call(0, callable, args, NULL, NULL);
        if(t == NULL){
            return 0;
        }
        self->notify_timer = (PyObject*)t;
    }else{
        t = (TimerObject*)self->notify_timer;
        DEBUG("notify_timer called ? %d", t->called);
        if(t->called){
            Py_CLEAR(self->notify_timer);
            t = (TimerObject*)hub_schedule_call(0, callable, args, NULL, NULL);
            if(t == NULL){
                return 0;
            }
            self->notify_timer = (PyObject*)t;
        }else{
            DEBUG("Not yet calll notify");
        }
    }
    return 1;
}

static int
check_exception_timer(TimerObject *timer)
{
    int ret = 0;
    PyObject *type, *value, *traceback;
    TimerObject *exc_timer;

    if(timer == NULL){
        return ret;
    }

    PyErr_Fetch(&type, &value, &traceback);
    if(PyObject_HasAttrString(value, "timer")){
        exc_timer = (TimerObject*)PyObject_GetAttrString(value, "timer");
        if(exc_timer == timer){
            ret = 1;
        }
    }
    if(ret){
        Py_XDECREF(type);
        Py_XDECREF(value);
        Py_XDECREF(traceback);
    }else{
        PyErr_Restore(type, value, traceback);
    }
    return ret;
}

static PyObject*
EventObject_rawlink(EventObject *self, PyObject *callable)
{
    PyObject *args = NULL;

    DEBUG("self:%p", self);
    if(!callable || !PyCallable_Check(callable)){
        PyErr_SetString(PyExc_TypeError, "must be callable");
        return NULL;
    }

    if(PySet_Add(self->links, callable) == -1){
        return NULL;
    }

    if(self->flag && !is_active_timer((TimerObject*)self->notify_timer)){
        if(PySet_Add(self->waiters, callable) == -1){
            return NULL;
        }
        args = Py_BuildValue("(O)", self);
        //DEBUG("args:%p", args);
        if(!notify_to_hub(self, get_notify(), args)){
            Py_DECREF(args);
            return NULL;
        }
        Py_DECREF(args);
    }
    Py_RETURN_NONE;
}

static PyObject*
EventObject_unlink(EventObject *self, PyObject *callable)
{
    DEBUG("self:%p self->waiters:%p", self, self->waiters);
    if(PySet_Discard(self->links, callable) == -1){
        if(PyErr_Occurred()){
            PyErr_Clear();
        }
    }
    Py_RETURN_NONE;
}

static PyObject*
internal_set(EventObject *self)
{
    PyObject *ret;
    PyObject *args = NULL;

    self->flag = 1;
    ret = PyNumber_InPlaceOr(self->waiters, self->links);
    //DEBUG("waiters:%p ret:%p", self->waiters, ret);
    if(ret == NULL) {
        return NULL;
    }
    Py_DECREF(ret);
    if(PySet_GET_SIZE(self->waiters) > 0){
        args = Py_BuildValue("(O)", self);
        //DEBUG("args:%p", args);
        if(!notify_to_hub(self, get_notify(), args)){
            Py_DECREF(args);
            return NULL;
        }
        Py_DECREF(args);
    }
    Py_RETURN_NONE;
}

PyObject*
event_set(EventObject *self)
{
    return internal_set(self);
}

static PyObject*
EventObject_set(EventObject *self, PyObject *args)
{
    return internal_set(self);
}

static PyObject*
EventObject_reset(EventObject *self, PyObject *args)
{
    self->flag = 0;
    Py_RETURN_NONE;
}

static PyObject*
internal_wait(EventObject *self, int timeout)
{
    PyObject *current, *s, *ret, *unlinkret;
    TimerObject *t = NULL;
    int success = 1;
    ret = Py_False;
    
    if(!self->flag){
        current = (PyObject*)greenlet_getcurrent();
        s = PyObject_GetAttrString((PyObject*)current, "switch");
        ret = EventObject_rawlink(self, s);
        Py_XDECREF(ret);
        Py_DECREF(s);
        Py_DECREF(current);

        if(ret == NULL){
            return NULL;
        }

        if(timeout > 0){
            t = (TimerObject*)hub_set_timeout(timeout, NULL);
            if(t == NULL){
                return NULL;
            }
        }
        ret = hub_switch();
        Py_CLEAR(ret);
        if(t){
            t->called = 1;
            Py_DECREF(t);
        }
        if(PyErr_Occurred()){
            success = 0;
            DEBUG("PyErr_Occurred");
            if(PyErr_ExceptionMatches(TimeoutException)){
                DEBUG("TimeoutException");
                if(check_exception_timer(t)){
                    success = 1;
                }else{
                    RDEBUG("raise TimeoutException");
                    ret = NULL;
                }
            }else{
               ret = NULL;
            }
        }

        unlinkret = EventObject_unlink(self, s);
        Py_XDECREF(unlinkret);
    }
    if(success){
        ret = self->flag == 1 ? Py_True : Py_False;
        Py_INCREF(ret);
    }
    return ret;
}

PyObject*
event_wait(EventObject *self, int timeout)
{
    return internal_wait(self, timeout);
}

static PyObject*
EventObject_wait(EventObject *self, PyObject *args, PyObject *kwargs)
{
    
    int timeout = 0;

	static char *keywords[] = {"timeout", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i:wait", keywords, &timeout)){
		return NULL;
    }
    if(timeout < 0){
        PyErr_SetString(PyExc_TypeError, "timeout value out of range");
        return NULL;
    }
    return internal_wait(self, timeout);
}

static PyObject*
EventObject_notify(EventObject *self, PyObject *args)
{
    PyObject *o = NULL, *ret, *type, *value, *traceback;
    int contains = -1;
    
    self = (EventObject*)PyTuple_GET_ITEM(args, 0);
    DEBUG("self:%p", self);
    while(1){
        o = PySet_Pop(self->waiters);
        if(o == NULL){
            PyErr_Clear();
            break;
        }
        contains = PySet_Contains(self->links, o);
        if(contains == -1){
            Py_DECREF(o);
            return NULL;
        }else if(contains == 1){
            ret = PyObject_CallObject(o, args);
            Py_XDECREF(ret);
            if(PyErr_Occurred()){
                PyErr_Fetch(&type, &value, &traceback);
                PyErr_Clear();
                ret = hub_handle_error((PyObject*)self, type, value, traceback);
                Py_XDECREF(ret);
            }
        }
        Py_XDECREF(o);

    }
    Py_XDECREF(self);
    Py_RETURN_NONE;
}


static PyMethodDef EventObject_methods[] = {
    {"set",  (PyCFunction)EventObject_set,  METH_NOARGS, 0},
    {"reset",  (PyCFunction)EventObject_reset,  METH_NOARGS, 0},
    {"clear",  (PyCFunction)EventObject_reset,  METH_NOARGS, 0},
    {"wait",  (PyCFunction)EventObject_wait,  METH_VARARGS|METH_KEYWORDS, 0},
    {NULL, NULL}           /* sentinel */
};

static PyMethodDef notify_func_def = {"notify",   (PyCFunction)EventObject_notify, METH_VARARGS, 0};

static PyObject*
get_notify(void)
{
    if(notify_func == NULL){
        notify_func = PyCFunction_NewEx(&notify_func_def, (PyObject *)NULL, NULL);
    }
    Py_INCREF(notify_func);
    return notify_func;
}

PyTypeObject EventObjectType = {
#ifdef PY3
	PyVarObject_HEAD_INIT(NULL, 0)
#else
	PyObject_HEAD_INIT(NULL)
	0,					/* ob_size */
#endif
    MODULE_NAME ".core.Event",             /*tp_name*/
    sizeof(EventObject), /*tp_basicsize*/
    0,                         /*tp_itemsize*/
    (destructor)EventObject_dealloc, /*tp_dealloc*/
    0,                         /*tp_print*/
    0,                         /*tp_getattr*/
    0,                         /*tp_setattr*/
    0,                         /*tp_compare*/
    0,                         /*tp_repr*/
    0,                         /*tp_as_number*/
    0,                         /*tp_as_sequence*/
    0,            /*tp_as_mapping*/
    0,                         /*tp_hash */
    0,                         /*tp_call*/
    0,                         /*tp_str*/
    0,                         /*tp_getattro*/
    0,                         /*tp_setattro*/
    0,                         /*tp_as_buffer*/
    Py_TPFLAGS_DEFAULT|Py_TPFLAGS_BASETYPE|Py_TPFLAGS_HAVE_GC,        /*tp_flags*/
    "EventObject",           /* tp_doc */
    (traverseproc)EventObject_traverse,		               /* tp_traverse */
    (inquiry)EventObject_clear,		               /* tp_clear */
    0,		               /* tp_richcompare */
    0,		               /* tp_weaklistoffset */
    0,		               /* tp_iter */
    0,		               /* tp_iternext */
    EventObject_methods,                    /* tp_methods */
    0,                         /* tp_members */
    0,                      /* tp_getset */
    0,                         /* tp_base */
    0,                         /* tp_dict */
    0,                         /* tp_descr_get */
    0,                         /* tp_descr_set */
    0,                         /* tp_dictoffset */
    (initproc)EventObject_init,                      /* tp_init */
    PyType_GenericAlloc,                         /* tp_alloc */
    0,                           /* tp_new */
    PyObject_GC_Del,             /* tp_free */
};