Commits

Daniel Smith committed c74149d Draft

Ran go fix and fmt. (builds under go 1 now)

Comments (0)

Files changed (4)

 package mrlocal
 
 import (
+	"fmt"
 	"github.com/petar/GoLLRB/llrb"
-	"fmt"
 )
 
 //GetLocalTree returns a new MemTree.
 //for each goroutine. When the local tree is full, it will be sent off to be merged
 //and a new one made.
 type MemTree struct {
-	mr *MapReducer
-	tree *llrb.Tree
+	mr          *MapReducer
+	tree        *llrb.Tree
 	memKeyLimit int
 }
 
 //Emit adds the (key, value) pair to the MemTree.
 func (mt *MemTree) Emit(k Key, v Value) {
-	mt.add(&pair{k,v})
+	mt.add(&pair{k, v})
 }
 
 func (mt *MemTree) add(p *pair) {
 		p.Value = mt.mr.reduce(p.Key, vals)
 	}
 	mt.tree.ReplaceOrInsert(p)
-	
+
 	if mt.tree.Len() >= mt.memKeyLimit {
 		mt.mr.addTreeChan <- mt.tree
 		mt.tree = nil
 	}
 }
 
-
 func (mr *MapReducer) treeMaker(newTreeChan chan<- *llrb.Tree) {
 	for {
-		tree := llrb.New(func(a, b interface {}) bool{
-				return mr.compare(a.(*pair).Key, b.(*pair).Key)
+		tree := llrb.New(func(a, b interface{}) bool {
+			return mr.compare(a.(*pair).Key, b.(*pair).Key)
 		})
 		newTreeChan <- tree
 		//count up only after someone takes that tree
 func (mr *MapReducer) treeAdder(addTreeChan <-chan *llrb.Tree) {
 	var srcs []mergeSource
 	//totalItems := 0
-	
-	
-	
+
 	for tree := range addTreeChan {
 		mr.treesConsumed++
-		
+
 		dest := mr.newTempFileDest("")
 		dest.copy(memSrc{tree})
 		mr.toTempFileChan <- dest.name
-		
+
 		/*totalItems += tree.Len()
 		srcs = append(srcs, memSrc{tree})
-		
+
 		if len(srcs) > 0 {
 			dest := mr.newTempFileDest()
 			dest.copy(srcs[0])
 			totalItems = 0
 		}*/
 	}
-	
-	if (mr.treesConsumed != mr.treesMade) {
-		panic(fmt.Sprintf("Made %d trees, but only got %d back!", 
+
+	if mr.treesConsumed != mr.treesMade {
+		panic(fmt.Sprintf("Made %d trees, but only got %d back!",
 			mr.treesMade, mr.treesConsumed))
 	}
-	
+
 	mr.memSrcs = srcs
 	close(mr.toTempFileChan)
 	mr.doneChan <- true
 }
-
-
-
-
 	emit(*pair)
 }
 
-
-
 type memSrc struct {
 	tree *llrb.Tree
 }
+
 func (ms memSrc) get() (*pair, bool) {
 	if ms.tree.Len() == 0 {
 		return nil, false
 	return p, true
 }
 
-
-
 func (mr *MapReducer) eq(a, b Key) bool {
 	return !mr.compare(a, b) && !mr.compare(b, a)
 }
 
-
 //merge can merge any number of sources to any dest. finish is called at the end.
 func (mr *MapReducer) merge(srcs []mergeSource, dest mergeDest, finish func()) {
 	cur := make([]*pair, len(srcs))
-	
+
 	removeIndex := func(i int) {
 		copy(srcs[i:], srcs[i+1:])
 		srcs = srcs[:len(srcs)-1]
 		copy(cur[i:], cur[i+1:])
 		cur = cur[:len(cur)-1]
 	}
-	
+
 	//strategy: read from whichever source has the lowest key until that is no 
 	//longer the case. Combine any keys which are equal. Repeat. 
-	
+
 	processed := 0
-	
+
 	for len(srcs) > 1 {
 		for i := 0; i < len(srcs); i++ {
 			if cur[i] == nil {
 					i--
 					continue
 				}
-				if cur[i] == nil {panic("got a nil")}
+				if cur[i] == nil {
+					panic("got a nil")
+				}
 				processed++
 			}
 		}
 		if len(srcs) <= 1 {
 			break
 		}
-		
+
 		var minKey = 0
 		var secondMinKey = 1
-		
+
 		//find two lowest
 		for i := 1; i < len(cur); i++ {
 			if mr.compare(cur[i].Key, cur[minKey].Key) {
 			}
 		}
 		//mr.status("two lowest", cur, minKey, secondMinKey)
-		
+
 		//read from min until min is not less than second lowest
 		remove := false
 		for mr.compare(cur[minKey].Key, cur[secondMinKey].Key) {
 			dest.emit(cur[minKey])
 			cur[minKey] = nil
-			
+
 			var ok bool
 			cur[minKey], ok = srcs[minKey].get()
 			if !ok {
 			}
 			processed++
 		}
-		
+
 		//if read failed, remove the source
 		if remove {
 			removeIndex(minKey)
 			continue
 		}
-		
+
 		//combine any with the same key
 		if mr.eq(cur[minKey].Key, cur[secondMinKey].Key) {
 			//check for duplicates
 					cur[i] = nil
 				}
 			}
-			
+
 			dest.emit(&pair{checkKey, mr.reduce(checkKey, vals)})
 		}
 	}
-	
+
 	if len(cur) > 0 && cur[0] != nil {
 		dest.emit(cur[0])
 		cur[0] = nil
 	}
-	
+
 	if len(srcs) > 0 {
 		for {
 			p, ok := srcs[0].get()
 			processed++
 		}
 	}
-	
+
 	mr.status("\nMerge done, %d processed.\n", processed)
 	finish()
 }
-
 package mrlocal
 
 import (
+	"fmt"
 	"github.com/petar/GoLLRB/llrb"
 	"io"
-	"fmt"
-	"os"
 )
 
 //FastReadWriter should be implemented by both key and value types. Gobs seem to be too slow
 //for large data sets, so I'm making you write your own serialization code. FastReadNew must
 //return an EOF if it gets an EOF from the io.Reader.
 type FastReadWriter interface {
-	FastReadNew(io.Reader) (FastReadWriter, os.Error)
-	FastWrite(io.Writer) os.Error
+	FastReadNew(io.Reader) (FastReadWriter, error)
+	FastWrite(io.Writer) error
 }
 
 //Key is your key type
-type Key interface {FastReadWriter}
+type Key interface {
+	FastReadWriter
+}
 
 //Value is your data type
-type Value interface {FastReadWriter}
+type Value interface {
+	FastReadWriter
+}
 
 //pair is used internally
 type pair struct {
 
 //MapReducer represents a map/reduce operation.
 type MapReducer struct {
-	
+
 	//set to non-nil value if you want status reports (use os.Stdout to see them
 	//in the console).
 	StatusReports io.Writer
-	
+
 	keyExample Key
 	valExample Value
-	
-	compare CompareFunc
-	reduce ReduceFunc
-	scratchDirFmt string
-	memKeyLimit int
+
+	compare            CompareFunc
+	reduce             ReduceFunc
+	scratchDirFmt      string
+	memKeyLimit        int
 	nextTempFileNumber <-chan int
-	
-	addChan chan<- *pair
-	doneChan chan bool
+
+	addChan     chan<- *pair
+	doneChan    chan bool
 	donesNeeded int
-	
-	addTreeChan chan<- *llrb.Tree
-	newTreeChan <-chan *llrb.Tree
-	treesMade int
+
+	addTreeChan   chan<- *llrb.Tree
+	newTreeChan   <-chan *llrb.Tree
+	treesMade     int
 	treesConsumed int
-	
-	toTempFileChan chan<- string
+
+	toTempFileChan    chan<- string
 	finalTempFileList []string
-	
+
 	memSrcs []mergeSource
-	
+
 	tree *llrb.Tree
 }
 
 //
 //OutstandingKeys sets the size of the channel used to synchronize emits.
 //
-func New(KeyType Key, ValType Value, 
+func New(KeyType Key, ValType Value,
 	ScratchDirFmt string,
 	MemKeyLimit int,
 	OutstandingKeys int,
 	Compare CompareFunc,
 	Reduce ReduceFunc) *MapReducer {
-	
-	
+
 	addChan := make(chan *pair, OutstandingKeys)
 	addTreeChan := make(chan *llrb.Tree)
 	newTreeChan := make(chan *llrb.Tree)
 	toTempFileChan := make(chan string, 10)
 	tempFileNumber := make(chan int, 2)
-	
+
 	mr := &MapReducer{
-		keyExample: KeyType,
-		valExample: ValType,
-		compare: Compare,
-		reduce: Reduce,
-		scratchDirFmt: ScratchDirFmt,
-		addChan: addChan,
-		memKeyLimit: MemKeyLimit,
+		keyExample:         KeyType,
+		valExample:         ValType,
+		compare:            Compare,
+		reduce:             Reduce,
+		scratchDirFmt:      ScratchDirFmt,
+		addChan:            addChan,
+		memKeyLimit:        MemKeyLimit,
 		nextTempFileNumber: tempFileNumber,
-		doneChan: make(chan bool, 10),
-		donesNeeded: 3, //adder, tree adder, tempFileManager
-		
-		addTreeChan: addTreeChan,
-		newTreeChan: newTreeChan,
+		doneChan:           make(chan bool, 10),
+		donesNeeded:        3, //adder, tree adder, tempFileManager
+
+		addTreeChan:    addTreeChan,
+		newTreeChan:    newTreeChan,
 		toTempFileChan: toTempFileChan,
 	}
-	
+
 	go mr.adder(addChan)
 	go mr.treeMaker(newTreeChan)
 	go mr.treeAdder(addTreeChan)
 			i++
 		}
 	}()
-	
+
 	return mr
 }
 
 //Emit adds a new (Key, Value) pair to the collection. It is safe to call Emit
 //from multiple goroutines (in fact, this is the intended behavior).
 func (mr *MapReducer) Emit(k Key, v Value) {
-	mr.addChan <- &pair{k,v}
+	mr.addChan <- &pair{k, v}
 }
 
 //Finish informs mr that you will not be calling emit any more. mr will finish
 //writing any temp files before returning. Do not call Finish more than once.
 func (mr *MapReducer) Finish() {
 	close(mr.addChan)
-	
+
 	//wait for all outstanding writes and the adder
 	for i := 0; i < mr.donesNeeded; i++ {
 		<-mr.doneChan
 	Finalize func(Key, Value)
 	*tempFileDest
 }
+
 func (fd finalDestMerge) emit(p *pair) {
 	if fd.Finalize != nil {
 		fd.Finalize(p.Key, p.Value)
 	}
 }
 
-
 //FinalMerge merges all current temp files and the files given in otherSrcs to dest.
 //If Finalize is set, it's called for every key/value pair added. You must call Finish
 //first, if you have called Emit!
 	if len(srcs) == 0 {
 		panic("nothing to merge!")
 	}
-	
+
 	fdm := finalDestMerge{Finalize, nil}
 	if dest != "" {
 		fdm.tempFileDest = mr.newTempFileDest(dest)
 	if fdm.Finalize == nil && fdm.tempFileDest == nil {
 		panic("You're about to start a long operation for no reason...")
 	}
-	
+
 	mr.merge(mr.openTempFiles(srcs), fdm, func() {
 		if fdm.tempFileDest != nil {
 			fdm.tempFileDest.done()
 
 }
 
-
 type finalDest struct {
 	Finalize func(Key, Value)
 }
-func (fd finalDest) emit(p *pair) {fd.Finalize(p.Key, p.Value)}
+
+func (fd finalDest) emit(p *pair) { fd.Finalize(p.Key, p.Value) }
 
 //Iterate merges all temporary files and any keys currently in memory, calling
 //Finalize with each final (key, value) pair. Don't forget to call Finish() first.
 //Call either this function or FinalMerge, but not both. FinalMerge has more options.
 func (mr *MapReducer) Iterate(Finalize func(Key, Value)) {
 	srcs := mr.openTempFiles(mr.finalTempFileList)
-	
+
 	//add the tree currently in memory as a source
 	if mr.memSrcs != nil {
 		srcs = append(srcs, mr.memSrcs...)
 	}
-	
-	mr.merge(srcs, finalDest{Finalize}, func(){})
+
+	mr.merge(srcs, finalDest{Finalize}, func() {})
 }
 
 //DeleteTempFiles does what you would expect. Call after you call Iterate, of
 //course.
 func (mr *MapReducer) DeleteTempFiles() {
-	
+
 }
-
-
 
 import (
 	"bufio"
-	"os"
+	"compress/flate"
 	"fmt"
 	"io"
-	"compress/flate"
+	"os"
 )
 
 const temp_queue_size = 400
 	//c *flate.Writer//
 	c io.WriteCloser
 	//c io.Writer
-	name string
-	cmd chan *pair
+	name     string
+	cmd      chan *pair
 	doneChan chan bool
 }
 
 
 func (tfd *tempFileDest) worker() {
 	for p := range tfd.cmd {
-		if err := p.Key.FastWrite(tfd.c); err != nil {panic(err)}
-		if err := p.Value.FastWrite(tfd.c); err != nil {panic(err)}
+		if err := p.Key.FastWrite(tfd.c); err != nil {
+			panic(err)
+		}
+		if err := p.Value.FastWrite(tfd.c); err != nil {
+			panic(err)
+		}
 		//tfd.c.Flush()
 	}
 	tfd.c.Close()
 		name = mr.name(<-mr.nextTempFileNumber)
 	}
 	f, err := os.Create(name)
-	if err != nil {panic(err)}
+	if err != nil {
+		panic(err)
+	}
 	w := bufio.NewWriter(f)
-	c := flate.NewWriter(w, 1)
+	c, err := flate.NewWriter(w, 0)
+	if err != nil {
+		panic(err)
+	}
 	//c := lzw.NewWriter(w, lzw.LSB, 8)
 	//c := w
 	tfd := &tempFileDest{f, w, c, name, make(chan *pair, temp_queue_size), make(chan bool)}
 	return tfd
 }
 
-
-
-
-
 type tempFileReader struct {
 	f *os.File
 	//r *flate.Reader
-	r io.Reader
+	r        io.Reader
 	keyMaker Key
 	valMaker Value
-	pairs chan *pair
+	pairs    chan *pair
 }
+
 func (tfr tempFileReader) get() (*pair, bool) {
-	p, ok := <- tfr.pairs
+	p, ok := <-tfr.pairs
 	return p, ok
 }
 func (tfr tempFileReader) worker() {
 	for {
 		key, err := tfr.keyMaker.FastReadNew(tfr.r)
-		if err == os.EOF {break}
-		if err != nil {panic(err)}
-		val, err := tfr.valMaker.FastReadNew(tfr.r); if err != nil {panic(err)}
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			panic(err)
+		}
+		val, err := tfr.valMaker.FastReadNew(tfr.r)
+		if err != nil {
+			panic(err)
+		}
 		tfr.pairs <- &pair{key, val}
 	}
 	//tfr.r.Close()
 	for n, name := range names {
 		mr.status("Opening for merge: %s\n", name)
 		f, err := os.Open(name)
-		if err != nil {panic(err)}
+		if err != nil {
+			panic(err)
+		}
 		tfr := tempFileReader{
 			f,
 			//lzw.NewReader(bufio.NewReader(f), lzw.LSB, 8),
 	return files
 }
 
-
-
-
-
 type tempFileInfo struct {
-	name string
+	name  string
 	level int
 }
 
 func (mr *MapReducer) tempFileManager(incoming <-chan string) {
-	
+
 	completedFiles := make(map[int][]string)
-	
+
 	outstanding := 0
 	completedTempFiles := make(chan *tempFileInfo, 10)
 	totalTempFiles := 0
-	
+
 	done := false
 	for !done {
 		if len(completedFiles[0]) < 30 && totalTempFiles < 500 {
 			select {
-				case src, ok := <-incoming:
-					if !ok {
-						done = true
-						break
-					}
-					completedFiles[0] = append(completedFiles[0], src)
-					totalTempFiles++
-				case tfi := <-completedTempFiles: 
-					completedFiles[tfi.level] = append(completedFiles[tfi.level], tfi.name)
-					totalTempFiles++
-					outstanding--
+			case src, ok := <-incoming:
+				if !ok {
+					done = true
+					break
+				}
+				completedFiles[0] = append(completedFiles[0], src)
+				totalTempFiles++
+			case tfi := <-completedTempFiles:
+				completedFiles[tfi.level] = append(completedFiles[tfi.level], tfi.name)
+				totalTempFiles++
+				outstanding--
 			}
 		} else {
-			tfi := <-completedTempFiles 
+			tfi := <-completedTempFiles
 			completedFiles[tfi.level] = append(completedFiles[tfi.level], tfi.name)
 			totalTempFiles++
 			outstanding--
 		}
-		
+
 		if outstanding < 5 {
 			for i := range completedFiles {
 				lvl := i
-				if len(completedFiles[lvl]) >= 10 - i {
+				if len(completedFiles[lvl]) >= 10-i {
 					amt := len(completedFiles[lvl])
-					if amt > 10 - i {
+					if amt > 10-i {
 						amt = 10 - i
 					}
 					myfiles := completedFiles[lvl][:amt]
 					completedFiles[lvl] = completedFiles[lvl][amt:]
 					totalTempFiles -= amt
-					
+
 					//merge to next level up
 					mr.status("\nStarting a level %d merge...\n", lvl+1)
 					dest := mr.newTempFileDest("")
 					go mr.merge(mr.openTempFiles(myfiles), dest, func() {
-							dest.done()
-							completedTempFiles <- &tempFileInfo{dest.name, lvl+1}
-							for _, name := range myfiles {
-								os.Remove(name)
-							}
-							mr.status("\nFinished a level %d merge...\n", lvl+1)
+						dest.done()
+						completedTempFiles <- &tempFileInfo{dest.name, lvl + 1}
+						for _, name := range myfiles {
+							os.Remove(name)
+						}
+						mr.status("\nFinished a level %d merge...\n", lvl+1)
 					})
 					outstanding++
 					i--
 			}
 		}
 	}
-	
-	
+
 	names := []string{}
 	for _, list := range completedFiles {
 		names = append(names, list...)
 	}
-	
+
 	for outstanding > 0 {
 		mr.status("\nWaiting for %d intermediate merges...\n", outstanding)
 		tfi := <-completedTempFiles
 		names = append(names, tfi.name)
 		outstanding--
 	}
-	
+
 	mr.finalTempFileList = names
 	mr.doneChan <- true
 }
-