Source

protorpc / python / protorpc / webapp_test_util.py

Full commit
#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Testing utilities for the webapp libraries.

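  GetDefaultEnvironment: Function for easily setting up CGI environment.
  RequestHandlerTestBase: Base class for setting up handler tests.
  WebServerTestBase: Base class for tests that run against a WSGI server
    started in a background thread.
  EndToEndTestBase: Base class for end to end tests against TestService.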
"""

__author__ = 'rafek@google.com (Rafe Kaplan)'

import cStringIO
import threading
import time
import unittest
import urllib2
from wsgiref import simple_server
from wsgiref import validate

from . import protojson
from . import remote
from . import test_util
from . import transport
from .webapp import service_handlers

from google.appengine.ext import webapp


class TestService(remote.Service):
  """Minimal service used to do end to end tests with.

  NOTE: a second class named TestService is defined further down in this
  module; once the module has finished loading, the module-level name refers
  to that later class rather than this one.
  """

  @remote.method(test_util.OptionalMessage,
                 test_util.OptionalMessage)
  def optional_message(self, request):
    """Echo the request, prefixing a non-empty string_value with '+'.

    Args:
      request: OptionalMessage sent by the client.

    Returns:
      The same request instance, modified in place when string_value is set.
    """
    value = request.string_value
    if value:
      request.string_value = '+%s' % value
    return request


def GetDefaultEnvironment():
  """Function for creating a default CGI environment.

  A new dictionary, with new 'wsgi.input' and 'wsgi.errors' buffers, is built
  on every call, so callers are free to modify the result.

  Returns:
    dict describing a GET request for '/tmp/myhandler' with the query string
    'foo=bar&foo=baz&foo2=123', made by a logged in admin user.
  """
  return {
    'LC_NUMERIC': 'C',
    'wsgi.multiprocess': True,
    'SERVER_PROTOCOL': 'HTTP/1.0',
    'SERVER_SOFTWARE': 'Dev AppServer 0.1',
    'SCRIPT_NAME': '',
    'LOGNAME': 'nickjohnson',
    'USER': 'nickjohnson',
    'QUERY_STRING': 'foo=bar&foo=baz&foo2=123',
    'PATH': '/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/bin/X11',
    'LANG': 'en_US',
    'LANGUAGE': 'en',
    'REMOTE_ADDR': '127.0.0.1',
    'LC_MONETARY': 'C',
    'CONTENT_TYPE': 'application/x-www-form-urlencoded',
    'wsgi.url_scheme': 'http',
    'SERVER_PORT': '8080',
    'HOME': '/home/mruser',
    'USERNAME': 'mruser',
    'CONTENT_LENGTH': '',
    'USER_IS_ADMIN': '1',
    'PYTHONPATH': '/tmp/setup',
    'LC_TIME': 'C',
    'HTTP_USER_AGENT': 'Mozilla/5.0 (X11; U; Linux i686 (x86_64); en-US; '
        'rv:1.8.1.6) Gecko/20070725 Firefox/2.0.0.6',
    'wsgi.multithread': False,
    'wsgi.version': (1, 0),
    'USER_EMAIL': 'test@example.com',
    # This entry used to repeat the 'USER_EMAIL' key, which silently replaced
    # the address above with '112'.  '112' is presumably the user id that
    # accompanies the e-mail address -- TODO confirm.
    'USER_ID': '112',
    'wsgi.input': cStringIO.StringIO(),
    'PATH_TRANSLATED': '/tmp/request.py',
    'SERVER_NAME': 'localhost',
    'GATEWAY_INTERFACE': 'CGI/1.1',
    'wsgi.run_once': True,
    'LC_COLLATE': 'C',
    'HOSTNAME': 'myhost',
    'wsgi.errors': cStringIO.StringIO(),
    'PWD': '/tmp',
    'REQUEST_METHOD': 'GET',
    'MAIL': '/dev/null',
    'MAILCHECK': '0',
    'USER_NICKNAME': 'test',
    'HTTP_COOKIE': 'dev_appserver_login="test:test@example.com:True"',
    'PATH_INFO': '/tmp/myhandler'
  }


class RequestHandlerTestBase(test_util.TestCase):
  """Base class for writing RequestHandler tests.

  Sub-classes customize the test by overriding two hooks:
    CreateRequestHandler: supplies the handler instance under test.
    GetEnvironment: supplies the CGI environment the request is built from.
  """

  def setUp(self):
    """Build a fresh request, response and handler for each test."""
    self.ResetHandler()

  def GetEnvironment(self):
    """Get the CGI environment used to build the request.

    Override for more specific configurations.

    Returns:
      dict of CGI environment.
    """
    return GetDefaultEnvironment()

  def CreateRequestHandler(self):
    """Create the RequestHandler instance under test.

    Override to create more specific kinds of RequestHandler instances.

    Returns:
      RequestHandler instance used in test.
    """
    return webapp.RequestHandler()

  def CheckResponse(self,
                    expected_status,
                    expected_headers,
                    expected_content):
    """Check that the web response is as expected.

    The handler's response is written through a start_response style callable
    that performs the assertions.

    Args:
      expected_status: Expected status message.
      expected_headers: Dictionary of expected headers.  Response header names
        are lower-cased before lookup, so keys must be lower case.  Headers
        that are not listed here are ignored.
      expected_content: Expected body.
    """
    def verify_body(body):
      self.assertEquals(expected_content, body)

    def verify_start(status, headers):
      self.assertEquals(expected_status, status)

      seen = set()
      for raw_name, value in headers:
        key = raw_name.lower()
        if key not in expected_headers:
          # Unexpected headers are not an error.
          continue
        seen.add(key)
        self.assertEquals(expected_headers[key], value)

      absent = set(expected_headers) - seen
      if absent:
        self.fail('Expected keys %r not found' % (list(absent),))

      return verify_body

    self.handler.response.wsgi_write(verify_start)

  def ResetHandler(self, change_environ=None):
    """Reset this tests environment with environment changes.

    Rebuilds self.request, self.response and self.handler from the environment
    returned by GetEnvironment, with any requested overrides applied.

    Args:
      change_environ: Dictionary of values that are added to default
        environment.
    """
    environment = self.GetEnvironment()
    if change_environ:
      environment.update(change_environ)

    # The request and response are stored before the handler is created so
    # that an overridden CreateRequestHandler can see them.
    self.request = webapp.Request(environment)
    self.response = webapp.Response()
    handler = self.CreateRequestHandler()
    self.handler = handler
    handler.initialize(self.request, self.response)


class SyncedWSGIServer(simple_server.WSGIServer):
  """WSGI server class for tests.

  Currently adds nothing to simple_server.WSGIServer.
  """


class ServerThread(threading.Thread):
  """Thread responsible for managing wsgi server.

  The thread serves one request at a time by calling the server's
  handle_request method in a loop until shutdown is called.  So that the loop
  notices a shutdown even when no further request arrives, the server is given
  a short wait timeout (POLL_INTERVAL): handle_request then returns
  periodically and the serving flag is checked again.

  Server classes that ignore the timeout attribute (the SocketServer classes
  of Python 2.5 or less) only notice the shutdown after their next request.
  """

  # Seconds handle_request may wait for a connection before returning so that
  # run can check whether shutdown was requested.
  POLL_INTERVAL = 0.05

  def __init__(self, server, *args, **kwargs):
    """Constructor.

    Args:
      server: The WSGI server that is served by this thread.
      As per threading.Thread base class.

    State:
      __serving: Server is still expected to be serving.  When False server
        knows to shut itself down.
    """
    super(ServerThread, self).__init__(*args, **kwargs)

    self.server = server
    # Keep the listening socket itself in blocking mode; how long
    # handle_request waits for a connection is governed by server.timeout.
    self.server.socket.settimeout(None)
    # Without a timeout handle_request blocks until a request arrives and
    # shutdown would never be noticed.  A timeout that was already configured
    # on the server is left alone.
    if getattr(self.server, 'timeout', None) is None:
      self.server.timeout = self.POLL_INTERVAL
    self.__serving = True

  def shutdown(self):
    """Notify server that it must shutdown gracefully.

    Does not wait for the thread to finish; use join for that.
    """
    self.__serving = False

  def run(self):
    """Handle incoming requests until shutdown, then close the server."""
    try:
      while self.__serving:
        self.server.handle_request()
    finally:
      # Release the listening socket right away rather than leaving it to
      # garbage collection, and drop the reference as before.
      server, self.server = self.server, None
      server.server_close()


class TestService(remote.Service):
  """Service used to do end to end tests with.

  Offers methods that echo their request, report the constructor argument,
  raise various kinds of error, or return a default-constructed message.
  """

  def __init__(self, message='uninitialized'):
    """Constructor.

    Args:
      message: String reported back by init_parameter.
    """
    self.__message = message

  @remote.method(test_util.OptionalMessage, test_util.OptionalMessage)
  def optional_message(self, request):
    """Echo the request, prefixing a non-empty string_value with '+'."""
    value = request.string_value
    if value:
      request.string_value = '+%s' % value
    return request

  @remote.method(response_type=test_util.OptionalMessage)
  def init_parameter(self, request):
    """Return the message this service instance was constructed with."""
    response = test_util.OptionalMessage(string_value=self.__message)
    return response

  @remote.method(test_util.NestedMessage, test_util.NestedMessage)
  def nested_message(self, request):
    """Echo the request with '+' always prefixed to its string_value."""
    prefixed = '+%s' % request.string_value
    request.string_value = prefixed
    return request

  @remote.method()
  def raise_application_error(self, request):
    """Always raise an ApplicationError named 'ERROR_NAME'."""
    raise remote.ApplicationError('This is an application error', 'ERROR_NAME')

  @remote.method()
  def raise_unexpected_error(self, request):
    """Always raise an exception that is not an RPC error (TypeError)."""
    raise TypeError('Unexpected error')

  @remote.method()
  def raise_rpc_error(self, request):
    """Always raise a remote.NetworkError."""
    raise remote.NetworkError('Uncaught network error')

  @remote.method(response_type=test_util.NestedMessage)
  def return_bad_message(self, request):
    """Return a NestedMessage with no fields set.

    NOTE(review): presumably "bad" because NestedMessage has a required field
    that is left unset -- confirm against test_util.
    """
    empty = test_util.NestedMessage()
    return empty


class AlternateService(remote.Service):
  """Service used for requesting non-existent methods.

  It declares a method, does_not_exist, that TestService does not define.
  """

  @remote.method()
  def does_not_exist(self, request):
    """Always raise NotImplementedError."""
    raise NotImplementedError('Not implemented')


class WebServerTestBase(test_util.TestCase):
  """Base class for tests that talk to a WSGI server run by a ServerThread.

  Sub-classes are expected to provide a CreateWsgiApplication method, which is
  used whenever no application is passed in explicitly.
  """

  SERVICE_PATH = '/my/service'

  def setUp(self):
    """Start the server and create transports for good and bad paths."""
    self.server = None
    self.schema = 'http'
    self.ResetServer()

    # A URL that no service is mapped to.
    bad_url = self.service_url + '_x'
    self.bad_path_connection = self.CreateTransport(bad_url)
    self.bad_path_stub = TestService.Stub(self.bad_path_connection)

  def tearDown(self):
    """Ask the server thread to stop serving."""
    self.server.shutdown()

  def ResetServer(self, application=None):
    """Reset web server.

    Shuts down existing server if necessary and starts a new one on a newly
    picked port, then rebuilds self.connection for the new service URL.

    Args:
      application: Optional WSGI function.  If none provided will use
        tests CreateWsgiApplication method.
    """
    previous = self.server
    if previous:
      previous.shutdown()

    self.port = test_util.pick_unused_port()
    started = self.StartWebServer(self.port, application)
    self.server, self.application = started

    self.connection = self.CreateTransport(self.service_url)

  def CreateTransport(self, service_url, protocol=protojson):
    """Create a new transportation object.

    Args:
      service_url: URL of the service the transport sends requests to.
      protocol: Protocol handed to the transport; defaults to protojson.

    Returns:
      New transport.HttpTransport instance.
    """
    return transport.HttpTransport(service_url, protocol=protocol)

  def StartWebServer(self, port, application=None):
    """Start web server.

    Args:
      port: Port to start application on.
      application: Optional WSGI function.  If none provided will use
        tests CreateWsgiApplication method.

    Returns:
      A tuple (server, application):
        server: An instance of ServerThread.
        application: Application that web server responds with.
    """
    application = application or self.CreateWsgiApplication()

    # The server runs the application wrapped in wsgiref's validator; the
    # unwrapped application is what gets returned to the caller.
    httpd = simple_server.make_server(
        'localhost', port, validate.validator(application))
    thread = ServerThread(httpd)
    thread.start()
    return thread, application

  def make_service_url(self, path):
    """Make service URL using current schema and port.

    Args:
      path: Path component of the URL, including its leading '/'.

    Returns:
      URL string pointing at path on the local test server.
    """
    url_parts = (self.schema, self.port, path)
    return '%s://localhost:%d%s' % url_parts

  @property
  def service_url(self):
    """URL of the service mapped at SERVICE_PATH."""
    return self.make_service_url(self.SERVICE_PATH)


class EndToEndTestBase(WebServerTestBase):

  # Sub-classes may override to create alternate configurations.
  DEFAULT_MAPPING = service_handlers.service_mapping(
    [('/my/service', TestService),
     ('/my/other_service', TestService.new_factory('initialized')),
    ])

  def setUp(self):
    super(EndToEndTestBase, self).setUp()

    self.stub = TestService.Stub(self.connection)

    self.other_connection = self.CreateTransport(self.other_service_url)
    self.other_stub = TestService.Stub(self.other_connection)

    self.mismatched_stub = AlternateService.Stub(self.connection)

  @property
  def other_service_url(self):
    return 'http://localhost:%d/my/other_service' % self.port

  def CreateWsgiApplication(self):
    """Create WSGI application used on the server side for testing."""
    return webapp.WSGIApplication(self.DEFAULT_MAPPING, True)

  def DoRawRequest(self,
                   method,
                   content='',
                   content_type='application/json',
                   headers=None):
    headers = headers or {}
    headers.update({'content-length': len(content or ''),
                    'content-type': content_type,
                   })
    request = urllib2.Request('%s.%s' % (self.service_url, method),
                              content,
                              headers)
    return urllib2.urlopen(request)

  def RawRequestError(self,
                      method,
                      content=None,
                      content_type='application/json',
                      headers=None):
    try:
      self.DoRawRequest(method, content, content_type, headers)
      self.fail('Expected HTTP error')
    except urllib2.HTTPError, err:
      return err.code, err.read(), err.headers