DocIRHadoop / UI / cmdUI.py
#!/usr/bin/env python

import os
import ast
import shutil
from DocIRHadoop.InitHadoop import setupHadoop
from DocIRHadoop.InvertedIndex import mapperZipFiles, mapperInvIndex, \
                                      reducerInvIndex, mapperReadIndex
from DocIRHadoop.Query import queryParser

class cmdUI:
    """Command Line User Interface."""
    def __init__(self):
        # Initialize Hadoop Environment.
        self.hadoop = setupHadoop.Hadoop()
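        # Boolean query parser used to evaluate queries against the index.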
        self.bqp = queryParser.BoolQuery()
        
        return
    
    def mainScreen(self):
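        """Run the interactive session: start Hadoop, load the dataset,
        build the inverted index, and answer Boolean queries."""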
        print "=============================================================="
        print "                   You are using DocIRHadoop"
        print "=============================================================="
        # Start Hadoop
        self._startHadoop()
        
        # Add dataset in HDFS
        dest, files = self.loadDataset()
        print
        
        # The following step is needed in order to create
        # a positional inverted index.
        # By default, Hadoop Streaming parses input files one line at a
        # time, and each mapper gets a single line. In order to have
        # positional indexing we have to feed a whole file to each mapper.
        # The way to do this is to create a gzip file for each document
        # and feed these zipped files to the mappers.
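        # For illustration, one line of the resulting index, as parsed
        # back by read_file() below, might look like (the exact postings
        # format is an assumption based on that parsing code):
        #   term<TAB>{'doc1.txt': [3, 17], 'doc2.txt': [5]}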
        
        # Create gzip files for the dataset
        self._zipFiles(files)
        print
        
        # Create Inverted Index
        self.createInvertIndex()
        
        # Query Searching
        while (True):
            print
            print "=============================================================="
            print "                     DocIRHadoop Search"
            print "=============================================================="
            print " Enter your query or Ctrl-D to Exit."
            try:
                query = raw_input(" Query: ")
                self.queryParsing(query)
            except EOFError:
                print
                print
                print "Thank you!"
                print
                break
        # end while
        
        # Stop Hadoop
        #self._stopHadoop()
        
        return
    
    def _startHadoop(self):
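        """Locate the Hadoop installation and cd to its home directory."""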
        # Check if Hadoop home directory is set via $HADOOP_HOME
        if (not self.hadoop.isHadoop()):
            # $HADOOP_HOME is not set, so ask the user for the location.
            while (True):
                h_path = raw_input("Enter the location of the Hadoop "\
                                   "installation: ").replace("\r", "")
                h_path = os.path.abspath(h_path)
                if (os.path.exists(h_path)):
                    self.hadoop.setHadoopLoc(h_path)
                    break
                else:
                    print "Invalid path."
            # end while
        # cd to Hadoop home dir
        self.hadoop.goHadoop()
        # Start Hadoop
        #print "\nStarting Hadoop...\n"
        #self.hadoop.startHadoop()
        #print
        
        return
    
    def _stopHadoop(self):
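        """Stop the running Hadoop instance."""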
        # Stop Hadoop.
        print "\nStopping Hadoop...\n"
        self.hadoop.stopHadoop()
        return

    def loadDataset(self):
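        """Copy a user-supplied dataset into HDFS.

        Return the HDFS destination and a newline-separated string of
        the dataset file names as they appear in HDFS.
        """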
        # Add a dataset in HDFS.
        while (True):
            print "Define the dataset to add in HDFS."    	
            source = raw_input("Enter the current location of the dataset "\
                                "(source): ").replace("\r", "")
            dest = raw_input("Enter the destination in HDFS for the dataset "\
                             "(destination): ").replace("\r", "")
            source = os.path.abspath(source)
            if (os.path.exists(source)):
                files = ""
                for f in os.listdir(source):
                    files += dest+"/"+f+"\n"
                #print
                #print files
                #print
                #print source, dest
                #print
                self.hadoop.delFileHadoop(dest)
                print
                self.hadoop.putFileHadoop(source, dest)
                self.hadoop.lsHadoop()
                break
            else:
                print "Invalid source location of dataset."

        return dest, files
        
    def _zipFiles(self, files):
        """Gzip the files."""
        # Create a file containing the file names of the files of the dataset.
        # This file will be used to Gzip the dataset files using Hadoop.
        fp = open("/tmp/filenames.txt", "w+")
        fp.write(files)
        fp.close()
        self.hadoop.delFileHadoop("filenames.txt")
        print
        self.hadoop.putFileHadoop("/tmp/filenames.txt", "filenames.txt")
        self.hadoop.lsHadoop()
        print
        # use Hadoop to gzip files...
        self.hadoop.delFileHadoop("output_zip")
        print
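        # Passing "NONE" as the reducer runs a map-only streaming job:
        # each mapper gzips the files it is handed.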
        self.hadoop.execHadoopStreaming(mapper = mapperZipFiles.__file__,
                                        reducer = "NONE",
                                        input_files = ["filenames.txt"],
                                        output_file = "output_zip")
        self.hadoop.lsHadoop()
        print
        
        return
        
    def createInvertIndex(self):
        """Create the Inverted Index."""
        # Equivalent streaming command:
        #   bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar \
        #       -mapper "mapperInvIndex.py stoplist.txt" \
        #       -reducer reducerInvIndex.py \
        #       -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/mapperInvIndex.py \
        #       -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/reducerInvIndex.py \
        #       -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/stoplist.txt \
        #       -input dataset_gz -output invert_index
        stoplist_name = "stoplist.txt"
        stoplist_path = os.path.join(os.path.dirname(mapperInvIndex.__file__),
                                     stoplist_name)
        self.hadoop.delFileHadoop("invert_index")
        print
        self.hadoop.execHadoopStreaming(mapper = mapperInvIndex.__file__,
                                        reducer = reducerInvIndex.__file__,
                                        input_files = ["dataset_gz"],
                                        output_file = "invert_index",
                                        mapper_params=[stoplist_name],
                                        files = [stoplist_path])
        self.hadoop.lsHadoop()
        print
        
        return
        
    def queryParsing(self, query):
        """Parse a Boolean Query."""
        # get terms from the query string
        # - remove \n, \r
        # - remove and, or, not, (, )
        query = query.lower()
        terms = query.replace('\n', '').replace('\r', '')
        terms = terms.replace('and', '').replace('or', '').replace('not', '')
        terms = terms.replace('(', '').replace(')', '').split()
        #print query
        #print terms
        print
        self.hadoop.delFileHadoop("query_result")
        print
        # Feed the terms to the mappers, which look them up in the index.
        # Equivalent streaming command:
        #   bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar \
        #       -mapper mapperReadIndex.py -reducer NONE \
        #       -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/mapperReadIndex.py \
        #       -input invert_index -output query_result
        self.hadoop.execHadoopStreaming(mapper = mapperReadIndex.__file__,
                                        reducer = "NONE",
                                        input_files = ["invert_index"],
                                        output_file = "query_result",
                                        mapper_params = terms)
        # Read the results back and hand the term info to the query parser.
        tmp_results = "/tmp/query_result/"
        # Recreate the temporary results directory.
        shutil.rmtree(tmp_results, ignore_errors=True)
        os.mkdir(tmp_results)
        self.hadoop.getFileHadoop("query_result/part*", tmp_results)
        # The first result maps each term to the set of documents that
        # contain it; the second also keeps the positions within each
        # document.
        term_results_set, term_results_dict = self.read_file(tmp_results)
        # Give the term information to the Boolean query parser.
        self.bqp.setIndex(term_results_set)
        # Add the document info. Here we have to cheat a bit: the names
        # of the zipped documents are read back from the output of the
        # zipping job.
        tmp_zip_files = "/tmp/zip_files/"
        # create temp results directory
        try:
            subprocess.call(["rm", "-r", tmp_zip_files])
        except:
            pass
        #
        try:
            subprocess.call(["mkdir", tmp_zip_files])
        except:
            pass
        #
        self.hadoop.getFileHadoop("output_zip/part*", tmp_zip_files)
        docs = self.read_zip(tmp_zip_files)
        self.bqp.setDocs(docs)
        # parse query
        results = self.bqp.doParsing(query)
        # print result
        print
        print "Results:"
        for r in results:
            print "In '%s'."%(r)
        print
        
        return
        
    def read_file(self, tmpdir):
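        """Read the per-term query results fetched from HDFS.

        Return two mappings: term -> set of documents containing the
        term, and term -> full postings (documents and positions).
        """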
        result_dict = {}
        result_set = {}
        for tmpfile in os.listdir(tmpdir):
            fp = open(os.path.join(tmpdir, tmpfile))
            for line in fp:
                line = line.replace('\n', '').replace('\r', '').strip()
                # Skip empty lines.
                if len(line) == 0:
                    continue
                term, postings = line.split('\t', 1)
                # The postings are a repr()'d Python dict; literal_eval
                # is a safer way to parse it back than eval().
                postings = ast.literal_eval(postings)
                result_dict[term] = postings
                result_set[term] = set(postings.keys())
            fp.close()
        # end for
        
        return result_set, result_dict
        
    def read_zip(self, tmpdir):
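        """Read the names of the gzipped documents fetched from HDFS."""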
        fileNames = {}
        for tmpfile in os.listdir(tmpdir):
            fp = open(os.path.join(tmpdir, tmpfile))
            for line in fp:
                filename = line.replace('\n', '').replace('\r', '').strip()
                # Skip empty lines.
                if len(filename) > 0:
                    fileNames[filename] = filename
            fp.close()
        # end for
        
        return fileNames

def main():
    
    ui = cmdUI()
    ui.mainScreen()
    
    return

if __name__ == "__main__":
    main()