ginsfsm / ginsfsm / protocols / http / server / c_http_clisrv.py

The default branch has multiple heads

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
# -*- encoding: utf-8 -*-
"""
GObj :class:`GHttpCliSrv`
=========================

Http Channel.

.. autoclass:: GHttpCliSrv
    :members:

"""
from collections import deque
import logging
import traceback

from ginsfsm.c_timer import GTimer
from ginsfsm.gobj import GObj
from ginsfsm.protocols.http.common.parser import HTTPRequestParser
from ginsfsm.protocols.http.common.utilities import InternalServerError
from ginsfsm.protocols.http.common.response import (
    HttpResponse,
    HttpErrorResponse,
)


class ResponseInterrupt(Exception):
    """ Signal that a response will not complete synchronously.

    Raised while a response is executing when it is asynchronous, or
    infinite, and must stay open until the top level decides to finish it.
    """


def ac_disconnected(self, event):
    """ The partner gsock has been closed.

    Forget and destroy the gsock (when the channel still holds one), then
    broadcast ``EV_HTTP_CHANNEL_CLOSED`` carrying this channel.
    """
    closed_sock = self.gsock
    if closed_sock:
        # Drop our reference first, then destroy the gobj.
        self.gsock = None
        self.destroy_gobj(closed_sock)
    self.broadcast_event('EV_HTTP_CHANNEL_CLOSED', channel=self)


def ac_rx_data(self, event):
    """ Feed data received from the partner clisrv gsock to the parser.

    ``event.data`` can hold a fragment of a request, one request or several
    requests. The data is handed to the request parser piece by piece:

    * a request the parser reports as completed and not empty is queued
      with ``self.enqueue_request()``;
    * a request that is still incomplete is kept in
      ``self.parsing_request`` so the next ``EV_RX_DATA`` continues it;
    * when the parser flags ``expect_continue`` once the headers are
      finished, an interim ``100 Continue`` is sent to the gsock (only
      once per channel, latched by ``self.sent_continue``).

    If any request is queued at the end, ``EV_DEQUEUE_REQUEST`` is sent to
    this gobj itself.
    """
    self.start_inactivity_timer()
    gsock = self.gsock  # must be same as event.source[-1]
    data = event.data
    if not data:
        return
    new_request = self.parsing_request
    while data:
        if new_request is None:
            new_request = HTTPRequestParser(self)
        # n is the number of bytes of `data` consumed by the parser.
        n = new_request.received(data)
        if new_request.expect_continue and new_request.headers_finished:
            # guaranteed by parser to be a 1.1 new_request
            new_request.expect_continue = False

            if not self.sent_continue:
                # send_event() takes an event name as second argument;
                # the interim response travels as the event's data.
                self.send_event(
                    gsock,
                    'EV_SEND_DATA',
                    data=b'HTTP/1.1 100 Continue\r\n\r\n'
                )
                self.sent_continue = True
                # The body is still to come: keep parsing this request.
                new_request.completed = False

        if new_request.completed:
            # The new_request (with the body) is ready to use.
            self.parsing_request = None
            if not new_request.empty:
                self.enqueue_request(new_request)
            new_request = None
        else:
            self.parsing_request = new_request
        if n >= len(data):
            break
        # Remaining bytes belong to the next request.
        data = data[n:]

    if self.dl_requests:
        self.send_event(self, 'EV_DEQUEUE_REQUEST')


def ac_dequeue_request(self, event):
    """ Take the next queued request and start serving it.

    The oldest request of ``self.dl_requests`` becomes
    ``self.responding_request`` and ``EV_HTTP_REQUEST`` is sent to this gobj
    itself. With an empty queue the inactivity timer is (re)started instead.

    A request still marked as responding at this point is an internal
    inconsistency: it is logged, and processing goes on as before.
    """
    if self.responding_request:
        # error(), not exception(): no exception is being handled here, so
        # exception() would only append a meaningless empty traceback.
        self.logger.error('Internal ERROR!!!: '
                          'responding_request MUST be None')
    if self.dl_requests:
        self.responding_request = self.dl_requests.popleft()
        self.send_event(self, 'EV_HTTP_REQUEST')
    else:
        self.start_inactivity_timer()


def ac_http_request(self, event):
    """ Internal event: serve the request held in self.responding_request.

    A request carrying a parse error is answered right away with an
    HttpErrorResponse. Any other request is broadcast to the top level as
    ``EV_HTTP_REQUEST`` while the channel moves to ``ST_WAIT_RESPONSE``
    with the responseless timer running.
    """
    self.stop_inactivity_timer()

    pending = self.responding_request
    if not pending.error:
        self.start_responseless_timer()
        self.set_new_state('ST_WAIT_RESPONSE')
        # TODO: in stratus environment, we need to inform of who srvcli is.
        self.broadcast_event(
            'EV_HTTP_REQUEST',
            request=self.responding_request,
            channel=self,
        )
        return

    # Malformed request: reply with the error, top level is not involved.
    error_response = HttpErrorResponse(pending)
    error_response.execute()
    self.finish(error_response)


def ac_http_response(self, event):
    """ The top level answered the current request: run the response.

    ``event.response`` becomes ``self.responding_response``, is started and
    executed. Then:

    * if ``execute()`` returns, the response is finished through
      ``self.finish()``;
    * if it raises :class:`ResponseInterrupt`, the response is asynchronous
      or infinite: only the responseless timer is stopped and the current
      request/response stay in place;
    * if it raises any other ``Exception``, the failure is logged and,
      when no header has been written yet, the response is replaced by an
      ``InternalServerError`` error response; otherwise the original
      response is flagged ``close_on_finish``. Either way it is finished
      through ``self.finish()``.

    Inconsistent input (wrong type, response for another request, a
    response already in progress) is only logged.
    """
    response = event.response
    if not isinstance(response, HttpResponse):
        logging.error("ERROR response doesn't mach HttpResponse %s",
                      response.request.path)

    if response.request != self.responding_request:
        logging.error("ERROR response doesn't mach responding request %s",
                      response.request.path)

    if self.responding_response:
        logging.error("ERROR responding_response is BUSY, of %s",
                      response.request.path)
    self.responding_response = response

    response.start()
    try:
        response.execute()
    except ResponseInterrupt:
        # Response is asynchronous or infinite.
        # Don't clear the current responding_request.
        self.stop_responseless_timer()  # TODO: do some ping-alive
        return

    except Exception:
        # Exception, not a bare except: KeyboardInterrupt and SystemExit
        # must propagate instead of being turned into an error response.
        logging.exception('Exception when serving %s', response.request.path)
        if not response.wrote_header:
            if self.parent.expose_tracebacks:
                body = traceback.format_exc()
            else:
                body = ('The server encountered an unexpected '
                        'internal server error')

            # Nothing reached the client yet: answer with an error instead.
            request = HTTPRequestParser(self)
            request.error = InternalServerError(body)
            response = HttpErrorResponse(request)
            response.execute()
        else:
            # Headers already sent: mark the response to close on finish.
            response.close_on_finish = True

    self.finish(response)


def ac_transmit_ready(self, event):
    """ The socket is ready to transmit more data; nothing to do here.
    """
    return None


def ac_inactivity_timeout(self, event):
    """ Inactivity timer expired: ask the gsock to drop the connection.

    Does nothing when the channel no longer holds a gsock.
    """
    sock = self.gsock
    if not sock:
        return
    self.send_event(sock, 'EV_DROP')


def ac_responseless_timeout(self, event):
    """ The top level did not answer in time: reply with an error.

    An HttpErrorResponse carrying an InternalServerError is executed and
    finished on this channel. Nothing happens when the gsock is gone.
    """
    if self.gsock:
        message = ('Response Timeout. The server is busy. '
                   'Please re-try your request in a few moments.')
        timeout_request = HTTPRequestParser(self)
        timeout_request.error = InternalServerError(message)
        timeout_response = HttpErrorResponse(timeout_request)
        timeout_response.execute()
        self.finish(timeout_response)


# Finite state machine description of GHttpCliSrv; it is handed to
# GObj.__init__() by GHttpCliSrv.__init__().
GHTTPCLISRV_FSM = {
    # Every event the machine knows. The text after the colon tags the event
    # as top/bottom and input/output (the exact meaning is defined by
    # ginsfsm -- presumably "bottom" is the gsock/timer side and "top" the
    # subscriber side; TODO confirm).
    'event_list': (
        'EV_SET_TIMER: bottom output',
        'EV_RESPONSELESS_TIMEOUT: bottom input',
        'EV_INACTIVITY_TIMEOUT: bottom input',
        'EV_DISCONNECTED: bottom input',
        'EV_RX_DATA: bottom input',
        # Untagged: this gobj sends/posts EV_DEQUEUE_REQUEST to itself.
        'EV_DEQUEUE_REQUEST',
        'EV_TRANSMIT_READY: bottom input',
        'EV_SEND_DATA: bottom output',
        'EV_FLUSH_OUTPUT_DATA: bottom output',
        'EV_WRITE_OUTPUT_DATA: bottom output',
        'EV_HTTP_CHANNEL_OPENED: top output',
        'EV_HTTP_CHANNEL_CLOSED: top output',
        # EV_HTTP_REQUEST is also sent by this gobj to itself
        # (ac_dequeue_request) before being broadcast (ac_http_request).
        'EV_HTTP_REQUEST: top output',
        'EV_HTTP_RESPONSE: top input',
    ),
    'state_list': (
        'ST_IDLE',
        'ST_WAIT_RESPONSE',
    ),
    # Per state, rows of (event name, action function, next state).
    # A next state of None presumably means "stay in the current state",
    # and an action of None "accept the event but do nothing" -- TODO
    # confirm against ginsfsm.
    'machine': {
        # No request is being served by the top level.
        'ST_IDLE':
        (
            ('EV_DISCONNECTED',         ac_disconnected,            None),
            ('EV_INACTIVITY_TIMEOUT',   ac_inactivity_timeout,      None),
            ('EV_RX_DATA',              ac_rx_data,                 None),
            ('EV_DEQUEUE_REQUEST',      ac_dequeue_request,         None),
            ('EV_HTTP_REQUEST',         ac_http_request,            None),
        ),
        # A request was broadcast and its response is awaited
        # (entered from ac_http_request via set_new_state()).
        # NOTE(review): ac_rx_data restarts the inactivity timer and is
        # reachable from this state, yet EV_INACTIVITY_TIMEOUT has no row
        # here -- confirm how ginsfsm treats an event without a row.
        'ST_WAIT_RESPONSE':
        (
            ('EV_DISCONNECTED',         ac_disconnected,            None),
            ('EV_RESPONSELESS_TIMEOUT', ac_responseless_timeout,    'ST_IDLE'),
            ('EV_RX_DATA',              ac_rx_data,                 None),
            ('EV_DEQUEUE_REQUEST',      None,                       None),
            ('EV_HTTP_RESPONSE',        ac_http_response,           'ST_IDLE'),

            ('EV_TRANSMIT_READY',       ac_transmit_ready,          None),
        ),
    }
}

# Configuration parameters of GHttpCliSrv, handed to GObj.__init__().
# Each entry is a five-item list; judging by the values below the slots look
# like [type, default value, flag, <None here>, description] -- TODO confirm
# the exact meaning against ginsfsm.
GHTTPCLISRV_GCONFIG = {
    'subscriber': [
        None, None, 0, None,
        "subcriber of all output-events.",
    ],
    'gsock': [
        None, None, 0, None,
        "partner gsock.",
    ],
    'maximum_simultaneous_requests': [
        int, 0, 0, None,
        "maximum simultaneous requests.",
    ],
    'url_scheme': [
        str, 'http', 0, None,
        "default ``http`` value",
    ],
    # Both timeouts default to five hours.
    'inactivity_timeout': [
        int, 5 * 3600, 0, None,
        "Inactivity timeout in seconds.",
    ],
    'responseless_timeout': [
        int, 5 * 3600, 0, None,
        "'Without response' timeout in seconds.",
    ],
    # Size, in bytes, of pending input above which a tempfile should be
    # created. The 512K default is conservative.
    'inbuf_overflow': [int, 512 * 1024, 0, None, ""],
    # Upper bound, in bytes, for all request headers combined (256K).
    'max_request_header_size': [int, 256 * 1024, 0, None, ""],
    # Upper bound, in bytes, for a request body (1GB).
    'max_request_body_size': [int, 1024 ** 3, 0, None, ""],
}


class GHttpCliSrv(GObj):
    """  Http clisrv (client of server) class.

    This gobj is created by GHttpServer when it receives an EV_CONNECTED
    event from a new gsock gobj.

    This class will subscribe all the events of the partner
    :class:`ginsfsm.c_sock.GSock` gobj, to implement the http protocol.

    .. ginsfsm::
       :fsm: GHTTPCLISRV_FSM
       :gconfig: GHTTPCLISRV_GCONFIG

    *Top Output-Events:*
        * :attr:`'EV_HTTP_CHANNEL_OPENED'`: new http client.

          Event attributes:

            * ``channel``: http channel.

        * :attr:`'EV_HTTP_CHANNEL_CLOSED'`: http client closed.

          Event attributes:

            * ``channel``: http channel.

        * :attr:`'EV_HTTP_REQUEST'`: new http request.

          Event attributes:

            * ``channel``: http channel.
            * ``request``: http request.

    *Top Input-Events:*
        * :attr:`'EV_HTTP_RESPONSE'`: response to the current request.

          Event attributes:

            * ``response``: http response.

    *Bottom Input-Events:*
        * :attr:`'EV_DISCONNECTED'`: socket disconnected.

          The clisrv `gobj` will be destroyed.

        * :attr:`'EV_TRANSMIT_READY'`: socket ready to transmit more data.

        * :attr:`'EV_RX_DATA'`: data received. Process http protocol.

    *Bottom Output-Events:*

        * :attr:`'EV_SEND_DATA'`: transmit data socket.
        * :attr:`'EV_WRITE_OUTPUT_DATA'`: write data to socket output buffer.
        * :attr:`'EV_FLUSH_OUTPUT_DATA'`: flush data of socket output buffer.

    """

    def __init__(self):
        """ Set up the machine and the empty per-channel state. """
        GObj.__init__(self, GHTTPCLISRV_FSM, GHTTPCLISRV_GCONFIG)
        # Receiving side.
        self.parsing_request = None  # request parser still being fed
        self.dl_requests = deque()  # parsed requests waiting their turn
        self.sent_continue = False  # latch: '100 Continue' already sent
        # Responding side.
        self.responding_request = None  # request waiting a top response
        self.responding_response = None  # response currently in progress

    def start_up(self):
        """ Wire the channel to its gsock and subscriber, create the timers.

        The ``gsock`` parameter MUST have been given when creating this
        gobj. Without an explicit ``subscriber`` the parent is used.
        """
        if self.subscriber is None:
            self.subscriber = self.parent

        # Canalize the flow of messages:
        # bottom events come to me, my output events go to the subscriber.
        self.gsock.subscribe_event(None, self)
        self.subscribe_event(None, self.subscriber)
        self.broadcast_event('EV_HTTP_CHANNEL_OPENED', channel=self)

        # The inactivity timer starts running right away ...
        self.inactivity_timer = self.create_gobj(
            None, GTimer, self,
            timeout_event_name='EV_INACTIVITY_TIMEOUT'
        )
        self.start_inactivity_timer()

        # ... the responseless one is only created here.
        self.responseless_timer = self.create_gobj(
            None, GTimer, self,
            timeout_event_name='EV_RESPONSELESS_TIMEOUT'
        )

    def enqueue_request(self, new_request):
        """ Queue a parsed request, enforcing the simultaneous-request limit.

        With ``maximum_simultaneous_requests`` greater than 0 and a queue
        grown beyond it, an error response is served and the whole queue
        (the new request included) is discarded.
        """
        self.dl_requests.append(new_request)

        limit = self.maximum_simultaneous_requests
        if limit <= 0 or len(self.dl_requests) <= limit:
            return

        # Close the channel by maximum simultaneous requests reached.
        body = 'Please change your behavior.' \
            ' You have reached the maximum simultaneous requests (%d).' % (
                self.maximum_simultaneous_requests)
        rejected = HTTPRequestParser(self)
        rejected.error = InternalServerError(body)
        error_response = HttpErrorResponse(rejected)
        # NOTE(review): the other error paths of this file use execute()
        # followed by channel.finish(); here service() is called and the
        # gsock is not dropped -- confirm this is intended.
        error_response.service()
        self.clear_request_queue()

    def clear_request_queue(self):
        """ Close every queued request and empty the queue. """
        for queued in self.dl_requests:
            queued._close()
        self.dl_requests.clear()

    def start_inactivity_timer(self):
        """ (Re)arm the inactivity timer with ``inactivity_timeout``. """
        timer = self.inactivity_timer
        self.send_event(timer, 'EV_SET_TIMER', seconds=self.inactivity_timeout)

    def stop_inactivity_timer(self):
        """ Send ``seconds=-1`` to the inactivity timer to stop it. """
        timer = self.inactivity_timer
        self.send_event(timer, 'EV_SET_TIMER', seconds=-1)

    def start_responseless_timer(self):
        """ (Re)arm the responseless timer with ``responseless_timeout``. """
        timer = self.responseless_timer
        self.send_event(
            timer, 'EV_SET_TIMER', seconds=self.responseless_timeout)

    def stop_responseless_timer(self):
        """ Send ``seconds=-1`` to the responseless timer to stop it. """
        timer = self.responseless_timer
        self.send_event(timer, 'EV_SET_TIMER', seconds=-1)

    def write(self, data):
        """ Write data through the response in progress.

            Gives the high level asynchronous access to the output.
            Only logs an error when no response is in progress.
        """
        current = self.responding_response
        if current:
            current.write(data)
        else:
            logging.error("ERROR channel.write() with no responding_response")

    def flush(self, callback=None):
        """ Flush the output of the response in progress.

            Gives the high level asynchronous access to the output.
            ``callback`` is passed on to the response's ``flush()``.
            Only logs an error when no response is in progress.
        """
        current = self.responding_response
        if current:
            current.flush(callback)
        else:
            logging.error("ERROR channel.flush() with no responding_response")

    def finish(self, response=None):
        """ Finish a response and end the HTTP request.

            Gives the high level asynchronous access to the output.
            Without ``response`` the response in progress is finished; if
            there is none, an error is logged and nothing else happens.

            Afterwards either the next queued request is pulled or, when
            the response asks to close on finish, the queue is discarded
            and the gsock drop is scheduled.
        """
        if response is None:
            response = self.responding_response
            if not response:
                logging.error("ERROR channel.finish()"
                              " with no responding_response")
                return

        response.finish()
        self.stop_responseless_timer()
        self.responding_response = None
        self.responding_request = None

        if not response.close_on_finish:
            # pull the request queue
            self.post_event(self, 'EV_DEQUEUE_REQUEST')
            return

        # ignore all enqueued requests.
        self.clear_request_queue()
        if self.gsock:
            self.gaplic.add_callback(self.gsock.mt_drop)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.