Commits

Ronny Pfannschmidt committed 9cb024a

turn to sqlite invoke based migrations

Comments (0)

Files changed (2)

micromigrate/migrate.py

 from __future__ import print_function
 from hashlib import sha256
 from collections import namedtuple
-
+import subprocess
 
 class Migration(namedtuple('MigrationBase', 'name checksum sql after')):
     __slots__ = ()
     return Migration(**meta)
 
 
-initial_migration = parse_migration(u"""
-        -- migration micromigrate:enable
-        create table micromigrate_migrations (
-            id integer primary key,
-            name unique,
-            checksum,
-            completed default 0
-            );
-""")
+MIGRATION_SCRIPT="""
+begin transaction;;
 
+create table if not exists micromigrate_migrations (
+    id integer primary key,
+    name unique,
+    checksum,
+    completed default 0
+    );
 
-def _prepare_migration(connection, migration, first):
-    if not first:
-        connection.execute("""
-            insert into micromigrate_migrations (name, checksum)
-            values (:name, :checksum)""", migration._asdict())
 
+insert into micromigrate_migrations (name, checksum)
+values ("{migration.name}", "{migration.checksum}");
 
-def _record_migration_result(connection, migration, first):
-    if not first:
-        c = connection.execute("""
-            update micromigrate_migrations
-                set completed = 1
-                where name = :name;
-            """, migration._asdict())
-    else:
-        c = connection.execute("""
-            insert into micromigrate_migrations (name, checksum, completed)
-            values (:name, :checksum, 1);""", migration._asdict())
-    assert c.rowcount == 1
+select 'executing' as status;
 
+{migration.sql}
+;
+select 'finalizing' as stats;
 
-def push_migration(connection, migration, first):
-    print('migration', migration.name)
-    _prepare_migration(connection, migration, first)
+update micromigrate_migrations
+set completed = 1
+where name = "{migration.name}";
+
+select 'commiting' as status, "{migration.name}" as migration;
+
+commit;
+"""
+
+def runsqlite(dbname, script):
+    subprocess.check_call([
+        'sqlite3', '-line',
+        str(dbname), script,
+    ])
+
+def runquery(dbname, query):
+    proc = subprocess.Popen(
+        ['sqlite3', '-line', str(dbname), query],
+        stdout = subprocess.PIPE,
+        universal_newlines=True,
+    )
+    out, ignored = proc.communicate()
+    if proc.returncode:
+        raise Exception(proc.returncode)
+    
+    chunks = out.split('\n\n')
+    return [
+        dict(x.strip().split(' = ') for x in chunk.splitlines())
+        for chunk in chunks if chunk.strip()
+    ]
+
+
+def push_migration(dbname, migration):
+    script = MIGRATION_SCRIPT.format(migration=migration)
     try:
-        connection.executescript(migration.sql)
-    except connection.Error as error:
-        print(' ', error)
-    else:
-        _record_migration_result(connection, migration, first)
+        runsqlite(dbname, script)
+    except subprocess.CalledProcessError:
+        pass
+    return migration_state(dbname)
 
-
-def migration_state(connection):
-    c = connection.execute("""
-        select name, type
+def migration_state(dbname):
+    proc = '''select name, type
         from sqlite_master
         where type = "table"
-        and name = "micromigrate_migrations";
-    """)
-    items = list(c)
-    if items:
-        return dict(connection.execute("""
-            select
+        and name = "micromigrate_migrations"
+    '''
+    listit = """select
                 name,
                 case
                     when completed = 1
                     then checksum
                     else ':failed to complete'
-                end
+                end as checksum
             from micromigrate_migrations
-        """))
+        """
+    result = runquery(dbname, proc)
+    if result:
+        return dict(
+            (row['name'], row['checksum'])
+            for row in runquery(dbname, listit))
+
 
 
 def verify_state(state, migrations):
 
 
 def can_do(migration, state):
-    if not state:
-        assert migration is initial_migration, \
-            'first migration must depend on %s' % initial_migration.name
     return (
         migration.after is None or
         not any(name not in state for name in migration.after)
     )
 
 
-def apply_migrations(connection, migrations):
+def apply_migrations(dbname, migrations):
     # we put our internal migrations behind the given ones intentionally
     # this requires that people depend on our own migrations
     # in order to have theirs work
-    all_migrations = migrations + [initial_migration]
-    state = migration_state(connection)
-    if state is None:
-        state = {}
-        missing_migrations = all_migrations
-    else:
-        missing_migrations = verify_state(state, all_migrations)
+    state = migration_state(dbname) or {}
+    missing_migrations = verify_state(state, migrations)
 
     for migration in iter_next_doable(missing_migrations):
         assert can_do(migration, state)
-        push_migration(connection, migration, first=not state)
+        push_migration(dbname, migration)
         state[migration.name] = migration.checksum
-    return migration_state(connection)
+    return migration_state(dbname)

testing/test_miromigrate.py

 
 
 @pytest.fixture
-def plain_conn(request):
-    import sqlite3
-    conn = sqlite3.connect(':memory:')
-    request._pyfuncitem._conn = conn
-    return conn
-
-
-@pytest.fixture
-def conn(plain_conn):
-    mm.apply_migrations(plain_conn, [])
-    return plain_conn
-
-
-def print_db(conn):
-    print('\n'.join(conn.iterdump()))
+def dbname(request, tmpdir):
+    db = tmpdir.join('test.sqlite.db')
+    @request.addfinalizer
+    def cleanup():
+        import subprocess
+        if db.check():
+            subprocess.call([
+                'sqlite3', str(db), '.dump',
+            ])
+    return db
 
 
 def test_parse_migration():
     assert result.after == frozenset(('fun',))
 
 
-def test_migration_initial(plain_conn):
-    state = mm.migration_state(plain_conn)
+def test_push_migration(dbname):
+    state = mm.migration_state(dbname)
     assert state is None
-    new_state = mm.apply_migrations(plain_conn, [])
-    assert 'micromigrate:enable' in new_state
-
-
-def test_migrate_missing_dep_breaks(plain_conn):
     migration = mm.parse_migration("""
         -- migration test
-        create table test(id, name);
-    """)
-    info = pytest.raises(
-        AssertionError, mm.apply_migrations,
-        plain_conn, [migration])
-    assert info.value.args[0].startswith('first migration must')
+        fail
+        """)
+    mm.push_migration(dbname, migration)
+    state = mm.migration_state(dbname)
+    assert state is None
 
+    migration = mm.parse_migration("""
+        -- migration test
+        create table test(name);
+        """)
+    mm.push_migration(dbname, migration)
+    state = mm.migration_state(dbname)
+    assert state == {'test': migration.checksum}
 
-def test_migration_state(plain_conn):
-    assert mm.migration_state(plain_conn) is None
-    plain_conn.execute(mm.initial_migration.sql)
-    assert mm.migration_state(plain_conn) == {}
-    mm._record_migration_result(plain_conn, mm.initial_migration, True)
-    assert mm.migration_state(plain_conn) == {
-        mm.initial_migration.name: mm.initial_migration.checksum,
-    }
+def test_migration_initial(dbname):
+    state = mm.migration_state(dbname)
+    assert state is None
+    migration = mm.parse_migration("""
+        -- migration test
+        create table test(name);
+        """)
+    new_state = mm.apply_migrations(dbname, [migration])
+    assert len(new_state) == 1
+    assert new_state[migration.name] == migration.checksum
 
 
-def test_boken_transaction(conn):
-    state = mm.migration_state(conn)
-    print('state', sorted(state))
+
+
+def test_boken_transaction(dbname):
     migration = mm.parse_migration(u"""
         -- migration broke
-        -- after micromigrate:enable
         create table foo(name unique);
         insert into foo values ('a');
         insert into foo values ('a');
         """)
-    print(migration, migration.after)
-    mm.apply_migrations(conn, [migration])
-    state = mm.migration_state(conn)
-    assert state[migration.name] == ':failed to complete'
+    state = mm.apply_migrations(dbname, [migration])
+    assert state is None
+
+    migration_working = mm.parse_migration(u"""
+        -- migration working
+        create table bar(name unique);
+        """)
+
+    state = mm.apply_migrations(dbname, [migration_working, migration])
+    assert len(state) == 1
+    assert state['working'] == migration_working.checksum