Source

VPlayer / lastfm.py

# -*- coding: utf-8 -*-
import time
import urllib
import urllib2
from hashlib import md5
from xml.dom import minidom
from threading import Thread
from collections import deque

from PyQt4 import QtCore

from logger import log
from track import Track

class AuthError(Exception):
    pass

class LastfmScrobbler(QtCore.QThread):
    np_queue = deque()
    sub_queue = deque()
    session = None
    active = True
    def create_session(self, login = None):
        settings = QtCore.QSettings()
        unixtime = str(int(time.time()))
        passw = str(settings.value('lastfm/password').toString())
        passmd5 = md5(passw).hexdigest()
        token = md5(passmd5 + unixtime).hexdigest()
        baseurl = 'http://post.audioscrobbler.com/?'
        data = {}
        data['hs'] = 'true'
        data['p'] = '1.2.1'
        data['c'] = 'vpl'
        data['v'] = '0.1'
        data['u'] = settings.value('lastfm/login').toString()
        data['t'] = unixtime
        data['a'] = token
        url = baseurl + urllib.urlencode(data)
        try:
            auth = urllib2.urlopen(url).read().split()
        except:
            raise AuthError()
        if auth[0] == 'OK':
            self.session = auth[1]
            self.npurl = auth[2]
            self.suburl = auth[3]
        else:
            raise AuthError()

    def signal_handler(self, status):
        log.debug('got signal '+str(status))
        if status[0] == 'end':
            settings = QtCore.QSettings()
            use = settings.value('lastfm/use', QtCore.QVariant(False)).toBool()
            if not use:
                return
            if status[2]>60 and status[1].length > 30:
                self.submit(status[1])

    def run(self):
        while self.active:
            if len(self.np_queue) > 0:
                res = self._send_now_playing(self.np_queue[0])
                if res.split()[0] == 'OK':
                    log.info('Now playing sent ok')
                    self.np_queue.popleft()
                elif res.split()[0] == 'BADSESSION':
                    log.debug('Now playing not sent, bad session')
                    self.create_session()
                elif res.split()[0] == 'SOMEERROR':
                    log.debug('Now playing could not be sent')
            if len(self.sub_queue) > 0:
                res = self._send_submit(self.sub_queue[0])
                if res.split()[0] == 'OK':
                    log.info('Track submited ok')
                    self.sub_queue.popleft()
                elif res.split()[0] == 'BADSESSION':
                    log.debug('Track not submited, bad session')
                    self.create_session()
                elif res.split()[0] == 'SOMEERROR':
                    log.debug('Track could not be submited')
            time.sleep(5)

    def now_playing(self, track):
        self.np_queue.append(track)

    def submit(self, track):
        tracktime = int(time.time())-track.length
        self.sub_queue.append((track, tracktime))

    def _send_now_playing(self, track):
        log.info('Sending nowplaying...')
        if not self.session:
            self.create_session()
        data = {}
        data['s'] = self.session
        data['a'] = track.artist.encode('utf8')
        data['t'] = track.title.encode('utf8')
        data['b'] = ''
        data['l'] = str(int(track.length))
        data['n'] = ''
        data['m'] = ''
        post = urllib.urlencode(data)
        try:
            return urllib2.urlopen(self.npurl, post).read()
        except:
            return 'SOMEERROR'

    def _send_submit(self, track):
        if not self.session:
            self.create_session()
        data = {}
        data['s'] = self.session
        data['a[0]'] = track[0].artist.encode('utf8')
        data['t[0]'] = track[0].title.encode('utf8')
        data['i[0]'] = track[1]
        data['o[0]'] = 'P'
        data['r[0]'] = ''
        data['l[0]'] = str(int(track[0].length))
        data['b[0]'] = ''
        data['n[0]'] = ''
        data['m[0]'] = ''
        post = urllib.urlencode(data)
        try:
            return urllib2.urlopen(self.suburl, post).read()
        except:
            return 'SOMEERROR'


class LastfmData:
    api_key = 'd1147ab84fccf4014972c76b1a1cc403'
    api_sgn = 'fadfa5728cf9acb740486c617176088f'


class GetTrackInfo(QtCore.QThread):
    api_key = LastfmData.api_key

    def __init__(self, track, no_cover = False):
        QtCore.QThread.__init__(self)
        self.track = track
        self.no_cover = no_cover

    def run(self):
        info = self.get_info()
        self.emit(QtCore.SIGNAL('results'), info)

    def get_info(self, track = None):
        if track is None:
            track = self.track
        url = 'http://ws.audioscrobbler.com/2.0/'
        data = {}
        data['method'] = 'track.getinfo'
        data['artist'] = track.artist
        data['track'] = track.title
        data['api_key'] = self.api_key
        album = 'Unknown'
        cover = None
        genre = 'Unknown'
        try:
            post = urllib.urlencode(data)
            #print post
            xml = urllib2.urlopen(url +'?'+ post).read()
            #print xml
            parsed = minidom.parseString(xml)
            albums_xml = parsed.getElementsByTagName('album')
            if albums_xml:
                album_xml = albums_xml[0]
                album = album_xml.getElementsByTagName('title')[0].childNodes[0].data
                covers_urls = album_xml.getElementsByTagName('image')
                if covers_urls and not self.no_cover:
                    bigcover_url = covers_urls[-1].childNodes[0].data
                    cover = urllib.urlopen(bigcover_url).read()
            tags_list_xml = parsed.getElementsByTagName('toptags')
            if tags_list_xml:
                tags_xml = tags_list_xml[0].getElementsByTagName('tag')
                if tags_xml:
                    genre = tags_xml[0].childNodes[1].childNodes[0].data
            return (album, genre, cover)
        except:
            return ()


class RecommendedArtists(QtCore.QThread):
    api_key = LastfmData.api_key
    api_sgn = LastfmData.api_sgn
    session = None
    def __init__(self, login):
        QtCore.QThread.__init__(self)
        self.login = login

    def get_recommended(self):
        if not self.session:
            self.create_session()
        url = 'http://ws.audioscrobbler.com/2.0/'
        data = {}
        data['method'] = 'user.getrecommendedartists'
        data['api_key'] = self.api_key
        data['api_sig'] = self.api_sgn
        data['sk'] = self.session

        post = urllib.urlencode(data)
        xml = urllib2.urlopen(url, post).read()


    def get_token(self):
        data = {}
        url = 'http://ws.audioscrobbler.com/2.0/'
        data['method'] = 'auth.getToken'
        data['api_key'] = self.api_key
        sig = md5('api_key'+self.api_key+'methodauth.getToken').hexdigest()
        data['api_sig'] = sig
        post = urllib.urlencode(data)
        xml = urllib2.urlopen(url, post).read()
        parsed = minidom.parseString(xml)
        lfm = parsed.getElementsByTagName('lfm')[0]
        status = lfm.attributes['status']
        token = lfm.getElementsByTagName('token')[0].childNodes[0].data
        self.token = token
        return token

    def get_sk(self):
        data = {}
        url = 'http://ws.audioscrobbler.com/2.0/'
        data['method'] = 'auth.getSession'
        data['api_key'] = self.api_key
        data['token'] = self.token
        txt = 'api_key'+self.api_key+'methodauth.getSession'+'token'+self.token
        sig = md5(txt).hexdigest()
        data['api_sig'] = sig
        post = urllib.urlencode(data)
        xml = urllib2.urlopen(url, post).read()

    def create_session(self, login = None):
        if login is None:
            login = self.login
        unixtime = str(int(time.time()))
        passmd5 = md5(login['password']).hexdigest()
        token = md5(passmd5 + unixtime).hexdigest()
        baseurl = 'http://post.audioscrobbler.com/?'
        data = {}
        data['hs'] = 'true'
        data['p'] = '1.2.1'
        #data['c'] = 'm3a'
        data['c'] = 'vpl'
        data['v'] = '0.1'
        data['u'] = login['login']
        data['t'] = unixtime
        data['a'] = token
        url = baseurl + urllib.urlencode(data)
        auth = urllib2.urlopen(url).read().split()
        if auth[0] == 'OK':
            self.session = auth[1]
            self.npurl = auth[2]
            self.suburl = auth[3]
        else:
            raise AuthError()

class SimilarArtists(QtCore.QThread):
    api_key = LastfmData.api_key
    artists = []
    def __init__(self, artist):
        QtCore.QThread.__init__(self)
        self.artist = artist

    def run(self):
        self.artists = artists = self.similiar_artists(self.artist)

        log.debug('Lastfm.py: Emiting signal "results" with %d artists' % len(artists))
        self.emit(QtCore.SIGNAL('results'), artists)
        #print artists

    def similiar_artists(self, artist):
        url = 'http://ws.audioscrobbler.com/2.0/'
        data = {}
        data['method'] = 'artist.getsimilar'
        data['artist'] = artist.encode('utf-8')
        data['api_key'] = self.api_key
        try:
            post = urllib.urlencode(data)
            xml = urllib2.urlopen(url, post).read()
            parsed = minidom.parseString(xml)
            artists_xml = parsed.getElementsByTagName('name')
            return [a.childNodes[0].data for a in artists_xml]
        except:
            self.emit(QtCore.SIGNAL('status'), 'error')


class SimilarTracks(QtCore.QThread):
    api_key = LastfmData.api_key
    def __init__(self, track):
        QtCore.QThread.__init__(self)
        self.track = track

    def run(self):
        self.results = results = self.similiar_tracks(self.track)
        if results:
            log.debug('Emiting signal "results" with %d tracks' % len(results))
            self.emit(QtCore.SIGNAL('results'), results)

    def similiar_tracks(self, track):
        url = 'http://ws.audioscrobbler.com/2.0/'
        data = {}
        data['method'] = 'track.getsimilar'
        data['artist'] = track.artist.encode('utf-8')
        data['track'] = track.title.encode('utf-8')
        data['api_key'] = self.api_key
        try:
            post = urllib.urlencode(data)
            xml = urllib2.urlopen(url, post).read()
            parsed = minidom.parseString(xml)
            tracks_xml = parsed.getElementsByTagName('track')
            results = []
            for track in tracks_xml:
                title_xml = track.getElementsByTagName('name')[0]
                title = title_xml.childNodes[0].data
                artist_xml = track.getElementsByTagName('name')[1]
                artist = artist_xml.childNodes[0].data
                results.append((artist,title))
            log.debug('Got %d similar tracks' % len(results))
            return results
        except Exception as e:
            log.error('Error: %s' % e.message)
            self.emit(QtCore.SIGNAL('status'), 'error')