HIVE tutorial

HIVE usage

HIVE can be used directly from the shell or through the Python client connection defined in /modules/__init__.py. Inside the modules, the HIVE connection is available as context['clients']['hive'].
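
For orientation, a minimal sketch of how this connection is usually picked up inside a module; it assumes the module context is already available and anticipates the QueryBuilder class described in the 'Make queries' section below:

from lib.querybuilder import QueryBuilder

# The HIVE connection created in /modules/__init__.py is available in the module context
hive_client = context['clients']['hive']
qb = QueryBuilder(hive_client)   # the client is passed to the query builder (see 'Make queries')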

HIVE shell console:

root@empowering:/# hive

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
Hive history file=/tmp/root/hive_job_log_343939e8-5447-471b-92cf-3dff3d3604f3_299554502.txt
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hive/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

hive>

This tutorial will explain how to make queries using the shell or from a Python script.

For more information about HIVE usage, visit https://cwiki.apache.org/confluence/display/Hive/Tutorial. There is also a useful manual about the available HIVE operators and User-Defined Functions (UDFs): https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

HIVE table administration

HIVE tables have to be created before making queries with HIVE. These tables allow reading data from HBase, or reading and writing data from HDFS.

Some of these tables are always available in HIVE, such as customers, tariffs, stations, postalcode_geoloc and all the HDFS measures tables (see the ETL_measures_hive description in the `'detailed input / output section'`_). All these management tasks can be executed directly from the shell or with Python using the generic functions defined in /lib/utils.py.

Create a HIVE table to read data from an HBase table

From shell:

In this example, the 'ot000hbase' HIVE table is created from the 'ot000' HBase table. The key of this HBase table is formed as keyfield1~keyfield2, and it has two columns called 'field1' and 'field2' in the 'results' column family.

CREATE EXTERNAL TABLE IF NOT EXISTS ot000hbase
(key struct<keyfield1:string, keyfield2:int>, field1 float, field2 string)
ROW FORMAT DELIMITED COLLECTION ITEMS TERMINATED BY '~'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,results:field1,results:field2')
TBLPROPERTIES ('hbase.table.name' = 'ot000');

From Python (a usage sketch follows this list):

  • create_measures_temp_table(context, reading, companyId, id_task, cumulative=False, tertiary=False, tariffs=False, number_period_tertiary=6)
    • It is a function to create a HIVE table to read HBase measures tables, either tertiary, residential time of use or standard measures.
    • Arguments:
      • context: the context dictionary of the module.
      • reading: the consumption type. For instance, electricityConsumption.
      • companyId: the companyId of the utility.
      • id_task: the task_UUID variable defined at the beginning of the module.
      • cumulative: True if it's needed to map the HBase cumulative consumption columns in the HIVE table.
      • tertiary: True if the consumption type is tertiary or residential time of use.
      • tariffs: True if it's needed to map the HBase tariff description columns in the HIVE table.
      • number_period_tertiary: the number of periods to map in the HIVE table (only used if tertiary is True).
  • create_weather_measures_temp_table(context, reading, id_task)
    • It is a function to create a HIVE table to read HBase weather measures.
    • Arguments:
      • context: the context dictionary of the module.
      • reading: the weather reading type. For instance, temperatureAir.
      • id_task: the task_UUID variable defined at the beginning of the module.
  • create_module_oldResults_temp_table(context, module, id_task, fields, key_fields=[('month','bigint'),('companyId','string'),('contractId','string'),('setupId','string')])
    • It is a function to create a HIVE table to read the historical module results stored in HBase.
    • Arguments:
      • context: the context dictionary of the module.
      • module: name of the module. For instance, ot101.
      • id_task: the task_UUID variable defined at the beginning of the module.
      • fields: a list of tuples with the HBase column name and desired class for that column in the HIVE table (Remember that all HBase columns are strings). The available class formats in HIVE columns are: bigint, int, string and float.
      • key_fields: a list of tuples with the HBase key fields and desired class for that column in the HIVE table (Remember that all HBase keys are strings). The default HBase key field separator is "~".
  • create_summary_results_temp_table(context, module, columns, key=[("month","bigint"),("companyId","bigint"),("criteria","string"),("groupCriteria","string"),("setupId","string")])
    • It is a function to create a HIVE table to read the summary results HBase table. This table contains the summary comparative values for every utility and group criteria.
    • Arguments:
      • context: the context dictionary of the module.
      • module: name of the module. For instance, ot101.
      • columns: a list of tuples with the HBase column name and desired class for that column in the HIVE table (Remember that all HBase columns are strings). The available class formats in HIVE columns are: bigint, int, string and float.
      • key: a list of tuples with the HBase key fields and desired class for that column in the HIVE table (Remember that all HBase keys are strings). The default HBase key field separator is "~".
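
As an orientation, the sketch below combines two of these helpers at the beginning of a module. It assumes the functions can be imported from /lib/utils.py and that they return the name of the created HIVE table (as in the complete examples further down); the column names passed to create_module_oldResults_temp_table are hypothetical:

from lib.utils import create_measures_temp_table, create_module_oldResults_temp_table

task_UUID = 'TEST'
companyId = 0000000000

# HIVE table mapping the HBase electricityConsumption measures of the utility
measures_table = create_measures_temp_table(context, 'electricityConsumption', companyId, task_UUID)

# HIVE table mapping the historical ot101 results stored in HBase
# ('consumption' and 'label' are hypothetical column names)
old_results_table = create_module_oldResults_temp_table(context, 'ot101', task_UUID,
                                                        [('consumption','float'),('label','string')])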

Create an auxiliary HIVE table for an HDFS text file

From shell:

In this example, the 'ot000input' HIVE table is created. The underlying text table has two columns called 'column1' and 'column2' and uses a comma as the column separator; it is located in the '/tmp/ot000/input_from_query' HDFS directory.

CREATE EXTERNAL TABLE IF NOT EXISTS ot000input (column1 int, column2 string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE LOCATION '/tmp/ot000/input_from_query';

From Python (a usage sketch follows this list):

  • create_hive_module_input_table(context, name, location, fields, id_task, sep="\t")
    • It is a function to create a HIVE table which reads/writes in HDFS.
    • Arguments:
      • context: the context dictionary of the module.
      • name: name of the HIVE table.
      • location: HDFS path where the temporary output files are stored.
      • fields: a list of tuples with the column name in the input text file and desired class for that column in the HIVE table. The available class formats in HIVE columns are: bigint, int, string and float.
      • id_task: the task_UUID variable defined at the beginning of the module.
      • sep: the column separator of the output text files. By default it is "\t".
  • create_hive_partitioned_table(context, type, companyId, table_fields, partitioner_fields, location, table_needs_to_be_recreated, sep='\t')
    • It is a function to create a partitioned HIVE table of measures which reads/writes in HDFS. This function is used to generate the partitioned HDFS measures files, which are much quicker to read than the HBase measures tables.
    • Arguments:
      • context: the context dictionary of the module.
      • type: the consumption type. For instance, electricityConsumption.
      • companyId: the companyId of the utility.
      • table_fields: a list of tuples with the column name in the input text file and desired class for that column in the HIVE table. The available class formats in HIVE columns are: bigint, int, string and float.
      • partitioner_fields: a list of tuples with the column name to partition in the input text file and the desired class for that column in the partitioned HIVE table. The available class formats in partitioned HIVE columns are: int and string.
      • location: HDFS path where the HDFS measures are stored.
      • table_needs_to_be_recreated: if True, the function deletes any previous HIVE table with the same name before creating the new one. If False, the HIVE table is kept; the measures from the previous version are appended and updated, but measures removed in HBase are not deleted.
      • sep: the column separator of the HDFS measure text files. By default it is "\t".
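
A minimal sketch combining both helpers; it assumes they can be imported from /lib/utils.py, and the field list, partitioners and HDFS location are illustrative values rather than the real module configuration:

from lib.utils import create_hive_module_input_table, create_hive_partitioned_table

task_UUID = 'TEST'
companyId = 0000000000

# Illustrative column definitions shared by both tables
fields = [('contractId','string'),('ts','bigint'),('value','float')]

# HDFS-backed input table of the module (tab-separated by default)
input_table = create_hive_module_input_table(context, 'OT000Input',
                                             context['config']['module']['paths']['input'],
                                             fields, task_UUID)

# Partitioned HDFS measures table, split by year and month (hypothetical partitioners and path)
create_hive_partitioned_table(context, 'electricityConsumption', companyId,
                              fields, [('year','int'),('month','int')],
                              '/tmp/electricityConsumption_0000000000',
                              table_needs_to_be_recreated=True)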

Delete an auxiliary HIVE table

From shell:

This example deletes the 'ot000input' HIVE table.

DROP TABLE ot000input;

From Python (a usage sketch follows this list):

  • delete_measures_temp_table(context, table_name)
    • This function deletes a temporary HIVE table that maps an HBase measures table.
    • Arguments:
      • context: the context dictionary of the module.
      • table_name: the name of the table to delete.
  • delete_hive_module_input_table(context, name)
    • This function deletes any module input HIVE table.
    • Arguments:
      • context: the context dictionary of the module.
      • name: the name of the table to delete.
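
A minimal clean-up sketch, assuming the functions can be imported from /lib/utils.py and that measures_table and input_table hold the names returned by the creation helpers above:

from lib.utils import delete_measures_temp_table, delete_hive_module_input_table

# Remove the temporary HIVE tables at the end of the module
delete_measures_temp_table(context, measures_table)
delete_hive_module_input_table(context, input_table)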

Make queries

The HIVE queries can be executed directly from the HIVE shell or from a Python script.

From the shell:

# This sentence shows all the available HIVE tables
SHOW TABLES;

# This sentence describes the fields of a HIVE table called consumptionXX
DESCRIBE consumptionXX;

# This sentence shows all the contractId and postalCode pairs for utility 0000000000
SELECT key.contractId, postalCode FROM customers WHERE key.company=0000000000;

From Python:

To execute HIVE queries from Python, which is mandatory for a module developer, a class called 'QueryBuilder' is used to build and execute the needed query sentences. This class is defined in /lib/querybuilder.py and contains several functions to build and launch a query sentence. A short end-to-end sketch is shown after the function list below.

The QueryBuilder() functions are:

  • FROM functions:
    • add_from(table,alias=None): It defines the main HIVE table where the data is stored and the alias for this table.
  • JOIN functions:
    The following functions cover the different types of HIVE joins:
    • add_join(table, alias, condition=None): This function joins the rows between two tables using an optional condition. It considers only the rows which have a value (not NULL) in the selected columns of the two tables.
    • add_full_outer_join(table,alias,condition=None): This function joins the rows between two tables using an optional condition. It considers all the rows of the two tables, including those with a NULL value in the selected columns of either the main table or the joined one.
    • add_left_outer_join(table,alias,condition=None): This function joins the rows between two tables using an optional condition. It considers all the rows of the main table, including the ones which have a NULL value in some of their selected columns. In contrast, only the rows with values (not NULL) in the selected columns of the joined table are considered.
    • add_right_outer_join(table,alias,condition=None): This function joins the rows between two tables using an optional condition. It considers all the rows of the joined table, including the ones which have a NULL value in some of their selected columns. In contrast, only the rows with values (not NULL) in the selected columns of the main table are considered.
  • INSERT functions:
    • add_insert(table=None, overwrite=True, directory=None, partition=None): This function defines the place where the output is stored. If the table argument is given, the output is stored in that table, overwriting the previous data by default; in this case, the table has to be created beforehand. In contrast, if the directory argument is given, the output of the query is stored in the specified path. The partition argument is used to give the list of columns which split the HIVE partitioned table.
  • SELECT functions:
    • add_select(select): This function defines the output calculation to do in each of the columns of the output table. All operators and HIVE functions (UDFs) can be used with this instance; look at the 'HIVE Operators and UDFs official manual'. The select argument has to be defined as a single string with the column definitions separated by commas. Each column can be associated with a specific alias using the structure 'as <alias>' after its definition. This alias is very important if the query has to be ordered.
  • FILTERING and ORDERING functions:
    • add_where(condition): This function adds a condition to the query. All operators and HIVE functions can be used with this instance; look at the 'HIVE Operators and UDFs official manual'.
    • add_and_where(condition): This function adds more where conditions to the previous one.
    • add_order(name, order=None): This function defines the HIVE column by which the output has to be ordered. Remember to use a column alias in the 'name' argument.
  • GROUPING functions:
    • add_groups(groups): This function defines the grouping fields for every calculation made in the 'select' instance. For example, if the monthly consumption has to be calculated for every contract and every month, the contract, year and month columns have to be defined in the add_groups instance. The 'groups' argument of this function has to be defined as a Python list of strings.
  • OTHER functions:
    • add_dynamic(): This function has to be called before executing a query that works with a partitioned HIVE table.
    • create_query(): This function joins all the instances in order to generate the final string sentence to send via HIVE thrift later.
    • execute_query(): This function executes the create_query() sentence in HIVE.
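
As an illustration of how these instances chain together, the sketch below builds a simple aggregation with a join and an ordering. It assumes the HIVE tables electricityConsumption_0000000000_HDFS, customers and OT000Input_TEST already exist, and that create_query() returns the generated sentence; the complete examples follow in the next sections:

from lib.querybuilder import QueryBuilder

qb = QueryBuilder(context['clients']['hive'])
qb = qb.add_from('electricityConsumption_0000000000_HDFS', 'e')
qb = qb.add_join('customers', 'c', 'c.key.contractId = e.contractId')
qb = qb.add_insert(table='OT000Input_TEST')
qb = qb.add_select('e.contractId, sum(e.value) as total, count(e.value), e.month, e.year')
qb = qb.add_where('e.value is not null')
qb = qb.add_and_where('e.year = 2014')
qb = qb.add_groups(['e.contractId', 'e.month', 'e.year'])
qb = qb.add_order('total')    # 'total' is the alias defined in add_select

print(qb.create_query())      # inspect the generated HiveQL sentence before launching it
qb.execute_query()            # run it in HIVE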

Example query which reads measures from HBase

From shell:

Query of the number of measures and total electricity consumption by month of the contractId 000111 of the Spanish companyId 0000000000. Before launching this query, remember to create the HIVE table to read the measures from HBase (electricityConsumption_0000000000 in this case). The results are shown on the screen ordered by month.

SELECT key.contractId,
       year(from_utc_timestamp(from_unixtime(key.ts),"Europe/Madrid")),
       month(from_utc_timestamp(from_unixtime(key.ts),"Europe/Madrid")),
       min(key.ts) as minimum_timestamp,
       count(value),
       sum(value)
FROM electricityConsumption_0000000000
WHERE key.contractId='000111'
GROUP BY key.contractId,
         year(from_utc_timestamp(from_unixtime(key.ts),"Europe/Madrid")),
         month(from_utc_timestamp(from_unixtime(key.ts),"Europe/Madrid"))
ORDER BY minimum_timestamp;

From Python:

Query of the number of measures and total electricity consumption by month from January 1st 2014 to December 31st 2014 for all the contracts of the Spanish companyId 0000000000. This script considers the creation of all the necessary HIVE tables (see the table_from and the table_input variables).

The variables used in this example are:

type = 'electricityConsumption'
companyId = 0000000000
timezone = 'Europe/Madrid'
ts_from = datetime(2014,1,1,0,0,0)
ts_to = datetime(2014,12,31,23,59,59)
task_UUID = 'TEST'
context = <Context dictionary of the module>

The script to launch the query is:

from lib.querybuilder import QueryBuilder

qb = QueryBuilder(context['clients']['hive'])

table_from = create_measures_temp_table(context, type, companyId, task_UUID)

fields = [('contractId','string'),('total_consumption','float'),('number_measures','int'),('month','int'),('year','int')]
table_input = create_hive_module_input_table(context, 'OT000Input', context['config']['module']['paths']['input'], fields, task_UUID)

qb = qb.add_from(table_from, 'e').add_insert(table=table_input)
qb = qb.add_select('e.key.contractId,\
                    sum(e.value),\
                    count(e.value),\
                    month(from_utc_timestamp(from_unixtime(e.key.ts),"%s")),\
                    year(from_utc_timestamp(from_unixtime(e.key.ts),"%s"))' % (timezone,timezone)
                  )
qb = qb.add_where( 'unix_timestamp(from_utc_timestamp(from_unixtime(e.key.ts),"%s")) >= unix_timestamp("%s","yyyy-MM-dd HH:mm:ss")' % (timezone, ts_from) )
qb = qb.add_and_where( 'unix_timestamp(from_utc_timestamp(from_unixtime(e.key.ts),"%s")) <= unix_timestamp("%s","yyyy-MM-dd HH:mm:ss")' % (timezone, ts_to) )
qb = qb.add_groups([ 'e.key.contractId',
                     'month(from_utc_timestamp(from_unixtime(e.key.ts),"%s"))' % timezone,
                     'year(from_utc_timestamp(from_unixtime(e.key.ts),"%s"))' %timezone
                  ])

qb.execute_query()

Example query which reads measures from HDFS

From shell: Query of the number of measures and total electricity consumption by month of the contractId 000111 of the Spanish companyId 0000000000. The results are shown on the screen ordered by month.

SELECT contractId,
       year,
       month,
       min(ts) as minimum_timestamp,
       count(value),
       sum(value)
FROM electricityConsumption_0000000000_HDFS
WHERE contractId='000111'
GROUP BY contractId,
         year,
         month
ORDER BY minimum_timestamp;

From Python: Query of the number of measures and total electricity consumption by month from January 1st 2014 to December 31st 2014 for all the contracts of the Spanish companyId 0000000000. This script considers the creation of all the necessary HIVE tables (see the table_from and the table_input variables).

The variables used in this example are:

type = 'electricityConsumption'
companyId = 0000000000
timezone = 'Europe/Madrid'
ts_from = datetime(2014,1,1,0,0,0)
ts_to = datetime(2014,12,31,23,59,59)
task_UUID = 'TEST'
context = <Context dictionary of the module>

The script to launch the query is:

from lib.querybuilder import QueryBuilder

qb = QueryBuilder(context['clients']['hive'])

table_from = "%s_%s_HDFS" % (params['type'], companyId)

fields = [('contractId','string'),('total_consumption','float'),('number_measures','int'),('month','int'),('year','int')]
table_input = create_hive_module_input_table(context, 'OT000Input', context['config']['module']['paths']['input'], fields, task_UUID)

qb = qb.add_from(table_from, 'e').add_insert(table=table_input)
qb = qb.add_select('e.contractId,\
                    sum(e.value),\
                    count(e.value),\
                    e.month,\
                    e.year'
                  )
qb = qb.add_where( 'e.ts >= unix_timestamp("%s","yyyy-MM-dd HH:mm:ss")' % (ts_from) )
qb = qb.add_and_where( 'e.ts <= unix_timestamp("%s","yyyy-MM-dd HH:mm:ss")' % (ts_to) )
qb = qb.add_and_where('e.value is not null')
qb = qb.add_groups([ 'e.contractId',
                     'e.month',
                     'e.year'
                  ])

qb.execute_query()

Create a query which joins HDFS measures and contract characteristics

From shell: Query of the number of measures and the total electricity consumption by month, and the postal code and tariffId of the contractId 000111 of the Spanish companyId 0000000000. The results are shown on the screen ordered by month.

SELECT e.contractId,
       e.year,
       e.month,
       min(e.ts) as minimum_timestamp,
       count(e.value),
       sum(e.value),
       c.postalCode,
       c.tariffId
FROM electricityConsumption_0000000000_HDFS e
JOIN customers c ON c.key.contractId = e.contractId
WHERE e.contractId='000111'
GROUP BY e.contractId,
         e.year,
         e.month,
         c.postalCode,
         c.tariffId
ORDER BY minimum_timestamp;

From Python: Query, between January 1st 2014 and December 31st 2014, the number of measures and the total electricity consumption by month, together with some customer characteristics, for all the contracts of the Spanish companyId 0000000000. Only the measures of the contracts which have non-null values in the customer characteristics (in this example: postal code and tariff identifier) are selected. This script considers the creation of all the necessary HIVE tables. The output results will be stored in the OT000Input_TEST table.

The variables used in this example are:

type = 'electricityConsumption'
companyId = 0000000000
timezone = 'Europe/Madrid'
ts_from = datetime(2014,1,1,0,0,0)
ts_to = datetime(2014,12,31,23,59,59)
task_UUID = 'TEST'
context = <Context dictionary of the module>

The script to launch the query is:

from lib.querybuilder import QueryBuilder

qb = QueryBuilder(context['clients']['hive'])

table_from = "%s_%s_HDFS" % (type, companyId)

fields = [('contractId','string'),('total_consumption','float'),('number_measures','int'),('month','int'),('year','int'),('postalCode','string'),('tariffId','string')]
table_input = create_hive_module_input_table(context, 'OT000Input', context['config']['module']['paths']['input'], fields, task_UUID)

qb = qb.add_from(table_from, 'e').add_insert(table=table_input)
qb = qb.add_join('customers','c','c.key.contractId = e.contractId')
qb = qb.add_select('e.contractId,\
                    sum(e.value),\
                    count(e.value),\
                    e.month,\
                    e.year,\
                    c.postalCode,\
                    c.tariffId'
                  )
qb = qb.add_where( 'e.ts >= unix_timestamp("%s","yyyy-MM-dd HH:mm:ss")' % (ts_from) )
qb = qb.add_and_where( 'e.ts <= unix_timestamp("%s","yyyy-MM-dd HH:mm:ss")' % (ts_to) )
qb = qb.add_and_where('e.value is not null')
qb = qb.add_groups([ 'e.contractId',
                     'e.month',
                     'e.year',
                     'c.postalCode',
                     'c.tariffId'
                  ])

qb.execute_query()
