Commits

"Fel...@schwarz.eu>"  committed 695ef1b

state is preserved when invalid arguments were given, check for helo hostname to be syntactically valid

  • Participants
  • Parent commits a66f9ec

Comments (0)

Files changed (3)

File pymta/command_parser.py

 class SMTPCommandParser(asynchat.async_chat):
     """This class handles only the actual communication with the client. As soon
     as a complete command is received, this class will hand everything over to
-    the SMTPProcessor.
+    the SMTPSession.
     
     In the original 'SMTPChannel' class from Python.org this class handled 
     all communication with asynchat, implemented a extremly simple state machine
 
     # -------------------------------------------------------------------------
     # Methods that call policy checks
-    
-
-    # SMTP and ESMTP commands
-    def smtp_HELO(self, arg):
-        print 'helo', repr(self._greeting)
-        if not arg:
-            self.push('501 Syntax: HELO hostname')
-            return
-        if self._greeting:
-            self.push('503 Duplicate HELO/EHLO')
-        else:
-            print 'sending ', '250 %s' % self._fqdn
-            self._greeting = arg
-            self.push('250 %s' % self._fqdn)
-    
-    def smtp_QUIT(self, arg):
-        # args is ignored
-        self.push('221 Bye')
-        self.close_when_done()
 
     def smtp_MAIL(self, arg):
         print '===> MAIL', arg

File pymta/session.py

 # -*- coding: UTF-8 -*-
 
+import re
 from sets import Set
 
 from repoze.workflow.statemachine import StateMachine, StateMachineError
 __all__ = ['SMTPSession']
 
 
+class InvalidParametersException(Exception):
+    pass
+
+
+# regular expression deliberately taken from
+# http://stackoverflow.com/questions/106179/regular-expression-to-match-hostname-or-ip-address#106223
+regex_string = r'^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z]|[A-Za-z][A-Za-z0-9\-]*[A-Za-z0-9])$'
+
+
 class SMTPSession(object):
     """The SMTPSession processes all input data which were extracted from 
     sockets previously. The idea behind is that this class is decoupled from 
-    asynchat as much as possible and make it really testable."""
+    asynchat as much as possible and make it really testable.
+    
+    The protocol parser will create a new session instance for every new 
+    connection so this class does not have to be thread-safe.
+    """
     
     def __init__(self, command_parser, policy=None):
         self._command_parser = command_parser
         self._command_arguments = None
         self._message = None
         
+        
+        self.hostname_regex = re.compile(regex_string, re.IGNORECASE)
+        
         self._build_state_machine()
         
     
             print base_msg % (smtp_command, name_handler_method)
             self.reply(451, 'Temporary Local Problem: Please come back later')
         else:
+            # Don't catch InvalidParametersException here - else the state would
+            # be moved forward. Instead the handle_input will catch it and send
+            # out the appropriate reply.
             handler_method()
+            
     
     # -------------------------------------------------------------------------
     
                 if len(allowed_transitions) > 0:
                       msg += ', expected on of %s' % allowed_transitions
                 self.reply(503, msg)
+        except InvalidParametersException:
+            self.reply(501, 'Syntactically invalid %s argument(s)' % smtp_command)
         self._command_arguments = None
     
     
     def reply(self, code, text):
         """This method returns a message to the client (actually the session 
         object is responsible of actually pushing the bits)."""
+        print 'code, text', code, text
         self._command_parser.push(code, text)
     
     
         self.reply(250, 'OK')
     
     def smtp_helo(self):
-        helo_string = self._command_arguments
-        self._message.smtp_helo = helo_string
-        primary_hostname = self._command_parser.primary_hostname
-        self.reply(250, primary_hostname)
+        helo_string = (self._command_arguments or '').strip()
+        valid_hostname_syntax = (self.hostname_regex.match(helo_string) != None)
+        if not valid_hostname_syntax:
+            raise InvalidParametersException(helo_string)
+        else:
+            self._message.smtp_helo = helo_string
+            primary_hostname = self._command_parser.primary_hostname
+            self.reply(250, primary_hostname)
     
     def smtp_mail_from(self):
         # TODO: Check for good email address!

File tests/basic_message_sending_test.py

         self.session = SMTPSession(command_parser=self.command_parser)
         self.session.new_connection('127.0.0.1', 4567)
     
-    def _send(self, command, data=None):
+    def _check_reply_code(self, code, reply_text, expect_failure=False):
+        first_code_digit = int(str(code)[0])
+        smtp_reply = "%s %s" % (code, reply_text)
+        if not expect_failure:
+            self.assertEqual(2, first_code_digit, smtp_reply)
+        else:
+            self.assertNotEqual(2, first_code_digit, smtp_reply)
+    
+    def _send(self, command, data=None, expect_failure=False):
         number_replies_before = len(self.command_parser.replies)
         self.session.handle_input(command, data)
         self.assertEqual(number_replies_before + 1, len(self.command_parser.replies))
         code, reply_text = self.command_parser.replies[-1]
-        self.assertEqual('2', str(code)[0], "%s %s" % (code, reply_text))
+        self._check_reply_code(code, reply_text, expect_failure=expect_failure)
+        return (code, reply_text)
     
     def _close_connection(self):
         self._send('quit')
         self.assertEqual('localhost Hello 127.0.0.1', reply_text)
         self._close_connection()
     
+    def test_noop_does_nothing(self):
+        self._send('noop')
+        self._close_connection()
+    
     def test_send_helo(self):
         self._send('helo', 'foo.example.com')
         self.assertEqual(2, len(self.command_parser.replies))
         self.assertEqual(250, code)
         self.assertEqual('localhost', reply_text)
         self._close_connection()
-    
-    def test_noop_does_nothing(self):
-        self._send('noop')
-        self._close_connection()
 
     def test_reject_duplicated_helo(self):
         self._send('helo', 'foo.example.com')
-        self.session.handle_input('helo', 'foo.example.com')
-        self.assertEqual(3, len(self.command_parser.replies))
-        code, reply_text = self.command_parser.replies[-1]
+        code, reply_text = self._send('helo', 'foo.example.com', 
+                                      expect_failure=True)
         self.assertEqual(503, code)
         expected_message = 'Command "helo" is not allowed here'
         self.assertTrue(reply_text.startswith(expected_message), reply_text)
         self._close_connection()
+    
+    def test_helo_without_hostname_is_rejected(self):
+        self._send('helo', expect_failure=True)
+        # But we must be able to send the right command here (state machine must
+        # not change)
+        self._send('helo', 'foo')
+    
+    def test_helo_with_invalid_arguments_is_rejected(self):
+        expect_invalid = lambda data: self.assertEqual(501, self._send('helo', data, expect_failure=True)[0])
+        expect_invalid('')
+        expect_invalid('  ')
+        expect_invalid(None)
+        expect_invalid('foo bar')
+        expect_invalid('foo_bar')
 
     def test_invalid_commands_are_recognized(self):
         self.session.handle_input('invalid')