Source

spotimeta / spotimeta / resolver.py

Full commit
# coding=UTF-8

import string

from __init__ import Metadata

# TR_FROM holds accented characters (encoded to latin-1 bytes) and TR_TO holds,
# position by position, the unaccented ASCII letter each one is mapped to.
# The two strings must stay the same length and in the same order.
TR_FROM = u'ÁÀÂÄÅÃáàâäåãÇçÉÈËÊéèëêÍÌÏÎíìïîÑñÓÒÖÔÕØóòöôõøÚÙÜÛúùüûÿ'.encode('latin-1')
TR_TO   = u'AAAAAAaaaaaaCcEEEEeeeeIIIIiiiiNnOOOOOOooooooUUUUuuuuy'
# Translation table built from the pair above; edit_distance() applies it to
# latin-1 encoded characters to compare them with accents stripped.
TRANS   = string.maketrans(TR_FROM, TR_TO)
# Substitution cost used by edit_distance() when two characters differ only
# by case.
CASE_PRICE = .2
# Substitution cost used by edit_distance() when two characters become equal
# after translation through TRANS.
ASCII_PRICE = .5

def edit_distance(a,b):
    """
    Calculates a modified Levenshtein distance between a and b.
    Heavily modified from the basic algorithm at http://hetland.org/coding/

    Insertions and deletions cost 1 each. Substituting one character for
    another costs:

      * CASE_PRICE  if the two characters differ only by case,
      * ASCII_PRICE if they are equal after being encoded to latin-1 and
        translated through TRANS,
      * 1 otherwise, including when a character cannot be converted to
        latin-1 at all.

    a and b are sequences of characters (strings). Returns the total cost,
    which is 0 for identical inputs and may be fractional.
    """
    n, m = len(a), len(b)
    if n > m:
        # Make sure n <= m, to use O(min(n,m)) space
        a, b = b, a
        n, m = m, n

    # Row 0 of the table: cost of building each prefix of a from nothing.
    current = list(range(n + 1))
    add_price = 1
    del_price = 1
    for i in range(1, m + 1):
        previous, current = current, [i] + [0] * n
        b_char = b[i - 1]  # fixed for the whole inner loop
        for j in range(1, n + 1):
            a_char = a[j - 1]
            add = previous[j] + add_price
            delete = current[j - 1] + del_price
            change = previous[j - 1]
            if a_char != b_char:
                if a_char.lower() == b_char.lower():
                    change += CASE_PRICE
                else:
                    try:
                        same_letter = (
                            a_char.encode('latin-1').translate(TRANS)
                            == b_char.encode('latin-1').translate(TRANS)
                        )
                    except UnicodeError:
                        # UnicodeEncodeError: the character is outside
                        # latin-1. UnicodeDecodeError: on Python 2 a
                        # non-ASCII byte string is implicitly decoded as
                        # ASCII before encode() runs. Either way the
                        # characters cannot be folded, so treat the pair as
                        # a plain substitution instead of crashing.
                        same_letter = False
                    change += ASCII_PRICE if same_letter else 1
            current[j] = min(add, delete, change)

    return current[n]

def f(n):
    """
    Turns an edit distance n into a score.

    The result is the larger of a linear falloff (1 - 0.1*n) and a
    reciprocal falloff (1 / (1 + n)); both equal 1 when n is 0.
    """
    linear = 1 - .1 * n
    reciprocal = 1 / (1. + n)
    # Same choice max(linear, reciprocal) makes: keep the first unless the
    # second is strictly greater.
    if reciprocal > linear:
        return reciprocal
    return linear

class Resolver(object):
    """
    Wraps a Metadata client and ranks what it returns by how closely each
    result's name matches the text that was searched for.
    """

    # NOTE(review): the default `cache={}` is a single dict shared by every
    # Resolver created without an explicit cache. Left as-is because it may
    # be intended as a process-wide cache -- confirm.
    def __init__(self, candidates=4, cache={}, *args, **kwargs):
        """
        candidates: how many top-ranked parents multiquery() expands.
        cache:      object handed to Metadata as its `cache` argument.
        Any further positional/keyword arguments are passed to Metadata.
        """
        self._cache = cache
        self._md = Metadata(cache=self._cache, *args, **kwargs)
        self.candidates = candidates

    def score(self, target, candidate_list, accessor=None):
        """
        Scores every entry of candidate_list against target, in place.

        Each entry's 'score' (1.0 when absent) is multiplied by
        f(edit_distance(target, name)). The name is entry['name'], or, when
        accessor is given, entry[accessor]['name'] with '' used if either
        key is missing. The list is then sorted by descending score and the
        same list object is returned.
        """
        for entry in candidate_list:
            previous_score = entry.get('score', 1.0)
            if accessor:
                name = entry.get(accessor, {}).get('name', '')
            else:
                name = entry['name']
            entry['score'] = previous_score * f(edit_distance(target, name))

        def by_score(entry):
            return entry['score']

        candidate_list.sort(key=by_score, reverse=True)
        return candidate_list

    def search_artist(self, artist, *args, **kwargs):
        """Runs an artist search and scores its 'result' list against artist."""
        response = self._md.search_artist(artist, *args, **kwargs)
        self.score(artist, response['result'])
        return response

    def search_album(self, album, *args, **kwargs):
        """Runs an album search and scores its 'result' list against album."""
        response = self._md.search_album(album, *args, **kwargs)
        self.score(album, response['result'])
        return response

    def search_track(self, track, *args, **kwargs):
        """Runs a track search and scores its 'result' list against track."""
        response = self._md.search_track(track, *args, **kwargs)
        self.score(track, response['result'])
        return response

    def lookup(self, uri, detail=0):
        """Forwards a lookup to the underlying Metadata client unchanged."""
        return self._md.lookup(uri, detail=detail)

    def multiquery(self, artist=None, album=None, track=None):
        """
        Given a combination of artist, album, and/or track, tries
        to find the nearest match for all of them.

        Works from the broadest term to the narrowest: artists are searched
        first; albums are then taken from the best-scoring artists (or
        searched directly when there are none); tracks are taken from the
        best-scoring albums (or searched directly and, if an artist was
        given, re-scored against it). Returns the final scored list, which
        is empty when no term was supplied.
        """
        matches = list()

        if artist:
            matches = self.search_artist(artist)['result']

        if album:
            if not matches:
                matches = self.search_album(album)['result']
            else:
                # Collect the albums of the top artists; each album starts
                # from its artist's score before being scored itself.
                albums = list()
                for parent in matches[:self.candidates]:
                    found = self.lookup(parent['href'], detail=1)['result']['albums']
                    for child in found:
                        child['score'] = parent['score']
                    albums.extend(found)
                matches = self.score(album, albums)

        if track:
            if not matches or not album:
                matches = self.search_track(track)['result']
                if artist:
                    matches = self.score(artist, matches, accessor='artist')
            else:
                # Collect the tracks of the top albums; each track starts
                # from its album's score before being scored itself.
                tracks = list()
                for parent in matches[:self.candidates]:
                    found = self.lookup(parent['href'], detail=1)['result']['tracks']
                    for child in found:
                        child['score'] = parent['score']
                    tracks.extend(found)
                matches = self.score(track, tracks)

        return matches