Commits

Christos Kannas committed e1e88db

Mappers and Reducers for Inverted Index.

Comments (0)

Files changed (8)

InitHadoop/setupHadoop.py

         source = os.path.abspath(source)
         cmd.append(source)
         cmd.append(dest)
+        #print cmd
         # execute
         try:
             subprocess.call(cmd)
 
         return
 
-    def execHadoopStreaming(self, mapper, reducer, input_files, output_file):
+    def execHadoopStreaming(self, mapper, reducer, input_files, output_file, 
+                            mapper_params=[], reducer_params=[], files=[]):
         """Execute Hadoop Streaming."""
         # Hadoop Streaming
         cmd = ["bin/hadoop", "jar", "contrib/streaming/hadoop-0.20.2-streaming.jar"]
 
         # Add mapper
         cmd.append("-mapper")
+        # keep the bare script path; the -file option below needs it
+        # without parameters or quotes
+        mapper_file = mapper
+        if (len(mapper_params)):
+            mapper = str(mapper)
+            for mp in mapper_params:
+                mapper += " %s" % (mp)
+            # quote the mapper command (script plus its parameters)
+            mapper = '"' + mapper + '"'
         cmd.append(mapper)
 
         # Add reducer
         cmd.append("-reducer")
+        # keep the bare script path for the -file option below
+        reducer_file = reducer
+        if (len(reducer_params)):
+            reducer = str(reducer)
+            for rp in reducer_params:
+                reducer += " %s" % (rp)
         cmd.append(reducer)
 
         # Add mapper and reducer files to distribute them
         cmd.append("-file")
-        cmd.append(mapper)
-        cmd.append("-file")
-        cmd.append(reducer)
+        cmd.append(mapper_file)
+        if (reducer_file != "NONE"):
+            cmd.append("-file")
+            cmd.append(reducer_file)
+        # Add extra files to distribute them
+        if (len(files)):
+            for f in files:
+                cmd.append("-file")
+                cmd.append(f)
 
         # Add input files
         for infile in input_files:
         cmd.append(output_file)
         # execute
         try:
-            subprocess.call(cmd)
+            print cmd
+            #subprocess.call(cmd)
         except:
             raise
 

InvertedIndex/mapperInvIndex.py

+#!/usr/bin/env python
+
+import os
+import sys
+import string
+
+def read_input(file):
+    for line in file:
+        # strip \n, \r and leading/trailing whitespace,
+        # then split the line into words
+        yield line.replace('\n', '').replace('\r', '').strip().split()
+        
+def getFileName():
+    if 'map_input_file' in os.environ:
+        # get only the file's name
+        name = os.environ['map_input_file']
+        name = name.split('/')[-1]
+    else:
+        # No name...
+        name = 'none'
+
+    return name
+
+def read_stoplist(stoplist):
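+    """Read a stop word file (one or more words per line) into a lowercase set."""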
+    stopList = set()
+    fp = open(stoplist)
+    while (True):
+        line = fp.readline()
+        # EOF
+        if len(line) == 0:
+            break
+        words = line.replace('\n', '').replace('\r', '').strip().split()
+        for word in words:
+            stopList.add(word.lower())
+    fp.close()
+    
+    return stopList
+
+def main(args, separator='\t'):
+    # get stoplist words
+    # args[0] should be the stoplist.txt
+    stoplist = read_stoplist(args[0])
+    # word position counter
+    pos = 1
+    # input comes from STDIN (standard input)
+    data = read_input(sys.stdin)
+    for words in data:
+        # write the results to STDOUT (standard output);
+        # what we output here will be the input for the
+        # Reduce step, i.e. the input for reducerInvIndex.py
+        #
+        # tab-delimited; the value is a {file name: word position} dictionary
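+        # e.g. for the word "holmes" at overall position 42 of the
+        # (hypothetical) input file pg1661.txt.gz the mapper emits:
+        #   holmes<TAB>{'pg1661.txt.gz': 42}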
+        for word in words:
+            # convert to lower case
+            word = word.lower().strip()
+            # remove punctuation characters
+            for punc in string.punctuation:
+                word = word.replace(punc, '')
+            if (word not in stoplist):
+                d = {getFileName(): pos}
+                print '%s%s%s' % (word, separator, d)
+            pos += 1
+
+if __name__ == "__main__":
+    args = sys.argv
+    args = args[1:]
+    main(args)

InvertedIndex/mapperReadIndex.py

+#!/usr/bin/env python
+
+import os
+import sys
+from itertools import groupby
+from operator import itemgetter
+
+def read_input(file, separator='\t'):
+    for line in file:
+        # read a line and split it to key and values
+        yield line.rstrip().split(separator, 1)
+        
+def main(separator='\t'):
+    test_terms = ["sherlock", "greece", "queen"]
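+    # hardcoded sample terms for a quick lookup test against the inverted index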
+    # input comes from STDIN (standard input)
+    data = read_input(sys.stdin, separator=separator)
+    for current_word, group in groupby(data, itemgetter(0)):
+        if current_word in test_terms:
+            for current_word, file_list in group:
+                print "%s%s%s" % (current_word, separator, file_list)
+    
+    return
+
+if __name__ == "__main__":
+    main()

InvertedIndex/mapperZipFiles.py

         cmd = ["gzip", fileName]
         subprocess.call(cmd)
         fileName = fileName + ".gz"
-        cmd = ["bin/hadoop", "dfs", "-mkdir", "files_gz" ]
+        cmd = ["bin/hadoop", "dfs", "-mkdir", "dataset_gz" ]
         subprocess.call(cmd)
-        cmd = ["bin/hadoop", "dfs", "-put", fileName, "dataset_gz" ]
+        # Put files in the HDFS folder
+        cmd = ["bin/hadoop", "dfs", "-put", fileName, "dataset_gz/"]
         subprocess.call(cmd)
         cmd = ["rm", "-f", fileName]
         subprocess.call(cmd)

InvertedIndex/reducerInvIndex.py

+#!/usr/bin/env python
+
+import sys
+from itertools import groupby
+from operator import itemgetter
+
+def read_mapper_output(file, separator='\t'):
+    for line in file:
+        # read a line and split it to key and values
+        yield line.rstrip().split(separator, 1)
+
+def main(separator='\t'):
+    # input comes from STDIN (standard input)
+    data = read_mapper_output(sys.stdin, separator=separator)
+    # groupby groups the mapper's key-value pairs by word,
+    # and creates an iterator that returns consecutive keys and their group:
+    #   current_word - string containing a word (the key)
+    #   group - iterator yielding all ["<current_word>", "<file:position dict>"] items
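+    # e.g. ["holmes", "{'pg1661.txt.gz': 42}"] for the (hypothetical)
+    # input file pg1661.txt.gz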
+    for current_word, group in groupby(data, itemgetter(0)):
+        fileList = {}
+        for current_word, file_pos in group:
+            # convert file_pos to dictionary
+            file_pos = eval(file_pos)
+            # each key is a file name
+            for fileName in file_pos.keys():
+                # word position in file
+                pos = file_pos[fileName]
+                # if the fileName exists in fileList
+                if fileList.has_key(fileName):
+                    # then add the position
+                    fileList[fileName].append(pos)
+                    fileList[fileName].sort()
+                else:
+                    # else add the fileName and the position
+                    fileList[fileName] = [pos]
+        print "%s%s%s" % (current_word, separator, fileList)
+
+if __name__ == "__main__":
+    main()

InvertedIndex/stoplist.txt

+a
+about
+an
+and
+are
+as
+at
+be
+but
+by
+for
+from
+has
+have
+he
+his
+in
+is
+it
+its
+more
+new
+of
+on
+one
+or
+said
+say
+that
+the
+their
+they
+this
+to
+was
+which
+who
+will
+with
+you
+

Query/queryParser.py

+#!/usr/bin/env python
+
+from DocIRHadoop.Query import searchparser2
+
+Set = set
+
+class BoolQuery(searchparser2.SearchQueryParser):
+    """A boolean query parser."""
+    def setIndex(self, index):
+        self.index = index.copy()
+        return
+        
+    def setDocs(self, docs):
+        self.docs = docs.copy()
+        return
+        
+    def doParsing(self, query):
+        r = self.Parse(query)
+        return r
+    
+    def GetWord(self, word):
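+        """Return the set of documents indexed under an exact word (empty set if absent)."""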
+        if (self.index.has_key(word)):
+            return self.index[word]
+        else:
+            return Set()
+
+    def GetWordWildcard(self, word):
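+        """Return the union of the document sets of all index terms starting with word."""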
+        result = Set()
+        for item in self.index.keys():
+            if word == item[0:len(word)]:
+                result = result.union(self.index[item])
+        return result
+
+    def GetNot(self, not_set):
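+        """Return every known document except those in not_set (boolean NOT)."""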
+        all = Set(self.docs.keys())
+        return all.difference(not_set)
+        
+
+def test():
+    bq = BoolQuery()
+    index = {'queen': Set([1, 4, 10]),
+             'story': Set([1, 4, 10, 15]),
+             'king': Set([4, 5, 10, 20]),
+             'kill': Set([6]),
+            }    
+    docs = {}
+    for i in range(1, 30):
+        docs[i] = i
+    bq.setIndex(index)
+    bq.setDocs(docs)
+    
+    print bq.Parse("queen") 
+    print bq.Parse("queen and king or kill")
+    print bq.Parse("queen and (king or kill)")
+    print bq.Parse("queen and king")
+    print bq.Parse("not king")
+    print bq.Parse("queen and not king")
+
+    return        
+        
+if __name__ == "__main__":
+    test()
 
 import os
 from DocIRHadoop.InitHadoop import setupHadoop
-from DocIRHadoop.InvertedIndex import mapperZipFiles
+from DocIRHadoop.InvertedIndex import mapperZipFiles, mapperInvIndex, reducerInvIndex
 
 class cmdUI:
     """Command Line User Interface."""
         print "                   You are using DocIRHadoop"
         print "=============================================================="
         # Start Hadoop
-        #self._startHadoop()
+        self._startHadoop()
         
         # Add dataset in HDFS
         dest, files = self.loadDataset()
+        print
+        
+        # The following step is needed in order to create
+        # a positional inverted index.
+        # Hadoop Streaming by default reads plain-text input line by line,
+        # so a mapper does not necessarily see a whole document and word
+        # positions within a document would be lost.
+        # In order to have positional indexing we have to feed a complete
+        # file to each mapper.
+        # We do this by creating a gzip file for each document and feeding
+        # these gzipped files to the mappers; a gzip file is not splittable,
+        # so it is processed in full by a single mapper.
+        
         # Create gzip files for the dataset
         self._zipFiles(files)
+        print
         # use Hadoop to gzip files...
         self.hadoop.execHadoopStreaming(mapper = mapperZipFiles.__file__,
                                         reducer = "NONE",
                                         input_files = ["filenames.txt"],
                                         output_file = "output_zip")
+        self.hadoop.lsHadoop()
+        print
+        
+        # Create Inverted Index
+        # bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -mapper "mapperInvIndex.py stoplist.txt" -reducer reducerInvIndex.py -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/mapperInvIndex.py -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/reducerInvIndex.py -file /home/cs246/Desktop/PythonHadoop/DocIRHadoop/InvertedIndex/stoplist.txt -input dataset_gz -output invert_index
+        stoplist_name = "stoplist.txt"
+        stoplist_path = os.path.join(os.path.dirname(mapperInvIndex.__file__), stoplist_name)
+        self.hadoop.execHadoopStreaming(mapper = mapperInvIndex.__file__,
+                                        reducer = reducerInvIndex.__file__,
+                                        input_files = ["dataset_gz"],
+                                        output_file = "invert_index",
+                                        mapper_params=[stoplist_name],
+                                        files = [stoplist_path])
+        self.hadoop.lsHadoop()
+        print
+        
+        # Query Searching
         
         # Stop Hadoop
         #self._stopHadoop()
         # cd to Hadoop home dir
         self.hadoop.goHadoop()
         # Start Hadoop
-        print "\nStarting Hadoop...\n"
-        self.hadoop.startHadoop()
-        print
+        #print "\nStarting Hadoop...\n"
+        #self.hadoop.startHadoop()
+        #print
         
         return
     
                 files = ""
                 for f in os.listdir(source):
                     files += dest+"/"+f+"\n"
-                print files
-                self.hadoop.putFileHadoop(source, dest)
+                #print
+                #print files
+                #print
+                #print source, dest
+                #print
+                #self.hadoop.putFileHadoop(source, dest)
                 self.hadoop.lsHadoop()
                 break
             else:
         """Gzip the files."""
         # Create a file containing the file names of the files of the dataset.
         # This file will be used to Gzip the dataset files using Hadoop.
-        fp = open("/tmp/filenames.txt", "w")
+        fp = open("/tmp/filenames.txt", "w+")
         fp.write(files)
         fp.close()
         self.hadoop.putFileHadoop("/tmp/filenames.txt", "filenames.txt")
+        self.hadoop.lsHadoop()
         
         return
 