Commits

Trent Nelson committed 2cf10a3 Draft

Working checkpoint commit.

Comments (0)

Files changed (13)

 #define _Py_NOT_PARALLEL ((void *)_Py_DEADBEEF)
 #define _Py_IS_PARALLEL  ((void *)_Px_DEADBEEF)
 
-#define _PyObject_HEAD_EXTRA              \
-    void *is_px;                          \
-    void *px;                             \
-    struct _object *_ob_next;             \
+#define _PyObject_HEAD_EXTRA            \
+    void   *is_px;                      \
+    void   *px;                         \
+    size_t  px_flags;                   \
+    void   *srw_lock;                   \
+    struct _object *_ob_next;           \
     struct _object *_ob_prev;
 
-#define _PyObject_EXTRA_INIT              \
-    (void *)_Py_NOT_PARALLEL,             \
-    (void *)_Py_NOT_PARALLEL,             \
-    (struct _object *)_Py_NOT_PARALLEL,   \
+#define _PyObject_EXTRA_INIT            \
+    (void *)_Py_NOT_PARALLEL,           \
+    (void *)_Py_NOT_PARALLEL,           \
+    0,                                  \
+    NULL,                               \
+    (struct _object *)_Py_NOT_PARALLEL, \
     (struct _object *)_Py_NOT_PARALLEL,
 #endif /* WITH_PARALLEL */
 

Include/objimpl.h

     assert(Py_TYPE(op));
     op->is_px = _Py_NOT_PARALLEL;
     op->px    = _Py_NOT_PARALLEL;
+    op->px_flags = 0;
+    op->srw_lock = NULL;
 #ifdef Py_TRACE_REFS
     op->_ob_next = NULL;
     op->_ob_prev = NULL;

Include/pyparallel.h

 #endif
 #include "pyintrinsics.h"
 
+#define Py_PXFLAGS(o)   (((PyObject *)(o))->px_flags)
+
+#define Py_PXFLAGS_DEFAULT              (0)
+#define Py_PXFLAGS_ISPX                 (1UL <<  1)
+#define Py_PXFLAGS_SRWLOCK              (1UL <<  2)
+
 PyAPI_DATA(long) Py_MainThreadId;
 PyAPI_DATA(long) Py_MainProcessId;
 PyAPI_DATA(long) Py_ParallelContextsEnabled;
 
 PyAPI_FUNC(int)     _Px_TEST(void *p);
 #ifdef Py_DEBUG
-PyAPI_FUNC(int)     _Py_ISPX(void *ob);
+
 PyAPI_FUNC(int)     _Py_ISPY(void *ob);
 PyAPI_FUNC(int)     _Py_PXCTX(void);
 
         m,                         \
         _PYOBJ_TEST                \
     )
+#define Py_PYOBJ(m) Py_TEST_OBJ(m)
 
 #define Px_TEST_OBJ(m)             \
     _PyParallel_Guard(             \
         m,                         \
         _PXOBJ_TEST                \
     )
+#define Py_PXOBJ(m) Px_TEST_OBJ(m)
 
 #define Py_GUARD_OBJ(m)            \
     _PyParallel_Guard(             \

Lib/async/__init__.py

 
 from _async import (
     run,
+    client,
+    server,
     run_once,
+    protect,
+    unprotect,
+    protected,
 )
 
 def call_from_main_thread_and_wait(f):
 RECEIVE_MODE_DATA = 1
 RECEIVE_MODE_LINE = 2
 
-class _client:
-    __slots__ = [
-        'socket',
-
-        'data_received',
-        'line_received',
-        'write_succeeded',
-        'connection_lost',
-        'connection_closed',
-
-        'initial_data',
-        'receive_mode',
-        'auto_reconnect',
-        'eol',
-        'wait_for_eol',
-    ]
-    def __init__(self, sock, **kwds):
-        self.socket = sock
-        self.__dict__.update(**kwds)
-
-def make_client(initial_data=None,
-            connected=None,
-            data_received=None,
-            line_received=None,
-            write_succeeded=None,
-            connection_lost=None,
-            connection_closed=None,
-
-            receive_mode=RECEIVE_MODE_DATA,
-            auto_reconnect=False,
-            eol='\r\n',
-            wait_for_eol=True):
-    c = object()
-
-
-
-def client(address,
-           initial_data=None,
-           connected=None,
-           data_received=None,
-           line_received=None,
-           write_succeeded=None,
-           connection_lost=None,
-           connection_closed=None,
-
-           receive_mode=RECEIVE_MODE_DATA,
-           auto_reconnect=False,
-           eol='\r\n',
-           wait_for_eol=True):
-
-    c = _async.client(address,
-                      initial_data=initial_data,
-                      connected=connected,
-                      data_received=data_received,
-                      line_received=line_received,
-                      write_succeeded=write_succeeded,
-                      connection_lost=connection_lost,
-                      connection_closed=connection_closed,
-
-                      auto_reconnect=auto_reconnect,
-                      eol=eol,
-                      wait_for_eol=wait_for_eol)
-
-
-
-
-
-
 # vim:set ts=8 sw=4 sts=4 tw=78 et:

Lib/async/test/test_protect.py

+import unittest
+import async
+
+class TestProtect(unittest.TestCase):
+
+    def test_protect_basic(self):
+        o = object()
+        async.protect(o)
+        self.assertEqual(async.protected(o), True)
+        async.unprotect(o)
+        self.assertEqual(async.protected(o), False)
+
+if __name__ == '__main__':
+    unittest.main()
+
+# vim:set ts=8 sw=4 sts=4 tw=78 et:

Lib/async/test/test_socket.py

 )
 
 import async
-import async.socket
-
-from async.socket import (
-    Socket,
-    ClientSocket,
-    ServerSocket,
-)
 
 from async.test import (
     QOTD,

Lib/async/test/test_work.py

         _async.submit_work(f, None, None, None, None)
         _async.run()
 
-    def test_submit_simple_work_errback_invoked2(self):
-        #def f():
-        #    i = len(laksjdflaskjdflsakjdfsalkjdf)
-        #    laksjdflaskjdflsakjdfsalkjdf = 'lkasjdflskjdf'
-
+    def test_submit_simple_work_errback_invoked1(self):
         def f():
             return laksjdflaskjdflsakjdfsalkjdfaa
 
         _async.submit_work(f, None, None, cb, eb)
         _async.run()
 
-    def test_submit_simple_work_errback_invoked1(self):
+    def test_submit_simple_work_errback_invoked2(self):
         def f():
             return laksjdflaskjdflsakjdfsalkjdf
 
         _async.submit_work(f, None, None, None, eb)
         _async.run()
 
-    def test_submit_simple_work_errback_invoked1(self):
+    def test_submit_simple_work_errback_invoked3(self):
         def f():
             return laksjdflaskjdflsakjdfsalkjdf
 
+        def eb(e):
+            self.assertIsInstance(e, tuple)
+            (et, ev, eb) = e
+            try:
+                f()
+            except NameError as e2:
+                self.assertEqual(et, e2.__class__)
+                self.assertEqual(ev, e2.args[0])
+                self.assertEqual(eb.__class__, e2.__traceback__.__class__)
+            else:
+                self.assertEqual(0, 1)
+
+        _async.submit_work(f, None, None, None, eb)
+        _async.run()
+
+    def test_submit_simple_work_errback_invoked4(self):
+        def f():
+            i = len(laksjdflaskjdflsakjdfsalkjdf)
+            laksjdflaskjdflsakjdfsalkjdf = 'lkasjdflskjdf'
+
         @async.call_from_main_thread_and_wait
         def test_e(e):
             self.assertFalse(_async.is_parallel_thread())
         _async.submit_work(f, None, None, None, eb)
         _async.run()
 
+class TestProtect(unittest.TestCase):
+
+    def test_protect_basic(self):
+        o = object()
+        async.protect(o)
+        self.assertEqual(async.protected(o), True)
+        async.unprotect(o)
+        self.assertEqual(async.protected(o), False)
+
+
 
 if __name__ == '__main__':
     unittest.main()

Modules/socketmodule.c

 #define SEGMENT_SIZE (32 * 1024 -1)
 #endif
 
-#ifndef WITH_PARALLEL
-/* Convert "sock_addr_t *" to "struct sockaddr *". */
-#define SAS2SA(x)       (&((x)->sa))
-#endif
-
 /*
  * Constants for getnameinfo()
  */
    This is always a string of the form 'dd.dd.dd.dd' (with variable
    size numbers). */
 
-PyObject *
+static PyObject *
 makeipaddr(struct sockaddr *addr, int addrlen)
 {
     char buf[NI_MAXHOST];
    to determine what kind of address it really is. */
 
 /*ARGSUSED*/
-PyObject *
+static PyObject *
 makesockaddr(SOCKET_T sockfd, struct sockaddr *addr, size_t addrlen, int proto)
 {
     if (addrlen == 0) {
    0 of not.  The address is returned through addr_ret, its length
    through len_ret. */
 
-int
+static int
 getsockaddrarg(PySocketSockObject *s, PyObject *args,
                struct sockaddr *addr_ret, int *len_ret)
 {
     &sock_type,
     NULL,
     NULL
+#ifdef WITH_PARALLEL
+    ,
+    getsockaddrarg,
+    getsockaddrlen,
+    makesockaddr,
+    NULL, /* LPFN_ACCEPTEX             */
+    NULL, /* LPFN_CONNECTEX            */
+    NULL, /* LPFN_WSARECVMSG           */
+    NULL, /* LPFN_WSASENDMSG           */
+    NULL, /* LPFN_DISCONNECTEX         */
+    NULL, /* LPFN_TRANSMITFILE         */
+    NULL, /* LPFN_TRANSMITPACKETS      */
+    NULL  /* LPFN_GETACCEPTEXSOCKADDRS */
+#endif
 };
 
 
     Py_INCREF(has_ipv6);
     PyModule_AddObject(m, "has_ipv6", has_ipv6);
 
+#ifdef WITH_PARALLEL
+#ifdef MS_WINDOWS
+    PySocketModuleAPI.AcceptEx = _AcceptEx;
+    PySocketModuleAPI.ConnectEx = _ConnectEx;
+    PySocketModuleAPI.WSARecvMsg = _WSARecvMsg;
+    PySocketModuleAPI.WSASendMsg = _WSASendMsg;
+    PySocketModuleAPI.DisconnectEx = _DisconnectEx;
+    PySocketModuleAPI.TransmitFile = _TransmitFile;
+    PySocketModuleAPI.TransmitPackets = _TransmitPackets;
+    PySocketModuleAPI.GetAcceptExSockaddrs = _GetAcceptExSockaddrs;
+#endif /* MS_WINDOWS    */
+#endif /* WITH_PARALLEL */
+
     /* Export C API */
     if (PyModule_AddObject(m, PySocket_CAPI_NAME,
            PyCapsule_New(&PySocketModuleAPI, PySocket_CAPSULE_NAME, NULL)

Modules/socketmodule.h

     PyTypeObject *Sock_Type;
     PyObject *error;
     PyObject *timeout_error;
+#ifdef WITH_PARALLEL
+    int (*getsockaddrarg)(PySocketSockObject *s,
+                          PyObject *args,
+                          struct sockaddr *addr_ret,
+                          int *len_ret);
+    int (*getsockaddrlen)(PySocketSockObject *s, socklen_t *len_ret);
+    PyObject *(*makesockaddr)(SOCKET_T sockfd,
+                              struct sockaddr *addr,
+                              size_t addrlen,
+                              int proto);
+#ifdef MS_WINDOWS
+    LPFN_ACCEPTEX AcceptEx;
+    LPFN_CONNECTEX ConnectEx;
+    LPFN_WSARECVMSG WSARecvMsg;
+    LPFN_WSASENDMSG WSASendMsg;
+    LPFN_DISCONNECTEX DisconnectEx;
+    LPFN_TRANSMITFILE TransmitFile;
+    LPFN_TRANSMITPACKETS TransmitPackets;
+    LPFN_GETACCEPTEXSOCKADDRS GetAcceptExSockaddrs;
+#else /* MS_WINDOWS */
+    void (*null01)(void); /* LPFN_ACCEPTEX             */
+    void (*null02)(void); /* LPFN_CONNECTEX            */
+    void (*null03)(void); /* LPFN_WSARECVMSG           */
+    void (*null04)(void); /* LPFN_WSASENDMSG           */
+    void (*null05)(void); /* LPFN_DISCONNECTEX         */
+    void (*null06)(void); /* LPFN_TRANSMITFILE         */
+    void (*null07)(void); /* LPFN_TRANSMITPACKETS      */
+    void (*null08)(void); /* LPFN_GETACCEPTEXSOCKADDRS */
+#endif /* MS_WINDOWS */
+#endif /* WITH_PARALLEL */
 } PySocketModule_APIObject;
 
 #define PySocketModule_ImportModuleAndAPI() PyCapsule_Import(PySocket_CAPSULE_NAME, 1)
 
-#ifdef WITH_PARALLEL
-PyAPI_FUNC(int) getsockaddrarg(PySocketSockObject *s,
-                               PyObject *args,
-                               struct sockaddr *addr_ret,
-                               int *len_ret);
-
-PyAPI_FUNC(int) getsockaddrlen(PySocketSockObject *s, socklen_t *len_ret);
-
-PyAPI_FUNC(PyObject *) makesockaddr(SOCKET_T sockfd,
-                                    struct sockaddr *addr,
-                                    size_t addrlen,
-                                    int proto);
-PyAPI_FUNC(PyObject *) makeipaddr(struct sockaddr *addr, int addrlen);
 /* Convert "sock_addr_t *" to "struct sockaddr *". */
 #define SAS2SA(x)       (&((x)->sa))
-#endif
 
 #ifdef __cplusplus
 }
 extern "C" {
 #endif
 
+#ifdef WITH_PARALLEL
+#include <Windows.h>
+#endif
+
 #ifdef Py_REF_DEBUG
 Py_ssize_t _Py_RefTotal;
 
 static PyObject refchain = {
     _Py_NOT_PARALLEL,
     _Py_NOT_PARALLEL,
+    0,
+    NULL,
     &refchain,
     &refchain
 };
     return res;
 }
 
+#ifndef WITH_PARALLEL
 PyObject *
 PyObject_GenericGetAttr(PyObject *obj, PyObject *name)
 {
     return _PyObject_GenericGetAttrWithDict(obj, name, NULL);
 }
+#else
+PyObject *
+PyObject_GenericGetAttr(PyObject *obj, PyObject *name)
+{
+    if (!obj->px_flags & Py_PXFLAGS_SRWLOCK)
+        return _PyObject_GenericGetAttrWithDict(obj, name, NULL);
+
+    AcquireSRWLockShared((PSRWLOCK)&(obj->srw_lock));
+    __try {
+        return _PyObject_GenericGetAttrWithDict(obj, name, NULL);
+    } __finally {
+        ReleaseSRWLockShared((PSRWLOCK)&(obj->srw_lock));
+    }
+}
+#endif
 
 int
 _PyObject_GenericSetAttrWithDict(PyObject *obj, PyObject *name,
     return res;
 }
 
+#ifndef WITH_PARALLEL
 int
 PyObject_GenericSetAttr(PyObject *obj, PyObject *name, PyObject *value)
 {
     return _PyObject_GenericSetAttrWithDict(obj, name, value, NULL);
 }
+#else
+int
+PyObject_GenericSetAttr(PyObject *obj, PyObject *name, PyObject *value)
+{
+    if (Py_ISPY(obj) && value->is_px == _Py_IS_PARALLEL) {
+        PyErr_SetString(
+            PyExc_ValueError,
+            "parallel object cannot be set to non-parallel object"
+        );
+        return -1;
+    }
+    if (!obj->px_flags & Py_PXFLAGS_SRWLOCK)
+        return _PyObject_GenericSetAttrWithDict(obj, name, value, NULL);
+
+    AcquireSRWLockExclusive((PSRWLOCK)&(obj->srw_lock));
+    __try {
+        return _PyObject_GenericSetAttrWithDict(obj, name, value, NULL);
+    } __finally {
+        ReleaseSRWLockExclusive((PSRWLOCK)&(obj->srw_lock));
+    }
+}
+#endif
 
 int
 PyObject_GenericSetDict(PyObject *obj, PyObject *value, void *context)
     assert(op->ob_refcnt == 1);
     assert(op->is_px    == _Py_NOT_PARALLEL);
     assert(op->px       == _Py_NOT_PARALLEL);
+    assert(op->srw_lock == NULL);
 #ifndef Py_TRACE_REFS
     assert(op->_ob_next == _Py_NOT_PARALLEL);
     assert(op->_ob_prev == _Py_NOT_PARALLEL);

PCbuild/pcbuild.sln

 		{31FFC478-7B4A-43E8-9954-8D03E2187E9C}.Debug|Win32.ActiveCfg = Debug|Win32
 		{31FFC478-7B4A-43E8-9954-8D03E2187E9C}.Debug|Win32.Build.0 = Debug|Win32
 		{31FFC478-7B4A-43E8-9954-8D03E2187E9C}.Debug|x64.ActiveCfg = Debug|x64
+		{31FFC478-7B4A-43E8-9954-8D03E2187E9C}.Debug|x64.Build.0 = Debug|x64
 		{31FFC478-7B4A-43E8-9954-8D03E2187E9C}.PGInstrument|Win32.ActiveCfg = PGInstrument|Win32
 		{31FFC478-7B4A-43E8-9954-8D03E2187E9C}.PGInstrument|Win32.Build.0 = PGInstrument|Win32
 		{31FFC478-7B4A-43E8-9954-8D03E2187E9C}.PGInstrument|x64.ActiveCfg = PGInstrument|x64

Python/pyparallel.c

     return l;
 }
 
-#define _Px_X_OFFSET(n) (Px_PTR_ALIGN(n))
-#define _Px_O_OFFSET(n) \
-    (Px_PTR_ALIGN((_Px_X_OFFSET(n)) + (Px_PTR_ALIGN(sizeof(PxObject)))))
-
-#define _Px_X_PTR(p, n) \
-    ((PxObject *)(Px_PTR_ALIGN(Px_PTR_ALIGNED_ADD((p), _Px_X_OFFSET((n))))))
-
-#define _Px_O_PTR(p, n) \
-    ((Object *)(Px_PTR_ALIGN(Px_PTR_ALIGNED_ADD((p), _Px_O_OFFSET((n))))))
-
-#define _Px_SZ(n) (Px_PTR_ALIGN(      \
-    Px_PTR_ALIGN(n)                 + \
-    Px_PTR_ALIGN(sizeof(PxObject))  + \
-    Px_PTR_ALIGN(sizeof(Object))      \
-))
-
-#define _Px_VSZ(t, n) (Px_PTR_ALIGN( \
-    ((!((t)->tp_itemsize)) ?         \
-        _PyObject_SIZE(t) :          \
-        _PyObject_VAR_SIZE(t, n))))
-
-PyObject *
-init_object(Context *c, PyObject *p, PyTypeObject *tp, Py_ssize_t nitems)
-{
-    register PxObject *x;
-    register Object   *o;
-    const register size_t sz = _Px_VSZ(tp, nitems);
-    const register size_t total = _Px_SZ(sz);
-
-    if (!p) {
-        p = (PyObject *)_PyHeap_Malloc(c, total, 0);
-        if (!p)
-            return PyErr_NoMemory();
-    } else
-        Px_GUARD_MEM(p);
-
-    assert(p);
-    p->is_px = _Py_IS_PARALLEL;
-
-    Py_TYPE(p)   = tp;
-    Py_REFCNT(p) = 2;
-
-    o = _Px_O_PTR(p, sz);
-    o->op = p;
-
-    x = _Px_X_PTR(p, sz);
-    x->ctx       = c;
-    x->size      = sz;
-    x->signature = _PxObjectSignature;
-    Py_PX(p)     = x;
-    x->resized_to   = NULL;
-    x->resized_from = NULL;
-
-    if (!tp->tp_itemsize) {
-        append_object(&c->varobjs, o);
-        (c->stats.objects)++;
-    } else {
-        Py_SIZE(p) = nitems;
-        append_object(&c->varobjs, o);
-        (c->stats.varobjs)++;
-    }
-
-    if (!c->ob_first) {
-        c->ob_first = p;
-        c->ob_last  = p;
-        p->_ob_next = NULL;
-        p->_ob_prev = NULL;
-    } else {
-        PyObject *last;
-        assert(!c->ob_first->_ob_prev);
-        assert(!c->ob_last->_ob_next);
-        last = c->ob_last;
-        last->_ob_next = p;
-        p->_ob_prev = last;
-        p->_ob_next = NULL;
-        c->ob_last = p;
-    }
-
-    return p;
-}
-
-__inline
-PyObject *
-Object_Init(PyObject *op, PyTypeObject *tp, Context *c)
-{
-    assert(tp->tp_itemsize == 0);
-    return init_object(c, op, tp, 0);
-}
-
-__inline
-PyObject *
-Object_New(PyTypeObject *tp, Context *c)
-{
-    return init_object(c, NULL, tp, 0);
-}
-
-__inline
-PyVarObject *
-VarObject_Init(PyVarObject *v, PyTypeObject *tp, Py_ssize_t nitems, Context *c)
-{
-    assert(tp->tp_itemsize > 0);
-    return (PyVarObject *)init_object(c, (PyObject *)v, tp, nitems);
-}
-
-__inline
-PyVarObject *
-VarObject_New(PyTypeObject *tp, Py_ssize_t nitems, Context *c)
-{
-    return (PyVarObject *)init_object(c, NULL, tp, nitems);
-}
-
 void *
 _PyHeap_Realloc(Context *c, void *p, size_t n)
 {
     s->frees++;
 }
 
+
+#define _Px_X_OFFSET(n) (Px_PTR_ALIGN(n))
+#define _Px_O_OFFSET(n) \
+    (Px_PTR_ALIGN((_Px_X_OFFSET(n)) + (Px_PTR_ALIGN(sizeof(PxObject)))))
+
+#define _Px_X_PTR(p, n) \
+    ((PxObject *)(Px_PTR_ALIGN(Px_PTR_ALIGNED_ADD((p), _Px_X_OFFSET((n))))))
+
+#define _Px_O_PTR(p, n) \
+    ((Object *)(Px_PTR_ALIGN(Px_PTR_ALIGNED_ADD((p), _Px_O_OFFSET((n))))))
+
+#define _Px_SZ(n) (Px_PTR_ALIGN(      \
+    Px_PTR_ALIGN(n)                 + \
+    Px_PTR_ALIGN(sizeof(PxObject))  + \
+    Px_PTR_ALIGN(sizeof(Object))      \
+))
+
+#define _Px_VSZ(t, n) (Px_PTR_ALIGN( \
+    ((!((t)->tp_itemsize)) ?         \
+        _PyObject_SIZE(t) :          \
+        _PyObject_VAR_SIZE(t, n))))
+
+PyObject *
+init_object(Context *c, PyObject *p, PyTypeObject *tp, Py_ssize_t nitems)
+{
+    /* Main use cases:
+     *  1.  Redirect from PyObject_NEW/PyObject_NEW_VAR.  p will be NULL.
+     *  2.  PyObject_MALLOC/PyObject_INIT combo (PyUnicode* does this).
+     *      p will pass Px_GUARD_MEM(p) and everything will be null.  We
+     *      need to allocate x & o manually.
+     *  3.  Redirect from PyObject_GC_Resize.  p shouldn't be NULL, although
+     *      it might not be a PX allocation -- we could be resizing a PY
+     *      allocation.  That's fine, as our realloc doesn't actually free
+     *      anything, so we're, in effect, just copying the existing object.
+     */
+    PyObject *n;
+    PxObject *x;
+    Object   *o;
+    size_t    object_size;
+    size_t    total_size;
+    size_t    bytes_to_copy;
+    int       init_type;
+    int       is_varobj = -1;
+
+#define _INIT_NEW       1
+#define _INIT_INIT      2
+#define _INIT_RESIZE    3
+
+    if (!p) {
+        /* Case 1: PyObject_NEW/NEW_VAR (via (Object|VarObject)_New). */
+        init_type = _INIT_NEW;
+        assert(tp);
+
+        object_size = _Px_VSZ(tp, nitems);
+        total_size  = _Px_SZ(object_size);
+        n = (PyObject *)_PyHeap_Malloc(c, total_size, 0);
+        if (!n)
+            return PyErr_NoMemory();
+        x = _Px_X_PTR(n, object_size);
+        o = _Px_O_PTR(n, object_size);
+
+        Py_TYPE(n) = tp;
+        Py_REFCNT(n) = -1;
+        if (is_varobj = (tp->tp_itemsize > 0)) {
+            (c->stats.varobjs)++;
+            Py_SIZE(n) = nitems;
+        } else
+            (c->stats.objects)++;
+
+    } else {
+        if (!Py_TYPE(p)) {
+            /* Case 2: PyObject_INIT/INIT_VAR called against manually
+             * allocated memory (i.e. not allocated via PyObject_NEW). */
+            init_type = _INIT_INIT;
+            assert(tp);
+            Px_GUARD_MEM(p);
+
+            is_varobj = (tp->tp_itemsize > 0);
+
+            /* XXX: I haven't seen GC/varobjects use this approach yet. */
+            assert(!is_varobj);
+
+            /* Need to manually allocate x + o storage. */
+            x = (PxObject *)_PyHeap_Malloc(c, _Px_SZ(0), 0);
+            if (!x)
+                return PyErr_NoMemory();
+            o = _Px_O_PTR(x, 0);
+            n = p;
+
+            Py_TYPE(n) = tp;
+            Py_REFCNT(n) = -1;
+
+            (c->stats.objects)++;
+        } else {
+            /* Case 3: PyObject_GC_Resize called.  Object to resize may or may
+             * not be from a parallel context.  Doesn't matter either way as
+             * we don't really realloc anything behind the scenes -- we just
+             * malloc another, larger chunk from our heap and copy over the
+             * previous data. */
+            init_type = _INIT_RESIZE;
+            assert(!tp);
+            assert(Py_TYPE(p) != NULL);
+            tp = Py_TYPE(p);
+            is_varobj = 1;
+
+            object_size = _Px_VSZ(tp, nitems);
+            total_size  = _Px_SZ(object_size);
+            n = (PyObject *)_PyHeap_Malloc(c, total_size, 0);
+            if (!n)
+                return PyErr_NoMemory();
+            x = _Px_X_PTR(n, object_size);
+            o = _Px_O_PTR(n, object_size);
+
+            /* Just do a blanket copy of everything rather than trying to
+             * isolate the underlying VarObject ob_items.  It doesn't matter
+             * if we pick up old pointers and whatnot (i.e. old px/is_px refs)
+             * as all that stuff is initialized in the next section. */
+            bytes_to_copy = _PyObject_VAR_SIZE(tp, Py_SIZE(p));
+
+            /* (We may need to change this to <= down the track.) */
+            assert(bytes_to_copy < object_size);
+            assert(Py_SIZE(p) < nitems);
+
+            memcpy(n, p, bytes_to_copy);
+
+            Py_TYPE(n) = tp;
+            Py_SIZE(n) = nitems;
+            Py_REFCNT(n) = -1;
+
+            if (Py_PXOBJ(p)) {
+                /* XXX do we really need to do this?  (Original line of
+                 * thinking was that we might need to treat the object
+                 * differently down the track (i.e. during cleanup) if
+                 * it was resized.) */
+                Py_ASPX(p)->resized_to = n;
+                x->resized_from = p;
+            }
+
+            (c->h->resizes)++;
+            (c->stats.resizes)++;
+        }
+    }
+    assert(tp);
+    assert(Py_TYPE(n) == tp);
+    assert(Py_REFCNT(n) == -1);
+    assert(is_varobj == 0 || is_varobj == 1);
+
+    Py_PXFLAGS(n) = Py_PXFLAGS_ISPX;
+
+    if (is_varobj)
+        assert(Py_SIZE(n) == nitems);
+
+    n->px = x;
+    n->is_px = _Py_IS_PARALLEL;
+    n->srw_lock = NULL;
+
+    x->ctx = c;
+    x->signature = _PxObjectSignature;
+
+    o->op = n;
+    append_object((is_varobj ? &c->varobjs : &c->objects), o);
+
+    if (!c->ob_first) {
+        c->ob_first = n;
+        c->ob_last  = n;
+        n->_ob_next = NULL;
+        n->_ob_prev = NULL;
+    } else {
+        PyObject *last;
+        assert(!c->ob_first->_ob_prev);
+        assert(!c->ob_last->_ob_next);
+        last = c->ob_last;
+        last->_ob_next = n;
+        n->_ob_prev = last;
+        n->_ob_next = NULL;
+        c->ob_last = n;
+    }
+
+    return n;
+}
+
+__inline
+PyObject *
+Object_Init(PyObject *op, PyTypeObject *tp, Context *c)
+{
+    assert(tp->tp_itemsize == 0);
+    return init_object(c, op, tp, 0);
+}
+
+__inline
+PyObject *
+Object_New(PyTypeObject *tp, Context *c)
+{
+    return init_object(c, NULL, tp, 0);
+}
+
+__inline
+PyVarObject *
+VarObject_Init(PyVarObject *v, PyTypeObject *tp, Py_ssize_t nitems, Context *c)
+{
+    assert(tp->tp_itemsize > 0);
+    return (PyVarObject *)init_object(c, (PyObject *)v, tp, nitems);
+}
+
+__inline
+PyVarObject *
+VarObject_New(PyTypeObject *tp, Py_ssize_t nitems, Context *c)
+{
+    return (PyVarObject *)init_object(c, NULL, tp, nitems);
+}
+
 PyVarObject *
 VarObject_Resize(PyObject *v, Py_ssize_t nitems, Context *c)
 {
-    PyTypeObject *tp;
-    PyVarObject  *r, *r2;
-    const register size_t sz = _Px_VSZ(Py_TYPE(v), nitems);
-    const register size_t total = _Px_SZ(sz);
-
-    if (!_Px_TEST(v))
-        printf("\nVarObject_Resize: *v failed _Px_TEST!\n");
-
-    r = (PyVarObject *)_PyHeap_Malloc(c, total, 0);
-    if (!r)
-        return (PyVarObject *)PyErr_NoMemory();
-
-    memcpy(r, v, Py_ASPX(v)->size);
-    tp = Py_TYPE(v);
-    r2 = (PyVarObject *)init_object(c, (PyObject *)r, tp, nitems);
-    assert(r == r2);
-
-    Py_ASPX(v)->resized_to = (PyObject *)r;
-    Py_ASPX(r)->resized_from = v;
-
-    c->h->resizes++;
-    c->stats.resizes++;
-    v = (PyObject *)r;
-    return r;
+    return (PyVarObject *)init_object(c, v, NULL, nitems);
 }
 
 void *
     }
 }
 
+/*
 __inline
 PyObject *
 _PyHeap_NewTuple(Context *c, Py_ssize_t nitems)
 {
     return (PyObject *)VarObject_Resize(op, nitems, c);
 }
+*/
 
 __inline
 int
     return _call_from_main_thread(self, args, 1);
 }
 
+PyObject *
+_async_protect(PyObject *self, PyObject *obj)
+{
+    if (!(obj->px_flags & Py_PXFLAGS_SRWLOCK)) {
+        InitializeSRWLock((PSRWLOCK)&(obj->srw_lock));
+        obj->px_flags |= Py_PXFLAGS_SRWLOCK;
+    }
+
+    Py_RETURN_NONE;
+}
+
+PyObject *
+_async_unprotect(PyObject *self, PyObject *obj)
+{
+    if (obj->px_flags & Py_PXFLAGS_SRWLOCK) {
+        obj->px_flags &= ~Py_PXFLAGS_SRWLOCK;
+        obj->srw_lock = NULL;
+    }
+
+    Py_RETURN_NONE;
+}
+
+PyObject *
+_async_protected(PyObject *self, PyObject *obj)
+{
+    if (obj->px_flags & Py_PXFLAGS_SRWLOCK)
+        Py_RETURN_TRUE;
+    else
+        Py_RETURN_FALSE;
+}
+
 PyDoc_STRVAR(_async_doc,
 "_async module.\n\
 \n\
 PyDoc_STRVAR(_async_rdtsc_doc, "XXX TODO\n");
 PyDoc_STRVAR(_async_client_doc, "XXX TODO\n");
 PyDoc_STRVAR(_async_server_doc, "XXX TODO\n");
+PyDoc_STRVAR(_async_protect_doc, "XXX TODO\n");
 PyDoc_STRVAR(_async_run_once_doc, "XXX TODO\n");
+PyDoc_STRVAR(_async_unprotect_doc, "XXX TODO\n");
+PyDoc_STRVAR(_async_protected_doc, "XXX TODO\n");
 PyDoc_STRVAR(_async_is_active_doc, "XXX TODO\n");
 PyDoc_STRVAR(_async_submit_io_doc, "XXX TODO\n");
 PyDoc_STRVAR(_async_submit_work_doc, "XXX TODO\n");
     if (op->is_px != _Py_IS_PARALLEL)
         op->is_px = _Py_IS_PARALLEL;
 
+#endif
     assert(Py_TYPE(op));
-#endif
 
     op->ob_refcnt = 1;
     ctx->stats.newrefs++;
 
 
 /* socket */
+
 void
 _pxsocket_dealloc(PxSocket *self)
 {
         goto done;
     }
 
-    if (!getsockaddrarg(PXS2S(s), args, SAS2SA(&s->remote), &(s->addrlen)))
+    if (!getsockaddrarg(PXS2S(s), args, SAS2SA(&s->remote), &(s->remote_addrlen)))
         return NULL;
 
+    Py_RETURN_NONE;
+
 done:
     return result;
 }
 
 #define _MEMBER(n, t, c, f, d) {#n, t, offsetof(c, n), f, d}
 #define _PXSOCKETMEM(n, t, f, d)  _MEMBER(n, t, PxSocket, f, d)
-#define _PXSOCKET_CB(n)        _PXSOCKETMEM(n, T_OBJECT, 0, #n " callback")
-#define _PXSOCKET_ATTR_I(n)    _PXSOCKETMEM(n, T_INT,    0, #n " attribute")
-#define _PXSOCKET_ATTR_B(n)    _PXSOCKETMEM(n, T_BOOL,   0, #n " attribute")
-#define _PXSOCKET_ATTR_S(n)    _PXSOCKETMEM(n, T_STRING, 0, #n " attribute")
+#define _PXSOCKET_CB(n)        _PXSOCKETMEM(n, T_OBJECT_EX, 0, #n " callback")
+#define _PXSOCKET_ATTR_O(n)    _PXSOCKETMEM(n, T_OBJECT_EX, 0, #n " callback")
+#define _PXSOCKET_ATTR_OR(n)   _PXSOCKETMEM(n, T_OBJECT_EX, 1, #n " callback")
+#define _PXSOCKET_ATTR_I(n)    _PXSOCKETMEM(n, T_INT,       0, #n " attribute")
+#define _PXSOCKET_ATTR_B(n)    _PXSOCKETMEM(n, T_BOOL,      0, #n " attribute")
+#define _PXSOCKET_ATTR_BR(n)   _PXSOCKETMEM(n, T_BOOL,      1, #n " attribute")
+#define _PXSOCKET_ATTR_S(n)    _PXSOCKETMEM(n, T_STRING,    0, #n " attribute")
 
 static PyMemberDef PxSocketMembers[] = {
+    /* underlying socket (readonly) */
+    _PXSOCKET_ATTR_OR(sock),
+
+    /* handler */
+    _PXSOCKET_ATTR_O(handler),
+
     /* callbacks */
     _PXSOCKET_CB(connected),
     _PXSOCKET_CB(data_received),
     /* attributes */
     _PXSOCKET_ATTR_S(eol),
     _PXSOCKET_ATTR_B(line_mode),
+    _PXSOCKET_ATTR_BR(is_client),
     _PXSOCKET_ATTR_I(wait_for_eol),
+    _PXSOCKET_ATTR_BR(is_connected),
     _PXSOCKET_ATTR_I(max_line_length),
 
     { NULL }
     0,                                          /* tp_hash */
     0,                                          /* tp_call */
     0,                                          /* tp_str */
-    0,                                          /* tp_getattro */
-    0,                                          /* tp_setattro */
+    PyObject_GenericGetAttr,                    /* tp_getattro */
+    PyObject_GenericSetAttr,                    /* tp_setattro */
     0,                                          /* tp_as_buffer */
     Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,   /* tp_flags */
     "Asynchronous Socket Objects",              /* tp_doc */
 };
 
 static char *_pxsocket_kwlist[] = {
+    /* mandatory */
+    "sock",
+
+    /* optional */
+    "handler",
+
     "connected",
     "data_received",
     "lines_received",
     _ASYNC_N(rdtsc),
     _ASYNC_K(client),
     _ASYNC_K(server),
+    _ASYNC_O(protect),
     _ASYNC_N(run_once),
+    _ASYNC_O(unprotect),
+    _ASYNC_O(protected),
     _ASYNC_N(is_active),
     _ASYNC_V(submit_io),
     _ASYNC_V(submit_work),
     if (PyModule_AddObject(m, "socket", (PyObject *)&PxSocket_Type))
         return NULL;
 
+    if (Py_VerboseFlag)
+        printf("sizeof(PxSocket): %d\n", sizeof(PxSocket));
+
     return m;
 }
 

Python/pyparallel_private.h

 } PxObject;
 
 typedef struct _PxSocket {
+    PyObject_HEAD
     /* internal */
-    PySocketSockObject _sock;
-    PyObject *socket_weakref;
+    PySocketSockObject *sock;
     WSAOVERLAPPED overlapped;
     HANDLE completion_port;
 
     sock_addr_t local;
     sock_addr_t remote;
-    int         addrlen;
+    int         local_addrlen;
+    int         remote_addrlen;
 
-    /* callbacks */
-    SRWLOCK   callbacks_srwlock;
+    /* default handler and callbacks */
+    PyObject *handler;
+
     PyObject *connected;
     PyObject *data_received;
     PyObject *lines_received;
     PyObject *initial_connection_error;
 
     /* attributes */
-    SRWLOCK   attributes_srwlock;
     PyObject *initial_bytes_to_send;
     PyObject *initial_regex_to_expect;
     char      is_client;
+    char      is_connected;
     char      line_mode;
     char      wait_for_eol;
+    char      auto_reconnect;
     char     *eol[2];
     int       max_line_length;
 
-    /* We want each PxSocket object to fit into a system page (4k) exactly.
-       The buffer sizes below may need to be tweaked upon addition of new
-       struct memebers above. */
+    __declspec(align(64))
 
 #ifndef _WIN64
-#define _PxSocket_BUFSIZE 3584
+#define _PxSocket_BUFSIZE (4096-448)
 #else
-#define _PxSocket_BUFSIZE 3520
+#define _PxSocket_BUFSIZE (4096-512)
 #endif
 
-    __declspec(align(SYSTEM_CACHE_ALIGNMENT_SIZE))
-    /* 32-bit offset: 512 */
-    /* 64-bit offset: 576 */
     char buf[_PxSocket_BUFSIZE];
 } PxSocket;
 
 
 static PySocketModule_APIObject PySocketModule;
 
+#define getsockaddrarg          PySocketModule.getsockaddrarg
+#define getsockaddrlen          PySocketModule.getsockaddrlen
+#define makesockaddr            PySocketModule.makesockaddr
+#define AcceptEx                PySocketModule.AcceptEx
+#define ConnectEx               PySocketModule.ConnectEx
+#define WSARecvMsg              PySocketModule.WSARecvMsg
+#define WSASendMsg              PySocketModule.WSASendMsg
+#define DisconnectEx            PySocketModule.DisconnectEx
+#define TransmitFile            PySocketModule.TransmitFile
+#define TransmitPackets         PySocketModule.TransmitPackets
+#define GetAcceptExSockaddrs    PySocketModule.GetAcceptExSockaddrs
+
 #define PxSocket_Check(v)         (Py_TYPE(v) == &PxSocket_Type)
 #define PxClientSocket_Check(v)   (Py_TYPE(v) == &PxClientSocket_Type)
 #define PxServerSocket_Check(v)   (Py_TYPE(v) == &PxServerSocket_Type)