Source

bugzilla_bitbucket / create_issues.py

Full commit
"""
Read in the Bugzilla XML and print the curl commands that create the matching Bitbucket issues.

issues API:
	http://confluence.atlassian.com/display/BITBUCKET/Issues

	    title: The title of the new issue.
	    content: The content of the new issue.
	    component: The component associated with the issue.
	    milestone: The milestone associated with the issue.
	    version: The version associated with the issue.
	    responsible: The username of the person responsible for the issue.
	    status: The status of the issue (new, open, resolved, on hold, invalid, duplicate, or wontfix).
	    kind: The kind of issue (bug, enhancement, or proposal).

    $ curl -d "title=Issue%20Title&content=Issue%20Content" \
        https://api.bitbucket.org/1.0/repositories/sarahmaddox/sarahmaddox/issues/

How to delete an issue:
	curl -X DELETE https://api.bitbucket.org/1.0/repositories/pygame/pygame/issues/3

Add --user username:password afterwards to authenticate, otherwise it does not work.


You will probably need to set up the components, versions and other values used in your Bugzilla on the Bitbucket side before running the import.



"""
import os,sys,urllib, pprint, glob, base64, getpass
import xml.etree.cElementTree as ElementTree


# Base URL used when building links to attachments inside issue content.
ATTACHMENT_BASE_URL = "http://www.pygame.org/old_bug_attachments/"
# Bitbucket 1.0 API endpoint the generated curl commands post new issues to.
ISSUES_API_URL = "https://api.bitbucket.org/1.0/repositories/pygame/pygame/issues/"


# Fields every issue dict must carry before it is turned into POST data.
issue_keys = [
    "title", "content", "component", "milestone",
    "version", "responsible", "status", "kind",
]
# Allowed 'status' values.  The API spells the held state "on hold" (two
# words), so the list is written out instead of being built with .split().
valid_status = [
    "new", "open", "resolved", "on hold", "invalid", "duplicate", "wontfix",
]
# Allowed 'kind' values.
valid_kind = ["bug", "enhancement", "proposal"]






def validate_issue(issue):
    """ Check that an issue dict is complete and uses allowed values.

        Raises ValueError naming the first entry of issue_keys that is absent
        from the issue, or when its 'status' / 'kind' is not listed in
        valid_status / valid_kind.  Returns None when the issue passes.
    """
    absent = [key for key in issue_keys if key not in issue]
    if absent:
        # Report only the first missing key, in issue_keys order.
        raise ValueError("%s not in issue" % absent[0])

    status = issue.get('status')
    if status not in valid_status:
        raise ValueError(":%s: status is not one of %s" % (status, valid_status))

    kind = issue.get('kind')
    if kind not in valid_kind:
        raise ValueError(":%s: kind is not one of %s" % (kind, valid_kind))

def create_issue(issue):
    """ Validate an issue dict and return it as url-encoded POST data.

        issue -- dict holding every issue field (see validate_issue).

        Returns the string produced by urllib.urlencode.
        Raises ValueError when validate_issue rejects the issue.
    """
    validate_issue(issue)

    # urlencode() calls str() on each value, which fails for unicode values
    # containing non-ASCII characters, so those are encoded to UTF-8 first.
    # Byte strings and other values are passed through untouched.
    text_type = type(u"")
    post_fields = {}
    for key, value in issue.items():
        if not isinstance(value, str) and isinstance(value, text_type):
            value = value.encode('utf8')
        post_fields[key] = value

    return urllib.urlencode(post_fields)


def test_create_issue():
    """ Manual smoke test: print the POST data built for a sample issue. """
    sample = dict(
        title="just a test issue",
        content="some content to test",
        component="pygameissues",
        milestone="bitbucketmigration",
        version="1.9.2",
        responsible="illume",
        status="new",
        kind="bug",
    )
    print(create_issue(sample))
#test_create_issue()



# read bugzilla xml data.
def create_bugzilla_dicts(xml_glob="xml/*"):
    """ Parse Bugzilla XML files into a dict of bugs keyed by integer bug id.

        xml_glob -- glob pattern selecting the XML files to read.  Defaults to
                    "xml/*", i.e. everything in ./xml.

        Each file's root is converted with XmlDictConfig and its 'bug' entry
        is stored under int(bug['bug_id']).  Two keys of that entry are
        rebuilt by walking the tree, because the dict conversion keeps only
        the last of several same-named children:
            'long_desc'  -- list with one dict per <long_desc> element.
            'attachment' -- list with one dict per <attachment> element; each
                            <data> element is stored on the most recently seen
                            attachment as a dict of its attributes plus its
                            text under 'text'.
    """

    # convert xml to a dict.
    #http://code.activestate.com/recipes/410469-xml-as-dictionary/
    class XmlListConfig(list):
        """ List built from an element whose children are treated as a series. """
        def __init__(self, aList):
            for element in aList:
                # len() is the explicit form of "has child elements"; plain
                # truth-testing of an Element is deprecated.
                if len(element):
                    # treat like dict
                    if len(element) == 1 or element[0].tag != element[1].tag:
                        self.append(XmlDictConfig(element))
                    # treat like list
                    elif element[0].tag == element[1].tag:
                        self.append(XmlListConfig(element))
                elif element.text:
                    text = element.text.strip()
                    if text:
                        self.append(text)

    class XmlDictConfig(dict):
        '''
        Example usage:

        >>> tree = ElementTree.parse('your_file.xml')
        >>> root = tree.getroot()
        >>> xmldict = XmlDictConfig(root)

        Or, if you want to use an XML string:

        >>> root = ElementTree.XML(xml_string)
        >>> xmldict = XmlDictConfig(root)

        And then use xmldict for what it is... a dict.
        '''
        def __init__(self, parent_element):
            if parent_element.items():
                self.update(dict(parent_element.items()))
            for element in parent_element:
                if len(element):
                    # treat like dict - we assume that if the first two tags
                    # in a series are different, then they are all different.
                    if len(element) == 1 or element[0].tag != element[1].tag:
                        aDict = XmlDictConfig(element)
                    # treat like list - we assume that if the first two tags
                    # in a series are the same, then the rest are the same.
                    else:
                        # here, we put the list in dictionary; the key is the
                        # tag name the list elements all share in common, and
                        # the value is the list itself
                        aDict = {element[0].tag: XmlListConfig(element)}
                    # if the tag has attributes, add those to the dict
                    if element.items():
                        aDict.update(dict(element.items()))
                    self.update({element.tag: aDict})
                # this assumes that if you've got an attribute in a tag,
                # you won't be having any text. This may or may not be a
                # good idea -- time will tell. It works for the way we are
                # currently doing XML configuration files...
                elif element.items():
                    self.update({element.tag: dict(element.items())})
                # finally, if there are no child tags and no attributes, extract
                # the text
                else:
                    self.update({element.tag: element.text})

    #convert the bugzilla data to an issue.
    bugs = {}
    for path in glob.glob(xml_glob):
        root = ElementTree.parse(path).getroot()
        bug = XmlDictConfig(root)['bug']

        # create a list of long_desc since the xml2dict func does not work
        # correctly here (repeated tags overwrite each other).
        long_descs = []
        attachments = []

        # Element.iter() supersedes the deprecated getiterator(); fall back
        # to the old name on ElementTree versions that lack iter().
        walk = getattr(root, 'iter', None) or root.getiterator
        for node in walk():
            if node.tag == 'long_desc':
                long_descs.append(XmlDictConfig(node))
            elif node.tag == 'attachment':
                attachments.append(XmlDictConfig(node))
            elif node.tag == 'data':
                # Attach the payload to the attachment seen just before it.
                data = XmlDictConfig(node)
                data['text'] = node.text
                attachments[-1]['data'] = data

        bug['long_desc'] = long_descs
        bug['attachment'] = attachments
        bugs[int(bug['bug_id'])] = bug

    return bugs




def extractInfo(bugs):
    """ This gets information from the bugzilla bugs.

        Things like all the versions used, all the components etc.

        This is so you can set them up in bitbucket before you do the import.

        bugs -- dict mapping bug id to a bug dict.

        Returns a dict mapping each inspected field name to the set of
        distinct values found for it across all bugs.  Fields a bug does not
        have are skipped.  For 'assigned_to' the person's 'name' is collected
        when the value is a dict carrying one; otherwise the value itself.
    """
    fields = ("bug_status", "version", "component", "product", "op_sys",
              "rep_platform", "priority", "bug_severity", "assigned_to",
              "target_milestone")
    info = dict((field, set()) for field in fields)

    for bug in bugs.values():
        for field in fields:
            if field not in bug:
                continue
            value = bug[field]
            # Only look for 'name' in real dicts: on a plain string the `in`
            # test is a substring match and indexing with 'name' then fails.
            if field == "assigned_to" and isinstance(value, dict) and 'name' in value:
                value = value['name']
            info[field].add(value)

    return info







class ConvertBug(object):
    """ For each issue field, there is a method with the same name.

        The method takes a bug as input, and outputs the field value.
    """

    # Issue fields filled in by convert(); each has a same-named method.
    _issue_fields = ("title", "content", "component", "milestone",
                     "version", "responsible", "status", "kind")

    @staticmethod
    def _person_name(person):
        """ Return person['name'] for a dict that has one, else person as is.

            The isinstance check matters: on a plain string `'name' in person`
            would be a substring test.
        """
        if isinstance(person, dict) and 'name' in person:
            return person['name']
        return person

    def convert(self, bug):
        """ converts a bugzilla bug into a bitbucket issue.

            Returns a dict with one entry per issue field, each computed by
            the method of the same name.
        """
        return dict((field, getattr(self, field)(bug))
                    for field in self._issue_fields)

    def title(self, bug):
        """ The issue title is the bug's short description. """
        return bug['short_desc']

    def content(self, bug):
        """ Concatenate every long_desc of the bug into one utf8 byte string.

            Each comment becomes a heading with author and date followed by
            the comment text in a {{{ }}} block.  When a comment carries an
            'attachid', links to the bug attachments with that id are added.
        """
        text_type = type(u"")
        parts = []
        for long_desc in bug['long_desc']:
            thetext = long_desc['thetext']
            if thetext is None:
                # An empty comment has no text; do not print "None".
                thetext = u""

            c = u"\n\n"
            c += u"== %s, %s\n\n{{{\n%s\n}}}\n\n----\n" % (
                self._person_name(long_desc['who']),
                long_desc['bug_when'],
                thetext)

            # we create links to attachments, since the bitbucket API can not add attachments.
            if 'attachid' in long_desc:
                c += "Attachments:\n"
                for attachment in bug['attachment']:
                    if long_desc['attachid'] != attachment['attachid']:
                        continue
                    filename = attachment['filename']
                    # urllib.quote cannot handle non-ASCII unicode strings,
                    # so quote the UTF-8 bytes instead.
                    if not isinstance(filename, str) and isinstance(filename, text_type):
                        filename = filename.encode('utf8')
                    quoted = urllib.quote(filename)
                    url = "/".join([ATTACHMENT_BASE_URL.rstrip("/"),
                                    attachment['attachid'],
                                    quoted])
                    c += "[[%s| %s]]\n" % (url, quoted)
            parts.append(c)

        return u"".join(parts).encode('utf8')

    def component(self, bug):
        """ Map the bugzilla component to an issue component.

            '(Miscellaneous)' becomes 'other', 'pygame.xxx' becomes 'xxx',
            anything else becomes the empty string.
        """
        if bug['component'] == '(Miscellaneous)':
            return 'other'
        if bug['component'].startswith('pygame.'):
            return bug['component'].replace('pygame.', '')

        return ''

    def milestone(self, bug):
        'NOTE: empty, since we did not use milestones'
        return ''

    def version(self, bug):
        """ The version is taken over unchanged. """
        return bug['version']

    def responsible(self, bug):
        """ Map the bugzilla assignee to a username.

            Assignees missing from the mapping fall back to 'illume'.
        """
        responsible = {'illume': 'illume',
                       'Lenard Lindstrom': 'llindstrom'
                       }
        assigned_to = self._person_name(bug['assigned_to'])
        return responsible.get(assigned_to, 'illume')

    def status(self, bug):
        """ Map the bugzilla bug_status to an issue status.

            Raises KeyError for a bug_status missing from the table.
        """
        bugzilla_status = {'UNCONFIRMED': 'new',
                           'NEW': 'new',
                           'ASSIGNED': 'open',
                           'REOPENED': 'open',
                           'RESOLVED': 'resolved',
                           'VERIFIED': 'resolved',
                           'CLOSED': 'resolved'}
        return bugzilla_status[bug['bug_status']]

    def kind(self, bug):
        """ Every converted bug is filed as kind 'bug'. """
        return 'bug'

    def priority(self, bug):
        'NOTE: this is not supported by API yet'
        priorities = "Blocker Critical Major Minor Trivial".split()
        bugzilla_ps = ['P1', 'P2', 'P3', 'P4', 'P5']
        return priorities[bugzilla_ps.index(bug['priority'])]



def create_attachments(bugs, out_dir='attachment'):
    """ create_attachments(bugs) decodes the attachments and creates a
          directory of files outputting the attachments in there

        bugs    -- dict mapping bug id to a bug dict with an 'attachment' list.
        out_dir -- directory the files are written below (default
                   'attachment'); it is created when missing.

        Each attachment is written to <out_dir>/<attachid>/<filename>.
        Raises ValueError('unknown encoding') for an attachment whose data is
        not marked as base64.
    """
    def ensure_dir(path):
        if not os.path.isdir(path):
            os.makedirs(path)

    ensure_dir(out_dir)

    for bug in bugs.values():
        for attachment in bug['attachment']:
            data = attachment['data']
            if data['encoding'] != 'base64':
                raise ValueError('unknown encoding')

            attach_path = os.path.join(out_dir, str(attachment['attachid']))
            ensure_dir(attach_path)

            # The filename comes from the bug report; keep only its last
            # path component so the file always lands inside attach_path.
            fname = os.path.join(attach_path,
                                 os.path.basename(attachment['filename']))

            # b64decode replaces the deprecated decodestring alias.  An
            # attachment with no text is written out as an empty file.
            decoded = base64.b64decode(data['text'] or '')

            # `with` closes the file even when the write fails.
            with open(fname, "wb") as f:
                f.write(decoded)


def _shell_quote(text):
    """ Wrap text in single quotes so a POSIX shell passes it on verbatim. """
    return "'" + text.replace("'", "'\"'\"'") + "'"


def make_create_issue_curls(issues, username="illume", password=None):
    """ Build one curl command per issue that posts it to ISSUES_API_URL.

        issues   -- iterable of issue dicts accepted by create_issue.
        username -- account name passed to curl --user (default "illume").
        password -- password for that account; when None it is asked for
                    interactively with getpass.

        Returns the list of command strings, in the order of `issues`.

        NOTE: the password is embedded in every returned command.
    """
    if password is None:
        password = getpass.getpass("Bitbucket password:\n")

    # Quote the credentials: a password containing spaces or shell
    # metacharacters would otherwise break the generated command line.
    credentials = _shell_quote("%s:%s" % (username, password))

    return ['curl -d "%s" %s --user %s' % (create_issue(issue),
                                          ISSUES_API_URL,
                                          credentials)
            for issue in issues]


# Parse every Bugzilla XML file into a {bug_id: bug} dict.
bugs = create_bugzilla_dicts()

# this stuff is useful for when you first start converting: it dumps one
# sample bug and the distinct field values found across all bugs.
if 0:
    pprint.pprint(bugs[3])
    info = extractInfo(bugs)
    pprint.pprint(info)

# Write the decoded attachment files to disk.
create_attachments(bugs)

# Convert the bugs in ascending bug-id order.
issues = []
for bug_id in sorted(bugs):
    issues.append(ConvertBug().convert(bugs[bug_id]))
#pprint.pprint(issues[69-1])
#print(issues[69-1]['content'])

# Emit one curl command per issue on stdout.
cmds = make_create_issue_curls(issues)
for c in cmds:
    print(c)