halfak committed 31c8cab

Reverted back to no noop version and merged bug fixes in. Tests passing

  • Participants
  • Parent commits 71601c3

Comments (0)

Files changed (3)

File wmf/dump/

-def map(dumps, processPage, threads=cpu_count()-1):
+def map(dumps, processPage, threads=cpu_count()-1, outputBuffer=100):
 	Maps a function across all of the pages in a set of dump files and returns
-	an (order not guaranteed) iterator over the output.
+	an (order not guaranteed) iterator over the output.  Increasing the 
+	`outputBuffer` size will allow more mapplications to happen before the 
+	output is read, but will consume memory to do so.  Big output buffers 
+	are benefitial when the resulting iterator from this map will be read in
+	bursts.
+	The `processPage` function must return an iterable object (such as a 
+	generator).  If your processPage function does not need to produce 
+	output, make it return an empty iterable upon completion (like an empty
+	list).
 		dumps : list
 			a list of paths to dump files to process
 		processPage : function
-			a function to run on every page of a set of dump files.
+			a function to run on every page of a set of dump files
 		threads : int
 			the number of individual processing threads to spool up
+		outputBuffer : int
+			the maximum number of output values to buffer. 
 	input       = dumpFiles(dumps)
-	output      = Queue(maxsize=10000)
+	output      = Queue(maxsize=outputBuffer)
 	running     = Value('i', 0)
+	threads     = min(int(threads), input.qsize())
 	def dec(): running.value -= 1
-	for i in range(0, min(threads, input.qsize())):
+	for i in range(0, threads):
 		running.value += 1

File wmf/dump/tests/

 from import eq_
 from . import sample
 from ..iterator import Iterator, Namespace
-import util
+from wmf import util

File wmf/dump/tests/

 import sys, logging
 from import eq_
-from gl import wp
 from . import sample
 from import map