Commits

Ioan Eugen Stan committed fc22f66

indexing works with sequence file

  • Parent commits 5ea69a9
  • Tags index-works-sq


Files changed (8)

File src/main/java/ro/ieugen/wikiindexer/IndexingApp.java

 import org.slf4j.LoggerFactory;
 import ro.ieugen.wikiindexer.model.WikiPage;
 import ro.ieugen.wikiindexer.parse.StaxWikiDumpParser;
-import ro.ieugen.wikiindexer.processing.IndexWriter;
+import ro.ieugen.wikiindexer.processing.Persistance;
 import ro.ieugen.wikiindexer.processing.ProcessingPipeline;
 
 /**
         Preconditions.checkNotNull(inputPath);
         Preconditions.checkNotNull(outPath);
         LOG.info("Indexing from {} and outputing index to {}", inputPath, outPath);
-        final IndexWriter iWriter = new IndexWriter(outPath);
-        final ProcessingPipeline pipeline = new ProcessingPipeline(Executors.newFixedThreadPool(10), iWriter);
+        final Persistance iWriter = new Persistance(outPath);
+        final ProcessingPipeline pipeline = new ProcessingPipeline(iWriter);
         InputStream is = null;
         try {
             is = createInputStream(inputPath);

File src/main/java/ro/ieugen/wikiindexer/model/InMemoryIndexSegment.java

  ****************************************************************/
 package ro.ieugen.wikiindexer.model;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.TreeMultimap;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import ro.ieugen.wikiindexer.Utils;
 public class InMemoryIndexSegment {
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryIndexSegment.class);
-    private static final long DEFAULT_FLUSH_SIZE_LIMIT = 16 * 1024 * 1024;
+    private static final long DEFAULT_FLUSH_SIZE_LIMIT = 100 * 1024 * 1024;
     private long flushLimit = DEFAULT_FLUSH_SIZE_LIMIT;
     private final ArrayList<String> titles = Lists.newArrayList();
-    private final Multimap<String, Integer> postings = TreeMultimap.create();
+    private final SortedMap<String, TreeSet<Integer>> postings = new TreeMap<String, TreeSet<Integer>>();
     private long indexSizeEstimate = 0;
-    private AtomicInteger fragmentCount = new AtomicInteger(0);
+    private int fragmentCount = 0;
 
     public InMemoryIndexSegment() {
         this(DEFAULT_FLUSH_SIZE_LIMIT);
     }
 
     public void addFragment(IndexFragment pageIndex) {
-        LOG.info("Adding fragment {} - size estimate {}", fragmentCount.incrementAndGet(), sizeEstimate());
-//        LOG.info("Fragment \n {} \n {}", pageIndex.getTitle(), pageIndex.getTerms());
         titles.add(pageIndex.getTitle());
         indexSizeEstimate += Utils.sizeOf(pageIndex.getTitle());
-        int docIndex = titles.size();
-        LOG.info("Fragment info: {} - {}", titles.get(docIndex), docIndex);
+        int docIndex = titles.size() - 1;
         for (String term : pageIndex.getTerms()) {
-            LOG.info("Adding {} to {}", term, docIndex);
-            postings.put(term, docIndex);
+            if (postings.containsKey(term)) {
+                postings.get(term).add(docIndex);
+            } else {
+                TreeSet<Integer> newset = new TreeSet<Integer>();
+                newset.add(docIndex);
+                postings.put(term, newset);
+            }
             indexSizeEstimate += Utils.sizeOf(term);
         }
+        fragmentCount++;
     }
 
-    public ImmutableList<String> titles() {
-        return ImmutableList.copyOf(titles);
+    public List<String> titles() {
+        return titles;
     }
 
-    public ImmutableMultimap<String, Integer> postings() {
-        return ImmutableMultimap.copyOf(postings);
+    public SortedMap<String, TreeSet<Integer>> postings() {
+        return postings;
     }
 
     public boolean shouldFlush() {
     }
 
     public long fragmentCount() {
-        return fragmentCount.get();
+        return fragmentCount;
     }
 }
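
For orientation, a minimal usage sketch of the reworked segment (not part of this commit); the page titles and term sets are made-up sample values, and in the real pipeline the fragments come from SegmentBuilder.index():

    import com.google.common.collect.Sets;
    import ro.ieugen.wikiindexer.model.IndexFragment;
    import ro.ieugen.wikiindexer.model.InMemoryIndexSegment;

    public class SegmentUsageSketch {
        public static void main(String[] args) {
            InMemoryIndexSegment segment = new InMemoryIndexSegment();
            // sample fragments; document indices are assigned in insertion order (0, 1, ...)
            segment.addFragment(new IndexFragment("Romania", Sets.newHashSet("danube", "carpathians")));
            segment.addFragment(new IndexFragment("Danube", Sets.newHashSet("danube", "river")));
            // postings now map each term to the sorted doc ids containing it:
            // carpathians -> [0], danube -> [0, 1], river -> [1]
            System.out.println(segment.postings());
            // the builder checks shouldFlush() and hands the segment to Persistance once it grows past the flush limit
        }
    }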

File src/main/java/ro/ieugen/wikiindexer/model/IndexFragment.java

  ****************************************************************/
 package ro.ieugen.wikiindexer.model;
 
-import com.google.common.collect.ImmutableSet;
+import java.util.Set;
 
 /**
  * Index portion for a single page. No fields.
 public class IndexFragment {
 
     private final String title;
-    private final ImmutableSet<String> terms;
+    private final Set<String> terms;
 
-    public IndexFragment(String title,
-                         ImmutableSet<String> terms) {
+    public IndexFragment(String title, Set<String> terms) {
         this.title = title;
         this.terms = terms;
     }
 
-    public ImmutableSet<String> getTerms() {
+    public Set<String> getTerms() {
         return terms;
     }
 

File src/main/java/ro/ieugen/wikiindexer/processing/DocIndexer.java

-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package ro.ieugen.wikiindexer.processing;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Set;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.TokenStream;
-import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import ro.ieugen.wikiindexer.model.IndexFragment;
-import ro.ieugen.wikiindexer.model.WikiPage;
-
-/**
- * Indexes a single wiki page.
- * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
- */
-public class DocIndexer implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DocIndexer.class);
-    private final static String DEFAULT_FIELD = "content";
-    private final WikiPage pageToIndex;
-    private final Analyzer analyzer;
-    private final ProcessingPipeline context;
-
-    public DocIndexer(ProcessingPipeline pipeline, WikiPage pageToIndex, Analyzer analyzer) {
-        this.context = pipeline;
-        this.pageToIndex = pageToIndex;
-        this.analyzer = analyzer;
-    }
-
-    @Override
-    public void run() {
-        try {
-            Set<String> terms = Sets.newHashSet();
-            // tokenize the input stream
-            TokenStream tokenStream = analyzer.tokenStream(DEFAULT_FIELD, new StringReader(pageToIndex.getContent()));
-            CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
-            try {
-                while (tokenStream.incrementToken()) {
-                    terms.add(charTermAttribute.toString());
-                }
-            } catch (IOException ex) {
-                LOG.info("Exception tokenizing content {}", pageToIndex, ex);
-            }
-            // add the contributor as a normal term, we don't use fields
-            terms.add(pageToIndex.getContribuitor());
-
-            IndexFragment p = new IndexFragment(pageToIndex.getTitle(), ImmutableSet.copyOf(terms));
-            // add the index part to the que for processing
-            context.offer(p);
-        } catch (Exception e) {
-            LOG.info("Exception indexing page {}", pageToIndex);
-        }
-    }
-}

File src/main/java/ro/ieugen/wikiindexer/processing/IndexWriter.java

-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ***************************************************************/
-package ro.ieugen.wikiindexer.processing;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimap;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import ro.ieugen.wikiindexer.model.InMemoryIndexSegment;
-
-/**
- * Persists a {@link IndexModel} to disk.
- * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
- */
-public class IndexWriter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(IndexWriter.class);
-    private static final String TITLES_SUFFIX = ".dict";
-    private static final String POSTINGS_SUFFIX = ".pst";
-    private final String outputDirectory;
-    private final AtomicInteger segmentsCount = new AtomicInteger(0);
-    private final Configuration conf = new Configuration();
-    private final FileSystem fs;
-
-    public IndexWriter(String outputDirectory) throws IOException {
-        Preconditions.checkNotNull(outputDirectory);
-        LOG.info("Creading new instance. Saving data to {}", outputDirectory);
-        this.outputDirectory = outputDirectory;
-        this.fs = FileSystem.get(URI.create(outputDirectory), conf);
-    }
-
-    public void persist(InMemoryIndexSegment segment) throws IOException {
-        int segmentId = segmentsCount.incrementAndGet();
-        persistTitles(segment, segmentId);
-        persistPostings(segment, segmentId);
-    }
-
-    private void persistTitles(InMemoryIndexSegment segment, int segmentId) {
-        LOG.info("Persisting titles for segment {} with size {}", currentSegmentCount(), segment.fragmentCount());
-        SequenceFile.Writer writer = null;
-        try {
-            ImmutableList<String> titles = segment.titles();
-            LongWritable key = new LongWritable();
-            Text value = new Text();
-            writer = SequenceFile.createWriter(fs, conf,
-                    new Path(outputDirectory, segmentId + TITLES_SUFFIX), key.getClass(), value.getClass(),
-                    SequenceFile.CompressionType.NONE);
-            for (int i = 0; i < titles.size(); i++) {
-//                LOG.info("Peristing key {} - {}.", i, titles.get(i));
-                key.set(i);
-                value.set(titles.get(i));
-                writer.append(key, value);
-            }
-        } catch (IOException ioe) {
-            LOG.info("Exception persisting titles!", ioe);
-        } finally {
-            LOG.info("Closing file");
-            IOUtils.closeStream(writer);
-        }
-    }
-
-    private void persistPostings(InMemoryIndexSegment segment, int segmentId) throws IOException {
-        LOG.info("Persisting postings for segment {}", currentSegmentCount());
-        SequenceFile.Writer writer = null;
-        try {
-            Multimap<String, Integer> postingsList = segment.postings();
-            Text key = new Text();
-            ArrayWritable value = new ArrayWritable(IntWritable.class);
-            writer = SequenceFile.createWriter(fs, conf,
-                    new Path(outputDirectory, segmentId + POSTINGS_SUFFIX), key.getClass(), value.getClass(),
-                    SequenceFile.CompressionType.NONE);
-            for (String theTerm : postingsList.keySet()) {
-                key.set(theTerm);
-                Collection<Integer> theIndex = postingsList.get(theTerm);
-                LOG.info("Peristing key {} - {}.", theTerm, theIndex.size());
-                IntWritable[] array = new IntWritable[theIndex.size()];
-                int i = 0;
-                for (Integer docId : theIndex) {
-                    array[i] = new IntWritable(docId);
-                }
-                value.set(array);
-                writer.append(key, value);
-            }
-        } catch (IOException ioe) {
-            LOG.info("Exception persisting postings!", ioe);
-        } finally {
-            LOG.info("Closing file");
-            IOUtils.closeStream(writer);
-        }
-    }
-
-    public int currentSegmentCount() {
-        return segmentsCount.get();
-    }
-}

File src/main/java/ro/ieugen/wikiindexer/processing/Persistance.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+package ro.ieugen.wikiindexer.processing;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ro.ieugen.wikiindexer.model.InMemoryIndexSegment;
+
+/**
+ * Persists an {@link InMemoryIndexSegment} to disk.
+ * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
+ */
+public class Persistance {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Persistance.class);
+    private static final String TITLES_SUFFIX = ".dict";
+    private static final String POSTINGS_SUFFIX = ".pst";
+    private final String outputDirectory;
+    private final AtomicInteger segmentsCount = new AtomicInteger(0);
+    private final Configuration conf = new Configuration();
+    private final FileSystem fs;
+
+    public Persistance(String outputDirectory) throws IOException {
+        Preconditions.checkNotNull(outputDirectory);
+        LOG.info("Creading new instance. Saving data to {}", outputDirectory);
+        this.outputDirectory = outputDirectory;
+        this.fs = FileSystem.get(URI.create(outputDirectory), conf);
+    }
+
+    public void persist(InMemoryIndexSegment segment) throws IOException {
+        if (segment.fragmentCount() != 0) {
+            int segmentId = segmentsCount.incrementAndGet();
+            persistTitles(segment, segmentId);
+            persistPostings(segment, segmentId);
+        } else {
+            LOG.info("Skipping empty segment.");
+        }
+    }
+
+    private void persistTitles(InMemoryIndexSegment segment, int segmentId) {
+        LOG.info("Persisting titles for segment {} with size {}", currentSegmentCount(),
+                segment.fragmentCount());
+        SequenceFile.Writer writer = null;
+        try {
+            List<String> titles = segment.titles();
+            LongWritable key = new LongWritable();
+            Text value = new Text();
+            writer = SequenceFile.createWriter(fs, conf,
+                    new Path(outputDirectory, segmentId + TITLES_SUFFIX), key.getClass(), value.getClass(),
+                    SequenceFile.CompressionType.NONE);
+            for (int i = 0; i < titles.size(); i++) {
+                key.set(i);
+                value.set(titles.get(i));
+                writer.append(key, value);
+            }
+        } catch (IOException ioe) {
+            LOG.info("Exception persisting titles!", ioe);
+        } finally {
+            LOG.info("Closing file");
+            IOUtils.closeStream(writer);
+        }
+    }
+
+    private void persistPostings(InMemoryIndexSegment segment, int segmentId) throws IOException {
+        LOG.info("Persisting postings for segment {}", currentSegmentCount());
+        SequenceFile.Writer writer = null;
+        try {
+            SortedMap<String, TreeSet<Integer>> postingsList = segment.postings();
+            Text key = new Text();
+//            IntWritable value = new IntWritable();
+            ArrayWritable value = new ArrayWritable(IntWritable.class);
+            writer = SequenceFile.createWriter(fs, conf,
+                    new Path(outputDirectory, segmentId + POSTINGS_SUFFIX), key.getClass(), value.getClass(),
+                    SequenceFile.CompressionType.BLOCK);
+            for (String theTerm : postingsList.keySet()) {
+                key.set(theTerm);
+                TreeSet<Integer> theIndex = postingsList.get(theTerm);
+//                for (Integer docId : theIndex) {
+//                    value.set(docId);
+//                    writer.append(key, value);
+//                }
+                IntWritable[] ints = new IntWritable[theIndex.size()];
+                int pos = 0;
+                for (Integer docId : theIndex) {
+                    ints[pos] = new IntWritable(docId);
+                    pos++;
+                }
+                value.set(ints);
+                writer.append(key, value);
+            }
+        } catch (IOException ioe) {
+            LOG.info("Exception persisting postings!", ioe);
+        } finally {
+            LOG.info("Closing file");
+            IOUtils.closeStream(writer);
+        }
+    }
+
+    public int currentSegmentCount() {
+        return segmentsCount.get();
+    }
+}
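
As a quick sanity check, a minimal sketch (not part of this commit) of reading a persisted postings file back with the same Hadoop SequenceFile API the writer uses; the output directory and the segment id 1 are assumptions:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class SegmentReaderSketch {
        public static void main(String[] args) throws Exception {
            String outputDirectory = "/tmp/wiki-index";   // assumed output path
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(outputDirectory), conf);
            // postings file written by Persistance.persistPostings() for segment 1;
            // titles can be read the same way from "1.dict" with LongWritable/Text
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(outputDirectory, "1.pst"), conf);
            Text term = new Text();
            ArrayWritable docIds = new ArrayWritable(IntWritable.class);
            try {
                while (reader.next(term, docIds)) {
                    for (Writable id : docIds.get()) {
                        System.out.println(term + " -> " + id);
                    }
                }
            } finally {
                reader.close();
            }
        }
    }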

File src/main/java/ro/ieugen/wikiindexer/processing/ProcessingPipeline.java

 import java.io.IOException;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.util.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import ro.ieugen.wikiindexer.model.InMemoryIndexSegment;
-import ro.ieugen.wikiindexer.model.IndexFragment;
 import ro.ieugen.wikiindexer.model.WikiPage;
 
 /**
 public class ProcessingPipeline {
 
     private static final Logger LOG = LoggerFactory.getLogger(ProcessingPipeline.class);
-    public static final int DEFAULT_INDEX_WRITER_THREAD_COUNT = 4;
-    private int indexWriterCount = DEFAULT_INDEX_WRITER_THREAD_COUNT;
-    private final ExecutorService indexProducers;
-    private final ExecutorService indexConsumer;
+    public static final int INDEX_WRITERS_COUNT = 4;
+    private final ExecutorService indexWriters = Executors.newFixedThreadPool(INDEX_WRITERS_COUNT);
     /** Use a {@link ArrayBlockingQueue} to control the indexing throughput.
      */
-    private final BlockingQueue<IndexFragment> indexedDocuments = Queues.newLinkedBlockingQueue();
+    private final BlockingQueue<WikiPage> documents = Queues.newArrayBlockingQueue(100);
     /** Limit the number of permits for building an index at a time. We order the disk writes so one
      * index segment is built and committed to disk.
      */
     private final Semaphore segmentBuilderPermit = new Semaphore(1, true);
-    private final Future[] segmentBuilderStatus;
     private final Analyzer analyzer;
-    private final IndexWriter indexWriter;
+    private final Persistance indexWriter;
+    private Future[] writerStatus = new Future[INDEX_WRITERS_COUNT];
     /** Signal that the job is done and all threads should close and flush all data to disk.
      */
     private final AtomicBoolean pipelineShutdownFlag = new AtomicBoolean(false);
-    private final AtomicBoolean stage1Done = new AtomicBoolean(false);
-    private final AtomicBoolean stage2Done = new AtomicBoolean(false);
-    private final AtomicInteger submitsCount = new AtomicInteger(0);
+    private final AtomicBoolean flushFlag = new AtomicBoolean(false);
 
-    public ProcessingPipeline(ExecutorService indexProducers, IndexWriter indexWriter) {
-        this(indexProducers, DEFAULT_INDEX_WRITER_THREAD_COUNT, indexWriter, new StandardAnalyzer(Version.LUCENE_35));
+    public ProcessingPipeline(Persistance indexWriter) {
+        this(indexWriter, new StandardAnalyzer(Version.LUCENE_35));
     }
 
     /**
      * @param indexWriter
      * @param analyzer
      */
-    public ProcessingPipeline(ExecutorService producers, int indexWriters, IndexWriter indexWriter, Analyzer analyzer) {
+    public ProcessingPipeline(Persistance indexWriter, Analyzer analyzer) {
         LOG.info("Creating new processing pipeline.");
-        Preconditions.checkNotNull(producers);
         Preconditions.checkNotNull(analyzer);
         Preconditions.checkNotNull(indexWriter);
-        Preconditions.checkArgument(indexWriters > 0);
-        this.indexProducers = producers;
         this.indexWriter = indexWriter;
         this.analyzer = analyzer;
-        // should we cap this to a max to prevent abuse?
-        this.indexWriterCount = indexWriters;
-        this.indexConsumer = Executors.newFixedThreadPool(indexWriterCount);
-        this.segmentBuilderStatus = new Future[indexWriterCount];
-        refreshSegmentBuilders();
     }
 
     public void process(WikiPage page) {
         Preconditions.checkState(isPipelineShuttingDown() == false);
         // we assume that threads won't fail and will offer() the indexed documents to the queue.
         // we should check the future status to be sure.
-        incrementProcessingSubmits();
-        indexProducers.submit(new DocIndexer(this, page, analyzer));
-    }
-
-    public boolean offer(IndexFragment pageIndex) {
-        LOG.info("Queueing fragment {}", processingSubmitsCount());
-        // stage 1 complete == ahutdown started, all documents have been indexed
-        Preconditions.checkState(stage1Complete() == false);
-        boolean added;
+        boolean result;
         do {
-            added = indexedDocuments.offer(pageIndex);
-        } while (!added);
-        decrementProcessingSubmits();
-        return added;
-    }
-
-    public IndexFragment take() throws InterruptedException {
-        LOG.info("De-queueing fragment");
-        return indexedDocuments.take();
-    }
-
-    private int incrementProcessingSubmits() {
-        return submitsCount.incrementAndGet();
-    }
-
-    private int decrementProcessingSubmits() {
-        // this will fail if indexing threads die prematurely
-        int newVal = submitsCount.decrementAndGet();
-        Preconditions.checkArgument(newVal >= 0);
-        return newVal;
-    }
-
-    public boolean hasProcessingSubmits() {
-        return submitsCount.get() > 0;
-    }
-
-    public int processingSubmitsCount() {
-        return submitsCount.get();
+            result = documents.offer(page);
+        } while (!result);
+        refreshSegmentBuilders();
     }
 
     /**
-     * Call this from the segemnt builder threads to get a permit for building and writing the segment to disk.
+     * Iterates over the segment builder thread statuses and resubmits any that have finished.
+     */
+    public final void refreshSegmentBuilders() {
+        for (int i = 0; i < writerStatus.length; i++) {
+            if (writerStatus[i] == null) {
+                writerStatus[i] = indexWriters.submit(new SegmentBuilder(this, analyzer));
+            } else {
+                if (writerStatus[i].isDone()) {
+                    LOG.info("Creating new segment builder!");
+                    writerStatus[i] = indexWriters.submit(new SegmentBuilder(this, analyzer));
+                }
+            }
+        }
+    }
+
+    /**
+     * Call this from the segment builder threads to get a permit for building and
+     * writing the segment to disk.
      */
     public void acquireSegmentBuilderPermit() throws InterruptedException {
         segmentBuilderPermit.acquire();
-        refreshSegmentBuilders();
     }
 
     /**
     }
 
     /**
-     * Iterates over segment builder threads statuses and recreates them if processing is not finished.
-     */
-    public final void refreshSegmentBuilders() {
-        for (int i = 0; i < segmentBuilderStatus.length; i++) {
-            if (segmentBuilderStatus[i] == null) {
-                segmentBuilderStatus[i] = indexConsumer.submit(new SegmentBuilder(this));
-            } else {
-                // recreate the threads only if we have not
-                if (!isPipelineShuttingDown()) {
-                    // recreate the threads if they finished
-                    if (segmentBuilderStatus[i].isDone()) {
-                        LOG.info("Creating new segment builder!");
-                        segmentBuilderStatus[i] = indexConsumer.submit(new SegmentBuilder(this));
-                    }
-                } else {
-                    LOG.info("Done with proceesing. Cancelling/Interrupting threads.");
-                    // we are shutting down, release all permits
-                    segmentBuilderPermit.release();
-                }
-            }
-        }
-    }
-
-    /**
      * Check if we are still accepting submits.
      * @return
      */
     }
 
     /**
-     * Check if all processing threads have finished.
-     * @return
+     * Returns the queue length. When the pipeline is shutting down, we can see how many
+     * documents remain to be processed.
+     * @return the number of documents on the queue.
      */
-    public boolean stage1Complete() {
-        return stage1Done.get();
+    public int pipelineQueueLength() {
+        return documents.size();
     }
 
     /**
-     * Check if all segments have been written to disk.
+     * Returns true if we have more documents to index.
      * @return
      */
-    public boolean stage2Complete() {
-        return stage2Done.get();
+    public boolean hasMore() {
+        if (isPipelineShuttingDown()) {
+            return pipelineQueueLength() > 0;
+        } else {
+            return true;
+        }
+    }
+
+    public void flush() {
+        flushFlag.set(true);
+    }
+
+    public WikiPage nextDocument() throws InterruptedException {
+        return documents.take();
     }
 
     /**
      * @param segment
      */
     public void persist(InMemoryIndexSegment segment) {
-        LOG.info("Persisting segment {}", indexWriter.currentSegmentCount());
         try {
             indexWriter.persist(segment);
         } catch (IOException e) {
      */
     public void shutdown() {
         pipelineShutdownFlag.set(true);
-        awaitProducersToFinish();
-        stage1Done.set(true);
         shutdownProducers();
-        shutdownConsumers();
-    }
-
-    /**
-     * Checkes wheather all documents submited to the processing threads (producers) have been submited to the queue.
-     * Waits until this is true. Possible deadlock if the threads die before they offer().
-     * We assume this doesn't happen for now.
-     */
-    private void awaitProducersToFinish() {
-        while (hasProcessingSubmits()) {
-            LOG.info("Waiting for remaining {} documents to be processed.", processingSubmitsCount());
-            try {
-                TimeUnit.MILLISECONDS.sleep(500);
-            } catch (InterruptedException e) {
-                LOG.info("Interrupted while waiting for indexing to finish");
-            }
-        }
-    }
-
-    private void shutdownConsumers() {
-        // signal all threads to stop
-        indexConsumer.shutdown();
-        try {
-            LOG.info("Waiting for persisting threads to finish!");
-            if (!indexConsumer.awaitTermination(20, TimeUnit.SECONDS)) {
-                LOG.info("Not all persisting threads finished!");
-            }
-        } catch (InterruptedException e) {
-            LOG.info("Interupted while shutting down indexers");
-        }
     }
 
     /**
      * Shuts down the processing threads.
      */
     private void shutdownProducers() {
-        indexProducers.shutdown();
+        indexWriters.shutdown();
         try {
             LOG.info("Waiting for indexing threads to finish!");
-            if (!indexProducers.awaitTermination(100, TimeUnit.SECONDS)) {
+            if (!indexWriters.awaitTermination(10, TimeUnit.SECONDS)) {
                 LOG.info("Not all indexing threads finished!");
             }
         } catch (InterruptedException e) {
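
A minimal sketch of how the reworked pipeline is meant to be driven, mirroring the IndexingApp hunk above; the output path is an assumption, and the WikiPage constructor arguments (title, content, contributor) are assumed since pages normally come from StaxWikiDumpParser:

    import ro.ieugen.wikiindexer.model.WikiPage;
    import ro.ieugen.wikiindexer.processing.Persistance;
    import ro.ieugen.wikiindexer.processing.ProcessingPipeline;

    public class PipelineUsageSketch {
        public static void main(String[] args) throws Exception {
            Persistance writer = new Persistance("/tmp/wiki-index");     // assumed output path
            ProcessingPipeline pipeline = new ProcessingPipeline(writer);
            // hand-built sample page; the real app feeds pages parsed from the wiki dump
            WikiPage page = new WikiPage("Romania", "The Danube forms part of the border.", "ieugen");
            pipeline.process(page);   // blocks until the bounded queue accepts the page
            pipeline.shutdown();      // flips the shutdown flag and waits for the segment builders to drain
        }
    }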

File src/main/java/ro/ieugen/wikiindexer/processing/SegmentBuilder.java

 package ro.ieugen.wikiindexer.processing;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Set;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import ro.ieugen.wikiindexer.model.InMemoryIndexSegment;
+import ro.ieugen.wikiindexer.model.IndexFragment;
+import ro.ieugen.wikiindexer.model.WikiPage;
 
 /**
  * Builds one index segment in memory.
 public class SegmentBuilder implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(SegmentBuilder.class);
-    private final InMemoryIndexSegment segment;
+    private final static String DEFAULT_FIELD = "content";
+    private final InMemoryIndexSegment segment = new InMemoryIndexSegment();
     private final ProcessingPipeline context;
+    private final Analyzer analyzer;
 
-    public SegmentBuilder(ProcessingPipeline context) {
-        this(context, new InMemoryIndexSegment());
-    }
-
-    public SegmentBuilder(ProcessingPipeline context, InMemoryIndexSegment segment) {
+    public SegmentBuilder(ProcessingPipeline context, Analyzer analyzer) {
         LOG.info("Creating new instance.");
         Preconditions.checkNotNull(context);
-        Preconditions.checkNotNull(segment);
-        this.segment = segment;
         this.context = context;
+        this.analyzer = analyzer;
     }
 
     @Override
     public void run() {
         LOG.info("Started new memory index builder");
         try {
+            // one builder at a time
             context.acquireSegmentBuilderPermit();
         } catch (InterruptedException e) {
             LOG.info("Interrupted while waiting for permit.");
             return;
         }
-        LOG.info("Got write permit!");
-        if (!context.isPipelineShuttingDown()) {
-            while (!shouldFlush()) {
-                try {
-                    // create the in memory index and flush it when it's full or when we don't have any data left.
-                    segment.addFragment(context.take());
-                } catch (InterruptedException ex) {
-                    LOG.info("Interrupted while adding docs to the index. Flushing index to disk.");
-                    break;
-                }
+        LOG.info("Got index building permit!");
+        try {
+            while (context.hasMore() && !segment.shouldFlush()) {
+                // get our unit of work and index it
+                WikiPage p = context.nextDocument();
+                IndexFragment f = index(p);
+                segment.addFragment(f);
             }
+        } catch (InterruptedException ie) {
+            LOG.info("Interrupted while waiting for document!");
+        } finally {
+            // let the next thread start building the index while we persist this one
             context.releaseSegmentBuilderPermit();
+            // dumping context to disk
             context.persist(segment);
-        } else {
-            LOG.info("Processing is done, quick- exiting!");
         }
     }
 
-    /**
-     * Flush if segment is full or server has done processing and we have no more segments.
-     * @return
-     */
-    public boolean shouldFlush() {
-        return segment.shouldFlush();
+    public IndexFragment index(WikiPage pageToIndex) {
+        Set<String> terms = Sets.newHashSet();
+        // tokenize the input stream
+        TokenStream tokenStream = analyzer.tokenStream(DEFAULT_FIELD,
+                new StringReader(pageToIndex.getContent()));
+        CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
+        try {
+            while (tokenStream.incrementToken()) {
+                terms.add(charTermAttribute.toString());
+            }
+        } catch (IOException ex) {
+            LOG.info("Exception tokenizing content {}", pageToIndex, ex);
+        }
+        // add the contributor as a normal term, we don't use fields
+        terms.add(pageToIndex.getContribuitor());
+        return new IndexFragment(pageToIndex.getTitle(), terms);
     }
 }
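
For reference, a stand-alone sketch of the tokenization that index() relies on, using the same Lucene 3.5 calls shown in the diff; the class name and sample sentence are made up:

    import java.io.StringReader;
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.TokenStream;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
    import org.apache.lucene.util.Version;

    public class TokenizeSketch {
        public static void main(String[] args) throws Exception {
            Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_35);
            // same calls SegmentBuilder.index() makes on the page content
            TokenStream ts = analyzer.tokenStream("content", new StringReader("The Danube is a river in Romania."));
            CharTermAttribute term = ts.getAttribute(CharTermAttribute.class);
            while (ts.incrementToken()) {
                // prints danube, river, romania: stop words dropped, terms lower-cased
                System.out.println(term.toString());
            }
            ts.close();
            analyzer.close();
        }
    }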