Source

toffee / toffee.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
from itertools import count
from functools import reduce

__version__ = '0.1.2dev'


class Factory(object):

    _creation_order = count()

    def __init__(self, what, *args, **kwargs):

        for name in ['configure_object', 'create_object', 'destroy_object']:
            meth = kwargs.pop(name, None)
            if meth is not None:
                setattr(self, '_' + name, meth)

        self.what = what
        self.args = args
        self.kwargs = AttrDict(kwargs)
        self._order = next(self._creation_order)

        # process subfactory__attribute='foo'
        for k, v in list(self.kwargs.items()):
            if '__' in k:
                head, tail = k.split('__', 1)
                self.kwargs[head] = self.kwargs[head](**{tail: v})
                del self.kwargs[k]

    def __getattr__(self, attr):
        return Lazy(self, attr)

    def __setitem__(self, attr, value):
        if '.' in attr:
            head, tail = attr.split('.', 1)
            self.kwargs[head] = self.kwargs[head](**{tail: value})
        else:
            self.kwargs[attr] = value

    def __getitem__(self, attr):
        return Lazy(self, attr)

    def __call__(self, *args, **kwargs):
        return self.__class__(self.what,
                              *(args or self.args),
                              **dict(self.kwargs, **kwargs))

    def __get__(self, instance, context):
        """
        Factories have magic behaviour in the context of fixtures, so that
        accessing 'my_fixture.foo' auto-delegates to 'my_fixture.o.foo'
        """
        if isinstance(instance, Fixture):
            try:
                return instance.o[instance.factory_names[self]]
            except KeyError:
                raise AttributeError()
        return self

    def create_object(self, context):
        args = tuple(context.resolve(a) for a in self.args)
        kwargs = dict((k, context.resolve(v)) for k, v in self.kwargs.items())

        ob = self._create_object(context, args, kwargs)
        ob = self._configure_object(ob, context)
        return ob

    def destroy_object(self, context, ob):
        return self._destroy_object(context, ob)

    def _create_object(self, context, args, kwargs):
        return self.what(*args, **kwargs)

    def _destroy_object(self, context, ob):
        pass

    def _configure_object(self, ob, context):
        return ob

    @classmethod
    def configure(cls):
        """\
        Return a dynamically created Factory subclass.

        In the Factory base class this serves no functional purpose, however
        subclasses may extend this to provide configuration points.
        For example :class:`~toffee.StormFactory` uses this to allow the user
        to supply the getstore method.
        """
        return type(cls.__name__, (cls,), {})

    @classmethod
    def setup_complete(self, context, created):
        """\
        Override this in subclasses to implement custom behaviour after all
        objects have been created (eg flushing to database)
        """

    @classmethod
    def teardown_complete(self, context, created):
        """\
        Override this in subclasses to implement custom behaviour after all
        objects have been torn down (eg flushing to database)
        """


class CallFactory(Factory):
    """\
    A factory that returns the value of a callable function or object.

    Use this to lazily instantiate values that are accessible at setup
    time but not necessarily earlier::

        class fixture(Fixture):

            user = CallFactory(lambda: User.objects.get(username='admin'))
            ...

    """

    def _create_object(self, context, args, kwargs):
        return self.what(*args, **kwargs)

    def _destroy_object(self, context, ob):
        pass


class DjangoFactory(Factory):

    def _create_object(self, context, args, kwargs):
        ob = self.what.objects.create(*args, **kwargs)
        ob.save()
        return ob

    def _destroy_object(self, context, ob):
        ob.delete()


class StormFactory(Factory):
    """\
    Typically you would configure this in at the start of your test code, so::

        getstore = lamdba: getUtility(IZStorm).get('main'))
        Factory = StormFactory.configure(getstore)

    You can then use Factory as normal::

        class fixture(Fixture):
            user = Factory(models.User, ...)

    By default, StormFactory calls ``store.flush`` but not ``store.commit``.
    Change this behaviour by passing factory options to setup::

        fixture.setup(flush=False)
        fixture.setup(commit=True)

    Alternatively you can supply factory options in the fixture class:

        class fixture(Fixture):
            factoryoptions = {'commit': True}

    """

    #: Callable that can retrieve Storm's store object.
    getstore = lambda: None

    storekey = '_StormFactory_store'

    default_flush = True
    default_commit = False

    @classmethod
    def configure(cls, getstore, *args, **kwargs):
        base = super(StormFactory, cls).configure(*args, **kwargs)
        return type(cls.__name__, (base,),
                    {'getstore': staticmethod(getstore)})

    @classmethod
    def _getstore_cached(cls, context):
        try:
            return context.factoryoptions[cls.storekey]
        except KeyError:
            return context.factoryoptions.setdefault(cls.storekey,
                                                     cls.getstore())

    def _create_object(self, context, args, kwargs):
        store = self._getstore_cached(context)
        ob = self.what.__new__(self.what)
        for item, value in kwargs.items():
            setattr(ob, item, value)
        if store is not None:
            store.add(ob)
        return ob

    def _destroy_object(self, context, ob):
        store = self._getstore_cached(context)
        if store is not None:
            store.remove(ob)

    @classmethod
    def setup_complete(cls, context, created):
        store = cls._getstore_cached(context)
        if store:
            if context.factoryoptions.get('flush', cls.default_flush):
                store.flush()
            if context.factoryoptions.get('commit', cls.default_commit):
                store.commit()

    @classmethod
    def teardown_complete(cls, context, created):
        store = cls._getstore_cached(context)
        if store:
            if context.factoryoptions.get('flush', cls.default_flush):
                store.flush()
            if context.factoryoptions.get('commit', cls.default_commit):
                store.commit()


class ArgumentGenerator(object):
    """
    A callable that dynamically generates factory arguments
    """

    def __init__(self, fn):
        self.fn = fn

    def __call__(self):
        return self.fn()


class ArgumentGeneratorFactory(object):
    """
    A factory for producing ``ArgumentGenerator`` objects.

    For an example, see :class:`~toffee.Seq`
    """

    def make_argument_generator(self):
        raise NotImplementedError


class Seq(ArgumentGeneratorFactory):

    def __init__(self, str_or_fn='%d', start=0):
        self.str_or_fn = str_or_fn
        self.start = start

    def make_argument_generator(self):
        counter = count(self.start)

        def seq():
            n = next(counter)
            if callable(self.str_or_fn):
                return self.str_or_fn(n)
            else:
                return self.str_or_fn % (n,)
        return ArgumentGenerator(seq)


class Lazy(object):
    def __init__(self, factory, attrs):
        self.factory = factory
        if not isinstance(attrs, list):
            attrs = [attrs]
        self.attrs = attrs

    def __getattr__(self, attr):
        return Lazy(self.factory, self.attrs + [attr])

    def __str__(self):
        return "<%s for %r %r>" % (self.__class__.__name__, self.factory,
                                   '.'.join(self.attrs))
    __repr__ = __str__


class AttrDict(dict):

    def __getattr__(self, item):
        return self[item]

    def __setattr__(self, item, value):
        self[item] = value


class Fixture(object):

    #: Default factory options. See :class:`~toffee.StormFactory` for an
    #: example of how these are used to control flush and commit options.
    factoryoptions = {}

    def __init__(self, **kwargs):

        #: Factory definitions
        self.d = AttrDict()
        self.factory_names = {}

        #: Factory-created objects
        self.o = AttrDict()
        self.created = None

        self.factoryoptions_defaults = self.factoryoptions.copy()
        self.factoryoptions_defaults.update(kwargs.pop('factoryoptions', {}))

        class_factories = ((k, getattr(self.__class__, k))
                           for k in dir(self.__class__))
        class_factories = ((k, v)
                           for k, v in class_factories
                           if isinstance(v, Factory))

        self.update_factories(dict(class_factories, **kwargs))

    def __getattr__(self, name):
        try:
            return self.o[name]
        except KeyError:
            raise AttributeError(name)

    def update_factories(self, factories):
        for k, v in factories.items():
            if v in self.factory_names:
                raise ValueError("Factory %r used more than once (%r and %r)" %
                                 (v, k, self.factory_names[v]))
            self.factory_names[v] = k
        self.d.update(factories)

    def setup(self, force=False, **factoryoptions):
        """
        :param force: allow calls to setup to be nested
        :param **factoryoptions: any factory specific options, for example
                                 passing flags such as ``commit=True``. See
                                 individual factory subclasses for information
        """
        if self.created and not force:
            raise Exception("setup() has already been called on this fixture. "
                            "Call teardown first, or use setup(force=True).")

        self.factoryoptions = dict(self.factoryoptions_defaults,
                                   **factoryoptions)
        self.o = AttrDict()
        self.created = []
        self.argument_generators = {}

        factories = sorted((v._order, k) for k, v in self.d.items())
        for _, name in factories:
            self._get_or_create_named_object(name)
        self.setup_complete()
        return self

    def setup_complete(self):
        """\
        Call any setup_complete methods on factory classes to let them know
        that all objects have been created
        """
        factory_created = {}

        for name, ob, factory in self.created:
            factory_created.setdefault(factory.__class__, []).append(ob)

        for f, obs in factory_created.items():
            f.setup_complete(self, obs)

        self.configure()

    def teardown_complete(self, torndown):
        """\
        Call any teardown_complete methods on factory classes to let them know
        that all objects have been torn down
        """
        factory_created = {}

        for name, ob, factory in torndown:
            factory_created.setdefault(factory.__class__, []).append(ob)

        for f, obs in factory_created.items():
            f.teardown_complete(self, obs)

    def configure(self):
        """\
        Subclasses should override this to provide custom post-creation
        configuration
        """

    def _get_or_create_named_object(self, name):
        """\
        Return the object from the named factory, creating it if necessary
        """
        try:
            return self.o[name]
        except KeyError:
            pass
        return self._create_object_from_factory(self.d[name])

    def _create_object_from_factory(self, factory):
        """\
        Invoke ``factory`` to create and configure an object.
        Register the created object so that it may later be referenced and
        torn down.
        """
        ob = factory.create_object(self)
        name = self.factory_names.get(factory, None)
        if name is not None:
            self.o[name] = ob
        self.created.append((name, ob, factory))
        return ob

    def resolve(self, what):

        if isinstance(what, ArgumentGeneratorFactory):
            if what not in self.argument_generators:
                self.argument_generators[what] = \
                        what.make_argument_generator()
            return self.argument_generators[what]()

        if isinstance(what, ArgumentGenerator):
            return what()

        if isinstance(what, Factory):
            return self.resolve(Lazy(what, []))

        if isinstance(what, Lazy):
            if what.factory in self.factory_names:
                name = self.factory_names[what.factory]
                ob = self._get_or_create_named_object(name)
            else:
                ob = self._create_object_from_factory(what.factory)

            return reduce(getattr, what.attrs, ob)

        return what

    def teardown(self):
        torndown = []
        while self.created:
            name, ob, factory = self.created.pop()
            if name is not None:
                del self.o[name]
            factory.destroy_object(self, ob)
            torndown.append((name, ob, factory))
        self.teardown_complete(torndown)
        self.factoryoptions = {}

    def __enter__(self):
        return self.setup()

    def __exit__(self, type, value, tb):
        if type:
            try:
                self.teardown()
            except Exception:
                pass
        else:
            self.teardown()
        return False

    #: Alias for compatility with unittest names
    setUp = setup

    #: Alias for compatility with unittest names
    tearDown = teardown
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.