(obsolete)
#!/usr/bin/env python # # # Postfix SMTP Access Policy Delegation sctip based on: # # https://github.com/syrusakbary/validate_email # https://github.com/leoboiko/quotapolicy # # (c) 2016 Matthias Henze matthias@mhcsoftware.de # This code is made available to you under the GNU GPL v3. # # To hand over SMTP VERIFY to an other MTA by SMTP # # Do: # # mkdir -p /var/spool/postfix/var/run # # Run the script as root, it will daemonize: # # smtpverify -i IP_OF_TARGET_SERVER # # Add this to "smtpd_recipient_restrictions" in Postfix "main.cf" as first entry (!): # # check_policy_service unix:/var/run/smptverify.socket # # It should look like: # # smtpd_recipient_restrictions = # check_policy_service unix:/var/run/smptverify.socket # permit_mynetworks reject_unauth_destination # ${djigzo_rbl_clients} # ${djigzo_reject_unverified_recipient? reject_unverified_recipient} # # Do: # # postfix reload # # Now Ciphermail will ony accept Mail, accepted by the Mailserver. # # Reference: http://www.postfix.org/SMTPD_POLICY_README.html # # import daemon import SocketServer import sys import pwd import subprocess import os import signal import lockfile import argparse import atexit import re import smtplib import logging import socket # All we are really doing is comparing the input string to one # gigantic regular expression. But building that regexp, and # ensuring its correctness, is made much easier by assembling it # from the "tokens" defined by the RFC. Each of these tokens is # tested in the accompanying unit test file. # # The section of RFC 2822 from which each pattern component is # derived is given in an accompanying comment. # # (To make things simple, every string below is given as 'raw', # even when it's not strictly necessary. This way we don't forget # when it is necessary.) # WSP = r'[\s]' # see 2.2.2. Structured Header Field Bodies CRLF = r'(?:\r\n)' # see 2.2.3. Long Header Fields NO_WS_CTL = r'\x01-\x08\x0b\x0c\x0f-\x1f\x7f' # see 3.2.1. Primitive Tokens QUOTED_PAIR = r'(?:\\.)' # see 3.2.2. Quoted characters FWS = r'(?:(?:' + WSP + r'*' + CRLF + r')?' + \ WSP + r'+)' # see 3.2.3. Folding white space and comments CTEXT = r'[' + NO_WS_CTL + \ r'\x21-\x27\x2a-\x5b\x5d-\x7e]' # see 3.2.3 CCONTENT = r'(?:' + CTEXT + r'|' + \ QUOTED_PAIR + r')' # see 3.2.3 (NB: The RFC includes COMMENT here # as well, but that would be circular.) COMMENT = r'\((?:' + FWS + r'?' + CCONTENT + \ r')*' + FWS + r'?\)' # see 3.2.3 CFWS = r'(?:' + FWS + r'?' + COMMENT + ')*(?:' + \ FWS + '?' + COMMENT + '|' + FWS + ')' # see 3.2.3 ATEXT = r'[\w!#$%&\'\*\+\-/=\?\^`\{\|\}~]' # see 3.2.4. Atom ATOM = CFWS + r'?' + ATEXT + r'+' + CFWS + r'?' # see 3.2.4 DOT_ATOM_TEXT = ATEXT + r'+(?:\.' + ATEXT + r'+)*' # see 3.2.4 DOT_ATOM = CFWS + r'?' + DOT_ATOM_TEXT + CFWS + r'?' # see 3.2.4 QTEXT = r'[' + NO_WS_CTL + \ r'\x21\x23-\x5b\x5d-\x7e]' # see 3.2.5. Quoted strings QCONTENT = r'(?:' + QTEXT + r'|' + \ QUOTED_PAIR + r')' # see 3.2.5 QUOTED_STRING = CFWS + r'?' + r'"(?:' + FWS + \ r'?' + QCONTENT + r')*' + FWS + \ r'?' + r'"' + CFWS + r'?' LOCAL_PART = r'(?:' + DOT_ATOM + r'|' + \ QUOTED_STRING + r')' # see 3.4.1. Addr-spec specification DTEXT = r'[' + NO_WS_CTL + r'\x21-\x5a\x5e-\x7e]' # see 3.4.1 DCONTENT = r'(?:' + DTEXT + r'|' + \ QUOTED_PAIR + r')' # see 3.4.1 DOMAIN_LITERAL = CFWS + r'?' + r'\[' + \ r'(?:' + FWS + r'?' + DCONTENT + \ r')*' + FWS + r'?\]' + CFWS + r'?' # see 3.4.1 DOMAIN = r'(?:' + DOT_ATOM + r'|' + \ DOMAIN_LITERAL + r')' # see 3.4.1 ADDR_SPEC = LOCAL_PART + r'@' + DOMAIN # see 3.4.1 # A valid address will match exactly the 3.4.1 addr-spec. VALID_ADDRESS_REGEXP = '^' + ADDR_SPEC + '$' class DummyConfig: pass # filled by argparse below c = DummyConfig() devnull = open(os.devnull, 'w') def validate_email(email, ip, smtp_timeout=10): """Indicate whether the given string is a valid email address according to the 'addr-spec' portion of RFC 2822 (see section 3.4.1). Parts of the spec that are marked obsolete are *not* included in this test, and certain arcane constructions that depend on circular definitions in the spec may not pass, but in general this should correctly identify any email address likely to be in use as of 2011.""" try: assert re.match(VALID_ADDRESS_REGEXP, email) is not None try: smtp = smtplib.SMTP(timeout=smtp_timeout) smtp.connect(ip) status, _ = smtp.helo() if status != 250: smtp.quit() if c.debug: logger.debug(u'%s answer: %s - %s', ip, status, _) return None smtp.mail('') status, _ = smtp.rcpt(email) if status == 250: smtp.quit() return True if c.debug: logger.debug(u'%s answer: %s - %s', ip, status, _) smtp.quit() except smtplib.SMTPServerDisconnected: # Server not permits verify user if c.debug: logger.debug(u'%s disconected.', ip) except smtplib.SMTPConnectError: if c.debug: logger.debug(u'Unable to connect to %s.', ip) return None except AssertionError: return False except (socket.error) as e: if c.debug: logger.debug('ServerError or socket.error exception raised (%s).', e) return None return True class SmtpSocketHandler(SocketServer.StreamRequestHandler): def handle(self): recipient = None action = 'OK' while True: line = self.rfile.readline().strip() if line == '': break a = line.split('=') key = a[0] val = '='.join(a[1:]) if key == 'recipient': recipient = val break if recipient: ok = validate_email(recipient, c.ip) if not ok: action = 'REJECT ' \ + recipient \ + ' user unknowen' \ + ' (' + str(ok) + ')' if c.debug: logger.debug( u'%s: sending %s', recipient, action) self.wfile.write('action=' + action + "\n") self.wfile.write("\n") class ForkingUnixStreamServer( SocketServer.UnixStreamServer, SocketServer.ForkingMixIn ): pass def cleanup(signum=None, stack=None): if os.path.exists(c.sockpath): os.unlink(c.sockpath) if os.path.exists(c.pidpath): os.unlink(c.pidpath) # parse arguments p = argparse.ArgumentParser() p.add_argument('-i', '--ip', required=True, dest='ip', help='(Required) SMTP server IP', ) p.add_argument('-p', '--pidfile', default='/var/spool/postfix/var/run/smptverify.pid', dest='pidpath', help='Pidfile, relative to HOMEDIR (default: %(default)s )') p.add_argument('-s', '--socketfile', default='/var/spool/postfix/var/run/smptverify.socket', dest='sockpath', help='Socket file, relative to HOMEDIR (default: %(default)s )') p.add_argument('-d', '--recipient-delimiter', default='+', help='Set this to Postfix recipient_delimiter parameter') p.add_argument('--debug', action='store_true', help="Don't fork; print messages to stdout") c.__dict__.update(p.parse_args().__dict__) # check things atexit.register(cleanup) if os.path.exists(c.pidpath): if c.pidpath[0] == '/': abs_pidpath = c.pidpath sys.stderr.write("daemon seems to be running: %s exists\n" % abs_pidpath) sys.exit(1) # setup socket if os.path.exists(c.sockpath): os.unlink(c.sockpath) server = ForkingUnixStreamServer(c.sockpath, SmtpSocketHandler) os.chmod(c.sockpath, 0666) # setup daemonization context = daemon.DaemonContext() context.files_preserve=[server.fileno(), devnull] context.pidfile = lockfile.FileLock(c.pidpath) if c.debug: context.detach_process = False context.stdin = sys.stdin context.stdout = sys.stdout context.stderr = sys.stderr logger = logging.getLogger('smtpverify') ch = logging.StreamHandler() ch.setLevel(logging.ERROR) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) logger.setLevel(logging.DEBUG) else: logger = None context.detach_process = True with context: try: with open(c.pidpath, 'w') as p: # for start-stop-daemon etc. p.write(str(os.getpid())) server.serve_forever() finally: cleanup()