Matt Joiner avatar Matt Joiner committed 41d00ca

http_get example

Comments (0)

Files changed (5)

examples/http_get.py

+#!/usr/bin/env python3.3
+
+import collections
+import pdb
+import sys
+import urllib.parse
+import green380
+
+def http_get(url, timeout=None):
+    @green380.spawn
+    def timeout_routine():
+        yield from green380.sleep(timeout)
+        yield from ch.send()
+    split = urllib.parse.urlsplit(url)
+    sock = green380.Socket()
+    addr = split.netloc.split(':')
+    if len(addr) == 1:
+        addr.append(80)
+    else:
+        addr[-1] = int(addr[-1])
+    addr = tuple(addr)
+    @green380.spawn
+    def connect_routine():
+        yield from sock.connect(addr)
+        yield from ch.send()
+    ch = green380.Channel()
+    value, sender = yield from ch.recv_from()
+    assert value is None, value
+    if sender is timeout_routine:
+        raise Exception('timed out')
+    elif sender is not connect_routine:
+        assert False, sender
+    @green380.spawn
+    def send_header():
+        return sock.sendall(
+            'GET {split[2]}?{split[3]} HTTP/1.1\r\nHost: {addr[0]}\r\nConnection: close\r\n\r\n'.format(
+                split=split, addr=addr).encode())
+    response_header_lines = (yield from sock.recv_until(b'\r\n\r\n')).split(b'\r\n')
+    headers = collections.defaultdict(list)
+    for l in map(bytes.decode, response_header_lines[1:]):
+        if not l:
+            continue
+        h, *v = l.split(':', 1)
+        headers[h.lower().strip()].append(v[0])
+    return response_header_lines[0].decode(), headers, sock
+
+def main():
+    for url in sys.argv[1:]:
+        status, headers, sock = yield from http_get(url, 0.1)
+        print(status, file=sys.stderr)
+        print(headers, file=sys.stderr)
+        while True:
+            buf = yield from sock.recv(0x1000)
+            if not buf:
+                break
+            sys.stdout.buffer.write(buf)
+        sys.stdout.flush()
+
+if __name__ == '__main__':
+    green380.spawn(main)
+    green380.run()

green380/__init__.py

-from ._core import spawn, run, Channel
+from ._core import spawn, run, Channel, Timeout
 from .threading import RLock, Event
 from .queue import Queue
 from .time import sleep
+from .socket import socket as Socket

green380/_core.py

 import errno
 import fcntl
 import heapq
+import inspect
 import logging
 import os
 import pdb
 import select
 import signal
 import socket as _socket
+import sys
 import time
+import traceback
+#~ logging.basicConfig(level=logging.NOTSET, stream=sys.stderr)
 #
 from .fileno import fileno
+from .time import sleep
 
 logger = logging.getLogger('green380')
-logger.setLevel(logging.INFO)
+logger.setLevel(logging.DEBUG)
+
+
+def alt(*gens):
+    ch = Channel()
+    def wrap(gen):
+        ret = yield from gen
+        ch.send((ret, gen))
+
+    for g in gens:
+        spawn(gen)
 
 
 def timeout_multiplex(event, timeout):
 class Timeout(Exception):
 
     def __init__(self, timeout):
-        self.timeout = timeout
-        self.fiber = current
+        spawn(self._timeout_routine, timeout)
+        self._channels = set()
+        self._expired = False
 
-    def __enter__(self):
-        self.timeout_fiber = spawn(self._timeout_routine)
-        return self
+    def _timeout_routine(self, timeout):
+        yield from sleep(timeout)
+        self._expired = True
+        for ch in self._channels:
+            yield from ch.send()
 
-    def __exit__(self, tp, v, tb):
-        if v is self:
-            schedule(self.fiber)
-            yield
-        self.timeout_fiber.close()
+    def multiplex(self, gen):
+        if self._expired:
+            raise self
+        ch = Channel()
+        self._channels.add(ch)
+        @schedule
+        def routine():
+            result = yield from gen
+            ch.send(result)
+        value, sender = yield from ch.recv_from()
+        self._channels.remove(ch)
+        if sender is routine:
+            return value
+        assert sender is self._timeout_routine, sender
+        raise self
 
-    def _timeout_routine(self):
-        yield 'timeout', self.timeout
-        self.fiber.throw(self)
+#~ class Timeout:
+#~
+    #~ def __init__(self, timeout):
+        #~ self._timeout = timeout
+        #~ spawn(self._timeout_routine)
+#~
+    #~ def _timeout_routine(self):
+        #~ yield from sleep(self._timeout)
+        #~ self._expired = True
+        #~ while self._channels:
+            #~ yield from self._channels.pop().send(False)
+#~
+    #~ def __call__(self, task):
+        #~ if self._expired:
+            #~ return False
+        #~ ch = Channel()
+        #~ self._channels.append(ch)
+        #~ @spawn
+        #~ def event_fiber():
+            #~ yield event
+            #~ ch.send(True)
+        #~ ready = yield from ch.recv()
+        #~ assert self._channels[-1] is ch
+        #~ self._channels.pop()
+        #~ assert isinstance(ready, bool), ready
+        #~ return ready
 
 
 class Channel:
         self._receivers = collections.deque()
         self._senders = collections.deque()
 
-    def get(self):
+    def recv_from(self):
         if self._senders:
             sender = self._senders.popleft()
             item = next(sender)
             schedule(sender)
-            return item
+            return item, sender
         else:
             self._receivers.append(current)
-            item = yield
+            item, sender = yield
             yield
-            return item
+            return item, sender
 
-    recv = get
+    def recv(self):
+        return (yield from self.recv_from())[0]
 
-    def put(self, item):
+    get = recv
+
+    def send(self, item=None):
         if self._receivers:
             receiver = self._receivers.popleft()
-            receiver.send(item)
+            receiver.send((item, current))
             schedule(receiver)
         else:
             self._senders.append(current)
             yield
             yield item
 
-    send = put
+    put = send
 
 
+def fiber_bottom_location(gen):
+    import pdb
+    pdb.set_trace()
+    while inspect.isgenerator(gen):
+        frame = gen.gi_frame
+        gen = frame.f_yieldfrom
+    return frame.f_code, frame.f_lineno
+
 class _Scheduler:
 
     _close_mask = select.EPOLLHUP|select.EPOLLERR
     def handle_event(self, fiber, event):
         try:
             new_event = self.run_fiber(fiber)
-            if new_event != event:
-                if event is not None:
-                    self.remove_event(fiber, *event)
-                if new_event is not None:
-                    self.add_event(fiber, *new_event)
+            try:
+                if new_event != event:
+                    if event is not None:
+                        self.remove_event(fiber, *event)
+                    if new_event is not None:
+                        self.add_event(fiber, *new_event)
+            except:
+                logging.exception('Error handling event for fiber %r: %s', fiber, fiber_bottom_location(fiber))
+                raise
         except:
             logger.critical('Error handling event %r for fiber %r', event, fiber, exc_info=True)
             raise

green380/socket.py

         if self._io_refs <= 0:
             self._real_close()
 
-    def recv_term(self, term):
+    def recv_until(self, term):
         buf = b''
         while not buf.endswith(term):
             yield 'read', self
             if not buf1:
                 break
             index = (buf + buf1).find(term)
-            buf2 = yield from self.recv(len(buf1) if index == -1 else index - len(buf) + len(term))
+            buf2 = super().recv(len(buf1) if index == -1 else index - len(buf) + len(term))
             buf += buf2
         return buf
+    recv_term = recv_until
 
     def recv_exactly(self, count):
         buf = b''
         yield 'read', self
         return super().recv(count)
 
+    def recv_into(self, buf):
+        yield 'read', self
+        return super().recv_into(buf)
+
     def sendall(self, buf):
         while buf:
             sent = yield from self.send(buf)
             raise IOError("cannot read from timed out object")
         while True:
             try:
-                return self._sock.recv_into(b)
+                return (yield from self._sock.recv_into(b))
             except timeout:
                 self._timeout_occurred = True
                 raise
         self._checkClosed()
         self._checkWritable()
         try:
-            return self._sock.send(b)
+            return (yield from self._sock.send(b))
         except error as e:
             # XXX what about EINTR?
             if e.args[0] in _blocking_errnos:
                 sock.settimeout(timeout)
             if source_address:
                 sock.bind(source_address)
-            sock.connect(sa)
+            yield from sock.connect(sa)
             return sock
 
         except error as _:

green380/urllib/request.py

         handlers = chain.get(kind, ())
         for handler in handlers:
             func = getattr(handler, meth_name)
-            result = func(*args)
+            result = yield from func(*args)
             if result is not None:
                 return result
 
             meth = getattr(processor, meth_name)
             req = meth(req)
 
-        response = self._open(req, data)
+        response = yield from self._open(req, data)
 
         # post-process response
         meth_name = protocol+"_response"
         return response
 
     def _open(self, req, data=None):
-        result = self._call_chain(self.handle_open, 'default',
+        result = yield from self._call_chain(self.handle_open, 'default',
                                   'default_open', req)
         if result:
             return result
 
         protocol = req.type
-        result = self._call_chain(self.handle_open, protocol, protocol +
+        result = yield from self._call_chain(self.handle_open, protocol, protocol +
                                   '_open', req)
         if result:
             return result
             h.close()
             raise URLError(err)
         else:
-            r = h.getresponse()
+            r = yield from h.getresponse()
 
         r.url = req.get_full_url()
         # This line replaces the .msg attribute of the HTTPResponse
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.