Anonymous avatar Anonymous committed 9d31a27

new test

Comments (0)

Files changed (1)

test/websocket_test.py

+import gevent
 from gevent.queue import Queue
 import pytest
 from mock import Mock, patch
         inbound._get_next_message()
         assert recv_mock.called
         assert queue.qsize() == 1
+
+
+def test_inbound_run_exception(inbound):
+    with patch.object(inbound, "_get_next_message") as gnm:
+        def f():
+            raise Exception("test_exception")
+
+        gnm.side_effect = f
+
+        class TooLong(Exception):
+            pass
+
+        # terminate test of endless loop using timeout
+        with gevent.Timeout(.1, TooLong):
+            try:
+                inbound._run()
+            except TooLong:
+                pass
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.