Source

pypesocket / pypesocketwin.py

"""Hook socket.socket to handle unix domain socket on Microsoft Windows"""

import socket
import os
import time
import ctypes
import ctypes.wintypes
import Queue
from threading import Thread
import errno

# Handle to kernel32.dll; every Win32 call made by this module goes through it.
_kernel32 = ctypes.windll.kernel32

_POINTER = ctypes.POINTER
_byref = ctypes.byref

# Short aliases for the Win32 C types used in the prototypes below.
_DWORD = ctypes.wintypes.DWORD
_BOOL = ctypes.wintypes.BOOL
_HANDLE = ctypes.wintypes.HANDLE
_LPCSTR = ctypes.wintypes.LPCSTR
_LPVOID = ctypes.wintypes.LPVOID
_LPDWORD = _POINTER(ctypes.wintypes.DWORD)

# Win32 constants (values as defined by the Windows SDK headers).
_TRUE = 1
_FALSE = 0
# HANDLE is pointer-sized, so compute the unsigned value ctypes hands back for
# a HANDLE restype instead of hard-coding a 32-bit or 64-bit literal.
_INVALID_HANDLE_VALUE = _HANDLE(-1).value
_OPEN_EXISTING = 3
_PIPE_ACCESS_DUPLEX = 0x3
_PIPE_TYPE_MESSAGE = 0x4
_PIPE_READMODE_MESSAGE = 0x2
_PIPE_WAIT = 0
_PIPE_UNLIMITED_INSTANCES = 255
_NMPWAIT_USE_DEFAULT_WAIT = 0
_ERROR_FILE_NOT_FOUND = 2
_ERROR_PIPE_CONNECTED = 535
_ERROR_PIPE_BUSY = 231
_GENERIC_READ = 0x80000000
_GENERIC_WRITE = 0x40000000

# Explicit prototypes.  Without them ctypes passes integers as C ``int`` and
# returns C ``int``, which is not wide enough for a HANDLE on 64-bit Windows.
_kernel32.CreateFileA.argtypes = [_LPCSTR, _DWORD, _DWORD, _LPVOID,
                                  _DWORD, _DWORD, _HANDLE]
_kernel32.CreateFileA.restype = _HANDLE
_kernel32.CreateNamedPipeA.argtypes = [_LPCSTR, _DWORD, _DWORD, _DWORD,
                                       _DWORD, _DWORD, _DWORD, _LPVOID]
_kernel32.CreateNamedPipeA.restype = _HANDLE
_kernel32.WriteFile.argtypes = [_HANDLE, _LPVOID, _DWORD, _LPDWORD,
                                 _LPVOID]
_kernel32.WriteFile.restype = _BOOL
_kernel32.ReadFile.argtypes = [_HANDLE, _LPVOID, _DWORD, _LPDWORD, _LPVOID]
_kernel32.ReadFile.restype = _BOOL
_kernel32.SetNamedPipeHandleState.argtypes = [_HANDLE, _LPDWORD, _LPDWORD,
                                              _LPDWORD]
_kernel32.SetNamedPipeHandleState.restype = _BOOL

# Prototypes for the remaining kernel32 functions this module calls; they
# take or return handles too and were previously left to ctypes' defaults.
_kernel32.CloseHandle.argtypes = [_HANDLE]
_kernel32.CloseHandle.restype = _BOOL
_kernel32.ConnectNamedPipe.argtypes = [_HANDLE, _LPVOID]
_kernel32.ConnectNamedPipe.restype = _BOOL
_kernel32.CreateEventA.argtypes = [_LPVOID, _BOOL, _BOOL, _LPCSTR]
_kernel32.CreateEventA.restype = _HANDLE
_kernel32.SetEvent.argtypes = [_HANDLE]
_kernel32.SetEvent.restype = _BOOL
_kernel32.DeleteFileA.argtypes = [_LPCSTR]
_kernel32.DeleteFileA.restype = _BOOL
_kernel32.GetLastError.argtypes = []
_kernel32.GetLastError.restype = _DWORD

class _pipesocket(object):
    """Use windows named pipe to simulate unix domain socket.

    Instance state is created lazily by the methods themselves:

    * ``_addr`` / ``_peer`` / ``_path`` -- set by ``bind`` / ``connect_ex`` /
      ``calc_pipename``.
    * ``_handle`` -- pipe handle of a connected socket (set by ``connect_ex``
      on success, or by ``accept`` on the socket it returns).
    * ``_conn``, ``event``, ``listener``, ``closing``, ``closed`` -- set by
      ``listen`` on a listening socket.

    Only the methods with a real body are implemented; the remaining socket
    methods are placeholders that do nothing and return None.
    """

    def accept(self):
        """Return ``(sock, '')`` for the next connection queued by ``listen``.

        Blocks on the connection queue.  Raises ``socket.error`` with
        ``EBADF`` when the queued item is falsy, which is what the listener
        thread posts when it shuts down.
        """
        pipe = self._conn.get()
        if not pipe:
            raise socket.error(errno.EBADF, 'Bad file descriptor')
        sock = _pipesocket()
        sock._handle = pipe
        return sock, ''

    def bind(self, address):
        """Remember ``address`` and derive the pipe name from it."""
        self._addr = address
        self.calc_pipename(address)

    def calc_pipename(self, address):
        """Store in ``self._path`` the named-pipe path for ``address``.

        The path is the local named-pipe prefix followed by the absolute form
        of ``address``.
        """
        self._path = '\\\\.\\pipe\\' + os.path.abspath(address)

    def close(self):
        """Close a connected socket's pipe, or stop a listening socket.

        Calling ``close`` again afterwards does nothing harmful.  Raises
        ``socket.error`` if the listener thread does not acknowledge the
        shutdown within the retry budget.
        """
        if hasattr(self, '_handle'):
            # Forget the handle before closing it so a repeated close() can
            # never hand the same (possibly reused) value to CloseHandle.
            handle = self._handle
            del self._handle
            _kernel32.CloseHandle(handle)
        elif hasattr(self, '_conn'):
            if not self.closed:
                timeout = 0.01
                for retry in xrange(10):
                    self.closing = True
                    # Poke the pipe path so the listener thread, blocked in
                    # ConnectNamedPipe, wakes up and sees ``closing``.
                    # NOTE(review): presumably DeleteFileA opens the pipe as
                    # a client would -- confirm.
                    _kernel32.DeleteFileA(self._path)
                    time.sleep(timeout)
                    if self.closed:
                        break
                    timeout *= 2
                else:
                    raise socket.error("failed to close socket " + self._path)
            self.listener.join()

    def connect(self, address):
        """Connect to ``address``; raise ``socket.error`` on failure."""
        err = self.connect_ex(address)
        if err:
            raise socket.error(err)

    def connect_ex(self, address):
        """Connect to the pipe for ``address`` and return an error indicator.

        Returns 0 on success, ``errno.ENOENT`` when the pipe does not exist,
        and otherwise the Win32 error code as a string.  A busy pipe is
        retried up to ten times with a doubling delay.  ``self._handle`` is
        only set once the connection is fully established.
        """
        self._peer = address
        self.calc_pipename(address)
        timeout = 0.01
        for retry in xrange(10):
            handle = _kernel32.CreateFileA(
                self._path,
                _GENERIC_READ | _GENERIC_WRITE,
                0, None,
                _OPEN_EXISTING,
                0,
                None)
            if handle != _INVALID_HANDLE_VALUE:
                # connected, set message mode
                mode = _DWORD(_PIPE_READMODE_MESSAGE)
                if not _kernel32.SetNamedPipeHandleState(
                    handle,
                    _byref(mode),
                    None,
                    None):
                    # Read the error before CloseHandle can overwrite it, and
                    # release the half-configured pipe instead of leaking it.
                    err = _kernel32.GetLastError()
                    _kernel32.CloseHandle(handle)
                    return str(err) # todo: should be meaningful text
                self._handle = handle
                return 0
            # failed to connect, see if server isn't ready
            err = _kernel32.GetLastError()
            if err == _ERROR_FILE_NOT_FOUND:
                return errno.ENOENT
            if err != _ERROR_PIPE_BUSY:
                return str(err) # todo: should be meaningful text
            # pipe is busy: back off and retry
            time.sleep(timeout)
            timeout *= 2
        # every attempt found the pipe busy
        return str(_ERROR_PIPE_BUSY)

    def fileno(self):
        """Return the event handle created by ``listen``.

        The attribute only exists after ``listen`` has been called.
        """
        return self.event

    def getpeername(self):
        """Return the peer address, or '' when none was recorded.

        Raises ``socket.error(ENOTCONN)`` when the socket has no pipe handle.
        """
        if not hasattr(self, '_handle'):
            raise socket.error(errno.ENOTCONN)
        return getattr(self, '_peer', '')

    def getsockname(self):
        """Return the bound address, or '' when ``bind`` was never called."""
        return getattr(self, '_addr', '')

    def getsockopt(self, level, optname, buflen):
        """Placeholder: not implemented, returns None."""
        pass

    def listen(self, backlog):
        """Start a background thread that accepts pipe connections.

        Connected pipe handles are put on a queue bounded by ``backlog`` for
        ``accept`` to pick up, and ``self.event`` is signalled for each one.
        """
        def run():
            while True:
                # Arguments after the mode flags: max instances, out-buffer
                # size, in-buffer size, default timeout, security attributes.
                pipe = _kernel32.CreateNamedPipeA(self._path,
                    _PIPE_ACCESS_DUPLEX,
                    _PIPE_TYPE_MESSAGE | _PIPE_READMODE_MESSAGE |
                    _PIPE_WAIT,
                    _PIPE_UNLIMITED_INSTANCES,
                    512,
                    512,
                    _NMPWAIT_USE_DEFAULT_WAIT,
                    None)
                if pipe == _INVALID_HANDLE_VALUE:
                    raise socket.error(str(_kernel32.GetLastError()))

                conn = _kernel32.ConnectNamedPipe(pipe, None)
                if not conn:
                    err = _kernel32.GetLastError()
                    # ERROR_PIPE_CONNECTED is not treated as a failure here.
                    if err != _ERROR_PIPE_CONNECTED:
                        raise socket.error(str(err))

                if self.closing:
                    # close() asked us to stop: acknowledge, release the
                    # handles and unblock any pending accept() with None.
                    self.closed = True
                    _kernel32.CloseHandle(self.event)
                    _kernel32.CloseHandle(pipe)
                    self._conn.put(None)
                    break
                else:
                    self._conn.put(pipe)
                    _kernel32.SetEvent(self.event)
            #print "listening thread finished"

        self.closing = False
        self.closed = False
        self.event = _kernel32.CreateEventA(None, _FALSE, _FALSE, None)
        self._conn = Queue.Queue(backlog)
        self.listener = Thread(target = run)
        self.listener.start()

    def recv(self, bufsize, flags=0):
        """Placeholder: not implemented, returns None."""
        pass

    def recvfrom(self, bufsize, flags=0):
        """Placeholder: not implemented, returns None."""
        pass

    def recvfrom_into(self, buffer, nbytes, flags=0):
        """Placeholder: not implemented, returns None."""
        pass

    def recv_into(self, buffer, nbytes, flags=0):
        """Placeholder: not implemented, returns None."""
        pass

    def send(self, string, flag=0):
        """Placeholder: not implemented, returns None."""
        pass

    def sendall(self, string, flag=0):
        """Write ``string`` to the connected pipe.

        Raises ``socket.error`` carrying the Win32 error code (as text) when
        the write fails, instead of silently dropping the data.
        """
        written = _DWORD(0)
        if not _kernel32.WriteFile(self._handle, string, len(string),
                                   _byref(written), None):
            raise socket.error(str(_kernel32.GetLastError()))

    def sendto(self, string, flag=0, address=None):
        """Placeholder: not implemented, returns None."""
        pass

    def shutdown(self, how):
        """Placeholder: not implemented, returns None."""
        pass

# Publish an AF_UNIX constant on the socket module so callers can request it.
socket.AF_UNIX = 1
# Keep the original factory so every other address family is delegated to it.
_socket = socket.socket

def pipesocket(*args, **kwargs):
    """Drop-in replacement for ``socket.socket``.

    Returns a ``_pipesocket`` when the requested family is ``AF_UNIX`` and
    otherwise forwards all arguments to the original ``socket.socket``.  The
    family is taken from the first positional argument or the ``family``
    keyword; when neither is given it defaults to ``AF_INET``, as the
    original constructor does, instead of failing on an empty argument list.
    """
    if args:
        family = args[0]
    else:
        family = kwargs.get('family', socket.AF_INET)
    if family == socket.AF_UNIX:
        return _pipesocket()
    return _socket(*args, **kwargs)

# Install the hook: from here on socket.socket(...) goes through pipesocket.
socket.socket = pipesocket