'''
smtpErrorAnalysis

Allows a directory of email messages to be parsed for 'bounce messages'
and for those 'bounce messages' to be parsed for details which will 
allow the problems to be analysed.

Particular focus on emails bounced due to the sender having used an
invalid email address.

    Usage: <script> [options]

    The script is used to parse a set of files which represent the
    'inbox' of an email account and consider those email messages which are
    'bounceback' emails sent by SMTP servers who have found it impossible to
    deliver emails sent by the owner of the 'inbox'. Command line options
    specify the location of the 'inbox' and where output should be written to.

      -h, --help            show this help message and exit
      -i INBOX, --inbox=INBOX
                            Location of INBOX
      -o PATH, --outpath=PATH
                            PATH to output csv file
      -v, --verbose         Show each file processed
'''

import os
import email
import csv
import re
import pprint
from optparse import OptionParser
# Message raised (wrapped in FindBadAddExcptn) by find_email() when the
# input contains no email address at all. It has no %-placeholder, so it
# must be used as-is and never %-formatted.
ERR1 = "Found zero email addresses so don't know what to do" 
# Message raised (wrapped in FindBadAddExcptn) by find_email() when the
# input contains differing email addresses. The single %s receives the
# pprint-formatted list of the addresses that were found.
ERR2 = "Found more than one email address so don't know what to do [%s]"
class FindBadAddExcptn(Exception):
    '''
    Root exception type for the failures detected by this script.

    The object handed to the constructor is kept on ``value``; calling
    ``str()`` on the exception yields the ``repr()`` of that object.
    '''

    def __init__(self, value):
        # Remember the payload, then let Exception record it in ``args``.
        self.value = value
        super(FindBadAddExcptn, self).__init__(value)

    def __str__(self):
        # The one-element tuple keeps %-formatting safe when the payload
        # is itself a tuple.
        return "%r" % (self.value,)

def strip_line_feeds(string):
    '''
    Return the input string with every CR and LF character removed.

    Carriage returns and line feeds are removed wherever they occur
    (not only at the ends); after that, leading and trailing whitespace
    is stripped from the result.
    '''
    return string.replace("\r", "").replace("\n", "").strip()

def find_email(instr):
    '''
    Given a string searches for all email addresses contained
    within the string. We assume:

    * At least one email address will be found
    * All addresses found will be identical

    If this is so the email address found will be returned.
    If this is not so errors are raised:

    * FindBadAddExcptn(ERR1) when no address is found
    * FindBadAddExcptn(ERR2 % ...) when the addresses found differ
    '''
    # Raw string so the backslashes reach the regex engine untouched.
    # NB: every domain label before the last must be at least two
    # characters long for this pattern to match.
    email_pattern = re.compile(r'([\w\-\.]+@(\w[\w\-]+\.)+[\w\-]+)')

    # The pattern has two groups, so findall() yields one tuple per
    # match; element 0 is the complete address. The resulting list
    # should contain one or more identical email addresses.
    results = [match[0] for match in email_pattern.findall(instr)]

    if not results:
        raise FindBadAddExcptn(ERR1)
    if any(email_address != results[0] for email_address in results[1:]):
        raise FindBadAddExcptn(ERR2 % pprint.pformat(results))

    return results[0]

def remove_rfc_notation(email_to_be_cleaned):
    '''
    Given a string which contains an email address prefixed by an
    address-type tag and a semicolon, for example in one of the two 
    following formats

        * ``rfc822; john.smith@e.web``
        * ``rfc822;john.smith@e.web``

    this function returns the text which follows the first semicolon.
    Whitespace is left untouched, so the first format yields
    `` john.smith@e.web`` with its leading space.

    A string which contains no semicolon is returned unchanged.
    '''
    l_em_to_be_clnd = email_to_be_cleaned.split(';')
    # str.split() always returns at least one element, so "no semicolon
    # present" shows up as a list of length 1 (never length 0).
    if len(l_em_to_be_clnd) == 1:
        return l_em_to_be_clnd[0]
    return l_em_to_be_clnd[1]

def parse_email_for_del_stat_part(file_name, path_em_file,
                                    csv_dict_wrtr, options):
    '''
    Given the path of a file holding an SMTP 'bounce message' writes 
    a CSV row to match the headers in the global variable 
    HDR_OUTPUT_COLS.

    It does this by finding the 'message/delivery-status' part of 
    the entire email and parsing the headers.

    An 'message/delivery-status' part of a 'bounce email' looks a 
    little like this ::

        Content-Description: Delivery report
        Content-Type: message/delivery-status

        Reporting-MTA: dns; a.b.web              
        X-Postfix-Queue-ID: 808F17F8080
        X-Postfix-Sender: rfc822; someone@c.d.web
        Arrival-Date: Tue,  8 May 2012 16:30:12 -0700 (PDT)

        Final-Recipient: rfc822; john.smith@e.web
        Original-Recipient: rfc822;john.smith@e.web
        Action: failed
        Status: 5.0.0
        Remote-MTA: dns; smtp.e.web
        Diagnostic-Code: smtp; 550 <john.smith@e.web>, Recipient unknown

    NB: All sorts of assumptions are made about the structure of the 
    bounce message which seem to hold true for a large sample I have 
    used in testing but it seems likely that somewhere there are 'bounce
    messages' which follow different conventions. In particular I suspect
    that were the original email message to be something other than a two
    part multipart email message there might be problems

    Parameters:

    * file_name     - name of the file, used in messages and in the
                      'SOURCE-FILENAME' column
    * path_em_file  - path of the file to open and parse
    * csv_dict_wrtr - object whose ``writerow`` receives the row dict
    * options       - object whose ``verbose`` attribute switches on
                      per-file progress output

    Files which do not have the expected structure are reported on
    stdout and skipped. FindBadAddExcptn is raised if the same header 
    name occurs twice in the delivery-status block.
    '''
    if options.verbose:
        print("About to process : %s" % file_name)

    # 'with' guarantees the file handle is closed once it has been read
    with open(path_em_file) as em_file:
        em_msg = email.message_from_string(em_file.read())

    try:
        #Get the second email.message.Message 
        #from the list of email.message.Message
        #in the email message contained in em_file
        em_msg_dlv_status = em_msg.get_payload()[1]
    except IndexError:
        print("File %s is not a recognised format [a]" % file_name)
        return

    try:
        #Convert the generator of email.message.Message objects returned
        #by .walk() to a list of email.message.Message
        lst_em_msg_dlv_status = list(em_msg_dlv_status.walk())
    except AttributeError:
        #The payload was not a list of messages (e.g. a plain string)
        print("File %s is not a recognised format [b]" % file_name)
        return

    #The headers of interest are read from the third element of the
    #walk; fewer elements means the expected block is not there.
    if len(lst_em_msg_dlv_status) < 3:
        print("File %s is not a recognised format [c]" % file_name)
        return

    #Get the message's field headers and values
    lst_hdrs_kv_pairs = lst_em_msg_dlv_status[2].items()

    #Populate the dic_diag dictionary using the header
    #names as keys and the header values of element values
    dic_diag = {}
    for hdr_name, hdr_val in lst_hdrs_kv_pairs:
        hdr_name = hdr_name.upper()
        if hdr_name in dic_diag:
            #ERR1 carries no %s placeholder, so a dedicated message is
            #built here rather than %-formatting ERR1.
            raise FindBadAddExcptn(
                    "Found duplicate header so don't know what to do [%s]"
                    % pprint.pformat(lst_hdrs_kv_pairs))
        #Unfold the value by dropping embedded CR / LF characters
        dic_diag[hdr_name] = hdr_val.replace("\r", "").replace("\n", "")

    #Add a couple of non-header derived values
    dic_diag['SOURCE-FILENAME'] = file_name
    #NOTE(review): the expression originally assigned here is missing 
    #from the available source. Deriving the address from the 
    #'Final-Recipient' header is a reconstruction - confirm against the
    #original script. When the header is absent the key is left unset.
    if 'FINAL-RECIPIENT' in dic_diag:
        dic_diag['HUM-READ-EMAIL-ADDR'] = strip_line_feeds(
                remove_rfc_notation(dic_diag['FINAL-RECIPIENT']))

    #Write the dictionary as a CSV row
    csv_dict_wrtr.writerow(dic_diag)

def build_ignore_list():
    '''
    Returns a hard-coded list of file names which will be ignored
    in subsequent processing

    This is not currently used (the list is empty) but is left in 
    place as it supports the 'ignore me' structure which is in place.
    A new list object is returned on every call.
    '''
    return []

def parse_args():
    '''
    Parses command line arguments using OptionParser.
    Applies validation rules to arguments and then, if OK
    returns them in a 'dictionary like' object ``options``

    Validation rules (any failure ends the program via 
    ``parser.error``, which raises SystemExit):

    * both --inbox and --outpath must be supplied
    * the inbox location must exist
    * the directory which will hold the output file must exist; an
      output path with no directory part refers to the current 
      directory
    '''
    desc = (
        "%prog is used to parse a set of files \n"
        "which represent the 'inbox' of an email account \n"
        "and consider those email messages which are 'bounceback'\n"
        "emails sent by SMTP servers who have found it impossible\n"
        "to deliver emails sent by the owner of the 'inbox'.\n"
        "\n\n"
        "Command line options specify the location of the 'inbox' "
        "and where output should be written to."
    )
    usage = "Usage: %prog [options]"

    parser = OptionParser(description=desc, usage=usage)
    parser.add_option("-i", "--inbox", action="store", dest="inbox",
                      metavar="INBOX", help="Location of INBOX")
    parser.add_option("-o", "--outpath", action="store", dest="outpath",
                      metavar="PATH", help="PATH to output csv file")
    parser.add_option("-v", "--verbose", action="store_true",
                      dest="verbose", help="Show each file processed")

    #Positional arguments are accepted by the parser but not used
    (options, _args) = parser.parse_args()

    #Either option being absent is an error; checking with 'or' stops
    #a None value reaching the os.path calls below.
    #NOTE(review): the wording of this message is new - the original
    #statement is missing from the available source.
    if options.inbox is None or options.outpath is None:
        parser.error('both --inbox and --outpath must be specified')

    if not os.path.exists(options.inbox):
        parser.error('inbox location does not exist')

    #dirname() of a bare file name is '', which never exists; treat 
    #that case as the current directory.
    out_dir = os.path.dirname(options.outpath) or os.curdir
    if not os.path.exists(out_dir):
        parser.error('path to ouput location does not exist')

    return options

def main():
    '''
    The main() function

    Parses the command line, opens the output CSV file, writes the 
    header row and then hands every file found in the inbox directory 
    (other than those named by build_ignore_list()) to
    parse_email_for_del_stat_part().
    '''
    import sys

    options = parse_args()

    #The csv module wants a binary-mode file on Python 2 and a text-mode
    #file opened with newline='' on Python 3.
    if sys.version_info[0] < 3:
        out_file = open(options.outpath, 'wb')
    else:
        out_file = open(options.outpath, 'w', newline='')

    #'with' makes sure the output file is flushed and closed even if 
    #processing one of the input files raises.
    with out_file:
        #Create a csv.DictWriter to write output to
        #NOTE(review): the arguments after restval are missing from the
        #available source; extrasaction='ignore' is a reconstruction -
        #confirm against the original script.
        csv_dict_wrtr = csv.DictWriter(
                out_file,
                HDR_OUTPUT_COLS,
                restval='N/A',
                extrasaction='ignore')

        #Write the initial headers
        csv_dict_wrtr.writerow(dict(zip(HDR_OUTPUT_COLS, HDR_OUTPUT_COLS)))

        lst_files_to_ignore = build_ignore_list()

        #Process each file in turn
        for in_file_name in os.listdir(options.inbox):
            if in_file_name in lst_files_to_ignore:
                continue
            in_file_path = "%s/%s" % (options.inbox, in_file_name)
            parse_email_for_del_stat_part(in_file_name,
                                          in_file_path,
                                          csv_dict_wrtr,
                                          options)

if __name__ == "__main__":