Commits

jason kirtland committed 27526b5

Retired support for Genshi < 0.6

  • Participants
  • Parent commits 5ae56fc

Comments (0)

Files changed (20)

File flatland/out/genshi.py

+from __future__ import absolute_import
+from collections import deque
+
+from genshi.core import Namespace, QName, END, START, TEXT
+from genshi.template.base import (
+    DirectiveFactory,
+    EXPR,
+    SUB,
+    TemplateSyntaxError,
+    _eval_expr,
+    )
+from genshi.template.eval import Expression
+from genshi.template.directives import Directive
+from genshi.template.interpolation import interpolate
+
+
+from flatland.out.generic import _unpack, transform, Context
+
+
+__all__ = ('setup',)
+
+NS = Namespace(u'http://ns.discorporate.us/flatland/genshi')
+
+_static_attribute_order = [u'type', u'name', u'value']
+
+_to_context = {}
+for key in (u'auto-name', u'auto-value', u'auto-domid', u'auto-for',
+            u'auto-tabindex', u'auto-filter', u'domid-format'):
+    _to_context[key] = key.replace(u'-', u'_')
+
+_bind_qname = NS.bind
+
+
+def setup(template):
+    """Register the flatland directives with a template.
+
+    :param template: a `Template` instance
+    """
+
+    if not hasattr(template, 'add_directives'):
+        raise RuntimeError("%s.setup requires Genshi 0.6 or higher." % (
+            __name__,))
+    template.add_directives(NS, FlatlandElements())
+
+
+class EvaluatedLast(Directive):
+    __slots__ = ()
+
+    def __call__(self, stream, directives, ctxt, **vars):
+        local, foreign = [], []
+        for d in directives:
+            if isinstance(d, EvaluatedLast):
+                local.append(d)
+            else:
+                foreign.append(d)
+        if foreign:
+            foreign.append(self)
+            foreign.extend(local)
+            return foreign[0](stream, foreign[1:], ctxt, **vars)
+        return self.process(stream, local, ctxt, vars)
+
+    def process(self, stream, directives, ctxt, vars):
+        raise NotImplementedError  # pragma: nocover
+
+
+class TagOnly(EvaluatedLast):
+    _name = None
+    __slots__ = ('attributes',)
+
+    @classmethod
+    def attach(cls, template, stream, value, namespaces, pos):
+        if type(value) is not dict:
+            raise TemplateSyntaxError(
+                "The %s directive must be an element" % cls._name,
+                template.filepath, *pos[1:])
+        return super(TagOnly, cls).attach(
+            template, stream, value, namespaces, pos)
+
+    def __init__(self, value, template=None, namespaces=None,
+                 lineno=-1, offset=-1):
+        Directive.__init__(self, None, template, namespaces, lineno, offset)
+        self.attributes = value
+
+
+class AttributeOnly(EvaluatedLast):
+    _name = None
+    __slots__ = ()
+
+    @classmethod
+    def attach(cls, template, stream, value, namespaces, pos):
+        if type(value) is dict:
+            raise TemplateSyntaxError(
+                ("The %s directive may only be used as a "
+                 "tag attribute" % cls._name),
+                template.filepath, *pos[1:])
+        return super(AttributeOnly, cls).attach(
+            template, stream, value, namespaces, pos)
+
+
+class ControlAttribute(AttributeOnly):
+    __slots__ = ('raw_value')
+
+    def __init__(self, value, template=None, namespaces=None,
+                 lineno=-1, offset=-1):
+        Directive.__init__(self, None, template, namespaces, lineno, offset)
+
+        # allow interpolation inside control attributes
+        raw_value = list(interpolate(value, lineno=lineno, offset=offset))
+        if all(kind is TEXT for (kind, _, _) in raw_value):
+            self.raw_value = u''.join(event[1] for event in raw_value)
+        else:
+            self.raw_value = raw_value
+
+    def process(self, stream, directives, ctxt, vars):
+        # unbound transformation.
+        if not directives:
+            directives = [self]
+        else:
+            directives = [self] + directives
+        return _rewrite_stream(stream, directives, ctxt, vars, None)
+
+    def inject(self, mapping, ctxt, vars):
+        """Inject the translated key and interpolated value into *mapping*."""
+        raw = self.raw_value
+        if raw.__class__ is unicode:
+            final_value = raw
+        else:
+            parts = []
+            for kind, value, pos in raw:
+                if kind is TEXT:
+                    parts.append(value)
+                else:
+                    value = _eval_expr(value, ctxt, vars)
+                    parts.append(unicode(value))
+            final_value = u''.join(parts)
+        mapping[_to_context.get(self._name, self._name)] = final_value
+
+
+class AutoName(ControlAttribute):
+    _name = 'auto-name'
+    __slots__ = ()
+
+
+class AutoValue(ControlAttribute):
+    _name = 'auto-value'
+    __slots__ = ()
+
+
+class AutoDomID(ControlAttribute):
+    _name = 'auto-domid'
+    __slots__ = ()
+
+
+class AutoFor(ControlAttribute):
+    _name = 'auto-for'
+    __slots__ = ()
+
+
+class AutoTabindex(ControlAttribute):
+    _name = 'auto-tabindex'
+    __slots__ = ()
+
+
+class AutoFilter(ControlAttribute):
+    _name = 'auto-filter'
+    __slots__ = ()
+
+
+class Binding(AttributeOnly):
+    _name = 'bind'
+    __slots__ = ('bind',)
+
+    def __init__(self, attributes, template=None, namespaces=None,
+                 lineno=-1, offset=-1, bind=None):
+        AttributeOnly.__init__(self, attributes, template, namespaces,
+                               lineno, offset)
+        self.bind = bind
+
+    def process(self, stream, directives, ctxt, vars):
+        if self.bind is not None:
+            bind = self.bind
+        elif self.expr is None:
+            bind = None
+        else:
+            bind = _eval_expr(self.expr, ctxt, vars)
+        return _rewrite_stream(stream, directives, ctxt, vars, bind)
+
+
+class RenderContextManipulator(TagOnly):
+    __slots__ = ()
+
+    def __init__(self, attributes, template=None, namespaces=None,
+                 lineno=-1, offset=-1):
+        transformed = {}
+        for key, value in attributes.items():
+            key = _to_context.get(key, key)
+            if key == u'tabindex':
+                value = int(value)
+            transformed[key] = value
+        TagOnly.__init__(self, transformed, template, namespaces,
+                         lineno, offset)
+
+
+class With(RenderContextManipulator):
+    _name = 'with'
+    __slots__ = ()
+
+    def process(self, stream, directives, ctxt, vars):
+        try:
+            render_context = ctxt['flatland_render_context']
+        except KeyError:
+            ctxt['flatland_render_context'] = render_context = Context()
+
+        if 'filters' not in self.attributes:
+            attrs = self.attributes
+        else:
+            attrs = self.attributes.copy()
+            attrs['filters'] = _eval_expr(Expression(attrs['filters']),
+                                          ctxt, vars)
+
+        render_context.push()
+        render_context.update(attrs)
+        assert not directives
+        for event in stream:
+            yield event
+        render_context.pop()
+
+
+class Set(RenderContextManipulator):
+    _name = 'set'
+    __slots__ = ()
+
+    def process(self, stream, directives, ctxt, vars):
+        try:
+            render_context = ctxt['flatland_render_context']
+        except KeyError:
+            ctxt['flatland_render_context'] = render_context = Context()
+        render_context.update(self.attributes)
+        assert not directives
+        return stream
+
+
+class FlatlandElements(DirectiveFactory):
+
+    NAMESPACE = NS
+
+    directives = [
+        ('with', With),
+        ('set', Set),
+        ('bind', Binding),
+        ('auto-name', AutoName),
+        ('auto-value', AutoValue),
+        ('auto-domid', AutoDomID),
+        ('auto-for', AutoFor),
+        ('auto-tabindex', AutoTabindex),
+        ('auto-filter', AutoFilter),
+        ]
+
+
+def _rewrite_stream(stream, directives, ctxt, vars, bind):
+    stream = list(stream)
+    mutable_attrs = {}
+
+    for control_attribute in directives:
+        control_attribute.inject(mutable_attrs, ctxt, vars)
+
+    kind, (tagname, attrs), pos = stream[0]
+    if len(stream) == 2:
+        contents = None
+    else:
+        contents = _simplify_stream(stream[1:-1], ctxt, vars)
+
+    existing_attributes = {}
+    for qname, value in attrs:
+        if qname.namespace is None:
+            if not isinstance(value, unicode):
+                value = _simplify_stream(value, ctxt, vars)
+                attrs |= ((qname, value),)
+            existing_attributes[qname.localname] = qname
+            mutable_attrs[qname.localname] = value
+
+    try:
+        render_context = ctxt['flatland_render_context']
+    except KeyError:
+        ctxt['flatland_render_context'] = render_context = Context()
+
+    new_contents = transform(tagname.localname, mutable_attrs, contents,
+                             render_context, bind)
+
+    if new_contents is None:
+        new_contents = ()
+    elif isinstance(new_contents, unicode):
+        new_contents = [(TEXT, new_contents, (None, -1, -1))]
+
+    pairs = sorted(mutable_attrs.iteritems(), key=_attribute_sort_key)
+    for attribute_name, value in pairs:
+        if attribute_name in existing_attributes:
+            qname = existing_attributes.pop(attribute_name)
+        else:
+            qname = QName(attribute_name)
+        attrs |= ((qname, value),)
+    for qname in existing_attributes.values():
+        attrs -= qname
+
+    stream[0] = (kind, (tagname, attrs), pos)
+    if new_contents and tagname.localname == u'select' and bind is not None:
+        if tagname.namespace:
+            sub_tag = Namespace(tagname.namespace).option
+        else:  # pragma: nocover
+            sub_tag = QName('option')
+        new_contents = _bind_unbound_tags(new_contents, sub_tag, bind)
+    if new_contents:
+        stream[1:-1] = new_contents
+    return iter(stream)
+
+
+def _attribute_sort_key(item):
+    try:
+        return (0, _static_attribute_order.index(item[0]))
+    except ValueError:
+        return (1, item[0])
+
+
+def _bind_unbound_tags(stream, qname, bind):
+    stream = deque(stream)
+    while stream:
+        kind, data, pos = stream.popleft()
+        if kind is SUB:
+            directives, substream = data
+            for d in directives:  # pragma: nocover   (coverage bug :()
+                if isinstance(d, Binding):
+                    break
+            else:
+                substream = list(_bind_unbound_tags(substream, qname, bind))
+            yield kind, (directives, substream), pos
+        elif kind is START:
+            if data[0] != qname or qname in data[1]:
+                yield kind, data, pos
+                continue
+            head = kind, data, pos
+            substream = []
+            stack = 1
+            while stack:
+                event = stream.popleft()
+                substream.append(event)
+                if event[0] is START and event[1][0] == qname:
+                    stack += 1
+                elif event[0] is END and event[1] == qname:
+                    stack -= 1
+            substream = [head] + list(
+                _bind_unbound_tags(substream, qname, bind))
+            # attaching the directive is sufficient; don't need to fabricate
+            # a form:bind="" attribute
+            yield SUB, ([Binding(u'', bind=bind)], substream), pos
+        else:
+            yield kind, data, pos
+
+
+def _simplify_stream(stream, ctxt, vars):
+    # consumes stream, send a list
+    parts = []
+    for idx, (kind, data, pos) in enumerate(stream):
+        if kind is TEXT:
+            parts.append(data)
+        elif kind is EXPR:
+            value = _eval_expr(data, ctxt, vars)
+            if hasattr(value, '__html__'):
+                value = _unpack(value)
+            if hasattr(value, '__next__') or hasattr(value, 'next'):
+                while hasattr(value, '__next__') or hasattr(value, 'next'):
+                    value = list(value)
+                    value = _simplify_stream(value, ctxt, vars)
+                if not isinstance(value, unicode):
+                    stream[idx:idx + 1] = value
+                else:
+                    stream[idx] = (TEXT, value, pos)
+            elif not isinstance(value, unicode):
+                value = unicode(value)
+            parts.append(value)
+        else:
+            return stream
+    return u''.join(parts)

File flatland/out/genshi/__init__.py

-from flatland.out.genshi.filter import flatland_filter
-
-___all__ = ['flatland_filter', 'setup']
-
-
-def setup(template, use_version=None):
-    """Register the flatland directives with a template.
-
-    :param template: a `Template` instance
-    """
-    if use_version is None:
-        use_version = 6 if hasattr(template, 'add_directives') else 5
-
-    if use_version == 6:
-        from flatland.out.genshi_06 import setup
-        setup(template)
-    else:
-        install_element_mixin()
-        template.filters.append(flatland_filter)
-
-
-def install_element_mixin():
-    from flatland.out.genshi.elements import GenshiAccessMixin
-    from flatland.schema.base import Element
-    if GenshiAccessMixin in Element.__bases__:
-        return
-    assert Element.__bases__ != (object,)
-    Element.__bases__ += (GenshiAccessMixin,)
-
-
-def uninstall_element_mixin():
-    from flatland.out.genshi.elements import GenshiAccessMixin
-    from flatland.schema.base import Element
-    if GenshiAccessMixin not in Element.__bases__:
-        return
-    bases = list(Element.__bases__)
-    bases.remove(GenshiAccessMixin)
-    Element.__bases__ = tuple(bases)

File flatland/out/genshi/elements.py

-from flatland.schema import base
-
-
-# FIXME:
-# unicode(wrapped) should probably eval to either the fq_name or the
-# result of a configured formatting function.  clean up the set_prefix
-# bandaid and make it do this.
-#
-# FIXME: continue to simplify the wrapper. this is all hella bogus :(
-
-class GenshiAccessMixin(object):
-    def set_prefix(self, prefix):
-        self._genshi_prefix = prefix
-
-    @property
-    def bind(self):
-        return WrappedElement(self)
-
-    @property
-    def binds(self):
-        return dict((child.name, WrappedElement(child))
-                    for child in self.children)
-
-def get_prefix(element):
-    if hasattr(element, '_genshi_prefix'):
-        return element._genshi_prefix
-    else:
-        for element in element.parents:
-            if hasattr(element, '_genshi_prefix'):
-                return element._genshi_prefix
-        return None
-
-
-class WrappedElement(unicode):
-    """Wrap an Element into something Genshi expressions can use.
-
-    Genshi AST transformations don't play well with rich objects like
-    Elements.  Anything iterable will iterate on insert into the
-    template, even if it defines a __unicode__.  Normal objects
-    defining __getitem__ or __getattr__ or even __getattribute__
-    confuse Genshi's dotted expression adapter.
-
-    ``unicode``-derived objects are immune from these problems.  These
-    wrappers will str() as Genshi-evalable string.
-
-    """
-    def __new__(cls, element):
-        return unicode.__new__(cls, cls._format(element))
-
-    def __init__(self, element):
-        self.element = element
-
-    @staticmethod
-    def _format(element):
-        prefix = get_prefix(element)
-        path = element.fq_name()
-        if not prefix:
-            root = element.root
-            if root.name is None:
-                prefix = u'form'
-            else:
-                prefix = u'forms.%s' % (root.name,)
-        return u"%s.el(%s)" % (prefix, repr(path).decode('raw_unicode_escape'))
-
-    def __getitem__(self, key):
-        if isinstance(key, str):
-            try:
-                key = key.decode('ascii')
-            except UnicodeError:
-                raise KeyError(key)
-        item = self.element._index(key)
-        if isinstance(item, base.Element):
-            return WrappedElement(item)
-        else:
-            return item
-
-    def __getattr__(self, attribute):
-        return getattr(self.element, attribute)
-
-    def __unicode__(self):
-        return self._format(self.element)
-
-    def __iter__(self):
-        return (WrappedElement(child) if isinstance(child, base.Element)
-                                      else child
-                for child in self.element.children)

File flatland/out/genshi/filter.py

-from __future__ import absolute_import
-
-import itertools
-import logging
-from genshi import Markup, Namespace, Stream, QName
-from genshi.core import START, TEXT
-from genshi.template.eval import Expression
-
-import flatland
-from flatland.util import Maybe
-from . taglistener import TagListener, default_start
-
-
-__all__ = 'flatland_filter',
-
-log = logging.getLogger('flatland.out.genshi')
-
-NAMESPACE  = Namespace(u'http://ns.discorporate.us/flatland/genshi')
-
-# filter attributes
-F_BIND     = NAMESPACE[u'bind']
-
-# HTML attributes
-H_CHECKED = QName(u'checked')
-H_FOR = QName(u'for')
-H_ID = QName(u'id')
-H_NAME = QName(u'name')
-H_SELECTED = QName(u'selected')
-H_TABINDEX = QName(u'tabindex')
-H_VALUE = QName(u'value')
-
-MAYBE = (u'auto',)
-YES   = (u'1', u'true', u't', u'on', u'yes')
-NO    = (u'0', u'false', u'nil', u'off', u'no')
-
-# TODO:jek remember why mixing - and _ in these was a good idea and document
-CTX_CUR_TABINDEX      = u'auto-tabindex_value'
-CTX_FMT_DOMID         = u'auto-domid_format'
-CTX_FILTERS           = u'auto-filter_filters'
-
-
-
-def flatland_filter(stream, context):
-    """flatland_filter(stream, context) -> stream
-
-    Filter a stream through FlatlandFilter
-
-    """
-    return Stream(FlatlandFilter()(stream, context))
-
-
-
-
-class ToggledAttribute(object):
-    toggle_attribute = None
-    toggle_default = False
-    toggle_context_key = None
-
-    attribute = None
-    auto_tags = ()
-
-    def pop_toggle(self, attrs, context):
-        attrs, proceed = self.pop_attribute(
-            attrs, self.toggle_attribute, u'auto', parse_trool)
-        forced = proceed is True
-        if proceed is Maybe:
-            proceed = parse_trool(context.get(self.toggle_context_key, u'auto'))
-            if proceed is Maybe:
-                proceed = self.toggle_default
-        return attrs, proceed, forced
-
-    def pop_attribute(self, attrs, name, default=None, transform=None):
-        value = attrs.get(name, default)
-        if transform:
-            value = transform(value)
-        return (attrs - name), value
-
-
-class NameAttribute(ToggledAttribute):
-    toggle_default = True
-    toggle_attribute = NAMESPACE[u'auto-name']
-    toggle_context_key = u'auto-name'
-
-    attribute = H_NAME
-    auto_tags = set((u'input', u'button', u'select', u'textarea', u'form'))
-
-    def apply_to(self, tag, attrs, context, node):
-        attrs, proceed, forced = self.pop_toggle(attrs, context)
-
-        if not proceed:
-            return attrs
-
-        # Abort on unbound or anonymous nodes
-        if node is None or node.name is None:
-            return attrs
-
-        current = attrs.get(self.attribute, None)
-        if forced or current is None and tag.localname in self.auto_tags:
-            attrs |= ((self.attribute, node.flattened_name()),)
-        return attrs
-
-
-class ValueAttribute(ToggledAttribute):
-    toggle_default = True
-    toggle_attribute = NAMESPACE[u'auto-value']
-    toggle_context_key = u'auto-value'
-
-    attribute = H_VALUE
-    auto_tags = set((u'input', u'select', u'textarea', u'button'))
-
-    apply_child = (u'textarea',)
-    apply_mixed = ()
-
-    def apply_to(self, tag, attrs, stream, context, node):
-        attrs, proceed, forced = self.pop_toggle(attrs, context)
-
-        if not proceed:
-            return stream, attrs
-
-        # Abort on unbound nodes.
-        if node is None:
-            return stream, attrs
-
-        if not forced and tag.localname not in self.auto_tags:
-            return stream, attrs
-
-        # VALUE_CHILD (e.g. <textarea>) always replaces the stream with
-        # the node's string value.
-        if tag.localname in self.apply_child:
-            stream = self.set_stream_value(stream, node.u)
-
-        elif tag.localname == u'select':
-            stream = self.set_select(forced, attrs, stream, node)
-
-        elif tag.localname in self.apply_mixed:
-            stream, attrs = self.set_mixed_value(forced, attrs, stream, node)
-
-        elif tag.localname == u'input':
-            attrs = self.set_input(forced, attrs, node)
-
-        else:
-            attrs = self.set_simple_value(forced, attrs, node)
-
-        return stream, attrs
-
-    def set_stream_value(self, stream, text):
-        stream = stream_is_empty(stream)
-        if stream is None:
-            return Stream([(TEXT, text, (None, -1, -1))])
-        else:
-            return stream
-
-    def set_simple_value(self, override, attrs, node):
-        current = attrs.get(H_VALUE)
-        if current is None or override is True:
-            attrs |= ((H_VALUE, node.u),)
-        return attrs
-
-    def set_mixed_value(self, override, attrs, stream, node):
-        """
-        For output nodes that may either take a 'value=""' or encode
-        their value in nested content.  Node value will be passed
-        along as unescaped markup if child nodes are generated!
-
-        """
-        if attrs.get(H_VALUE, None) is None:
-            stream = self.set_stream_value(stream, Markup(node))
-        else:
-            attrs = self.set_simple_value(override, attrs, node)
-        return stream, attrs
-
-    def set_input(self, override, attrs, node):
-        type = attrs.get(u'type', u'text').lower()
-
-        if type in (u'text', u'hidden', u'button', u'submit', u'reset'):
-            attrs = self.set_simple_value(override, attrs, node)
-        elif type in (u'password', u'file', u'image'):
-            if override is True:
-                attrs = self.set_simple_value(override, attrs, node)
-        elif type == u'checkbox':
-            value = attrs.get(H_VALUE, None)
-            if value is None and isinstance(node, flatland.Boolean):
-                value = node.true
-                attrs |= ((H_VALUE, value),)
-            attrs = self.set_checked(attrs, node)
-        elif type == u'radio':
-            attrs = self.set_checked(attrs, node)
-        else:
-            if override is True:
-                attrs = self.set_simple_value(override, attrs, node)
-        return attrs
-
-    def set_select(self, override, attrs, stream, node):
-        return OptionToggler(node.u)(stream)
-
-    def set_checked(self, attrs, node):
-        value = attrs.get(H_VALUE, None)
-        if value is None:
-            return attrs
-        if value == node.u:
-            attrs |= ((H_CHECKED, u'checked'),)
-        elif isinstance(node, flatland.Compound):
-            attrs -= H_CHECKED
-        else:
-            for child in node.children:
-                if value == child.u:
-                    attrs |= ((H_CHECKED, u'checked'),)
-                    break
-            else:
-                attrs -= H_CHECKED
-        return attrs
-
-
-class DomIDAttribute(ToggledAttribute):
-    toggle_attribute = NAMESPACE[u'auto-domid']
-    toggle_context_key = u'auto-domid'
-
-    attribute = H_ID
-    auto_tags = set((u'input', u'button', u'select', u'textarea'))
-
-    def apply_to(self, tag, attrs, context, el):
-        attrs, proceed, forced = self.pop_toggle(attrs, context)
-        if not proceed:
-            return attrs
-
-        current = attrs.get(self.attribute, None)
-        if forced or current is None and tag.localname in self.auto_tags:
-            if el is not None:
-                domid = self.id_for(el, context)
-            else:
-                domid = self.id_for_unbound(tag, attrs, context)
-            attrs |= ((self.attribute, domid),)
-        return attrs
-
-    @classmethod
-    def id_for(cls, el, context):
-        fmt = context.get(CTX_FMT_DOMID, u'f_%s')
-        return fmt % el.flattened_name()
-
-    @classmethod
-    def id_for_unbound(cls, tag, attrs, context):
-        if tag in cls.auto_tags:
-            name = attrs.get(H_NAME, None)
-            if name is not None:
-                fmt = context.get(CTX_FMT_DOMID, u'f_%s')
-                return fmt % name
-        return None
-
-
-class ForAttribute(ToggledAttribute):
-    toggle_attribute = NAMESPACE[u'auto-for']
-    # tied to ID generation
-    toggle_context_key = u'auto-domid'
-
-    attribute = H_FOR
-    auto_tags = set((u'label',))
-
-    def apply_to(self, tag, attrs, context, node):
-        attrs, proceed, forced = self.pop_toggle(attrs, context)
-        if not proceed or node is None:
-            return attrs
-
-        current = attrs.get(self.attribute, None)
-        if forced or current is None and tag.localname in self.auto_tags:
-            attrs |= ((self.attribute, DomIDAttribute.id_for(node, context)),)
-        return attrs
-
-
-class TabIndexAttribute(ToggledAttribute):
-    toggle_attribute = NAMESPACE[u'auto-tabindex']
-    toggle_context_key = u'auto-tabindex'
-
-    attribute = H_TABINDEX
-    auto_tags = set((u'input', u'button', u'select', u'textarea'))
-
-    def apply_to(self, tag, attrs, context):
-        attrs, proceed, forced = self.pop_toggle(attrs, context)
-        if not proceed:
-            return attrs
-
-        tabindex = context.get(CTX_CUR_TABINDEX, 0)
-        if tabindex == 0:
-            return attrs
-
-        current = attrs.get(self.attribute, None)
-        if forced or current is None and tag.localname in self.auto_tags:
-            attrs |= ((self.attribute, unicode(tabindex)),)
-            context[CTX_CUR_TABINDEX] = tabindex + 1
-        return attrs
-
-
-class Filter(ToggledAttribute):
-    toggle_default = True
-    toggle_attribute = NAMESPACE[u'auto-filter']
-    toggle_context_key = u'auto-filter'
-
-    def apply_to(self, tag, attrs, stream, context, node):
-        attrs, proceed, forced = self.pop_toggle(attrs, context)
-        filters = context.get(CTX_FILTERS, ())
-
-        if not (proceed and filters):
-            return tag, attrs, stream
-
-        for filter_expr in filters:
-            if callable(filter_expr):
-                fn = filter_expr
-            else:
-                try:
-                    fn = Expression(filter_expr).evaluate(context)
-                except:
-                    log.error("Failed to parse filter expression %r" %
-                              filter_expr,)
-                    raise
-            want = getattr(fn, 'tags', None)
-            if want is None or tag.localname not in want:
-                continue
-
-            tag, attrs, stream = fn(tag, attrs, stream, context, node)
-            if isinstance(stream, basestring):
-                stream = Stream([(TEXT, stream, (None, -1, -1))])
-
-        return tag, attrs, stream
-
-
-class OptionToggler(TagListener):
-    __slots__ = (u'value',)
-
-    activated = ((H_SELECTED, u'selected'),)
-
-    def __init__(self, value):
-        self.value = value
-
-    def inspect(self, event, context):
-        kind, data, pos = event
-
-        if kind is START and data[0].localname == u'option':
-            return (default_start, self.end)
-        else:
-            return False
-
-    def end(self, start, end, stream, context, history):
-        kind, (tag, attrs), pos = start
-        attrs -= H_SELECTED
-
-        value = attrs.get(H_VALUE, None)
-        if value is not None:
-            if value == self.value:
-                attrs |= ((H_SELECTED, u'selected'),)
-        else:
-            children = list(stream)
-            value = u''
-            for ck, cd, cp in children:
-                if ck is TEXT: value += cd
-            stream = Stream(children)
-
-            if value.strip() == self.value.strip():
-                attrs |= ((H_SELECTED, u'selected'),)
-
-        start = kind, (tag, attrs), pos
-
-        return start, end, stream
-
-
-class DecoratedElementDirective(object):
-    set_name = NameAttribute().apply_to
-    set_value = ValueAttribute().apply_to
-    set_domid = DomIDAttribute().apply_to
-    set_tabindex = TabIndexAttribute().apply_to
-    set_for = ForAttribute().apply_to
-    apply_filters = Filter().apply_to
-
-    def start(self, event, context):
-        kind, (tag, attrs), pos = event
-
-        node = self.find_binding(tag, attrs, context)
-
-        # Node-free transformations
-        attrs = self.set_tabindex(tag, attrs, context)
-
-        # Node-sensitive transformations
-        attrs = self.set_domid(tag, attrs, context, node)
-
-        return (kind, (tag, attrs), pos), dict(binding=node)
-
-    def end(self, start, end, stream, context, history):
-        kind, (tag, attrs), pos = start
-
-        node = history.get('binding', None)
-        attrs -= F_BIND
-
-        # Set <... name=""> for bound nodes.
-        attrs = self.set_name(tag, attrs, context, node)
-
-        # Map <label for="..."> to bound tags.
-        attrs = self.set_for(tag, attrs, context, node)
-
-        # Set <... value=""> or tag-specific equivalent.
-        stream, attrs = self.set_value(tag, attrs, stream, context, node)
-
-        # Run user filters after all transformations are applied.
-        filtered_tag, attrs, stream = \
-          self.apply_filters(tag, attrs, stream, context, node)
-
-        if filtered_tag != tag:
-            tag = filtered_tag
-            # Re-assemble end event with the new tag name.
-            end = end[0], tag, end[2]
-
-        # Re-assemble the start event.
-        start = (kind, (tag, attrs), pos)
-
-        return start, end, stream
-
-    def find_binding(self, tag, attributes, context):
-        expr = attributes.get(F_BIND, None)
-        if expr is None:
-            return expr
-        try:
-            return Expression(expr).evaluate(context)
-        except:
-            log.error("Failed to parse binding %r" % expr,)
-            raise
-
-
-class ImmediateVarDirective(object):
-    toggles = (u'auto-tabindex', u'auto-domid', u'auto-for',
-               u'auto-name', u'auto-value', u'auto-filter')
-    name = u'set'
-
-    def start(self, event, context):
-        kind, (tag, attrs), pos = event
-
-        for toggle in self.toggles:
-            val = attrs.get(toggle, None)
-            if val is not None:
-                context[toggle] = parse_trool(val)
-
-        val = parse_int(attrs.get(u'tabindex', None))
-        if val is not None:
-            context[CTX_CUR_TABINDEX] = val
-
-        val = attrs.get(u'domid-format', None)
-        if val is not None:
-            context[CTX_FMT_DOMID] = val
-
-        val = parse_simple_csv(attrs.get(u'filters', None))
-        if val:
-            context[CTX_FILTERS] = val
-
-        return None, None
-
-    def end(self, start, end, stream, context, history):
-        return None, None, stream
-
-
-class ScopedVarDirective(ImmediateVarDirective):
-    name = u'with'
-
-    def start(self, event, context):
-        context.push({})
-        return ImmediateVarDirective.start(self, event, context)
-
-    def end(self, start, end, stream, context, history):
-        context.pop()
-        return ImmediateVarDirective.end(self, start, end, stream,
-                                         context, history)
-
-
-class FlatlandFilter(TagListener):
-    """TODO: Document
-
-    Binds template form elements to flatland data elements and
-    automatically sets name, and value.  Manages DOM id generation,
-    links <label> to matching elements and manages tabindex.
-
-    """
-
-    dir_with = ScopedVarDirective()
-    dir_set  = ImmediateVarDirective()
-    dir_el = DecoratedElementDirective()
-
-    inspect_with = (dir_with.start, dir_with.end)
-    inspect_set = (dir_set.start, dir_set.end)
-    inspect_el = (dir_el.start, dir_el.end)
-
-    def inspect(self, event, context):
-        if event[0] is not START:
-            return False
-
-        kind, (tag, attributes), pos = event
-
-        if tag in NAMESPACE:
-            if tag.localname == self.dir_with.name:
-                return self.inspect_with
-            elif tag.localname == self.dir_set.name:
-                return self.inspect_set
-        else:
-            for attr, value in attributes:
-                if attr in NAMESPACE:
-                    return self.inspect_el
-
-
-
-
-def stream_is_empty(stream):
-    stream, dupe = itertools.tee(stream)
-    try:
-        dupe.next()
-    except StopIteration:
-        return None
-    else:
-        return stream
-
-
-
-def parse_bool(value, yes=YES, no=NO):
-    if value is True or value is False or value is None:
-        return value
-    if isinstance(value, unicode):
-        value = value.lower()
-    else:
-        value = unicode(value).lower()
-    if value in yes:
-        return True
-    if value in no:
-        return False
-    return None
-
-def parse_trool(value):
-    if value is True or value is False or value is Maybe:
-        return value
-    if isinstance(value, unicode):
-        value = value.lower()
-    else:
-        value = unicode(value).lower()
-    if value in YES:
-        return True
-    if value in NO:
-        return False
-    if value in MAYBE:
-        return Maybe
-    return Maybe
-
-def parse_int(text):
-    if type(text) is int:
-        return text
-    try:
-        return int(text)
-    except:
-        return None
-
-def parse_simple_csv(text):
-    parts = []
-    if text is None:
-        return parts
-    for part in (p.strip() for p in text.split(u',')):
-        if part:
-            parts.append(part)
-    return parts

File flatland/out/genshi/taglistener.py

-from genshi.core import Namespace, Stream, END, START
-
-
-__all__ = 'TagListener', 'default_start', 'default_end'
-
-class TagListener(object):
-    """A Stream filter that operates on tag open/close events.
-
-    Events are buffered and close event handlers may modify or replace
-    the tag and all of its children.
-    """
-
-    def __prep__(self, stream, context):
-        """Filtering and setup hook called before processing begins."""
-        return stream, context
-
-    def __call__(self, stream, context=None):
-        stream, context = self.__prep__(stream, context)
-
-        eb = EventBuffer()
-
-        for event in stream:
-            if event[0] is START:
-                listen = self.inspect(event, context)
-            else:
-                listen = False
-            eb.push(event, listen, context)
-
-        for event in eb.events:
-            yield event
-
-    def inspect(self, event, context):
-        """inspect(event, context) -> False | (callable | None, callable | None)
-
-        Called for each START event in the stream.  To register a
-        callback for a tag open/close pair, return a 2-tuple of
-        callables to consume the start and end events.
-
-        The start callable takes the form:
-            start(event, context) -> event, user-data
-
-        You can modify the open event if you wish.  The event you
-        return will be used in place of the original event.
-
-        User-data can be anything you like, and will be passed along
-        to the end-tag handler.  You can use this to maintain state
-        between open and close events.
-
-        The end callable takes the form:
-            end(start, end, stream, context, user-data) -> start, end, stream
-
-        You will be called with the opening and closing events (kind =
-        START and kind = END) as well as the stream of events that
-        occured between them.  You'll also recieve the filter context
-        and any user-data you returned from the matching start
-        callable.
-
-        The start, end, and stream you return will be used in place of
-        the original input.  Any of the three can be omitted (None),
-        and those events will not propagate to the output stream.
-
-        For example, here's a processor that discards children of the
-        matched tag::
-
-            def child_remover(start, event, stream, context, history):
-                return start, end, None
-
-        And one that removes itself but leaves children intact::
-
-            def self_remover(start, event, stream, context, history):
-                return None, None, stream
-
-        """
-        return False
-
-def default_start(event, context):
-    """A default start-listener implementation."""
-    return event, None
-
-def default_end(start, end, stream, context, history):
-    """A default end-listener implementation."""
-    return start, end, stream
-
-
-class ChildRemover(TagListener):
-    """Sample TagListener implementation that discards all children from
-    matching nodes."""
-    NAMESPACE = Namespace(u'http://code.discorporate.us/child-remover')
-
-    def inspect(self, event, context):
-        if event[0] is not START:
-            return False
-
-        kind, (tag, attributes), pos = event
-
-        if tag in self.NAMESPACE:
-            return (self.start, self.end)
-        else:
-            for attr, value in attributes:
-                if attr in self.NAMESPACE:
-                    return (default_start, self.end)
-
-        return False
-
-    def end(self, start, event, stream, context, history):
-        """I remove all children."""
-        # fixme: guessed missing symbol, cover w/ tests
-        return start, self.end, None
-
-class EventBuffer(object):
-    __slots__ = 'streams', 'tags'
-
-    def __init__(self):
-        self.streams = [[]]
-        self.tags    = []
-
-    def push(self, event, listen=False, context=None):
-        kind, data, pos = event
-
-        if kind is START:
-            if listen:
-                on_start, on_end = listen
-
-                if callable(on_start):
-                    event, history = on_start(event, context)
-                else:
-                    history = None
-
-                self.tags.append((event, on_end, history))
-                self.streams.append([])
-            else:
-                self.tags.append((event, False, None))
-                self.streams[-1].append(event)
-
-        elif kind is END:
-            start, on_end, history = self.tags.pop()
-
-            if not on_end:
-                self.streams[-1].append(event)
-            else:
-                children = Stream(self.streams.pop())
-
-                if callable(on_end):
-                    start, end, children = on_end(start, event, children,
-                                                  context, history)
-                else:
-                    end = event
-
-                if start is not None:
-                    self.streams[-1].append(start)
-                if children:
-                    self.streams[-1].extend(iter(children))
-                if end is not None:
-                    self.streams[-1].append(end)
-
-        else:
-            self.streams[-1].append(event)
-
-    events = property(lambda s: s.streams[-1], lambda s,v: None)

File flatland/out/genshi_06.py

-from __future__ import absolute_import
-from collections import deque
-
-from genshi.core import Namespace, QName, END, START, TEXT
-from genshi.template.base import (
-    DirectiveFactory,
-    EXPR,
-    SUB,
-    TemplateSyntaxError,
-    _eval_expr,
-    )
-from genshi.template.eval import Expression
-from genshi.template.directives import Directive
-from genshi.template.interpolation import interpolate
-
-
-from flatland.out.generic import _unpack, transform, Context
-
-
-__all__ = ('setup',)
-
-NS = Namespace(u'http://ns.discorporate.us/flatland/genshi')
-
-_static_attribute_order = [u'type', u'name', u'value']
-
-_to_context = {}
-for key in (u'auto-name', u'auto-value', u'auto-domid', u'auto-for',
-            u'auto-tabindex', u'auto-filter', u'domid-format'):
-    _to_context[key] = key.replace(u'-', u'_')
-
-_bind_qname = NS.bind
-
-
-def setup(template):
-    """Register the flatland directives with a template.
-
-    :param template: a `Template` instance
-    """
-
-    if not hasattr(template, 'add_directives'):
-        raise RuntimeError("%s.setup requires Genshi 0.6 or higher." % (
-            __name__,))
-    template.add_directives(NS, FlatlandElements())
-
-
-class EvaluatedLast(Directive):
-    __slots__ = ()
-
-    def __call__(self, stream, directives, ctxt, **vars):
-        local, foreign = [], []
-        for d in directives:
-            if isinstance(d, EvaluatedLast):
-                local.append(d)
-            else:
-                foreign.append(d)
-        if foreign:
-            foreign.append(self)
-            foreign.extend(local)
-            return foreign[0](stream, foreign[1:], ctxt, **vars)
-        return self.process(stream, local, ctxt, vars)
-
-    def process(self, stream, directives, ctxt, vars):
-        raise NotImplementedError  # pragma: nocover
-
-
-class TagOnly(EvaluatedLast):
-    _name = None
-    __slots__ = ('attributes',)
-
-    @classmethod
-    def attach(cls, template, stream, value, namespaces, pos):
-        if type(value) is not dict:
-            raise TemplateSyntaxError(
-                "The %s directive must be an element" % cls._name,
-                template.filepath, *pos[1:])
-        return super(TagOnly, cls).attach(
-            template, stream, value, namespaces, pos)
-
-    def __init__(self, value, template=None, namespaces=None,
-                 lineno=-1, offset=-1):
-        Directive.__init__(self, None, template, namespaces, lineno, offset)
-        self.attributes = value
-
-
-class AttributeOnly(EvaluatedLast):
-    _name = None
-    __slots__ = ()
-
-    @classmethod
-    def attach(cls, template, stream, value, namespaces, pos):
-        if type(value) is dict:
-            raise TemplateSyntaxError(
-                ("The %s directive may only be used as a "
-                 "tag attribute" % cls._name),
-                template.filepath, *pos[1:])
-        return super(AttributeOnly, cls).attach(
-            template, stream, value, namespaces, pos)
-
-
-class ControlAttribute(AttributeOnly):
-    __slots__ = ('raw_value')
-
-    def __init__(self, value, template=None, namespaces=None,
-                 lineno=-1, offset=-1):
-        Directive.__init__(self, None, template, namespaces, lineno, offset)
-
-        # allow interpolation inside control attributes
-        raw_value = list(interpolate(value, lineno=lineno, offset=offset))
-        if all(kind is TEXT for (kind, _, _) in raw_value):
-            self.raw_value = u''.join(event[1] for event in raw_value)
-        else:
-            self.raw_value = raw_value
-
-    def process(self, stream, directives, ctxt, vars):
-        # unbound transformation.
-        if not directives:
-            directives = [self]
-        else:
-            directives = [self] + directives
-        return _rewrite_stream(stream, directives, ctxt, vars, None)
-
-    def inject(self, mapping, ctxt, vars):
-        """Inject the translated key and interpolated value into *mapping*."""
-        raw = self.raw_value
-        if raw.__class__ is unicode:
-            final_value = raw
-        else:
-            parts = []
-            for kind, value, pos in raw:
-                if kind is TEXT:
-                    parts.append(value)
-                else:
-                    value = _eval_expr(value, ctxt, vars)
-                    parts.append(unicode(value))
-            final_value = u''.join(parts)
-        mapping[_to_context.get(self._name, self._name)] = final_value
-
-
-class AutoName(ControlAttribute):
-    _name = 'auto-name'
-    __slots__ = ()
-
-
-class AutoValue(ControlAttribute):
-    _name = 'auto-value'
-    __slots__ = ()
-
-
-class AutoDomID(ControlAttribute):
-    _name = 'auto-domid'
-    __slots__ = ()
-
-
-class AutoFor(ControlAttribute):
-    _name = 'auto-for'
-    __slots__ = ()
-
-
-class AutoTabindex(ControlAttribute):
-    _name = 'auto-tabindex'
-    __slots__ = ()
-
-
-class AutoFilter(ControlAttribute):
-    _name = 'auto-filter'
-    __slots__ = ()
-
-
-class Binding(AttributeOnly):
-    _name = 'bind'
-    __slots__ = ('bind',)
-
-    def __init__(self, attributes, template=None, namespaces=None,
-                 lineno=-1, offset=-1, bind=None):
-        AttributeOnly.__init__(self, attributes, template, namespaces,
-                               lineno, offset)
-        self.bind = bind
-
-    def process(self, stream, directives, ctxt, vars):
-        if self.bind is not None:
-            bind = self.bind
-        elif self.expr is None:
-            bind = None
-        else:
-            bind = _eval_expr(self.expr, ctxt, vars)
-        return _rewrite_stream(stream, directives, ctxt, vars, bind)
-
-
-class RenderContextManipulator(TagOnly):
-    __slots__ = ()
-
-    def __init__(self, attributes, template=None, namespaces=None,
-                 lineno=-1, offset=-1):
-        transformed = {}
-        for key, value in attributes.items():
-            key = _to_context.get(key, key)
-            if key == u'tabindex':
-                value = int(value)
-            transformed[key] = value
-        TagOnly.__init__(self, transformed, template, namespaces,
-                         lineno, offset)
-
-
-class With(RenderContextManipulator):
-    _name = 'with'
-    __slots__ = ()
-
-    def process(self, stream, directives, ctxt, vars):
-        try:
-            render_context = ctxt['flatland_render_context']
-        except KeyError:
-            ctxt['flatland_render_context'] = render_context = Context()
-
-        if 'filters' not in self.attributes:
-            attrs = self.attributes
-        else:
-            attrs = self.attributes.copy()
-            attrs['filters'] = _eval_expr(Expression(attrs['filters']),
-                                          ctxt, vars)
-
-        render_context.push()
-        render_context.update(attrs)
-        assert not directives
-        for event in stream:
-            yield event
-        render_context.pop()
-
-
-class Set(RenderContextManipulator):
-    _name = 'set'
-    __slots__ = ()
-
-    def process(self, stream, directives, ctxt, vars):
-        try:
-            render_context = ctxt['flatland_render_context']
-        except KeyError:
-            ctxt['flatland_render_context'] = render_context = Context()
-        render_context.update(self.attributes)
-        assert not directives
-        return stream
-
-
-class FlatlandElements(DirectiveFactory):
-
-    NAMESPACE = NS
-
-    directives = [
-        ('with', With),
-        ('set', Set),
-        ('bind', Binding),
-        ('auto-name', AutoName),
-        ('auto-value', AutoValue),
-        ('auto-domid', AutoDomID),
-        ('auto-for', AutoFor),
-        ('auto-tabindex', AutoTabindex),
-        ('auto-filter', AutoFilter),
-        ]
-
-
-def _rewrite_stream(stream, directives, ctxt, vars, bind):
-    stream = list(stream)
-    mutable_attrs = {}
-
-    for control_attribute in directives:
-        control_attribute.inject(mutable_attrs, ctxt, vars)
-
-    kind, (tagname, attrs), pos = stream[0]
-    if len(stream) == 2:
-        contents = None
-    else:
-        contents = _simplify_stream(stream[1:-1], ctxt, vars)
-
-    existing_attributes = {}
-    for qname, value in attrs:
-        if qname.namespace is None:
-            if not isinstance(value, unicode):
-                value = _simplify_stream(value, ctxt, vars)
-                attrs |= ((qname, value),)
-            existing_attributes[qname.localname] = qname
-            mutable_attrs[qname.localname] = value
-
-    try:
-        render_context = ctxt['flatland_render_context']
-    except KeyError:
-        ctxt['flatland_render_context'] = render_context = Context()
-
-    new_contents = transform(tagname.localname, mutable_attrs, contents,
-                             render_context, bind)
-
-    if new_contents is None:
-        new_contents = ()
-    elif isinstance(new_contents, unicode):
-        new_contents = [(TEXT, new_contents, (None, -1, -1))]
-
-    pairs = sorted(mutable_attrs.iteritems(), key=_attribute_sort_key)
-    for attribute_name, value in pairs:
-        if attribute_name in existing_attributes:
-            qname = existing_attributes.pop(attribute_name)
-        else:
-            qname = QName(attribute_name)
-        attrs |= ((qname, value),)
-    for qname in existing_attributes.values():
-        attrs -= qname
-
-    stream[0] = (kind, (tagname, attrs), pos)
-    if new_contents and tagname.localname == u'select' and bind is not None:
-        if tagname.namespace:
-            sub_tag = Namespace(tagname.namespace).option
-        else:  # pragma: nocover
-            sub_tag = QName('option')
-        new_contents = _bind_unbound_tags(new_contents, sub_tag, bind)
-    if new_contents:
-        stream[1:-1] = new_contents
-    return iter(stream)
-
-
-def _attribute_sort_key(item):
-    try:
-        return (0, _static_attribute_order.index(item[0]))
-    except ValueError:
-        return (1, item[0])
-
-
-def _bind_unbound_tags(stream, qname, bind):
-    stream = deque(stream)
-    while stream:
-        kind, data, pos = stream.popleft()
-        if kind is SUB:
-            directives, substream = data
-            for d in directives:  # pragma: nocover   (coverage bug :()
-                if isinstance(d, Binding):
-                    break
-            else:
-                substream = list(_bind_unbound_tags(substream, qname, bind))
-            yield kind, (directives, substream), pos
-        elif kind is START:
-            if data[0] != qname or qname in data[1]:
-                yield kind, data, pos
-                continue
-            head = kind, data, pos
-            substream = []
-            stack = 1
-            while stack:
-                event = stream.popleft()
-                substream.append(event)
-                if event[0] is START and event[1][0] == qname:
-                    stack += 1
-                elif event[0] is END and event[1] == qname:
-                    stack -= 1
-            substream = [head] + list(
-                _bind_unbound_tags(substream, qname, bind))
-            # attaching the directive is sufficient; don't need to fabricate
-            # a form:bind="" attribute
-            yield SUB, ([Binding(u'', bind=bind)], substream), pos
-        else:
-            yield kind, data, pos
-
-
-def _simplify_stream(stream, ctxt, vars):
-    # consumes stream, send a list
-    parts = []
-    for idx, (kind, data, pos) in enumerate(stream):
-        if kind is TEXT:
-            parts.append(data)
-        elif kind is EXPR:
-            value = _eval_expr(data, ctxt, vars)
-            if hasattr(value, '__html__'):
-                value = _unpack(value)
-            if hasattr(value, '__next__') or hasattr(value, 'next'):
-                while hasattr(value, '__next__') or hasattr(value, 'next'):
-                    value = list(value)
-                    value = _simplify_stream(value, ctxt, vars)
-                if not isinstance(value, unicode):
-                    stream[idx:idx + 1] = value
-                else:
-                    stream[idx] = (TEXT, value, pos)
-            elif not isinstance(value, unicode):
-                value = unicode(value)
-            parts.append(value)
-        else:
-            return stream
-    return u''.join(parts)

File tests/__init__.py

 
 
-def setup_package():
-    # undo any augmentation that nose may have triggered during
-    # test discovery
-    import flatland.out.genshi
-    flatland.out.genshi.uninstall_element_mixin()
-
-    # TODO: switch this on with an environ variable or something.
-    # and document.
-    #import tests._util
-    #tests._util.enable_coercion_blocker()
+# TODO: switch this on with an environ variable or something.
+# and document.
+#def setup_package():
+#    import tests._util
+#    tests._util.enable_coercion_blocker()

File tests/genshi/__init__.py

-from tests._util import eq_
-
-
-def setup_package():
-    from flatland.schema import base
-    eq_(base.Element.__bases__, (base._BaseElement,))
-
-    import flatland.out.genshi
-    flatland.out.genshi.install_element_mixin()
-    assert len(base.Element.__bases__) == 2
-
-def teardown_package():
-    from flatland.schema import base
-
-    assert len(base.Element.__bases__) == 2
-
-    import flatland.out.genshi
-    flatland.out.genshi.uninstall_element_mixin()
-    eq_(base.Element.__bases__, (base._BaseElement,))

File tests/genshi/_util.py

-import inspect
-import os
-import re
-import sys
-from tests._util import eq_
-
-
-__all__ = 'rendered_markup_eq_', 'RenderTest'
-
-def rendered_markup_eq_(template_text, **context):
-    import genshi, genshi.template
-
-    chunked_raw = chunk_assertion_blocks(template_text)
-    if not chunked_raw:
-        raise AssertionError("No test chunks found in template text.")
-
-    template = genshi.template.MarkupTemplate(template_text)
-    stream = template.generate(**context)
-    output = stream.render('xhtml')
-    chunked_output = chunk_assertion_blocks(output)
-
-    for idx, label, lhs, rhs in chunked_output:
-        assert lhs == rhs, "test %s: %r != %r" % (label, lhs, rhs)
-
-    eq_(len(chunked_raw), len(chunked_output))
-
-class TextFileMixin(object):
-    def wrap_with_xmlns(self, template):
-        return ('<div xmlns="http://www.w3.org/1999/xhtml" '
-                'xmlns:form="http://ns.discorporate.us/flatland/genshi" '
-                'xmlns:py="http://genshi.edgewall.org/">\n'
-                + template +
-                '\n</div>')
-
-    def _make_text_runner(self, filename, template, default_label=''):
-        label = template.strip().splitlines()[0][3:].strip()
-        if not label or label == 'test':
-            label = "test %s" % default_label
-        tester = lambda context, text: self.compare_(context, text)
-        tester.description = "%s: %s" % (os.path.basename(filename), label)
-        return tester
-
-    def from_file(self, filename, context_factory=dict):
-        modpath = sys.modules[self.__module__].__file__
-        filepath = os.path.join(os.path.dirname(modpath), filename)
-        chunks = chunk_file(filepath)
-
-        for num, chunk in enumerate(chunks):
-            wrapped = self.wrap_with_xmlns(chunk)
-            yield (self._make_text_runner(filename, chunk, num + 1),
-                   context_factory(),
-                   wrapped)
-
-    def from_string(self, collection, context_factory=dict, name=None):
-        chunks = chunk_text(collection)
-        if name is None:
-            name = inspect.stack()[1][3]
-
-        for num, chunk in enumerate(chunks):
-            wrapped = self.wrap_with_xmlns(chunk)
-            yield (self._make_text_runner(name, chunk, num + 1),
-                   context_factory(),
-                   wrapped)
-
-
-def from_text_files(context_factory=dict):
-    def wrap(fn):
-        def test(self, *args, **kw):
-            files = fn(self, *args, **kw)
-            if isinstance(files, basestring):
-                files = (files,)
-            base = os.path.dirname(os.path.abspath(
-                    inspect.getsourcefile(fn)))
-
-            all_tests = []
-            for name in files:
-                filename = os.path.join(base, name)
-                all_tests.extend(self.from_file(filename, context_factory))
-            for test, context, template in all_tests:
-                yield test, context, template
-
-        test.func_name = fn.func_name
-        return test
-    return wrap
-
-def from_docstring(context_factory=dict):
-    def wrap(fn):
-        def test(self, *args, **kw):
-            tests = self.from_string(fn.__doc__, context_factory,
-                                     name=fn.func_name)
-            for test, context, template in tests:
-                yield test, context, template
-        test.func_name = fn.func_name
-        return test
-    return wrap
-
-
-class RenderTest(TextFileMixin):
-    format = 'xhtml'
-    debug = False
-
-    def compile(self, template_text):
-        import genshi.template
-        return genshi.template.MarkupTemplate(template_text)
-
-    def to_genshi_context(self, context):
-        import genshi.template
-        if not isinstance(context, genshi.template.Context):
-            return genshi.template.Context(**context)
-        return context
-
-    def generate(self, template, context):
-        return template.generate(context)
-
-    def render(self, stream, format=None):
-        if format is None:
-            format = self.format
-        return stream.render(format)
-
-    def compare_(self, context, template_text):
-        chunked_raw = chunk_assertion_blocks(template_text)
-        if not chunked_raw:
-            raise AssertionError("No test chunks found in template text.")
-
-        template = self.compile(template_text)
-        genshi_context = self.to_genshi_context(context)
-        stream = self.generate(template, genshi_context)
-        output = self.render(stream)
-        if self.debug:
-            print template_text
-            print output
-
-        chunked_output = chunk_assertion_blocks(output)
-
-        try:
-            for idx, label, lhs, rhs in chunked_output:
-                assert lhs == rhs, "test %s: %r != %r" % (label, lhs, rhs)
-
-            eq_(len(chunked_raw), len(chunked_output))
-        except:
-            print output
-            raise
-
-
-class FilteredRenderTest(RenderTest):
-    def generate(self, template, context):
-        from flatland.out.genshi import flatland_filter
-        return flatland_filter(template.generate(context), context)
-
-
-class ChunkError(Exception):
-    """Internal to chunk_assertion_blocks."""
-
-
-def chunk_file(filepath):
-    fh = open(filepath, 'rb')
-    slurping, lines = False, []
-    for line in fh.readlines():
-        if not slurping and line.startswith('::'):
-            slurping = True
-        if slurping:
-            lines.append(line)
-            continue
-    try:
-        return chunk_text("\n".join(lines))
-    finally:
-        fh.close()
-
-def chunk_text(string):
-    return [(chunk + '\n:: endtest')
-            for chunk in re.split(r'(?:^|\n):: *endtest', string)
-            if re.search(r'(?:^|\n):: *test\b', chunk)]
-
-def chunk_assertion_blocks(text):
-    chunks = []
-    buffer = None
-    for lineno, line in enumerate(text.splitlines()):
-        if not line.startswith('::'):
-            if buffer:
-                buffer[-1].append(line)
-            continue
-        tokens = line[2:].split()
-        try:
-            pragma, args = tokens[0], tokens[1:]
-            if pragma == 'test':
-                if buffer:
-                    raise ChunkError("test out of order")
-                chunknum = len(chunks) + 1
-                if args:
-                    title = ' '.join(args)
-                else:
-                    title = str(chunknum)
-                buffer = [chunknum, title, []]
-                continue
-            elif pragma == 'endtest':
-                if not buffer or len(buffer) == 3:
-                    raise ChunkError("endtest out of order")
-                buffer[2] = ' '.join(buffer[2]).strip()
-                buffer[3] = ' '.join(buffer[3]).strip()
-                chunks.append(buffer)
-                buffer = None
-                continue
-            elif pragma == 'eq':
-                if not buffer or len(buffer) > 3:
-                    raise ChunkError("eq out of order")
-                buffer.append([])
-            else:
-                raise ChunkError("unknown pragma" + pragma)
-        except (ChunkError, IndexError), exc:
-            lineno += 1
-            arg = exc.args[0] if exc.args else ''
-            raise AssertionError(
-                "Invalid testing chunk specification: %s\n"
-                "line %s:\n%r" % (
-                    arg, lineno, line))
-    return chunks
-
-def test_chunk_assertion_blocks_1():
-   input = """
-foo
-bar
-:: test
-thing
-thing
-:: eq
-thing thing
-:: endtest
-"""
-   eq_(chunk_assertion_blocks(input),
-       [[1, '1', 'thing thing', 'thing thing']])
-
-def test_chunk_assertion_blocks_2():
-   input = """
-<div>
-:: test
-  <b>thing
-       </b>
-
-:: eq
-  <b>thing
-       </b>
-
-:: endtest
-</div>
-
-<p>
-:: test label  label
-foo
-::eq
-bar
-:: endtest"""  # no final newline
-   eq_(chunk_assertion_blocks(input),
-       [[1, '1', '<b>thing        </b>', '<b>thing        </b>'],
-        [2, 'label label', 'foo', 'bar']])

File tests/genshi/test_context.py

-from tests.genshi._util import (
-    FilteredRenderTest,
-    RenderTest,
-    from_docstring,
-    )
-
-
-
-class TestPrefilterTags(RenderTest):
-    @from_docstring()
-    def test_with(self):
-        """
-:: test
-<form:with />
-:: eq
-<form:with />
-:: endtest
-
-:: test
-<form:with>stuff</form:with>
-:: eq
-<form:with>stuff</form:with>
-:: endtest
-
-:: test
-<form:with auto-domid="on">
-${value_of('auto-domid')}
-<div id="${value_of('auto-domid')}" />
-</form:with>
-:: eq
-<form:with auto-domid="on">
-
-<div />
-</form:with>
-:: endtest
-
-:: test
-<form:with auto-domid="true">
-  <py:if test="value_of('auto-domid')">
-  defined
-  </py:if>
-  <py:if test="not value_of('auto-domid')">
-  not defined
-  </py:if>
-</form:with>
-:: eq
-<form:with auto-domid="true">
-  not defined
-</form:with>
-:: endtest
-
-:: test
-<form:set />
-:: eq
-<form:set />
-:: endtest
-
-:: test
-<form:set>stuff</form:set>
-:: eq
-<form:set>stuff</form:set>
-:: endtest
-        """
-
-
-class TestContext(FilteredRenderTest):
-    @from_docstring()
-    def test_with_scope(self):
-        """
-:: test with leaves no trace
-<form:with />
-:: eq
-
-:: endtest
-
-
-:: test with passes through inner
-<form:with>stuff</form:with>
-:: eq
-stuff
-:: endtest
-
-
-:: test nothing in context
-${value_of('auto-name')}
-:: eq
-:: endtest
-
-
-:: test with adds Maybe context
-<form:with auto-name="auto">
-${value_of('auto-name')}
-</form:with>
-:: eq
-Maybe
-:: endtest
-
-
-:: test with adds True to context
-<form:with auto-name="on">
-${value_of('auto-name')}
-</form:with>
-:: eq
-True
-:: endtest
-
-
-:: test with adds False to context
-<form:with auto-name="off">
-${value_of('auto-name')}
-</form:with>
-:: eq
-False
-:: endtest
-
-
-:: test with scopes nest
-<form:with auto-name="off">
-${value_of('auto-name')}
-<form:with auto-name="on">${value_of('auto-name')}</form:with>
-<form:with auto-name="auto">${value_of('auto-name')}</form:with>
-${value_of('auto-name')}
-</form:with>
-:: eq
-False
-True
-Maybe
-False
-::endtest
-
-:: test
-<form:with auto-domid="on">
-${value_of('auto-domid')}
-<div id="${value_of('auto-domid')}" />
-</form:with>
-:: eq
-True
-<div id="True" />
-:: endtest
-
-:: test
-<form:with auto-domid="true">
-  <py:if test="value_of('auto-domid')">
-  defined
-  </py:if>
-  <py:if test="not value_of('auto-domid')">
-  not defined
-  </py:if>
-</form:with>
-:: eq
-  defined
-:: endtest
-    """
-
-    @from_docstring()
-    def test_set_scope(self):
-        """
-:: test set leaves no trace
-<form:set />
-:: eq
-:: endtest
-
-
-:: test set pases through inner
-<form:set>stuff</form:set>
-:: eq
-stuff
-:: endtest
-
-
-:: test set does not nest
-${value_of('auto-name')}
-<form:set auto-name="auto">
-${value_of('auto-name')}
-</form:set>
-${value_of('auto-name')}
-:: eq
-Maybe
-Maybe
-:: endtest
-