Source

alchemyadmin / src / alchemyadmin / __init__.py

Full commit
#
from werkzeug import Request, Response
from werkzeug.routing import Map, Rule
from werkzeug.exceptions import HTTPException
from werkzeug import Local, LocalManager
from jinja2 import Environment, PackageLoader
from sqlalchemy.orm import sessionmaker, scoped_session, class_mapper
from formalchemy import Grid, FieldSet, Field

default_template_env = Environment(loader=PackageLoader('alchemyadmin', 'templates'))

local = Local()
local_manager = LocalManager([local])

def url_for(endpoint, values, method="GET"):
    return local.adapter.build(endpoint, method=method,
                               values=values)


def make_edit_url(instance):
    values = {}
    for pk in class_mapper(instance.__class__).primary_key:
        values[pk.name] = getattr(instance, pk.name)

    return url_for('model_edit',
                   method='GET',
                   values=values)

class Application(object):
    def __init__(self, settings, *models, **modelsdict):
        self.template_env = default_template_env
        self.models = modelsdict.copy()
        self.DBSession = scoped_session(sessionmaker())
        for m in models:
            self.models[m.__name__.lower()] = m

        urls = Map([
                Rule('/', endpoint='index'),
                ])
        self.grids = {}
        self.fieldsets = {}
        for m in self.models:
            urls.add(Rule('/' + m, endpoint='model_index', 
                          defaults={'model':m}))
            edit_url = '/' + m + '/' + self.pk_url_map(self.models[m]) + ';edit'
            urls.add(Rule(edit_url, 
                          endpoint='model_edit', 
                          methods=['GET'],
                          defaults={'model':m}))
            urls.add(Rule('/' + m + '/' + self.pk_url_map(self.models[m]),
                          endpoint='model_update',
                          methods=['POST'], 
                          defaults={'model':m}))

            self.grids[m] = grid = Grid(self.models[m])
            grid.configure(readonly=True, pk=True)
            grid.append(Field('edit', 
                              value=lambda item: '<a href="%s">Edit</a>' % make_edit_url(item)))
            self.fieldsets[m] = fieldset = FieldSet(self.models[m])
        self.urls = urls

    def pk_url_map(self, model):
        mapper = class_mapper(model)
        return '/'.join(['<' + p.name + '>' for p in mapper.primary_key])


    def __call__(self, environ, start_response):
        try:
            local.adapter = adapter = self.urls.bind_to_environ(environ)
            endpoint, args = adapter.match()
            local.request = request = Request(environ)
            res = getattr(self, endpoint)(request, **args)
            
            return res(environ, start_response)
        except HTTPException, e:
            return e(environ, start_response)
        finally:
            local_manager.cleanup()

    def index(self, request):
        res = Response(self.render('index.html', models=self.models),
                       content_type='text/html;charset=utf-8')
        return res

    def model_index(self, request, model):
        rows = self.DBSession.query(self.models[model]).all()
        grid = self.grids[model].bind(instances=rows)
        res = Response(self.render('model/index.html', 
                                   model=model,
                                   grid=grid,
                                   models=self.models),
                       content_type='text/html;charset=utf-8')
        return res

    def model_update(self, request, model, **keys):
        q = self.DBSession.query(self.models[model])
        mapper = class_mapper(self.models[model])

        for p in mapper.primary_key:
            q = q.filter(p==keys[p.name])
        row = q.one()
        fs = self.fieldsets[model].bind(row, data=request.form)
        fs.sync()
        self.DBSession.commit()

        fs.configure(readonly=True)
        res = Response(self.render('model/show.html', 
                                   model=model,
                                   row=row,
                                   form=fs,
                                   models=self.models),
                       content_type='text/html;charset=utf-8')
        return res

    def model_edit(self, request, model, **keys):
        q = self.DBSession.query(self.models[model])
        mapper = class_mapper(self.models[model])

        for p in mapper.primary_key:
            q = q.filter(p==keys[p.name])
        row = q.one()
        form = self.fieldsets[model].bind(row)
        post_url = self.url_for('model_update',
                                values=self.instance_pk_values(row),
                                method='POST')
        res = Response(self.render('model/edit.html', 
                                   model=model,
                                   row=row,
                                   form=form,
                                   post_url=post_url,
                                   models=self.models),
                       content_type='text/html;charset=utf-8')
        return res

    def instance_pk_values(self, instance):
        mapper = class_mapper(instance.__class__)
        values = {}
        for pk in mapper.primary_key:
            values[pk.name] = getattr(instance, pk.name)
        return values


    def url_for(self, endpoint, values={}, method="GET"):
        return url_for(endpoint, values, method)


    def render(self, template_name, **kw):
        values = kw.copy()
        values.update(request=local.request,
                      url_for=self.url_for)
        template = self.template_env.get_template(template_name)
        return template.render(**values)