Commits

Michael Trier committed 3bce578

Added new basic match() operator that performs a full-text search. Supported on PostgreSQL, SQLite, MySQL, MS-SQL, and Oracle backends.

Comments (0)

Files changed (14)

     - Unicode, UnicodeText types now set "assert_unicode" and
       "convert_unicode" by default, but accept overriding
       **kwargs for these values.
-      
+
+- sql
+    - Added new match() operator that performs a full-text search.
+      Supported on PostgreSQL, SQLite, MySQL, MS-SQL, and Oracle
+      backends.
+
 - sqlite
     - Modified SQLite's representation of "microseconds" to 
       match the output of str(somedatetime), i.e. in that the

doc/build/content/ormtutorial.txt

         {python}
         from sqlalchemy import or_
         filter(or_(User.name == 'ed', User.name == 'wendy'))
+
+  * match
+        
+        {python}
+        query.filter(User.name.match('wendy'))
+
+    The contents of the match parameter are database backend specific.
         
 ### Returning Lists and Scalars {@name=scalars}
 

lib/sqlalchemy/databases/mssql.py

 
 from sqlalchemy import sql, schema, exc, util
 from sqlalchemy.sql import compiler, expression, operators as sqlops, functions as sql_functions
+from sqlalchemy.sql import compiler, expression, operators as sql_operators, functions as sql_functions
 from sqlalchemy.engine import default, base
 from sqlalchemy import types as sqltypes
 from sqlalchemy.util import Decimal as _python_Decimal
 
 class MSSQLCompiler(compiler.DefaultCompiler):
     operators = compiler.OPERATORS.copy()
-    operators[sqlops.concat_op] = '+'
+    operators.update({
+        sql_operators.concat_op: '+',
+        sql_operators.match_op: lambda x, y: "CONTAINS (%s, %s)" % (x, y)
+    })
 
     functions = compiler.DefaultCompiler.functions.copy()
     functions.update (

lib/sqlalchemy/databases/mysql.py

     operators = compiler.DefaultCompiler.operators.copy()
     operators.update({
         sql_operators.concat_op: lambda x, y: "concat(%s, %s)" % (x, y),
-        sql_operators.mod: '%%'
+        sql_operators.mod: '%%',
+        sql_operators.match_op: lambda x, y: "MATCH (%s) AGAINST (%s IN BOOLEAN MODE)" % (x, y)
     })
     functions = compiler.DefaultCompiler.functions.copy()
     functions.update ({

lib/sqlalchemy/databases/oracle.py

     operators = compiler.DefaultCompiler.operators.copy()
     operators.update(
         {
-            sql_operators.mod : lambda x, y:"mod(%s, %s)" % (x, y)
+            sql_operators.mod : lambda x, y:"mod(%s, %s)" % (x, y),
+            sql_operators.match_op: lambda x, y: "CONTAINS (%s, %s)" % (x, y)
         }
     )
 

lib/sqlalchemy/databases/postgres.py

             sql_operators.mod : '%%',
             sql_operators.ilike_op: lambda x, y, escape=None: '%s ILIKE %s' % (x, y) + (escape and ' ESCAPE \'%s\'' % escape or ''),
             sql_operators.notilike_op: lambda x, y, escape=None: '%s NOT ILIKE %s' % (x, y) + (escape and ' ESCAPE \'%s\'' % escape or ''),
+            sql_operators.match_op: lambda x, y: '%s @@ to_tsquery(%s)' % (x, y),
         }
     )
 

lib/sqlalchemy/sql/compiler.py

     operators.ilike_op : lambda x, y, escape=None: "lower(%s) LIKE lower(%s)" % (x, y) + (escape and ' ESCAPE \'%s\'' % escape or ''),
     operators.notilike_op : lambda x, y, escape=None: "lower(%s) NOT LIKE lower(%s)" % (x, y) + (escape and ' ESCAPE \'%s\'' % escape or ''),
     operators.between_op : 'BETWEEN',
+    operators.match_op : 'MATCH',
     operators.in_op : 'IN',
     operators.notin_op : 'NOT IN',
     operators.comma_op : ', ',

lib/sqlalchemy/sql/expression.py

     def contains(self, other, **kwargs):
         return self.operate(operators.contains_op, other, **kwargs)
 
+    def match(self, other, **kwargs):
+        return self.operate(operators.match_op, other, **kwargs)
+
     def desc(self):
         return self.operate(operators.desc_op)
 
 
         return self.__compare(operators.like_op, literal_column("'%'", type_=sqltypes.String) + self._check_literal(other) + literal_column("'%'", type_=sqltypes.String), escape=escape)
 
+    def match(self, other):
+        """Produce a MATCH clause, i.e. ``MATCH '<other>'``
+        
+        The allowed contents of ``other`` are database backend specific.
+        """
+
+        return self.__compare(operators.match_op, self._check_literal(other))
+
     def label(self, name):
         """Produce a column label, i.e. ``<columnname> AS <name>``.
 

lib/sqlalchemy/sql/operators.py

 def contains_op(a, b, escape=None):
     return a.contains(b, escape=escape)
 
+def match_op(a, b):
+    return a.match(b)
+
 def comma_op(a, b):
     raise NotImplementedError()
 
     add:6,
     sub:6,
     concat_op:6,
+    match_op:6,
     ilike_op:5,
     notilike_op:5,
     like_op:5,

test/dialect/mssql.py

         assert list(query[:10]) == orig[:10]
         assert list(query[:10]) == orig[:10]
 
+def full_text_search_missing():
+    """Test if full text search is not implemented and return False if 
+    it is and True otherwise."""
+
+    try:
+        connection = testing.db.connect()
+        connection.execute("CREATE FULLTEXT CATALOG Catalog AS DEFAULT")
+        return False
+    except:
+        return True
+    finally:
+        connection.close()
+
+class MatchTest(TestBase, AssertsCompiledSQL):
+    __only_on__ = 'mssql'
+    __skip_if__ = (full_text_search_missing, )
+
+    def setUpAll(self):
+        global metadata, cattable, matchtable
+        metadata = MetaData(testing.db)
+        
+        cattable = Table('cattable', metadata,
+            Column('id', Integer),
+            Column('description', String(50)),
+            PrimaryKeyConstraint('id', name='PK_cattable'),
+        )
+        matchtable = Table('matchtable', metadata,
+            Column('id', Integer),
+            Column('title', String(200)),
+            Column('category_id', Integer, ForeignKey('cattable.id')),
+            PrimaryKeyConstraint('id', name='PK_matchtable'),
+        )
+        DDL("""CREATE FULLTEXT INDEX 
+                       ON cattable (description) 
+                       KEY INDEX PK_cattable"""
+                   ).execute_at('after-create', matchtable)
+        DDL("""CREATE FULLTEXT INDEX 
+                       ON matchtable (title) 
+                       KEY INDEX PK_matchtable"""
+                   ).execute_at('after-create', matchtable)
+        metadata.create_all()
+
+        cattable.insert().execute([
+            {'id': 1, 'description': 'Python'},
+            {'id': 2, 'description': 'Ruby'},
+        ])
+        matchtable.insert().execute([
+            {'id': 1, 'title': 'Agile Web Development with Rails', 'category_id': 2},
+            {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
+            {'id': 3, 'title': 'Programming Matz''s Ruby', 'category_id': 2},
+            {'id': 4, 'title': 'The Definitive Guide to Django', 'category_id': 1},
+            {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
+        ])
+        DDL("WAITFOR DELAY '00:00:05'").execute(bind=engines.testing_engine())
+
+    def tearDownAll(self):
+        metadata.drop_all()
+        connection = testing.db.connect()
+        connection.execute("DROP FULLTEXT CATALOG Catalog")
+        connection.close()
+
+    def test_expression(self):
+        self.assert_compile(matchtable.c.title.match('somstr'), "CONTAINS (matchtable.title, ?)")
+
+    def test_simple_match(self):
+        results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([2, 5], [r.id for r in results])
+
+    def test_simple_match_with_apostrophe(self):
+        results = matchtable.select().where(matchtable.c.title.match('"Matz''s"')).execute().fetchall()
+        self.assertEquals([3], [r.id for r in results])
+
+    def test_simple_prefix_match(self):
+        results = matchtable.select().where(matchtable.c.title.match('"nut*"')).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results])
+
+    def test_simple_inflectional_match(self):
+        results = matchtable.select().where(matchtable.c.title.match('FORMSOF(INFLECTIONAL, "dives")')).execute().fetchall()
+        self.assertEquals([2], [r.id for r in results])
+
+    def test_or_match(self):
+        results1 = matchtable.select().where(or_(matchtable.c.title.match('nutshell'), 
+                                                 matchtable.c.title.match('ruby'))
+                                            ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([3, 5], [r.id for r in results1])
+        results2 = matchtable.select().where(matchtable.c.title.match('nutshell OR ruby'), 
+                                            ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([3, 5], [r.id for r in results2])    
+
+    def test_and_match(self):
+        results1 = matchtable.select().where(and_(matchtable.c.title.match('python'), 
+                                                  matchtable.c.title.match('nutshell'))
+                                            ).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results1])
+        results2 = matchtable.select().where(matchtable.c.title.match('python AND nutshell'), 
+                                            ).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results2])
+
+    def test_match_across_joins(self):
+        results = matchtable.select().where(and_(cattable.c.id==matchtable.c.category_id, 
+                                            or_(cattable.c.description.match('Ruby'), 
+                                                matchtable.c.title.match('nutshell')))
+                                           ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([1, 3, 5], [r.id for r in results])
+
 
 if __name__ == "__main__":
     testenv.main()

test/dialect/mysql.py

         assert ('mysql', 'charset') in cx.info
 
 
+class MatchTest(TestBase, AssertsCompiledSQL):
+    __only_on__ = 'mysql'
+
+    def setUpAll(self):
+        global metadata, cattable, matchtable
+        metadata = MetaData(testing.db)
+
+        cattable = Table('cattable', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('description', String(50)),
+        )
+        matchtable = Table('matchtable', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('title', String(200)),
+            Column('category_id', Integer, ForeignKey('cattable.id')),
+        )
+        metadata.create_all()
+
+        cattable.insert().execute([
+            {'id': 1, 'description': 'Python'},
+            {'id': 2, 'description': 'Ruby'},
+        ])
+        matchtable.insert().execute([
+            {'id': 1, 'title': 'Agile Web Development with Rails', 'category_id': 2},
+            {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
+            {'id': 3, 'title': 'Programming Matz''s Ruby', 'category_id': 2},
+            {'id': 4, 'title': 'The Definitive Guide to Django', 'category_id': 1},
+            {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
+        ])
+
+    def tearDownAll(self):
+        metadata.drop_all()
+
+    def test_expression(self):
+        self.assert_compile(matchtable.c.title.match('somstr'), "MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)")
+
+    def test_simple_match(self):
+        results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([2, 5], [r.id for r in results])
+
+    def test_simple_match_with_apostrophe(self):
+        results = matchtable.select().where(matchtable.c.title.match('"Matz''s"')).execute().fetchall()
+        self.assertEquals([3], [r.id for r in results])
+
+    def test_or_match(self):
+        results1 = matchtable.select().where(or_(matchtable.c.title.match('nutshell'), 
+                                                 matchtable.c.title.match('ruby'))
+                                            ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([3, 5], [r.id for r in results1])
+        results2 = matchtable.select().where(matchtable.c.title.match('nutshell ruby'), 
+                                            ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([3, 5], [r.id for r in results2])
+        
+
+    def test_and_match(self):
+        results1 = matchtable.select().where(and_(matchtable.c.title.match('python'), 
+                                                  matchtable.c.title.match('nutshell'))
+                                            ).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results1])
+        results2 = matchtable.select().where(matchtable.c.title.match('+python +nutshell'), 
+                                            ).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results2])
+
+    def test_match_across_joins(self):
+        results = matchtable.select().where(and_(cattable.c.id==matchtable.c.category_id, 
+                                            or_(cattable.c.description.match('Ruby'), 
+                                                matchtable.c.title.match('nutshell')))
+                                           ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([1, 3, 5], [r.id for r in results])
+
+
 def colspec(c):
     return testing.db.dialect.schemagenerator(testing.db.dialect,
         testing.db, None, None).get_column_specification(c)

test/dialect/postgres.py

         finally:
             test_table.drop(checkfirst=True)
 
+class MatchTest(TestBase, AssertsCompiledSQL):
+    __only_on__ = 'postgres'
+    __excluded_on__ = (('postgres', '<', (8, 3, 0)),)
+
+    def setUpAll(self):
+        global metadata, cattable, matchtable
+        metadata = MetaData(testing.db)
+
+        cattable = Table('cattable', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('description', String(50)),
+        )
+        matchtable = Table('matchtable', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('title', String(200)),
+            Column('category_id', Integer, ForeignKey('cattable.id')),
+        )
+        metadata.create_all()
+
+        cattable.insert().execute([
+            {'id': 1, 'description': 'Python'},
+            {'id': 2, 'description': 'Ruby'},
+        ])
+        matchtable.insert().execute([
+            {'id': 1, 'title': 'Agile Web Development with Rails', 'category_id': 2},
+            {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
+            {'id': 3, 'title': 'Programming Matz''s Ruby', 'category_id': 2},
+            {'id': 4, 'title': 'The Definitive Guide to Django', 'category_id': 1},
+            {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
+        ])
+
+    def tearDownAll(self):
+        metadata.drop_all()
+
+    def test_expression(self):
+        self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%(title_1)s)")
+
+    def test_simple_match(self):
+        results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([2, 5], [r.id for r in results])
+
+    def test_simple_match_with_apostrophe(self):
+        results = matchtable.select().where(matchtable.c.title.match("Matz''s")).execute().fetchall()
+        self.assertEquals([3], [r.id for r in results])
+
+    def test_simple_derivative_match(self):
+        results = matchtable.select().where(matchtable.c.title.match('nutshells')).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results])
+
+    def test_or_match(self):
+        results1 = matchtable.select().where(or_(matchtable.c.title.match('nutshells'), 
+                                                 matchtable.c.title.match('rubies'))
+                                            ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([3, 5], [r.id for r in results1])
+        results2 = matchtable.select().where(matchtable.c.title.match('nutshells | rubies'), 
+                                            ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([3, 5], [r.id for r in results2])
+        
+
+    def test_and_match(self):
+        results1 = matchtable.select().where(and_(matchtable.c.title.match('python'), 
+                                                  matchtable.c.title.match('nutshells'))
+                                            ).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results1])
+        results2 = matchtable.select().where(matchtable.c.title.match('python & nutshells'), 
+                                            ).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results2])
+
+    def test_match_across_joins(self):
+        results = matchtable.select().where(and_(cattable.c.id==matchtable.c.category_id, 
+                                            or_(cattable.c.description.match('Ruby'), 
+                                                matchtable.c.title.match('nutshells')))
+                                           ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([1, 3, 5], [r.id for r in results])
+
 
 if __name__ == "__main__":
     testenv.main()

test/dialect/sqlite.py

         finally:
             tbl.drop()
 
+def full_text_search_missing():
+    """Test if full text search is not implemented and return False if 
+    it is and True otherwise."""
+
+    try:
+        testing.db.execute("CREATE VIRTUAL TABLE t using FTS3;")
+        testing.db.execute("DROP TABLE t;")
+        return False
+    except:
+        return True
+
+class MatchTest(TestBase, AssertsCompiledSQL):
+    __only_on__ = 'sqlite'
+    __skip_if__ = (full_text_search_missing, )
+
+    def setUpAll(self):
+        global metadata, cattable, matchtable
+        metadata = MetaData(testing.db)
+        
+        testing.db.execute("""
+        CREATE VIRTUAL TABLE cattable using FTS3 (
+            id INTEGER NOT NULL, 
+            description VARCHAR(50), 
+            PRIMARY KEY (id)
+        )
+        """)
+        cattable = Table('cattable', metadata, autoload=True)
+        
+        testing.db.execute("""
+        CREATE VIRTUAL TABLE matchtable using FTS3 (
+            id INTEGER NOT NULL, 
+            title VARCHAR(200),
+            category_id INTEGER NOT NULL, 
+            PRIMARY KEY (id)
+        )
+        """)
+        matchtable = Table('matchtable', metadata, autoload=True)
+        metadata.create_all()
+
+        cattable.insert().execute([
+            {'id': 1, 'description': 'Python'},
+            {'id': 2, 'description': 'Ruby'},
+        ])
+        matchtable.insert().execute([
+            {'id': 1, 'title': 'Agile Web Development with Rails', 'category_id': 2},
+            {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
+            {'id': 3, 'title': 'Programming Matz''s Ruby', 'category_id': 2},
+            {'id': 4, 'title': 'The Definitive Guide to Django', 'category_id': 1},
+            {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
+        ])
+
+    def tearDownAll(self):
+        metadata.drop_all()
+
+    def test_expression(self):
+        self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title MATCH ?")
+
+    def test_simple_match(self):
+        results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([2, 5], [r.id for r in results])
+
+    def test_simple_prefix_match(self):
+        results = matchtable.select().where(matchtable.c.title.match('nut*')).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results])
+
+    def test_or_match(self):
+        results2 = matchtable.select().where(matchtable.c.title.match('nutshell OR ruby'), 
+                                            ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([3, 5], [r.id for r in results2])
+        
+
+    def test_and_match(self):
+        results2 = matchtable.select().where(matchtable.c.title.match('python nutshell'), 
+                                            ).execute().fetchall()
+        self.assertEquals([5], [r.id for r in results2])
+
+    def test_match_across_joins(self):
+        results = matchtable.select().where(and_(cattable.c.id==matchtable.c.category_id, 
+                                            cattable.c.description.match('Ruby'))
+                                           ).order_by(matchtable.c.id).execute().fetchall()
+        self.assertEquals([1, 3], [r.id for r in results])
+
 
 if __name__ == "__main__":
     testenv.main()

test/sql/select.py

             (~table1.c.name.ilike('%something%'), "mytable.name NOT ILIKE %(name_1)s", postgres.PGDialect()),
         ]:
             self.assert_compile(expr, check, dialect=dialect)
+    
+    def test_match(self):
+        for expr, check, dialect in [
+            (table1.c.myid.match('somstr'), "mytable.myid MATCH ?", sqlite.SQLiteDialect()),
+            (table1.c.myid.match('somstr'), "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)", mysql.MySQLDialect()),
+            (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, ?)", mssql.MSSQLDialect()),
+            (table1.c.myid.match('somstr'), "mytable.myid @@ to_tsquery(%(myid_1)s)", postgres.PGDialect()),
+            (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", oracle.OracleDialect()),            
+        ]:
+            self.assert_compile(expr, check, dialect=dialect)
         
     def test_composed_string_comparators(self):
         self.assert_compile(