Commits

Anonymous committed 99d4c7e

trying to sync, nothing at all

Comments (0)

Files changed (2)

 import urlparse
 import profile
 
-def map_f((input_q, output_q)):
+def map_f((input_q, output_q, lock)):
     result = []
     while True:
         try:
 from Queue import Empty
-from multiprocessing import Process, Pool, Manager
+from multiprocessing import Process, Pool, Manager, Lock
 import itertools
 import operator
 import heapq
         self.data_queue = self.manager.Queue()
 
 
-    def fill_data_queue(self, iter_data):
+    def fill_data_queue(self, iter_data, lock):
         """
         Fill self.data_queue with data from iter_data.
         """
             self.data_queue.put(i)
 
 
-    def iterable_from_queue(self, q):
+    def iterable_from_queue(self, q, lock):
         """
-        Helper function.
+        Helper function: yield items from q until it is empty. Not thread safe.
         """
         while not q.empty():
             yield q.get()
 
 
-    def merge_f(self, input_qs, output_q):
+    def merge_f(self, input_qs, output_q, lock):
         """
         Merge step from mergesort.
         """
-
-        s = [self.iterable_from_queue(q) for q in input_qs]
+        s = [self.iterable_from_queue(q, lock) for q in input_qs]
         for (k, v), t in itertools.groupby(heapq.merge(*s)):
             output_q.put((k, [v]*len(tuple(t))))
 
         and perform the merge step of merge sort, then write the result to
         the queue for the reduce_num processes, which perform the reduce step
         """
-
-        write_proc = Process(target=self.fill_data_queue, args=(iter_data,))
+        lock = self.manager.Lock()
+        write_proc = Process(target=self.fill_data_queue, 
+                args=(iter_data, lock))
         write_proc.start()
+        write_proc.join()
 
         map_merge_qs = [self.manager.Queue() for _ in xrange(map_num)]
         map_procs = Pool(map_num)
         print("Begin {0} map processes".format(map_num))
         map_procs.map_async(self.map_f, 
-                            [(self.data_queue, q) for q in map_merge_qs])
+                            [(self.data_queue, q, lock) 
+                                for q in map_merge_qs])
         map_procs.close()
-        #map_procs.join()
+        map_procs.join()
 
         merge_reduce_q = self.manager.Queue()
         merge_proc = Process(target=self.merge_f, 
-                args=(map_merge_qs, merge_reduce_q))
+                args=(map_merge_qs, merge_reduce_q, lock))
 
         print("Begin merge process")
         merge_proc.start()
-        #merge_proc.join()
+        merge_proc.join()
 
         print("Begin {0} reduce processes".format(red_num))
         reduce_main_q = self.manager.Queue()