Friedrich Weber avatar Friedrich Weber committed 7c62dcb

softlinks are uncool

Comments (0)

Files changed (1)

jambalah/jamendo.py

-/home/fred/dev/jambalah/jamendo/jamendo.py
+# -*- coding: utf-8 -*-
+"""
+    jamendo
+    ~~~~~~~
+
+    A simple jamendo api.
+
+    This program is free software. It comes without any warranty, to
+    the extent permitted by applicable law. You can redistribute it
+    and/or modify it under the terms of the Do What The Fuck You Want
+    To Public License, Version 2, as published by Sam Hocevar. See
+    http://sam.zoy.org/wtfpl/COPYING for more details.
+
+    cached_property stolen from
+    http://ronny.uberhost.de/simple-cached-for-properties-done-right
+"""
+
+try:
+    import json
+except ImportError:
+    import simplejson as json
+
+from urllib import urlencode, quote
+from urllib2 import urlopen
+from time import time, sleep
+
+GET2_API = 'http://api.jamendo.com/get2'
+PATH_SEPARATOR = '/'
+LIST_SEPARATOR = '+'
+RETURN_FORMAT = 'json'
+DELAY = 1.0
+DEFAULT_N = 100
+
+_last_query = 0
+
+class cached_property(object):
+    def __init__(self, func, name=None):
+        self.func = func
+        self.name = name if name is not None else func.__name__
+
+    def __get__(self, obj, type=None):
+        if obj is None:
+            return self
+
+        result = self.func(obj)
+        setattr(obj, self.name, result)
+        return result
+
+def _join_url(items):
+    """
+        join all *items* together with `PATH_SEPARATOR`.
+    """
+    return PATH_SEPARATOR.join(items)
+
+def _join_list(items):
+    """
+        join all *items* together with `LIST_SEPARATOR`.
+    """
+    return LIST_SEPARATOR.join(items)
+
+def _fetch(url):
+    """
+        return the pythonized "return value" of *url*
+        (converts json to python).
+
+        It will also send no more than 1 request per second.
+    """
+    global _last_query
+    # only one request per second ...
+    stamp = time()
+    while _last_query + DELAY > stamp:
+        sleep(DELAY / 3)
+        stamp = time()
+    _last_query = stamp
+    # ok. do it.
+    try:
+        conn = urlopen(url)
+        return conn.read()
+    finally:
+        conn.close()
+
+def _select_query(fields, unit, format=RETURN_FORMAT, **params):
+    """
+        perform a query and return the pythonized return value.
+
+        :Parameters:
+            `fields`: list of strings
+                field names to fetch (unquoted)
+            `unit`: str
+                the unit we are operating in (e.g. 'album')
+            `format`: str
+                the format we should return, defaults to RETURN_FORMAT
+            `params`:
+                additional parameters
+
+    """
+    # URL: http://api.jamendo.com/get2/[fields]/[unit]/[returnformat]/?[params]
+    url = _join_url([
+        GET2_API, _join_list(map(quote, fields)), unit, format,
+        '?' + urlencode(params.items())
+        ])
+    print url
+    value = _fetch(url)
+    if format == 'json':
+        return json.loads(value)
+    else:
+        return value
+
+def _unique_cached_property(field):
+    def property_getter(self):
+        value = _select_query(self.fields, self.unit, id=self.id)
+        if value:
+            return value[0]
+        else:
+            return None
+
+    property_getter.__name__ = field
+    return cached_property(property_getter, field)
+
+def _unique_field_classmethod(field):
+    def query_classmethod(cls, value, **kwargs):
+        kwargs[field] = value
+        return cls(_select_query(cls.fields, cls.unit, **kwargs)[0])
+    query_classmethod.__name__ = 'by_%s' % field
+    return classmethod(query_classmethod)
+
+def _field_classmethod(field):
+    def query_classmethod(cls, value, **kwargs):
+        kwargs[field] = value
+        return map(cls, _select_query(cls.fields, cls.unit, **kwargs))
+    query_classmethod.__name__ = 'by_%s' % field
+    return classmethod(query_classmethod)
+
+def _unique_bunch(field):
+    return (_unique_cached_property(field), _unique_field_classmethod(field))
+
+def iterate_objects(func, *args, **kwargs):
+    kwargs['order'] = 'id'
+    kwargs['n'] = DEFAULT_N
+    kwargs['pn'] = 1
+    while True:
+        objects = func(*args, **kwargs)
+        kwargs['pn'] += 1
+        if not objects:
+            break
+        for object in objects:
+            yield object
+
+class JamendoUnit(object):
+    unit = None
+    fields = ('id',)
+    aliases = {}
+
+    def __init__(self, fields):
+        for name in self.fields:
+            setattr(self,
+                    self.aliases.get(name, name),
+                    fields.get(name)
+                    )
+
+    def __eq__(self, other):
+        return self.id == other.id
+
+    def __repr__(self):
+        return '<%s id %d at 0x%x>' % (
+                self.__class__.__name__,
+                self.id,
+                id(self)
+                )
+
+    ahref, by_ahref = _unique_bunch('ahref')
+    ahrefidstr, by_ahrefidstr = _unique_bunch('ahrefidstr')
+    ahrefname, by_ahrefname = _unique_bunch('ahrefname')
+
+class Artist(JamendoUnit):
+    unit = 'artist'
+    fields = ('id', 'idstr', 'name', 'image', 'url', 'mbgid', 'mbid',
+            'genre', 'dates')
+
+    idstr, by_idstr = _unique_bunch('idstr')
+
+    name = _unique_cached_property('name')
+    by_name = _field_classmethod('name')
+
+    image = _unique_cached_property('image')
+    by_image = _field_classmethod('image')
+
+    url, by_url = _unique_bunch('url')
+    mbgid, by_mbgid = _unique_bunch('mbgid')
+    mbid, by_mbid = _unique_bunch('mbid')
+
+    genre = _unique_cached_property('genre')
+    by_genre = _field_classmethod('genre')
+
+    dates = _unique_cached_property('dates')
+    by_dates = _field_classmethod('dates')
+
+    @cached_property
+    def albums(self):
+        return map(Album, _select_query(Album.fields, 'album',
+            artist_id=self.id))
+
+    @classmethod
+    def by_firstletter(cls, firstletter, **kwargs):
+        kwargs['artist_firstletter'] = firstletter
+        return map(cls, _select_query(cls.fields, 'artist',
+            **kwargs))
+
+class Album(JamendoUnit):
+    unit = 'album'
+    fields = ('id', 'name', 'image', 'url', 'duration', 'genre', 'dates')
+
+    name = _unique_cached_property('name')
+    by_name = _field_classmethod('name')
+
+    image = _unique_cached_property('image')
+    by_image = _field_classmethod('image')
+
+    url, by_url = _unique_bunch('url')
+
+    duration = _unique_cached_property('duration')
+    by_duration = _field_classmethod('duration')
+
+    genre = _unique_cached_property('genre')
+    by_genre = _field_classmethod('genre')
+
+    dates = _unique_cached_property('dates')
+    by_dates = _field_classmethod('dates')
+
+    @cached_property
+    def tracks(self):
+        return map(Track, _select_query(Track.fields, 'track',
+            album_id=self.id))
+
+    @cached_property
+    def m3u(self):
+        return _select_query(('stream',), 'album', 'plain', id=self.id)
+
+    @cached_property
+    def streams(self):
+        return _fetch(self.m3u).splitlines()
+
+class Playlist(JamendoUnit):
+    unit = 'playlist'
+    fields = ('id', 'name', 'duration')
+
+    name = _unique_cached_property('name')
+    by_name = _field_classmethod('name')
+
+    duration = _unique_cached_property('duration')
+    by_duration = _field_classmethod('duration')
+
+class Track(JamendoUnit):
+    unit = 'track'
+    fields = ('id', 'name', 'duration', 'url')
+
+    name = _unique_cached_property('name')
+    by_name = _field_classmethod('name')
+
+    duration = _unique_cached_property('duration')
+    by_duration = _field_classmethod('duration')
+
+    url, by_url = _unique_bunch('url')
+
+    def get_stream(self, encoding=''):
+        """
+            returns the url of the stream.
+
+            :Parameters:
+                `encoding`: str
+                    You can specify a stream encoding here. Some valid
+                    values are 'mp31' or 'ogg2'. Couldn't find a reference
+                    for this. Can also be empty.
+
+            :todo: more docs (valid encodings?)
+        """
+        return _select_query(('stream',), 'track', 'plain', id=self.id)
+
+    @cached_property
+    def stream(self):
+        return self.get_stream()
+
+class Tag(JamendoUnit):
+    unit = 'tag'
+    fields = ('id', 'idstr',)
+
+    idstr, by_idstr = _unique_bunch('idstr')
+    # TODO: weight??
+
+class License(JamendoUnit):
+    unit = 'license'
+    fields = ('id', 'class', 'name')
+    aliases =  {'class': 'class_'}
+
+    class_ = _unique_cached_property('class')
+    by_class = _field_classmethod('class')
+
+    name = _unique_cached_property('name')
+    by_name = _field_classmethod('name')
+
+    url, by_url = _unique_bunch('url')
+
+class User(JamendoUnit):
+    unit = 'user'
+    fields = ('id', 'idstr', 'name', 'lang', 'image', 'dates')
+
+    idstr, by_idstr = _unique_bunch('idstr')
+
+    name = _unique_cached_property('name')
+    by_name = _field_classmethod('name')
+
+    lang = _unique_cached_property('lang')
+    by_lang = _field_classmethod('lang')
+
+    image = _unique_cached_property('image')
+    by_image = _field_classmethod('image')
+
+    dates = _unique_cached_property('dates')
+    by_dates = _field_classmethod('dates')
+
+class Review(JamendoUnit):
+    unit = 'review'
+    fields = ('id', 'name', 'text', 'rating', 'lang', 'dates')
+
+    name = _unique_cached_property('name')
+    by_name = _field_classmethod('name')
+
+    text = _unique_cached_property('text')
+    by_text = _field_classmethod('text')
+
+    rating = _unique_cached_property('rating')
+    by_rating = _field_classmethod('rating')
+
+    lang = _unique_cached_property('lang')
+    by_lang = _field_classmethod('lang')
+
+    dates = _unique_cached_property('dates')
+    by_dates = _field_classmethod('dates')
+
+class Radio(JamendoUnit):
+    unit = 'radio'
+    fields = ('id', 'idstr', 'name', 'image')
+
+    idstr, by_idstr = _unique_bunch('idstr')
+
+    name = _unique_cached_property('name')
+    by_name = _field_classmethod('name')
+
+    image = _unique_cached_property('image')
+    by_image = _field_classmethod('image')
+
+    # TODO: `weight` & `duration`.
+
+def test():
+    print list(iterate_objects(Artist.by_firstletter, 'T', hasalbums=1))
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.