DocIRHadoop / UI / cmdUI.py
#!/usr/bin/env python

import os
import ast
import shutil
from DocIRHadoop.InitHadoop import setupHadoop
from DocIRHadoop.InvertedIndex import mapperZipFiles, mapperInvIndex, \
                                      reducerInvIndex, mapperReadIndex
from DocIRHadoop.Query import queryParser

class cmdUI:
    """Command Line User Interface."""
    def __init__(self):
        # Initialize Hadoop Environment.
        self.hadoop = setupHadoop.Hadoop()
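        # Initialize the Boolean query parser.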
        self.bqp = queryParser.BoolQuery()
        
        return
    
    def mainScreen(self):
        print "=============================================================="
        print "                   You are using DocIRHadoop"
        print "=============================================================="
        # Start Hadoop
        self._startHadoop()
        
        # Add dataset in HDFS
        dest, files = self.loadDataset()
        print
        
        # The following step is needed in order to create
        # a positional inverted index.
        # By default, Hadoop Streaming splits input files a line at a time,
        # so each mapper sees individual lines.
        # For positional indexing each mapper must receive a whole file.
        # The way to do this is to create a gzip file for each document
        # and feed these zipped files to the mappers.
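        # (Gzip files are not splittable, so Hadoop assigns each archive
        # to a single mapper in its entirety, which gives the
        # whole-document-per-mapper behaviour needed here.)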
        
        # Create gzip files for the dataset
        self._zipFiles(files)
        print
        
        # Create Inverted Index
        self.createInvertIndex()
        
        # Query Searching
        while (True):
            print
            print "=============================================================="
            print "                     DocIRHadoop Search"
            print "=============================================================="
            print " Enter your query or Ctrl-D to Exit."
            try:
                query = raw_input(" Query: ")
                self.queryParsing(query)
            except EOFError:
                print
                print
                print "Thank you!"
                print
                break
        # end while
        
        # Stop Hadoop
        #self._stopHadoop()
        
        return
    
    def _startHadoop(self):
        # Check if Hadoop home directory is set via $HADOOP_HOME
        if (not self.hadoop.isHadoop()):
            # else ask for the location...
            while (True):
                h_path = raw_input("Enter the location of the Hadoop "\
                                   "installation: ").replace("\r", "")
                h_path = os.path.abspath(h_path)
                if (os.path.exists(h_path)):
                    self.hadoop.setHadoopLoc(h_path)
                    break
                else:
                    print "Invalid path."
            # end while
        # cd to Hadoop home dir
        self.hadoop.goHadoop()
        # Start Hadoop
        #print "\nStarting Hadoop...\n"
        #self.hadoop.startHadoop()
        #print
        
        return
    
    def _stopHadoop(self):
        # Stop Hadoop
        print "\nStopping Hadoop...\n"
        self.hadoop.stopHadoop()
        return
    
    def loadDataset(self):
        # Add a dataset in HDFS.
        while (True):
            print "Define the dataset to add in HDFS."    	
            source = raw_input("Enter the current location of the dataset "\
                                "(source): ").replace("\r", "")
            dest = raw_input("Enter the destination in HDFS for the dataset "\
                             "(destination): ").replace("\r", "")
            source = os.path.abspath(source)
            if (os.path.exists(source)):
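                # Build a newline-separated list of the dataset files'
                # HDFS paths (later written to filenames.txt for the zip job).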
                files = ""
                for f in os.listdir(source):
                    files += dest+"/"+f+"\n"
                #print
                #print files
                #print
                #print source, dest
                #print
                self.hadoop.delFileHadoop(dest)
                print
                self.hadoop.putFileHadoop(source, dest)
                self.hadoop.lsHadoop()
                break
            else:
                print "Invalid source location of dataset."

        return dest, files
        
    def _zipFiles(self, files):
        """Gzip the files."""
        # Create a file containing the file names of the files of the dataset.
        # This file will be used to Gzip the dataset files using Hadoop.
        fp = open("/tmp/filenames.txt", "w+")
        fp.write(files)
        fp.close()
        self.hadoop.delFileHadoop("filenames.txt")
        print
        self.hadoop.putFileHadoop("/tmp/filenames.txt", "filenames.txt")
        self.hadoop.lsHadoop()
        print
        # use Hadoop to gzip files...
        self.hadoop.delFileHadoop("output_zip")
        print
        self.hadoop.execHadoopStreaming(mapper = mapperZipFiles.__file__,
                                        reducer = "NONE",
                                        input_files = ["filenames.txt"],
                                        output_file = "output_zip")
        self.hadoop.lsHadoop()
        print
        
        return
        
    def createInvertIndex(self):
        """Create the Inverted Index."""
        # Equivalent CLI invocation:
        #   bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar \
        #     -mapper "mapperInvIndex.py stoplist.txt" \
        #     -reducer reducerInvIndex.py \
        #     -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/mapperInvIndex.py \
        #     -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/reducerInvIndex.py \
        #     -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/stoplist.txt \
        #     -input dataset_gz -output invert_index
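        # (Hadoop Streaming's -file option ships each listed file to the
        # task nodes, so the mapper/reducer scripts and the stoplist are
        # available locally when the tasks run.)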
        stoplist_name = "stoplist.txt"
        stoplist_path = os.path.join(os.path.dirname(mapperInvIndex.__file__), stoplist_name)
        self.hadoop.delFileHadoop("invert_index")
        print
        self.hadoop.execHadoopStreaming(mapper = mapperInvIndex.__file__,
                                        reducer = reducerInvIndex.__file__,
                                        input_files = ["dataset_gz"],
                                        output_file = "invert_index",
                                        mapper_params=[stoplist_name],
                                        files = [stoplist_path])
        self.hadoop.lsHadoop()
        print
        
        return
        
    def queryParsing(self, query):
        """Parse a Boolean Query."""
        # get the terms from the query string:
        # - strip \n, \r and the parentheses
        # - drop the boolean operators: and, or, not
        # (splitting into tokens first avoids mangling terms that merely
        #  contain an operator substring, e.g. "island" or "north")
        query = query.lower()
        tokens = query.replace('\n', ' ').replace('\r', ' ')
        tokens = tokens.replace('(', ' ').replace(')', ' ').split()
        terms = [t for t in tokens if t not in ("and", "or", "not")]
        #print query
        #print terms
        print
        self.hadoop.delFileHadoop("query_result")
        print
        # feed the terms to the mappers so they can be looked up in the index
        # Equivalent CLI invocation:
        #   bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar \
        #     -mapper mapperReadIndex.py -reducer NONE \
        #     -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/mapperReadIndex.py \
        #     -input invert_index -output query_result
        self.hadoop.execHadoopStreaming(mapper = mapperReadIndex.__file__,
                                        reducer = "NONE",
                                        input_files = ["invert_index"],
                                        output_file = "query_result",
                                        mapper_params = terms)
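        # mapper_params appends the query terms to the mapper command line,
        # mirroring how the stoplist name is passed in createInvertIndex().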
        # read the results and add the terms' info to the query parser...
        tmp_results = "/tmp/query_result/"
        # recreate the temp results directory
        if os.path.exists(tmp_results):
            shutil.rmtree(tmp_results)
        os.mkdir(tmp_results)
        self.hadoop.getFileHadoop("query_result/part*", tmp_results)
        # the first keeps only the document names, the second also keeps positions
        term_results_set, term_results_dict = self.read_file(tmp_results)
        # give the information about the terms in boolean query parser
        self.bqp.setIndex(term_results_set)
        # add documents info...
        # here I have to cheat a bit...
        # get the names of the zipped documents from the zip job's output...
        tmp_zip_files = "/tmp/zip_files/"
        # recreate the temp directory for the zip job's output
        if os.path.exists(tmp_zip_files):
            shutil.rmtree(tmp_zip_files)
        os.mkdir(tmp_zip_files)
        self.hadoop.getFileHadoop("output_zip/part*", tmp_zip_files)
        docs = self.read_zip(tmp_zip_files)
        self.bqp.setDocs(docs)
        # parse query
        results = self.bqp.doParsing(query)
        # print result
        print
        print "Results:"
        if len(results):
            for r in results:
                print "In '%s'."%(r)
        else:
            print "No Results available..."
        print
        
        return
        
    def read_file(self, tmpdir):
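        """Parse the query job's output files fetched from HDFS.

        Format inferred from the parsing below: each line holds a term,
        a tab, and the postings serialized as a Python dict literal, e.g.
            term    {'doc1.txt': [1, 5], 'doc2.txt': [3]}
        Returns (term -> set of document names, term -> postings dict).
        """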
        result_dict = {}
        result_set = {}
        files = os.listdir(tmpdir)
        for tmpfile in files:
            tmpfile = os.path.join(tmpdir, tmpfile)
            #print tmpfile
            fp = open(tmpfile)
            while (True):
                line = fp.readline()
                # EOF
                if len(line) == 0:
                    break
                term, postings = line.replace('\n', '').replace('\r', '').strip().split('\t', 1)
                # the postings are a Python dict literal; ast.literal_eval()
                # parses it without eval()'s arbitrary-code-execution risk
                postings = ast.literal_eval(postings)
                result_dict[term] = postings
                result_set[term] = set(postings.keys())
            # end while
            fp.close()
        # end for
        
        return result_set, result_dict
        
    def read_zip(self, tmpdir):
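        """Collect the names of the gzipped documents.

        The zip job's output is assumed to hold one file name per line;
        returns a dict mapping each file name to itself.
        """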
        fileNames = {}
        files = os.listdir(tmpdir)
        for tmpfile in files:
            tmpfile = os.path.join(tmpdir, tmpfile)
            #print tmpfile
            fp = open(tmpfile)
            while (True):
                line = fp.readline()
                # EOF
                if len(line) == 0:
                    break
                filename = line.replace('\n', '').replace('\r', '').strip()
                fileNames[filename] = filename
            # end while
            fp.close()
        # end for
        
        return fileNames

def main():
    
    ui = cmdUI()
    ui.mainScreen()
    
    return

if __name__ == "__main__":
    main()