# palmagent / contact_fix.py

'''contact_fix.py -- batch edit google contacts
'''

import sys
import getpass
import getopt

# per
# http://code.google.com/apis/contacts/docs/1.0/developers_guide_python.html
#
# This is an old version of the API, but it's what comes with Ubuntu.
# Package: python-gdata
# Version: 1.2.4-0ubuntu2
# SHA1: e084331b46d3022cd827ae529e4ee13adb17aedb
#
# New one is:
# http://code.google.com/apis/contacts/docs/3.0/developers_guide_python.html
import atom
import gdata.base
import gdata.contacts
import gdata.contacts.service


def main(argv):
    '''Command-line entry point: log in, select contacts, apply the fixes.

    Usage: contact_fix.py [-n|--dry-run] [-g NAME|--group=NAME] ACCOUNT

    argv is the full argument vector with the program name first (as in
    sys.argv).  The password is prompted for with getpass.  The fixes
    fix_contact_note and fix_phones are run over the members of the named
    group, or over all contacts when no group is given.  Every batch
    response whose status code is not '200' is reported on stderr.

    Exits with status 2 (after printing a usage line) when ACCOUNT is
    missing; getopt.GetoptError propagates for unknown options.
    '''
    optlist, args = getopt.getopt(argv[1:], 'ng:', ['dry-run', 'group='])

    # Interpret the options before prompting for a password, so a bad
    # command line fails without asking the user for anything.
    dry_run = False
    group = None
    for flag, val in optlist:
        if flag in ('-n', '--dry-run'):
            dry_run = True
        elif flag in ('-g', '--group'):
            # getopt reports short options with their leading dash.
            group = val

    if not args:
        print >> sys.stderr, \
              "Usage: %s [-n] [-g GROUP] ACCOUNT" % argv[0]
        sys.exit(2)

    svc = ContactService(args[0])
    svc.login(getpass.getpass())

    if group:
        batch = svc.group_members(group)
    else:
        batch = svc.all_contacts()

    print >> sys.stderr, "batch candidates:", len(batch.entry)
    fix = chain_fixes([fix_contact_note, fix_phones])
    outcome = svc.batch_edit(batch, fix, dry_run)
    if outcome is None:
        # Nothing was submitted (e.g. a dry run), so there are no
        # responses to inspect.
        return

    edits, results = outcome
    # Requests and responses are paired positionally.
    for request, response in zip(edits.entry, results.entry):
        if response.batch_status.code != '200':
            print >> sys.stderr, "Failure in:", request.title.text
            print >> sys.stderr, response.batch_status.code, \
                  response.batch_status.reason


class ContactService(gdata.contacts.service.ContactsService):
    '''ContactsService with helpers for fetching and batch-editing contacts.
    '''

    def __init__(self, email):
        '''Set up a service for the account `email`.

        Only records the account and the client `source` string; call
        login() to authenticate.
        '''
        gdata.contacts.service.ContactsService.__init__(self)
        self.email = email
        self.source = 'connolly-palmagent-2'

    def login(self, pw):
        '''Record the password `pw` and authenticate with ProgrammaticLogin.
        '''
        self.password = pw
        self.ProgrammaticLogin()

    def _groups(self):
        '''Return a dict mapping each group's title text to its feed entry.

        Fetches the groups feed on every call.  Groups sharing a title
        collapse to a single entry (the later one in the feed wins).
        '''
        return dict((group.title.text, group)
                    for group in self.GetGroupsFeed().entry)

    def group_names(self):
        '''Return the group titles as a sorted list.
        '''
        return sorted(self._groups())

    def pick_one(self):
        '''Return the first entry of a contacts query limited to one result.

        Raises StopIteration when the feed comes back with no entries.
        '''
        query = gdata.contacts.service.ContactsQuery()
        query.max_results = 1
        feed = self.GetContactsFeed(query.ToUri())
        return iter(feed.entry).next()

    def all_contacts(self):
        '''Return a contacts feed for a query over all contacts.
        '''
        allq = gdata.contacts.service.ContactsQuery()
        # KLUDGE: a large limit stands in for real paging, so at most
        # 10000 entries are requested.
        allq.max_results = 10000
        return self.GetContactsFeed(allq.ToUri())

    def group_members(self, name):
        '''Return a contacts feed restricted to the group titled `name`.

        Raises KeyError when no group has that title.
        '''
        g = self._groups()[name]
        in_group = gdata.contacts.service.ContactsQuery()
        in_group.group = g.id.text
        # KLUDGE: same fixed limit as all_contacts().
        in_group.max_results = 10000
        return self.GetContactsFeed(in_group.ToUri())

    def batch_edit(self, before, edit_func, dry_run=False):
        '''Apply `edit_func` to each entry of `before`; submit the changes.

        edit_func(entry) returns the edited entry, or a false value when
        the entry needs no change.  Each edited entry is logged on stderr
        and queued as an update in one batch feed.

        Returns (batch, results): the batch feed of queued updates and the
        feed returned by ExecuteBatch.  When dry_run is true, or when no
        entry was edited, nothing is sent and `results` is an empty feed,
        so callers can always unpack the pair and iterate results.entry.
        '''
        # based on
        #http://code.google.com/p/gdata-python-client/wiki/UsingBatchOperations
        batch = gdata.base.GBaseItemFeed(atom_id=atom.Id(text='edit_contacts'))
        for entry in before.entry:
            updated = edit_func(entry)
            if updated:
                print >> sys.stderr, "Updating:", updated.title.text
                batch.AddUpdate(updated)

        print >> sys.stderr, "Updated entries:", len(batch.entry)
        if dry_run or not batch.entry:
            # Nothing to submit: hand back an empty results feed rather
            # than None so the return shape is the same on every path.
            return batch, gdata.base.GBaseItemFeed()

        results = self.ExecuteBatch(batch,
                                    self.GetFeedUri(projection='full/batch'))
        return batch, results


def chain_fixes(fixes):
    '''Combine several fix functions into a single fix function.

    Each function in `fixes` takes an item and returns the edited item, or
    a false value when it has nothing to change.  The combined function
    feeds the item through them in order, each one seeing the result of
    the previous successful fix, and returns the final item -- or None
    when none of them changed anything.
    '''
    def fix(x):
        current = x
        changed = False
        for step in fixes:
            outcome = step(current)
            if not outcome:
                # This step had nothing to do; keep what we have so far.
                continue
            current = outcome
            changed = True
        if not changed:
            return None
        return current
    return fix


def fix_contact_note(c):
    '''Repair the note text of contact `c` in place, if it needs repair.

    Returns `c` when _fix_note produced replacement text for c.content.text
    (which is then stored back), and None when the contact has no note text
    or the note needs no change.
    '''
    content = c.content
    if not content or not content.text:
        return None
    repaired = _fix_note(content.text)
    if not repaired:
        return None
    content.text = repaired
    return c


def _fix_note(txt):
    r'''
    >>> _fix_note(_testnote)
    'line 1\nline 2'
    '''
    lines = txt.split("\n")
    if lines:
        runon = lines[0]
        normal = lines[2:]
        if ''.join(normal) == runon:
            return '\n'.join(normal)
    return None

_testnote = '''line 1line 2

line 1
line 2'''


def fix_phones(c):
    '''Delete duplicate phone numbers +1-nnn where nnn is also present.

    Edits c.phone_number in place.  An entry whose text is '+1-' followed
    by the text of another entry is removed when both have the same `rel`;
    when the `rel` values differ the pair is reported on stderr and left
    alone.

    Returns `c` when at least one entry was deleted, otherwise None.
    '''
    any_edits = False
    # Visit indices last-to-first so that deleting entry i never shifts an
    # index that is still to be visited.
    for i in reversed(range(len(c.phone_number))):
        candidate = c.phone_number[i]
        for canonical in c.phone_number:
            if canonical.text is None:
                # An entry with no text cannot be the bare form of any
                # number (and cannot be concatenated with the prefix).
                continue
            if '+1-' + canonical.text != candidate.text:
                continue
            if canonical.rel != candidate.rel:
                print >> sys.stderr, "rel mismatch:", (canonical.text,
                                                       canonical.rel,
                                                       candidate.rel)
                break
            del c.phone_number[i]
            any_edits = True
            break
    if any_edits:
        return c
    return None


# When run as a script, pass the full argument vector (program name
# included) to main; importing the module runs nothing.
if __name__ == '__main__':
    main(sys.argv)
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use the up and down arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.