# pypy / pypy / module / _file /
#
# (repository web-view residue: "Full commit")
import py
from pypy.rlib import streamio
from pypy.rlib.streamio import StreamErrors

from pypy.interpreter.error import OperationError, wrap_oserror2
from pypy.interpreter.baseobjspace import ObjSpace, Wrappable
from pypy.interpreter.typedef import TypeDef
from pypy.interpreter.gateway import interp2app

def wrap_streamerror(space, e, w_filename=None):
    """Convert an interp-level stream exception into an OperationError.

    `e` is expected to be one of StreamErrors, i.e. a streamio.StreamError
    or an OSError.  A StreamError is turned into an app-level ValueError;
    an OSError is delegated to wrap_oserror_as_ioerror() together with
    `w_filename`.  The OperationError is returned, not raised, so callers
    write ``raise wrap_streamerror(space, e)``.
    """
    if isinstance(e, streamio.StreamError):
        # NOTE(review): the second argument was lost in the damaged copy of
        # this file; wrapping e.message is the presumed original -- confirm.
        return OperationError(space.w_ValueError,
                              space.wrap(e.message))
    elif isinstance(e, OSError):
        return wrap_oserror_as_ioerror(space, e, w_filename)
    else:
        # should not happen: wrap_streamerror() is only called when
        # StreamErrors = (OSError, StreamError) are raised
        return OperationError(space.w_IOError, space.w_None)

def wrap_oserror_as_ioerror(space, e, w_filename=None):
    """Wrap the OSError `e` via wrap_oserror2(), passing along `w_filename`.

    Returns the resulting OperationError (it is not raised here).
    """
    # NOTE(review): the last argument of this call was lost in the damaged
    # copy of this file.  Given the function name, it presumably selects the
    # app-level IOError class; confirm the exact keyword against
    # pypy.interpreter.error.wrap_oserror2 for this revision.
    return wrap_oserror2(space, e, w_filename,
                         exception_name='w_IOError')

class W_AbstractStream(Wrappable):
    """Base class for interp-level objects that expose streams to app-level"""
    slock = None
    slockowner = None
    # Locking issues:
    # * Multiple threads can access the same W_AbstractStream in
    #   parallel, because many of the streamio calls eventually
    #   release the GIL in some external function call.
    # * Parallel accesses have bad (and crashing) effects on the
    #   internal state of the buffering levels of the stream in
    #   particular.
    # * We can't easily have a lock on each W_AbstractStream because we
    #   can't translate prebuilt lock objects.
    # We are still protected by the GIL, so the easiest is to create
    # the lock on-demand.

    def __init__(self, space, stream): = space = stream

    def _try_acquire_lock(self):
        # this function runs with the GIL acquired so there is no race
        # condition in the creation of the lock
        if self.slock is None:
            self.slock =
        me =   # used as thread ident
        if self.slockowner is me:
            return False    # already acquired by the current thread
        assert self.slockowner is None
        self.slockowner = me
        return True

    def _release_lock(self):
        self.slockowner = None

    def lock(self):
        if not self._try_acquire_lock():
            raise OperationError(,
                       "stream lock already held"))

    def unlock(self):
        me =   # used as thread ident
        if self.slockowner is not me:
            raise OperationError(,
                       "stream lock is not held"))

    def _freeze_(self):
        # remove the lock object, which will be created again as needed at
        # run-time.
        self.slock = None
        assert self.slockowner is None
        return False

    def stream_read(self, n):
        An interface for direct interp-level usage of W_AbstractStream,
        e.g. from
        NOTE: this assumes that the stream lock is already acquired.
        Like, this can return less than n bytes.
        except StreamErrors, e:
            raise wrap_streamerror(, e)

    def do_write(self, data):
        An interface for direct interp-level usage of W_Stream,
        e.g. from
        NOTE: this assumes that the stream lock is already acquired.
        except StreamErrors, e:
            raise wrap_streamerror(, e)

# ____________________________________________________________

class W_Stream(W_AbstractStream):
    """A class that exposes the raw stream interface to app-level."""
    # this exists for historical reasons, and kept around in case we want
    # to re-expose the raw stream interface to app-level.
    #
    # The class body is intentionally empty: the per-method wrappers are
    # generated as module-level functions from streamio.STREAM_METHODS and
    # attached through W_Stream.typedef further down in this module.

for name, argtypes in streamio.STREAM_METHODS.iteritems():
    numargs = len(argtypes)
    args = ", ".join(["v%s" % i for i in range(numargs)])
    exec py.code.Source("""
    def %(name)s(self, space, %(args)s):
        acquired = self.try_acquire_lock()
                result =
            except streamio.StreamError, e:
                raise OperationError(space.w_ValueError,
            except OSError, e:
                raise wrap_oserror_as_ioerror(space, e)
            if acquired:
        return space.wrap(result)
    %(name)s.unwrap_spec = [W_Stream, ObjSpace] + argtypes
    """ % locals()).compile() in globals()

# App-level type "Stream": lock()/unlock() plus one entry for every
# stream method listed in streamio.STREAM_METHODS, each looked up by name
# among this module's globals.
W_Stream.typedef = TypeDef(
    "Stream",
    lock=interp2app(W_Stream.lock),
    unlock=interp2app(W_Stream.unlock),
    **dict([
        (name, interp2app(globals()[name]))
        for name, _ in streamio.STREAM_METHODS.items()
    ])
)