
Python tutorial


MongoDB connection and usage

Reading from, writing to and querying a MongoDB database from Python is possible thanks to the pymongo library.

This is an example of connection:

from pymongo import MongoClient

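def mongo():

    # Initialise the variable so the function can return even if the connection fails
    mongo_con = None
    try:
        print('Connecting to the Mongo client')
        mongo_con = MongoClient( <host>, <port> )
        mongo_con[ <database> ].authenticate( <username>, <password> )
        print('Successfully connected!')

    except Exception as e:
        # mongo_con stays None if the client could not be created
        print('There was some error: %s' % e)

    return mongo_con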

con = mongo()

As seen, this is quite simple. However, as explained above in this tutorial, a generic Mongo connection function is defined in modules/__init__.py.

This function must always be used to define the context['clients']['mongo'] variable in the modules.

The following list shows some typical MongoDB operations executed from Python. In these examples, consider that context['clients']['mongo'] is the mongo variable and that the database used is rest_service.

  1. Generate a cursor to find documents in a collection:
    • Example: Find all the documents with results of January 2015 and companyId 0000000000 in OT000Results collection.

      cursor = mongo['rest_service']['OT000Results'].find({
                                                           'companyId':0000000000,
                                                           'month':201501
                                                          })
      
  2. Find a single document
    • Example: Find, in the OT000Results collection, the first document which has a month later than January 2009 and earlier than October 2010 for contract XSD of companyId 0000000000.

      result = mongo['rest_service']['OT000Results'].find_one({
                                             'companyId':0000000000,
                                             'month':{'$gt':200901,'$lt':201010},
                                             'contractId':'XSD'
                                            })
      
  3. Update a document (or multiple documents)
    • Example: Set the field 'language' to spanish inside the 'info' dictionary in the OT000Results collection, only for documents which have a month later than January 2009 and earlier than October 2010. The $set operator is needed so that only this field is modified; without it, the second argument is treated as a full replacement document.

      # Update only the first document that meets the query
      result = mongo['rest_service']['OT000Results'].update({
                                               'month':{'$gt':200901,'$lt':201010}
                                              },{
                                               '$set':{'info.language':'spanish'}
                                              })
      
      # Update all the documents which meet the query
      result = mongo['rest_service']['OT000Results'].update({
                                           'month':{'$gt':200901,'$lt':201010}
                                          },{
                                           '$set':{'info.language':'spanish'}
                                          }, multi=True )
      
  4. Delete documents
    • Example: Remove, from the OT000Results collection, the documents which have a month equal to or later than January 2009 and earlier than October 2010 for contractId XSD of companyId 0000000000.

      result = mongo['rest_service']['OT000Results'].remove({
                                           'companyId':0000000000,
                                           'month':{'$gte':200901,'$lt':201010},
                                           'contractId':'XSD'
                                          })
      
  5. Generate an index in a defined collection
    • Example: Create an index in OT000Results collection with companyId, month and contractId fields.

      from pymongo import ASCENDING, DESCENDING
      result = mongo['rest_service']['OT000Results'].create_index([
                                                 ("companyId", ASCENDING),
                                                 ("month", DESCENDING),
                                                 ('contractId', ASCENDING)
                                                ])
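
The queries above all filter on a month encoded as a YYYYMM integer. As a minimal sketch, the filter dictionaries could be built with a small helper instead of being typed by hand. The names month_key and month_range_filter are hypothetical and are not part of pymongo or of this project; the helper only builds a plain dictionary, so it needs no database connection.

```python
def month_key(year, month):
    """Encode a year and a month as the YYYYMM integer used in the examples."""
    if not 1 <= month <= 12:
        raise ValueError('month must be between 1 and 12')
    return year * 100 + month


def month_range_filter(company_id, start, end, contract_id=None):
    """Build a filter for months strictly between start and end.

    start and end are (year, month) tuples. Hypothetical helper.
    """
    query = {
        'companyId': company_id,
        'month': {'$gt': month_key(*start), '$lt': month_key(*end)},
    }
    if contract_id is not None:
        query['contractId'] = contract_id
    return query


# 0 stands for the placeholder companyId written as 0000000000 in the examples
query = month_range_filter(0, (2009, 1), (2010, 10), contract_id='XSD')
# The dictionary can then be passed to find(), find_one() or remove(), e.g.
# mongo['rest_service']['OT000Results'].find_one(query)
```

Keeping the YYYYMM encoding in one place avoids mistakes such as writing 20101 instead of 201001.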
      

The pymongo documentation offers further information: http://api.mongodb.org/python/2.7rc0/tutorial.html

Load results to MongoDB

Finally, the load_output_to_rest function, defined in /lib/utils, is the Python Map Reduce script which transforms all the text rows contained in the files of the HDFS temporary results directory and loads them as definitive MongoDB documents into a specific collection. The fields of each document are usually defined in the config.json of the module and in the function arguments.

load_output_to_rest_data(context, input, fields, exists_setupId=True, collection=None, mongo_params=None, ts_from_remove=None, ts_to_remove=None, company_to_remove=None, type_to_remove=None, key=['month','companyId','contractId','setupId'], update=False, sep=",")

  • Arguments:
    • context: the context dictionary of the module.
    • input: HDFS text file or folder containing results to be loaded into MongoDB.
    • fields: list of the name of the different columns of the input file.
      • Specific considerations of this argument:
        • The first column must not be included because it is the key column, which has specific properties (see the key argument of this function).
        • The column names will also be the field names of each document stored in the output collection.
        • The function automatically converts the values of each column to the most suitable Python class. For special cases, a suffix in the column name converts the initial string of an input column to a datetime object or a dictionary, or prevents a specific column from being loaded.
          • Add the suffix ('~DATETIME~"%Y/%m/%d %H:%M"') to the column name string to transform a string, like '2015/03/10 15:44', to a valid datetime object. The datetime format can be adapted to fit the input column format.
          • Add the suffix ('~JSON') to the column name string to transform a string to a valid dictionary.
          • Add the suffix ('~DO_NOT_LOAD') to the column name string to avoid loading this column into MongoDB.
    • exists_setupId: if false, the setupId is not considered in the loading process.
    • collection: name of the output MongoDB collection.
    • mongo_params: if a different MongoDB connection has to be used, pass the connection in this argument. If it is not defined, the function will use the context['clients']['mongo'] connection by default.
    • arguments to delete deprecated documents of the collection (optional, but all of them must be defined if the delete is needed):
      • ts_from_remove: defines the initial datetime from which the results have to be deleted.
      • ts_to_remove: defines the final datetime up to which the results have to be deleted.
      • company_to_remove: defines the companyId of the results that have to be deleted.
      • type_to_remove: defines the type of consumption of the results that have to be deleted.
    • key: list with the names of the columns that make up the key field of each result. Remember that, inside the key field, the values of these columns must be separated with "~".
    • update: whether the results have to be updated using the 'update' method of the pymongo library. This is a valid approach for small loads of results (<1M). For bigger loading processes, it is better to fill the ts_from_remove, ts_to_remove, company_to_remove and type_to_remove arguments to delete the deprecated results before inserting the new ones.
    • sep: column separator of the input file.
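
The column name suffixes described for the fields argument can be illustrated with a short sketch. This is not the code of the loading function in /lib/utils; it is a hypothetical re-implementation of the documented rules, assuming that the double quotes around the datetime format are optional and that unsuffixed values are tried as int, then float, and otherwise kept as strings. The names convert_value and row_to_document are made up for this example.

```python
import json
from datetime import datetime


def convert_value(field, raw):
    """Return (name, value) for one column, or None if the column is skipped.

    Hypothetical illustration of the suffix rules, not the project's code.
    """
    name, _, suffix = field.partition('~')
    if suffix == 'DO_NOT_LOAD':
        return None
    if suffix == 'JSON':
        return name, json.loads(raw)
    if suffix.startswith('DATETIME~'):
        fmt = suffix[len('DATETIME~'):].strip('"')
        return name, datetime.strptime(raw, fmt)
    # Assumed fallback: try int, then float, otherwise keep the string
    for cast in (int, float):
        try:
            return name, cast(raw)
        except ValueError:
            pass
    return name, raw


def row_to_document(fields, values):
    """Combine the column names and the raw strings of one row into a document."""
    document = {}
    for field, raw in zip(fields, values):
        converted = convert_value(field, raw)
        if converted is not None:
            name, value = converted
            document[name] = value
    return document


fields = ['consumption', 'ts~DATETIME~"%Y/%m/%d %H:%M"', 'info~JSON', 'debug~DO_NOT_LOAD']
values = ['412.15', '2015/03/10 15:44', '{"language": "spanish"}', 'ignored']
document = row_to_document(fields, values)
```

With these sample values, the resulting document holds a float, a datetime object and a dictionary, and the 'debug' column is left out.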

HBase connection and usage

Reading from and writing to the HBase database from Python scripts is possible thanks to the happybase library.

This is an example of connection:

import happybase

hbase = happybase.Connection(self.config['app']['hbase']['host'], self.config['app']['hbase']['port'])
hbase.open()

The generic HBase connection function is defined in modules/__init__.py. This function must always be used to define the context['clients']['hbase'] variable in the modules.

Remember that advanced queries to HBase tables are always performed by HIVE, but simple processes, such as creating or deleting a table or inserting or removing specific keys, are done using the happybase library. In these examples, consider that context['clients']['hbase'] is the hbase variable.

Some typical HBase processes executed from Python are:

1. Table creation: an HBase table 'ot000' is created. Its only column family is 'results'.

hbase.create_table( 'ot000', {'results': dict()} )

2. Table deletion: the HBase table 'ot000' is deleted.

hbase.delete_table( 'ot000', disable=True )

3. List of existing tables: generate a list of strings with the names of all existing tables in HBase.

hbase.tables()

4. Insert a row: insert in the key '25110~0000000000' of table 'ot000' some new column values in the results column family.

hbase_table = hbase.table('ot000')
hbase_table.put( '25110~0000000000', {'results:column1': '23', 'results:column2': 'postalCode'} )

5. Remove a specific key: remove the row with key '25110~0000000000' of table 'ot000'.

hbase_table = hbase.table('ot000')
hbase_table.delete('25110~0000000000')

Further information can be found in the happybase documentation: http://happybase.readthedocs.org/en/latest/api.html

Finally, the 'load_output_to_hbase' function, defined in /lib/utils.py, calls the Python Map Reduce script which transforms all the rows contained in the results text files temporarily stored in HDFS and loads them into HBase.

load_output_to_hbase(context, input, table, sep=",")

Arguments:

  • context: the context dictionary of the module.
  • input: HDFS text file or folder containing results to be loaded into HBase.
  • table: dictionary with the structure {'name':<>,'fields':[<>],'cf':<>}. The 'name' key is the HBase table where the results have to be loaded. The 'fields' key is the list of column names for the columns of the input text file (remember not to include the first column of the file, because it is the results key column). Finally, the 'cf' key is the name of the column family shared by the whole 'fields' list.
  • sep: Column separator of the input text file.
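
To make the role of the table dictionary more concrete, the sketch below shows how one input line could be turned into a row key and a dictionary of 'cf:field' columns, which is the shape that the put method of a happybase table expects. This is only an assumption about what happens to each line; the function line_to_put is hypothetical and is not the code behind load_output_to_hbase.

```python
def line_to_put(line, table, sep=','):
    """Split one result line into a row key and a happybase-style data dict.

    Hypothetical helper: the first column is the key, the remaining columns
    are matched with table['fields'] and prefixed with the column family.
    """
    columns = line.rstrip('\n').split(sep)
    row_key, values = columns[0], columns[1:]
    if len(values) != len(table['fields']):
        raise ValueError('expected %d value columns, got %d'
                         % (len(table['fields']), len(values)))
    data = dict(
        ('%s:%s' % (table['cf'], field), value)
        for field, value in zip(table['fields'], values)
    )
    return row_key, data


table = {'name': 'ot000', 'fields': ['column1', 'column2'], 'cf': 'results'}
row_key, data = line_to_put('25110~0000000000,23,postalCode', table)
# row_key and data could then be written with:
# hbase.table(table['name']).put(row_key, data)
```

The length check makes a mismatch between the 'fields' list and the file columns fail early instead of silently loading shifted values.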

Create a basic Map Reduce job

To explain the creation of a new Map Reduce job using Python, a very simple example will be considered.

Imagine we have an input text file in HDFS (in path /tmp/test_mrjob) whose lines contain the monthly consumption, the postal code and the country code of every contract of a company, separated by '\t'. The first three lines of the input file are:

412.15\t25110\tES
234.19\t25006\tES
768.85\t08080\tES

The job consists of calculating the average monthly consumption for every postal code and country code combination (<postalCode>~<countryCode>).
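
Before writing the Hadoop job, the expected result can be checked locally with plain Python. The sketch below is not an MRJob; it only reproduces the same grouping and averaging on a few lines in memory. The function average_by_key and the default column positions are assumptions made for this example, and the fourth sample line is a hypothetical extra contract added so that one key has more than one value.

```python
from collections import defaultdict


def average_by_key(lines, value_col=0, postal_col=1, country_col=2, sep='\t'):
    """Average the value column for every <postalCode>~<countryCode> key."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum, count]
    for line in lines:
        columns = line.rstrip('\n').split(sep)
        key = '%s~%s' % (columns[postal_col], columns[country_col])
        totals[key][0] += float(columns[value_col])
        totals[key][1] += 1
    return dict((key, s / c) for key, (s, c) in totals.items())


sample = [
    '412.15\t25110\tES',
    '234.19\t25006\tES',
    '768.85\t08080\tES',
    '100.00\t25110\tES',  # hypothetical extra contract, not in the original file
]
averages = average_by_key(sample)
```

The dictionary returned here plays the role of the key and value pairs that the reducer of the real job emits.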

In order to execute this job, the steps to follow are:

  1. Create a Python class defining the mapper, the reducer, and all the MRJob functions needed for the Map Reduce job. This class can be defined in an external file, for example, in a script called hadoop_job.py inside the directory of a module (e.g. /modules/ot000/). The main structure of this file will be:
# Library imports (All the libraries for every function used in the MRjob have to be imported here)
import glob
from mrjob.job import MRJob
from json import load

class example_mrjob(MRJob):
    # Mapper initialization to load global configurations for the mapper
    def mapper_init(self):

        # Load the config variables
        fn = glob.glob('*.json')
        self.config = load(open(fn[0]))

        # All the connections to clients (HBase, MongoDB, HDFS,...) and assignation of needed global variables for the mapper have to be defined here.

    # Reducer initialization to load global configurations for the reducer
    def reducer_init(self):

        # Load the config variables
        fn = glob.glob('*.json')
        self.config = load(open(fn[0]))

        # All the connections to clients (HBase, MongoDB, HDFS,...) and assignation of needed global variables for the reducer have to be defined here.

    # Mapper definition
    def mapper(self, _, line):

        # Convert the original string to a strings list separated by '\t'
        line = line.split('\t')

        # Val variable definition
        val = line[self.config['value_column']]

        # Key variable definition
        key = '%s~%s' % (line[self.config['postalCode_column']], line[self.config['countryCode_column']])

        # Emit the key - value to the reducer. All the keys with the same name will be treated by the same reducer process.
        yield key,val

    # Reducer definition
    def reducer(self,key,values):

        # Sum all the values and count the number of objects in values list
        c = 0
        s = 0.0
        for item in values:
            s += float(item)
            c += 1

        # Calculate the average monthly consumption
        avg = s/float(c)

        # Emit the key - value to the output file
        yield key, avg


# Run the MRJob when this script is executed directly
if __name__ == '__main__':
    example_mrjob.run()
  2. In the tasks.py of the module you are developing, insert the following code to execute the MRJob defined in the class called 'example_mrjob' inside the script hadoop_job.py.
# Imports needed by this snippet ('lib' is the project's library package, assumed to be importable here)
from json import dumps as JSONdumps
from os import path, remove
from tempfile import NamedTemporaryFile
import lib

# Import the example_mrjob class
from hadoop_job import example_mrjob

# Create temporary file to upload configuration variables to the Map Reduce job
f = NamedTemporaryFile(mode='w', delete=False, suffix='.json')
f.write(JSONdumps(context['config']))
f.close()

# Create the hadoop job instance adding file location to be uploaded
mr_job = example_mrjob(args=['-r', 'hadoop', '/tmp/test_mrjob', '--file', f.name, '--output-dir', '/tmp/output_test_mrjob', '--python-archive', path.dirname(lib.__file__)])
try:
    with mr_job.make_runner() as runner:
        runner.run()
except Exception as e:
    raise Exception('Error running MRJob process using hadoop: %s' % e)
finally:
    # Remove the temporary configuration file whether the job succeeded or not
    remove(f.name)
  3. Once executed, check that the results in HDFS (/tmp/output_test_mrjob) are correct.
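
As an aid for that check, the output lines can be read back into a dictionary once they have been copied from HDFS. The sketch below assumes the job keeps mrjob's default output protocol, in which each line holds the JSON-encoded key and the JSON-encoded value separated by a tab. The helper names and the two sample lines are hypothetical, not real job output.

```python
import json


def parse_output_line(line):
    """Decode one 'json_key<TAB>json_value' line into a (key, value) tuple."""
    key, value = line.rstrip('\n').split('\t', 1)
    return json.loads(key), json.loads(value)


def parse_output(lines):
    """Decode all non-empty output lines into a dictionary."""
    return dict(parse_output_line(line) for line in lines if line.strip())


# Hypothetical sample lines, shaped like the default mrjob output
sample_output = [
    '"25110~ES"\t412.15\n',
    '"25006~ES"\t234.19\n',
]
results = parse_output(sample_output)
```

If the job class defines a different OUTPUT_PROTOCOL, the parsing has to be adapted accordingly.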
