Commits

Brian Beck committed 8da4d34

Changed the way n3()-able objects print in repr(). Added pattern tests.

Comments (0)

Files changed (7)

lib/telescope/sparql/expressions.py

+import weakref
 import operator
 from rdflib import Variable
 
         value = self.value
         if hasattr(value, 'n3'):
             value = value.n3()
+        else:
+            value = repr(value)
         if self.operator:
-            return "Expression(%r, %r)" % (value, self.operator)
+            return "Expression(%s, %r)" % (value, self.operator)
         else:
-            return "Expression(%r)" % (value,)
+            return "Expression(%s)" % (value,)
 
     def _clone(self, **kwargs):
         clone = self.__class__.__new__(self.__class__)
     return ConditionalExpression(operator.or_, operands)
 
 class VariableExpressionConstructor(object):
+    _VARIABLES = weakref.WeakValueDictionary()
+    
     def __call__(self, name):
-        return Expression(Variable(name))
-
+        if isinstance(name, basestring):
+            name = unicode(name)
+        else:
+            raise TypeError("variable names must be strings")
+        try:
+            expr = self._VARIABLES[name]
+        except KeyError:
+            expr = self._VARIABLES[name] = Expression(Variable(name))
+        return expr
+    
     def __getattr__(self, name):
         return self(name)
-
+    
     def __getitem__(self, name):
         return self(name)
-

lib/telescope/sparql/helpers.py

 
 def subject(term):
     return TriplesSameSubject(term)
-

lib/telescope/sparql/patterns.py

 class TriplesBlock(object):
     pass
 
-class TriplesSameSubject(object):
+class TriplesSameSubject(TriplesBlock):
     def __init__(self, subject, predicate_object_list=()):
         self.subject = subject
         self.predicate_object_list = tuple(predicate_object_list)
         clone.__dict__.update(self.__dict__)
         clone.__dict__.update(kwargs)
         return clone
-
-    def __getitem__(self, *predicate_object_pairs):
+    
+    def _to_predicate_object_tuple(self, obj):
+        if isinstance(obj, slice):
+            if obj.step is not None:
+                warnings.warn("step value ignored: %r" % (obj.step,))
+            return (obj.start, obj.stop)
+        elif isinstance(obj, tuple) and len(obj) == 2:
+            try:
+                self._to_predicate_object_tuple(obj[0])
+            except ValueError:
+                return tuple(obj)
+        raise ValueError("could not convert to predicate-object pair: "
+                         "%r is not a slice or valid 2-tuple" % (obj,))
+    
+    def __getitem__(self, predicate_object_pairs):
         predicate_object_list = list(self.predicate_object_list)
-        for pair in predicate_object_pairs:
-            if isinstance(pair, slice):
-                if pair.step is not None:
-                    warnings.warn("step value %r ignored." % (pair.step,))
-                pred, obj = pair.start, pair.stop
-            else:
-                pred, obj = pair
-            predicate_object_list.append((pred, obj))
-        return self._clone(predicate_object_list=tuple(predicate_object_list))
-
-    def __call__(self, predicate_object_pairs):
-        predicate_object_list = list(self.predicate_object_list)
-        predicate_object_list.extend(predicate_object_pairs)
+        try:
+            pair = self._to_predicate_object_tuple(predicate_object_pairs)
+        except ValueError:
+            pairs = map(self._to_predicate_object_tuple, predicate_object_pairs)
+            predicate_object_list.extend(pairs)
+        else:
+                predicate_object_list.append(pair)
         return self._clone(predicate_object_list=tuple(predicate_object_list))
 
 class Filter(object):
     def __init__(self, patterns):
         self.patterns = []
         self.filters = []
-        self.add(*patterns)
+        self.pattern(*patterns)
     
-    def add(self, *patterns):
+    def pattern(self, *patterns):
         for pattern in patterns:
             if not isinstance(pattern, (Triple, GraphPattern)):
                 pattern = Triple.from_obj(pattern)
             self.patterns.append(pattern)
     
-    def filter(self, *expressions):
-        self.filters.append(Filter(and_(*expressions)))
+    def filter(self, *constraints):
+        constraints = list(constraints)
+        for i, constraint in enumerate(constraints):
+            if isinstance(constraint, Filter):
+                constraints[i] = constraint.constraint
+        self.filters.append(Filter(and_(*constraints)))
     
     def __nonzero__(self):
-        return bool(self.patterns)
-    
-    def __len__(self):
-        return len(self.patterns)
-    
-    def __getitem__(self, item):
-        return self.patterns[item]
+        return bool(self.patterns or self.filters)
     
     def __or__(self, other):
         return UnionGraphPattern([self, GraphPattern.from_obj(other)])
         clone = self.__class__.__new__(self.__class__)
         clone.__dict__.update(self.__dict__)
         clone.patterns = self.patterns[:]
+        clone.filters = self.filters[:]
         clone.__dict__.update(kwargs)
         return clone
     

lib/telescope/sparql/select.py

         self._limit = kwargs.pop('limit', None)
         self._offset = kwargs.pop('offset', None)
         self._order_by = kwargs.pop('order_by', None)
-        self.graph = kwargs.pop('graph', None)
         if kwargs:
             key, value = kwargs.popitem()
             raise TypeError("Unexpected keyword argument: %r" % key)
         clone = self._clone()
         if patterns:
             graph_pattern = GroupGraphPattern.from_obj(patterns, **kwargs)
-            clone._where.add(graph_pattern)
+            clone._where.pattern(graph_pattern)
         return clone
     
     def filter(self, *constraints, **kwargs):

tests/test_expressions.py

             value = n3_type('test')
             value_repr = value.n3()
             expr = Expression(value)
-            assert repr(expr) == "Expression(%r)" % value_repr
+            assert repr(expr) == "Expression(%s)" % value_repr
 
 class TestPrintingBinaryExpression:
     def test_repr_args_print_with_repr(self):
     def test_getitem_returns_variable_expression(self):
         foo = self.v['foo']
         self.assert_is_variable_expression(foo, 'foo')
-
+    
+    def test_variable_name_must_be_a_string(self):
+        assert_raises(TypeError, self.v, 1)
+    
     def assert_is_variable_expression(self, obj, name):
         assert isinstance(obj, Expression)
         assert isinstance(obj.value, Variable)

tests/test_patterns.py

 from nose.tools import assert_raises
-from rdflib import Namespace, URIRef
+from rdflib import Variable, Namespace, URIRef
 from telescope.sparql.patterns import *
 from telescope.sparql.helpers import *
 
 FOAF = Namespace('http://xmlns.com/foaf/0.1/')
 
+class TestCreatingTriple:
+    def test_args_set_subject_predicate_object(self):
+        triple = Triple(Variable('x'), FOAF.name, "Alice")
+        assert triple.subject == Variable('x')
+        assert triple.predicate == FOAF.name
+        assert triple.object == "Alice"
+    
+    def test_from_obj_with_triple_returns_same_object(self):
+        triple_a = Triple(Variable('x'), FOAF.name, "Alice")
+        triple_b = Triple.from_obj(triple_a)
+        assert triple_a is triple_b
+    
+    def test_from_obj_with_sequence_unpacks_object_as_args(self):
+        args = (Variable('x'), FOAF.name, "Alice")
+        triple = Triple.from_obj(args)
+        assert triple.subject == Variable('x')
+        assert triple.predicate == FOAF.name
+        assert triple.object == "Alice"
+        assert_raises(TypeError, Triple.from_obj, 1)
+
+class TestIteratingTriple:
+    def setup(self):
+        self.triple = Triple(Variable('x'), FOAF.name, "Alice")
+    
+    def test_iteration_yields_subject_predicate_object(self):
+        items = list(self.triple)
+        assert len(items) == 3
+        assert items[0] == Variable('x')
+        assert items[1] == FOAF.name
+        assert items[2] == "Alice"
+    
+    def test_iteration_does_not_consume_or_change_items(self):
+        items_a = list(self.triple)
+        items_b = list(self.triple)
+        assert items_a == items_b
+
+class TestPrintingTriple:
+    def test_repr_args_print_with_repr(self):
+        args = (Variable('x'), FOAF.name, "Alice")
+        triple = Triple(*args)
+        assert repr(triple) == "Triple(%r, %r, %r)" % args
+
 class TestCreatingTriplesSameSubject:
     def test_first_arg_sets_subject(self):
         triples = TriplesSameSubject(v.x)
     def test_second_arg_sets_predicate_object_list(self):
         triples = TriplesSameSubject(v.x, [(FOAF.name, v.name)])
         assert (FOAF.name, v.name) in triples.predicate_object_list
-
+    
+    def test_getitem_accepts_any_sequence_of_pairs(self):
+        triples = TriplesSameSubject(v.x)
+        for i in range(10):
+            pairs = [(FOAF.name, j) for j in range(i)]
+            new_triples = triples[pairs]
+        for i in range(10):
+            pairs = tuple([(FOAF.name, j) for j in range(i)])
+            new_triples = triples[pairs]
+        for i in range(10):
+            pairs = ((FOAF.name, j) for j in range(i))
+            new_triples = triples[pairs]
+    
     def test_getitem_adds_predicate_object_pairs(self):
         triples = TriplesSameSubject(v.x)[(FOAF.name, v.name)]
         assert (FOAF.name, v.name) in triples.predicate_object_list

tests/test_select.py

     
     def test_patterns_arg_adds_clauses(self):
         select = Select([], (v.x, FOAF.name, v.name))
-        for pattern in select._where:
+        for pattern in select._where.patterns:
             if isinstance(pattern, (Triple, GraphPattern)):
                 break
         else:
         select = self.select.where(
             (v.x, FOAF.name, "Alice"), (v.x, FOAF.mbox, v.mbox)
         )
-        assert len(select._where[-1].patterns) == 2
+        assert len(select._where.patterns[-1].patterns) == 2
     
     def test_optional_kwarg_makes_optional_pattern(self):
         select = self.select.where((v.x, FOAF.mbox, v.mbox), optional=True)
-        assert select._where[-1].optional
+        assert select._where.patterns[-1].optional
 
 class TestAddingFilterConstraints:
     def setup(self):