"""
smtpErrorAnalysis / findBadAddresses.py

Author: Richard Shea
Project: smtpErrorAnalysis

Allows a directory of email messages to be parsed for 'bounce messages'
and for those 'bounce messages' to be parsed for details which will
allow the problems to be analysed.

Particular focus on emails bounced due to sender having used an invalid
recipient address.
"""

import os
import email
import csv
import re
import pprint
# Message used when a bounce email yields no email addresses at all.
ERR1 = "Found zero email addresses so don't know what to do" 
# Message used when multiple, non-identical email addresses are found;
# takes one %s placeholder (a pprint.pformat of the offending list).
ERR2 = "Found more than one email address so don't know what to do [%s]"
class FindBadAddExcptn(Exception):
    """Base exception for all errors raised by this script."""

    def __init__(self, value):
        super(FindBadAddExcptn, self).__init__(value)
        # Keep the offending value around so callers can inspect it.
        self.value = value

    def __str__(self):
        # Render via repr() so string values appear quoted in tracebacks.
        return "%r" % (self.value,)

def strip_line_feeds(string):
    """Return the input string with all CR and LF characters removed
    and leading/trailing whitespace stripped.
    """
    string = string.replace("\r", "")
    string = string.replace("\n", "")
    return string.strip()

def find_email(instr):
    Given a string searches for all email addresses contained
    within the string. We assume:

    * At least email address will be found
    * All addresses found will be identical

    If this is so the email address found will be returned.
    If this is not so errors are raised

    #Following regex found at : http://stackp.online.fr/?p=19
    email_pattern = re.compile('([\w\-\.]+@(\w[\w\-]+\.)+[\w\-]+)')
    results = []
    #Put all email addresses found into a list. The resulting list
    #should contain one or more identical email addresses
    for match in email_pattern.findall(instr):

    if len(results) == 0: 
        raise FindBadAddExcptn(ERR1)
    elif len(results) > 1:
        for email_address in results[1:]:
            if email_address != results[0]:  
                raise FindBadAddExcptn( ERR2 % pprint.pformat(results))

    return results[0]

def remove_rfc_notation(email_to_be_cleaned):
    """Given a string which contains an email address in one of the two
    following formats

    * a@foo.bar
    * rfc822;a@foo.bar   (optionally with whitespace after the ';')

    return the bare address, e.g. a@foo.bar.
    """
    l_em_to_be_clnd = email_to_be_cleaned.split(';')
    if len(l_em_to_be_clnd) == 1:
        #No rfc notation present - the address is already bare.
        return l_em_to_be_clnd[0].strip()
    #Headers such as "Final-Recipient: rfc822; john@e.web" may put a
    #space after the ';', so strip the remainder before returning it.
    return l_em_to_be_clnd[1].strip()

def parse_email_for_del_stat_part(file_name, path_em_file, csv_dict_wrtr):
    Given the text of a SMTP 'bounce message' writes a CSV row 
    to match the headers in the global variable HDR_OUTPUT_COLS.

    It does this by finding the 'message/delivery-status' part of 
    the entire email and parsing the headers.

    An 'message/delivery-status' part of a 'bounce email' looks a 
    little like this :

    Content-Description: Delivery report
    Content-Type: message/delivery-status

    Reporting-MTA: dns; a.b.web              
    X-Postfix-Queue-ID: 808F17F8080
    X-Postfix-Sender: rfc822; someone@c.d.web
    Arrival-Date: Tue,  8 May 2012 16:30:12 -0700 (PDT)

    Final-Recipient: rfc822; john.smith@e.web
    Original-Recipient: rfc822;john.smith@e.web
    Action: failed
    Status: 5.0.0
    Remote-MTA: dns; smtp.e.web
    Diagnostic-Code: smtp; 550 <john.smith@e.web>, Recipient unknown
    ==END  =====================================================

    NB: All sorts of assumptions are made about the structure of the 
    bounce message which seem to hold true for a large sample I have 
    used in testing but it seems likely that somewhere there are 'bounce
    messages' which follow different conventions. In particular I suspect
    that were the original email message to be something other than a two
    part multipart email message there might be problems

    print "About to process : %s" % file_name
    em_file = file(path_em_file)
    em_msg = email.message_from_string(em_file.read())
        #Get the second email.message.Message 
        #from the list of email.message.Message
        #in the email message contained in em_file
        em_msg_dlv_status = em_msg.get_payload()[1]
    except IndexError:
        print "File %s is not a recognised format [a]" % file_name
        #Convert the generator of email.message.Message objects returned
        #by .walk() to a list of email.message.Message
        lst_em_msg_dlv_status = []
            for elem in em_msg_dlv_status.walk():
        except AttributeError:
            print "File %s is not a recognised format [b]" % file_name
            #Get the message's field headers and values
            lst_hdrs_kv_pairs = lst_em_msg_dlv_status[2].items()
            #Populate the dic_diag dictionary using the header
            #names as keys and the header values of element values
            dic_diag = {}
            for hdr_kv in lst_hdrs_kv_pairs:
                hdr_name = hdr_kv[0].upper()
                hdr_val = hdr_kv[1]
                hdr_val = hdr_val.replace("\r","")
                hdr_val = hdr_val.replace("\n","")
                if dic_diag.has_key(hdr_name):
                    raise FindBadAddExcptn(
                            ERR1 % pprint.pformat(lst_hdrs_kv_pairs))
                    dic_diag[hdr_name] = hdr_val 
            #Add a couple of non-header derived values
            dic_diag['SOURCE-FILENAME'] = file_name
            dic_diag['HUM-READ-EMAIL-ADDR'] = \
            #Write the dictionary as a CSV row

def build_ignore_list():
    """Return a hard-coded list of file names which will be ignored
    in subsequent processing.

    This is not currently used (the list is empty) but is left in place
    as it supports the 'ignore me' structure which is in place.
    """
    lst = []
    return lst

def main():
    """The main() function.

    Walks a hard-coded directory of bounce emails and writes one CSV
    row of delivery-status headers per message.

    Needs work in order that the location of email files to be parsed
    and the location of output files may be specified via command
    line params.
    """
    lst_files_to_ignore = build_ignore_list() 
    path = 'C:/usr/rshea/mytemp/20110609/NZLPProblemEmails-20120510/'
    listing = os.listdir(path)

    #Create a csv.DictWriter to write output to.
    #NOTE(review): extrasaction='ignore' stops DictWriter raising on
    #headers outside HDR_OUTPUT_COLS - confirm against original intent.
    csv_dict_wrtr = csv.DictWriter(
            open('NZLP-bademailaddresses-headers-20120510.csv', 'wb'),
            HDR_OUTPUT_COLS,
            restval='N/A',
            extrasaction='ignore')

    #Write the initial headers
    csv_dict_wrtr.writerow(dict(zip(HDR_OUTPUT_COLS, HDR_OUTPUT_COLS)))

    #Process each file in turn, skipping any file on the ignore list
    for in_file_name in listing:
        if in_file_name not in lst_files_to_ignore:
            in_file_path = "%s/%s" % (path, in_file_name)
            parse_email_for_del_stat_part(in_file_name,
                                          in_file_path,
                                          csv_dict_wrtr)

if __name__ == "__main__":
    main()