# python-bitbucket / bitbucket /

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Bitbucket API wrapper.  Written to be somewhat like py-github:


from urllib2 import Request, urlopen, URLError
from urllib import urlencode
from functools import wraps
import datetime
import time

    import json
except ImportError:
    import simplejson as json

__all__ = ['AuthenticationRequired', 'to_datetime', 'BitBucket']

# NOTE(review): both values were empty strings in the source, which makes
# every request URL relative.  They are restored to the Bitbucket 1.0 REST
# root, which matches the endpoint paths this module builds -- confirm.
# Every endpoint URL in this module is built by appending to api_base.
api_base = 'https://api.bitbucket.org/1.0/'
# Scheme and host of the API, without the version segment.
api_toplevel = 'https://api.bitbucket.org/'

class AuthenticationRequired(Exception):
    """Raised when a call that needs credentials is made without them."""

def requires_authentication(method):
    """Decorate *method* so that it refuses to run without credentials.

    The decorated method's ``self`` either carries ``username`` and
    ``password`` itself, or holds an object that does in its ``bb``
    attribute.  ``AuthenticationRequired`` is raised when either credential
    is missing or empty; otherwise the call is forwarded unchanged.
    """
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        # Objects with a ``bb`` attribute delegate to the client stored
        # there; anything else is expected to hold the credentials directly.
        if hasattr(self, 'bb'):
            username = self.bb.username
            password = self.bb.password
        else:
            username = self.username
            password = self.password
        if not all((username, password)):
            raise AuthenticationRequired("%s requires authentication" % method.__name__)
        return method(self, *args, **kwargs)
    return wrapper

def smart_encode(**kwargs):
    """Urlencode the provided keyword arguments, skipping any that are None.

    Returns the empty string when no argument has a non-None value, so
    callers can test the result before appending a query string.
    """
    # Build a filtered copy instead of deleting from a dict while iterating
    # over it, which raises RuntimeError on Python 3.
    args = dict((key, value) for key, value in kwargs.items()
                if value is not None)
    if not args:
        return ''
    return urlencode(args)

def to_datetime(timestring):
    """Convert one of the bitbucket API's timestamps to a datetime object.

    ``timestring`` must have the form ``'YYYY-MM-DD HH:MM:SS'``; a
    ``ValueError`` is raised otherwise.  The returned datetime is naive.
    """
    # Parse straight to a datetime.  Slicing the first seven fields of a
    # struct_time would pass the weekday as the microsecond argument.
    return datetime.datetime.strptime(timestring, '%Y-%m-%d %H:%M:%S')

class BitBucket(object):
    """Main bitbucket class.  Use an instantiated version of this class
    to make calls against the REST API."""

    def __init__(self, username='', password=''):
        """Store the credentials used for HTTP Basic authentication.

        Both default to the empty string, in which case requests are sent
        without an Authorization header.
        """
        self.username = username
        self.password = password
        # NOTE(review): an "extended API support" comment stood here with no
        # code after it -- confirm nothing else was meant to be initialised.

    def build_request(self, url, data=None):
        """Return a ``Request`` for *url*, authenticated when possible.

        ``data`` is an optional mapping (or sequence of pairs) of form
        fields; it is urlencoded into the request body whether or not
        credentials are set.  A value that is already a string is sent as
        is.  The Basic Authorization header is added only when both
        username and password are non-empty.
        """
        import base64

        if data and not isinstance(data, (bytes, type(u''))):
            data = urlencode(data)
        if isinstance(data, type(u'')):
            # Request bodies must be bytes; this is a no-op for the byte
            # strings urlencode produces on Python 2.
            data = data.encode('utf-8')

        headers = {}
        if all((self.username, self.password)):
            credentials = '%s:%s' % (self.username, self.password)
            if not isinstance(credentials, bytes):
                credentials = credentials.encode('utf-8')
            # b64encode never inserts line breaks, unlike the 'base64'
            # codec, which wraps long input and would corrupt the header.
            token = base64.b64encode(credentials)
            if not isinstance(token, str):
                token = token.decode('ascii')
            headers['Authorization'] = 'Basic %s' % token
        return Request(url, data, headers)

    def load_url(self, url, quiet=False, method=None, data=None):
        """Fetch *url* and return the raw response body.

        ``method`` overrides the HTTP verb (for example ``'DELETE'``) and
        ``data`` is forwarded to ``build_request``.  The call is best
        effort: if the request fails, the string ``"[]"`` is returned so the
        result can still be passed to ``json.loads``.  Unless ``quiet`` is
        true, the traceback and the URL are printed first.
        """
        request = self.build_request(url, data)
        if method:
            request.get_method = lambda: method
        try:
            response = urlopen(request)
            try:
                return response.read()
            finally:
                response.close()
        except Exception:
            if not quiet:
                import traceback
                traceback.print_exc()
                print("url was: %s" % url)
            return "[]"

    def user(self, username):
        """Return a ``User`` bound to this client."""
        return User(self, username)

    def repository(self, username, slug):
        """Return a ``Repository`` bound to this client."""
        return Repository(self, username, slug)

    def new_repository(self, name, **data):
        """Create a new repository with the given name
           for the authenticated user.
           Return a Repository object, or None when the response
           carries no 'slug' entry."""
        url = api_base + 'repositories/'
        data['name'] = name
        response = json.loads(self.load_url(url, data=data))
        if 'slug' in response:
            return self.repository(self.username, response['slug'])
        return None

    def remove_repository(self, slug):
        """Given a slug, remove a repository from the
           authenticated user.

           Always returns True; load_url swallows request failures, so the
           return value does not confirm that the deletion happened."""
        url = api_base + 'repositories/%s/%s' % (self.username, slug)
        self.load_url(url, method='DELETE')
        return True

    def emails(self):
        """Returns a list of configured email addresses for the authenticated user."""
        url = api_base + 'emails/'
        return json.loads(self.load_url(url))

    def __repr__(self):
        extra = ''
        if all((self.username, self.password)):
            extra = ' (auth: %s)' % self.username
        return '<BitBucket API%s>' % extra

class User(object):
    """API encapsulation for user related bitbucket queries."""

    def __init__(self, bb, username):
        """Bind *username* to ``bb``, the client whose ``load_url`` method
        performs the HTTP requests."""
        self.bb = bb
        self.username = username

    def _load_json(self, url):
        """Fetch *url* through the client and decode the JSON body."""
        return json.loads(self.bb.load_url(url))

    def followers(self):
        """Return the decoded response of the user's followers endpoint."""
        url = api_base + 'users/%s/followers/' % self.username
        return self._load_json(url)

    def repository(self, slug):
        """Return a ``Repository`` for one of this user's repositories."""
        return Repository(self.bb, self.username, slug)

    def repositories(self):
        """Return the 'repositories' entry of the user's data."""
        user_data = self.get()
        return user_data['repositories']

    def events(self, start=None, limit=None):
        """Return the user's events; ``start`` and ``limit`` are sent as
        query parameters when they are not None."""
        query = smart_encode(start=start, limit=limit)
        url = api_base + 'users/%s/events/' % self.username
        if query:
            url += '?%s' % query
        return self._load_json(url)

    def get(self):
        """Return the decoded response of the user endpoint."""
        url = api_base + 'users/%s/' % self.username
        return self._load_json(url)

    def __repr__(self):
        return '<User: %s>' % self.username

class Repository(object):
    """API encapsulation for queries about a single repository."""

    def __init__(self, bb, username, slug):
        """Bind the repository ``username``/``slug`` to ``bb``, the client
        whose ``load_url`` method performs the HTTP requests."""
        self.bb = bb
        self.username = username
        self.slug = slug
        self.base_url = api_base + 'repositories/%s/%s/' % (self.username, self.slug)

    def _load_json(self, url, **options):
        """Fetch *url* through the client and decode the JSON body.

        Extra keyword arguments are passed on to the client's ``load_url``.
        """
        return json.loads(self.bb.load_url(url, **options))

    def get(self):
        """Return the decoded response of the repository endpoint."""
        return self._load_json(self.base_url)

    def changeset(self, revision):
        """Get one changeset from a repos."""
        url = self.base_url + 'changesets/%s/' % (revision)
        return self._load_json(url)

    def changesets(self, limit=None):
        """Get information about changesets on a repository."""
        url = self.base_url + 'changesets/'
        query = smart_encode(limit=limit)
        if query:
            url += '?%s' % query
        # Failures are not reported for this call: quiet=True is passed on.
        return self._load_json(url, quiet=True)

    def tags(self):
        """Get a list of tags for a repository."""
        url = self.base_url + 'tags/'
        return self._load_json(url)

    def branches(self):
        """Get a list of branches for a repository."""
        url = self.base_url + 'branches/'
        return self._load_json(url)

    def issue(self, number):
        """Return an ``Issue`` for issue *number* of this repository."""
        return Issue(self.bb, self.username, self.slug, number)

    def issues(self, start=None, limit=None):
        """Return the repository's issues; ``start`` and ``limit`` are sent
        as query parameters when they are not None."""
        url = self.base_url + 'issues/'
        query = smart_encode(start=start, limit=limit)
        if query:
            url += '?%s' % query
        return self._load_json(url)

    def events(self):
        """Return the decoded response of the repository's events endpoint."""
        url = self.base_url + 'events/'
        return self._load_json(url)

    def followers(self):
        """Return the decoded response of the repository's followers endpoint."""
        url = self.base_url + 'followers/'
        return self._load_json(url)

    def forks(self):
        """Return the number of Forks/Queues, scraped from the src page.

        This is a dirty hack: the page is fetched directly (not through the
        client, so no credentials are sent) and the count is read from the
        text "Forks/Queues (X)".  Raises ``ValueError`` when that text is
        not found or X is not an integer.
        """
        # NOTE(review): the URL literal was empty in the source; this one is
        # reconstructed from the "src page" wording above -- confirm it.
        url = 'https://bitbucket.org/%s/%s/src/' % (self.username, self.slug)
        response = urlopen(Request(url))
        try:
            content = response.read()
        finally:
            response.close()
        return self._parse_fork_count(content)

    @staticmethod
    def _parse_fork_count(content):
        """Extract X from the first occurrence of "Forks/Queues (X)"."""
        if isinstance(content, bytes) and not isinstance(content, str):
            # Python 3 responses are bytes; decode before text searching.
            content = content.decode('utf-8', 'replace')
        start_search = content.find('Forks/Queues')
        if start_search == -1:
            raise ValueError("'Forks/Queues' marker not found in page")
        open_paren = content.find('(', start_search)
        # Look for the closing paren after the opening one so the slice can
        # never run backwards.
        close_paren = content.find(')', open_paren)
        if open_paren == -1 or close_paren == -1:
            raise ValueError("no parenthesised count after 'Forks/Queues'")
        return int(content[open_paren + 1:close_paren])

    def __repr__(self):
        return '<Repository: %s\'s %s>' % (self.username, self.slug)

class Issue(object):
    """API encapsulation for queries about a single repository issue."""

    def __init__(self, bb, username, slug, number):
        """Bind issue *number* of ``username``/``slug`` to ``bb``, the
        client whose ``load_url`` method performs the HTTP requests."""
        self.bb = bb
        self.username = username
        self.slug = slug
        self.number = number
        self.base_url = api_base + 'repositories/%s/%s/issues/%s/' % (username, slug, number)

    def get(self):
        """Return the decoded response of the issue endpoint."""
        return json.loads(self.bb.load_url(self.base_url))

    def followers(self):
        """Return the decoded response of the issue's followers endpoint."""
        url = self.base_url + 'followers/'
        return json.loads(self.bb.load_url(url))

    def __repr__(self):
        return '<Issue #%s on %s\'s %s>' % (self.number, self.username, self.slug)