Commits

Ioan Eugen Stan committed c1e960e

Indexing done, but no persistence yet!

Comments (0)

Files changed (9)

src/main/java/ro/ieugen/wikiindexer/IndexingApp.java

         Preconditions.checkNotNull(outPath);
         LOG.info("Indexing from {} and outputing index to {}", inputPath, outPath);
         final IndexWriter iWriter = new IndexWriter(outPath);
-        final ProcessingPipeline pipeline = new ProcessingPipeline(Executors.newFixedThreadPool(100), iWriter);
+        final ProcessingPipeline pipeline = new ProcessingPipeline(Executors.newFixedThreadPool(30), iWriter);
         InputStream is = null;
         try {
             is = createInputStream(inputPath);
             int count = 0;
             int countRedirects = 0;
             for (WikiPage p : StaxWikiDumpParser.buildAndInitiateParse(is)) {
-                // do something with the page
+                // index the page, skipping redirects
                 if (p.isRedirect()) {
                     countRedirects++;
                 } else {
             }
             LOG.info("Ended document parsing at {} after {} s", new Date(),
                     (System.currentTimeMillis() - start) / 1000);
+            LOG.info("We have indexed {} pages and skipped {} redirects", count, countRedirects);
         } catch (IOException ioe) {
             LOG.error("Exception opening input file {}", inputPath, ioe);
         } catch (XMLStreamException xmle) {
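
The else branch is elided by the diff context above; a minimal sketch of the full driving loop, assuming pipeline.process(p) is the hand-off and count tracks indexed pages (both names appear elsewhere in this commit):

    for (WikiPage p : StaxWikiDumpParser.buildAndInitiateParse(is)) {
        if (p.isRedirect()) {
            countRedirects++;        // redirects carry no content worth indexing
        } else {
            pipeline.process(p);     // submits a DocIndexer task for the page
            count++;
        }
    }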

src/main/java/ro/ieugen/wikiindexer/model/InMemoryIndexSegment.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.model;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.TreeMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ro.ieugen.wikiindexer.Utils;
+
+/**
+ * This index model is used to prepare data for persistence.
+ * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
+ */
+public class InMemoryIndexSegment {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryIndexSegment.class);
+    private static final long DEFAULT_FLUSH_SIZE_LIMIT = 16 * 1024 * 1024;
+    private long flushLimit = DEFAULT_FLUSH_SIZE_LIMIT;
+    private final ArrayList<String> titles = Lists.newArrayList();
+    private final Multimap<String, Integer> postings = TreeMultimap.create();
+    private long indexSizeEstimate = 0;
+
+    public InMemoryIndexSegment() {
+        this(DEFAULT_FLUSH_SIZE_LIMIT);
+    }
+
+    public InMemoryIndexSegment(long requestedFlushLimit) {
+        this.flushLimit = requestedFlushLimit;
+    }
+
+    public void addFragment(IndexFragment pageIndex) {
+        titles.add(pageIndex.getTitle());
+        indexSizeEstimate += Utils.sizeOf(pageIndex.getTitle());
+        // 1-based id of the title just added; postings reference titles by this id
+        int docIndex = titles.size();
+        for (String term : pageIndex.getTerms()) {
+            postings.put(term, docIndex);
+            indexSizeEstimate += Utils.sizeOf(term);
+        }
+    }
+
+    public List<String> titles() {
+        // Collections.unmodifiableList does not return an ArrayList; expose List instead
+        return Collections.unmodifiableList(titles);
+    }
+
+    public Multimap<String, Integer> postings() {
+        return Multimaps.unmodifiableMultimap(postings);
+    }
+
+    public boolean shouldFlush() {
+        // flush once the in-memory estimate reaches the limit
+        return indexSizeEstimate >= flushLimit;
+    }
+}
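
A quick usage sketch of the segment; the title, terms, and tiny flush limit are made up for illustration, and ImmutableSet is from Guava, which the project already uses:

    InMemoryIndexSegment segment = new InMemoryIndexSegment(1024); // tiny limit, illustration only
    segment.addFragment(new IndexFragment("Romania", ImmutableSet.of("country", "europe")));
    if (segment.shouldFlush()) {
        // time to hand the segment to IndexWriter.persist(...) and start a fresh one
    }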

src/main/java/ro/ieugen/wikiindexer/model/IndexFragment.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.model;
+
+import java.util.Set;
+
+/**
+ * Index portion for a single page. No fields.
+ * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
+ */
+public class IndexFragment {
+
+    private final String title;
+    private final Set<String> terms;
+
+    public IndexFragment(String title, Set<String> terms) {
+        this.title = title;
+        this.terms = terms;
+    }
+
+    public Set<String> getTerms() {
+        return terms;
+    }
+
+    public String getTitle() {
+        return title;
+    }
+}
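
Since the constructor keeps the caller's Set, the fragment is only as immutable as its input, yet it crosses threads via the pipeline queue. A possible hardening sketch using Guava's ImmutableSet (a suggestion, not part of this commit):

    public IndexFragment(String title, Set<String> terms) {
        this.title = title;
        this.terms = ImmutableSet.copyOf(terms); // snapshot, safe to share across threads
    }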

src/main/java/ro/ieugen/wikiindexer/model/IndexSegment.java

-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package ro.ieugen.wikiindexer.model;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.TreeMultimap;
-import java.util.ArrayList;
-import java.util.Collections;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import ro.ieugen.wikiindexer.Utils;
-
-/**
- * This index model is used to prepare data for persistance.
- * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
- */
-public class IndexSegment {
-
-    private static final Logger LOG = LoggerFactory.getLogger(IndexSegment.class);
-    private static final long DEFAULT_FLUSH_SIZE_LIMIT = 16 * 1024 * 1024;
-    private long flushLimit = DEFAULT_FLUSH_SIZE_LIMIT;
-    private final ArrayList<String> titles = Lists.newArrayList();
-    private final Multimap<String, Integer> postings = TreeMultimap.create();
-    private long indexSizeEstimate = 0;
-
-    public IndexSegment() {
-        this(DEFAULT_FLUSH_SIZE_LIMIT);
-    }
-
-    public IndexSegment(long requestedFlushLimit) {
-        LOG.info("Flushing segments at {} bytes.", requestedFlushLimit);
-        this.flushLimit = requestedFlushLimit;
-    }
-
-    public void add(SingleDocIndex pageIndex) {
-        titles.add(pageIndex.getTitle());
-        indexSizeEstimate += Utils.sizeOf(pageIndex.getTitle());
-        int docIndex = titles.size();
-        for (String term : pageIndex.getTerms()) {
-            postings.put(term, docIndex);
-            indexSizeEstimate += Utils.sizeOf(pageIndex.getTitle());
-        }
-    }
-
-    public ArrayList<String> titles() {
-        return (ArrayList<String>) Collections.unmodifiableList(titles);
-    }
-
-    public Multimap<String, Integer> postings() {
-        return Multimaps.unmodifiableMultimap(postings);
-    }
-
-    public boolean shouldFlush() {
-        return indexSizeEstimate < flushLimit;
-    }
-}

src/main/java/ro/ieugen/wikiindexer/model/SingleDocIndex.java

-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package ro.ieugen.wikiindexer.model;
-
-import java.util.Set;
-
-/**
- * Index portion for a single page. No fields.
- * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
- */
-public class SingleDocIndex {
-
-    private final String title;
-    private final Set<String> terms;
-
-    public SingleDocIndex(String title,
-                           Set<String> terms) {
-        this.title = title;
-        this.terms = terms;
-    }
-
-    public Set<String> getTerms() {
-        return terms;
-    }
-
-    public String getTitle() {
-        return title;
-    }
-}

src/main/java/ro/ieugen/wikiindexer/processing/DocIndexer.java

 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import ro.ieugen.wikiindexer.model.SingleDocIndex;
+import ro.ieugen.wikiindexer.model.IndexFragment;
 import ro.ieugen.wikiindexer.model.WikiPage;
 
 /**
         // add the contributor as a normal term, we don't use fields
         terms.add(pageToIndex.getContribuitor());
 
-        SingleDocIndex p = new SingleDocIndex(pageToIndex.getTitle(), terms);
+        IndexFragment p = new IndexFragment(pageToIndex.getTitle(), terms);
         // add the index part to the queue for processing
         context.offer(p);
     }
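
For reference, term extraction with Lucene 3.x's attribute-based TokenStream API looks roughly like this; the "content" field name and the getText() accessor on WikiPage are assumptions:

    Set<String> terms = Sets.newHashSet();
    TokenStream stream = analyzer.tokenStream("content", new StringReader(pageToIndex.getText()));
    CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
    stream.reset();
    while (stream.incrementToken()) {
        terms.add(termAtt.toString()); // the set keeps one entry per distinct token
    }
    stream.end();
    stream.close();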

src/main/java/ro/ieugen/wikiindexer/processing/IndexWriter.java

 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
-import ro.ieugen.wikiindexer.model.IndexSegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ro.ieugen.wikiindexer.model.InMemoryIndexSegment;
 
 /**
 * Persists an {@link InMemoryIndexSegment} to disk.
  */
 public class IndexWriter implements Closeable {
 
+    private static final Logger LOG = LoggerFactory.getLogger(IndexWriter.class);
     private static final String TITLES_SUFFIX = ".dict";
     private static final String POSTINGS_SUFFIX = ".pst";
     private final String outputDirectory;
 
     public IndexWriter(String outputDirectory) throws IOException {
         Preconditions.checkNotNull(outputDirectory);
+        LOG.info("Creading new instance. Saving data to {}", outputDirectory);
         this.outputDirectory = outputDirectory;
     }
 
-    public void persist(IndexSegment segment) throws IOException {
+    public void persist(InMemoryIndexSegment segment) throws IOException {
         int segmentId = segmentsCount.incrementAndGet();
         persistTitles(segment, segmentId);
         persistPostings(segment, segmentId);
     }
 
-    private void persistTitles(IndexSegment segment, int segmentId) throws IOException {
+    private void persistTitles(InMemoryIndexSegment segment, int segmentId) throws IOException {
 
         final Writer titlesFile = SequenceFile.createWriter(fs, conf,
                 new Path(outputDirectory, segmentId + TITLES_SUFFIX), IntWritable.class, Text.class,
         titlesFile.close();
     }
 
-    private void persistPostings(IndexSegment segment, int segmentId) throws IOException {
+    private void persistPostings(InMemoryIndexSegment segment, int segmentId) throws IOException {
         final Writer postings = SequenceFile.createWriter(fs, conf,
                 new Path(outputDirectory, segmentId + POSTINGS_SUFFIX), IntWritable.class, Text.class,
                 SequenceFile.CompressionType.RECORD);
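
The SequenceFile writers above pair IntWritable keys with Text values; a sketch of how persistTitles might emit its records (the exact record layout is an assumption):

    int docId = 1; // matches the 1-based ids assigned in InMemoryIndexSegment.addFragment()
    for (String title : segment.titles()) {
        titlesFile.append(new IntWritable(docId++), new Text(title));
    }
    titlesFile.close();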

src/main/java/ro/ieugen/wikiindexer/processing/ProcessingPipeline.java

 import com.google.common.collect.Queues;
 import java.io.IOException;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.util.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import ro.ieugen.wikiindexer.model.IndexSegment;
-import ro.ieugen.wikiindexer.model.SingleDocIndex;
+import ro.ieugen.wikiindexer.model.InMemoryIndexSegment;
+import ro.ieugen.wikiindexer.model.IndexFragment;
 import ro.ieugen.wikiindexer.model.WikiPage;
 
 /**
     private int indexWriterCount = DEFAULT_INDEX_WRITER_THREAD_COUNT;
     private final ExecutorService indexProducers;
     private final ExecutorService indexConsumer;
-    /** Use a {@link ArrayBlockingQueue} to control the indexing throughput */
-    private final BlockingQueue<SingleDocIndex> indexedDocuments = Queues.newLinkedBlockingQueue();
-    private final Future[] indexWriterStatus;
+    /** Queue between producers and segment builders. Unbounded for now; a bounded
+     *  {@link ArrayBlockingQueue} would throttle producers to control indexing throughput.
+     */
+    private final BlockingQueue<IndexFragment> indexedDocuments = Queues.newLinkedBlockingQueue();
+    /** Limits how many threads may build and flush a segment at a time.
+     */
+    private final Semaphore segmentBuilderPermit = new Semaphore(1, true);
+    private final Future[] segmentBuilderStatus;
     private final Analyzer analyzer;
     private final IndexWriter indexWriter;
+    /** Signal that the job is done and all threads should close and flush all data to disk.
+     */
+    private final AtomicBoolean processingDoneFlag = new AtomicBoolean(false);
 
     public ProcessingPipeline(ExecutorService indexProducers, IndexWriter indexWriter) {
         this(indexProducers, DEFAULT_INDEX_WRITER_THREAD_COUNT, indexWriter, new StandardAnalyzer(Version.LUCENE_35));
     }
 
-    public ProcessingPipeline(ExecutorService producers, int indexerCount, IndexWriter indexWriter, Analyzer analyzer) {
+    /**
+     * Builds a processing pipeline from the given producer pool and enough index
+     * writer threads to saturate I/O.
+     * @param producers executor running the {@link DocIndexer} tasks
+     * @param indexWriters number of segment builder threads
+     * @param indexWriter writer used to persist finished segments
+     * @param analyzer Lucene analyzer used to tokenize pages
+     */
+    public ProcessingPipeline(ExecutorService producers, int indexWriters, IndexWriter indexWriter, Analyzer analyzer) {
+        LOG.info("Creating new processing pipeline.");
         Preconditions.checkNotNull(producers);
         Preconditions.checkNotNull(analyzer);
         Preconditions.checkNotNull(indexWriter);
-        Preconditions.checkArgument(indexerCount > 0);
+        Preconditions.checkArgument(indexWriters > 0);
         this.indexProducers = producers;
         this.indexWriter = indexWriter;
         this.analyzer = analyzer;
-        this.indexWriterCount = indexerCount;
+        // should we cap this to a max to prevent abuse?
+        this.indexWriterCount = indexWriters;
         this.indexConsumer = Executors.newFixedThreadPool(indexWriterCount);
-        this.indexWriterStatus = new Future[indexWriterCount];
-        // start the index consumers that will build the index and flush it to disk.
-        for (int i = 0; i < indexWriterStatus.length; i++) {
-            indexWriterStatus[i] = indexConsumer.submit(new SegmentBuilder(this, new IndexSegment()));
-        }
+        this.segmentBuilderStatus = new Future[indexWriterCount];
+        refreshSegmentBuilders();
     }
 
     public void process(WikiPage page) {
         indexProducers.submit(new DocIndexer(this, page, analyzer));
     }
 
-    public boolean offer(SingleDocIndex pageIndex) {
+    public boolean offer(IndexFragment pageIndex) {
         return indexedDocuments.offer(pageIndex);
     }
 
-    public SingleDocIndex take() throws InterruptedException {
+    public IndexFragment take() throws InterruptedException {
         return indexedDocuments.take();
     }
 
-    public void persist(IndexSegment segment) {
+    /**
+     * Called from the segment builder threads to get a permit for building and
+     * writing a segment to disk.
+     */
+    public void acquireSegmentBuilderPermit() throws InterruptedException {
+        segmentBuilderPermit.acquire();
+    }
+
+    /**
+     * Allow another thread to build a segment.
+     */
+    public void releaseSegmentBuilderPermit() {
+        segmentBuilderPermit.release();
+        refreshSegmentBuilders();
+    }
+
+    /**
+     * Iterates over the segment builder futures: resubmits finished builders while
+     * processing is running, cancels them once it is done.
+     */
+    public final void refreshSegmentBuilders() {
+        for (int i = 0; i < segmentBuilderStatus.length; i++) {
+            if (segmentBuilderStatus[i] == null) {
+                segmentBuilderStatus[i] = indexConsumer.submit(new SegmentBuilder(this));
+            } else {
+                if (!isProcessingDone()) {
+                    // recreate the threads
+                    if (segmentBuilderStatus[i].isDone()) {
+                        LOG.info("Creating new segment builder!");
+                        segmentBuilderStatus[i] = indexConsumer.submit(new SegmentBuilder(this));
+                    }
+                } else {
+                    LOG.info("Done with proceesing. Cancelling/Interrupting threads.");
+                    // we are shutting down, we should signal all the running threads to stop.
+                    segmentBuilderStatus[i].cancel(true);
+                    segmentBuilderPermit.release();
+                }
+            }
+        }
+    }
+
+    public boolean isProcessingDone() {
+        return processingDoneFlag.get();
+    }
+
+    public void persist(InMemoryIndexSegment segment) {
         try {
             indexWriter.persist(segment);
         } catch (IOException e) {
      * Ensure proper application shutdown
      */
     public void shutdown() {
+        processingDoneFlag.set(true);
         indexProducers.shutdown();
         try {
             indexProducers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
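
Putting the four-argument constructor to use, the wiring mirrors what IndexingApp does; the output path and builder count here are made up:

    IndexWriter writer = new IndexWriter("/tmp/wiki-index");
    ProcessingPipeline pipeline = new ProcessingPipeline(
            Executors.newFixedThreadPool(30),   // producer pool, as in IndexingApp
            2,                                  // segment builder threads
            writer,
            new StandardAnalyzer(Version.LUCENE_35));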

src/main/java/ro/ieugen/wikiindexer/processing/SegmentBuilder.java

 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import ro.ieugen.wikiindexer.model.IndexSegment;
+import ro.ieugen.wikiindexer.model.InMemoryIndexSegment;
 
 /**
  * Builds one index segment in memory.
 public class SegmentBuilder implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(SegmentBuilder.class);
-    private final IndexSegment segment;
+    private final InMemoryIndexSegment segment;
     private final ProcessingPipeline context;
 
-    public SegmentBuilder(ProcessingPipeline context, IndexSegment segment) {
+    public SegmentBuilder(ProcessingPipeline context) {
+        this(context, new InMemoryIndexSegment());
+    }
+
+    public SegmentBuilder(ProcessingPipeline context, InMemoryIndexSegment segment) {
+        LOG.info("Creating new instance.");
         Preconditions.checkNotNull(context);
         Preconditions.checkNotNull(segment);
         this.segment = segment;
     @Override
     public void run() {
         LOG.info("Started new memory index builder");
-        // TODO: add a way to signal the end of processing
-        while (!segment.shouldFlush()) {
-            try {
-                segment.add(context.take());
-            } catch (InterruptedException ex) {
-                LOG.info("Interrupted while adding docs to the index");
+        try {
+            context.acquireSegmentBuilderPermit();
+        } catch (InterruptedException e) {
+            LOG.info("Interrupted while waiting for permit.");
+            return;
+        }
+        LOG.info("Got write permit!");
+        if (!context.isProcessingDone()) {
+            while (!segment.shouldFlush() && !context.isProcessingDone()) {
+                try {
+                    segment.addFragment(context.take());
+                } catch (InterruptedException ex) {
+                    LOG.info("Interrupted while adding docs to the index. Flushing index to disk.");
+                    break;
+                }
             }
+            context.persist(segment);
+            // return the permit so refreshSegmentBuilders() can schedule a successor
+            context.releaseSegmentBuilderPermit();
+        } else {
+            LOG.info("Processing is done, quick- exiting!");
         }
-        context.persist(segment);
     }
 }
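
The permit handshake between pipeline and builder reduces to the usual semaphore pattern; condensed, with the release kept in a finally block so a failed builder cannot starve its successors:

    context.acquireSegmentBuilderPermit();      // one builder fills/flushes at a time
    try {
        // take() fragments until shouldFlush(), then context.persist(segment)
    } finally {
        context.releaseSegmentBuilderPermit();  // lets refreshSegmentBuilders() start a successor
    }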