Commits

Christos Kannas committed 156dee3

Update cmdUI.py, setupHadoop.py.

  • Parent commits a9d162e

Files changed (3)

File InitHadoop/setupHadoop.py

 import subprocess
 
 class Hadoop:
-	"""Class to setup Hadoop environment."""
-	def __init__(self):
-		"""Initialize Hadoop Environment."""
-		self.hadoop_dir = None
-		if ("HADOOP_HOME" in os.environ.keys()):
-			self.hadoop_dir = os.environ["HADOOP_HOME"]
-		
-		return
+    """Class to setup Hadoop environment."""
+    def __init__(self):
+        """Initialize Hadoop Environment."""
+        self.hadoop_dir = None
+        if ("HADOOP_HOME" in os.environ.keys()):
+            self.hadoop_dir = os.environ["HADOOP_HOME"]
 
-	def isHadoop(self):
-		if (self.hadoop_dir is None):
-			return 0
-		return 1
+        return
 
-	def setHadoopLoc(self, path):
-		self.hadoop_dir = os.path.abspath(path)
+    def isHadoop(self):
+        if (self.hadoop_dir is None):
+            return 0
+        return 1
 
-		return
-	
-	def goHadoop(self):
-		os.chdir(self.hadoop_dir)
-		
-		return
+    def setHadoopLoc(self, path):
+        self.hadoop_dir = os.path.abspath(path)
 
-	def startHadoop(self):
-		"""Start Hadoop."""
-		try:
-			subprocess.call(["bin/start-all.sh"])
-		except:
-			raise
-		
-		return
+        return
 
-	def stopHadoop(self):
-		"""Stop Hadoop."""
-		try:
-			subprocess.check_call(["bin/stop-all.sh"])
-		except:
-			raise
+    def goHadoop(self):
+        os.chdir(self.hadoop_dir)
 
-		return
-	
-	def lsHadoop(self):
-		subprocess.check_call(["bin/hadoop","dfs","-ls"])
-		
-		return
-	
-	def putFileHadoop(self, source, dest):
-		"""Put file(s) on HDFS."""
-		cmd = ["bin/hadoop","dfs","-put"]
-		source = os.path.abspath(source)
-		cmd.append(source)
-		cmd.append(dest)
-		
-		subprocess.call(cmd)
-		
-		return
-	
-	def getFileHadoop(self, source, dest):
-		"""Get file(s) from HDFS."""
-		cmd = ["bin/hadoop","dfs","-get"]
-		dest = os.path.abspath(dest)
-		cmd.append(source)
-		cmd.append(dest)
-		
-		subprocess.call(cmd)
-		
-		return
-	
-	def delFileHadoop(self, path):
-		"""Delete file from HDFS."""
-		cmd = ["bin/hadoop","dfs","-rm"]
-		cmd.append(path)
-		
-		subprocess.call(cmd)
-		
-		return
-	
-	def delFolderHadoop(self, path):
-		"""Delete file from HDFS."""
-		cmd = ["bin/hadoop","dfs","-rmr"]
-		cmd.append(path)
-		
-		subprocess.call(cmd)
-		
-		return
+        return
 
+    def startHadoop(self):
+        """Start Hadoop."""
+        try:
+            subprocess.call(["bin/start-all.sh"])
+        except:
+            raise
+
+        return
+
+    def stopHadoop(self):
+        """Stop Hadoop."""
+        try:
+            subprocess.check_call(["bin/stop-all.sh"])
+        except:
+            raise
+
+        return
+
+    def lsHadoop(self):
+        """Check HDFS. """
+        try:
+            subprocess.check_call(["bin/hadoop","dfs","-ls"])
+        except:
+            raise
+
+        return
+
+    def putFileHadoop(self, source, dest):
+        """Put file(s) on HDFS."""
+        cmd = ["bin/hadoop","dfs","-put"]
+        source = os.path.abspath(source)
+        cmd.append(source)
+        cmd.append(dest)
+        # execute
+        try:
+            subprocess.call(cmd)
+        except:
+            raise
+
+        return
+
+    def getFileHadoop(self, source, dest):
+        """Get file(s) from HDFS."""
+        cmd = ["bin/hadoop","dfs","-get"]
+        dest = os.path.abspath(dest)
+        cmd.append(source)
+        cmd.append(dest)
+        # execute
+        try:
+            subprocess.call(cmd)
+        except:
+            raise
+
+        return
+
+    def delFileHadoop(self, path):
+        """Delete file from HDFS."""
+        cmd = ["bin/hadoop","dfs","-rmr"]
+        cmd.append(path)
+        # execute
+        try:
+            subprocess.call(cmd)
+        except:
+            raise
+
+        return
+
+    def execHadoopStreaming(self, mapper, reducer, input_files, output_file):
+        """Execute Hadoop Streaming."""
+        # Hadoop Streaming
+        cmd = ["bin/hadoop", "jar", "contrib/streaming/hadoop-0.20.2-streaming.jar"]
+
+        # Add mapper
+        cmd.append("-mapper")
+        cmd.append(mapper)
+
+        # Add reducer
+        cmd.append("-reducer")
+        cmd.append(reducer)
+
+        # Add mapper and reducer files to distribute them
+        cmd.append("-file")
+        cmd.append(mapper)
+        cmd.append("-file")
+        cmd.append(reducer)
+
+        # Add input files
+        for infile in input_files:
+            cmd.append("-input")
+            cmd.append(infile)
+
+        # Add output
+        cmd.append("-output")
+        cmd.append(output_file)
+        # execute
+        try:
+            subprocess.call(cmd)
+        except:
+            raise
+
+        return
+
+###############################################################################
 def test():
-	h = Hadoop()
-	
-	print os.path.abspath(os.curdir)
-	if (not h.isHadoop()):
-		h.setHadoopLoc("/usr/local/hadoop")
-	h.goHadoop()
-	print ">>>",os.path.abspath(os.curdir)
-	
-	h.startHadoop()
-	
-	h.lsHadoop()
-	
-	p = raw_input("File to Delete: ")
-	h.delFolderHadoop(p)
-	
-	h.stopHadoop()
-	
-	return
+    h = Hadoop()
+
+    print os.path.abspath(os.curdir)
+    if (not h.isHadoop()):
+        h.setHadoopLoc("/usr/local/hadoop")
+    h.goHadoop()
+    print ">>>",os.path.abspath(os.curdir)
+
+    h.startHadoop()
+
+    h.lsHadoop()
+
+    p = raw_input("File to Delete: ")
+    h.delFileHadoop(p)
+
+    h.stopHadoop()
+
+    return
 
 if __name__ == "__main__":
-	test()
+    test()
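
For orientation, a minimal sketch of how the revised Hadoop class might be driven end to end, along the lines of test() above. The dataset path, the mapper/reducer script names and the wc_out output directory below are hypothetical; the streaming jar path is the one hard-coded in execHadoopStreaming.

    #!/usr/bin/env python
    # Hypothetical driver for the updated Hadoop class (not part of this commit).
    from DocIRHadoop.InitHadoop import setupHadoop

    h = setupHadoop.Hadoop()
    if (not h.isHadoop()):
        h.setHadoopLoc("/usr/local/hadoop")  # same fallback as test() above
    h.goHadoop()                             # cd into the Hadoop home dir
    h.startHadoop()

    h.putFileHadoop("./dataset", "dataset")  # copy a local folder into HDFS
    # execHadoopStreaming builds and runs:
    #   bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar
    #     -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py
    #     -input dataset/doc1.txt -output wc_out
    h.execHadoopStreaming(mapper="mapper.py",
                          reducer="reducer.py",
                          input_files=["dataset/doc1.txt"],
                          output_file="wc_out")
    h.lsHadoop()
    h.stopHadoop()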

File InvertedIndex/mapperZipFiles.py

         fileName = fileName + ".gz"
         cmd = ["bin/hadoop", "dfs", "-mkdir", "files_gz" ]
         subprocess.call(cmd)
-        cmd = ["bin/hadoop", "dfs", "-put", fileName, "files_gz" ]
+        cmd = ["bin/hadoop", "dfs", "-put", fileName, "dataset_gz" ]
         subprocess.call(cmd)
         cmd = ["rm", "-f", fileName]
         subprocess.call(cmd)
     return
 
 if __name__ == "__main__":
-    main()
+    main()
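
Only the tail of the mapper's per-file loop appears in this hunk; note that after this change the gzipped file is uploaded to dataset_gz while the -mkdir just above it still targets files_gz. For orientation, a rough sketch of how such a streaming mapper might be structured, assuming it reads one HDFS path per stdin line (which is how cmdUI.py below feeds it via filenames.txt); only the mkdir/put/rm tail is confirmed by the hunk, and the stdin loop, -get and gzip steps are assumptions.

    #!/usr/bin/env python
    # Hypothetical outline of the gzip mapper (only the upload/cleanup tail is
    # shown in the hunk above; the rest of this loop is an assumption).
    import sys
    import subprocess

    def main():
        for line in sys.stdin:
            hdfsPath = line.strip()
            if not hdfsPath:
                continue
            fileName = hdfsPath.split("/")[-1]
            # copy the file out of HDFS and gzip it locally (assumed steps)
            subprocess.call(["bin/hadoop", "dfs", "-get", hdfsPath, fileName])
            subprocess.call(["gzip", "-f", fileName])
            fileName = fileName + ".gz"
            # as in the hunk above: create the target dir and upload the .gz
            subprocess.call(["bin/hadoop", "dfs", "-mkdir", "files_gz"])
            subprocess.call(["bin/hadoop", "dfs", "-put", fileName, "dataset_gz"])
            subprocess.call(["rm", "-f", fileName])
        return

    if __name__ == "__main__":
        main()
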
File cmdUI.py

 #!/usr/bin/env python
 
+import os
 from DocIRHadoop.InitHadoop import setupHadoop
+from DocIRHadoop.InvertedIndex import mapperZipFiles
 
 class cmdUI:
     """Command Line User Interface."""
         print "=============================================================="
         print "                   You are using DocIRHadoop"
         print "=============================================================="
-        self._startHadoop()
+        # Start Hadoop
+        #self._startHadoop()
+        
+        # Add dataset in HDFS
+        dest, files = self.loadDataset()
+        # Create gzip files for the dataset
+        self._zipFiles(files)
+        # use Hadoop to gzip files...
+        self.hadoop.execHadoopStreaming(mapper = mapperZipFiles.__file__,
+                                        reducer = "NONE",
+                                        input_files = ["filenames.txt"],
+                                        output_file = "output_zip")
+        
+        # Stop Hadoop
+        #self._stopHadoop()
         
         return
     
     def _startHadoop(self):
-        # Check if Hadoop hoome directory is set via $HADOOP_HOME
+        # Check if Hadoop home directory is set via $HADOOP_HOME
         if (not self.hadoop.isHadoop()):
-            # else set it by hand...
-            self.hadoop.setHadoopLoc("/usr/local/hadoop")
+            while (True):
+                # else ask for the location...
+                h_path = raw_input("Enter the location of Hadoop "\
+                                   "installation: ").replace("\r", "")
+                h_path = os.path.abspath(h_path)
+                if (os.path.exists(h_path)):
+                    self.hadoop.setHadoopLoc(h_path)
+                    break
+                else:
+                    print "Invalid path."
+            # End while
         # cd to Hadoop home dir
         self.hadoop.goHadoop()
         # Start Hadoop
-        print "Starting Hadoop..."
+        print "\nStarting Hadoop...\n"
         self.hadoop.startHadoop()
+        print
         
         return
     
     def _stopHadoop(self):
-        self.hadoop.stopHadoop()
+        # Stop Hadoop
+        print "\nStopping Hadoop...\n"
+        self.hadoop.stopHadoop()
+        return
+
+    def loadDataset(self):
+        # Add a dataset in HDFS.
+        while (True):
+            print "Define the dataset to add in HDFS."
+            source = raw_input("Enter the current location of the dataset "\
+                                "(source): ").replace("\r", "")
+            dest = raw_input("Enter the destination in HDFS for the dataset "\
+                             "(destination): ").replace("\r", "")
+            source = os.path.abspath(source)
+            if (os.path.exists(source)):
+                files = ""
+                for f in os.listdir(source):
+                    files += dest+"/"+f+"\n"
+                print files
+                self.hadoop.putFileHadoop(source, dest)
+                self.hadoop.lsHadoop()
+                break
+            else:
+                print "Invalid source location of dataset."
+
+        return dest, files
         
-        return
-    
-    def loadDataset(self):
+    def _zipFiles(self, files):
+        """Gzip the files."""
+        # Create a file containing the file names of the files of the dataset.
+        # This file will be used to Gzip the dataset files using Hadoop.
+        fp = open("/tmp/filenames.txt", "w")
+        fp.write(files)
+        fp.close()
+        self.hadoop.putFileHadoop("/tmp/filenames.txt", "filenames.txt")
+        
         return
 
 def main():