Source

twoq / twoq / queuing.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
# -*- coding: utf-8 -*-
'''twoq queuing mixins'''

from threading import local
from collections import deque
from itertools import tee, repeat
from contextlib import contextmanager

from stuf.utils import OrderedDict
from stuf.core import stuf, frozenstuf, orderedstuf

from twoq.support import n2u, n2b

SLOTS = [
    '_work', 'outgoing', '_util', 'incoming', '_call', '_alt', '_wrapper',
    '_args', '_kw', '_clearout', '_context', '_CONFIG', '_INQ', '_WORKQ',
    '_UTILQ', '_OUTQ', '_iterator', 'current_mode', '_snapshots',
]


class ThingsMixin(local):

    '''things management mixin'''

    # 1. incoming things
    _INCFG = 'inq'
    _INVAR = 'incoming'
    # 2. utility things
    _UTILCFG = 'utilq'
    _UTILVAR = '_util'
    # 3. work things
    _WORKCFG = 'workq'
    _WORKVAR = '_work'
    # 4. outgoing things
    _OUTCFG = 'outq'
    _OUTVAR = 'outgoing'
    # read/write mode marker
    _RW = 'read/write'
    # read-only mode marker
    _RO = 'read-only'

    def __init__(self, incoming, outgoing, **kw):
        '''
        init

        @param incoming: incoming things
        @param outgoing: outgoing things
        '''
        super(ThingsMixin, self).__init__()
        # snapshots
        self._snapshots = deque(maxlen=kw.pop('snapshots', 5))
        # incoming things
        self.incoming = incoming
        # outgoing things
        self.outgoing = outgoing
        # current callable
        self._call = lambda x: x
        # current alt callable
        self._alt = lambda x: x
        # clear wrapper
        self._wrapper = list
        # reset postitional arguments
        self._args = ()
        # reset keyword arguments
        self._kw = {}
        # mode
        self.current_mode = self._RW
        # set defaults
        self.unswap()

    @property
    def balanced(self):
        '''if queues are balanced'''
        return self.outcount() == self.__len__()

    @staticmethod
    def _repr(*args):
        return (
            '<{0}.{1}<<{2}>>([IN: {3}({4}) => WORK: {5}({6}) => UTIL: {7}({8})'
            ' => OUT: {9}: ({10})]) at {11}>'
        ).format(*args)

    def clear(self):
        '''clear every thing'''
        self.detap().unwrap().dealt()
        return self.outclear().inclear()._wclear()._uclear()

    ###########################################################################
    ## context rotation #######################################################
    ###########################################################################

    @contextmanager
    def ctx1(self, **kw):
        '''swap to one-armed context'''
        self.snapshot()
        q = kw.pop(self._WORKCFG, self._INVAR)
        self.swap(workq=q, utilq=q, context=self.ctx1, **kw)
        yield
        # return to global context
        self.reswap()

    def swap(self, hard=False, **kw):
        '''swap contexts'''
        self._context = kw.get('context', getattr(self, self._default_context))
        # clear out outgoing things before extending them?
        self._clearout = kw.get('clearout', True)
        # keep context-specific settings between context swaps
        self._CONFIG = kw if kw.get('hard', False) else {}
        # 1. incoming things
        self._INQ = kw.get(self._INCFG, self._INVAR)
        # 2. work things
        self._WORKQ = kw.get(self._WORKCFG, self._WORKVAR)
        # 3. utility things
        self._UTILQ = kw.get(self._UTILCFG, self._UTILVAR)
        # 4. outgoing things
        self._OUTQ = kw.get(self._OUTCFG, self._OUTVAR)
        return self

    def unswap(self):
        '''swap context to default context'''
        return self.swap()

    def reswap(self):
        '''swap contexts to current preferred context'''
        return self.swap(**self._CONFIG)

    ###########################################################################
    ## mode ###################################################################
    ###########################################################################

    def rw(self):
        '''switch to read/write mode'''
        self.current_mode = self._RW
        return self._uclear().unswap()

    ###########################################################################
    ## snapshots ##############################################################
    ###########################################################################

    def undo(self):
        '''revert to last snapshot'''
        self.clear()
        self.incoming = self._snapshots.pop()
        self.snapshot()
        return self

    def revert(self, snapshot=0):
        '''revert to specific snapshot'''
        self.clear()
        self.incoming = self._snapshots[snapshot]
        self.snapshot()
        return self

    ###########################################################################
    ## current callable management ############################################
    ###########################################################################

    def args(self, *args, **kw):
        '''arguments for current callable'''
        # set positional arguments
        self._args = args
        # set keyword arguemnts
        self._kw = kw
        return self

    def tap(self, call):
        '''
        set current callable

        @param call: a callabler
        '''
        # reset postitional arguments
        self._args = ()
        # reset keyword arguments
        self._kw = {}
        # set current callable
        self._call = call
        return self

    def alt(self, call):
        '''
        set alternative current callable

        @param call: an alternative callable
        '''
        self._alt = call
        return self

    def detap(self):
        '''clear current callable'''
        # reset postitional arguments
        self._args = ()
        # reset keyword arguments
        self._kw = {}
        # reset current callable (default is identity)
        self._call = lambda x: x
        return self

    def dealt(self):
        '''clear current alternative callable'''
        self._alt = lambda x: x
        return self

    def factory(self, call):
        '''
        build current callable from factory

        @param call: a callable
        '''
        def wrap(*args, **kw):
            return call(*args, **kw)
        return self.tap(wrap)

    defactory = detap

    ###########################################################################
    ## things rotation ########################################################
    ###########################################################################

    def outshift(self):
        '''shift incoming things to outgoing things'''
        with self.autoctx():
            return self._xtend(self._iterable)

    outsync = outshift

    def reup(self):
        '''put incoming things in incoming things as one incoming thing'''
        with self.ctx2():
            return self._append(list(self._iterable))

    def shift(self):
        '''shift outgoing things to incoming things'''
        with self.autoctx(inq=self._OUTVAR, outq=self._INVAR):
            return self._xtend(self._iterable)

    sync = shift

    ###########################################################################
    ## things appending #######################################################
    ###########################################################################

    def append(self, thing):
        '''
        append thing to right side of incoming things

        @param thing: some thing
        '''
        with self.ctx1():
            return self._append(thing)

    def prepend(self, thing):
        '''
        append `thing` to left side of incoming things

        @param thing: some thing
        '''
        with self.ctx1():
            return self._appendleft(thing)

    appendleft = prepend

    ###########################################################################
    ## things extension #######################################################
    ###########################################################################

    def extend(self, things):
        '''
        extend right side of incoming things with `things`

        @param thing: some things
        '''
        with self.ctx1():
            return self._xtend(things)

    def prextend(self, things):
        '''
        extend left side of incoming things with `things`

        @param thing: some things
        '''
        with self.ctx1():
            return self._xtendleft(things)

    extendleft = prextend

    def outextend(self, things):
        '''
        extend right side of outgoing things with `things`

        @param thing: some things
        '''
        with self.ctx1(workq=self._OUTVAR):
            return self._xtend(things)

    ###########################################################################
    ## iteration runners ######################################################
    ###########################################################################

    @classmethod
    def breakcount(cls, call, length, exception=StopIteration):
        '''
        rotate through iterator until it reaches its original length

        @param iterable: an iterable to exhaust
        '''
        for i in repeat(None, length):  # @UnusedVariable
            try:
                yield call()
            except exception:
                pass

    @staticmethod
    def iterexcept(call, exception):
        '''
        call a function repeatedly until an exception is raised

        Converts a call-until-exception interface to an iterator interface.
        Like `iter(call, sentinel)` but uses an exception instead of a sentinel
        to end the loop.

        Raymond Hettinger, Python Cookbook recipe # 577155
        '''
        try:
            while 1:
                yield call()
        except exception:
            pass


class ResultMixin(local):

    '''result things mixin'''

    def wrap(self, wrapper):
        '''
        wrapper for outgoing things

        @param wrapper: an iterator
        '''
        self._wrapper = wrapper
        return self

    def unwrap(self):
        '''clear current wrapper'''
        return self.list()

    def dict(self):
        '''set wrapper to `d    ict`'''
        self._wrap = dict
        return self

    def ordered_dict(self):
        '''set wrapper to `OrderedDict`'''
        self._wrap = OrderedDict

    def list(self):
        '''set wrapper to `list`'''
        self._wrap = list
        return self

    def unicode(self, encoding='utf-8'):
        '''set wrapper to `unicode` with given `encoding`'''
        self._wrap = lambda x: n2u(x, encoding)
        return self

    def bytes(self, encoding='ISO-8859-1'):
        '''set wrapper to `bytes` with given `encoding`'''
        self._wrap = lambda x: n2b(x, encoding)
        return self

    def tuple(self):
        '''set wrapper to `tuple`'''
        self._wrap = tuple
        return self

    def set(self):
        '''set wrapper to `set`'''
        self._wrap = set
        return self

    def frozenset(self):
        '''set wrapper to `frozenset`'''
        self._wrap = frozenset
        return self

    def deque(self):
        '''set wrapper to `deque`'''
        self._wrap = deque
        return self

    def stuf(self):
        '''set wrapper to `stuf`'''
        self._wrap = stuf
        return self

    def frozenstuf(self):
        '''set wrapper to `frozenstuf`'''
        self._wrap = frozenstuf
        return self

    def orderedstuf(self):
        '''set wrapper to `orderedstuf`'''
        self._wrap = orderedstuf
        return self

    def first(self):
        '''first incoming thing'''
        with self._context():
            return self._append(next(self._iterable))

    def last(self):
        '''last incoming thing'''
        with self._context():
            i1, _ = tee(self._iterable)
            return self._append(deque(i1, maxlen=1).pop())

    def peek(self):
        '''results from read-only context'''
        self.ro()
        out = self._wrapper(self._util)
        results = out[0] if len(out) == 1 else out
        self.rw()
        return results

    def results(self):
        '''yield outgoing things, clearing outgoing things as it iterates'''
        return self.__iter__()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.