Source

HORNET / tests / test_application.py

Full commit
# -*- coding: utf-8 -*-
#
# Copyright 2009 Vanderbilt University
# 
# Licensed under the Apache License, Version 2.0 (the "License"); 
# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at 
# 
#     http://www.apache.org/licenses/LICENSE-2.0 
# 
# Unless required by applicable law or agreed to in writing, software 
# distributed under the License is distributed on an "AS IS" BASIS, 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and 
# limitations under the License. 

from datetime import date, timedelta
from helpers import FakePlugin
from hornet.application import *
from hornet.event import ProcessingStart, EventNotAccepted
from nose import with_setup
from nose.tools import assert_raises

def setup_app():
    global plugins, config

    plugins = {}
    plugins['one'] = FakePlugin()
    plugins['two'] = FakePlugin()
    plugins['three'] = FakePlugin()
    
    config = Config()
    config.plugins = plugins
    config.start = date(2006,  1, 1)
    config.end = date(2006, 1, 28)
    config.period_size = timedelta(7)
    
def teardown_app():
    global plugins, config
    del plugins, config
    
def test_config():
    c = Config()
    assert c.plugins == {}
    assert hasattr(c, 'output_dir')
    assert hasattr(c, 'start')
    assert hasattr(c, 'end')
    assert hasattr(c, 'period_size')

@with_setup(setup_app, teardown_app)
def test_bind_plugins():
    parent = object()
    result = bind_plugins(parent, plugins)
    
    assert len(result) == 3
    assert plugins['one'] in result
    assert plugins['one'].id == 'one'
    assert plugins['one'].parent == parent
    
    assert plugins['two'] in result
    assert plugins['two'].id == 'two'
    assert plugins['two'].parent == parent
    
    assert plugins['three'] in result
    assert plugins['three'].id == 'three'
    assert plugins['three'].parent == parent
    

@with_setup(setup_app, teardown_app)
def test_setup_plugins():
    plugins['two'].listen_to = [plugins['one']]
    plugins['three'].listen_to = [plugins['one']]
    
    setup_plugins(plugins.values())
    
    assert len(plugins['one']._listeners) == 2
    assert len(plugins['two']._listeners) == 0
    assert len(plugins['three']._listeners) == 0
    
    for plugin in plugins.values():
        assert plugin.setup_called
    
@with_setup(setup_app, teardown_app)
def test_root_plugins():
    plugins['two'].listen_to = [plugins['one']]
    plugins['three'].listen_to = [plugins['one']]
    
    setup_plugins(plugins.values())
    roots = root_plugins(plugins.values())
    
    assert len(roots) == 1
    assert id(roots[0]) == id(plugins['one'])

@with_setup(setup_app, teardown_app)
def test_root_plugins_none():
    roots = root_plugins([])
    assert len(roots) == 0

@with_setup(setup_app, teardown_app)
def test_root_plugins_multi_roots():
    plugins['three'].listen_to = [plugins['one'], plugins['two']]
    
    setup_plugins(plugins.values())
    roots = root_plugins(plugins.values())
    
    assert len(roots) == 2
    assert plugins['one'] in roots
    assert plugins['two'] in roots

@with_setup(setup_app, teardown_app)
def test_root_plugins_no_roots_listeners():
    setup_plugins(plugins.values())
    roots = root_plugins(plugins.values())
    
    assert len(roots) == 3
    assert plugins['one'] in roots
    assert plugins['two'] in roots
    assert plugins['three'] in roots

@with_setup(setup_app, teardown_app)
def test_plugin_runner_initialize():
    runner = PluginRunner(config)
    
    assert len(runner.plugins) == 3
    assert len(runner.periods) == 4
    assert runner.config == config
    
    # make sure bind_plugins was called
    assert plugins['one'].id == 'one'

@with_setup(setup_app, teardown_app)
def test_plugin_runner_get_plugin():
    runner = PluginRunner(config)
    assert runner.get_plugin('one') == plugins['one']
    assert runner.get_plugin('two') == plugins['two']
    assert runner.get_plugin('three') == plugins['three']
    
@with_setup(setup_app, teardown_app)
def test_plugin_runner_get_plugin_not_found():
    runner = PluginRunner(config)
    assert_raises(KeyError, runner.get_plugin, 'not_a_plugin')

@with_setup(setup_app, teardown_app)
def test_plugin_runner_stop():
    runner = PluginRunner(config)
    runner.stop()
    
    assert not hasattr(runner, 'plugins')
    for plugin in plugins.values():
        assert plugin.teardown_called
    
@with_setup(setup_app, teardown_app)
def test_plugin_runner():
    plugins['one']._accepted_events = ProcessingStart
    plugins['two'].listen_to = [plugins['one']]
    plugins['three'].listen_to = [plugins['one']]
    runner = PluginRunner(config)
    runner.run()
    
    assert all([p.setup_called for p in runner.plugins])
    assert plugins['one'].notify_called
    assert isinstance(plugins['one'].notify_event, ProcessingStart)
    
@with_setup(setup_app, teardown_app)
def test_plugin_runner_circle_roots():
    plugins['two'].listen_to = [plugins['one']]
    plugins['three'].listen_to = [plugins['one']]
    plugins['one'].listen_to = [plugins['two']]
    
    runner = PluginRunner(config)
    assert_raises(InvalidConfiguration, runner.run)
    
@with_setup(setup_app, teardown_app)
def test_plugin_runner_plugin_does_not_accept_event():
    plugins['two'].listen_to = [plugins['one']]
    plugins['three'].listen_to = [plugins['one']]
    
    runner = PluginRunner(config)
    assert_raises(EventNotAccepted, runner.run)