Commits

"LK4...@gmail.com>"  committed 312f432

add write process
TODO: need sync all queues

  • Parent commits dd73de6

Comments (0)
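The commit message leaves queue synchronization as a TODO (the map_procs.join() and merge_proc.join() calls are commented out in the diff below). As a minimal sketch, not part of this repository, here is one common way to coordinate producer and consumer processes over multiprocessing queues: the producer puts one sentinel ("poison pill") per consumer, so each consumer blocks on get() and stops cleanly instead of polling q.empty(). All names here (SENTINEL, producer, consumer) are illustrative.

from multiprocessing import Process, Queue

SENTINEL = None  # marker telling a consumer the producer is done

def producer(q, items, n_consumers):
    for item in items:
        q.put(item)
    for _ in range(n_consumers):  # one sentinel per consumer
        q.put(SENTINEL)

def consumer(in_q, out_q):
    while True:
        item = in_q.get()         # blocks instead of polling empty()
        if item is SENTINEL:
            break
        out_q.put(item * 2)       # stand-in for real work

if __name__ == "__main__":
    in_q, out_q = Queue(), Queue()
    workers = [Process(target=consumer, args=(in_q, out_q)) for _ in range(4)]
    for w in workers:
        w.start()
    producer(in_q, range(100), n_consumers=4)
    results = [out_q.get() for _ in range(100)]  # drain before joining
    for w in workers:
        w.join()
    print("got {0} results".format(len(results)))

Draining out_q before join() avoids the multiprocessing pitfall of joining a child process that still has undelivered items buffered in a queue.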

Files changed (2)

File hostcount.py

     return result
 
 def main():
+    iter_data = (urlparse.urlparse(x).netloc for x in open("test.txt"))
     mred = MapReduce(map_f, reduce_f)
-    mred.fill_data_queue((urlparse.urlparse(x).netloc for x in open("test.txt")))
-    mred.run(4, 4)
+    mred.run(4, 4, iter_data)
 
 if __name__ == "__main__":
     main()
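For reference, main() in hostcount.py after this hunk is applied reads roughly as follows (the urlparse import and the map_f/reduce_f/MapReduce definitions live elsewhere in the file):

def main():
    # lazy generator of hostnames; the new write process drains it into data_queue
    iter_data = (urlparse.urlparse(x).netloc for x in open("test.txt"))
    mred = MapReduce(map_f, reduce_f)
    mred.run(4, 4, iter_data)

if __name__ == "__main__":
    main()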

File mapreduce.py

         for i in iter_data:
             self.data_queue.put(i)
 
+
     def iterable_from_queue(self, q):
+        """
+        Helper: yield items from q until the queue reports empty.
+        """
         while not q.empty():
             yield q.get()
 
+
     def merge_f(self, input_qs, output_q):
+        """
+        Merge step (as in mergesort): merge the sorted map queues into one stream and group the values for each key.
+        """
 
         s = [self.iterable_from_queue(q) for q in input_qs]
-        last_found = None
-
         for (k, v), t in itertools.groupby(heapq.merge(*s)):
             output_q.put((k, [v]*len(tuple(t))))
-        """while True:
-            min_index, (k, v) = min(enumerate(s), key=operator.itemgetter(1,0))
 
-            if last_found is None:
-                count_list = []
-                last_found = k
-            elif last_found == k:
-                count_list.append(v)
-            else:
-                output_q.put((last_found, count_list))
-                count_list = []
-                last_found = k
 
-            try:
-                s[min_index] = input_qs[min_index].get_nowait()
-            except Empty:
-                s.remove(s[min_index])
-                input_qs.remove(input_qs[min_index])
-
-            if not input_qs:
-                output_q.put((k, count_list))
-                break
-                """
-
-
-    def run(self, map_num, red_num=1):
+    def run(self, map_num, red_num, iter_data):
         """
         A separate write process fills data_queue from iter_data. Run map_num
         map processes (each also sorts its output by key), which write to
         map_num queues; one merge process reads from those queues and feeds
         red_num reduce processes, which perform the reduce step.
         """
 
+        write_proc = Process(target=self.fill_data_queue, args=(iter_data,))
+        write_proc.start()
+
         map_merge_qs = [self.manager.Queue() for _ in xrange(map_num)]
-
         map_procs = Pool(map_num)
-
         print("Begin {0} map processes".format(map_num))
         map_procs.map_async(self.map_f, 
                             [(self.data_queue, q) for q in map_merge_qs])
-
         map_procs.close()
-        map_procs.join()
+        #map_procs.join()
 
         merge_reduce_q = self.manager.Queue()
         merge_proc = Process(target=self.merge_f,
                              args=(map_merge_qs, merge_reduce_q))
         print("Begin merge process")
         merge_proc.start()
-        merge_proc.join()
+        #merge_proc.join()
 
-        print("Begin reduce process")
+        print("Begin {0} reduce processes".format(red_num))
         reduce_main_q = self.manager.Queue()
         red_procs = Pool(red_num)
         reduce_lists = red_procs.apply_async(self.reduce_f, args=(merge_reduce_q,))
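
The new merge_f relies on heapq.merge plus itertools.groupby: heapq.merge lazily interleaves already-sorted streams, and groupby collapses consecutive equal items. Below is a standalone sketch, not from this repository, with illustrative data. Note that merge_f groups whole (k, v) pairs, which amounts to grouping by key when the map step always emits the same value (e.g. 1), whereas this sketch groups by the key explicitly with operator.itemgetter(0).

import heapq
import itertools
import operator

# three already-sorted streams of (key, value) pairs,
# standing in for the per-map-process queues
streams = [
    [("a.com", 1), ("b.com", 1)],
    [("a.com", 1), ("c.com", 1)],
    [("b.com", 1), ("b.com", 1)],
]

merged = heapq.merge(*streams)  # one globally sorted, lazy stream
for key, group in itertools.groupby(merged, key=operator.itemgetter(0)):
    values = [v for _, v in group]
    print("{0}: {1}".format(key, values))

# prints:
# a.com: [1, 1]
# b.com: [1, 1, 1]
# c.com: [1]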