shalabh / quixote_extras

User-contributed add-ons for Quixote. See http://mems-exchange.org/software/quixote/. This repository used to be hosted at http://www.cafepy.com/quixote_extras/. All links from the old location are now redirected here.

Clone this repository (size: 917.6 KB): HTTPS / SSH
$ hg clone http://bitbucket.org/shalabh/quixote_extras
commit 149: 5b7911ccd017
parent 148: fbb05270db35
branch: default
tags: tip
Updated readme to reflect move to bitbucket.org
Shal...@cafepy.com
10 months ago
quixote_extras / rex / tests / test_dbutil.py
r149:5b7911ccd017 128 loc 4.1 KB embed / history / annotate / raw /
#!/usr/bin/env python
"""test_dbutil.py -- Test suite for dbutil.py

This program is designed to run under py.test 
(http://codespeak.net/py/current/doc/test.html), but it can also be run
standalone.  The advantage of py.test is that if an assertion fails, it will
show the values of all variables in the test expression, and also show a more
detailed traceback.  py.test is available only as a Subversion checkout.

If the environment variable 'DEBUG' is non-empty, it will log the SQL executed
to standard error.

CHANGELOG:
    2005-07-27: Add test_literal().
"""
import logging, os
import MySQLdb
import dbutil

# Opt-in debug logging: when the DEBUG environment variable is non-empty,
# configure the root logger and lower its threshold to DEBUG (the module
# docstring says this makes the executed SQL appear on standard error).
if os.environ.get('DEBUG'):
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)

# Name of the scratch table used by the helper-method tests.
TABLE = 'test1'
# When true, the tests leave TABLE in place instead of dropping it at the end.
SAVE_FINAL_RESULT = False

# Expected query results.  Plain ints are used rather than Python 2 long
# literals (``1L``): they compare equal to longs on Python 2, while the ``L``
# suffix is a syntax error on Python 3.
# Both fixture rows as tuples, ordered by ``llave``.
result1 = [(1, 'foo'), (2, 'bar')]
# The same rows as dicts; also used as the records to insert.
result2 = [{'llave': 1, 'valor': 'foo'}, {'llave': 2, 'valor': 'bar'}]
# Rows after the ``valor`` of row 1 has been updated to 'baz'.
result3 = [(1, 'baz'), (2, 'bar')]
# Rows after row 1 has been deleted.
result4 = [(2, 'bar')]

def basic(db):
    """Round-trip a small table through the raw ``run``/``query`` interface.

    Creates ``test1`` from scratch, inserts two rows, checks that the same
    SELECT yields the expected tuple rows and (with ``dict=True``) the
    expected dict rows, then drops the table.
    """
    setup_statements = (
        "DROP TABLE IF EXISTS test1",
        "CREATE TABLE test1 (llave INT, valor VARCHAR(255))",
        "INSERT INTO test1 (llave, valor) VALUES (1, 'foo')",
        "INSERT INTO test1 (llave, valor) VALUES (2, 'bar')",
    )
    for statement in setup_statements:
        db.run(statement)

    select_sql = "SELECT llave, valor FROM test1 ORDER BY llave"
    # Default row format first, then the dict row format.
    tuple_rows = list(db.query(select_sql))
    assert tuple_rows == result1
    dict_rows = list(db.query(select_sql, dict=True))
    assert dict_rows == result2

    db.run("DROP TABLE test1")

def extended(db):
    """Exercise the table-level helper methods of *db* against ``TABLE``.

    Covers create, insert, select (tuple and dict rows), update, delete,
    select_row, select_column, exists, missing, select_dict, tables and
    describe_table, in that order.  The table is dropped at the end unless
    ``SAVE_FINAL_RESULT`` is true.
    """
    # Create the table (dropping any existing table of the same name).
    db.create(TABLE, [('llave', 'INT'), ('valor', 'VARCHAR(255)')])

    # Populate it, one insert per fixture record.
    for record in result2:
        db.insert(TABLE, record)

    # The plain select and the dict select must both see the two rows.
    assert select(db, False) == result1
    assert select(db, True) == result2

    # Change row 1 and confirm the new value is visible.
    db.update(TABLE, {'valor': 'baz'}, where="llave = %s", args=(1,))
    assert select(db, False) == result3

    # Remove row 1 and confirm only row 2 is left.
    db.delete(TABLE, where="llave = %s", args=(1,))
    assert select(db, False) == result4

    # Put the removed record back before trying the other select flavours.
    db.insert(TABLE, result2[0])

    assert db.select_row(TABLE, ['llave', 'valor'], dict=False) == result1[0]
    assert db.select_column(TABLE, 'valor') == ['foo', 'bar']
    assert db.exists(TABLE)
    assert not db.missing(TABLE)
    assert db.select_dict(TABLE, 'llave', 'valor') == {1: 'foo', 2: 'bar'}

    # The table must be listed among the database's tables.
    assert TABLE in db.tables()

    # The first two fields of each description row are compared against the
    # expected column name and column type.
    structure = list(db.describe_table(TABLE))
    expected_columns = [('llave', 'int(11)'), ('valor', 'varchar(255)')]
    for index, (column_name, column_type) in enumerate(expected_columns):
        assert structure[index][0] == column_name
        assert structure[index][1] == column_type

    # Leave the table behind only when explicitly requested.
    if SAVE_FINAL_RESULT:
        return
    db.drop(TABLE)

def select(db, dict):
    """Return all ``(llave, valor)`` rows of ``TABLE`` as a list, by ``llave``.

    *dict* is forwarded unchanged as the ``dict`` keyword of ``db.select``;
    the tests pass True to get dict rows and False to get tuple rows.
    """
    return list(
        db.select(TABLE, ['llave', 'valor'], order='llave', dict=dict))

# Module-level connection shared by every test below.  It is opened at import
# time against the MySQL database named 'test'.
# NOTE(review): no host/user/password is passed, so presumably MySQLdb's
# defaults (e.g. a client option file) supply them -- confirm.
conn = MySQLdb.connect(db='test')

def test_generic():
    """Run the basic round trip through ``dbutil.Database``."""
    basic(dbutil.Database(conn))

def test_mysql():
    """Run the basic round trip through ``dbutil.MySQLDatabase``."""
    basic(dbutil.MySQLDatabase(conn))

def test_wrapper():
    """Run the basic and extended checks through ``dbutil.DBAPI_Wrapper``."""
    wrapper = dbutil.DBAPI_Wrapper(conn)
    for check in (basic, extended):
        check(wrapper)

def test_mysql_wrapper():
    """Run the basic and extended checks through ``dbutil.MySQLdb_Wrapper``."""
    wrapper = dbutil.MySQLdb_Wrapper(conn)
    for check in (basic, extended):
        check(wrapper)

def test_literal():
    """Check the ``literal`` keyword of ``insert`` and ``update``.

    A row is inserted with ``modify_date`` given as a literal
    ``column=expression`` fragment, and the stored day of month must be 1.
    The date is then advanced by one day through a literal update, and the
    day of month must be 2.  The table is dropped afterwards unless
    ``SAVE_FINAL_RESULT`` is true.
    """
    db = dbutil.MySQLdb_Wrapper(conn)
    # Create the table (dropping any existing table of the same name).
    db.create(TABLE, [('user', 'VARCHAR(255)'), ('modify_date', 'DATETIME')])

    user_filter = "user = 'Foo Foo'"
    day_of_month = 'DAYOFMONTH(modify_date)'

    # Insert with the date supplied as a literal fragment.
    db.insert(TABLE, {'user': 'Foo Foo'}, literal="modify_date=20050401")
    assert db.select_value(TABLE, day_of_month, where=user_filter) == 1

    # Update with no regular column values, only the literal fragment.
    next_day = "modify_date=DATE_ADD(modify_date, INTERVAL 1 DAY)"
    db.update(TABLE, {}, where=user_filter, literal=next_day)
    assert db.select_value(TABLE, day_of_month, where=user_filter) == 2

    # Leave the table behind only when explicitly requested.
    if SAVE_FINAL_RESULT:
        return
    db.drop(TABLE)
    

def run_all_tests():
    """Run every test in this module, in order, for standalone use."""
    suite = (
        test_generic,
        test_mysql,
        test_wrapper,
        test_mysql_wrapper,
        test_literal,
    )
    for test in suite:
        test()

# Standalone entry point: run the whole suite when executed as a script.
if __name__ == "__main__":
    run_all_tests()