#!/usr/bin/env python
#
# ### WARNING: this script requires the dnspython project to be installed.
# ### Visit http://www.dnspython.org for downloads.
#
# Very simple script to help remove bad email addresses from a given list.
# It will stuff all culled addresses into a separate file for manual inspection
# with a quick note why each one was culled.
#
# Input to this script will come in the form of a CSV file separated by tabs.
# Field 1 will be the email address, field 2 will be the first name and field 3
# will be the last name.
#
# Output format of the culled list will also be CSV w/ tab separation.  Each
# culled row repeats the input fields of the rejected entry and appends the
# reason for rejection as the last field.  (Just a simple text string.)
#
# We use a couple of basic techniques.  The first is to translate all characters
# to lower case in the domain part of the address.  Second, offline regex parsing
# to cull the stupidly bad email addresses.  Third, I will check to see if I can
# even find an MX record for the domain.  Fourth, it will try to connect to the
# MX server and fifth attempt to send a quick reminder message if you specify a
# message file.
#
# A possible future expansion would be to try addresses that failed at part 5
# with the username portion in all lower case.  (RFC 2821 states all recipient
# parts of an address must be treated as case sensitive, but not all servers do.)
# Another future expansion would be to embed a simple POP/IMAP client that can
# check a specific address to look for bounce/spam/other block type messages and
# further cull the list.  Maybe that could be a separate program??  The
# possibilities are almost unbounded. :)
#
# A further expansion would involve certain black-listed domains like pookmail,
# mailinator, etc. to help further resolve the issue.  No point in sending spam
# to people who won't get it.
#
# An immediately desirable expansion is caching the results of domain tests.
# eg: if we test a domain, say, aol.com, we cache that result so we don't
# waste precious thread time checking it again.

__author__ = "Nick Guy & Brian Guy"
__license__ = "GPL"

import csv
import os
import re
import smtplib
import socket
import string
import sys
from optparse import OptionParser

try:
    import dns.resolver
except ImportError:
    # Only the network tests need dnspython, so warn now and fail loudly
    # later if a DNS lookup is actually attempted.  (Carrying on silently
    # would make every single domain look like it has no MX record.)
    dns = None
    sys.stderr.write("You need the DNS Python library from http://www.dnspython.org\n")


def openFile(fileName, mode, retCode):
    """Open fileName with the given mode, or terminate the script.

    Simple open() wrapper that checks for problems.  Feel free to extend it
    to check permissions, etc.

    fileName -- path of the file to open.
    mode     -- any open() mode string ('r', 'rb', 'w', 'wb', ...).
    retCode  -- process exit status to use when the file cannot be opened.

    Returns the open file handle.  On IOError a short message is printed and
    the script exits with retCode.
    """
    try:
        return open(fileName, mode)
    except IOError:
        # Look for the letter inside the mode string so that binary modes
        # such as 'rb' and 'wb' are described correctly as well.
        if 'w' in mode:
            fileMode = "writing"
        elif 'r' in mode:
            fileMode = "reading"
        else:
            fileMode = "special I/O"
        print("Can't open " + fileName + " for " + fileMode + ".")
        sys.exit(retCode)


def _dnsLookup(name, recordType):
    """Run one DNS query; exit with status 6 if dnspython is not installed."""
    if dns is None:
        sys.stderr.write("Network tests need the DNS Python library from http://www.dnspython.org\n")
        sys.exit(6)
    # dnspython 2.x renamed query() to resolve() and deprecated the old name;
    # older releases only provide query().
    lookup = getattr(dns.resolver, 'resolve', None)
    if lookup is None:
        lookup = dns.resolver.query
    return lookup(name, recordType)


def checkDomain(domain):
    """Return the host name of the preferred MX for domain, or False.

    Here we look to see if we can query a nameserver to get the MX for a
    domain.  If we see any error, we can safely assume the domain listed is
    problematic without further explanation.
    """
    try:
        answers = _dnsLookup(domain, 'MX')
        # The lowest preference value marks the primary mail exchanger.
        primary = min(answers, key=lambda record: record.preference)
        return str(primary.exchange)
    except Exception:
        # Deliberately broad (any lookup problem means "bad domain"), but
        # unlike a bare except it lets Ctrl-C and sys.exit() through.
        return False


def checkMXResolve(mx):
    """Return the first IPv4 address the MX host resolves to, or False.

    Any exception should signify it's not worth further checking out.
    """
    try:
        answers = _dnsLookup(mx, 'A')
        return str(answers[0])
    except Exception:
        return False


def checkMXHelo(mxip, timeout=30):
    """Return True when the MX host answers HELO with a 250 reply code.

    If not, safely assume something is wrong and return False.  Note that
    this may be obviated by the code to fully send a message.

    mxip    -- address (or host name) of the mail exchanger.
    timeout -- seconds to wait on the connection, so that one dead server
               cannot stall the whole run.
    """
    try:
        server = smtplib.SMTP(mxip, timeout=timeout)
    except (socket.error, smtplib.SMTPException):
        return False
    try:
        code, response = server.helo(socket.gethostname())
        return code == 250
    except (socket.error, smtplib.SMTPException):
        return False
    finally:
        # Always release the connection: politely if possible, abruptly if
        # the server has already gone away.
        try:
            server.quit()
        except (socket.error, smtplib.SMTPException):
            server.close()


# Function driver for all network based tests.  Separating it out
# should make it an ideal pthread entry point.
# Note that since these tests are linear and dependent on prior results,
# finer thread granulation isn't recommended.  Too much work for too little
# benefit.
def _rejectDomain(domain, message):
    """Cache message as the verdict for domain, report it, and return it."""
    domainCache[domain] = message
    if doVerbose:
        print(domain + ": " + message)
    return message


def netTest(address):
    """Run the network tests for the domain part of address.

    The stages are: MX lookup, MX host resolution, SMTP HELO.  Each stage
    depends on the previous one, and the first failure ends the test.

    Returns True when every stage succeeds, otherwise the matching entry of
    errorMessages.  Verdicts are cached per domain in domainCache so each
    domain is only tested once per run.
    """
    # Domain names are case-insensitive; lower-casing the key lets
    # "AOL.com" and "aol.com" share one cache entry.
    domain = address.rsplit('@', 1)[-1].lower()

    # Have we seen this domain before?
    if domain in domainCache:
        return domainCache[domain]

    mx = checkDomain(domain)
    if mx is False:
        return _rejectDomain(domain, errorMessages[6])

    mxip = checkMXResolve(mx)
    if mxip is False:
        return _rejectDomain(domain, errorMessages[7])

    if not checkMXHelo(mxip):
        return _rejectDomain(domain, errorMessages[8])

    ### Cascade additional network-based tests here.

    domainCache[domain] = True
    return True


# file handles and other file-scope stuff
listFile = False
culledFile = False
outFile = False
testFile = False

# Option holders
doNetTests = False
doTestMessage = False
doVerbose = False

# address containers
culledAddys = []
goodAddys = []
domainCache = {}    # domain -> True, or the error message it was culled with
addyCache = {}      # address -> verdict message recorded for it

# Regex's for offline address examination (raw strings keep the backslash
# escapes intact for the regex engine).
regat = re.compile(r'@')
regmultat = re.compile(r'@.*@')
regnoaddr = re.compile(r'^\s*@')
regdomain = re.compile(r'@[^.]+\.[^.]+')
regscrub = re.compile(r'[^@]+$')

# Error message listing.  The code refers to these by index, so only ever
# append new entries at the end.
errorMessages = [
    "duplicate address..",
    "Multiple @ symbols",
    "Missing @ symbol",
    "FCC regulated domain (or otherwise blacklisted domain)",
    "Missing username",
    "Missing domain",
    "Domain either doesn't exist or no MX is listed.",
    "Domain MX doesn't resolve",
    "Non-responsive MX",
    "Validated via network test",
    "Validated via offline tests only.",
]

# Try to open the FCC scrub list (these are domains we CANNOT spam to legally.)
# Load the scrub list: one domain per line.  Surrounding whitespace is
# stripped and blank lines are skipped, so an empty entry can never match an
# address with an empty domain part.
scrubFile = openFile('scrublist.txt', 'r', 5)
try:
    scrubItems = [entry.strip() for entry in scrubFile if entry.strip()]
finally:
    scrubFile.close()

# Build the command line option parser.  Note that for anything you expect to
# have an option passed to, do NOT set 'default=False|true'.  This seems to
# corrupt OptionParser() in a way that makes two options (or more) share the
# same argument.  Dunno why.  Also, -h|--help is autogenerated ftw.
parser = OptionParser()
parser.add_option("-l", "--listfile", dest="listFileName",
                  help="Name of file containing email addresses. If not specified, stdin is used.",
                  metavar="FILE")
parser.add_option("-c", "--culledfile", dest="culledFileName",
                  help="Name of file to write bad addresses to. If not specified, stderr is used.",
                  metavar="FILE")
parser.add_option("-o", "--outfile", dest="outFileName",
                  help="Name of file to write good addresses to. If not specified, stdout is used.",
                  metavar="FILE")
parser.add_option("-t", "--testmessage", dest="testFileName",
                  help="Path to the test message to send as part of the diagnostic. Implies --network.",
                  metavar="FILE")
# No dest= given: optparse stores these as options.network / options.verbose.
parser.add_option("-n", "--network", action="store_true",
                  help="Enable non-spammy network-based tests.",
                  default=False)
parser.add_option("-v", "--verbose", action="store_true",
                  help="Show verbose output while running tests.",
                  default=False)

(options, args) = parser.parse_args()

# Validate command line input.  Check files to be readable/writable accordingly.
# Resolve the command line options into file handles and flags.

# The csv module wants binary handles on Python 2 but text handles on
# Python 3, so pick the open() modes to match the running interpreter.
if sys.version_info[0] < 3:
    readMode, writeMode = 'rb', 'wb'
else:
    readMode, writeMode = 'r', 'w'

if options.listFileName is not None:
    listFile = openFile(options.listFileName, readMode, 1)
else:
    listFile = sys.stdin

if options.culledFileName is not None:
    culledFile = openFile(options.culledFileName, writeMode, 2)
else:
    culledFile = sys.stderr

if options.outFileName is not None:
    outFile = openFile(options.outFileName, writeMode, 3)
else:
    outFile = sys.stdout

doNetTests = bool(options.network)
doVerbose = bool(options.verbose)

if options.testFileName is not None:
    testFile = openFile(options.testFileName, 'rb', 4)
    # The --testmessage help text promises that it implies --network.
    doNetTests = True

# read in email addresses.
addressReader = csv.reader(listFile, delimiter="\t")

# Domain names are case-insensitive, so compare against a lower-cased set of
# the scrub list (which also makes each lookup constant time).
scrubDomains = set(item.lower() for item in scrubItems)

# main processing loop.  Consider splitting this into the offline and online
# loops so you can thread the online loop.
for line in addressReader:
    # csv.reader yields an empty list for a blank input line; skip it
    # instead of crashing on line[0].
    if not line:
        continue

    address = line[0]
    addressKey = str(address)

    # this is used to test presence of a single @ symbol.
    parts = address.split('@', 2)

    # Have we seen this address before?  Check the cache.  A duplicate is
    # culled without touching the verdict cached for its first occurrence.
    if addressKey in addyCache:
        culledAddys.append(line + [errorMessages[0]])
        if doVerbose:
            print(addressKey + ": " + errorMessages[0])
        continue

    # Work out the verdict for this row: the message to record and whether
    # the row belongs on the good list.
    isGood = False

    # First process all the regex type offline stuff.
    # Note that this must come first for @@+ to not fail after the second test.
    if regmultat.search(address):
        verdict = errorMessages[1]
    elif len(parts) != 2:
        verdict = errorMessages[2]
    elif parts[1].lower() in scrubDomains:
        verdict = errorMessages[3]
    elif regnoaddr.search(address):
        verdict = errorMessages[4]
    elif not regdomain.search(address):
        verdict = errorMessages[5]

    ### Place additional regexes here.. ###

    # Network based tests: existing domain and connecting to MX.
    elif doNetTests:
        # netTest() returns True on success, or the failure message.
        result = netTest(address)
        if result is True:
            verdict = errorMessages[9]
            isGood = True
        else:
            verdict = result

    ### This might be a good point to differentiate between unobtrusive
    ### Network tests and the full test-message version.
    else:
        verdict = errorMessages[10]
        isGood = True

    addyCache[addressKey] = verdict
    if isGood:
        goodAddys.append(line + [verdict])
    else:
        culledAddys.append(line + [verdict])
    if doVerbose:
        print(str(line) + ": " + verdict)

# Write out CSV's for valid and culled email addresses.
culledWriter = csv.writer(culledFile, delimiter="\t")
outWriter = csv.writer(outFile, delimiter="\t")

culledWriter.writerows(culledAddys)
outWriter.writerows(goodAddys)

# Close whatever we opened ourselves; the standard streams are left alone.
for handle in (listFile, culledFile, outFile, testFile):
    if handle not in (False, sys.stdin, sys.stdout, sys.stderr):
        handle.close()

sys.exit(0)