Commits

Anonymous committed 30ccb49

simply validation schema by relying on pycerberus 0.5

  • Participants
  • Parent commits e28fabe

Comments (0)

Files changed (5)

 Changelog for pymta
 ===================
 
+dev
+- require pycerberus >= 0.5 to simplify some validation code
+
 0.5.2 (12.06.2010)
 - Fix bug - detect of command/message also when the terminator is sent in 
   multiple packages (fixes manual message submission with telnet)
 Changelog
 ******************************
 
+dev
+==================
+- require pycerberus >= 0.5 to simplify some validation code
+
 0.5.2 (12.06.2010)
 ==================
 - Fix bug - detect of command/message also when the terminator is sent in 

pymta/validation.py

 import re
 
 from pycerberus.i18n import _
-from pycerberus.schema import SchemaValidator
+from pycerberus.schemas import PositionalArgumentsParsingSchema
 from pycerberus.validators import EmailAddressValidator, IntegerValidator, \
     StringValidator
 
 # ------------------------------------------------------------------------------
 # General infrastructure
 
-
-class SMTPCommandArgumentsSchema(SchemaValidator):
-    
-    def __init__(self, *args, **kwargs):
-        self.super()
-        self.set_internal_state_freeze(False)
-        self.set_allow_additional_parameters(False)
-        self.set_parameter_order(getattr(self.__class__, 'parameter_order', ()))
-        self.set_internal_state_freeze(True)
+class SMTPCommandArgumentsSchema(PositionalArgumentsParsingSchema):
     
     def messages(self):
-        return {'additional_items': _('Syntactically invalid argument(s) %(additional_items)s')}
-    
-    def _parameter_names(self):
-        return list(self._parameter_order)
-    
-    def _assign_names(self, arguments, context):
-        parameter_names = self._parameter_names()
-        nr_missing_parameters = max(len(parameter_names) - len(arguments), 0)
-        nr_additional_parameters = max(len(arguments), len(parameter_names), 0)
-        arguments.extend([None] * nr_missing_parameters)
-        parameter_names.extend(['extra%d' % i for i in xrange(nr_additional_parameters)])
-        return dict(zip(parameter_names, arguments))
-    
-    def _parse_parameters(self, value, context):
-        arguments = []
-        if len(value) > 0:
-            arguments = re.split('\s+', value.strip())
-        return arguments
-    
-    def _map_arguments_to_named_fields(self, value, context):
-        return self._assign_names(self._parse_parameters(value, context), context)
-    
-    def set_parameter_order(self, parameter_names):
-        self._parameter_order = parameter_names
-    
-    def process(self, value, context=None):
-        fields = self._map_arguments_to_named_fields(value, context or {})
-        return self.super(fields, context=context)
+        return {'additional_item': _("Syntactically invalid argument(s) '%(additional_item)s'")}
+
+    def separator_pattern(self):
+        return '\s+'
 
 
 class SMTPEmailValidator(EmailAddressValidator):
         if string_value.startswith('<') or string_value.endswith('>'):
             match = re.search('^<(.+)>$', string_value)
             if match is None:
-                self.error('unbalanced_quotes', string_value, context)
+                self.raise_error('unbalanced_quotes', string_value, context)
             string_value = match.group(1)
         return string_value
 
             if len(option) == 2:
                 continue
             value = ''.join(option)
-            self.error('invalid_smtp_arguments', value, context, smtp_arguments=repr(input_string))
+            self.raise_error('invalid_smtp_arguments', value, context, smtp_arguments=repr(input_string))
     
     def _assert_only_known_extensions(self, key_value_pairs, input_string, context):
         for key, value in key_value_pairs:
             if key.lower() in self.fieldvalidators():
                 continue
             value = '='.join((key, value))
-            self.error('invalid_extension', value, context, smtp_extension=repr(input_string))
+            self.raise_error('invalid_extension', value, context, smtp_extension=repr(input_string))
     
     def _validate_extension_arguments(self, key_value_pairs, input_string, context):
         self._assert_all_options_have_a_value_assigned(key_value_pairs, input_string, context)
         self._assert_only_known_extensions(key_value_pairs, input_string, context)
     
-    def _assign_names(self, arguments, context):
+    def aggregate_values(self, parameter_names, arguments, context):
         if len(arguments) <= 1:
             return self.super()
-        
         key_value_pairs = map(lambda option: re.split('=', option, 1), arguments[1:])
+
         self._validate_extension_arguments(key_value_pairs, ' '.join(arguments[1:]), context)
         lower_case_key_value_pairs = map(lambda item: (item[0].lower(), item[1]), key_value_pairs)
         options = dict(lower_case_key_value_pairs)
-        parameters = self.super(arguments[:1], context)
-        options.update(parameters)
-        return options
+        
+        parameter_names = parameter_names + options.keys()
+        arguments = [arguments[0]] + options.values()
+        return parameter_names, arguments
     
     def _process_fields(self, fields, context):
         if len(fields) > 1 and not self.uses_esmtp(context):
-            self.error('no_extensions', '', context)
+            self.raise_error('no_extensions', '', context)
         return self.super()
 
 # ------------------------------------------------------------------------------
         try:
             return base64.decodestring(value)
         except:
-            self.error('invalid_base64', value, context)
+            self.raise_error('invalid_base64', value, context)
     
-    def _parse_parameters(self, value, context):
+    def split_parameters(self, value, context):
         match = re.search('=\s(.+)$', value.strip())
         if match is not None:
-            self.error('additional_items', value, context, additional_items=repr(match.group(1)))
+            self.raise_error('additional_item', value, context, additional_item=repr(match.group(1)))
         decoded_parameters = self._decode_base64(value, context)
         match = re.search('^([^\x00]*)\x00([^\x00]*)\x00([^\x00]*)$', decoded_parameters)
         if not match:
-            self.error('invalid_format', value, context)
-        items = list(match.groups())
-        if items[0] == '':
-            items[0] = None
-        return items
+            self.raise_error('invalid_format', value, context)
+        parameters = list(match.groups())
+        if parameters[0] == '':
+            parameters[0] = None
+        return parameters
 
     externally_defined_parameters= information_from_file(release_filename)
     
     setuptools.setup(
-          install_requires=['pycerberus >= 0.3.1'],
+          install_requires=['pycerberus >= 0.5dev'],
           
           # simple_super is not zip_safe
           zip_safe=False,

tests/validation_schema_test.py

     
     def test_bails_out_if_additional_parameters_are_passed(self):
         e = self.assert_raises(InvalidDataError, lambda: self.schema().process('fnord'))
-        self.assert_equals('Syntactically invalid argument(s) \'fnord\'', e.msg())
+        self.assert_equals("Syntactically invalid argument(s) 'fnord'", e.msg())
 
 
 class CommandWithSingleParameterTest(PythonicTestCase):