# Source
#
# saturnalia / src / python / saturnalia / tile / annotation.py
#
# Full commit

from saturnalia.util           import getTimeInMs
from saturnalia.messagetypes   import AnnotationTile
from saturnalia.tile.interface import TileAggregatorInterface

from saturnalia.slot import COUNT_UNITS_PER_EVENT

# Serialization constants:
# Version number written into the 'version' field of every serialized tile
# and checked again when a tile is rebuilt from a string.
ANNOTATION_TILE_MESSAGE_VERSION  = 1
# Shared AnnotationTile instance handed to messageBuilder.serializeToString.
ANNOTATION_TILE_MESSAGE_INSTANCE = AnnotationTile( )
# Message tag handed to messageBuilder.makeDataInstanceFromString when decoding.
ANNOTATION_TILE_TAG = AnnotationTile().getMessageTag()


class AnnotationAggregator ( TileAggregatorInterface ):

    def getEntries ( self, startCutoff, endCutoff ):
        """
        This method returns the "query abstraction" for Annotation tiles.  This is not part of the TileAggregatorInterface.

        Returns [ ( timestamp, resolution, annotationBytes ) ]

        The timestamps must be between startCutoff and endCutoff.

        BUG: Refactor the TileAggregatorInterface into two abstractions for Aggregation versus Query.
        """
        results = []

        for entry in sorted( self._entries ):
            time = entry[0]
            if time > startCutoff and time < endCutoff:
                results.append( entry )

        if not ( startCutoff <= endCutoff ):
            self._logger.warn( 'Invalid range: startCutoff {0!r} is after endCutoff {1!r}.'.format( startCutoff, endCutoff ) )
            if len( results ) != 0:
                self._logger.error( 'startCutoff {0!r} > endCutoff {1!r} yet some entries have a time > startCutoff and time < endCutoff.  This should be impossible; results: {2!r}'.format( startCutoff, endCutoff, results ) )

        return results


    # TileAggregatorInterface methods below:
    @staticmethod
    def getTypeName ():
        """
        Return the type name string of this aggregator: 'annotation'.
        """
        return 'annotation'


    @staticmethod
    def filterStorageResolutions ( logger, eventResolution, storageResolutions ):

        noneMatched = True
        finestResolution = None

        for storageResolution in storageResolutions:
            if eventResolution > storageResolution:
                yield storageResolution
                noneMatched = False

            if finestResolution == None:
                finestResolution = storageResolution
            else:
                finestResolution = min( finestResolution, storageResolution )

        if noneMatched:
            # If none matched, the finest resolution always is a catch-all:
            yield finestResolution


    @classmethod
    def buildFromString ( cls, config, logger, messageBuilder, bytes ):
        """
        Rebuild a tile from its serialized form.

        config         -- not used by this method.
        logger         -- logger stored on the new tile; also receives a warning if entries are dropped here.
        messageBuilder -- decodes bytes via makeDataInstanceFromString( bytes, ANNOTATION_TILE_TAG ).
        bytes          -- the serialized tile.

        Returns the reconstructed tile.  The stored entries are replayed through update(), so entries that no longer fit under the stored tileByteLimit are dropped and added to the tile's dropped byte count.

        Asserts that the stored version equals ANNOTATION_TILE_MESSAGE_VERSION.
        """
        message = messageBuilder.makeDataInstanceFromString( bytes, ANNOTATION_TILE_TAG )

        version = message.readField( 'version'    )
        assert version == ANNOTATION_TILE_MESSAGE_VERSION, 'Unsupported tile version: {0!r}'.format(version)

        # Read the tile header fields, then the entry list, in their serialized order.
        tileFieldNames = ( 'startPoint', 'endPoint', 'resolution', 'tileByteLimit', 'droppedBytes', 'entries' )
        startTime, endTime, tileResolution, tileByteLimit, droppedBytes, entries = [
            message.readField( fieldName ) for fieldName in tileFieldNames
            ]

        tile = cls( logger, startTime, endTime, tileResolution, tileByteLimit, droppedBytes )

        # Replay the stored entries; update() refuses (and accounts for) any that exceed the size limit.
        everyEntryKept = True

        for entryMessage in entries:
            entryTime, entryResolution, annotationBytes = entryMessage.readFields( ( 'time', 'resolution', 'text' ) )

            if not tile.update( entryTime, entryResolution, annotationBytes, COUNT_UNITS_PER_EVENT ):
                everyEntryKept = False

        if everyEntryKept:
            return tile

        # droppedBytes still holds the count from before the replay, so the difference is what was lost just now.
        newlyDropped = tile._droppedBytes - droppedBytes
        assert newlyDropped > 0, 'AnnotationAggregator string deserialization dropped some entries but did not correctly record dropped bytes.'
        logger.warn( 'AnnotationAggregator string deserialization dropped {0!r} bytes during construction; total dropped bytes for this tile: {1!r}'.format( newlyDropped, tile._droppedBytes ) )

        return tile


    @classmethod
    def buildNew ( cls, config, logger, startTime, resolution, slotCount, scale ):

        endTime       = startTime + ( resolution * slotCount )
        tileByteLimit = config.getItem( 'tileByteLimit' )
        droppedBytes  = 0

        return cls( logger, startTime, endTime, resolution, tileByteLimit, droppedBytes )


    # __init__ is private.
    def __init__ ( self, logger, startTime, endTime, resolution, tileByteLimit, droppedBytes ):

        self._logger        = logger
        self._startTime     = startTime
        self._endTime       = endTime
        self._resolution    = resolution
        self._tileByteLimit = tileByteLimit
        self._droppedBytes  = droppedBytes
        self._byteCount     = 0
        self._entries       = []
        self._lastUpdated   = 0


    def getResolution ( self ):
        """
        Return the resolution this tile was constructed with.
        """
        return self._resolution


    def getStartPoint ( self ):
        """
        Return the start time this tile was constructed with.
        """
        return self._startTime


    def getEndPoint ( self ):
        """
        Return the end time this tile was constructed with.
        """
        return self._endTime


    def serializeToString ( self, messageBuilder, name, currentStartTime, slots, resolution ):
        """
        Serialize this tile, including all of its entries in sorted order, and return the result of messageBuilder.serializeToString.

        Only messageBuilder is used; name, currentStartTime, slots and resolution are ignored (the tile's own stored resolution is what gets written).
        """
        serializedEntries = [
            {
                'time'       : entryTime       ,
                'resolution' : entryResolution ,
                'text'       : annotationText  ,
                }
            for ( entryTime, entryResolution, annotationText ) in sorted( self._entries )
            ]

        payload = {
            'version'       : ANNOTATION_TILE_MESSAGE_VERSION ,
            'startPoint'    : self._startTime                 ,
            'endPoint'      : self._endTime                   ,
            'resolution'    : self._resolution                ,
            'tileByteLimit' : self._tileByteLimit             ,
            'droppedBytes'  : self._droppedBytes              ,
            'entries'       : serializedEntries               ,
            }

        return messageBuilder.serializeToString( ANNOTATION_TILE_MESSAGE_INSTANCE, payload )


    def update ( self, time, eventResolution, annotationBytes, countUnits ):
        """
        Try to add one annotation to this tile.

        time            -- timestamp of the annotation.
        eventResolution -- resolution of the annotation event.
        annotationBytes -- annotation text; must be exactly of type unicode (asserted).
        countUnits      -- not used by this method.

        Returns True if the entry was stored, False if it was dropped because it would push the tile past its byte limit.  A dropped entry's size is added to the dropped byte count and a warning is logged.  The last-updated time is refreshed in both cases.

        The entry size is taken as len( annotationBytes ).
        """
        assert type( annotationBytes ) is unicode, 'annotationBytes text must be unicode strings, not: {0!r}'.format( annotationBytes )

        entryBytes = len( annotationBytes )

        # Any attempt counts as an update, whether or not the entry ends up being stored.
        self._lastUpdated = getTimeInMs()

        overLimit = self._byteCount + entryBytes > self._tileByteLimit

        if overLimit:
            self._droppedBytes += entryBytes
            self._logger.warn( 'An annotation of {0!r} bytes was dropped because the estimated size of this tile is {1!r} bytes and the byte limit is {2!r}.'.format( entryBytes, self._byteCount, self._tileByteLimit ) )
            return False

        self._entries.append( ( time, eventResolution, annotationBytes ) )
        self._byteCount += entryBytes
        return True



    def getLastUpdated ( self ):
        """
        Return the getTimeInMs() value recorded by the most recent update() call, or 0 if update() has never been called on this tile.
        """
        return self._lastUpdated


    ## Private ##

    # Names of all per-instance attributes.  Every attribute assigned in
    # __init__ or update() has to appear here: '_lastUpdated' was missing, so
    # setting it only worked when a base class left instances with a __dict__
    # (and raised AttributeError otherwise).
    __slots__ = [
        '_logger',
        '_startTime',
        '_endTime',
        '_resolution',
        '_tileByteLimit',
        '_droppedBytes',
        '_byteCount',
        '_entries',
        '_lastUpdated',
        ]