Source

py / testing / io_ / test_capture.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
from __future__ import with_statement

import os, sys
import py

needsdup = py.test.mark.skipif("not hasattr(os, 'dup')")

from py.builtin import print_

if sys.version_info >= (3,0):
    def tobytes(obj):
        if isinstance(obj, str):
            obj = obj.encode('UTF-8')
        assert isinstance(obj, bytes)
        return obj
    def totext(obj):
        if isinstance(obj, bytes):
            obj = str(obj, 'UTF-8')
        assert isinstance(obj, str)
        return obj
else:
    def tobytes(obj):
        if isinstance(obj, unicode):
            obj = obj.encode('UTF-8')
        assert isinstance(obj, str)
        return obj
    def totext(obj):
        if isinstance(obj, str):
            obj = unicode(obj, 'UTF-8')
        assert isinstance(obj, unicode)
        return obj

def oswritebytes(fd, obj):
    os.write(fd, tobytes(obj))

class TestTextIO:
    def test_text(self):
        f = py.io.TextIO()
        f.write("hello")
        s = f.getvalue()
        assert s == "hello"
        f.close()

    def test_unicode_and_str_mixture(self):
        f = py.io.TextIO()
        if sys.version_info >= (3,0):
            f.write("\u00f6")
            py.test.raises(TypeError, "f.write(bytes('hello', 'UTF-8'))")
        else:
            f.write(unicode("\u00f6", 'UTF-8'))
            f.write("hello") # bytes
            s = f.getvalue()
            f.close()
            assert isinstance(s, unicode)

def test_bytes_io():
    f = py.io.BytesIO()
    f.write(tobytes("hello"))
    py.test.raises(TypeError, "f.write(totext('hello'))")
    s = f.getvalue()
    assert s == tobytes("hello")

def test_dontreadfrominput():
    from py._io.capture import DontReadFromInput
    f = DontReadFromInput()
    assert not f.isatty()
    py.test.raises(IOError, f.read)
    py.test.raises(IOError, f.readlines)
    py.test.raises(IOError, iter, f)
    py.test.raises(ValueError, f.fileno)
    f.close() # just for completeness

def pytest_funcarg__tmpfile(request):
    testdir = request.getfuncargvalue("testdir")
    f = testdir.makepyfile("").open('wb+')
    request.addfinalizer(f.close)
    return f

@needsdup
def test_dupfile(tmpfile):
    flist = []
    for i in range(5):
        nf = py.io.dupfile(tmpfile, encoding="utf-8")
        assert nf != tmpfile
        assert nf.fileno() != tmpfile.fileno()
        assert nf not in flist
        print_(i, end="", file=nf)
        flist.append(nf)
    for i in range(5):
        f = flist[i]
        f.close()
    tmpfile.seek(0)
    s = tmpfile.read()
    assert "01234" in repr(s)
    tmpfile.close()

def test_dupfile_no_mode():
    """
    dupfile should trap an AttributeError and return f if no mode is supplied.
    """
    class SomeFileWrapper(object):
        "An object with a fileno method but no mode attribute"
        def fileno(self):
            return 1
    tmpfile = SomeFileWrapper()
    assert py.io.dupfile(tmpfile) is tmpfile
    with py.test.raises(AttributeError):
        py.io.dupfile(tmpfile, raising=True)

def lsof_check(func):
    pid = os.getpid()
    try:
        out = py.process.cmdexec("lsof -p %d" % pid)
    except py.process.cmdexec.Error:
        py.test.skip("could not run 'lsof'")
    func()
    out2 = py.process.cmdexec("lsof -p %d" % pid)
    len1 = len([x for x in out.split("\n") if "REG" in x])
    len2 = len([x for x in out2.split("\n") if "REG" in x])
    assert len2 < len1 + 3, out2

class TestFDCapture:
    pytestmark = needsdup

    def test_not_now(self, tmpfile):
        fd = tmpfile.fileno()
        cap = py.io.FDCapture(fd, now=False)
        data = tobytes("hello")
        os.write(fd, data)
        f = cap.done()
        s = f.read()
        assert not s
        cap = py.io.FDCapture(fd, now=False)
        cap.start()
        os.write(fd, data)
        f = cap.done()
        s = f.read()
        assert s == "hello"

    def test_simple(self, tmpfile):
        fd = tmpfile.fileno()
        cap = py.io.FDCapture(fd)
        data = tobytes("hello")
        os.write(fd, data)
        f = cap.done()
        s = f.read()
        assert s == "hello"
        f.close()

    def test_simple_many(self, tmpfile):
        for i in range(10):
            self.test_simple(tmpfile)

    def test_simple_many_check_open_files(self, tmpfile):
        lsof_check(lambda: self.test_simple_many(tmpfile))

    def test_simple_fail_second_start(self, tmpfile):
        fd = tmpfile.fileno()
        cap = py.io.FDCapture(fd)
        f = cap.done()
        py.test.raises(ValueError, cap.start)
        f.close()

    def test_stderr(self):
        cap = py.io.FDCapture(2, patchsys=True)
        print_("hello", file=sys.stderr)
        f = cap.done()
        s = f.read()
        assert s == "hello\n"

    def test_stdin(self, tmpfile):
        tmpfile.write(tobytes("3"))
        tmpfile.seek(0)
        cap = py.io.FDCapture(0, tmpfile=tmpfile)
        # check with os.read() directly instead of raw_input(), because
        # sys.stdin itself may be redirected (as py.test now does by default)
        x = os.read(0, 100).strip()
        f = cap.done()
        assert x == tobytes("3")

    def test_writeorg(self, tmpfile):
        data1, data2 = tobytes("foo"), tobytes("bar")
        try:
            cap = py.io.FDCapture(tmpfile.fileno())
            tmpfile.write(data1)
            cap.writeorg(data2)
        finally:
            tmpfile.close()
        f = cap.done()
        scap = f.read()
        assert scap == totext(data1)
        stmp = open(tmpfile.name, 'rb').read()
        assert stmp == data2


class TestStdCapture:
    def getcapture(self, **kw):
        return py.io.StdCapture(**kw)

    def test_capturing_done_simple(self):
        cap = self.getcapture()
        sys.stdout.write("hello")
        sys.stderr.write("world")
        outfile, errfile = cap.done()
        s = outfile.read()
        assert s == "hello"
        s = errfile.read()
        assert s == "world"

    def test_capturing_reset_simple(self):
        cap = self.getcapture()
        print("hello world")
        sys.stderr.write("hello error\n")
        out, err = cap.reset()
        assert out == "hello world\n"
        assert err == "hello error\n"

    def test_capturing_readouterr(self):
        cap = self.getcapture()
        try:
            print ("hello world")
            sys.stderr.write("hello error\n")
            out, err = cap.readouterr()
            assert out == "hello world\n"
            assert err == "hello error\n"
            sys.stderr.write("error2")
        finally:
            out, err = cap.reset()
        assert err == "error2"

    def test_capturing_readouterr_unicode(self):
        cap = self.getcapture()
        print ("hx\xc4\x85\xc4\x87")
        out, err = cap.readouterr()
        assert out == py.builtin._totext("hx\xc4\x85\xc4\x87\n", "utf8")

    @py.test.mark.skipif('sys.version_info >= (3,)',
                      reason='text output different for bytes on python3')
    def test_capturing_readouterr_decode_error_handling(self):
        cap = self.getcapture()
        # triggered a internal error in pytest
        print('\xa6')
        out, err = cap.readouterr()
        assert out == py.builtin._totext('\ufffd\n', 'unicode-escape')

    def test_capturing_mixed(self):
        cap = self.getcapture(mixed=True)
        sys.stdout.write("hello ")
        sys.stderr.write("world")
        sys.stdout.write(".")
        out, err = cap.reset()
        assert out.strip() == "hello world."
        assert not err

    def test_reset_twice_error(self):
        cap = self.getcapture()
        print ("hello")
        out, err = cap.reset()
        py.test.raises(ValueError, cap.reset)
        assert out == "hello\n"
        assert not err

    def test_capturing_modify_sysouterr_in_between(self):
        oldout = sys.stdout
        olderr = sys.stderr
        cap = self.getcapture()
        sys.stdout.write("hello")
        sys.stderr.write("world")
        sys.stdout = py.io.TextIO()
        sys.stderr = py.io.TextIO()
        print ("not seen")
        sys.stderr.write("not seen\n")
        out, err = cap.reset()
        assert out == "hello"
        assert err == "world"
        assert sys.stdout == oldout
        assert sys.stderr == olderr

    def test_capturing_error_recursive(self):
        cap1 = self.getcapture()
        print ("cap1")
        cap2 = self.getcapture()
        print ("cap2")
        out2, err2 = cap2.reset()
        out1, err1 = cap1.reset()
        assert out1 == "cap1\n"
        assert out2 == "cap2\n"

    def test_just_out_capture(self):
        cap = self.getcapture(out=True, err=False)
        sys.stdout.write("hello")
        sys.stderr.write("world")
        out, err = cap.reset()
        assert out == "hello"
        assert not err

    def test_just_err_capture(self):
        cap = self.getcapture(out=False, err=True)
        sys.stdout.write("hello")
        sys.stderr.write("world")
        out, err = cap.reset()
        assert err == "world"
        assert not out

    def test_stdin_restored(self):
        old = sys.stdin
        cap = self.getcapture(in_=True)
        newstdin = sys.stdin
        out, err = cap.reset()
        assert newstdin != sys.stdin
        assert sys.stdin is old

    def test_stdin_nulled_by_default(self):
        print ("XXX this test may well hang instead of crashing")
        print ("XXX which indicates an error in the underlying capturing")
        print ("XXX mechanisms")
        cap = self.getcapture()
        py.test.raises(IOError, "sys.stdin.read()")
        out, err = cap.reset()

    def test_suspend_resume(self):
        cap = self.getcapture(out=True, err=False, in_=False)
        try:
            print ("hello")
            sys.stderr.write("error\n")
            out, err = cap.suspend()
            assert out == "hello\n"
            assert not err
            print ("in between")
            sys.stderr.write("in between\n")
            cap.resume()
            print ("after")
            sys.stderr.write("error_after\n")
        finally:
            out, err = cap.reset()
        assert out == "after\n"
        assert not err

class TestStdCaptureNotNow(TestStdCapture):
    def getcapture(self, **kw):
        kw['now'] = False
        cap = py.io.StdCapture(**kw)
        cap.startall()
        return cap

class TestStdCaptureFD(TestStdCapture):
    pytestmark = needsdup

    def getcapture(self, **kw):
        return py.io.StdCaptureFD(**kw)

    def test_intermingling(self):
        cap = self.getcapture()
        oswritebytes(1, "1")
        sys.stdout.write(str(2))
        sys.stdout.flush()
        oswritebytes(1, "3")
        oswritebytes(2, "a")
        sys.stderr.write("b")
        sys.stderr.flush()
        oswritebytes(2, "c")
        out, err = cap.reset()
        assert out == "123"
        assert err == "abc"

    def test_callcapture(self):
        def func(x, y):
            print (x)
            py.std.sys.stderr.write(str(y))
            return 42

        res, out, err = py.io.StdCaptureFD.call(func, 3, y=4)
        assert res == 42
        assert out.startswith("3")
        assert err.startswith("4")

    def test_many(self, capfd):
        def f():
            for i in range(10):
                cap = py.io.StdCaptureFD()
                cap.reset()
        lsof_check(f)

class TestStdCaptureFDNotNow(TestStdCaptureFD):
    pytestmark = needsdup

    def getcapture(self, **kw):
        kw['now'] = False
        cap = py.io.StdCaptureFD(**kw)
        cap.startall()
        return cap

@needsdup
def test_stdcapture_fd_tmpfile(tmpfile):
    capfd = py.io.StdCaptureFD(out=tmpfile)
    os.write(1, "hello".encode("ascii"))
    os.write(2, "world".encode("ascii"))
    outf, errf = capfd.done()
    assert outf == tmpfile

class TestStdCaptureFDinvalidFD:
    pytestmark = needsdup
    def test_stdcapture_fd_invalid_fd(self, testdir):
        testdir.makepyfile("""
            import py, os
            def test_stdout():
                os.close(1)
                cap = py.io.StdCaptureFD(out=True, err=False, in_=False)
                cap.done()
            def test_stderr():
                os.close(2)
                cap = py.io.StdCaptureFD(out=False, err=True, in_=False)
                cap.done()
            def test_stdin():
                os.close(0)
                cap = py.io.StdCaptureFD(out=False, err=False, in_=True)
                cap.done()
        """)
        result = testdir.runpytest("--capture=fd")
        assert result.ret == 0
        assert result.parseoutcomes()['passed'] == 3

def test_capture_not_started_but_reset():
    capsys = py.io.StdCapture(now=False)
    capsys.done()
    capsys.done()
    capsys.reset()

@needsdup
def test_capture_no_sys():
    capsys = py.io.StdCapture()
    try:
        cap = py.io.StdCaptureFD(patchsys=False)
        sys.stdout.write("hello")
        sys.stderr.write("world")
        oswritebytes(1, "1")
        oswritebytes(2, "2")
        out, err = cap.reset()
        assert out == "1"
        assert err == "2"
    finally:
        capsys.reset()

@needsdup
def test_callcapture_nofd():
    def func(x, y):
        oswritebytes(1, "hello")
        oswritebytes(2, "hello")
        print (x)
        sys.stderr.write(str(y))
        return 42

    capfd = py.io.StdCaptureFD(patchsys=False)
    try:
        res, out, err = py.io.StdCapture.call(func, 3, y=4)
    finally:
        capfd.reset()
    assert res == 42
    assert out.startswith("3")
    assert err.startswith("4")

@needsdup
@py.test.mark.multi(use=[True, False])
def test_fdcapture_tmpfile_remains_the_same(tmpfile, use):
    if not use:
        tmpfile = True
    cap = py.io.StdCaptureFD(out=False, err=tmpfile, now=False)
    cap.startall()
    capfile = cap.err.tmpfile
    cap.suspend()
    cap.resume()
    capfile2 = cap.err.tmpfile
    assert capfile2 == capfile

@py.test.mark.multi(method=['StdCapture', 'StdCaptureFD'])
def test_capturing_and_logging_fundamentals(testdir, method):
    if method == "StdCaptureFD" and not hasattr(os, 'dup'):
        py.test.skip("need os.dup")
    # here we check a fundamental feature
    p = testdir.makepyfile("""
        import sys, os
        import py, logging
        cap = py.io.%s(out=False, in_=False)

        logging.warn("hello1")
        outerr = cap.suspend()
        print ("suspend, captured %%s" %%(outerr,))
        logging.warn("hello2")

        cap.resume()
        logging.warn("hello3")

        outerr = cap.suspend()
        print ("suspend2, captured %%s" %% (outerr,))
    """ % (method,))
    result = testdir.runpython(p)
    result.stdout.fnmatch_lines([
        "suspend, captured*hello1*",
        "suspend2, captured*hello2*WARNING:root:hello3*",
    ])
    assert "atexit" not in result.stderr.str()