Commits

Eric Larson committed 149d4c6

Initial commit

  • Participants

Comments (0)

Files changed (3)

+MgoQuery (MongoDB Query Parser)
+===============================
+
+A search-like interface that constructs a valid MongoDB query.
+
+MongoDB provides a flexible query model that is powerful, yet somewhat
+intimidating for non-technical users to write. MgoQuery aims to
+be a concise, search-like parser.
+
+This type of parser has two main goals:
+
+ 1. Provide a safe and limited interface for querying MongoDB.
+ 2. Provide a query language that is URL-friendly.
+
+MgoQuery allows an API developer to create a thin layer above a
+raw query that can be validated and tested for injection-like attacks
+while providing a simple interface via the URL.
+
+
+Query Grammar
+-------------
+
+The query grammar is inspired by tools such as Xapian, Lucene and
+Gmail's advanced search queries.
+
+Here is an example of the basic format: ::
+
+  "x>3, x<5" | "y>10, z:True"
+
+Which translates to (values stay strings; MgoQuery does not convert
+types): ::
+
+  {'$or': [{'$and': [{'x': {'$gte': '3'}},
+                     {'x': {'$lte': '5'}}]},
+           {'$and': [{'y': {'$gte': '10'}},
+                     {'z': 'True'}]}]}
+
+
+The operators are as follows: ::
+
+  ":"  equals
+  ">"  greater than or equal to ($gte)
+  "<"  less than or equal to ($lte)
+
+The "," acts as an AND operator, meaning the following
+field/operator/value will be used in conjunction with the
+preceding one.
+
+The "|" acts as an OR operator, such that satisfying any one of the
+field/operator/value combinations is enough to match.
+
+from __future__ import print_function
+from pyparsing import (Word, alphanums, Suppress,
+                       Optional, OneOrMore)
+
+
+class Expr(object):
+    """A single ``field operator value`` comparison.
+
+    ``op`` is the MongoDB operator name (for example ``'$gte'``), ``k``
+    is the field name and ``v`` is the value to compare against.
+    """
+
+    def __init__(self, op, k, v):
+        self.op, self.k, self.v = op, k, v
+
+    def as_dict(self):
+        """Return this comparison as a MongoDB query fragment.
+
+        Equality is written as a plain ``{field: value}`` match; every
+        other operator is nested as ``{field: {operator: value}}``.
+        """
+        condition = self.v if self.op == '$eq' else {self.op: self.v}
+        return {self.k: condition}
+
+
+class AndOr(object):
+    """A list of sub-expressions joined by one logical operator.
+
+    ``op`` is the key used for the combined clause (the parser passes
+    ``'$and'`` or ``'$or'``); ``exprs`` are objects exposing
+    ``as_dict()``.
+    """
+
+    def __init__(self, op, exprs):
+        self.exprs = exprs
+        self.op = op
+
+    def as_dict(self):
+        """Return ``{op: [child dicts...]}``, children in given order."""
+        clauses = [child.as_dict() for child in self.exprs]
+        return {self.op: clauses}
+
+
+class Query(object):
+    """Render a parse result as a single MongoDB query dict.
+
+    ``parse_result`` is an iterable of objects exposing ``as_dict()``.
+    Several top-level parts are treated as an implicit AND.
+    """
+
+    def __init__(self, parse_result):
+        self.parse_result = parse_result
+
+    def as_dict(self):
+        """Return the MongoDB query for all parts of the parse result.
+
+        Parts with distinct keys are merged into one flat dict. If two
+        parts share a key (for example two constraints on the same
+        field), merging would silently drop one of them, so the parts
+        are combined with an explicit ``$and`` instead.
+        """
+        parts = [part.as_dict() for part in self.parse_result]
+        query = {}
+        for part in parts:
+            if any(key in query for key in part):
+                # A flat merge would overwrite an earlier constraint.
+                return {'$and': parts}
+            query.update(part)
+        return query
+
+
+class Parser(object):
+    """Build the pyparsing grammar for the query language and parse it.
+
+    The parse actions turn matched text into ``Expr`` and ``AndOr``
+    objects, each of which can render itself with ``as_dict()``.
+    """
+
+    # Query-language operator -> MongoDB operator name.
+    _OPERATORS = {'>': '$gte', '<': '$lte', ':': '$eq'}
+
+    def __init__(self):
+        self._parser = self.parser()
+        self._query = {}
+
+    def parser(self):
+        """
+        Create our grammar and parser
+
+        Returns the top-level pyparsing element used by ``parse``.
+        """
+
+        # Basic elements for expressions
+        field = Word(alphanums + '_-')
+        # Operators are one character long. ``exact=1`` keeps a run such
+        # as ">>" from being read as a single, unknown operator.
+        operator = Word(':><', exact=1)
+        # Raw string: the backslash is a literal allowed character, not
+        # the start of an (invalid) escape sequence.
+        value = Word(alphanums + r'_-/:.\[]()')
+
+        # Our expression
+        expression = field + operator + value
+        expression.setParseAction(self.handle_expression)
+
+        # Grouping with AND/OR. One character per token, so "||" or
+        # "|," is not accepted as a single combined token.
+        andor_token = Word('|,', exact=1)
+
+        # An expression list is a list of expression delimited with an
+        # AND/OR token.
+        expression_list = OneOrMore(expression + Optional(andor_token))
+        expression_list.setParseAction(self.handle_and_or)
+
+        # A group allows combining different AND and OR expression
+        # lists.
+        group = Suppress('"') + expression_list + Suppress('"')
+        group_or_expression = group | expression
+
+        # A top level AND/OR. We only support one level of
+        # grouping. You can have an OR with ANDs or an AND with
+        # ORs. If you want something more complicated, then you
+        # probably should just construct the query yourself.
+        andor = OneOrMore(group_or_expression + Optional(andor_token))
+        andor.setParseAction(self.handle_and_or)
+
+        # We can start with a top level AND/OR, expression list or a
+        # group
+        grammar = andor | expression_list | group
+        return grammar
+
+    def parse(self, s):
+        """Parse the query string ``s`` and return the parse result."""
+        return self._parser.parseString(s)
+
+    def handle_expression(self, s, loc, toks):
+        """
+        Convert a matched ``field operator value`` triple into an Expr.
+
+        The query-language operator is translated to its MongoDB name
+        and the result is returned as a one-element list.
+        """
+        k, op, v = toks
+        return [Expr(self._OPERATORS[op], k, v)]
+
+    def handle_and_or(self, s, loc, toks):
+        """
+        Combine the expressions of a list under their AND/OR token.
+
+        Returns ``toks`` unchanged when no "|" or "," token is present;
+        otherwise returns a one-element list holding an AndOr built from
+        the non-token items.
+        """
+        expressions = []
+        andor = None
+        for t in toks:
+            # NOTE: if both "|" and "," occur in one list, the last one
+            # seen decides the operator for the whole list.
+            if t == '|':
+                andor = '$or'
+            elif t == ',':
+                andor = '$and'
+            else:
+                expressions.append(t)
+
+        if not andor:
+            return toks
+        return [AndOr(andor, expressions)]
+
+if __name__ == '__main__':
+    # Demo: print the query dict built for two sample query strings.
+    p = Parser()
+    for sample in ('"x>3,x<5"|"y>10,z:True"', '"x:3,y>8"|foo:bar'):
+        print(Query(p.parse(sample)).as_dict())

File tests/test_parser.py

+"""
+Test our parsing and output.
+
+One thing to note is that MgoQuery does not convert types. Doing so
+is the responsibility of the caller, who may also extend the Query
+class to do it.
+"""
+
+from mgoquery import Parser, Query
+
+
+class TestParser(object):
+    """Check the query dict produced for each supported query shape."""
+
+    def test_operators(self):
+        # Bare expressions, one per operator.
+        p = Parser()
+        cases = [
+            ('x:y', {'x': 'y'}),
+            ('x>y', {'x': {'$gte': 'y'}}),
+            ('x<y', {'x': {'$lte': 'y'}}),
+        ]
+        for text, expected in cases:
+            assert Query(p.parse(text)).as_dict() == expected
+
+    def test_operator_in_group(self):
+        # The same expressions wrapped in a quoted group.
+        p = Parser()
+        cases = [
+            ('"x:y"', {'x': 'y'}),
+            ('"x>y"', {'x': {'$gte': 'y'}}),
+            ('"x<y"', {'x': {'$lte': 'y'}}),
+        ]
+        for text, expected in cases:
+            assert Query(p.parse(text)).as_dict() == expected
+
+    def test_no_group_or(self):
+        p = Parser()
+        result = Query(p.parse('"x:y|x:z"')).as_dict()
+        assert result == {'$or': [{'x': 'y'}, {'x': 'z'}]}
+
+    def test_no_group_and(self):
+        p = Parser()
+        cases = [
+            # Explicit "," produces an $and clause.
+            ('"x:y,a:b"', {'$and': [{'x': 'y'}, {'a': 'b'}]}),
+            # Whitespace only: the parts are merged into one dict.
+            ('"x:y a:b"', {'x': 'y', 'a': 'b'}),
+        ]
+        for text, expected in cases:
+            assert Query(p.parse(text)).as_dict() == expected
+
+    def test_grouped_or_with_and(self):
+        p = Parser()
+        expected = {'$and': [{'$or': [{'x': 'y'}, {'a': 'b'}]},
+                             {'foo': 'bar'}]}
+        result = Query(p.parse('"x:y|a:b","foo:bar"')).as_dict()
+        assert result == expected
+
+    def test_grouped_or_with_implicit_and(self):
+        p = Parser()
+        expected = {'foo': 'bar',
+                    '$or': [{'x': 'y'}, {'a': 'b'}]}
+        result = Query(p.parse('"x:y|a:b" "foo:bar"')).as_dict()
+        assert result == expected
+
+    def test_grouped_and_with_or(self):
+        p = Parser()
+        expected = {'$or': [{'$and': [{'x': {'$gte': '1'}},
+                                      {'x': {'$lte': '5'}}]},
+                            {'$or': [{'y': {'$gte': '10'}},
+                                     {'y': 'None'}]}]}
+        result = Query(p.parse('"x>1,x<5" | "y>10|y:None"')).as_dict()
+        assert result == expected