Commits

coady committed 79e1d27

TermsFilter for searching external metadata.

Comments (0)

Files changed (6)

 
     Mapping of cached spellcheckers by field.
 
+  .. attribute:: termsfilters
+
+    Set of registered termsfilters.
+
 MultiSearcher
 ^^^^^^^^^^^^^
 .. autoclass:: lupyne.engine.MultiSearcher
 
     <SpanOrQuery: spanOr(spans)>
 
+TermsFilter
+^^^^^^^^^^^^^
+.. autoclass:: lupyne.engine.TermsFilter
+  :show-inheritance:
+  :members:
+
 SortField
 ^^^^^^^^^^^^^
 .. autoclass:: lupyne.engine.SortField

lupyne/engine/__init__.py

 import lucene
 warnings.simplefilter('default', DeprecationWarning)
 
-from .queries import Query, SortField
+from .queries import Query, SortField, TermsFilter
 from .documents import Document, Field, FormatField, NestedField, NumericField, DateTimeField
 from .indexers import TokenFilter, Analyzer, IndexSearcher, MultiSearcher, IndexWriter, Indexer
 from .spatial import PointField, PolygonField

lupyne/engine/indexers.py

         self.owned = closing([self.indexReader])
         self.analyzer = self.shared.analyzer(analyzer)
         self.filters, self.sorters, self.spellcheckers = {}, {}, {}
+        self.termsfilters = set()
     @classmethod
     def load(cls, directory, analyzer=None):
         "Open `IndexSearcher`_ with a lucene RAMDirectory, loading index into memory."
             self.close()
     def reopen(self, filters=False, sorters=False, spellcheckers=False):
         """Return current `IndexSearcher`_, only creating a new one if necessary.
+        Any registered :attr:`termsfilters` are also refreshed.
         
         :param filters: refresh cached facet :attr:`filters`
         :param sorters: refresh cached :attr:`sorters` with associated parsers
         other.decRef()
         other.shared = self.shared
         other.filters.update((key, value if isinstance(value, lucene.Filter) else dict(value)) for key, value in self.filters.items())
+        for termsfilter in self.termsfilters:
+            termsfilter.refresh(other)
         if filters:
             other.facets(Query.any(), *other.filters)
         other.sorters = dict((name, SortField(sorter.field, sorter.typename, sorter.parser)) for name, sorter in self.sorters.items())

lupyne/engine/queries.py

 import itertools
 import bisect
 import heapq
+import threading
 import lucene
 
 class Query(object):
         ids.sort(key=key, reverse=reverse)
         return ids, list(map(self.scores.__getitem__, ids))
 
+class TermsFilter(lucene.CachingWrapperFilter):
+    """Experimental caching filter based on a unique field and set of matching values.
+    Optimized for many terms and docs, with support for incremental updates.
+    Suitable for searching external metadata associated with indexed identifiers.
+    Call :meth:`refresh` to cache a new (or reopened) reader.
+    
+    :param field: field name
+    :param values: initial term values, synchronized with the cached filters
+    """
+    # maps the public op name to the set method applied to self.values in update()
+    ops = {'or': 'update', 'and': 'intersection_update', 'andNot': 'difference_update'}
+    def __init__(self, field, values=()):
+        assert hasattr(lucene, 'FixedBitSet'), 'requires FixedBitSet introduced in lucene 3.4'
+        lucene.CachingWrapperFilter.__init__(self, lucene.TermsFilter())
+        self.field = field
+        self.values = set(values)
+        # readers whose bitsets have been cached; maintained by refresh()
+        self.readers = set()
+        # guards values and readers so update/refresh stay consistent across threads
+        self.lock = threading.Lock()
+    def filter(self, values, cache=True):
+        "Return lucene TermsFilter, optionally using the FieldCache."
+        # FieldCacheTermsFilter exploits the per-segment FieldCache; the plain
+        # TermsFilter fallback enumerates one Term per value instead
+        if cache:
+            return lucene.FieldCacheTermsFilter(self.field, tuple(values))
+        filter, term = lucene.TermsFilter(), lucene.Term(self.field)
+        for value in values:
+            filter.addTerm(term.createTerm(value))
+        return filter
+    def apply(self, filter, op, readers):
+        # mutate each cached bitset in place: cast the cached DocIdSet to a
+        # FixedBitSet, then combine it with the new filter's doc ids via op
+        for reader in readers:
+            bitset = lucene.FixedBitSet.cast_(self.getDocIdSet(reader))
+            getattr(bitset, op)(filter.getDocIdSet(reader).iterator())
+    def update(self, values, op='or', cache=True):
+        """Update allowed values and corresponding cached bitsets.
+        
+        :param values: additional term values
+        :param op: set operation used to combine terms and docs
+        :param cache: optionally cache all term values using FieldCache
+        """
+        values = tuple(values)
+        # build the filter outside the lock; only the shared-state mutation is locked
+        filter = self.filter(values, cache)
+        with self.lock:
+            getattr(self.values, self.ops[op])(values)
+            self.apply(filter, op, self.readers)
+    def refresh(self, reader):
+        "Refresh cached bitsets of current values for new segments of top-level reader."
+        readers = set(reader.sequentialSubReaders)
+        with self.lock:
+            # seed bitsets only for segments not seen before; existing ones are current
+            self.apply(self.filter(self.values), 'or', readers - self.readers)
+            # drop readers that have been closed (refCount of 0) to avoid leaks
+            self.readers = set(reader for reader in readers | self.readers if reader.refCount)
+    def add(self, *values):
+        "Add a few term values."
+        self.update(values, cache=False)
+    def discard(self, *values):
+        "Discard a few term values."
+        self.update(values, op='andNot', cache=False)
+
 class SortField(lucene.SortField):
     """Inherited lucene SortField used for caching FieldCache parsers.
-        
+    
     :param name: field name
     :param type: type object or name compatible with SortField constants
     :param parser: lucene FieldCache.Parser or callable applied to field values
    - PyLucene 3.6 supported
    - PyLucene 3.1 deprecated
    - Support for all IndexWriterConfig options
+   - Flexible hit sorting and filtering
+   - TermsFilter for searching external metadata
 
  * Server:
    
         indexer.add()
         indexer.commit()
         assert indexer.count() == engine.IndexSearcher(indexer.directory).count() == 2
+    
+    def testFilters(self):
+        "Custom filters."
+        # TermsFilter asserts FixedBitSet support (lucene >= 3.4) on construction
+        if not hasattr(lucene, 'FixedBitSet'):
+            return self.assertRaises(AssertionError, engine.queries.TermsFilter, '')
+        indexer = engine.Indexer()
+        indexer.set('name', store=True, index=True)
+        for name in ('alpha', 'bravo', 'charlie'):
+            indexer.add(name=name)
+        indexer.commit()
+        filter = engine.TermsFilter('name', ('alpha', 'bravo'))
+        # before refresh no readers are cached, so the filter matches nothing
+        assert indexer.count(filter=filter) == len(filter.readers) == 0
+        filter.refresh(indexer)
+        indexer.termsfilters.add(filter)
+        assert [hit['name'] for hit in indexer.search(filter=filter)] == ['alpha', 'bravo']
+        # incremental updates: values and cached bitsets stay in sync
+        filter.discard('bravo', 'charlie')
+        filter.add('charlie', 'delta')
+        assert [hit['name'] for hit in indexer.search(filter=filter)] == ['alpha', 'charlie']
+        indexer.add(name='delta')
+        indexer.delete('name', 'alpha')
+        indexer.commit()
+        # after commit the registered filter was refreshed, but stale (closed)
+        # readers are still tracked until an explicit refresh prunes them
+        assert filter.readers > set(indexer.sequentialSubReaders)
+        assert [hit['name'] for hit in indexer.search(filter=filter)] == ['charlie', 'delta']
+        filter.refresh(indexer)
+        assert filter.readers == set(indexer.sequentialSubReaders)
 
 if __name__ == '__main__':
     lucene.initVM()