Commits

Ioan Eugen Stan committed 4d1ea7f

Pipeline almost done.

Comments (0)

Files changed (17)

             <artifactId>commons-compress</artifactId>
             <version>1.3</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-core</artifactId>
+            <version>3.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>1.0.2</version>
+        </dependency>
     </dependencies>
 
     <build>

src/main/java/ro/ieugen/wikiindexer/IndexingApp.java

  ****************************************************************/
 package ro.ieugen.wikiindexer;
 
+import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Date;
+import java.util.concurrent.Executors;
 import javax.xml.stream.XMLStreamException;
 import org.apache.commons.cli2.CommandLine;
 import org.apache.commons.cli2.Group;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import ro.ieugen.wikiindexer.model.WikiPage;
-import ro.ieugen.wikiindexer.parse.StaxWikiParser;
+import ro.ieugen.wikiindexer.parse.StaxWikiDumpParser;
+import ro.ieugen.wikiindexer.processing.IndexWriter;
+import ro.ieugen.wikiindexer.processing.ProcessingPipeline;
 
 /**
  * Index the wikipedia dump.
 
     private static final Logger LOG = LoggerFactory.getLogger(IndexingApp.class);
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws IOException {
         GroupBuilder gbuilder = new GroupBuilder();
 
         Option inputOpt = Utils.inputOption().create();
         Option outOpt = Utils.outputOption().create();
         Option helpOpt = Utils.helpOption();
         Group group = gbuilder.withOption(inputOpt).withOption(outOpt).withOption(helpOpt).create();
-
+        // read the command line options
+        String inputPath = null;
+        String outPath = null;
         try {
             Parser parser = new Parser();
             parser.setGroup(group);
             if (cmdLine.hasOption(helpOpt)) {
                 Utils.printHelp(group);
             }
-            String inputPath = null;
+
             if (cmdLine.hasOption(inputOpt)) {
                 inputPath = cmdLine.getValue(inputOpt).toString();
             }
-            String outPath = null;
+
             if (cmdLine.hasOption(outOpt)) {
                 outPath = cmdLine.getValue(outOpt).toString();
             }
-
-            LOG.info("Indexing from {} and outputing index to {}", inputPath, outPath);
-
-            InputStream is = null;
-            try {
-                is = createInputStream(inputPath);
-
-                long start = System.currentTimeMillis();
-                LOG.info("Started processing XML at {}", new Date());
-                int count = 0;
-                int countRedirects = 0;
-                for (WikiPage p : StaxWikiParser.buildAndInitiateParse(is)) {
-                    // do something with the page
-                    if (p.isRedirect()) {
-                        countRedirects++;
-                    } else {
-                        System.out.println(p);
-                    }
-                    count++;
-                    if (count % 5 == 0) {
-                        LOG.info("We have {} pages and {} redirects", count, countRedirects);
-                        break;
-                    }
-                }
-                LOG.info("Ended document parsing at {} after {} s", new Date(),
-                        (System.currentTimeMillis() - start) / 1000);
-            } catch (IOException ioe) {
-                LOG.error("Exception opening input file {}", inputPath, ioe);
-            } catch (XMLStreamException xmle) {
-                LOG.error("Exception reading XML.", xmle);
-            } finally {
-                Closeables.closeQuietly(is);
-            }
+            // start the indexing
+            doTheIndexingPart(inputPath, outPath);
         } catch (OptionException e) {
-            LOG.error("Exception", e);
+            LOG.error("Exception ", e);
             Utils.printHelp(group);
         }
-
     }
 
-    public static InputStream createInputStream(String inputPath) throws IOException, FileNotFoundException {
    /**
     * Runs the indexing: streams pages from the wiki dump at {@code inputPath}
     * and feeds every non-redirect page into the processing pipeline, which
     * writes the index to {@code outPath}.
     * @param inputPath path to the (possibly bz2/bzip-compressed) XML dump; must not be null
     * @param outPath   destination the IndexWriter writes the index to; must not be null
     * @throws IOException if the index writer cannot be created
     */
    private static void doTheIndexingPart(String inputPath, String outPath) throws IOException {
        Preconditions.checkNotNull(inputPath);
        Preconditions.checkNotNull(outPath);
        LOG.info("Indexing from {} and outputing index to {}", inputPath, outPath);
        final IndexWriter iWriter = new IndexWriter(outPath);
        // NOTE(review): 100 threads is a lot; if the pipeline work is CPU-bound,
        // consider sizing the pool to Runtime.getRuntime().availableProcessors().
        final ProcessingPipeline pipeline = new ProcessingPipeline(Executors.newFixedThreadPool(100), iWriter);
        InputStream is = null;
        try {
            is = createInputStream(inputPath);

            long start = System.currentTimeMillis();
            LOG.info("Started processing XML at {}", new Date());
            int count = 0;
            int countRedirects = 0;
            for (WikiPage p : StaxWikiDumpParser.buildAndInitiateParse(is)) {
                // do something with the page
                if (p.isRedirect()) {
                    // redirects are counted but not indexed
                    countRedirects++;
                } else {
                    pipeline.process(p);
                }
                count++;
                if (count % 10000 == 0) {
                    LOG.info("We have {} pages and {} redirects", count, countRedirects);
                    // NOTE(review): this break stops processing after the first
                    // 10000 pages — it looks like a debugging leftover (the
                    // previous version broke at 5). Remove it to index the full dump.
                    break;
                }
            }
            LOG.info("Ended document parsing at {} after {} s", new Date(),
                    (System.currentTimeMillis() - start) / 1000);
        } catch (IOException ioe) {
            LOG.error("Exception opening input file {}", inputPath, ioe);
        } catch (XMLStreamException xmle) {
            LOG.error("Exception reading XML.", xmle);
        } finally {
            // NOTE(review): the executor handed to the pipeline is never shut
            // down here; non-daemon worker threads may keep the JVM alive after
            // this method returns — confirm the pipeline owns/stops it.
            Closeables.closeQuietly(is);
        }
    }
+
+    private static InputStream createInputStream(String inputPath) throws IOException, FileNotFoundException {
         InputStream is = new FileInputStream(inputPath);
         if (inputPath.toLowerCase().endsWith("bz2")
                 || inputPath.toLowerCase().endsWith("bzip")) {

src/main/java/ro/ieugen/wikiindexer/Utils.java

         }
         return "UNKNOWN_EVENT_TYPE , " + eventType;
     }
+
    /**
     * Rough in-memory size of a string, in bytes: 2 bytes per char plus a
     * 2-byte fudge factor. NOTE(review): a JVM reference is actually 4-8 bytes
     * and a String carries extra object-header/field overhead, so this
     * underestimates the true footprint; it is only suitable as a relative
     * heuristic (e.g. for deciding when to flush an index segment).
     * @param str the string to estimate; must not be null
     * @return the estimated size in bytes
     */
    public static int sizeOf(String str) {
        return str.length() * 2 + 2;
    }
 }

src/main/java/ro/ieugen/wikiindexer/model/IndexSegment.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.model;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.TreeMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ro.ieugen.wikiindexer.Utils;
+
+/**
+ * This index model is used to prepare data for persistance.
+ * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
+ */
+public class IndexSegment {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IndexSegment.class);
+    private static final long DEFAULT_FLUSH_SIZE_LIMIT = 16 * 1024 * 1024;
+    private long flushLimit = DEFAULT_FLUSH_SIZE_LIMIT;
+    private final ArrayList<String> titles = Lists.newArrayList();
+    private final Multimap<String, Integer> postings = TreeMultimap.create();
+    private long indexSizeEstimate = 0;
+
+    public IndexSegment() {
+        this(DEFAULT_FLUSH_SIZE_LIMIT);
+    }
+
+    public IndexSegment(long requestedFlushLimit) {
+        LOG.info("Flushing segments at {} bytes.", requestedFlushLimit);
+        this.flushLimit = requestedFlushLimit;
+    }
+
+    public void add(SingleDocIndex pageIndex) {
+        titles.add(pageIndex.getTitle());
+        indexSizeEstimate += Utils.sizeOf(pageIndex.getTitle());
+        int docIndex = titles.size();
+        for (String term : pageIndex.getTerms()) {
+            postings.put(term, docIndex);
+            indexSizeEstimate += Utils.sizeOf(pageIndex.getTitle());
+        }
+    }
+
+    public ArrayList<String> titles() {
+        return (ArrayList<String>) Collections.unmodifiableList(titles);
+    }
+
+    public Multimap<String, Integer> postings() {
+        return Multimaps.unmodifiableMultimap(postings);
+    }
+
+    public boolean shouldFlush() {
+        return indexSizeEstimate < flushLimit;
+    }
+}

src/main/java/ro/ieugen/wikiindexer/model/PageTypePart.java

-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package ro.ieugen.wikiindexer.model;
-
-import javax.xml.namespace.QName;
-
-/**
- * XML elements that can apear in a wiki page.
- * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
- */
-public enum PageTypePart {
-
-    TITLE("title"), REVISION("revision"), OTHER("other"), PAGE("page"),
-    USERNAME("username"), TEXT("text");
-    private final String elementName;
-
-    private PageTypePart(String part) {
-        this.elementName = part;
-    }
-
-    public static PageTypePart pageTypeFromQName(QName qname) {
-        for (PageTypePart p : PageTypePart.values()) {
-            if (p.elementName.equalsIgnoreCase(qname.getLocalPart())) {
-                return p;
-            }
-        }
-        return OTHER;
-    }
-
-    public String getElementName() {
-        return elementName;
-    }
-}

src/main/java/ro/ieugen/wikiindexer/model/SingleDocIndex.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.model;
+
+import java.util.Set;
+
/**
 * Index portion for a single page: its title and the set of terms extracted
 * from its content. No fields/positions are kept.
 * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
 */
public class SingleDocIndex {

    private final String title;
    private final Set<String> terms;

    /**
     * @param title the page title; must not be null
     * @param terms the terms indexed for this page; must not be null. The set
     *              is stored as-is (no defensive copy), so callers must not
     *              mutate it after construction.
     * @throws NullPointerException if title or terms is null
     */
    public SingleDocIndex(String title,
                           Set<String> terms) {
        // Fail fast on nulls, consistent with the checks in WikiPage.
        if (title == null) {
            throw new NullPointerException("title");
        }
        if (terms == null) {
            throw new NullPointerException("terms");
        }
        this.title = title;
        this.terms = terms;
    }

    /** @return the terms indexed for this page (the set passed at construction). */
    public Set<String> getTerms() {
        return terms;
    }

    /** @return the page title. */
    public String getTitle() {
        return title;
    }
}

src/main/java/ro/ieugen/wikiindexer/model/WikiPage.java

 public class WikiPage {
 
     private final String title;
-    private final String contribuitors;
+    private final String contribuitor;
     private final String content;
 
    /**
     * Creates an immutable snapshot of a parsed wiki page.
     * @param title         the page title. NOTE(review): unlike the other two
     *                      arguments, title is NOT null-checked — confirm
     *                      whether null titles are really allowed here.
     * @param contribuitors username of the page contributor; must not be null
     *                      ("contribuitor" is a misspelling of "contributor"
     *                      kept throughout this class)
     * @param content       raw wiki markup of the page; must not be null
     */
    public WikiPage(String title, String contribuitors, String content) {
        Preconditions.checkNotNull(contribuitors);
        Preconditions.checkNotNull(content);
        this.title = title;
        this.contribuitor = contribuitors;
        this.content = content;
    }
 
         return content.startsWith("#REDIRECT");
     }
 
    /** @return the raw wiki markup of the page. */
    public String getContent() {
        return content;
    }

    /** @return the contributor's username (field name is a misspelling of "contributor"). */
    public String getContribuitor() {
        return contribuitor;
    }

    /** @return the page title. */
    public String getTitle() {
        return title;
    }
+
     @Override
     public String toString() {
-        return "WikiDocument{" + "title=" + title + ", contribuitors=" + contribuitors + ", content=" + content + '}';
+        return "WikiDocument{" + "title=" + title + ", contribuitors=" + contribuitor + ", content=" + content + '}';
     }
 }

src/main/java/ro/ieugen/wikiindexer/parse/PageTypePart.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.parse;
+
+import javax.xml.namespace.QName;
+
/**
 * XML element names that can appear inside a wiki page and that the parser
 * cares about. Anything unrecognised maps to {@link #OTHER}.
 * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
 */
public enum PageTypePart {

    TITLE("title"), REVISION("revision"), OTHER("other"), PAGE("page"),
    USERNAME("username"), TEXT("text");

    private final String elementName;

    private PageTypePart(String part) {
        this.elementName = part;
    }

    /**
     * Maps an XML qualified name to the matching part, ignoring case.
     * @param qname the element name seen by the parser
     * @return the matching constant, or {@link #OTHER} if none matches
     */
    public static PageTypePart pageTypeFromQName(QName qname) {
        final String localName = qname.getLocalPart();
        for (PageTypePart candidate : values()) {
            if (candidate.elementName.equalsIgnoreCase(localName)) {
                return candidate;
            }
        }
        return OTHER;
    }

    /** @return the lowercase XML element name this constant represents. */
    public String getElementName() {
        return elementName;
    }
}

src/main/java/ro/ieugen/wikiindexer/parse/RegexpWikiDumpParser.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.parse;
+
+import java.io.InputStream;
+
/**
 * Regexp based parser: placeholder for a regular-expression alternative to
 * the StAX dump parser. Not implemented yet.
 * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
 */
public class RegexpWikiDumpParser {

    /**
     * Would scan the dump with regular expressions instead of XML parsing.
     * @param input the wiki dump stream
     * @throws UnsupportedOperationException always — not yet implemented
     */
    public static void parseWikiDumpWithRegexp(InputStream input) {
        throw new UnsupportedOperationException("Not yet implemented");
    }
}

src/main/java/ro/ieugen/wikiindexer/parse/RegexpWikiParser.java

-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package ro.ieugen.wikiindexer.parse;
-
-import java.io.InputStream;
-
-/**
- * Regexp based parser.
- * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
- */
-public class RegexpWikiParser {
-
-    // parser using regexps instead of XML parsing
-    public static void parseWikiDumpWithRegexp(InputStream is) {
-        throw new UnsupportedOperationException("Not yet implemented");
-    }
-}

src/main/java/ro/ieugen/wikiindexer/parse/StaxWikiDumpParser.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.parse;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.util.StreamReaderDelegate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ro.ieugen.wikiindexer.model.WikiPage;
+
/**
 * Parses a XML wiki dump using StaX and exposes it as an Iterable of
 * WikiPage; pages are decoded lazily while iterating. A single underlying
 * XMLStreamReader is shared between this class and its iterator, so an
 * instance is single-use and not thread-safe.
 * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
 */
public class StaxWikiDumpParser implements Iterable<WikiPage>, Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(StaxWikiDumpParser.class);
    // Underlying StAX reader; advanced destructively by the iterator below.
    private final XMLStreamReader fr;
    // Single shared iterator instance; iterator() always hands out this one.
    private final WikiPageIterator pageIterator = new WikiPageIterator();
    // Last StAX event type read — cursor state shared between
    // skipUntilPageStart() and WikiPageIterator.readPage().
    private int event;

    private StaxWikiDumpParser(InputStream is) throws XMLStreamException {
        this.fr = createXMLStreamReader(is);
        event = fr.next();
        // Position the reader on the first <page> start element so that
        // iteration begins at the first page.
        skipUntilPageStart(fr);
    }

    /** Factory: builds the parser and advances it to the first &lt;page&gt;. */
    public static StaxWikiDumpParser buildAndInitiateParse(InputStream is) throws XMLStreamException {
        return new StaxWikiDumpParser(is);
    }

    /**
     * Wraps a raw stream reader so that comment, processing-instruction and
     * whitespace-only character events are silently skipped by next().
     */
    private XMLStreamReader createXMLStreamReader(InputStream is) throws XMLStreamException {
        XMLInputFactory xmlif = XMLInputFactory.newInstance();
        StreamReaderDelegate srd = new StreamReaderDelegate(xmlif.createXMLStreamReader(is)) {
            // skip comments, processing instructions and white space characters

            @Override
            public int next() throws XMLStreamException {
                while (true) {
                    int event = super.next();
                    switch (event) {
                        case XMLStreamConstants.COMMENT:
                        case XMLStreamConstants.PROCESSING_INSTRUCTION:
                            continue;
                        case XMLStreamConstants.CHARACTERS:
                            if (this.isWhiteSpace()) {
                                continue;
                            }
                            // non-whitespace character data falls through and is returned
                        default:
                            return event;
                    }
                }
            }
        };
        return srd;
    }

    /**
     * NOTE(review): always returns the same single-use iterator, so this
     * Iterable can only be traversed once — verify callers never iterate twice.
     */
    @Override
    public Iterator<WikiPage> iterator() {
        return pageIterator;
    }

    @Override
    public void close() throws IOException {
        try {
            fr.close();
        } catch (XMLStreamException xe) {
            // Adapt the StAX exception to the Closeable contract.
            throw new IOException(xe);
        }
    }

    /** Advances the reader until the first &lt;page&gt; start element (or EOF). */
    private void skipUntilPageStart(XMLStreamReader fr) throws XMLStreamException {
        boolean reachedPage = false;
        while (!reachedPage) {
            if (!fr.hasNext()) {
                break;
            }
            event = fr.next();
            if (event == XMLStreamConstants.START_ELEMENT) {
                if (fr.getName().getLocalPart().equalsIgnoreCase("page")) {
                    reachedPage = true;
                }
            }
        }
    }

    /** Decodes successive &lt;page&gt; elements from the shared stream reader. */
    private class WikiPageIterator implements Iterator<WikiPage> {

        @Override
        public boolean hasNext() {
            try {
                // NOTE(review): XMLStreamReader.hasNext() stays true until
                // END_DOCUMENT has been consumed, so the final call to next()
                // may produce a trailing page built from empty strings — verify.
                return fr.hasNext();
            } catch (XMLStreamException ex) {
                throw new RuntimeException("Exception ", ex);
            }
        }

        @Override
        public WikiPage next() {
//            Preconditions.checkState(event == XMLStreamConstants.START_ELEMENT);
//            Preconditions.checkState(fr.getName().getLocalPart().equalsIgnoreCase("page"));
            return readPage();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        /**
         * Consumes events until the enclosing &lt;/page&gt; (or end of
         * document) and assembles a WikiPage from the title / username / text
         * elements encountered. Missing elements default to "".
         */
        private WikiPage readPage() {
            String title = "";
            String contributor = "";
            String text = "";
            try {
                while (true) {
                    switch (event) {
                        case XMLStreamConstants.START_ELEMENT:
                            switch (PageTypePart.pageTypeFromQName(fr.getName())) {
                                case USERNAME:
                                    contributor = readContributorUserName();
                                    break;
                                case TEXT:
                                    text = readPageText();
                                    break;
                                case TITLE:
                                    title = readPageTitle();
                                    break;
                                default:
                                    break;
                            }
                            break;
                        case XMLStreamConstants.END_ELEMENT:
                            if (fr.isEndElement()) {
                                if (PageTypePart.pageTypeFromQName(fr.getName()) == PageTypePart.PAGE) {
                                    // Consume the </page> before returning so the
                                    // next call starts past it.
                                    fr.next();
                                    // we return a new wiki-page. only one revision will be taken.
                                    return new WikiPage(title, contributor, text);
                                }
                            }
                            break;
                        case XMLStreamConstants.END_DOCUMENT:
                            //TODO: shut down the processing
                            return new WikiPage(title, contributor, text);
                        default:
                            //LOG.info("Element {}", Utils.getEventTypeString(event));
                            break;
                    }
                    event = fr.next();
                }
            } catch (XMLStreamException e) {
                throw new RuntimeException("Exception parsing page", e);
            }
        }

        // The three helpers below read the text content of the current element
        // and leave the reader positioned on its END_ELEMENT.

        private String readContributorUserName() throws XMLStreamException {
            return fr.getElementText();
        }

        private String readPageText() throws XMLStreamException {
            return fr.getElementText();
        }

        private String readPageTitle() throws XMLStreamException {
            return fr.getElementText();
        }
    }
}

src/main/java/ro/ieugen/wikiindexer/parse/StaxWikiParser.java

-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package ro.ieugen.wikiindexer.parse;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import javax.xml.stream.*;
-import javax.xml.stream.util.StreamReaderDelegate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import ro.ieugen.wikiindexer.Utils;
-import ro.ieugen.wikiindexer.model.PageTypePart;
-import ro.ieugen.wikiindexer.model.WikiPage;
-
-/**
- * Parses a XML using StaX.
- * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
- */
-public class StaxWikiParser implements Iterable<WikiPage>, Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(StaxWikiParser.class);
-    private final XMLStreamReader fr;
-    private final WikiPageIterator pageIterator = new WikiPageIterator();
-    private int event;
-
-    private StaxWikiParser(InputStream is) throws XMLStreamException {
-        this.fr = createXMLStreamReader(is);
-        event = fr.next();
-        skipUntilPageStart(fr);
-    }
-
-    public static StaxWikiParser buildAndInitiateParse(InputStream is) throws XMLStreamException {
-        return new StaxWikiParser(is);
-    }
-
-    private XMLStreamReader createXMLStreamReader(InputStream is) throws XMLStreamException {
-        XMLInputFactory xmlif = XMLInputFactory.newInstance();
-        StreamReaderDelegate srd = new StreamReaderDelegate(xmlif.createXMLStreamReader(is)) {
-            // skip comments, processing instructions and white space characters
-
-            @Override
-            public int next() throws XMLStreamException {
-                while (true) {
-                    int event = super.next();
-                    switch (event) {
-                        case XMLStreamConstants.COMMENT:
-                        case XMLStreamConstants.PROCESSING_INSTRUCTION:
-                            continue;
-                        case XMLStreamConstants.CHARACTERS:
-                            if (this.isWhiteSpace()) {
-                                continue;
-                            }
-                        default:
-                            return event;
-                    }
-                }
-            }
-        };
-        return srd;
-    }
-
-    public Iterator<WikiPage> iterator() {
-        return pageIterator;
-    }
-
-    public void close() throws IOException {
-        try {
-            fr.close();
-        } catch (XMLStreamException xe) {
-            throw new IOException(xe);
-        }
-    }
-
-    private void skipUntilPageStart(XMLStreamReader fr) throws XMLStreamException {
-        boolean reachedPage = false;
-        while (!reachedPage) {
-            if (!fr.hasNext()) {
-                break;
-            }
-            event = fr.next();
-            if (event == XMLStreamConstants.START_ELEMENT) {
-                if (fr.getName().getLocalPart().equalsIgnoreCase("page")) {
-                    reachedPage = true;
-                }
-            }
-        }
-    }
-
-    private class WikiPageIterator implements Iterator<WikiPage> {
-
-        public boolean hasNext() {
-            try {
-                return fr.hasNext();
-            } catch (XMLStreamException ex) {
-                throw new RuntimeException("Exception ", ex);
-            }
-        }
-
-        public WikiPage next() {
-//            Preconditions.checkState(event == XMLStreamConstants.START_ELEMENT);
-//            Preconditions.checkState(fr.getName().getLocalPart().equalsIgnoreCase("page"));
-            return readPage();
-        }
-
-        public void remove() {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        private WikiPage readPage() {
-            String title = "";
-            String contributor = "";
-            String text = "";
-            try {
-                while (true) {
-                    switch (event) {
-                        case XMLStreamConstants.START_ELEMENT:
-                            //LOG.info("Start of {}", fr.getName().getLocalPart());
-                            switch (PageTypePart.pageTypeFromQName(fr.getName())) {
-                                case USERNAME:
-                                    contributor = readContributorUserName();
-                                    break;
-                                case TEXT:
-                                    text = readPageText();
-                                    break;
-                                case TITLE:
-                                    title = readPageTitle();
-                                    break;
-                                default:
-                                    break;
-                            }
-                            break;
-                        case XMLStreamConstants.END_ELEMENT:
-                            if (fr.isEndElement()) {
-                                if (PageTypePart.pageTypeFromQName(fr.getName()) == PageTypePart.PAGE) {
-                                    fr.next();
-                                    return new WikiPage(title, contributor, text);
-                                }
-                            }
-                            break;
-                        default:
-                            //LOG.info("Element {}", Utils.getEventTypeString(event));
-                            break;
-                    }
-                    event = fr.next();
-                }
-            } catch (XMLStreamException e) {
-                throw new RuntimeException("Exception parsing page", e);
-            }
-        }
-
-        private String readContributorUserName() throws XMLStreamException {
-            return fr.getElementText();
-        }
-
-        private String readPageText() throws XMLStreamException {
-            return fr.getElementText();
-        }
-
-        private String readPageTitle() throws XMLStreamException {
-            return fr.getElementText();
-        }
-    }
-}

src/main/java/ro/ieugen/wikiindexer/processing/DocIndexer.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.processing;
+
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Set;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ro.ieugen.wikiindexer.model.SingleDocIndex;
+import ro.ieugen.wikiindexer.model.WikiPage;
+
+/**
+ * Indexes a single wiki page.
+ * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
+ */
+public class DocIndexer implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DocIndexer.class);
+    private final static String DEFAULT_FIELD = "content";
+    private final WikiPage pageToIndex;
+    private final Analyzer analyzer;
+    private final ProcessingPipeline context;
+
+    public DocIndexer(ProcessingPipeline pipeline, WikiPage pageToIndex, Analyzer analyzer) {
+        this.context = pipeline;
+        this.pageToIndex = pageToIndex;
+        this.analyzer = analyzer;
+    }
+
+    @Override
+    public void run() {
+        Set<String> terms = Sets.newHashSet();
+        // tokenize the input stream
+        TokenStream tokenStream = analyzer.tokenStream(DEFAULT_FIELD, new StringReader(pageToIndex.getContent()));
+        CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
+        try {
+            while (tokenStream.incrementToken()) {
+                terms.add(charTermAttribute.toString());
+            }
+        } catch (IOException ex) {
+            LOG.info("Exception tokenizing content", ex);
+        }
+        // add the contributor as a normal term, we don't use fields
+        terms.add(pageToIndex.getContribuitor());
+
+        SingleDocIndex p = new SingleDocIndex(pageToIndex.getTitle(), terms);
+        // add the index part to the que for processing
+        context.offer(p);
+    }
+}

src/main/java/ro/ieugen/wikiindexer/processing/IndexWriter.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+package ro.ieugen.wikiindexer.processing;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import ro.ieugen.wikiindexer.model.IndexSegment;
+
+/**
+ * Persists a {@link IndexSegment} to disk as Hadoop {@link SequenceFile}s:
+ * one file per segment for document titles and one for postings.
+ * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
+ */
+public class IndexWriter implements Closeable {
+
+    private static final String TITLES_SUFFIX = ".dict";
+    private static final String POSTINGS_SUFFIX = ".pst";
+    private final String outputDirectory;
+    // NOTE: incrementAndGet means the first persisted segment gets id 2;
+    // kept as-is so existing output naming does not change
+    private final AtomicInteger segmentsCount = new AtomicInteger(1);
+    private final Configuration conf = new Configuration();
+    private final FileSystem fs;
+
+    public IndexWriter(String outputDirectory) throws IOException {
+        Preconditions.checkNotNull(outputDirectory);
+        this.outputDirectory = outputDirectory;
+        // FileSystem.getLocal returns a fully initialized local file system;
+        // "new LocalFileSystem()" is never initialized and fails on first use
+        this.fs = FileSystem.getLocal(conf);
+    }
+
+    public void persist(IndexSegment segment) throws IOException {
+        int segmentId = segmentsCount.incrementAndGet();
+        persistTitles(segment, segmentId);
+        persistPostings(segment, segmentId);
+    }
+
+    private void persistTitles(IndexSegment segment, int segmentId) throws IOException {
+        writeStringList(new Path(outputDirectory, segmentId + TITLES_SUFFIX), segment.titles());
+    }
+
+    private void persistPostings(IndexSegment segment, int segmentId) throws IOException {
+        // FIXME(review): this writes the title list, not the postings - looks
+        // like a copy/paste placeholder; needs the IndexSegment postings API
+        writeStringList(new Path(outputDirectory, segmentId + POSTINGS_SUFFIX), segment.titles());
+    }
+
+    /** Writes (position, string) pairs to a record-compressed sequence file. */
+    private void writeStringList(Path file, ArrayList<String> entries) throws IOException {
+        final Writer out = SequenceFile.createWriter(fs, conf, file,
+                IntWritable.class, Text.class, SequenceFile.CompressionType.RECORD);
+        try {
+            // key must be an IntWritable to match the declared key class above;
+            // appending a LongWritable throws "wrong key class" at runtime.
+            // Writable instances are reused across append() calls (Hadoop idiom).
+            IntWritable key = new IntWritable();
+            Text value = new Text();
+            for (int i = 0; i < entries.size(); i++) {
+                key.set(i);
+                value.set(entries.get(i));
+                out.append(key, value);
+            }
+        } finally {
+            // close the writer even if an append fails, so no file handle leaks
+            out.close();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // nothing to release: writers are closed per segment and the local
+        // FileSystem instance is cached and shared by Hadoop
+    }
+}

src/main/java/ro/ieugen/wikiindexer/processing/ProcessingPipeline.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.processing;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import java.io.IOException;
+import java.util.concurrent.*;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.util.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ro.ieugen.wikiindexer.model.IndexSegment;
+import ro.ieugen.wikiindexer.model.SingleDocIndex;
+import ro.ieugen.wikiindexer.model.WikiPage;
+
+/**
+ * Builds the processing pipeline and infrastructure.
+ * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
+ */
+public class ProcessingPipeline {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProcessingPipeline.class);
+    public static final int DEFAULT_INDEX_WRITER_THREAD_COUNT = 4;
+    private int indexWriterCount = DEFAULT_INDEX_WRITER_THREAD_COUNT;
+    private final ExecutorService indexProducers;
+    private final ExecutorService indexConsumer;
+    /** Use a {@link ArrayBlockingQueue} to control the indexing throughput */
+    private final BlockingQueue<SingleDocIndex> indexedDocuments = Queues.newLinkedBlockingQueue();
+    private final Future[] indexWriterStatus;
+    private final Analyzer analyzer;
+    private final IndexWriter indexWriter;
+
+    public ProcessingPipeline(ExecutorService indexProducers, IndexWriter indexWriter) {
+        this(indexProducers, DEFAULT_INDEX_WRITER_THREAD_COUNT, indexWriter, new StandardAnalyzer(Version.LUCENE_35));
+    }
+
+    public ProcessingPipeline(ExecutorService producers, int indexerCount, IndexWriter indexWriter, Analyzer analyzer) {
+        Preconditions.checkNotNull(producers);
+        Preconditions.checkNotNull(analyzer);
+        Preconditions.checkNotNull(indexWriter);
+        Preconditions.checkArgument(indexerCount > 0);
+        this.indexProducers = producers;
+        this.indexWriter = indexWriter;
+        this.analyzer = analyzer;
+        this.indexWriterCount = indexerCount;
+        this.indexConsumer = Executors.newFixedThreadPool(indexWriterCount);
+        this.indexWriterStatus = new Future[indexWriterCount];
+        // start the index consumers that will build the index and flush it to disk.
+        for (int i = 0; i < indexWriterStatus.length; i++) {
+            indexWriterStatus[i] = indexConsumer.submit(new SegmentBuilder(this, new IndexSegment()));
+        }
+    }
+
+    public void process(WikiPage page) {
+        indexProducers.submit(new DocIndexer(this, page, analyzer));
+    }
+
+    public boolean offer(SingleDocIndex pageIndex) {
+        return indexedDocuments.offer(pageIndex);
+    }
+
+    public SingleDocIndex take() throws InterruptedException {
+        return indexedDocuments.take();
+    }
+
+    public void persist(IndexSegment segment) {
+        try {
+            indexWriter.persist(segment);
+        } catch (IOException e) {
+            LOG.info("Exception persisting segmen", e);
+        }
+    }
+
+    /**
+     * Ensure proper application shutdown
+     */
+    public void shutdown() {
+        indexProducers.shutdown();
+        try {
+            indexProducers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException e) {
+            LOG.info("Interupted while shutting down indexers");
+        }
+        indexConsumer.shutdown();
+        try {
+            indexConsumer.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException e) {
+            LOG.info("Interupted while shutting down indexers");
+        }
+    }
+}

src/main/java/ro/ieugen/wikiindexer/processing/SegmentBuilder.java

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package ro.ieugen.wikiindexer.processing;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ro.ieugen.wikiindexer.model.IndexSegment;
+
+/**
+ * Builds one index segment in memory by draining per-document indexes from
+ * the pipeline queue, then asks the pipeline to persist the segment once it
+ * is full (or once this consumer is interrupted).
+ * @author Ioan Eugen Stan <stan.ieugen@gmail.com>
+ */
+public class SegmentBuilder implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SegmentBuilder.class);
+    private final IndexSegment segment;
+    private final ProcessingPipeline context;
+
+    public SegmentBuilder(ProcessingPipeline context, IndexSegment segment) {
+        Preconditions.checkNotNull(context);
+        Preconditions.checkNotNull(segment);
+        this.segment = segment;
+        this.context = context;
+    }
+
+    @Override
+    public void run() {
+        LOG.info("Started new memory index builder");
+        // TODO: add a way to signal the end of processing
+        while (!segment.shouldFlush()) {
+            try {
+                segment.add(context.take());
+            } catch (InterruptedException ex) {
+                // restore the interrupt flag and stop consuming; swallowing
+                // the interrupt here made the loop spin forever on shutdown
+                // when the segment never filled up
+                LOG.info("Interrupted while adding docs to the index");
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        // flush whatever was collected (possibly a partial segment on interrupt)
+        context.persist(segment);
+    }
+}

src/test/java/ro/ieugen/wikiindexer/IndexingAppTest.java

 package ro.ieugen.wikiindexer;
 
+import java.io.IOException;
 import org.junit.Test;
 
 /**
 
     // not really a test
     @Test
-    public void testMain() {
+    public void testMain() throws IOException {
         IndexingApp.main(new String[]{"-i", input, "-o", output});
     }
 }