Source

vdm / vdm / test / sqlalchemy / test_demo.py

import logging
# logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('vdm')

from sqlalchemy.orm import object_session, class_mapper

import vdm.sqlalchemy
from vdm.sqlalchemy import Changeset, ChangeObject
from vdm.sqlalchemy.model import get_object_id
from demo import *

_clear = Session.expunge_all

def all_revisions(obj):
    objid = get_object_id(obj)
    alldata = Session.query(ChangeObject).all()
    out = Session.query(ChangeObject).filter_by(object_id=objid
            ).join('changeset'
                ).order_by(Changeset.timestamp.desc())
    return list(out)
    
class Test_01_SQLAlchemySession:
    @classmethod
    def setup_class(self):
        repo.rebuild_db()
    @classmethod
    def teardown_class(self):
        Session.remove()

    def test_1(self):
        assert not hasattr(Session, 'revision')
        assert vdm.sqlalchemy.SQLAlchemySession.at_HEAD(Session)
        rev = Changeset()
        vdm.sqlalchemy.SQLAlchemySession.set_revision(Session, rev)
        assert vdm.sqlalchemy.SQLAlchemySession.at_HEAD(Session)
        assert Session.revision is not None
        out = vdm.sqlalchemy.SQLAlchemySession.get_revision(Session)
        assert out == rev
        out = vdm.sqlalchemy.SQLAlchemySession.get_revision(Session())
        assert out == rev
        assert vdm.sqlalchemy.SQLAlchemySession.at_HEAD(Session)
        assert vdm.sqlalchemy.SQLAlchemySession.at_HEAD(Session())
        Session.remove()


class Test_02_Versioning:
    @classmethod
    def setup_class(self):
        repo.rebuild_db()

        logger.debug('===== STARTING REV 1')
        session = Session()
        rev1 = Changeset()
        session.add(rev1)
        vdm.sqlalchemy.SQLAlchemySession.set_revision(session, rev1)

        self.name1 = 'anna'
        self.name2 = 'warandpeace'
        self.title1 = 'XYZ'
        self.title2 = 'ABC'
        self.notes1 = u'Here\nare some\nnotes'
        self.notes2 = u'Here\nare no\nnotes'
        lic1 = License(name='blah', open=True)
        lic2 = License(name='foo', open=True)
        p1 = Package(name=self.name1, license=lic1, notes=self.notes1)
        p2 = Package(name=self.name2, title=self.title1, license=lic1)
        session.add_all([lic1,lic2,p1,p2])

        # Call session.flush here as this is, apparently, even more demanding
        # What is weird is that this should work whether we call flush here or
        # after p1.title = .. as we call commit below but that is not true!!
        session.flush()

        # test vdm by flushing now and then making another change
        p1.title = self.title1
        # call before rather than after title set (see comment above)
        # session.flush()

        logger.debug('***** Committing/Flushing Rev 1')
        session.commit()
        # can only get it after the flush
        self.rev1_id = rev1.id
        self.p2_objid = get_object_id(p2)
        _clear()
        Session.remove()

        logger.debug('===== STARTING REV 2')
        session = Session()
        rev2 = Changeset()
        session.add(rev2)
        vdm.sqlalchemy.SQLAlchemySession.set_revision(session, rev2)
        outlic1 = Session.query(License).filter_by(name='blah').first()
        outlic2 = Session.query(License).filter_by(name='foo').first()
        outlic2.open = False
        outp1 = Session.query(Package).filter_by(name=self.name1).one()
        outp2 = Session.query(Package).filter_by(name=self.name2).one()
        outp1.title = self.title2
        outp1.notes = self.notes2
        outp1.license = outlic2
        t1 = Tag(name='geo')
        session.add_all([outp1,outp2,t1])
        outp1.tags = [t1]
        Session.delete(outp2)
        session.commit()
        # must do this after flush as timestamp not set until then
        self.ts2 = rev2.timestamp
        self.rev2_id = rev2.id
        Session.remove()

    @classmethod
    def teardown_class(self):
        repo.rebuild_db()
        Session.remove()

    def test_01_revisions_exist(self):
        revs = Session.query(Changeset).all()
        assert len(revs) == 2
        # also check order (youngest first)
        print [ rev.timestamp for rev in revs ]
        assert revs[0].timestamp > revs[1].timestamp

    def test_02_revision_youngest(self):
        rev = Changeset.youngest(Session)
        assert rev.timestamp == self.ts2

    def test_03_basic(self):
        assert Session.query(License).count() == 2, Session.query(License).count()
        assert Session.query(Package).count() == 1, Session.query(Package).count()
        pkg = Session.query(Package).first()
        assert pkg.title == self.title2, pkg.title

    def test_04_all_revisions(self):
        p1 = Session.query(Package).filter_by(name=self.name1).one()
        assert len(all_revisions(p1)) == 2
        # problem here is that it might pass even if broken because ordering of
        # uuid ids is 'right' 
        revs = [ pr.changeset for pr in all_revisions(p1) ]
        assert revs[0].timestamp > revs[1].timestamp, revs

    def test_05_basic_2(self):
        # should be at HEAD (i.e. rev2) by default 
        p1 = Session.query(Package).filter_by(name=self.name1).one()
        assert p1.license.open == False
        assert len(p1.tags) == 1

    def test_07_operation_type(self):
        p1 = Session.query(Package).filter_by(name=self.name1).one()
        changeobjects = all_revisions(p1)

        _create = ChangeObject.OperationType.CREATE
        optype = changeobjects[-1].operation_type 
        assert optype == _create, optype

        optype = changeobjects[0].operation_type 
        assert optype == ChangeObject.OperationType.UPDATE

        p2_changeobjects = Session.query(ChangeObject).filter_by(object_id=self.p2_objid).all()
        p2_changeobjects = sorted(
                p2_changeobjects,
                lambda x,y: cmp(x.changeset.timestamp, y.changeset.timestamp)
                )
        optype = p2_changeobjects[-1].operation_type
        print p2_changeobjects
        assert optype == ChangeObject.OperationType.DELETE, optype

    def test_08_versioning(self):
        p1 = Session.query(Package).filter_by(name=self.name1).one()
        changeobjects = all_revisions(p1)
        co = changeobjects[-1]
        assert co.data['name'] == self.name1
        assert co.data['title'] == self.title1, co.data

#    def test_15_diff(self):
#        p1 = Session.query(Package).filter_by(name=self.name1).one()
#        pr2, pr1 = all_revisions(p1)
#        # pr1, pr2 = prs[::-1]
#        
#        diff = p1.diff_revisioned_fields(pr2, pr1, Package)
#        assert diff['title'] == '- XYZ\n+ ABC', diff['title']
#        assert diff['notes'] == '  Here\n- are some\n+ are no\n  notes', diff['notes']
#        assert diff['license_id'] == '- 1\n+ 2', diff['license_id']
#
#        diff1 = p1.diff(pr2.changeset, pr1.changeset)
#        assert diff1 == diff, (diff1, diff)
#
#        diff2 = p1.diff()
#        assert diff2 == diff, (diff2, diff)
#
#    def test_16_diff_2(self):
#        '''Test diffing at a revision where just created.'''
#        p1 = Session.query(Package).filter_by(name=self.name1).one()
#        pr2, pr1 = all_revisions(p1)
#
#        diff1 = p1.diff(to_revision=pr1.changeset)
#        assert diff1['title'] == u'- None\n+ XYZ', diff1


class Test_03_StatefulVersioned:
    @classmethod
    def setup_class(self):
        repo.rebuild_db()
        logger.info('====== TestVersioning2: start')

        # create a package with some tags
        rev1 = repo.new_revision()
        self.name1 = 'anna'
        p1 = Package(name=self.name1)
        t1 = Tag(name='geo')
        t2 = Tag(name='geo2')
        p1.tags.append(t1)
        p1.tags.append(t2)
        Session.add_all([p1,t1,t2])
        Session.commit()
        self.rev1_id = rev1.id
        Session.remove()
        
        # now remove those tags
        logger.debug('====== start Changeset 2')
        rev2 = repo.new_revision()
        newp1 = Session.query(Package).filter_by(name=self.name1).one()
        del newp1.tags[0]
        Session.commit()
        self.rev2_id = rev2.id
        Session.remove()

        # now add one of them back
        logger.debug('====== start Changeset 3')
        rev3 = repo.new_revision()
        newp1 = Session.query(Package).filter_by(name=self.name1).one()
        self.tagname1 = 'geo'
        t1 = Session.query(Tag).filter_by(name=self.tagname1).one()
        assert t1
        newp1.tags = [t1]
        repo.commit_and_remove()

    @classmethod
    def teardown_class(self):
        Session.remove()

    def test_0_m2m_ok_at_end(self):
        p1 = Session.query(Package).filter_by(name=self.name1).one()
        assert len(p1.tags) == 1
        assert p1.tags[0].id == 1
        Session.remove()

    def test_2_m2m_history(self):
        p1 = Session.query(Package).filter_by(name=self.name1).one()
        rev2 = Session.query(Changeset).get(self.rev2_id)
        changeobjects = all_revisions(p1)
        for x in changeobjects:
            print x.data
        co = changeobjects[-1]
        assert len(co.data['tags']) == 2
        co = changeobjects[1]
        assert len(co.data['tags']) == 1, co.data['tags']
        assert co.data['tags'][0][1] == 2, co.data['tags']
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.