Commits

Alex Grönholm committed ad5849d

added Python 3.x compatibility, simplified setup.py

Comments (0)

Files changed (20)

+include COPYING.txt
+recursive-include docs *
 # -*- coding: utf-8 -*-
 
-# -- Import project information ------------------------------------------------
-
-import os
-
-def get_release_info():
-    release_info = {}
-    execfile(os.path.join('..', 'pymta', 'release.py'), release_info)
-    return release_info
-release_info = get_release_info()
-
-# -- General configuration -----------------------------------------------------
-
 # Add any Sphinx extension module names here, as strings. They can be extensions
 # coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
 extensions = ['sphinx.ext.autodoc', 'sphinx.ext.todo']
 master_doc = 'index'
 
 # General information about the project.
-project = release_info['name']
-copyright = release_info['_copyright']
+project = 'pymta'
+copyright = '2008-2011 Felix Schwarz'
 
 # The version info for the project you're documenting, acts as replacement for
 # |version| and |release|, also used in various other places throughout the
 # built documents.
 #
 # The short X.Y version.
-version = release_info['version']
+version = '0.5'
 # The full version, including alpha/beta/rc tags.
-release = version + ''
+release = '0.5dev'
 
 # List of directories, relative to source directory, that shouldn't be searched
 # for source files.
   ('index', 'pymta.tex', u'pymta Documentation',
    u'Felix Schwarz', 'manual'),
 ]
-
-
-# -- Add source directory to PYTHONPATH ----------------------------------------
-
-import os
-import sys
-
-# don't have to set PYTHONPATH before building the documentation
-source_dir = os.path.abspath('..')
-sys.path.insert(0, source_dir)
-
-# TODO: This does not work - zope.component is not found...
-# libdir = os.path.abspath(os.path.join('..', 'lib'))
-# import site
-# site.addsitedir(libdir)
-
-
-

pymta/command_parser.py

 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 # THE SOFTWARE.
 
-from Queue import Empty
 import re
 import socket
 
 from pymta.exceptions import SMTPViolationError
 from pymta.session import SMTPSession
 from pymta.statemachine import StateMachine
+from pymta.compat import Empty, basestring
 
 __all__ = ['SMTPCommandParser']
 
         """Send a multi-message to the peer (using the correct SMTP line 
         terminators (usually only called from the SMTPSession)."""
         for line in lines[:-1]:
-            answer = '%s-%s' % (str(code), str(line))
+            answer = '%s-%s' % (code, line)
             self.push(answer)
         self.push(code, lines[-1])
     
         """Send a message to the peer (using the correct SMTP line terminators
         (usually only called from the SMTPSession)."""
         if msg is None:
-            msg = code
+            msg = str(code)
         else:
-            msg = '%s %s' % (str(code), msg)
+            msg = '%s %s' % (code, msg)
         
         if not msg.endswith(self.LINE_TERMINATOR):
             msg += self.LINE_TERMINATOR
                     data = self._connection.recv(4096)
                 except socket.error:
                     raise ClientDisconnectedError()
-                if data == '':
+                if not data:
                     raise ClientDisconnectedError()
-                self._chatter.process_new_data(data)
+                self._chatter.process_new_data(data.decode('ascii'))
         except ClientDisconnectedError:
             if self.is_connected():
                 self.close()
             return
         assert self.is_connected()
         try:
-            self._connection.send(data)
-        except socket.error, e:
+            self._connection.send(data.encode('ascii'))
+        except socket.error:
             self.close()
             self._ignore_write_operations = True
 
 """This module provides a unified view on certain symbols that are not present 
 for all versions of Python."""
 
+import sys
+import base64
 
 __all__ = ['set']
 
 
-try:
+def print_(template, *args, **kwargs):
+    template = str(template)
+    if args:
+        template = template % args
+    elif kwargs:
+        template = template % kwargs
+    sys.stdout.writelines(template)
+
+
+if sys.version_info < (3, 0):  
+    basestring = basestring
+    binary = bytes = str
+    unicode = unicode
+    range = xrange
+    b = lambda x, encoding='iso-8859-1': x.encode(encoding) if isinstance(x, unicode) else str(x)
+    b64encode = lambda x: base64.b64encode(x)
+    b64decode = lambda x: base64.b64decode(x)
+    func_code = lambda func: func.im_func.func_code
+    dict_items = lambda dct: dct.items()
+    dict_keys = lambda dct: dct.keys()
+    dict_values = lambda dct: dct.values()
+    dict_iteritems = lambda dct: dct.iteritems()
+    dict_iterkeys = lambda dct: dct.iterkeys()
+    dict_itervalues = lambda dct: dct.itervalues()
+    from Queue import Queue, Full, Empty
+else:
+    basestring = str
+    binary = bytes = b = bytes
+    unicode = str
+    range = range
+    b = lambda x, encoding='iso-8859-1': x.encode(encoding) if isinstance(x, unicode) else bytes(x)
+    b64encode = lambda x: base64.b64encode(b(x)).decode('ascii')
+    b64decode = lambda x: base64.b64decode(b(x)).decode('ascii')
+    func_code = lambda func: func.__func__.__code__
+    dict_items = lambda dct: list(dct.items())
+    dict_keys = lambda dct: list(dct.keys())
+    dict_values = lambda dct: list(dct.values())
+    dict_iteritems = lambda dct: dct.items()
+    dict_iterkeys = lambda dct: dct.keys()
+    dict_itervalues = lambda dct: dct.values()
+    from queue import Queue, Full, Empty
+
+if sys.version_info < (2, 4):
+    from sets import Set as set
+else:
     set = set
-except NameError:
-    from sets import Set as set
-

pymta/lib/distribution_helpers.py

-# -*- coding: UTF-8 -*-
-# 
-# The MIT License
-# 
-# Copyright (c) 2010 Felix Schwarz <felix.schwarz@oss.schwarz.eu>
-# 
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-# 
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-# 
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-from distutils.command.build import build
-import os
-
-from setuptools.command.install_lib import install_lib
-
-
-__all__ = ['commands_for_babel_support', 'i18n_aware_commands']
-
-def is_babel_available():
-    try:
-        import babel
-    except ImportError:
-        return False
-    return True
-
-def commands_for_babel_support():
-    if not is_babel_available():
-        return {}
-    from babel.messages import frontend as babel
-    
-    extra_commands = {
-        'extract_messages': babel.extract_messages,
-        'init_catalog':     babel.init_catalog,
-        'update_catalog':   babel.update_catalog,
-        'compile_catalog':  babel.compile_catalog,
-    }
-    return extra_commands
-
-def module_for_filename(filename):
-    if filename.endswith('.py'):
-        filename = filename[:-len('.py')]
-    module_name = filename.replace(os.sep, '.')
-    package_name = module_name.split('.')[-1]
-    top_level_module = __import__(module_name)
-    module = getattr(top_level_module, package_name)
-    return module
-
-def information_from_module(module):
-    data = dict()
-    for symbol_name in dir(module):
-        value = getattr(module, symbol_name)
-        if not isinstance(value, basestring):
-            continue
-        data[symbol_name] = value
-    return data
-
-def information_from_file(filename):
-    data = dict()
-    if os.path.exists(filename):
-        execfile(filename, data)
-    else:
-        data = information_from_module(module_for_filename(filename))
-    is_exportable_symbol = lambda key: not key.startswith('_')
-    
-    externally_defined_parameters = dict()
-    for key, value in data.items():
-        if is_exportable_symbol(key):
-            externally_defined_parameters[key] = value
-    return externally_defined_parameters
-
-def i18n_aware_commands():
-    if not is_babel_available():
-        # setup(..., cmdclass={}) will use just the built-in commands
-        return dict()
-    
-    class i18n_build(build):
-        sub_commands = [('compile_catalog', None)] + build.sub_commands
-    
-    # before doing an 'install' (which can also be a 'bdist_egg'), compile the catalog
-    class i18n_install_lib(install_lib):
-        def run(self):
-            self.run_command('compile_catalog')
-            install_lib.run(self)
-    command_dict = dict(build=i18n_build, install_lib=i18n_install_lib)
-    command_dict.update(commands_for_babel_support())
-    return command_dict
-
-

pymta/lib/testcase.py

 #  - Martin Häcker <martin.haecker@agile42.com>
 
 from unittest import TestCase
+import sys
 
 from pymta.lib.simple_super import SuperProxy
+from pymta.compat import dict_iteritems
 
 
 __all__ = ['PythonicTestCase']
     def assert_raises(self, exception_type, callable, *args, **kwargs):
         try:
             callable(*args, **kwargs)
-        except exception_type, e:
-            return e
+        except exception_type:
+            return sys.exc_info()[1]
         # We want the same error message as assertRaises but we must not 
         # assume that callable is idempotent
         self.assertRaises(exception_type, lambda: None)
         raise self.failureException(message)
     
     def assert_dict_contains(self, expected_sub_dict, actual_super_dict):
-        for key, value in expected_sub_dict.items():
+        for key, value in dict_iteritems(expected_sub_dict):
             message = "%s:%s not in %s" % (repr(key), repr(value), repr(actual_super_dict))
             self.assert_contains(key, actual_super_dict, msg=message)
             self.assert_equals(value, actual_super_dict[key], msg=message)
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 # THE SOFTWARE.
 
+from threading import Event
 import socket
-from threading import Event
 import time
 
 from pymta.command_parser import WorkerProcess
             except ImportError:
                 use_multiprocessing = False
         if not use_multiprocessing:
-            from Queue import Queue
+            from pymta.compat import Queue
         
         self._shutdown_server.clear()
         self._queue = Queue()

pymta/release.py

-# -*- coding: UTF-8 -*-
-"Release information about pymta."
-
-name = 'pymta'
-version = '0.6dev'
-description = 'library to build a custom SMTP server'
-long_description = '''
-pymta is a library to build a custom SMTP server in Python. This is useful if 
-you want to...
-
-* test mail-sending code against a real SMTP server even in your unit tests.
-* build a custom SMTP server with non-standard behavior without reimplementing 
-  the whole SMTP protocol.
-* have a low-volume SMTP server which can be easily extended using Python
-
-Changelog
-******************************
-
-dev
-==================
-- require pycerberus >= 0.5 to simplify some validation code
-
-0.5.2 (12.06.2010)
-==================
-- Fix bug - detect of command/message also when the terminator is sent in 
-  multiple packages (fixes manual message submission with telnet)
-
-0.5.1 (08.06.2010)
-==================
-- Fix egg file generation: Include all necessary packages in eggs
-
-0.5.0 (07.04.2010)
-==================
-- Dropped dependency to repoze.workflow because the module added a lot of 
-  dependencies recently (six others in total). The new, custom state machine
-  also supports flags and conditions which suits SMTP very much.
-- Added dependency to pycerberus (>= 0.3.1) to validate all sent parameters
-  thoroughly with sensible error messages.
-- All inputs from peers is now validated
-- relaxed restrictions for the HELO/EHLO parameter (as real world clients don't
-  send real host names)
-- Fixed bug - ESMTP session switched back to plain SMTP after the first mail 
-  was sent
-- Fixed bug - Hang after sending data to a broken connection
-
-0.4.0 (08.06.2009)
-==================
-- Compatibility fixes for Python 2.3-2.6
-- Policies can drop connection to the client before or after the response
-- CommandParser is more robust against various socket errors
-- Better infrastructure and documentation to use pymta in third-party tests
-
-0.3.1 (27.02.2009)
-==================
- - Fixed bug which caused hang after unexpected connection drop by client
-
-0.3 (15.02.2009)
-==================
- - Switch to process-based architecture, got rid of asyncore
- - Support for size-limitations of messages, huge messages will not be stored in
-   memory if they will be rejected anyway (denial of service prevention)
- - API documentation is now auto-generated
- - Renamed DefaultMTAPolicy to IMTAPolicy and moved all interfaces to pymta.api
- - Added the debugging_server as an extremely simple example of a pymta-based 
-   server
-'''
-author = 'Felix Schwarz'
-author_email = 'felix.schwarz@oss.schwarz.eu'
-url = 'http://www.schwarz.eu/opensource/projects/pymta'
-download_url = 'http://www.schwarz.eu/opensource/projects/%(name)s/download/%(version)s/%(name)s-%(version)s.tar.gz' % dict(name=name, version=version)
-# prefix it with '_' so the symbol is not passed to setuptools.setup()
-_copyright = u'2008-2010 Felix Schwarz'
-license='MIT'
-
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 # THE SOFTWARE.
 
+import sys
+
 from pycerberus import InvalidDataError
 
-from pymta.compat import set
+from pymta.compat import set, dict_iteritems, dict_itervalues, basestring
 from pymta.exceptions import InvalidParametersError, SMTPViolationError
 from pymta.model import Message, Peer
 from pymta.statemachine import StateMachine, StateMachineError
     
     def _get_all_commands(self, including_quit=False):
         commands = set()
-        for actions in self.state._transitions.values():
-            for command_name, transition in actions.items():
+        for actions in dict_itervalues(self.state._transitions):
+            for command_name, transition in dict_iteritems(actions):
                 target_state = transition[0]
                 if target_state in ['new']:
                     continue
                     if len(allowed_transitions) > 0:
                         msg += ', expected on of %s' % allowed_transitions
                         self.reply(503, msg)
-            except InvalidDataError, e:
+            except InvalidDataError:
+                e = sys.exc_info()[1]
                 self.reply(501, e.msg())
-            except InvalidParametersError, e:
+            except InvalidParametersError:
                 # TODO: Get rid of InvalidParametersError, shouldn't be 
                 # necessary anymore
+                e = sys.exc_info()[1]
                 if not e.response_sent:
                     msg = 'Syntactically invalid %s argument(s)' % smtp_command
                     self.reply(501, msg)
-            except PolicyDenial, e:
+            except PolicyDenial:
+                e = sys.exc_info()[1]
                 if not e.response_sent:
                     self.reply(e.code, e.reply_text)
         finally:

pymta/statemachine.py

 
 import re
 
-from pymta.compat import set
+from pymta.compat import set, dict_iterkeys, dict_iteritems, dict_itervalues
 
 __all__ = ['StateMachine', 'StateMachineDefinitionError', 'StateMachineError']
 
     
     def known_actions(self):
         actions = set()
-        for action_name in self._transitions.values():
+        for action_name in dict_itervalues(self._transitions):
             actions = actions.union(action_name)
         return actions
     
     def allowed_actions(self):
         current_transitions = self._transitions.get(self.state(), {})
         _allowed_actions = set()
-        for action_name, (to_state, handler, operations, condition) in current_transitions.items():
+        for action_name, (to_state, handler, operations, condition) in dict_iteritems(current_transitions):
             if not self._is_condition_satisfied(condition):
                 continue
             _allowed_actions.add(action_name)
         return _allowed_actions
     
     def known_non_final_states(self):
-        return set(self._transitions.keys())
+        return set(dict_iterkeys(self._transitions))
     
     def known_states(self):
         states = self.known_non_final_states()
-        for from_state, action_names in self._transitions.items():
+        for from_state, action_names in dict_iteritems(self._transitions):
             for action_name in action_names:
                 (to_state, handler, operations, condition) = self._transitions[from_state][action_name]
                 states.update((to_state,))

pymta/test_util.py

 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 # THE SOFTWARE.
 
-from Queue import Queue
 import random
 import socket
 import threading
 from pymta.api import IMessageDeliverer, IMTAPolicy
 from pymta.lib import PythonicTestCase
 from pymta.mta import PythonMTA
+from pymta.compat import Queue, print_
 
 
 __all__ = ['BlackholeDeliverer', 'DebuggingMTA', 'MTAThread', 'SMTPTestCase']
         self.server.shutdown_server()
         threading.Thread.join(self, timeout=timeout_seconds)
         if self.isAlive():
-            print "WARNING: Thread still alive. Timeout while waiting for " + \
-                      "termination!"
+            print_("WARNING: Thread still alive. Timeout while waiting for termination!")
 
 
 class SMTPTestCase(PythonicTestCase):
                 tries += 1
                 time.sleep(0.1)
             else:
+                sock.close()
                 return
         assert False, 'MTA not reachable'
     
         """Return a list of received messages which are stored in the 
         BlackholeDeliverer."""
         return self.deliverer.received_messages
-
-

pymta/validation.py

 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 # THE SOFTWARE.
 
-import base64
 import re
 
 from pycerberus.i18n import _
 from pycerberus.schemas import PositionalArgumentsParsingSchema
-from pycerberus.validators import EmailAddressValidator, IntegerValidator, \
-    StringValidator
+from pycerberus.validators import EmailAddressValidator, IntegerValidator, StringValidator
 
-__all__ = ['HeloSchema', 'MailFromSchema', 'RcptToSchema', 
-           'SMTPCommandArgumentsSchema']
+from pymta.compat import dict_keys, dict_values, b, b64decode
+
+__all__ = ['HeloSchema', 'MailFromSchema', 'RcptToSchema', 'SMTPCommandArgumentsSchema']
 
 
 # ------------------------------------------------------------------------------
 class SMTPEmailValidator(EmailAddressValidator):
     
     def messages(self):
-        return {'unbalanced_quotes': _(u'Invalid email address format - use balanced angle brackets.')}
+        return {'unbalanced_quotes': _('Invalid email address format - use balanced angle brackets.')}
     
     def convert(self, value, context):
         string_value = self.super()
     def aggregate_values(self, parameter_names, arguments, context):
         if len(arguments) <= 1:
             return self.super()
-        key_value_pairs = map(lambda option: re.split('=', option, 1), arguments[1:])
+        key_value_pairs = [re.split('=', option, 1) for option in arguments[1:]]
 
         self._validate_extension_arguments(key_value_pairs, ' '.join(arguments[1:]), context)
-        lower_case_key_value_pairs = map(lambda item: (item[0].lower(), item[1]), key_value_pairs)
+        lower_case_key_value_pairs = [(item[0].lower(), item[1]) for item in key_value_pairs]
         options = dict(lower_case_key_value_pairs)
         
-        parameter_names = parameter_names + options.keys()
-        arguments = [arguments[0]] + options.values()
+        parameter_names = parameter_names + dict_keys(options)
+        arguments = [arguments[0]] + dict_values(options)
         return parameter_names, arguments
     
     def _process_fields(self, fields, context):
     
     def _decode_base64(self, value, context):
         try:
-            return base64.decodestring(value)
+            return b64decode(value)
         except:
             self.raise_error('invalid_base64', value, context)
     
     def split_parameters(self, value, context):
-        match = re.search('=\s(.+)$', value.strip())
+        match = re.search(('=\s(.+)$'), value.strip())
         if match is not None:
             self.raise_error('additional_item', value, context, additional_item=repr(match.group(1)))
         decoded_parameters = self._decode_base64(value, context)
+[build_sphinx]
+source-dir = docs
+build-dir  = docs/_build
 #!/usr/bin/env python
-# -*- coding: UTF-8 -*-
+from setuptools import setup, find_packages
 
-import os
 
-import setuptools
+setup(
+        name='pymta',
+        version='0.6dev',
+        description='library to build a custom SMTP server',
+        long_description="""
+pymta is a library to build a custom SMTP server in Python. This is useful if 
+you want to...
 
-from pymta.lib.distribution_helpers import information_from_file
-
-if __name__ == '__main__':
-    release_filename = os.path.join('pymta', 'release.py')
-    externally_defined_parameters= information_from_file(release_filename)
-    
-    setuptools.setup(
-          install_requires=['pycerberus >= 0.5dev'],
-          
-          # simple_super is not zip_safe
-          zip_safe=False,
-          packages=setuptools.find_packages(),
-          classifiers = [
-              'Development Status :: 4 - Beta',
-              'Intended Audience :: Developers',
-              'License :: OSI Approved :: MIT License',
-              'Operating System :: OS Independent',
-              'Programming Language :: Python',
-              'Topic :: Communications :: Email',
-              'Topic :: Software Development :: Libraries :: Python Modules',
-            ],
-          test_suite = 'nose.collector',
-          **externally_defined_parameters
-    )
-
-
+* test mail-sending code against a real SMTP server even in your unit tests.
+* build a custom SMTP server with non-standard behavior without reimplementing 
+  the whole SMTP protocol.
+* have a low-volume SMTP server which can be easily extended using Python""",
+        zip_safe=False,
+        packages=find_packages(),
+        license='MIT',
+        author='Felix Schwarz',
+        author_email='felix.schwarz@oss.schwarz.eu',
+        url='http://www.schwarz.eu/opensource/projects/pymta',
+        install_requires=['pycerberus >= 0.5dev'],
+        tests_require=['nose'],
+        test_suite='nose.collector',
+        classifiers=[
+                'Development Status :: 4 - Beta',
+                'Intended Audience :: Developers',
+                'License :: OSI Approved :: MIT License',
+                'Operating System :: OS Independent',
+                'Programming Language :: Python',
+                'Topic :: Communications :: Email',
+                'Topic :: Software Development :: Libraries :: Python Modules',
+                ],
+)

tests/basic_message_sending_test.py

 # THE SOFTWARE.
 
 from pymta.api import IMTAPolicy
-from pymta.compat import set
+from pymta.compat import set, b, b64encode
 
 from tests.util import CommandParserTestCase, DummyAuthenticator
 
     
     def test_auth_plain_without_authenticator_is_rejected(self):
         self.send('EHLO', 'foo.example.com')
-        base64_credentials = u'\x00foo\x00foo'.encode('base64')
+        base64_credentials = b64encode('\x00foo\x00foo')
         self.send('AUTH PLAIN', base64_credentials, expected_first_digit=5)
         self.assert_equals(3, len(self.command_parser.replies))
         self._check_last_code(535)
         self.session._authenticator = DummyAuthenticator()
         
         self.send('EHLO', 'foo.example.com')
-        self.send('AUTH PLAIN', u'\x00foo\x00foo'.encode('base64'))
+        self.send('AUTH PLAIN', b64encode('\x00foo\x00foo'))
         self.assert_equals(3, len(self.command_parser.replies))
         self._check_last_code(235)
         code, reply_text = self.command_parser.replies[-1]
         # [authzid] \x00 authcid \x00 passwd
         # smtplib in Python 2.3 will send an additional authzid (which is equal 
         # to authcid)
-        self.send('AUTH PLAIN', u'ignored\x00foo\x00foo'.encode('base64'))
+        self.send('AUTH PLAIN', b64encode('ignored\x00foo\x00foo'))
         self.assert_equals(3, len(self.command_parser.replies))
         self._check_last_code(235)
         code, reply_text = self.command_parser.replies[-1]
         self.session._authenticator = DummyAuthenticator()
         
         self.send('EHLO', 'foo.example.com')
-        base64_credentials = u'\x00foo\x00bar'.encode('base64')
+        base64_credentials = b64encode('\x00foo\x00bar')
         self.send('AUTH PLAIN', base64_credentials, expected_first_digit=5)
         self._check_last_code(535)
     
         self.session._authenticator = DummyAuthenticator()
         
         self.send('EHLO', 'foo.example.com')
-        base64_credentials = u'\x00foo'.encode('base64')
+        base64_credentials = b64encode('\x00foo')
         self.send('AUTH PLAIN', base64_credentials, expected_first_digit=5)
         self.assert_equals(3, len(self.command_parser.replies))
         self._check_last_code(501)

tests/basic_smtp_test.py

 
 from pymta.api import IMTAPolicy
 from pymta.test_util import DebuggingMTA, SMTPTestCase
+from pymta.compat import b
 
 from tests.util import DummyAuthenticator
 
         e = self.assert_raises(expected_exceptions, self.connection.sendmail, 
                                'from@example.com', 'foo@example.com', msg)
         self.assert_equals(552, e.smtp_code)
-        self.assert_equals('message exceeds fixed maximum message size', e.smtp_error)
+        self.assert_equals(b('message exceeds fixed maximum message size'), e.smtp_error)
     
     def service_is_available(self):
         # On a normal system we should be able to reconnect after a dropped
                 socket.setdefaulttimeout(old_default)
         except socket.timeout:
             return False
-        
-    
+
     def test_workerprocess_detects_closed_connections_when_reading(self):
         """Check that the WorkerProcess gracefully handles connections which are
         closed without QUIT. This can happen due to network problems or 
         
         time.sleep(0.5)
         self.assert_true(self.service_is_available())
-        
-
-

tests/command_parsing_test.py

 from pymta.command_parser import SMTPCommandParser
 from pymta.lib import PythonicTestCase
 from pymta.test_util import BlackholeDeliverer
+from pymta.compat import basestring
+
 from tests.util import MockChannel
 
 
         self.send('\r\n.\r\n')
         self.assert_no_messages_received()
         self.assert_true(self.last_reply().startswith('552 '))
-

tests/policy_test.py

 # THE SOFTWARE.
 
 from pymta.api import IMTAPolicy, PolicyDecision
+from pymta.compat import b64encode
 
 from tests.util import CommandParserTestCase, DummyAuthenticator
 
                 return False
         self.init(policy=FalsePolicy(), authenticator=DummyAuthenticator())
         self.send('EHLO', 'foo.example.com')
-        base64_credentials = u'\x00foo\x00foo'.encode('base64')
+        base64_credentials = b64encode('\x00foo\x00foo')
         self.send('AUTH PLAIN', base64_credentials, expected_first_digit=5)
     
     

tests/smtp_parameter_validation_test.py

 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 # THE SOFTWARE.
 
+from pymta.compat import b64encode
+
 from tests.util import CommandParserTestCase, DummyAuthenticator
 
 
         self.session._authenticator = DummyAuthenticator()
     
     def base64(self, value):
-        return unicode(value).encode('base64').strip()
+        return b64encode(value).strip()
     
     def test_auth_plain_accepts_correct_authentication(self):
         self.inject_authenticator()
         self.ehlo()
         
-        self.send_valid('AUTH PLAIN', u'\x00foo\x00foo'.encode('base64'))
+        self.send_valid('AUTH PLAIN', b64encode('\x00foo\x00foo'))
     
     def test_auth_plain_requires_exactly_one_parameter(self):
         self.inject_authenticator()
         self.ehlo()
         
         self.send_invalid('AUTH PLAIN')
-        base64_credentials = self.base64(u'\x00foo\x00foo')
+        base64_credentials = self.base64('\x00foo\x00foo')
         self.send_invalid('AUTH PLAIN', base64_credentials + ' ' + base64_credentials)
     
     def test_auth_plain_detects_bad_base64_credentials(self):
         self.inject_authenticator()
         self.ehlo()
         
-        self.send_invalid('AUTH PLAIN', self.base64(u'\x00foo\x00bar'))
+        self.send_invalid('AUTH PLAIN', self.base64('\x00foo\x00bar'))
 
 

tests/validation_schema_test.py

 
 from pymta.lib import PythonicTestCase
 from pymta.validation import AuthPlainSchema, MailFromSchema, SMTPCommandArgumentsSchema
+from pymta.compat import b64encode
 
 
 class CommandWithoutParametersTest(PythonicTestCase):
     
     def test_can_extract_base64_decoded_string(self):
         expected_parameters = dict(username='foo', password='foo ', authzid=None)
-        parameters = self.schema().process(self.base64(u'\x00foo\x00foo '))
+        parameters = self.schema().process(self.base64('\x00foo\x00foo '))
         self.assert_equals(expected_parameters, parameters)
     
     def base64(self, value):
-        return unicode(value).encode('base64').strip()
+        return b64encode(value).strip()
     
     def assert_bad_input(self, input):
         e = self.assert_raises(InvalidDataError, lambda: self.schema().process(input))
         return e
     
     def test_reject_more_than_one_parameter(self):
-        input = self.base64(u'\x00foo\x00foo') + ' ' + self.base64(u'\x00foo\x00foo')
+        input = self.base64('\x00foo\x00foo') + ' ' + self.base64('\x00foo\x00foo')
         self.assert_bad_input(input)
     
     def test_rejects_bad_base64(self):
         self.assert_equals('Garbled data sent', e.msg())
     
     def test_rejects_invalid_format(self):
-        e = self.assert_bad_input(u'foobar'.encode('base64'))
+        e = self.assert_bad_input(b64encode('foobar'))
         self.assert_equals('Garbled data sent', e.msg())
 
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.