Commits

Tobias Mueller committed 889f67d

Initial implementation of the rollover logic, there is loads left to be done

  • Participants
  • Parent commits 9887552

Comments (0)

Files changed (2)

+# You might want to install these dependencies like this:
+# pip install -E /tmp/gnupg --requirement=requirements.txt
+
+python-gnupg
+#!/usr/bin/env python
+
+import argparse
+from collections import namedtuple
+import logging
+import os
+import subprocess
+
+import gnupg
+from gnupg import _make_binary_stream
+
+from gpg_colon import GPGRecordLine
+from sendemail import send_email, MTText
+
+TEMPLATE = '''Hello {name},
+
+you signed my key {old_keyid} which I am now deprecating.
+I am rolling over to my new OpenPGP Key {new_keyid}.
+The new fingerprint is {new_fingerprint}.
+
+I have a attached a copy of my new key and signed it with my old key.
+
+You might want to sign the new key as well.
+
+Thanks!
+  {myname}
+'''
+
+log = logging.getLogger(__name__)
+
+class DummyResult(object):
+    def __init__(self, *args, **kwargs):
+        self.log = logging.getLogger('DummyResult')
+
+    def handle_status(self, *args, **kwargs):
+        self.log.debug("Handling status %s, %s", args, kwargs)
+        pass
+
+class AddUIDResult(object):
+    '''This is just some cruft lying around in case one needs that in the future
+    however, I don't think it's likely going to happen.
+    '''
+    def __init__(self, gpg, name, email, comment):
+        self.gpg = gpg
+        self.name = name
+        self.email = email
+        self.comment = comment
+
+    def handle_status(self, key, val):
+    
+        log.debug("Handling status %s, %s", key, val)
+        {'GET_LINE': '',
+        'GOT_IT': '',
+        'GOOD_PASSPHRASE': '', 
+        'GET_BOOL': '',
+        }
+        
+        linehandler = {
+            'keygen.name': self.name,
+            'keygen.email': self.email,
+            'keygen.comment': self.comment,
+            'keyedit.prompt': 'save', 
+            'keyedit.save.okay': 'y',
+        }
+        pass
+
+
+def _extract_name_email_from_uid(uid):
+        marker = uid.index('<')
+        name = uid[:marker-1] # Assuming that a uid looks like "foo bar <foo@bar.com>", this extracts the name
+        email = uid[marker+1:-1]
+        comment = ""
+        
+        return (name, email, comment)
+
+
+class OpenPGPKey(object):
+    GPG = 'gpg' # The binary to call
+    
+    def __init__(self, keyid, enforce_secret=True, extra_args=None):
+        self.keyid = keyid #FIXME: This needs to be normalised, because other parts depend on this keyid, however, it could be the whole fingerprint as well!
+        self.extra_args = extra_args or []
+        self.GnuPG = gnupg.GPG(gnupghome='/tmp/gnupg/test') # FIXME: Reset homedir
+
+        if enforce_secret:
+            print self.is_secret_key()
+    
+ 
+    def _build_gpg_args(self, args):
+        cmd = [self.GPG] + self.extra_args + args
+
+        return cmd
+        
+
+    def _call_gpg(self, args):
+        cmd = self._build_gpg_args(args)
+        log.debug('Calling GPG: %s', ' '.join(cmd))
+        output = subprocess.check_output(cmd)
+        
+        return output
+
+    @property
+    def fingerprint(self):
+        '''Returns the fingerprint of the current key'''
+        all_keys = self.GnuPG.list_keys(True)
+        try:
+            fpr = [k['fingerprint'] for k in all_keys
+                if k['keyid'].endswith(self.keyid)][0] # This is a fuzzy match, because python-gnupg would use 8 byte (represented as 16 hex characters) representations whereas the "normal" GnuPG is 4 byte.
+        except IndexError:
+            log.critical("Couldn't find fingerprint for key with id %s. "
+                "Maybe the id is wrong or bogosly formatted",
+                self.keyid)
+            log.info("The following keys are available:\n%s",
+                [(k['keyid'], k['fingerprint']) for k in all_keys])
+            raise
+
+        return fpr
+            
+ 
+    def is_secret_key(self, keyid=None):
+        '''Raises an exception is the key is not found by gpg as a secret key
+        
+        It basically calls gpg --list-secret-keys to determine the 
+        existence of a key.
+        The exception raised is subprocess.CalledProcessError.
+        '''
+        keyid = keyid or self.keyid
+        gpg_args = ['--list-secret-keys', keyid]
+        cmd = self._build_gpg_args(gpg_args)
+        ret = subprocess.check_call(cmd)
+        
+        return ret
+
+
+    def get_signatures(self):
+        '''Yields identities that signed this key
+        
+        It calls gpg and parses the output, which might look like
+        sig:::17:DC03BAA3D3492A2A:2005-03-30::::Tobias Mueller <4tmuelle@informatik.uni-hamburg.de>:18x:
+        
+        FIXME: Use gpg_colon many_from_colon_lines API
+        '''
+        gpg_args = ['--with-colons', '--list-sigs', self.keyid]
+        gpg_sigs = self._call_gpg(gpg_args)
+        
+        for line in gpg_sigs.splitlines():
+            fields = line.split(':')
+            if not fields[0] == 'sig':
+                log.debug('Parsing %s which is not a sig', fields)
+            else:
+                sig_line = GPGRecordLine(*fields)
+                log.debug('Got the following Signature Line: %s', sig_line)
+                
+                yield sig_line.userid
+
+  
+    def get_userids(self):
+        '''Yields identities that are on this key
+        
+        It calls gpg and parses the output, which might look like:
+        sec::1024:17:DC03BAA3D3492A2A:2005-03-30:2012-04-30:::Tobias Mueller <4tmuelle@informatik.uni-hamburg.de>::sca:
+        uid:::::2011-06-24::AA9937BF330C75548D95EF819B5BB8CD5BD9BF56::Tobias Mueller <muellet2@computing.dcu.ie>:
+        uid:::::2011-06-24::9C843455585DCECE3960F420652940B30AEABD2A::Tobias Mueller <tobias.mueller2@mail.dcu.ie>:
+        ssb::4096:16:8DF164965334DA32:2005-03-30:2009-05-18:::::e:
+        ssb::4096:16:C00A953897F50880:2009-05-27:2014-10-07:::::e:
+        '''
+        gpg_args = ['--with-colons', '--list-secret-keys', self.keyid]
+        gpg_sigs = self._call_gpg(gpg_args)
+        
+        for line in GPGRecordLine.many_from_colon_lines(gpg_sigs, filter=('sec', 'uid')):
+            yield line.userid
+
+
+    @classmethod
+    def generate_new_key(cls, uids):
+        '''Generates a new OpenPGP Key by calling gpg
+        
+        You need to provide a list of uids, the first UID in that list will be
+        the primary uid
+        
+        Lots of parameters are not influencable right now, such as the key type.
+        '''
+        primary_uid = uids.pop(0)
+        
+        keylen = 64 # FIXME: Pimp up to 4096
+        
+        inputlines = [
+            'Key-Type: RSA',
+            #'Name-Email: too',
+            'Key-Length: %d' % keylen,
+            'Name-Real: %s' % primary_uid, 
+            'Expire-Date: 5y',
+            'Subkey-Type: RSA',
+            'Subkey-Length: %d' % keylen,
+            '%echo Not setting any passphrase at all! Please take care about that',
+            '%no-protection',
+            #'%ask-passphrase',
+            '%commit',
+        ]
+        gpg_input = '\n'.join(inputlines)
+        GnuPG = gnupg.GPG(gnupghome='/tmp/gnupg/test') # This can not be self. because we are in a classmethod
+        '''newgpgkey = GnuPG.gen_key(gpg_input)
+        newkeyid = [k['keyid'] for k in newgpgkey.gpg.list_keys(True)
+            if k['fingerprint'] == newgpgkey.fingerprint][0]
+        '''
+        newkeyid = '515D4491' # FIXME: Reset
+        newkey = OpenPGPKey(newkeyid, extra_args=['--homedir','/tmp/gnupg/test'])
+        # add UIDs to key
+        for uid in uids:
+            newkey.add_uid(uid)
+        
+        return newkey
+        
+
+    def add_uid(self, uid):
+        '''Adds a new UID onto a key
+        
+        The UID needs to be in the very special form:
+        Some Name <email@address.com>
+        
+        where the important bits is the opening and closing bracket.
+        '''
+        result = DummyResult(self.GnuPG)
+        args = [
+            '--command-fd=0',
+            #'--yes',
+            #'--batch',
+            '--edit-key' , self.keyid, 'adduid']
+        # Eh, this is mean. The damn thing expects Name, EMail and Comment to be seperated
+        name, email, comment = _extract_name_email_from_uid(uid)
+        data = ''
+        data += "%s\n" % name
+        data += "%s\n" % email
+        data += "%s\n" % '' # comment
+        #data += "O\n"
+        data += "save\n"
+        data += "y\n"
+        fdata = _make_binary_stream(data, 'ascii')
+        
+        self.GnuPG._handle_io(args, fdata, result, passphrase=None)
+        
+        return result
+
+
+    def sign_key(self, otherkey):
+        '''Signs other key identified by "otherkey"
+        
+        please see GnuPG manual for the format of the otherkey specification.
+        
+        '''
+        result = DummyResult(self.GnuPG)
+        args = [
+            '--command-fd=0',
+            #'--yes',
+            #'--batch',
+            '--local-user' , self.keyid,
+            '--default-cert-level', '3',
+            '--sign-key', otherkey,
+        ]
+        data = ''
+        data += "y\n"
+        fdata = _make_binary_stream(data, 'ascii')
+        
+        self.GnuPG._handle_io(args, fdata, result, passphrase=None)
+        
+        return result
+        
+
+def rollover(keyid, template=TEMPLATE, newkeyid=None):
+    print keyid
+    try:
+        key = OpenPGPKey(keyid)
+    except subprocess.CalledProcessError:
+        log.error("The key id provided (%s) doesn't seem to exist. "
+            "Check the output of gpg --list-secret-keys for "
+            "available keys", keyid)
+        raise
+    
+    sigs = key.get_signatures() # We could make that a set to avoid duplicates
+    
+    log.debug('The new ID to use is %s', newkeyid)
+    if not newkeyid:
+        user_ids = key.get_userids()
+        user_ids = list(user_ids) #FIXME: do not list the generator
+        log.debug('Found the following UIDs on the key: %s', user_ids)
+        
+        # generate new key with found UIDs.
+        newkey = OpenPGPKey.generate_new_key(user_ids)
+        newkeyid = newkey.keyid
+
+    else:
+        try:
+            newkey = OpenPGPKey(newkeyid)
+        except subprocess.CalledProcessError:
+            log.error("The key id provided (%s) for the new key "
+                "doesn't seem to exist. "
+                "Check the output of gpg --list-secret-keys for "
+                "available keys", newkeyid)
+            raise
+    
+    # Sign the new key with the old
+    key.sign_key(newkeyid)
+    
+    # Have a general public message to upload somewhere
+    #name, email, comment = ('World', '', '')
+    
+    for sig in sigs:
+        name, email, comment = _extract_name_email_from_uid(sig)
+        log.debug("Got a signature from %s with email %s", name, email)        
+    
+        # Create the message from Template and sign with both keys
+        formatdict = dict(
+            old_keyid=key.keyid,
+            new_keyid = newkey.keyid,
+            new_fingerprint = newkey.fingerprint,
+            email = email,
+            name = name,
+            myname = 'Not yet implemented', # FIXME: we don't obtain the name of the key yet
+        )        
+        message = template.format(formatdict)
+        
+        log.debug("Constructing the following message: %s", message)
+
+        # Encrypt the message to everyone that signed the old key, maybe within the last two year or so
+        # We need to get all the keys *sigh*
+        # Or maybe not encrypt if it doesn't work
+        signed_message = newkey.GnuPG.sign(message).data
+        
+        if send_emails:
+            subject = "Moving to a new OpenPGP Key {}".format(formatdict)
+            from_address = my_email_address
+            send_email(email, subject, signed_message, fromaddress)
+
+    
+    pass
+
+
+def main(options):
+    keyid = options.keyid
+    
+    if options.write_template:
+        fname = keyid
+        if os.access(fname, os.F_OK):
+            raise RuntimeError('File "%s" exists, please delete and try'
+            ' again' % fname)
+        else:
+            with open(fname, 'w') as f:
+                f.write(TEMPLATE)
+    else:
+        if options.use_template:
+            log.debug('Reading template from %s', options.use_template)
+            template = open(options.use_template).read()
+        else:
+            template = TEMPLATE
+
+        return rollover(keyid, template=template, newkeyid=options.new_key_id)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description='Migrate from one '
+        'OpenPGP Key to a new one.')
+    parser.add_argument('keyid',
+        help='Set the id of the key to migrate away from')
+    parser.add_argument('-t', '--use-template',
+        help='Use the template from file mentioned. See '
+        '--write-template to write out the built-in template')
+    parser.add_argument('-n', '--new-key-id',
+        help='Instead of generating a new key automatically, use this '
+        'key instead.')
+    parser.add_argument('-w', '--write-template',
+        action='store_true',
+        help='Write the template the file mentioned as keyid so that '
+        'you can customize the message')
+    parser.add_argument('--verbose', '-v', action='count')
+    args = parser.parse_args()
+
+
+    default_loglevel = logging.ERROR
+    debug_level = default_loglevel - (args.verbose or 0) * 10
+    logging.basicConfig(level=debug_level)
+    
+    main(args)