Commits

Christos Kannas committed dd6fa76

Finalized query parsing.

  • Parent commits e1e88db


Files changed (4)

InitHadoop/setupHadoop.py

         # Add mapper
         cmd.append("-mapper")
         if (len(mapper_params)):
-            mapper = str(mapper)
+            str_mapper = str(mapper)
             for mp in mapper_params:
-                mapper += " %s"%(mp)
-            mapper = '"' + mapper + '"'
-        cmd.append(mapper)
+                str_mapper += " %s"%(mp)
+            str_mapper = '"' + str_mapper + '"'
+            cmd.append(str_mapper)
+        else:
+            cmd.append(mapper)
 
         # Add reducer
         cmd.append("-reducer")
         # execute
         try:
             print cmd
-            #subprocess.call(cmd)
+            print
+            subprocess.call(cmd)
         except:
             raise
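
The quoting change above matters because Hadoop streaming expects a parameterized mapper as a single quoted token after -mapper. A minimal sketch of the two branches, reusing the stoplist case this commit sets up later (illustrative values, not repo output):

    # Build just the -mapper part of the streaming command.
    mapper, mapper_params = "mapperInvIndex.py", ["stoplist.txt"]
    cmd = ["-mapper"]
    if len(mapper_params):
        str_mapper = str(mapper)
        for mp in mapper_params:
            str_mapper += " %s" % (mp)
        cmd.append('"' + str_mapper + '"')  # mapper plus its params, quoted as one token
    else:
        cmd.append(mapper)                  # a plain mapper passes through unquoted
    print cmd  # ['-mapper', '"mapperInvIndex.py stoplist.txt"']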
 

InvertedIndex/mapperReadIndex.py

         # read a line and split it into key and value
         yield line.rstrip().split(separator, 1)
         
-def main(separator='\t'):
-    test_terms = ["sherlock", "greece", "queen"]
+def main(args, separator='\t'):
+    terms = args
     # input comes from STDIN (standard input)
     data = read_input(sys.stdin, separator=separator)
     for current_word, group in groupby(data, itemgetter(0)):
-        if current_word in test_terms:
-            for current_word, file_list in group:
+        for current_word, file_list in group:
+            # get the terms...
+            if (current_word in terms):
                 print "%s%s%s" % (current_word, separator, file_list)
+            # end if
+        # end for
+    # end for
     
     return
 
 if __name__ == "__main__":
-    main()
+    args = sys.argv
+    args = args[1:]
+    main(args)
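
With main() now reading the query terms from sys.argv instead of a hard-coded test list, the mapper filters the inverted index down to the requested terms. A small self-contained check of that filtering logic; the postings format here is an assumption (the real one comes from the reducer, which is not shown in this commit):

    from itertools import groupby
    from operator import itemgetter

    lines = ["greece\t{'doc2.gz': [5]}", "sherlock\t{'doc1.gz': [3, 17]}"]
    terms = ["sherlock"]  # as if passed on the command line
    data = (line.rstrip().split('\t', 1) for line in lines)
    for current_word, group in groupby(data, itemgetter(0)):
        for current_word, file_list in group:
            if current_word in terms:
                print "%s\t%s" % (current_word, file_list)
    # prints: sherlock	{'doc1.gz': [3, 17]}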

InvertedIndex/mapperZipFiles.py

         subprocess.call(cmd)
         # Put files in HDFS folder
         cmd = ["bin/hadoop", "dfs", "-put", fileName, "dataset_gz/"]
+        print "%s"%(os.path.basename(fileName))
         subprocess.call(cmd)
         cmd = ["rm", "-f", fileName]
         subprocess.call(cmd)
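
Printing the basename here means the output of the output_zip job ends up being a plain list of the zipped document names; the UI later reads that list back (read_zip below) so the boolean query parser knows the full document set.

The remaining hunks are from the fourth changed file, the command-line UI module that defines cmdUI: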
 #!/usr/bin/env python
 
 import os
+import ast
+import subprocess
 from DocIRHadoop.InitHadoop import setupHadoop
-from DocIRHadoop.InvertedIndex import mapperZipFiles, mapperInvIndex, reducerInvIndex
+from DocIRHadoop.InvertedIndex import mapperZipFiles, mapperInvIndex, \
+                                      reducerInvIndex, mapperReadIndex
+from DocIRHadoop.Query import queryParser
 
 class cmdUI:
     """Command Line User Interface."""
     def __init__(self):
         # Initialize Hadoop Environment.
         self.hadoop = setupHadoop.Hadoop()
+        self.bqp = queryParser.BoolQuery()
         
         return
     
         # Create gzip files for the dataset
         self._zipFiles(files)
         print
-        # use Hadoop to gzip files...
-        self.hadoop.execHadoopStreaming(mapper = mapperZipFiles.__file__,
-                                        reducer = "NONE",
-                                        input_files = ["filenames.txt"],
-                                        output_file = "output_zip")
-        self.hadoop.lsHadoop()
-        print
         
         # Create Inverted Index
-        # bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -mapper "mapperInvIndex.py stoplist.txt" -reducer reducerInvIndex.py -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/mapperInvIndex.py -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/reducerInvIndex.py -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/stoplist.txt -input dataset_gz -output invert_index
-        stoplist_name = "stoplist.txt"
-        stoplist_path = os.path.join(os.path.dirname(mapperInvIndex.__file__), stoplist_name)
-        self.hadoop.execHadoopStreaming(mapper = mapperInvIndex.__file__,
-                                        reducer = reducerInvIndex.__file__,
-                                        input_files = ["dataset_gz"],
-                                        output_file = "invert_index",
-                                        mapper_params=[stoplist_name],
-                                        files = [stoplist_path])
-        self.hadoop.lsHadoop()
-        print
+        self.createInvertIndex()
         
         # Query Searching
+        while (True):
+            print
+            print "=============================================================="
+            print "                     DocIRHadoop Search"
+            print "=============================================================="
+            print " Enter your query or Ctrl-D to Exit."
+            try:
+                query = raw_input(" Query: ")
+                self.queryParsing(query)
+            except EOFError:
+                print
+                print
+                print "Thank you!"
+                print
+                break
+        # end while
         
         # Stop Hadoop
         #self._stopHadoop()
                 #print
                 #print source, dest
                 #print
-                #self.hadoop.putFileHadoop(source, dest)
+                self.hadoop.delFileHadoop(dest)
+                print
+                self.hadoop.putFileHadoop(source, dest)
                 self.hadoop.lsHadoop()
                 break
             else:
         fp = open("/tmp/filenames.txt", "w+")
         fp.write(files)
         fp.close()
+        self.hadoop.delFileHadoop("filenames.txt")
+        print
         self.hadoop.putFileHadoop("/tmp/filenames.txt", "filenames.txt")
         self.hadoop.lsHadoop()
+        print
+        # use Hadoop to gzip files...
+        self.hadoop.delFileHadoop("output_zip")
+        print
+        self.hadoop.execHadoopStreaming(mapper = mapperZipFiles.__file__,
+                                        reducer = "NONE",
+                                        input_files = ["filenames.txt"],
+                                        output_file = "output_zip")
+        self.hadoop.lsHadoop()
+        print
         
         return
+        
+    def createInvertIndex(self):
+        """Create the Inverted Index."""
+        # bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -mapper "mapperInvIndex.py stoplist.txt" -reducer reducerInvIndex.py -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/mapperInvIndex.py -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/reducerInvIndex.py -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/stoplist.txt -input dataset_gz -output invert_index
+        stoplist_name = "stoplist.txt"
+        stoplist_path = os.path.join(os.path.dirname(mapperInvIndex.__file__), stoplist_name)
+        self.hadoop.delFileHadoop("invert_index")
+        print
+        self.hadoop.execHadoopStreaming(mapper = mapperInvIndex.__file__,
+                                        reducer = reducerInvIndex.__file__,
+                                        input_files = ["dataset_gz"],
+                                        output_file = "invert_index",
+                                        mapper_params=[stoplist_name],
+                                        files = [stoplist_path])
+        self.hadoop.lsHadoop()
+        print
+        
+        return
+        
+    def queryParsing(self, query):
+        """Parse a Boolean Query."""
+        # get terms from the query string
+        # - remove \n, \r
+        # - remove and, or, not, (, )
+        query = query.lower()
+        terms = query.replace('\n', ' ').replace('\r', ' ')
+        terms = terms.replace('(', ' ').replace(')', ' ').split()
+        terms = [t for t in terms if t not in ('and', 'or', 'not')]
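+        # e.g. "(sherlock AND greece) OR NOT queen" -> ['sherlock', 'greece', 'queen']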
+        #print query
+        #print terms
+        print
+        self.hadoop.delFileHadoop("query_result")
+        print
+        # feed the terms to mappers to find them
+        # bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -mapper mapperReadIndex.py -reducer NONE -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/mapperReadIndex.py -input invert_index -output query_result
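+        # the terms reach mapperReadIndex.py as its command-line arguments
+        # (see its new main() above)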
+        self.hadoop.execHadoopStreaming(mapper = mapperReadIndex.__file__,
+                                        reducer = "NONE",
+                                        input_files = ["invert_index"],
+                                        output_file = "query_result",
+                                        mapper_params = terms)
+        # read the query results and add the terms' info to the query parser...
+        tmp_results = "/tmp/query_result/"
+        # remove any stale temp results directory, then recreate it
+        try:
+            subprocess.call(["rm", "-r", tmp_results])
+        except:
+            pass
+        #
+        try:
+            subprocess.call(["mkdir", tmp_results])
+        except:
+            pass
+        #
+        self.hadoop.getFileHadoop("query_result/part*", tmp_results)
+        # the 1st keeps only the document names, the 2nd also keeps the positions
+        term_results_set, term_results_dict = self.read_file(tmp_results)
+        # hand the per-term document sets to the boolean query parser
+        self.bqp.setIndex(term_results_set)
+        # add documents info: the boolean parser also needs the full set of
+        # document names, so here I have to cheat a bit and read the names
+        # of the zipped documents back from the output_zip job...
+        tmp_zip_files = "/tmp/zip_files/"
+        # remove any stale temp directory for the zip-file listing, then recreate it
+        try:
+            subprocess.call(["rm", "-r", tmp_zip_files])
+        except:
+            pass
+        #
+        try:
+            subprocess.call(["mkdir", tmp_zip_files])
+        except:
+            pass
+        #
+        self.hadoop.getFileHadoop("output_zip/part*", tmp_zip_files)
+        docs = self.read_zip(tmp_zip_files)
+        self.bqp.setDocs(docs)
+        # parse query
+        results = self.bqp.doParsing(query)
+        # print result
+        print
+        print "Results:"
+        for r in results:
+            print "In '%s'."%(r)
+        print
+        
+        return
+        
+    def read_file(self, tmpdir):
+        result_dict = {}
+        result_set = {}
+        files = os.listdir(tmpdir)
+        for tmpfile in files:
+            tmpfile = os.path.join(tmpdir, tmpfile)
+            #print tmpfile
+            fp = open(tmpfile)
+            while (True):
+                line = fp.readline()
+                # EOF
+                if len(line) == 0:
+                    break
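+                # each line looks like "term<TAB>postings-dict", e.g.
+                # "greece\t{'doc2.gz': [5]}" (format assumed from the jobs above)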
+                term, postings = line.replace('\n', '').replace('\r', '').strip().split('\t', 1)
+                postings = ast.literal_eval(postings)  # parse the dict literal safely
+                result_dict[term] = postings
+                result_set[term] = set(postings.keys())
+            # end while
+            fp.close()
+        # end for
+        
+        return result_set, result_dict
+        
+    def read_zip(self, tmpdir):
+        fileNames = {}
+        files = os.listdir(tmpdir)
+        for tmpfile in files:
+            tmpfile = os.path.join(tmpdir, tmpfile)
+            #print tmpfile
+            fp = open(tmpfile)
+            while (True):
+                line = fp.readline()
+                # EOF
+                if len(line) == 0:
+                    break
+                filename = line.replace('\n', '').replace('\r', '').strip()
+                fileNames[filename] = filename
+            # end while
+            fp.close()
+        # end for
+        
+        return fileNames
 
 def main():
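
For reference, the boolean evaluation itself lives in DocIRHadoop.Query.queryParser.BoolQuery, which is not shown in this view. A rough illustration of how such a parser can combine what setIndex() and setDocs() hand it; the data and the set algebra below are assumptions for illustration, not the module's actual code:

    # index: term -> set of documents containing it (shape of read_file's 1st result)
    index = {"sherlock": set(["doc1.gz"]),
             "greece":   set(["doc1.gz", "doc2.gz"])}
    # docs: every indexed document (read_zip's result, viewed as a set)
    docs = set(["doc1.gz", "doc2.gz", "doc3.gz"])

    print index["sherlock"] & index["greece"]  # AND -> set(['doc1.gz'])
    print index["sherlock"] | index["greece"]  # OR  -> both documents
    print docs - index["sherlock"]             # NOT -> every other document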