Commits

Anonymous committed 7380d60

garden

Comments (0)

Files changed (3)

quicktasks/models.py

+import sqlahelper
+import sqlalchemy as sa
+import sqlalchemy.orm as orm
+from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
+
+Base = sqlahelper.get_base()
+DBSession = sqlahelper.get_session()
+
+class Project(Base):
+    query = DBSession.query_property()
+    __tablename__ = 'projects'
+    id = sa.Column(sa.Integer, primary_key=True)
+    name = sa.Column(sa.Unicode(255))
+    user = sa.Column(sa.Unicode(255))
+
+    def add_task(self, name):
+        task = Task(name=name, project=self)
+        self.tasks.append(task)
+        return task
+
+    @hybrid_method
+    def contains(self, task):
+        return task.project == self
+
+    def query_tasks(self):
+        return Task.query.filter(Task.project==self)
+
+    def query_new_tasks(self):
+        return self.query_tasks().filter(Task.new)
+
+    def query_finished_tasks(self):
+        return self.query_tasks().filter(Task.finished)
+
+    def query_active_tasks(self):
+        return self.query_tasks().filter(Task.active)
+
+    def query_pending_tasks(self):
+        return self.query_tasks().filter(Task.pending)
+
+class Task(Base):
+    query = DBSession.query_property()
+    __tablename__ = 'tasks'
+    id = sa.Column(sa.Integer, primary_key=True)
+    name = sa.Column(sa.Unicode(255))
+    project_id = sa.Column(sa.Integer, sa.ForeignKey('projects.id'))
+    state = sa.Column(sa.Enum('new', 'active', 'pending', 'finished'), default="new")
+    project = orm.relationship('Project', backref="tasks")
+
+
+    @hybrid_property
+    def pending(self):
+        return self.state == 'pending'
+
+    @hybrid_property
+    def active(self):
+        return self.state == 'active'
+
+    @hybrid_property
+    def finished(self):
+        return self.state == 'finished'
+
+    @hybrid_property
+    def new(self):
+        return self.state == 'new'
+
+    def finish(self):
+        self.state = 'finished'
+
+    def activate(self):
+        self.state = "active"
+
+    def make_pending(self):
+        self.state = "pending"

quicktasks/tests.py

+import unittest
+from pyramid import testing
+
+def _setup_db():
+    from . import models as m
+    from sqlalchemy import create_engine
+    engine = create_engine('sqlite:///')
+    engine.echo = True
+    m.DBSession.remove()
+    m.DBSession.configure(bind=engine)
+    m.Base.metadata.create_all(bind=engine)
+    return m.DBSession
+
+def _teardown_db():
+    from . import models as m
+    m.Base.metadata.drop_all(bind=m.DBSession.bind)
+
+def setUpModule():
+    _setup_db()
+
+def tearDownModule():
+    _teardown_db()
+
+class TaskTests(unittest.TestCase):
+    def tearDown(self):
+        import transaction
+        transaction.abort()
+
+    def _getTarget(self):
+        from .models import Task
+        return Task
+
+    def _makeOne(self, *args, **kwargs):
+        return self._getTarget()(*args, **kwargs)
+
+    def test_it(self):
+        target = self._makeOne()
+
+    def test_finish(self):
+        target = self._makeOne()
+        target.finish()
+        self.assertEqual(target.state, "finished")
+
+    def test_finishd_obj(self):
+        target = self._makeOne()
+        target.state = 'finished'
+        self.assertTrue(target.finished)
+
+    def test_finished_cls(self):
+        import sqlahelper
+        session = sqlahelper.get_session()
+        target = self._getTarget()
+        session.add(target(state="new", name=u"new task"))
+        session.add(target(state="finished", name=u"finished task"))
+        session.add(target(state="active", name=u"active task"))
+        session.add(target(state="pending", name=u"pending task"))
+
+        results = target.query.filter(target.finished).all()
+
+        self.assertEqual(results[0].name, "finished task")
+
+    def test_new_obj(self):
+
+        target = self._makeOne()
+        target.state = 'new'
+        self.assertTrue(target.new)
+
+    def test_new_cls(self):
+        import sqlahelper
+        session = sqlahelper.get_session()
+        target = self._getTarget()
+        session.add(target(state="new", name=u"new task"))
+        session.add(target(state="finished", name=u"finished task"))
+        session.add(target(state="active", name=u"active task"))
+        session.add(target(state="pending", name=u"pending task"))
+        session.flush()
+
+        results = target.query.filter(target.new).all()
+
+        self.assertEqual(results[0].name, "new task")
+
+    def test_activate(self):
+        target = self._makeOne()
+        target.activate()
+        self.assertEqual(target.state, "active")
+
+    def test_active_obj(self):
+        target = self._makeOne()
+        target.state = 'active'
+
+        self.assertTrue(target.active)
+
+    def test_active_cls(self):
+        import sqlahelper
+        session = sqlahelper.get_session()
+        target = self._getTarget()
+        session.add(target(state="new", name=u"new task"))
+        session.add(target(state="finished", name=u"finished task"))
+        session.add(target(state="active", name=u"active task"))
+        session.add(target(state="pending", name=u"pending task"))
+        session.flush()
+
+        results = target.query.filter(target.active).all()
+
+        self.assertEqual(results[0].name, "active task")
+
+    def test_pending_obj(self):
+        target = self._makeOne()
+        target.state = 'pending'
+
+        self.assertTrue(target.pending)
+
+    def test_pending_cls(self):
+        import sqlahelper
+        session = sqlahelper.get_session()
+        target = self._getTarget()
+        session.add(target(state="new", name=u"new task"))
+        session.add(target(state="finished", name=u"finished task"))
+        session.add(target(state="active", name=u"active task"))
+        session.add(target(state="pending", name=u"pending task"))
+        session.flush()
+
+        results = target.query.filter(target.pending).all()
+
+        self.assertEqual(results[0].name, "pending task")
+
+
+class GetNewTasksTests(unittest.TestCase):
+    def tearDown(self):
+        import transaction
+        transaction.abort()
+
+    def _getTarget(self):
+        from .views import get_new_tasks
+        return get_new_tasks
+
+    def _callFUT(self, *args, **kwargs):
+        return self._getTarget()(*args, **kwargs)
+
+    def test_it(self):
+        request = testing.DummyRequest()
+        import sqlahelper
+        session = sqlahelper.get_session()
+        from .models import Project, Task
+        project = Project()
+        task1 = Task(name=u'new task', project=project)
+        task2 = Task(name=u'finished task', project=project, state="finished")
+        task3 = Task(name=u'active task', project=project, state="active")
+        session.add(project)
+        session.flush()
+
+        result = self._callFUT(request, project.id)
+        self.assertEqual(result,
+                dict(tasks=[{'id': task1.id, 'name': 'new task'}]))
+
+class GetFinishedTasksTests(unittest.TestCase):
+    def tearDown(self):
+        import transaction
+        transaction.abort()
+
+    def _getTarget(self):
+        from .views import get_finished_tasks
+        return get_finished_tasks
+
+    def _callFUT(self, *args, **kwargs):
+        return self._getTarget()(*args, **kwargs)
+
+    def test_it(self):
+        request = testing.DummyRequest()
+        import sqlahelper
+        session = sqlahelper.get_session()
+        from .models import Project, Task
+        project = Project()
+        task1 = Task(name=u'new task', project=project)
+        task2 = Task(name=u'finished task', project=project, state="finished")
+        task3 = Task(name=u'active task', project=project, state="active")
+        session.add(project)
+        session.flush()
+
+        result = self._callFUT(request, project.id)
+        self.assertEqual(result,
+                dict(tasks=[{'id': task2.id, 'name': 'finished task'}]))
+
+class GetActiveTasksTests(unittest.TestCase):
+    def tearDown(self):
+        import transaction
+        transaction.abort()
+
+    def _getTarget(self):
+        from .views import get_active_tasks
+        return get_active_tasks
+
+    def _callFUT(self, *args, **kwargs):
+        return self._getTarget()(*args, **kwargs)
+
+    def test_it(self):
+        request = testing.DummyRequest()
+        import sqlahelper
+        session = sqlahelper.get_session()
+        from .models import Project, Task
+        project = Project()
+        task1 = Task(name=u'new task', project=project)
+        task2 = Task(name=u'finished task', project=project, state="finished")
+        task3 = Task(name=u'active task', project=project, state="active")
+        session.add(project)
+        session.flush()
+        result = self._callFUT(request, project.id)
+        self.assertEqual(result,
+                dict(tasks=[{'id': task3.id, 'name': 'active task'}]))

quicktasks/views.py

+from . import models as m
+from pyramid.security import authenticated_userid
+
+def _task2dict(task):
+    return dict(
+            id=task.id,
+            name=task.name,
+            )
+
+
+def get_new_tasks(request, project):
+    userid = authenticated_userid(request)
+    project = m.Project.query.filter_by(id=project).filter_by(user=userid).one()
+    tasks = project.query_new_tasks().all()
+    return dict(tasks=[
+        _task2dict(t)
+        for t in tasks])
+
+def get_finished_tasks(request, project):
+    userid = authenticated_userid(request)
+    project = m.Project.query.filter_by(id=project).one()
+    tasks = project.query_finished_tasks().all()
+    return dict(tasks=[
+        _task2dict(t)
+        for t in tasks])
+
+def get_active_tasks(request, project):
+    userid = authenticated_userid(request)
+    project = m.Project.query.filter_by(id=project).one()
+    tasks = project.query_active_tasks().all()
+    return dict(tasks=[
+        _task2dict(t)
+        for t in tasks])
+
+def get_pending_tasks(request, project):
+    userid = authenticated_userid(request)
+    project = m.Project.query.filter_by(id=project).one()
+    tasks = project.query_pending_tasks().all()
+    return dict(tasks=[
+        _task2dict(t)
+        for t in tasks])
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.