# Source location: pypy / pypy / module / _file /  (repository view, full commit)
import py
from pypy.rlib import streamio
from pypy.rlib.streamio import StreamErrors

from pypy.interpreter.error import OperationError
from pypy.interpreter.baseobjspace import ObjSpace, Wrappable
from pypy.interpreter.typedef import TypeDef
from pypy.interpreter.gateway import interp2app
from pypy.interpreter.streamutil import wrap_streamerror, wrap_oserror_as_ioerror

class W_AbstractStream(Wrappable):
    """Base class for interp-level objects that expose streams to app-level"""
    slock = None
    slockowner = None
    # Locking issues:
    # * Multiple threads can access the same W_AbstractStream in
    #   parallel, because many of the streamio calls eventually
    #   release the GIL in some external function call.
    # * Parallel accesses have bad (and crashing) effects on the
    #   internal state of the buffering levels of the stream in
    #   particular.
    # * We can't easily have a lock on each W_AbstractStream because we
    #   can't translate prebuilt lock objects.
    # We are still protected by the GIL, so the easiest is to create
    # the lock on-demand.

    def __init__(self, space, stream): = space = stream

    def _try_acquire_lock(self):
        # this function runs with the GIL acquired so there is no race
        # condition in the creation of the lock
        if self.slock is None:
            self.slock =
        me =   # used as thread ident
        if self.slockowner is me:
            return False    # already acquired by the current thread
        assert self.slockowner is None
        self.slockowner = me
        return True

    def _release_lock(self):
        self.slockowner = None

    def lock(self):
        if not self._try_acquire_lock():
            raise OperationError(,
                       "stream lock already held"))

    def unlock(self):
        me =   # used as thread ident
        if self.slockowner is not me:
            raise OperationError(,
                       "stream lock is not held"))

    def _freeze_(self):
        # remove the lock object, which will be created again as needed at
        # run-time.
        self.slock = None
        assert self.slockowner is None
        return False

    def stream_read(self, n):
        An interface for direct interp-level usage of W_AbstractStream,
        e.g. from
        NOTE: this assumes that the stream lock is already acquired.
        Like, this can return less than n bytes.
        except StreamErrors, e:
            raise wrap_streamerror(, e)

    def do_write(self, data):
        An interface for direct interp-level usage of W_Stream,
        e.g. from
        NOTE: this assumes that the stream lock is already acquired.
        except StreamErrors, e:
            raise wrap_streamerror(, e)

# ____________________________________________________________

class W_Stream(W_AbstractStream):
    """A class that exposes the raw stream interface to app-level."""
    # this exists for historical reasons, and kept around in case we want
    # to re-expose the raw stream interface to app-level.
    #
    # The class body is intentionally empty: the per-operation functions are
    # generated at import time from streamio.STREAM_METHODS by the loop that
    # follows this class, and are published through W_Stream.typedef together
    # with the inherited lock()/unlock() methods.

for name, argtypes in streamio.STREAM_METHODS.iteritems():
    numargs = len(argtypes)
    args = ", ".join(["v%s" % i for i in range(numargs)])
    exec py.code.Source("""
    def %(name)s(self, space, %(args)s):
        acquired = self.try_acquire_lock()
                result =
            except streamio.StreamError, e:
                raise OperationError(space.w_ValueError,
            except OSError, e:
                raise wrap_oserror_as_ioerror(space, e)
            if acquired:
        return space.wrap(result)
    %(name)s.unwrap_spec = [W_Stream, ObjSpace] + argtypes
    """ % locals()).compile() in globals()

# App-level type description for W_Stream: the explicit lock()/unlock()
# methods plus one entry per stream operation, each looked up by name among
# the module-level functions generated from streamio.STREAM_METHODS.
W_Stream.typedef = TypeDef(
    "Stream",
    lock=interp2app(W_Stream.lock),
    unlock=interp2app(W_Stream.unlock),
    **dict([(name, interp2app(globals()[name]))
            for name in streamio.STREAM_METHODS.iterkeys()])
)