Source listing: DocIRHadoop / InitHadoop / setupHadoop.py
Author: Christos Kannas

Christos Kannas cc98436 





Christos Kannas 156dee3 





Christos Kannas cc98436 
Christos Kannas 156dee3 
Christos Kannas cc98436 
Christos Kannas 156dee3 



Christos Kannas cc98436 
Christos Kannas 156dee3 

Christos Kannas cc98436 
Christos Kannas 156dee3 
Christos Kannas cc98436 
Christos Kannas 156dee3 

Christos Kannas cc98436 
Christos Kannas 156dee3 
Christos Kannas cc98436 
Christos Kannas 156dee3 
































Christos Kannas e1e88db 
Christos Kannas 156dee3 

































Christos Kannas e1e88db 

Christos Kannas 156dee3 





Christos Kannas e1e88db 
Christos Kannas dd6fa76 
Christos Kannas e1e88db 
Christos Kannas dd6fa76 




Christos Kannas 156dee3 


Christos Kannas e1e88db 



Christos Kannas 156dee3 




Christos Kannas e1e88db 







Christos Kannas 156dee3 










Christos Kannas e1e88db 
Christos Kannas dd6fa76 

Christos Kannas 156dee3 





Christos Kannas cc98436 
Christos Kannas 156dee3 

















Christos Kannas cc98436 

Christos Kannas 156dee3 
#!/usr/bin/env python

import os
import subprocess

class Hadoop:
    """Wrapper around a local Hadoop installation.

    Locates the install directory (via $HADOOP_HOME or an explicit path),
    starts/stops the daemons, and runs HDFS and Hadoop Streaming commands.

    All command helpers invoke the ``bin/...`` scripts with relative paths,
    so the current working directory must be the Hadoop root — call
    goHadoop() first.
    """

    # Streaming jar shipped with Hadoop 0.20.2; overridable per job via
    # execHadoopStreaming(streaming_jar=...).
    DEFAULT_STREAMING_JAR = "contrib/streaming/hadoop-0.20.2-streaming.jar"

    def __init__(self):
        """Initialize from the environment: pick up $HADOOP_HOME if set."""
        # None means "location unknown"; isHadoop() reports on this.
        self.hadoop_dir = os.environ.get("HADOOP_HOME")

    def isHadoop(self):
        """Return 1 if the Hadoop directory is known, 0 otherwise."""
        return 0 if self.hadoop_dir is None else 1

    def setHadoopLoc(self, path):
        """Record *path* (normalized to an absolute path) as the Hadoop root."""
        self.hadoop_dir = os.path.abspath(path)

    def goHadoop(self):
        """chdir into the Hadoop root so the relative bin/ scripts resolve."""
        os.chdir(self.hadoop_dir)

    def startHadoop(self):
        """Start all Hadoop daemons (best effort: exit status is ignored)."""
        subprocess.call(["bin/start-all.sh"])

    def stopHadoop(self):
        """Stop all Hadoop daemons.

        Raises:
            subprocess.CalledProcessError: if the stop script exits non-zero.
        """
        subprocess.check_call(["bin/stop-all.sh"])

    def lsHadoop(self):
        """List the HDFS home directory of the current user.

        Raises:
            subprocess.CalledProcessError: if the hadoop command fails.
        """
        subprocess.check_call(["bin/hadoop", "dfs", "-ls"])

    def putFileHadoop(self, source, dest):
        """Upload the local file/dir *source* to HDFS path *dest*."""
        subprocess.call(
            ["bin/hadoop", "dfs", "-put", os.path.abspath(source), dest])

    def getFileHadoop(self, source, dest):
        """Download HDFS path *source* to the local path *dest*."""
        subprocess.call(
            ["bin/hadoop", "dfs", "-get", source, os.path.abspath(dest)])

    def delFileHadoop(self, path):
        """Recursively delete *path* from HDFS (dfs -rmr)."""
        subprocess.call(["bin/hadoop", "dfs", "-rmr", path])

    def execHadoopStreaming(self, mapper, reducer, input_files, output_file,
                            mapper_params=None, reducer_params=None,
                            files=None, streaming_jar=None):
        """Run a Hadoop Streaming job.

        Args:
            mapper: mapper script/command name.
            reducer: reducer script/command name, or "NONE" for map-only jobs.
            input_files: iterable of HDFS input paths (one -input each).
            output_file: HDFS output directory.
            mapper_params: optional extra CLI args appended to the mapper spec.
            reducer_params: optional extra CLI args appended to the reducer spec.
            files: optional extra files to ship with the job (-file each).
            streaming_jar: path to the streaming jar; defaults to
                DEFAULT_STREAMING_JAR.
        """
        # None instead of mutable [] defaults — avoids cross-call sharing.
        mapper_params = list(mapper_params) if mapper_params else []
        reducer_params = list(reducer_params) if reducer_params else []
        files = list(files) if files else []
        if streaming_jar is None:
            streaming_jar = self.DEFAULT_STREAMING_JAR

        cmd = ["bin/hadoop", "jar", streaming_jar]

        # -mapper / -reducer take a single argv element: "script arg1 arg2".
        # No literal quote characters are added: subprocess runs without a
        # shell, so embedded quotes would be passed to hadoop verbatim
        # (the original code quoted the mapper spec but not the reducer's).
        mapper_spec = " ".join([str(mapper)] + [str(p) for p in mapper_params])
        reducer_spec = " ".join([str(reducer)] + [str(p) for p in reducer_params])
        cmd += ["-mapper", mapper_spec, "-reducer", reducer_spec]

        # Ship the mapper/reducer scripts themselves; compare the bare
        # reducer name against the "NONE" sentinel (not the spec with
        # params appended, which could never equal "NONE").
        cmd += ["-file", mapper]
        if reducer != "NONE":
            cmd += ["-file", reducer]
        for f in files:
            cmd += ["-file", f]

        for infile in input_files:
            cmd += ["-input", infile]

        cmd += ["-output", output_file]

        # Echo the full command line before launching, for debugging.
        print(cmd)
        print()
        subprocess.call(cmd)

###############################################################################
def test():
    """Interactive smoke test: locate Hadoop, start it, list and delete
    an HDFS path chosen by the user, then stop the daemons.

    Requires a Hadoop installation (default /usr/local/hadoop) and
    operator input; intended to be run manually.
    """
    h = Hadoop()

    print(os.path.abspath(os.curdir))
    if not h.isHadoop():
        # Fall back to the conventional install location.
        h.setHadoopLoc("/usr/local/hadoop")
        h.goHadoop()
    print(">>> %s" % os.path.abspath(os.curdir))

    h.startHadoop()
    h.lsHadoop()

    # raw_input existed only on Python 2; input is its Python 3 equivalent.
    try:
        ask = raw_input
    except NameError:
        ask = input
    h.delFileHadoop(ask("File to Delete: "))

    h.stopHadoop()

# Run the interactive smoke test only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    test()