Commits

Ioan Eugen Stan committed dbaff1c

still problems.

Files changed (7)

src/main/java/ro/ieugen/wikiindexer/IndexingApp.java

         Preconditions.checkNotNull(outPath);
         LOG.info("Indexing from {} and outputing index to {}", inputPath, outPath);
         final IndexWriter iWriter = new IndexWriter(outPath);
-        final ProcessingPipeline pipeline = new ProcessingPipeline(Executors.newFixedThreadPool(30), iWriter);
+        final ProcessingPipeline pipeline = new ProcessingPipeline(Executors.newFixedThreadPool(10), iWriter);
         InputStream is = null;
         try {
             is = createInputStream(inputPath);
             LOG.info("Ended document parsing at {} after {} s", new Date(),
                     (System.currentTimeMillis() - start) / 1000);
             LOG.info("We have indexed {} pages and skipped {} redirects", count, countRedirects);
+            pipeline.shutdown();
         } catch (IOException ioe) {
             LOG.error("Exception opening input file {}", inputPath, ioe);
         } catch (XMLStreamException xmle) {
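
Note: the new pipeline.shutdown() call sits inside the try block, so it is skipped whenever createInputStream() or the XML parsing throws. A minimal sketch, assuming the method keeps its current shape, with the shutdown moved into a finally block (Hadoop's IOUtils is already used by IndexWriter below; everything else as in the file above):

    InputStream is = null;
    try {
        is = createInputStream(inputPath);
        // ... parse pages and hand them to the pipeline, as above ...
    } catch (IOException ioe) {
        LOG.error("Exception opening input file {}", inputPath, ioe);
    } catch (XMLStreamException xmle) {
        // ... unchanged handling ...
    } finally {
        // always drain and stop the pipeline, even when parsing fails
        pipeline.shutdown();
        org.apache.hadoop.io.IOUtils.closeStream(is);
    }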

src/main/java/ro/ieugen/wikiindexer/model/InMemoryIndexSegment.java

  ****************************************************************/
 package ro.ieugen.wikiindexer.model;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 import com.google.common.collect.TreeMultimap;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import ro.ieugen.wikiindexer.Utils;
     private final ArrayList<String> titles = Lists.newArrayList();
     private final Multimap<String, Integer> postings = TreeMultimap.create();
     private long indexSizeEstimate = 0;
+    private AtomicInteger fragmentCount = new AtomicInteger(0);
 
     public InMemoryIndexSegment() {
         this(DEFAULT_FLUSH_SIZE_LIMIT);
     }
 
     public void addFragment(IndexFragment pageIndex) {
+        LOG.info("Adding fragment {} - size estimate {}", fragmentCount.incrementAndGet(), sizeEstimate());
+//        LOG.info("Fragment \n {} \n {}", pageIndex.getTitle(), pageIndex.getTerms());
         titles.add(pageIndex.getTitle());
         indexSizeEstimate += Utils.sizeOf(pageIndex.getTitle());
         int docIndex = titles.size();
+        LOG.info("Fragment info: {} - {}", titles.get(docIndex), docIndex);
         for (String term : pageIndex.getTerms()) {
+            LOG.info("Adding {} to {}", term, docIndex);
             postings.put(term, docIndex);
             indexSizeEstimate += Utils.sizeOf(term);
         }
     }
 
-    public ArrayList<String> titles() {
-        return (ArrayList<String>) Collections.unmodifiableList(titles);
+    public ImmutableList<String> titles() {
+        return ImmutableList.copyOf(titles);
     }
 
-    public Multimap<String, Integer> postings() {
-        return Multimaps.unmodifiableMultimap(postings);
+    public ImmutableMultimap<String, Integer> postings() {
+        return ImmutableMultimap.copyOf(postings);
     }
 
     public boolean shouldFlush() {
-        return indexSizeEstimate < flushLimit;
+        return indexSizeEstimate >= flushLimit;
+    }
+
+    public long sizeEstimate() {
+        return indexSizeEstimate;
+    }
+
+    public long fragmentCount() {
+        return fragmentCount.get();
     }
 }
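
The getters now return defensive snapshots (ImmutableList.copyOf / ImmutableMultimap.copyOf) instead of unmodifiable views over the live collections, and shouldFlush() finally triggers when the estimate reaches the limit rather than below it. A small stand-alone sketch of why the copy matters while the segment keeps being filled (class and data are made up for illustration):

    import com.google.common.collect.ImmutableList;
    import java.util.ArrayList;
    import java.util.List;

    public class SnapshotDemo {
        public static void main(String[] args) {
            List<String> titles = new ArrayList<String>();
            titles.add("Page 1");

            // copyOf takes a point-in-time snapshot of the backing list
            ImmutableList<String> snapshot = ImmutableList.copyOf(titles);
            titles.add("Page 2");

            System.out.println(snapshot.size()); // 1 - later adds are not visible
            System.out.println(titles.size());   // 2
        }
    }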

src/main/java/ro/ieugen/wikiindexer/model/IndexFragment.java

  ****************************************************************/
 package ro.ieugen.wikiindexer.model;
 
-import java.util.Set;
+import com.google.common.collect.ImmutableSet;
 
 /**
  * Index portion for a single page. No fields.
 public class IndexFragment {
 
     private final String title;
-    private final Set<String> terms;
+    private final ImmutableSet<String> terms;
 
     public IndexFragment(String title,
-                           Set<String> terms) {
+                         ImmutableSet<String> terms) {
         this.title = title;
         this.terms = terms;
     }
 
-    public Set<String> getTerms() {
+    public ImmutableSet<String> getTerms() {
         return terms;
     }
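
Typing terms as ImmutableSet makes an IndexFragment effectively a value object once built, which is what lets DocIndexer hand it to the queue without further copying. A construction sketch (page title and terms are invented for the example):

    import com.google.common.collect.ImmutableSet;

    IndexFragment fragment = new IndexFragment("Main Page",
            ImmutableSet.of("wiki", "free", "encyclopedia"));
    // fragment.getTerms() can now be shared across threads safely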
 

src/main/java/ro/ieugen/wikiindexer/processing/DocIndexer.java

  ****************************************************************/
 package ro.ieugen.wikiindexer.processing;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.StringReader;
 
     @Override
     public void run() {
-        Set<String> terms = Sets.newHashSet();
-        // tokenize the input stream
-        TokenStream tokenStream = analyzer.tokenStream(DEFAULT_FIELD, new StringReader(pageToIndex.getContent()));
-        CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
         try {
-            while (tokenStream.incrementToken()) {
-                terms.add(charTermAttribute.toString());
+            Set<String> terms = Sets.newHashSet();
+            // tokenize the input stream
+            TokenStream tokenStream = analyzer.tokenStream(DEFAULT_FIELD, new StringReader(pageToIndex.getContent()));
+            CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
+            try {
+                while (tokenStream.incrementToken()) {
+                    terms.add(charTermAttribute.toString());
+                }
+            } catch (IOException ex) {
+                LOG.info("Exception tokenizing content {}", pageToIndex, ex);
             }
-        } catch (IOException ex) {
-            LOG.info("Exception tokenizing content", ex);
+            // add the contributor as a normal term, we don't use fields
+            terms.add(pageToIndex.getContribuitor());
+
+            IndexFragment p = new IndexFragment(pageToIndex.getTitle(), ImmutableSet.copyOf(terms));
+            // add the index part to the queue for processing
+            boolean added;
+            do {
+                added = context.offer(p);
+            } while (!added);
+        } catch (Exception e) {
+            LOG.info("Exception indexing page {}", pageToIndex);
         }
-        // add the contributor as a normal term, we don't use fields
-        terms.add(pageToIndex.getContribuitor());
-
-        IndexFragment p = new IndexFragment(pageToIndex.getTitle(), terms);
-        // add the index part to the que for processing
-        context.offer(p);
     }
 }
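
The do/while around context.offer(p) retries immediately and will spin at full CPU while the fragment queue is full. Since ProcessingPipeline.take() suggests indexedDocuments is a BlockingQueue, a blocking put (or an offer with a timeout) on the pipeline side would avoid the spin. A hedged sketch of such an accessor; the method name is hypothetical, not part of this commit:

    // Inside ProcessingPipeline, assuming
    // indexedDocuments is a java.util.concurrent.BlockingQueue<IndexFragment>:
    public void offerBlocking(IndexFragment pageIndex) throws InterruptedException {
        // blocks until queue space frees up instead of making callers busy-wait
        indexedDocuments.put(pageIndex);
    }

DocIndexer.run() would then call context.offerBlocking(p) once and treat an InterruptedException as the signal to stop.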

src/main/java/ro/ieugen/wikiindexer/processing/IndexWriter.java

 package ro.ieugen.wikiindexer.processing;
 
 import com.google.common.base.Preconditions;
-import java.io.Closeable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import ro.ieugen.wikiindexer.model.InMemoryIndexSegment;
  * Persists a {@link IndexModel} to disk.
  * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
  */
-public class IndexWriter implements Closeable {
+public class IndexWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(IndexWriter.class);
     private static final String TITLES_SUFFIX = ".dict";
     private static final String POSTINGS_SUFFIX = ".pst";
     private final String outputDirectory;
-    private final AtomicInteger segmentsCount = new AtomicInteger(1);
+    private final AtomicInteger segmentsCount = new AtomicInteger(0);
     private final Configuration conf = new Configuration();
-    private final FileSystem fs = new LocalFileSystem();
+    private final FileSystem fs;
 
     public IndexWriter(String outputDirectory) throws IOException {
         Preconditions.checkNotNull(outputDirectory);
         LOG.info("Creading new instance. Saving data to {}", outputDirectory);
         this.outputDirectory = outputDirectory;
+        this.fs = FileSystem.get(URI.create(outputDirectory), conf);
     }
 
     public void persist(InMemoryIndexSegment segment) throws IOException {
         persistPostings(segment, segmentId);
     }
 
-    private void persistTitles(InMemoryIndexSegment segment, int segmentId) throws IOException {
-
-        final Writer titlesFile = SequenceFile.createWriter(fs, conf,
-                new Path(outputDirectory, segmentId + TITLES_SUFFIX), IntWritable.class, Text.class,
-                SequenceFile.CompressionType.RECORD);
-        ArrayList<String> titles = segment.titles();
-        LongWritable key = new LongWritable();
-        Text value = new Text();
-        for (int i = 0; i < titles.size(); i++) {
-            key.set(i);
-            value.set(titles.get(i));
-            titlesFile.append(key, value);
+    private void persistTitles(InMemoryIndexSegment segment, int segmentId) {
+        LOG.info("Persisting titles for segment {} with size {}", currentSegmentCount(), segment.fragmentCount());
+        SequenceFile.Writer writer = null;
+        try {
+            ImmutableList<String> titles = segment.titles();
+            LongWritable key = new LongWritable();
+            Text value = new Text();
+            writer = SequenceFile.createWriter(fs, conf,
+                    new Path(outputDirectory, segmentId + TITLES_SUFFIX), key.getClass(), value.getClass(),
+                    SequenceFile.CompressionType.NONE);
+            for (int i = 0; i < titles.size(); i++) {
+//                LOG.info("Persisting key {} - {}.", i, titles.get(i));
+                key.set(i);
+                value.set(titles.get(i));
+                writer.append(key, value);
+            }
+        } catch (IOException ioe) {
+            LOG.info("Exception persisting titles!", ioe);
+        } finally {
+            LOG.info("Closing file");
+            IOUtils.closeStream(writer);
         }
-        titlesFile.close();
     }
 
     private void persistPostings(InMemoryIndexSegment segment, int segmentId) throws IOException {
-        final Writer postings = SequenceFile.createWriter(fs, conf,
-                new Path(outputDirectory, segmentId + POSTINGS_SUFFIX), IntWritable.class, Text.class,
-                SequenceFile.CompressionType.RECORD);
-        ArrayList<String> titles = segment.titles();
-        LongWritable key = new LongWritable();
-        Text value = new Text();
-        for (int i = 0; i < titles.size(); i++) {
-            key.set(i);
-            value.set(titles.get(i));
-            postings.append(key, value);
+        LOG.info("Persisting postings for segment {}", currentSegmentCount());
+        SequenceFile.Writer writer = null;
+        try {
+            Multimap<String, Integer> postingsList = segment.postings();
+            Text key = new Text();
+            ArrayWritable value = new ArrayWritable(IntWritable.class);
+            writer = SequenceFile.createWriter(fs, conf,
+                    new Path(outputDirectory, segmentId + POSTINGS_SUFFIX), key.getClass(), value.getClass(),
+                    SequenceFile.CompressionType.NONE);
+            for (String theTerm : postingsList.keySet()) {
+                key.set(theTerm);
+                Collection<Integer> theIndex = postingsList.get(theTerm);
+                LOG.info("Peristing key {} - {}.", theTerm, theIndex.size());
+                IntWritable[] array = new IntWritable[theIndex.size()];
+                int i = 0;
+                for (Integer docId : theIndex) {
+                    array[i++] = new IntWritable(docId);
+                }
+                value.set(array);
+                writer.append(key, value);
+            }
+        } catch (IOException ioe) {
+            LOG.info("Exception persisting postings!", ioe);
+        } finally {
+            LOG.info("Closing file");
+            IOUtils.closeStream(writer);
         }
-        postings.close();
     }
 
-    @Override
-    public void close() throws IOException {
+    public int currentSegmentCount() {
+        return segmentsCount.get();
     }
 }
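
Two hedged notes on the SequenceFile output, neither part of this commit. First, ArrayWritable has no no-argument constructor, so a postings file written with it as the value class generally cannot be deserialized by SequenceFile.Reader; the usual workaround is a tiny subclass. Second, the titles files (LongWritable/Text, the ".dict" segments) can be spot-checked with a plain reader. A sketch of both, using the older FileSystem-based API the writer already uses:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    /** Hypothetical value class so the postings arrays can be read back later. */
    class IntArrayWritable extends ArrayWritable {
        public IntArrayWritable() {
            super(IntWritable.class);
        }
    }

    /** Dumps a titles segment written by IndexWriter. */
    class TitlesDump {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            Path path = new Path(args[0]); // e.g. a "<segmentId>.dict" file
            FileSystem fs = path.getFileSystem(conf);
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
            try {
                LongWritable key = new LongWritable();
                Text value = new Text();
                while (reader.next(key, value)) {
                    System.out.println(key.get() + "\t" + value);
                }
            } finally {
                reader.close();
            }
        }
    }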

src/main/java/ro/ieugen/wikiindexer/processing/ProcessingPipeline.java

 import java.io.IOException;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.util.Version;
     /** Signal that the job is done and all threads should close and flush all data to disk.
      */
     private final AtomicBoolean processingDoneFlag = new AtomicBoolean(false);
+    private final AtomicInteger submits = new AtomicInteger(0);
 
     public ProcessingPipeline(ExecutorService indexProducers, IndexWriter indexWriter) {
         this(indexProducers, DEFAULT_INDEX_WRITER_THREAD_COUNT, indexWriter, new StandardAnalyzer(Version.LUCENE_35));
     }
 
     public void process(WikiPage page) {
+        incrementSubmits();
         indexProducers.submit(new DocIndexer(this, page, analyzer));
     }
 
     public boolean offer(IndexFragment pageIndex) {
+        LOG.info("Adding new index fragment to queue");
         return indexedDocuments.offer(pageIndex);
     }
 
     public IndexFragment take() throws InterruptedException {
+        decrementSubmits();
         return indexedDocuments.take();
     }
 
+    private int incrementSubmits() {
+        return submits.incrementAndGet();
+    }
+
+    private int decrementSubmits() {
+        // this will fail if indexing threads die prematurely
+        int newVal = submits.decrementAndGet();
+        Preconditions.checkArgument(newVal >= 0);
+        return newVal;
+    }
+
+    public boolean hasOffers() {
+        return submits.get() > 0;
+    }
+
     /**
      * Call this from the segment builder threads to get a permit for building and writing the segment to disk.
      */
     public void acquireSegmentBuilderPermit() throws InterruptedException {
         segmentBuilderPermit.acquire();
+        refreshSegmentBuilders();
     }
 
     /**
      */
     public void releaseSegmentBuilderPermit() {
         segmentBuilderPermit.release();
-        refreshSegmentBuilders();
     }
 
     /**
                     }
                 } else {
                     LOG.info("Done with proceesing. Cancelling/Interrupting threads.");
-                    // we are shutting down, we should signal all the running threads to stop.
-                    segmentBuilderStatus[i].cancel(true);
+                    // we are shutting down, release all permits
                     segmentBuilderPermit.release();
                 }
             }
     }
 
     public void persist(InMemoryIndexSegment segment) {
+        LOG.info("Persisting segment {}", indexWriter.currentSegmentCount());
         try {
             indexWriter.persist(segment);
         } catch (IOException e) {
      * Ensure proper application shutdown
      */
     public void shutdown() {
-        processingDoneFlag.set(true);
+        LOG.info("Offers - takes: {}", submits.get());
         indexProducers.shutdown();
         try {
-            indexProducers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+            LOG.info("Waiting for indexing threads to finish!");
+            if (!indexProducers.awaitTermination(100, TimeUnit.SECONDS)) {
+                LOG.info("Not all indexing threads finished!");
+            }
         } catch (InterruptedException e) {
             LOG.info("Interupted while shutting down indexers");
         }
+        // signal all threads to stop
+        processingDoneFlag.set(true);
         indexConsumer.shutdown();
         try {
-            indexConsumer.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+            LOG.info("Waiting for persisting threads to finish!");
+            if (!indexConsumer.awaitTermination(20, TimeUnit.SECONDS)) {
+                LOG.info("Not all persisting threads finished!");
+            }
         } catch (InterruptedException e) {
             LOG.info("Interupted while shutting down indexers");
         }
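
shutdown() now stops the producers first, flips processingDoneFlag, and only then stops the consumers, each with a bounded awaitTermination. When a bounded wait expires, the usual follow-up is shutdownNow() so stragglers are interrupted rather than left running; a sketch of that standard pattern (the helper class and name are made up):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    final class Pools {
        /** Bounded, interrupt-aware shutdown of an executor. */
        static void shutdownAndWait(ExecutorService pool, long seconds) {
            pool.shutdown();                        // stop accepting new tasks
            try {
                if (!pool.awaitTermination(seconds, TimeUnit.SECONDS)) {
                    pool.shutdownNow();             // interrupt tasks still running
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }

With that helper, shutdown() could call Pools.shutdownAndWait(indexProducers, 100) before setting the flag and Pools.shutdownAndWait(indexConsumer, 20) afterwards.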

src/main/java/ro/ieugen/wikiindexer/processing/SegmentBuilder.java

         }
         LOG.info("Got write permit!");
         if (!context.isProcessingDone()) {
-            while (!segment.shouldFlush() && !context.isProcessingDone()) {
+            while (!shouldFlush()) {
                 try {
-                    segment.addFragment(context.take());
+                    if (context.hasOffers()) {
+                        segment.addFragment(context.take());
+                    } else {
+                        LOG.info("Finished all documents!");
+                        break;
+                    }
                 } catch (InterruptedException ex) {
                     LOG.info("Interrupted while adding docs to the index. Flushing index to disk.");
                     break;
                 }
             }
+            context.releaseSegmentBuilderPermit();
             context.persist(segment);
         } else {
             LOG.info("Processing is done, quick- exiting!");
         }
     }
+
+    /**
+     * Flush if the segment is full or processing has finished and there are no more fragments to consume.
+     * @return true if the current segment should be flushed to disk
+     */
+    public boolean shouldFlush() {
+        return segment.shouldFlush();
+    }
 }
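
hasOffers() tracks submitted pages rather than fragments actually sitting in the queue, so a builder that passes the check can still block indefinitely in take() if an indexing task fails before offering its fragment - the very case the decrementSubmits() comment warns about. Polling with a timeout sidesteps that; a hedged sketch of a pipeline-side accessor (the method is hypothetical, assuming indexedDocuments is a BlockingQueue):

    // Inside ProcessingPipeline (java.util.concurrent.* is already imported there):
    public IndexFragment poll(long timeout, TimeUnit unit) throws InterruptedException {
        // returns null if nothing arrives in time instead of blocking indefinitely
        return indexedDocuments.poll(timeout, unit);
    }

The builder loop would then treat a null result together with isProcessingDone() as the cue to flush the segment and exit.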