Commits

Dan Ross committed 5609acc

put the python-bitbucket files back

Comments (0)

Files changed (2)

bitbucket/__init__.py

+
+VERSION = "0.1"
+
+from api import *
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""Bitbucket API wrapper.  Written to be somewhat like py-github:
+
+https://github.com/dustin/py-github
+
+"""
+
+from urllib2 import Request, urlopen, URLError
+from urllib import urlencode
+from functools import wraps
+import datetime
+import time
+
+try:
+    import json
+except ImportError:
+    import simplejson as json
+
+__all__ = ['AuthenticationRequired', 'to_datetime', 'BitBucket']
+
+api_base = 'https://api.bitbucket.org/1.0/'
+api_toplevel = 'https://api.bitbucket.org/'
+
+class AuthenticationRequired(Exception):
+    pass
+
+def requires_authentication(method):
+    @wraps(method)
+    def wrapper(self, *args, **kwargs):
+        username = self.bb.username if hasattr(self, 'bb') else self.username
+        password = self.bb.password if hasattr(self, 'bb') else self.password
+        if not all((username, password)):
+            raise AuthenticationRequired("%s requires authentication" % method.__name__)
+        return method(self, *args, **kwargs)
+    return wrapper
+
+def smart_encode(**kwargs):
+    """Urlencode's provided keyword arguments.  If any kwargs are None, it does
+    not include those."""
+    args = dict(kwargs)
+    for k,v in args.items():
+        if v is None:
+            del args[k]
+    if not args:
+        return ''
+    return urlencode(args)
+
+def to_datetime(timestring):
+    """Convert one of the bitbucket API's timestamps to a datetime object."""
+    format = '%Y-%m-%d %H:%M:%S'
+    return datetime.datetime(*time.strptime(timestring, format)[:7])
+
+class BitBucket(object):
+    """Main bitbucket class.  Use an instantiated version of this class
+    to make calls against the REST API."""
+    def __init__(self, username='', password=''):
+        self.username = username
+        self.password = password
+        # extended API support
+
+    def build_request(self, url, data=None):
+        if not all((self.username, self.password)):
+            return Request(url,data)
+        auth = '%s:%s' % (self.username, self.password)
+        auth = {'Authorization': 'Basic %s' % (auth.encode('base64').strip())}
+        if data:
+            data = urlencode(data)
+        return Request(url, data, auth)
+
+    def load_url(self, url, quiet=False, method=None, data=None):
+        request = self.build_request(url,data)
+        if method:
+            request.get_method = lambda : method
+        try:
+            result = urlopen(request).read()
+        except:
+            if not quiet:
+                import traceback
+                traceback.print_exc()
+                print "url was: %s" % url
+            result = "[]"
+        return result
+
+    def user(self, username):
+        return User(self, username)
+
+    def repository(self, username, slug):
+        return Repository(self, username, slug)
+    
+    @requires_authentication
+    def new_repository(self,name,**data):
+        """Create a new repository with the given name
+           for the authenticated user.
+           Return a Repository object
+        """
+        url = api_base + 'repositories/'
+        data['name'] = name
+        response = json.loads(self.load_url(url,data=data))
+        if 'slug' in response:
+            return self.repository(self.username,response['slug'])
+        
+    @requires_authentication
+    def remove_repository(self,slug):
+        """Given a slug, remove a repository from the 
+           authenticated user"""
+        url = api_base + 'repositories/%s/%s' % (self.username,slug)
+        method = 'DELETE'
+        self.load_url(url,method=method)
+        return True
+        
+        
+    @requires_authentication
+    def emails(self):
+        """Returns a list of configured email addresses for the authenticated user."""
+        url = api_base + 'emails/'
+        return json.loads(self.load_url(url))
+
+    def __repr__(self):
+        extra = ''
+        if all((self.username, self.password)):
+            extra = ' (auth: %s)' % self.username
+        return '<BitBucket API%s>' % extra
+
+class User(object):
+    """API encapsulation for user related bitbucket queries."""
+    def __init__(self, bb, username):
+        self.bb = bb
+        self.username = username
+
+    def followers(self):
+        url = api_base + 'users/%s/followers/' % self.username
+        return json.loads(self.bb.load_url(url))
+
+    def repository(self, slug):
+        return Repository(self.bb, self.username, slug)
+
+    def repositories(self):
+        user_data = self.get()
+        return user_data['repositories']
+
+    def events(self, start=None, limit=None):
+        query = smart_encode(start=start, limit=limit)
+        url = api_base + 'users/%s/events/' % self.username
+        if query:
+            url += '?%s' % query
+        return json.loads(self.bb.load_url(url))
+
+    def get(self):
+        url = api_base + 'users/%s/' % self.username
+        return json.loads(self.bb.load_url(url))
+
+    def __repr__(self):
+        return '<User: %s>' % self.username
+
+class Repository(object):
+    def __init__(self, bb, username, slug):
+        self.bb = bb
+        self.username = username
+        self.slug = slug
+        self.base_url = api_base + 'repositories/%s/%s/' % (self.username, self.slug)
+
+    def get(self):
+        return json.loads(self.bb.load_url(self.base_url))
+
+    def changeset(self, revision):
+        """Get one changeset from a repos."""
+        url = self.base_url + 'changesets/%s/' % (revision)
+        return json.loads(self.bb.load_url(url))
+
+    def changesets(self, limit=None):
+        """Get information about changesets on a repository."""
+        url = self.base_url + 'changesets/'
+        query = smart_encode(limit=limit)
+        if query: url += '?%s' % query
+        return json.loads(self.bb.load_url(url, quiet=True))
+
+    def tags(self):
+        """Get a list of tags for a repository."""
+        url = self.base_url + 'tags/'
+        return json.loads(self.bb.load_url(url))
+
+    def branches(self):
+        """Get a list of branches for a repository."""
+        url = self.base_url + 'branches/'
+        return json.loads(self.bb.load_url(url))
+
+    def issue(self, number):
+        return Issue(self.bb, self.username, self.slug, number)
+
+    def issues(self, start=None, limit=None):
+        url = self.base_url + 'issues/'
+        query = smart_encode(start=start, limit=limit)
+        if query: url += '?%s' % query
+        return json.loads(self.bb.load_url(url))
+
+    def events(self, start=None, limit=None):
+        url = self.base_url + 'events/'
+        query = smart_encode(start=start, limit=limit)
+        if query: url += '?%s' % query
+        return json.loads(self.bb.load_url(url))
+
+    def followers(self):
+        url = self.base_url + 'followers/'
+        return json.loads(self.bb.load_url(url))
+
+    def forks(self):
+        """ This is a dirty hack to get the number of Forks/Queues by scraping the src page """
+        url = 'https://bitbucket.org/%s/%s/src/' % (self.username, self.slug)
+        response = urlopen(Request(url))
+        content = response.read()
+        # Dirty hack to find X in the first occurance of the string: "Forks/Queues (X)"
+        start_search = content.find('Forks/Queues')
+        open_paren_location = content.find('(', start_search)
+        close_paren_location = content.find(')', start_search)
+        num_forks = content[open_paren_location + 1:close_paren_location] # first part of the slice includes the paren
+        return int(num_forks)
+
+    def __repr__(self):
+        return '<Repository: %s\'s %s>' % (self.username, self.slug)
+
+class Issue(object):
+    def __init__(self, bb, username, slug, number):
+        self.bb = bb
+        self.username = username
+        self.slug = slug
+        self.number = number
+        self.base_url = api_base + 'repositories/%s/%s/issues/%s/' % (username, slug, number)
+
+    def get(self):
+        return json.loads(self.bb.load_url(self.base_url))
+
+    def followers(self):
+        url = self.base_url + 'followers/'
+        return json.loads(self.bb.load_url(url))
+
+    def __repr__(self):
+        return '<Issue #%s on %s\'s %s>' % (self.number, self.username, self.slug)
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.