bi_etl.tests.etl_jobs.etl_task_1 module
Created on Apr 18, 2016
author: woodd
class bi_etl.tests.etl_jobs.etl_task_1.ETL_Task_1(task_id=None, parent_task_id=None, root_task_id=None, scheduler=None, task_rec=None, config=None)
Bases: bi_etl.tests.etl_jobs.etl_test_task_base.ETL_Test_Task_Base
CLASS_VERSION = 1.0
DEFAULT_NO_MAIL = False
add_child_task_by_partial_name_to_scheduler(partial_module_name, parameters=None, display_name=None)
Start a new task on the bi_etl.scheduler.scheduler.Scheduler that will be a child of this task.
add_child_task_to_scheduler(etl_task_class_type, parameters=None, display_name=None)
Start a new task on the bi_etl.scheduler.scheduler.Scheduler that will be a child of this task.
add_database(database_object)
add_parameter(param_name: str, param_value: object, local_only: bool = False, commit: bool = True)
add_parameters(local_only=False, commit=True, *args, **kwargs)
allow_concurrent_runs()
children
close()
Clean up the task. Close any registered objects and any database connections.
config
Get the task configuration object. If it was not passed in, it will be read from the user's folder.
debug_sql(mode: int = True)
Control the output of the sqlalchemy engine.
Parameters: mode -- Boolean (debug if True, error if False) or int logging level.
depends_on()
display_name
finish()
Placeholder for post-load cleanup. This might be useful for cleaning up what was done in init. It could also allow an inheriting class to begin waiting for children (see process_messages).
get_database(database_name, user=None, schema=None, **kwargs) -> bi_etl.database.database_metadata.DatabaseMetadata
Get a new database connection.
Parameters:
- database_name (str) -- The name of the database section in config.ini
- user (str) --
- schema (str) --
get_parameter(param_name, default=None)
Returns the value of the named parameter. If the parameter does not exist, returns default when default is not None.
Raises: ParameterError -- If the named parameter does not exist and no default is provided.
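The lookup rule described above can be sketched with a small stand-in class. This is not bi_etl's implementation: ParameterStore and its _params dict are hypothetical names, and ParameterError is defined locally only so the sketch runs without bi_etl installed.

```python
class ParameterError(Exception):
    """Local stand-in for the ParameterError named in the docs above."""


class ParameterStore:
    """Hypothetical holder of task parameters (not part of bi_etl)."""

    def __init__(self, **params):
        self._params = dict(params)

    def get_parameter(self, param_name, default=None):
        # Return the stored value when the parameter exists.
        if param_name in self._params:
            return self._params[param_name]
        # Otherwise fall back to the default, but only if one was given.
        if default is not None:
            return default
        raise ParameterError(
            f"Parameter {param_name!r} not found and no default given"
        )


store = ParameterStore(batch_size=500)
found = store.get_parameter("batch_size")                # stored value
fallback = store.get_parameter("region", default="US")   # default is used
try:
    store.get_parameter("region")                        # no value, no default
    missing_raised = False
except ParameterError:
    missing_raised = True
```

One consequence of this rule, if the real method behaves as documented, is that None cannot itself be used as a fallback value.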
get_setting(setting_name, assignments=None)
get_setting_or_default(setting_name, default)
init()
Pre-load initialization. Runs on the execution server. Override to add setup tasks.
Note: The init method is useful in cases where you wish to define a common base class with a single load method. Each inheriting class can then do its own work in init. With init, the flow of execution can be:
- spec_class.init (if any code before super call)
- base_class.init
- spec_class.init (after super call, where your code should really go)
- base_class.load
Note 2: Sometimes the functionality above can be achieved with __init__. However, when the scheduler thread creates an ETLTask, it creates an instance and thus runs __init__. Therefore, you will want __init__ to be as simple as possible. In contrast, init is run only by the task execution thread, so it can safely do more time-consuming work. Again, though, this method is for cases where class inheritance is used and that logic cannot go into the load method.
Why does the scheduler create an instance? A task may need a full instance, and possibly parameter values, in order to answer methods such as depends_on or mutually_exclusive_execution.
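The ordering in the first note can be illustrated with plain Python classes. The classes below are stand-ins written for this sketch and do not import bi_etl: BaseTask, CommonLoadTask, SpecificTask, and the calls list are hypothetical names, and run here only mimics the documented init, load, finish sequence.

```python
class BaseTask:
    """Stand-in for a task base class; run() calls init, load, finish in order."""

    def __init__(self):
        # Keep __init__ cheap: per the notes above, the scheduler thread
        # also creates instances, so heavy work does not belong here.
        self.calls = []

    def init(self):
        pass

    def load(self):
        pass

    def finish(self):
        pass

    def run(self):
        self.init()
        self.load()
        self.finish()


class CommonLoadTask(BaseTask):
    """Common base class with a single shared load method."""

    def init(self):
        super().init()
        self.calls.append("base_class.init")

    def load(self):
        self.calls.append("base_class.load")


class SpecificTask(CommonLoadTask):
    """Inheriting class that customizes behavior through init only."""

    def init(self):
        self.calls.append("spec_class.init (before super call)")
        super().init()
        self.calls.append("spec_class.init (after super call)")


task = SpecificTask()
task.run()
for step in task.calls:
    print(step)
```

The recorded steps follow the four-item flow listed above, with the shared load method running only after every init layer has finished.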
load()
load_parameters()
Load parameters for this task from the scheduler.
log
Get a logger using the task name.
log_logging_level()
mutually_exclusive_execution()
Override to provide a list of task names (or partial names that match modules) that this task cannot run at the same time as.
If allow_concurrent_runs is false, defaults to a list with just self.name. If allow_concurrent_runs is true, defaults to an empty list.
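A minimal sketch of the documented defaults and of an override follows. TaskSketch and LoadOrders are stand-in classes invented for this example (they are not bi_etl's ETLTask), and the task name "load_customers" is purely illustrative.

```python
class TaskSketch:
    """Stand-in task (not bi_etl's ETLTask) showing the documented defaults."""

    def __init__(self, name, concurrent=False):
        self.name = name
        self._concurrent = concurrent

    def allow_concurrent_runs(self):
        return self._concurrent

    def mutually_exclusive_execution(self):
        # Default: a task that disallows concurrent runs excludes itself.
        if self.allow_concurrent_runs():
            return []
        return [self.name]


class LoadOrders(TaskSketch):
    def mutually_exclusive_execution(self):
        # Override: extend the default with other task (or partial module) names.
        return super().mutually_exclusive_execution() + ["load_customers"]


serial_default = TaskSketch("etl_task_1").mutually_exclusive_execution()
concurrent_default = TaskSketch(
    "etl_task_1", concurrent=True
).mutually_exclusive_execution()
overridden = LoadOrders("load_orders").mutually_exclusive_execution()
```

Extending the inherited list, rather than replacing it, keeps the self-exclusion default in place when concurrent runs are not allowed.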
mutually_exclusive_with_set
Build a set of modules this task is mutually exclusive with. The list is obtained using mutually_exclusive_execution. Each list member will be "normalized" to be a fully qualified name.
name
Note -- Return value needs to be compatible with find_etl_class
needs_to_get_ancestor_statuses()
Override and return True if you want to get status updates on any ancestor.
needs_to_get_child_statuses()
Override and return True if you want to get status updates on children.
needs_to_ok_child_runs()
Override and return True if you need to give OK before children are allowed to run. See process_child_run_requested.
normalized_dependents_set
Build a set of modules this task depends on. See depends_on. Each will be "normalized" to be a fully qualified name.
parameter_names()
Returns a list of parameter names.
parameters()
Returns a generator yielding tuples of parameter (name, value).
parent_task
parent_task_id
process_child_run_requested(childRunRequested)
Override to examine child task before giving OK.
process_child_status_update(childStatusUpdate)
Override to examine child task status (ChildRunFinished instances).
process_messages(block=False)
Processes messages for this task. Should be called somewhere in any row looping.
Parameters: block (boolean) -- Block while waiting. Defaults to False. If block is True, this will run until a terminating message is received or an exception is thrown by the process_X calls. If block is False, you probably want to call it inside a loop.
Example Code:

    from bi_etl.scheduler.exceptions import WorkflowFinished

    def process_child_status_update(self, childStatusUpdate):
        # Placeholder for real check if done
        example_all_done = self.foo()
        if example_all_done:
            raise WorkflowFinished()

    def load(self):
        # Placeholder for real load code
        self.load_foo()
        # Begin waiting for children
        try:
            self.process_messages(block=True)
        except WorkflowFinished:
            pass

register_child(child_task_object)
register_object(obj)
Register an ETLComponent object with the task. This allows the task to 1) get statistics from the component and 2) close the component when done.
Parameters: obj (bi_etl.components.etlcomponent.ETLComponent) -- A sub-class of bi_etl.components.etlcomponent.ETLComponent
root_task
root_task_id
run(no_mail=None, parent_to_child=None, child_to_parent=None)
Should not generally be overridden. This is called to run the task's code in the init, load, and finish methods.
scheduler
Get the existing bi_etl.scheduler.scheduler.Scheduler that this task is running under, or get an instance of bi_etl.scheduler.scheduler_interface.SchedulerInterface that can be used to interact with the main Scheduler.
send_mesage(msg)
set_parameter(param_name: str, param_value: object, local_only: bool = False, commit: bool = True)
Add a single parameter to this task.
set_parameters(local_only=False, commit=True, *args, **kwargs)
Add multiple parameters to this task. Parameters can be passed in as any combination of:
- dict instance. Example: add_parameters( {'param1':'example', 'param2':100} )
- list of lists. Example: add_parameters( [ ['param1','example'], ['param2',100] ] )
- list of tuples. Example: add_parameters( [ ('param1','example'), ('param2',100) ] )
- keyword arguments. Example: add_parameters(foo=1, bar='apple')
Parameters:
- local_only (boolean) -- Optional. Default= False. Add parameters to local task only. Do not record in the scheduler.
- commit (boolean) -- Optional. Default= True. Commit changes to the task database.
- args (list) -- List of lists or list of tuples. See above.
- kwargs (dict) -- Keyword arguments sent to parameters. See above.
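The accepted input shapes listed above could be folded into a single dict along the following lines. normalize_parameters is a hypothetical helper written for this sketch; it is not a bi_etl function and says nothing about how the library handles local_only or commit.

```python
def normalize_parameters(*args, **kwargs):
    """Hypothetical helper: merge the documented input shapes into one dict."""
    merged = {}
    for arg in args:
        if isinstance(arg, dict):
            # dict instance: names map directly to values.
            merged.update(arg)
        else:
            # List of two-item lists or tuples: (name, value) pairs.
            for name, value in arg:
                merged[name] = value
    # Keyword arguments are applied last.
    merged.update(kwargs)
    return merged


from_dict = normalize_parameters({'param1': 'example', 'param2': 100})
from_lists = normalize_parameters([['param1', 'example'], ['param2', 100]])
from_tuples = normalize_parameters([('param1', 'example'), ('param2', 100)])
combined = normalize_parameters(
    {'param1': 'example'}, [('param2', 100)], foo=1, bar='apple'
)
```

In this sketch later inputs overwrite earlier ones when a name repeats; whether bi_etl resolves duplicates the same way is not stated in the documentation above.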
start_following_tasks()
Override to add tasks that should follow after this task to the scheduler. This is called at the end of ETLTask.run.
statistics
Return the execution statistics from the task and all of its registered components.
status
task_id
task_rec