bi_etl.tests.etl_jobs.etl_task_1 module

Created on Apr 18, 2016

author: woodd

class bi_etl.tests.etl_jobs.etl_task_1.ETL_Task_1(task_id=None, parent_task_id=None, root_task_id=None, scheduler=None, task_rec=None, config=None)

Bases: bi_etl.tests.etl_jobs.etl_test_task_base.ETL_Test_Task_Base

CLASS_VERSION = 1.0

DEFAULT_NO_MAIL = False

add_child_task_by_partial_name_to_scheduler(partial_module_name, parameters=None, display_name=None)

Start a new task on the bi_etl.scheduler.scheduler.Scheduler that will be a child of this task.

add_child_task_to_scheduler(etl_task_class_type, parameters=None, display_name=None)

Start a new task on the bi_etl.scheduler.scheduler.Scheduler that will be a child of this task.

add_database(database_object)

add_parameter(param_name: str, param_value: object, local_only: bool = False, commit: bool = True)

add_parameters(local_only=False, commit=True, *args, **kwargs)

allow_concurrent_runs()

children

close()

Clean up the task: close any registered objects and any database connections.

config

Get the task configuration object. If it was not passed in, it will be read from the user's folder.

debug_sql(mode: int = True)

Control the logging output of the SQLAlchemy engine.

Parameters: mode -- Boolean (DEBUG if True, ERROR if False) or an int logging level.
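The mapping from mode to a logging level can be sketched as follows. This is a minimal illustration that assumes the level is applied to the standard "sqlalchemy.engine" logger; the helper name resolve_sql_log_level is hypothetical and the real debug_sql implementation may differ.

```python
import logging


def resolve_sql_log_level(mode=True):
    """Translate a debug_sql-style mode into a logging level (hypothetical helper)."""
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(mode, bool):
        return logging.DEBUG if mode else logging.ERROR
    return int(mode)


def debug_sql(mode=True):
    """Stand-in for the documented method: set the SQLAlchemy engine logger level."""
    level = resolve_sql_log_level(mode)
    logging.getLogger("sqlalchemy.engine").setLevel(level)
    return level
```

Checking for bool first matters here: without it, True would be treated as the integer level 1 rather than as a request for debug output.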

depends_on()

display_name

finish()

Placeholder for post-load cleanup. This might be useful for cleaning up what was done in init. It could also allow an inheriting class to begin waiting for children (see process_messages).

get_database(database_name, user=None, schema=None, **kwargs) -> bi_etl.database.database_metadata.DatabaseMetadata

Get a new database connection.

Parameters:
  • database_name (str) -- The name of the database section in config.ini
  • user (str) --
  • schema (str) --

get_parameter(param_name, default=None)

Returns the value of the parameter with the name provided. If the parameter does not exist, returns default, provided default is not None.

Parameters:
  • param_name (str) -- The parameter to retrieve
  • default (any) -- The default value to return. Defaults to None.

Raises: ParameterError -- If the named parameter does not exist and no default is provided.
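The lookup rules described for get_parameter can be sketched with a small stand-in class. ParameterHolder and the local ParameterError below are hypothetical illustrations, not the library's own classes; only the behavior (value, else non-None default, else error) is taken from the description above.

```python
class ParameterError(Exception):
    """Stand-in for the library's ParameterError (assumption for this sketch)."""


class ParameterHolder:
    """Hypothetical minimal container; not the real ETL task class."""

    def __init__(self):
        self._parameters = {}

    def set_parameter(self, param_name, param_value):
        self._parameters[param_name] = param_value

    def get_parameter(self, param_name, default=None):
        # 1. A stored value always wins.
        if param_name in self._parameters:
            return self._parameters[param_name]
        # 2. Otherwise fall back to the default, if one was supplied.
        if default is not None:
            return default
        # 3. No value and no default: report the missing parameter.
        raise ParameterError(f"Parameter {param_name!r} not found and no default given")
```

One consequence of this rule is that None cannot be used as an explicit default: passing default=None is indistinguishable from passing no default at all.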

get_setting(setting_name, assignments=None)

get_setting_or_default(setting_name, default)

init()

Pre-load initialization. Runs on the execution server. Override to add setup tasks.

Note: The init method is useful in cases where you wish to define a common base class with a single load method. Each inheriting class can then do its own work in init. With init, the flow of execution can be:

  1. spec_class.init (if any code before super call)
  2. base_class.init
  3. spec_class.init (after super call, where your code should really go)
  4. base_class.load
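The flow of execution listed above can be sketched with two stand-in classes. BaseLoadTask and CustomerTask are hypothetical names used only for illustration, and the run method here is a simplified assumption that calls init, load, and finish in order.

```python
class BaseLoadTask:
    """Hypothetical common base class with a single load method."""

    def __init__(self):
        # Keep __init__ cheap: it only sets up bookkeeping.
        self.calls = []
        self.source_name = None

    def init(self):
        self.calls.append("base_class.init")

    def load(self):
        # The shared load logic relies on what the subclass set up in init.
        self.calls.append(f"base_class.load({self.source_name})")

    def finish(self):
        pass

    def run(self):
        self.init()
        self.load()
        self.finish()
        return self.calls


class CustomerTask(BaseLoadTask):
    """Hypothetical specialized class that only customizes init."""

    def init(self):
        super().init()
        # Subclass-specific setup goes after the super call.
        self.source_name = "customers"
        self.calls.append("spec_class.init")
```

Calling CustomerTask().run() records the base init, then the specialized init, then the shared load, which matches steps 2 through 4 of the list.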

Note 2: Sometimes the functionality above can be achieved with __init__. However, when the scheduler thread creates an ETLTask, it creates an instance and thus runs __init__. Therefore, you will want __init__ to be as simple as possible. The init method, on the other hand, is run only by the task execution thread, so it can safely do more time-consuming work. Again, though, this method is for cases where class inheritance is used and the logic cannot go into the load method.

Why does the scheduler create an instance? It does so in case a task needs a full instance, and possibly parameter values, in order to answer methods like depends_on or mutually_exclusive_execution.

load()

load_parameters()

Load parameters for this task from the scheduler.

log

Get a logger using the task name.

log_logging_level()

mutually_exclusive_execution()

Override to provide a list of task names (or partial names that match modules) that this task cannot run at the same time as.

If allow_concurrent_runs is False, defaults to a list with just self.name. If allow_concurrent_runs is True, defaults to an empty list.
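The default described above can be sketched as follows. ExclusiveTaskSketch is a hypothetical stand-in, and the constructor argument concurrent exists only to make the example self-contained; the real task class decides this by overriding allow_concurrent_runs.

```python
class ExclusiveTaskSketch:
    """Hypothetical stand-in showing the documented default behavior."""

    def __init__(self, name, concurrent=False):
        self.name = name
        self._concurrent = concurrent

    def allow_concurrent_runs(self):
        return self._concurrent

    def mutually_exclusive_execution(self):
        # A task that allows concurrent runs excludes nothing by default.
        if self.allow_concurrent_runs():
            return []
        # Otherwise it is exclusive only with other runs of itself.
        return [self.name]
```

A subclass that must not overlap with other tasks would override mutually_exclusive_execution and return their names in addition to, or instead of, this default.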

mutually_exclusive_with_set

Build a set of modules this task is mutually exclusive with. The list is obtained using mutually_exclusive_execution. Each list member will be "normalized" to be a fully qualified name.

name

Note -- Return value needs to be compatible with find_etl_class

needs_to_get_ancestor_statuses()

Override and return True if you want to get status updates on any ancestor.

needs_to_get_child_statuses()

Override and return True if you want to get status updates on children.

needs_to_ok_child_runs()

Override and return True if you need to give OK before children are allowed to run. See process_child_run_requested.

normalized_dependents_set

Build a set of modules this task depends on. See depends_on. Each will be "normalized" to be a fully qualified name.

parameter_names()

Returns a list of parameter names.

parameters()

Returns a generator yielding (name, value) tuples, one per parameter.

parent_task

parent_task_id

process_child_run_requested(childRunRequested)

Override to examine child task before giving OK.

process_child_status_update(childStatusUpdate)

Override to examine child task status (ChildRunFinished instances)

process_messages(block=False)

Processes messages for this task. Should be called somewhere in any row looping.

Parameters: block (boolean) -- Block while waiting. Defaults to False. If block is True, this will run until a terminating message is received or an exception is thrown by the process_X calls. If block is False, you probably want to call this inside a loop.

Example Code:

from bi_etl.scheduler.exceptions import WorkflowFinished

def process_child_status_update(self, childStatusUpdate):
    # Placeholder for a real check of whether all children are done
    example_all_done = self.foo()

    if example_all_done:
        raise WorkflowFinished()

def load(self):
    # Placeholder for real load code
    self.load_foo()

    # Begin waiting for children
    try:
        self.process_messages(block=True)
    except WorkflowFinished:
        pass
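The example above covers the blocking case. For the non-blocking case, a rough sketch of calling process_messages inside a row loop might look like the following. Everything here is an assumption made for illustration: MessagePollingSketch, its inbox queue, and the "stop" message are hypothetical, and the local WorkflowFinished merely stands in for the library's exception.

```python
import queue


class WorkflowFinished(Exception):
    """Stand-in for bi_etl.scheduler.exceptions.WorkflowFinished."""


class MessagePollingSketch:
    """Hypothetical task that polls an in-memory inbox while looping over rows."""

    def __init__(self):
        self.inbox = queue.Queue()
        self.handled = []

    def process_messages(self, block=False):
        while True:
            try:
                msg = self.inbox.get(block=block)
            except queue.Empty:
                # Non-blocking call: nothing left to handle right now.
                return
            if msg == "stop":
                raise WorkflowFinished()
            self.handled.append(msg)

    def load(self, rows):
        loaded = 0
        for _row in rows:
            loaded += 1
            # Drain any pending messages without stalling the row loop.
            self.process_messages(block=False)
        return loaded
```

Polling once per row keeps the task responsive to messages while the load is still in progress, at the cost of one cheap queue check per iteration.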

register_child(child_task_object)

register_object(obj)

Register an ETLComponent object with the task. This allows the task to 1) get statistics from the component and 2) close the component when done.

Parameters: obj (bi_etl.components.etlcomponent.ETLComponent) -- An instance of a sub-class of bi_etl.components.etlcomponent.ETLComponent

root_task

root_task_id

run(no_mail=None, parent_to_child=None, child_to_parent=None)

Should not generally be overridden. This is called to run the task's code in the init, load, and finish methods.

scheduler

Get the existing bi_etl.scheduler.scheduler.Scheduler that this task is running under, or get an instance of bi_etl.scheduler.scheduler_interface.SchedulerInterface that can be used to interact with the main Scheduler.

send_mesage(msg)

set_parameter(param_name: str, param_value: object, local_only: bool = False, commit: bool = True)

Add a single parameter to this task.

Parameters:
  • param_name (str) -- The name of the parameter to add
  • param_value (object) -- The value of the parameter
  • commit (bool) --
  • local_only (bool) --

set_parameters(local_only=False, commit=True, *args, **kwargs)

Add multiple parameters to this task. Parameters can be passed in as any combination of:
  • dict instance. Example: add_parameters( {'param1':'example', 'param2':100} )
  • list of lists. Example: add_parameters( [ ['param1','example'], ['param2',100] ] )
  • list of tuples. Example: add_parameters( [ ('param1','example'), ('param2',100) ] )
  • keyword arguments. Example: add_parameters(foo=1, bar='apple')

Parameters:
  • local_only (boolean) -- Optional. Default = False. Add parameters to the local task only. Do not record them in the scheduler.
  • commit (boolean) -- Optional. Default = True. Commit changes to the task database.
  • args (list) -- A dict, a list of lists, or a list of tuples. See above.
  • kwargs (dict) -- Keyword arguments to add as parameters. See above.
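One way to accept all of the argument forms described above is to flatten them into a single dict before storing them. The helper below, collect_parameters, is a hypothetical sketch and not part of bi_etl; it only illustrates how dicts, lists of pairs, and keyword arguments can be combined.

```python
def collect_parameters(*args, **kwargs):
    """Flatten the accepted argument forms into one dict of name -> value."""
    collected = {}
    for arg in args:
        if isinstance(arg, dict):
            # dict instance: copy every name/value pair.
            collected.update(arg)
        else:
            # list of [name, value] lists or (name, value) tuples.
            for name, value in arg:
                collected[name] = value
    # Keyword arguments are applied last, so they win on name clashes.
    collected.update(kwargs)
    return collected
```

For example, collect_parameters({'param1': 'example'}, [('param2', 100)], foo=1) combines all three forms into one mapping.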

start_following_tasks()

Override to add tasks that should follow this task to the scheduler. This is called at the end of ETLTask.run.

statistics

Return the execution statistics from the task and all of its registered components.

status

task_id

task_rec
