1. Peter Rudenko
  2. Templeton API


Templeton API / templeton / src / templeton / api.py

# -*- coding: utf-8 -*-
import urllib
import urllib2
import simplejson as json

TEMPLETON_URL = "http://ec2-75-101-199-141.compute-1.amazonaws.com:50111/templeton/v1/"

class Templeton(object):
    """Minimal client for the Templeton REST API.

    Requests are sent to paths relative to the service root URL and the
    JSON response bodies are decoded and returned as Python objects.
    """

    # Root URL override for this client. None means "use the module-level
    # TEMPLETON_URL". Defined on the class so the attribute exists even for
    # subclasses whose own __init__ does not call this one.
    base_url = None

    def __init__(self, base_url=None):
        """Create a client.

        Keyword arguments:
        base_url -- Root URL of the Templeton service. Request paths are
                    appended to it directly, so it must end with "/".
                    Defaults to the module-level TEMPLETON_URL.
        """
        self.base_url = base_url

    def _url(self, path):
        """Return the absolute URL for *path* under the service root."""
        # Resolve the default at call time so TEMPLETON_URL is only needed
        # when no explicit base_url was given.
        root = self.base_url if self.base_url is not None else TEMPLETON_URL
        return root + path

    @staticmethod
    def _read_json(request):
        """Open *request* (URL string or urllib2.Request) and decode its JSON body.

        The response is always closed, even if reading or decoding fails.
        """
        response = urllib2.urlopen(request)
        try:
            return json.loads(response.read())
        finally:
            response.close()

    def get_query(self, url, data):
        """Make a GET query to a Templeton endpoint.

        Keyword arguments:
        url -- Endpoint path relative to the service root (e.g. "queue/<id>").
        data -- Parameters passed to urllib.urlencode to build the query string.

        Returns the decoded JSON response body.
        """
        query = urllib.urlencode(data)
        return self._read_json(self._url(url) + "?" + query)

    def post_query(self, url, data):
        """Make a POST query to a Templeton endpoint.

        Keyword arguments:
        url -- Endpoint path relative to the service root (e.g. "pig").
        data -- Parameters passed to urllib.urlencode to build the form body.

        Returns the decoded JSON response body.
        """
        body = urllib.urlencode(data)
        # Supplying a body to urllib2.Request makes urlopen send a POST.
        request = urllib2.Request(self._url(url), body)
        return self._read_json(request)

    @staticmethod
    def _pig_params(user, execute, pig_file, statusdir, callback):
        """Build the form fields for a Pig job request, omitting unset options.

        Raises ValueError if neither ``execute`` nor ``pig_file`` is given.
        """
        if not (execute or pig_file):
            raise ValueError('One of either "execute" or "file" is required')
        params = {"user.name": user}
        # Falsy values (None, "") count as "not provided" and are left out.
        for key, value in (("execute", execute), ("file", pig_file),
                           ("statusdir", statusdir), ("callback", callback)):
            if value:
                params[key] = value
        return params

    def pig_query(self, user="hdfs", execute=None, pig_file=None,
                  statusdir=None, callback=None):
        """Create and queue a Pig job.

        Keyword arguments:
        user -- Hue/Hadoop user the job is submitted as (sent as "user.name").
        execute -- String containing an entire, short Pig program to run
                   (e.g. "pwd").
        pig_file -- HDFS file name of a Pig program to run (sent as "file").
                    At least one of ``execute`` or ``pig_file`` is required.
        statusdir -- Directory where Templeton will write the status of the
                     Pig job. If provided, it is the caller's responsibility
                     to remove this directory when done.
        callback -- URL to be called upon job completion. A specific job ID
                    may be embedded using $jobId; that tag is replaced in the
                    callback URL with this job's ID.

        Returns dict:
        id -- A string containing the job ID similar to "job_201110132141_0001".
        info -- An object containing the information returned when the job
                was queued.

        Raises ValueError if neither ``execute`` nor ``pig_file`` is given
        (checked before any request is sent).
        """
        params = self._pig_params(user, execute, pig_file, statusdir, callback)
        return self.post_query("pig", params)

    def check_job(self, job_id, user="hdfs"):
        """Check the status of a job and get related job information.

        Keyword arguments:
        job_id -- Job ID, e.g. the "id" field returned by pig_query.
        user -- Hue/Hadoop user the request is made as.

        Returns the decoded JSON response body.
        """
        return self.get_query("queue/%s" % job_id, {"user.name": user})

if __name__ == "__main__":
    # Demo: queue a trivial Pig job, then look up its status by the returned ID.
    client = Templeton()
    submitted = client.pig_query(user="hdfs", execute="pwd")
    status = client.check_job(submitted['id'])
    print(status)