EMPOWERING Analytical / ETL Modules - Documentation for developers

General information

Standard workflow

The standard workflow, used by the modules developed until now, is explained in the 'module workflow section'.

This 'Documentation for developers' section is intended exclusively for developers who need to adapt an existing module or create a new one in the platform.

Detailed input / output of the existing modules

The input and output data model of every existing module is described in the 'detailed input / output section'.

Data models of the databases

To develop new modules, it is very important to know how the data is stored in the databases. This is explained in the following sections:

Directory tree structure and brief description

The directory tree is divided at the top level into the analytical modules (/modules), the ETL modules (/ETL) and the generic module library (/lib). The Celery configuration files are located in the root folder (/).

The /modules folder contains the main source code of every analytical module, one directory per module. Each analytical module therefore has its own directory, which holds all the scripts specific to it (in this example, /modules/ot101). The main files inside it are tasks.py and config.json. The former is the script that defines the module function, which executes all the needed subtasks synchronously; the latter is the configuration file specific to that module.

The root folder of the modules (/modules) also contains the general configuration file of the modules (/modules/config.json), the client database connection functions (/modules/__init__.py) and the imports of all the module functions defined in each module folder (/modules/tasks.py).

The /ETL folder contains the source code of every ETL module, organised in the same way as the /modules directory.

The /lib folder contains the source code of all the generic functions. Its root (/lib) holds the logging functions (/lib/logs.py), the functions used to build the Hive queries (/lib/querybuilder.py) and the generic Python functions used by all the modules (/lib/utils.py). It also contains four directories: specific ETL functions (/lib/etl), Python MapReduce scripts called by utils.py (/lib/mapreduce), and generic R scripts that use either the Rhipe library (/lib/rhipe) or RHadoop (/lib/rhadoop).

(root)
  |
  |
  +-- modules
  |    |
  |    +-- ot101
  |    |    |
  |    |    +-- R
  |    |    |   |
  |    |    |   +-- join_and_compare.R
  |    |    |
  |    |    +-- tasks.py
  |    |    +-- config.json
  |    |    +-- __init__.py
  |    |
  |    +-- otXXX
  |    |    |
  |    |    ...
  |    |
  |    +-- tasks.py
  |    +-- config.json
  |    +-- __init__.py
  |
  |
  |
  +-- ETL
  |    +-- measures
  |    |    |
  |    |    +-- tasks.py
  |    |    +-- hadoop_job.py
  |    |    +-- config.json
  |    |    +-- __init__.py
  |    |
  |    +-- xxx
  |    |    |
  |    |    ...
  |    |
  |    +-- tasks.py
  |    +-- config.json
  |    +-- __init__.py
  |
  |
  |
  +-- lib
  |    |
  |    +-- etl
  |    |    |
  |    |    +-- mapreduce
  |    |    |    |
  |    |    |    +-- __init__.py
  |    |    |    +-- MR_delete_keys_from_hbase.py
  |    |    |
  |    |    +-- __init__.py
  |    |    +-- delete_measures_hbase.py
  |    |    +-- delete_results_hbase.py
  |    |    +-- transformations.py
  |    |
  |    +-- mapreduce
  |    |    |
  |    |    +-- __init__.py
  |    |    +-- load_output_to_rest.py
  |    |    +-- load_output_to_hbase.py
  |    |
  |    +-- rhadoop
  |    |    |
  |    |    +-- MR_mean_meaneff.R
  |    |
  |    +-- rhipe
  |    |    |
  |    |    +-- customers_weather.R
  |    |    +-- MR_mean_meaneff.R
  |    |    +-- MR_mean_meaneff_tertiary_with_degree_days.R
  |    |    +-- MR_mean_meaneff_with_degree_days.R
  |    |    +-- MR_quantiles_with_degree_days.R
  |    |    +-- MR_yearly_distribution_with_degree_days.R
  |    |    +-- sequence_to_text.R
  |    |    +-- weather_calculations.R
  |    |    +-- weather_dependance.R
  |    |
  |    +-- __init__.py
  |    +-- logs.py
  |    +-- querybuilder.py
  |    +-- utils.py
  |
  |
  |
  +-- __init__.py
  +-- celery_backend.py
  +-- celeryconfig.py
  +-- envconfig.json
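
As a rough illustration of the layout described above, the following sketch lists the sub-directories of a folder such as /modules that contain both main files (tasks.py and config.json). The helper name find_module_dirs is hypothetical and is not part of the platform.

```python
import os

# The two main files every module directory is expected to contain.
REQUIRED_FILES = ('tasks.py', 'config.json')


def find_module_dirs(base_dir):
    """Return the sorted names of sub-directories holding every required file.

    Hypothetical helper, written only to illustrate the directory layout.
    """
    found = []
    for name in sorted(os.listdir(base_dir)):
        path = os.path.join(base_dir, name)
        if not os.path.isdir(path):
            continue  # skips /modules/tasks.py, /modules/config.json, ...
        if all(os.path.isfile(os.path.join(path, f)) for f in REQUIRED_FILES):
            found.append(name)
    return found
```

Called on the /modules folder of the tree above, it would be expected to return names such as 'ot101', since the files placed directly in /modules are not directories.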

Example of creation of a new module

The simplest way to create a new module is probably to use a similar existing module as a template and change its subtasks so that they calculate the desired results. The main steps for creating a new analytical module called 'ot000' are listed below (module 'ot101' is used as the template):

  1. Create a directory called 'ot000' in the /modules folder.
  2. Copy tasks.py, __init__.py and config.json from module 'ot101' (/modules/ot101/*) into the directory created in step 1.
  3. Open the copied tasks.py and config.json and replace every 'ot101' and 'OT101' with 'ot000' and 'OT000' respectively.
  4. Import the module_ot000 function: in /modules/tasks.py, add the line from ot000.tasks import module_ot000.
  5. Route the new module to the Celery task queue: in /celeryconfig.py, add an item with key 'ot000' and value {'queue':'modules'} to the CELERY_ROUTES dictionary.
  6. At this point there is a module called 'ot000' that performs exactly the same analysis as the 'ot101' module.
  7. The remaining work consists of changing the subtasks defined in 'module_ot000' (in /modules/ot000/tasks.py) and the specific configuration in /modules/ot000/config.json to obtain the desired results.
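
Steps 1 to 3 of the list above are mechanical and could be scripted. The following is a minimal sketch under that assumption; the function clone_module is hypothetical and does not exist in the platform.

```python
import os
import shutil

FILES_TO_COPY = ('tasks.py', '__init__.py', 'config.json')
# Step 3 only mentions these two files for the renaming.
FILES_TO_RENAME_IN = ('tasks.py', 'config.json')


def clone_module(modules_dir, template='ot101', new='ot000'):
    """Copy the template module files and rename the module inside them.

    Hypothetical helper covering steps 1 to 3 only.
    """
    src_dir = os.path.join(modules_dir, template)
    dst_dir = os.path.join(modules_dir, new)
    os.mkdir(dst_dir)  # step 1: raises an error if the module already exists
    for filename in FILES_TO_COPY:  # step 2
        shutil.copy(os.path.join(src_dir, filename),
                    os.path.join(dst_dir, filename))
    for filename in FILES_TO_RENAME_IN:  # step 3
        path = os.path.join(dst_dir, filename)
        with open(path) as handle:
            text = handle.read()
        # Replace both spellings: 'ot101' -> 'ot000' and 'OT101' -> 'OT000'.
        text = text.replace(template.lower(), new.lower())
        text = text.replace(template.upper(), new.upper())
        with open(path, 'w') as handle:
            handle.write(text)
    return dst_dir
```

Steps 4 and 5 (the import in /modules/tasks.py and the CELERY_ROUTES entry) are deliberately left out of the sketch and still have to be done by hand.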

Module tasks.py and config.json skeleton

This section describes the structure of the module's tasks.py. This file, located in /modules/otXXX, contains the module function, which defines the whole workflow of the module.
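
The workflow defined by the module function is essentially a sequence of subtasks that share a context dictionary. The following is a minimal sketch of one way to organise such a sequence, assuming each subtask is a plain function that receives and returns the context. The names run_subtasks, build_input and compute_result are hypothetical, and the existing modules may structure their subtasks differently; only the 'current' and 'total' keys of context['job_meta'] are taken from the tasks.py skeleton.

```python
def run_subtasks(context, subtasks):
    """Run each subtask in order, keeping context['job_meta'] up to date."""
    context['job_meta'] = {'current': 0, 'total': len(subtasks)}
    for subtask in subtasks:
        context = subtask(context)
        context['job_meta']['current'] += 1
    return context


# Hypothetical subtasks, standing in for the real queries and calculations.
def build_input(context):
    context['input'] = [1, 2, 3]
    return context


def compute_result(context):
    context['result'] = sum(context['input'])
    return context


context = run_subtasks({}, [build_input, compute_result])
print(context['result'])               # 6
print(context['job_meta']['current'])  # 2
```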

The configuration file of the module (config.json), located in /modules/otXXX, is a JSON file which describes the specific characteristics of this module. For example: the data output format, the temporary paths and some fixed configuration.

tasks.py schema

The skeleton of tasks.py is shown below:

#!/usr/bin/python
# -*- coding: utf-8 -*-

"""LIBRARIES IMPORTATION"""

from __future__ import absolute_import
import sys

from datetime import datetime
import json
import itertools

#module imports
from modules import ModuleTaskFailed, ModuleTask
from lib.utils import *
from lib.logs import setup_log
from lib.querybuilder import QueryBuilder

# celery imports
from celery_backend import app
from celery import current_task
from celery.utils.log import get_task_logger
from celery.signals import after_setup_task_logger, after_setup_logger

# import function to execute the corresponding ETL's
from ETL.tasks import *



"""MODULE NAME"""

MODULE="ot000"



"""LOGGER CONFIGURATION"""

after_setup_logger.connect(setup_log)
after_setup_task_logger.connect(setup_log)
logger = get_task_logger('module.%s' % MODULE)



# CREATE THE MODULE CLASS

class ModuleOT000Task(ModuleTask):
    abstract = True
    def __init__(self):
        logger.debug('ModuleTask init for module %s' % MODULE)
        ModuleTask.__init__(self)

app.Task = ModuleOT000Task



####################################################################################################
#######     MODULE FUNCTION DEFINITION      ########################################################
####################################################################################################

@app.task(name='ot000')
def module_ot000(params):


    """ASSIGN THE NAME <MODULE> TO THIS MODULE"""

    module_ot000.MODULE = MODULE



    """CREATION OF THE CONTEXT OF THE MODULE"""

    logger.info('Starting Module ot000...')

    logger.debug('Building task context with config, clients and report')
    context = {}
    context['current_task'] = current_task



    """LOAD TO CONTEXT THE CONFIG.JSON OF */modules/ot000* and */modules*"""

    context['config'] = module_ot000.config



    """TASK_UUID ASSIGNMENT"""

    task_UUID = id_generator()
    context['config']['module']['paths'] = random_paths(context['config']['module']['paths'],'UUID', task_UUID)



    """CLIENTS CONNECTION"""

    # The functions hive, hdfs, mongo and hbase are defined in */modules/__init__.py*
    try:
        context['clients'] = {
                              'hive': module_ot000.hive,
                              'hdfs': module_ot000.hdfs,
                              'mongo': module_ot000.mongo,
                              'hbase': module_ot000.hbase
                              }
    except Exception, e:
        raise Exception('Error connecting to needed connection clients: %s' % e)



    """REPORT CREATION"""

    context['report'] = Report(monogodb_fd=context['clients']['mongo'], db=context['config']['app']['report']['db'], collection=context['config']['app']['report']['collection'])
    context['report']['task_id'] = current_task.request.id
    context['report']['task_UUID_paths_queries'] = task_UUID
    context['report']['started_at'] = datetime.now()
    context['report']['params'] = current_task.request.args
    context['report']['module'] = MODULE
    context['report']['progress'] = 1
    context['report'].update()



    """CHECK INITIAL PARAMS INCONSISTENCIES"""

    try:
        companyId = params['companyId']
    except KeyError, e:
        error_msg = 'Not enough parameters provided to module %s: %s' % (MODULE, e)
        logger.error(error_msg)
        raise ModuleTaskFailed(context, msg=error_msg)



    """STARTING THE MODULE SUBTASKS"""

    logger.debug('Starting module ot000 calculations for company %s' % companyId)

    context['report']['trace'] = {}

    context['job_meta'] = { 'current': 0, 'total': 0 }



    #######
    #### HERE, DEFINE ALL THE NEEDED SUBTASKS
    #######



    """MODULE FINISHED"""

    context['current_task'].update_state(state='SUCCESS', meta=True)
    logger.info('Module OT000 execution finished...')



    """MODULE REPORT FINAL SAVING"""

    context['report'].save()


    return True


if __name__ == '__main__':
    """
        params = {
                'companyId':1111111111
                 }

    """

Example for the specific config.json of the module:

{
  "R_scripts": {
     "example.R": "/path/to/R/example.R"
  },
  "paths" : {
    "input" : "/tmp/ot000/UUID/input_from_query",
    "result_dir": "hdfs:///tmp/ot000/UUID/result",
    "subtask_n": "hdfs:///tmp/ot000/UUID/subtask_n",
    "all": "/tmp/ot000/UUID"
  },
  "public": {
    "mongodb": {
        "collection": "OT000Results",
        "fields": [ "result_example1", "result_example2" ],
        "ttl": 24
    }
  },
  "hbase_table":{
    "name": "ot000",
    "cf": [ {
            "fields": [ "result_example1", "result_example2" ], "name": "results"
    } ]
  }
}

These configuration values can be read from tasks.py through the context['config']['module'] dictionary. For instance, to get the "example.R" path: context['config']['module']['R_scripts']['example.R'].
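
The 'UUID' token in the paths of the example above is replaced at run time by the call to random_paths in tasks.py. The following is a minimal sketch of what such a substitution could look like. The functions make_task_uuid and replace_in_paths are hypothetical stand-ins; the real id_generator and random_paths in /lib/utils.py may behave differently.

```python
import json
import uuid

# A reduced copy of the "paths" section of the example config.json.
MODULE_CONFIG = json.loads("""
{
  "paths": {
    "input": "/tmp/ot000/UUID/input_from_query",
    "result_dir": "hdfs:///tmp/ot000/UUID/result",
    "all": "/tmp/ot000/UUID"
  }
}
""")


def make_task_uuid():
    """Hypothetical stand-in for id_generator: a random hexadecimal string."""
    return uuid.uuid4().hex


def replace_in_paths(paths, placeholder, value):
    """Return a copy of paths with placeholder replaced by value in each path.

    Hypothetical stand-in for random_paths.
    """
    return dict((key, path.replace(placeholder, value))
                for key, path in paths.items())


task_uuid = make_task_uuid()
paths = replace_in_paths(MODULE_CONFIG['paths'], 'UUID', task_uuid)
print(paths['all'])  # /tmp/ot000/ followed by 32 hexadecimal characters
```

Giving every execution its own temporary paths in this way would keep two simultaneous runs of the same module from overwriting each other's files.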

How to launch a module

Start the Celery process

Before any module is executed, the Celery worker has to be started with these shell commands:

cd /home/empowering/<developer_user>
celery worker --app=celery_backend:app -l debug

Once Celery is started, the worker waits for new module executions to be queued and launches them. When a module is launched, all the logs it produces are printed in the Celery console. These logs help the module developer to check whether the execution is working properly or whether it crashes in some subtask.

Launch from a Python console

To launch a module from a Python console, follow these steps:

  1. Open a Python console from the shell.
cd /home/empowering/<developer_user>
python
  2. Import the module function (in this example, the module ot000 and the measures ETL functions are loaded), together with the datetime and ObjectId classes used in the parameters.
from datetime import datetime
from bson import ObjectId
from modules.tasks import module_ot000
from ETL.tasks import ETL_measures
  3. Create the params dictionary. This variable defines the input parameters of the module (in this example, sample parameters of module ot101 are shown).
params = {
        'companyId': 0000000000,
        'ts_to': datetime(2015,1,31,23,59,59),
        'num_months': 12,
        'eff_users': 0.3,
        'criteria': ['postalCode','power__500','tariffId','power__500 + postalCode','power__1000 + postalCode','tariffId + postalCode'],
        'triggerValue': 100,
        'type': 'electricityConsumption',
        'quantile_min': 0.15, #Put 0 to consider data from the minimum consumption in the average calculations
        'quantile_max': 0.997, #Put 1 to consider data up to the maximum consumption in the average calculations
        'timezone':"Europe/Madrid",
        'setupId': ObjectId()
        }
  4. Launch the module (in this example, module ot000 is launched).
result = module_ot000.delay(params)
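
The delay call returns immediately with a Celery AsyncResult object, while the module itself runs in the worker. The following is a minimal sketch of how the console could wait for the outcome. It relies only on the ready() and get() methods of AsyncResult; the helper wait_for_result and the FakeResult class (used so that the sketch runs without a worker) are hypothetical.

```python
import time


def wait_for_result(result, timeout=60.0, poll_interval=1.0):
    """Poll result.ready() until it is true, then return result.get().

    Raises RuntimeError if the task has not finished after timeout seconds.
    """
    deadline = time.time() + timeout
    while not result.ready():
        if time.time() >= deadline:
            raise RuntimeError('Task not finished after %s seconds' % timeout)
        time.sleep(poll_interval)
    return result.get()


class FakeResult(object):
    """Hypothetical stand-in for an AsyncResult that finishes after n polls."""

    def __init__(self, polls_needed, value):
        self.polls_left = polls_needed
        self.value = value

    def ready(self):
        self.polls_left -= 1
        return self.polls_left < 0

    def get(self):
        return self.value


print(wait_for_result(FakeResult(2, True), poll_interval=0.01))  # True
```

With a real AsyncResult, get() re-raises by default the exception of a failed task, so a crash in a subtask would surface in the console as well as in the Celery logs.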

Launch multiple modules from an external script

If several modules have to be executed one after another (synchronously), a Python script can do the job.

The following script executes some of the modules synchronously. In this example, the execution sequence is: launch the contracts ETL, wait until it finishes, launch the measures ETL, wait until it finishes and, finally, launch the ot102 module. The script has to be placed in the folder /home/empowering/<developer_user> and is launched from the shell with python <name_script>.py.

from datetime import datetime
from bson import ObjectId

"""Final ts_to datetime for all the modules"""
ts = datetime(2015, 10, 31,23,59,59)

"""ETL contracts"""
try:
    from ETL.tasks import ETL_contracts
    params = {
             'companyId': 0000000000,
             }
    print 'ETL of contracts. Parameters provided: %s' % params
    print 'Running...'
    result = ETL_contracts.delay(params)
    res = result.get()
    print "result: %s" % res
except Exception, e:
    print 'ERROR: %s' % e

"""ETL measures"""
try:
    from ETL.tasks import ETL_hadoop_v2
    params = {
             'companyId': 0000000000,
             }
    print 'ETL of measures. Parameters provided: %s' % params
    print 'Running...'
    result = ETL_hadoop_v2.delay(params)
    res = result.get()
    print "result: %s" % res
except Exception, e:
    print 'ERROR: %s' % e

"""OT102"""
try:
    from modules.tasks import module_ot102
    params = {
            'companyId': 0000000000,
            'ts_to': datetime.now(),
            'num_months': 24,
            'type': 'electricityConsumption',
            'timezone': 'Europe/Vienna',
            'setupId': ObjectId("563798433425d95e7c573148")
            }
    params['ts_to']=ts
    print 'Module OT102. Parameters provided: %s' % params
    print 'Running...'
    result = module_ot102.delay(params)
    res = result.get()
    print "result: %s" % res
except Exception, e:
    print 'ERROR: %s' % e
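
In the script above, a failing step only prints its error and the next step is still launched. When later steps depend on earlier ones, it may be preferable to stop at the first failure. The following is a minimal sketch of that variant, assuming each task exposes delay(params) returning an object with get(), as Celery tasks do. The names run_in_sequence and FakeTask are hypothetical; FakeTask only exists so that the sketch runs without a Celery worker.

```python
def run_in_sequence(steps):
    """Launch each (name, task, params) step and wait for it before the next.

    Returns the list of (name, result) pairs of the steps that finished.
    The first exception stops the sequence and is propagated to the caller.
    """
    finished = []
    for name, task, params in steps:
        print('%s. Parameters provided: %s' % (name, params))
        result = task.delay(params).get()  # blocks until the step ends
        print('result: %s' % result)
        finished.append((name, result))
    return finished


class FakeTask(object):
    """Hypothetical stand-in for a Celery task and its result object."""

    def __init__(self, outcome):
        self.outcome = outcome

    def delay(self, params):
        return self

    def get(self):
        if isinstance(self.outcome, Exception):
            raise self.outcome
        return self.outcome


steps = [
    ('ETL of contracts', FakeTask(True), {'companyId': 1111111111}),
    ('ETL of measures', FakeTask(True), {'companyId': 1111111111}),
]
done = run_in_sequence(steps)
```

In the real script, the FakeTask instances would be replaced by the imported task functions, for example ETL_contracts, ETL_hadoop_v2 and module_ot102.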
