
Commits

coady committed 03c899f

Parallel indexing.

  • Parent commits 79e1d27
  • Branches default

Files changed (6)

File docs/engine.rst

   :show-inheritance:
   :members:
 
+ParallelIndexer
+^^^^^^^^^^^^^^^
+.. versionadded:: 1.1+
+.. note::  This interface is experimental and might change in incompatible ways in the next release.
+.. autoclass:: ParallelIndexer
+  :show-inheritance:
+  :members:
+
+  .. attribute:: termsfilters
+
+    Mapping of filters to synchronized termsfilters.
+
 
 documents
 ---------
 
 TermsFilter
 ^^^^^^^^^^^^^
+.. versionadded:: 1.1+
+.. note::  This interface is experimental and might change in incompatible ways in the next release.
 .. autoclass:: lupyne.engine.TermsFilter
   :show-inheritance:
   :members:
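
A brief usage sketch may help alongside the reference entries above; it mirrors the new test below and assumes an in-memory primary Indexer with a stored, indexed 'name' field (hypothetical setup, not part of this commit):

    import lucene
    lucene.initVM()
    from lupyne import engine

    primary = engine.Indexer()                     # hypothetical primary index of stable fields
    primary.set('name', store=True, index=True)    # unique identifier shared with the parallel index
    parallel = engine.ParallelIndexer('name')      # tracks the same unique field
    parallel.set('priority', index=True)           # rapidly changing field lives here

    for name in ('alpha', 'bravo'):
        primary.add(name=name)
        parallel.update(name, priority='high')     # atomic update keyed on the unique field
    primary.commit()
    parallel.commit()

    # TermsFilter synced to the parallel index and registered with the primary indexer
    filter = parallel.termsfilter(engine.Query.term('priority', 'high').filter(), primary)
    hits = primary.search(filter=filter)

    parallel.update('bravo')                       # clear bravo's priority
    parallel.commit()                              # committing refreshes the synchronized termsfilters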

File lupyne/engine/__init__.py

 
 from .queries import Query, SortField, TermsFilter
 from .documents import Document, Field, FormatField, NestedField, NumericField, DateTimeField
-from .indexers import TokenFilter, Analyzer, IndexSearcher, MultiSearcher, IndexWriter, Indexer
+from .indexers import TokenFilter, Analyzer, IndexSearcher, MultiSearcher, IndexWriter, Indexer, ParallelIndexer
 from .spatial import PointField, PolygonField
 
 assert lucene.VERSION >= '3.1'

File lupyne/engine/indexers.py

 import abc, collections
 import warnings
 import lucene
-from .queries import Query, Collector, SortField, Highlighter, FastVectorHighlighter, SpellChecker, SpellParser
+from .queries import Query, Collector, TermsFilter, SortField, Highlighter, FastVectorHighlighter, SpellChecker, SpellParser
 from .documents import Field, Document, Hits
 from .spatial import DistanceComparator
 
                 self.optimize(merge)
             IndexWriter.commit(self)
         self.refresh(**caches)
+
+class ParallelIndexer(Indexer):
+    """Indexer which tracks a unique identifying field.
+    Handles atomic updates of rapidly changing fields, managing :attr:`termsfilters`.
+    """
+    def __init__(self, field, *args, **kwargs):
+        Indexer.__init__(self, *args, **kwargs)
+        self.field = field
+        self.set(field, index=True, omitNorms=True)
+        self.termsfilters = {}
+    def termsfilter(self, filter, *others):
+        "Return `TermsFilter`_ synced to given filter and optionally associated with other indexers."
+        terms = self.sorter(self.field).terms(filter, *self.sequentialSubReaders)
+        termsfilter = self.termsfilters[filter] = TermsFilter(self.field, terms)
+        for other in others:
+            termsfilter.refresh(other)
+            other.termsfilters.add(termsfilter)
+        return termsfilter
+    def update(self, value, document=(), **terms):
+        "Atomically update document based on unique field."
+        terms[self.field] = value
+        self.updateDocument(lucene.Term(self.field, value), self.document(document, **terms))
+    def refresh(self, **caches):
+        "Store refreshed searcher and synchronize :attr:`termsfilters`."
+        sorter, segments = self.sorter(self.field), self.segments
+        searcher = self.indexSearcher.reopen(**caches)
+        readers = [reader for reader in searcher.sequentialSubReaders if lucene.SegmentReader.cast_(reader).segmentName not in segments]
+        terms = list(itertools.chain.from_iterable(IndexReader(reader).terms(self.field) for reader in readers))
+        for filter, termsfilter in self.termsfilters.items():
+            if terms:
+                termsfilter.update(terms, op='andNot', cache=not self.nrt)
+            if readers:
+                termsfilter.update(sorter.terms(filter, *readers), cache=not self.nrt)
+        self.indexSearcher = searcher
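
A TermsFilter can also be refreshed and registered by hand against a plain Indexer, as the termsfilters registry referenced above (other.termsfilters.add) suggests; a minimal sketch continuing the hypothetical primary indexer from the earlier example (the manual registration pattern is an assumption drawn from that registry, not something this commit's test exercises):

    filter = engine.TermsFilter('name')       # starts empty: no values, no cached readers
    filter.add('alpha', 'bravo')              # incrementally add matching values
    filter.discard('bravo')                   # or remove them
    filter.refresh(primary)                   # cache the indexer's current readers
    primary.termsfilters.add(filter)          # keep it synchronized on future refreshes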

File lupyne/engine/queries.py

             filter = lucene.PrefixFilter(self.getPrefix())
         elif isinstance(self, lucene.TermRangeQuery):
             filter = lucene.TermRangeFilter(self.field, self.lowerTerm, self.upperTerm, self.includesLower(), self.includesUpper())
+        elif isinstance(self, lucene.TermQuery):
+            filter = lucene.TermsFilter()
+            filter.addTerm(self.getTerm())
         else:
             filter = lucene.QueryWrapperFilter(self)
         return lucene.CachingWrapperFilter(filter) if cache else filter
         return ids, list(map(self.scores.__getitem__, ids))
 
 class TermsFilter(lucene.CachingWrapperFilter):
-    """Experimental caching filter based on a unique field and set of matching values.
+    """Caching filter based on a unique field and set of matching values.
     Optimized for many terms and docs, with support for incremental updates.
     Suitable for searching external metadata associated with indexed identifiers.
     Call :meth:`refresh` to cache a new (or reopened) reader.
             namespace = {'parse' + type: staticmethod(parser)}
             parser = object.__class__(base.__name__, (base,), namespace)()
         lucene.SortField.__init__(self, name, parser, reverse)
+    def array(self, reader):
+        method = getattr(lucene.FieldCache.DEFAULT, 'get{0}s'.format(self.typename))
+        return method(reader, self.field, *[self.parser][:bool(self.parser)])
     def comparator(self, reader):
-        "Return indexed values from default FieldCache using the given reader."
-        method = getattr(lucene.FieldCache.DEFAULT, 'get{0}s'.format(self.typename))
-        args = [self.parser] * bool(self.parser)
+        "Return indexed values from default FieldCache using the given top-level reader."
         readers = reader.sequentialSubReaders
         if lucene.MultiReader.instance_(reader):
             readers = itertools.chain.from_iterable(reader.sequentialSubReaders for reader in readers)
-        arrays = [method(reader, self.field, *args) for reader in readers]
+        arrays = list(map(self.array, readers))
         if len(arrays) <= 1:
             return arrays[0]
         cls, = set(map(type, arrays))
         "Return lucene FieldCacheRangeFilter based on field and type."
         method = getattr(lucene.FieldCacheRangeFilter, 'new{0}Range'.format(self.typename))
         return method(self.field, self.parser, start, stop, lower, upper)
+    def terms(self, filter, *readers):
+        "Generate field cache terms from docs which match filter from all segments."
+        for reader in readers:
+            array, it = self.array(reader), filter.getDocIdSet(reader).iterator()
+            try:
+                while True:
+                    yield array[it.nextDoc()]
+            except IndexError:
+                pass
 
 class Highlighter(lucene.Highlighter):
     """Inherited lucene Highlighter with stored analysis options.

File setup.py

    - PyLucene 3.1 deprecated
    - Support for all IndexWriterConfig options
    - Flexible hit sorting and filtering
-   - TermsFilter for searching external metadata
+   - TermsFilter and ParallelIndexer for searching external metadata
 
  * Server:
    

File test/local.py

         query = engine.Query.term('text', 'right', boost=2.0)
         assert query.boost == 2.0
         assert indexer.facets(str(query), 'amendment', 'article') == {'amendment': 12, 'article': 1}
-        self.assertRaises(TypeError, indexer.overlap, query.filter(), query.filter(cache=False))
+        self.assertRaises(TypeError, indexer.overlap, query.filter(), lucene.QueryWrapperFilter(query))
         hits = indexer.search('text:people', filter=query.filter())
         assert len(hits) == 4
         hit, = indexer.search('date:192*')
         for name in ('alpha', 'bravo', 'charlie'):
             indexer.add(name=name)
         indexer.commit()
-        filter = engine.TermsFilter('name', ('alpha', 'bravo'))
+        filter = engine.TermsFilter('name')
         assert indexer.count(filter=filter) == len(filter.readers) == 0
-        filter.refresh(indexer)
-        indexer.termsfilters.add(filter)
+        filter.add('alpha', 'bravo')
+        filter.discard('bravo', 'charlie')
+        assert filter.values == set(['alpha'])
+        parallel = engine.ParallelIndexer('name')
+        parallel.set('priority', index=True)
+        for name in ('alpha', 'bravo', 'delta'):
+            parallel.update(name, priority='high')
+        parallel.commit()
+        filter = parallel.termsfilter(engine.Query.term('priority', 'high').filter(), indexer)
         assert [hit['name'] for hit in indexer.search(filter=filter)] == ['alpha', 'bravo']
-        filter.discard('bravo', 'charlie')
-        filter.add('charlie', 'delta')
-        assert [hit['name'] for hit in indexer.search(filter=filter)] == ['alpha', 'charlie']
         indexer.add(name='delta')
         indexer.delete('name', 'alpha')
         indexer.commit()
         assert filter.readers > set(indexer.sequentialSubReaders)
+        assert [hit['name'] for hit in indexer.search(filter=filter)] == ['bravo', 'delta']
+        parallel.update('bravo')
+        parallel.update('charlie', priority='high')
+        parallel.commit()
         assert [hit['name'] for hit in indexer.search(filter=filter)] == ['charlie', 'delta']
+        parallel.commit()
         filter.refresh(indexer)
         assert filter.readers == set(indexer.sequentialSubReaders)