VPlayer / source_gst.py

# -*- coding: utf-8 -*-
import gobject
gobject.threads_init()

from time import sleep

from PyQt4 import QtCore
import gst

from logger import log
from bufferer import Buffer

class Source(gst.BaseSrc):
    """GStreamer source element that serves bytes out of a ``Buffer``.

    The element works in ``gst.FORMAT_BYTES``.  ``set_track`` creates and
    starts a ``Buffer`` for a track; ``do_create`` then hands out slices of
    ``self.buf.data`` as they become available, polling until the requested
    range has been buffered.

    Attributes read from the buffer object: ``started``, ``failed``,
    ``finished``, ``pos``, ``length``, ``data`` and ``track.length``.

    Signals:
        database_changed: re-emitted whenever the current buffer emits its
            Qt signal of the same name.
    """
    __gsttemplates__ = (
        gst.PadTemplate("src",
                        gst.PAD_SRC,
                        gst.PAD_ALWAYS,
                        gst.caps_new_any()),
                       )
    __gsignals__ = {
      "database_changed": (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ()),
    }
    # Seconds to sleep between two polls of the buffer state.
    _POLL_INTERVAL = 0.01
    # Set by do_unlock(); makes any polling loop give up and return.
    unlock = False
    # Current Buffer instance, or None until set_track() has been called.
    buf = None

    def __init__(self, name):
        """Create the element, name it *name* and switch it to byte format."""
        gst.BaseSrc.__init__(self)
        self.__gobject_init__()
        self.set_name(name)
        self.set_format(gst.FORMAT_BYTES)

    def set_track(self, track):
        """Replace the current buffer with a freshly started one for *track*.

        A previously active buffer is terminated first.  The new buffer's
        ``database_changed`` Qt signal is forwarded as this element's
        ``database_changed`` GObject signal.
        """
        if self.buf:
            # NOTE(review): destroy() calls stop() while this calls
            # terminate() -- confirm both are intended on Buffer.
            self.buf.terminate()
        self.buf = Buffer(track)
        QtCore.QObject.connect(self.buf, QtCore.SIGNAL('database_changed'),
                                        lambda: self.emit('database_changed') )
        self.buf.start()

    def _wait_for(self, ready, message):
        """Poll until ``ready()`` is true or an unlock request arrives.

        *message* is logged once, the first time the condition is found
        not to hold.  Returns True when ``ready()`` became true and False
        when the wait was aborted by ``do_unlock`` (the unlock flag is
        cleared in that case).
        """
        waited = False
        while not ready():
            if self.unlock:
                self.unlock = False
                return False
            if not waited:
                waited = True
                log.debug(message)
            sleep(self._POLL_INTERVAL)
        if waited:
            log.debug('finished waiting')
        return True

    def do_create(self, offset, size):
        """Return ``(flow, buffer)`` for *size* bytes starting at *offset*.

        Blocks (polling) until the buffer has started and the requested
        range is available, the buffer has finished, or it has failed.

        Returns:
            ``(gst.FLOW_OK, gst.Buffer)`` with the data (possibly shorter
            than *size* at the end of the stream);
            ``(gst.FLOW_UNEXPECTED, None)`` when *offset* is past the end,
            no data is available, or buffering failed before the range
            could be delivered;
            ``(gst.FLOW_WRONG_STATE, None)`` when interrupted by an unlock
            request;
            ``(gst.FLOW_ERROR, None)`` when no track has been set.
        """
        if self.buf is None:
            log.error('Data requested before a track was set')
            return gst.FLOW_ERROR, None

        # self.buf is re-read on every poll so that a set_track() issued
        # while waiting is picked up, exactly as the inline loops did.
        if not self._wait_for(lambda: self.buf.started or self.buf.failed,
                              'Waiting for buffer to start'):
            # A bare None is not a valid result here; report flushing.
            return gst.FLOW_WRONG_STATE, None

        if self.buf.length and offset >= self.buf.length:
            log.error('Requested offset (%s) is over file length (%s)' % (offset, self.buf.length))
            return gst.FLOW_UNEXPECTED, None

        def have_data():
            return self.buf.pos >= offset+size or self.buf.finished

        if not self._wait_for(lambda: have_data() or self.buf.failed,
                              'Waiting for buffer to settle...'):
            return gst.FLOW_WRONG_STATE, None

        if not have_data():
            # The wait ended only because buffering failed.
            log.error('Buffering failed, stopping player')
            return gst.FLOW_UNEXPECTED, None

        data = self.buf.data[offset:offset+size]
        if data:
            return gst.FLOW_OK, gst.Buffer(data)
        return gst.FLOW_UNEXPECTED, None

    def _query_duration(self, query):
        """Answer a duration *query* in TIME or BYTES format.

        Waits until the corresponding length is known.  Returns True when
        the query was filled in, False when the format is unsupported, the
        wait was interrupted by an unlock request, or buffering failed
        before the length became known.
        """
        fmt = query.parse_duration()[0]
        if fmt == gst.FORMAT_TIME:
            if not self._wait_for(
                    lambda: self.buf.track.length or self.buf.failed,
                    'Waiting for buffer to settle...'):
                return False
            seconds = self.buf.track.length
            if not seconds:
                # Buffering failed before the track length was known.
                return False
            # int() so a fractional track length still yields an integer.
            query.set_duration(gst.FORMAT_TIME, int(seconds*gst.SECOND))
            return True
        if fmt == gst.FORMAT_BYTES:
            log.debug('Query (type duration) format BYTES')
            if not self._wait_for(
                    lambda: self.buf.length or self.buf.failed,
                    'Waiting for buffer to settle...'):
                return False
            if not self.buf.length:
                # Buffering failed before the byte length was known.
                return False
            log.debug('Relying query (type duration) format BYTES with %d' % self.buf.length)
            query.set_duration(format=gst.FORMAT_BYTES, duration=int(self.buf.length))
            return True
        log.debug('Unhandled query (type: duration) %s' % str(query.parse_duration()))
        return False

    def do_query (self, query):
        """Handle duration and latency queries; reject everything else.

        Returns True when *query* was answered, False otherwise.
        """
        if query.type == gst.QUERY_DURATION:
            if self.buf is None:
                return False
            return self._query_duration(query)
        if query.type == gst.QUERY_LATENCY:
            log.debug('Query latency request')
            query.set_latency(min_latency=0, live=True, max_latency=-1)
            return True
        # NOTE(review): other queries are refused rather than passed to
        # gst.BaseSrc.do_query -- confirm this is intended.
        log.debug('Unhandled query, type: %s' % str(query.type))
        return False

    def do_unlock(self):
        """Ask any polling loop in this element to give up and return."""
        log.critical('!!!UNLOCK REQUEST!!!')
        self.unlock = True
        return True

    def is_live(self):
        """Report the source as live."""
        return True

    def do_is_seekable(self):
        "Answers is the source seekable (this source is)"
        return True

    def do_check_get_range(self, *args):
        "Do not check range when start playing"
        return False

    def notused_do_do_seek(*args):
        "Is called on seek request (don't know what it should do)"
        return True

    def notused_seek_simple(self, *args):
        "Not called at all, don't know how to make it work"
        return True

    def destroy(self):
        "Use this to manually kill the source"
        if self.buf:
            self.buf.stop()

    def do_change_state(self, state):
        """Stop the buffer on READY->NULL, then defer to ``gst.BaseSrc``."""
        if state == gst.STATE_CHANGE_READY_TO_NULL:
            log.debug('Killing buffer')
            self.destroy()
        return gst.BaseSrc.do_change_state(self, state)

# Register Source with the GObject type system (the class declares
# __gsignals__ and __gsttemplates__ above).
gobject.type_register(Source)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.