Commits

Andriy Kornatskyy  committed 5fc12db

Introduced mail module.

  • Participants
  • Parent commits acee439

Comments (0)

Files changed (9)

 
 qa:
 	if [ "$$(echo $(VERSION) | sed 's/\.//')" -eq 27 ]; then \
-		flake8 --max-complexity 10 doc src setup.py && \
+		flake8 --max-complexity 9 demos doc src setup.py && \
 		pep8 doc src setup.py ; \
 	fi
 

File demos/mail/alternative.py

+
+"""
+"""
+
+import os.path
+
+from wheezy.core.mail import Alternative
+from wheezy.core.mail import MailMessage
+from wheezy.core.mail import Related
+from wheezy.core.mail import SMTPClient
+
+
+mail = MailMessage(
+    subject='Welcome to Python',
+    content='Hello World!',
+    from_addr='someone@dev.local',
+    to_addrs=['master@dev.local'])
+
+alt = Alternative("""\
+<html><body>
+    <h1>Hello World!</h1>
+    <p><img src="cid:python-logo.gif" /></p>
+</body></html>""", content_type='text/html')
+
+curdir = os.path.dirname(__file__)
+path = os.path.join(curdir, 'python-logo.gif')
+alt.related.append(Related.from_file(path))
+
+mail.alternatives.append(alt)
+
+client = SMTPClient()
+client.send(mail)

File demos/mail/attachment.py

+
+"""
+"""
+
+from wheezy.core.mail import Attachment
+from wheezy.core.mail import MailMessage
+from wheezy.core.mail import SMTPClient
+
+
+mail = MailMessage(
+    subject='Welcome to Python',
+    content='Hello World!',
+    from_addr='someone@dev.local',
+    to_addrs=['master@dev.local'])
+mail.attachments.append(Attachment(
+    name='welcome.txt',
+    content='Hello World!'))
+
+client = SMTPClient()
+client.send(mail)

File demos/mail/plain.py

+
+"""
+"""
+
+from wheezy.core.mail import MailMessage
+from wheezy.core.mail import SMTPClient
+
+
+mail = MailMessage(
+    subject='Welcome to Python',
+    content='Hello World!',
+    from_addr='someone@dev.local',
+    to_addrs=['master@dev.local'])
+
+client = SMTPClient()
+client.send(mail)

File demos/mail/python-logo.gif

Added
New image

File doc/modules.rst

 .. automodule:: wheezy.core
    :members:
 
-wheezy.core.db
---------------
-
-.. automodule:: wheezy.core.db
-   :members:
-
 wheezy.core.benchmark
 -----------------------
 
 .. automodule:: wheezy.core.datetime
    :members:
 
+wheezy.core.db
+--------------
+
+.. automodule:: wheezy.core.db
+   :members:
+
 wheezy.core.descriptors
 -----------------------
 
 .. automodule:: wheezy.core.luhn
    :members:
 
+wheezy.core.mail
+----------------
+
+.. automodule:: wheezy.core.mail
+   :members:
+
 wheezy.core.pooling
 -------------------
 
         'Topic :: Software Development :: Localization',
         'Topic :: System :: Benchmark'
     ],
-    keywords='core collections config datetime descriptor i18n '
-             'introspection url uuid',
+    keywords='core benchmark collections config datetime db descriptor '
+             'feistel i18n introspection json luhn mail pooling url uuid',
     packages=['wheezy', 'wheezy.core'],
     package_dir={'': 'src'},
     namespace_packages=['wheezy'],

File src/wheezy/core/mail.py

+
+""" ``mail`` module.
+"""
+
+from mimetypes import guess_type
+from os.path import split as path_split
+from smtplib import SMTP
+from time import time
+
+from wheezy.core.comp import ntob
+
+try:
+    from email.charset import CHARSETS
+    from email.encoders import encode_base64
+    from email.message import Message
+    from email.utils import formatdate
+    from email.utils import make_msgid
+except ImportError:  # pragma: nocover, python2.4
+    from email.Charset import CHARSETS  # noqa
+    from email.Encoders import encode_base64  # noqa
+    from email.Message import Message  # noqa
+    from email.Utils import formatdate  # noqa
+    from email.Utils import make_msgid  # noqa
+
+
+# Do not apply Base64 encoding to utf-8 messages
+CHARSETS['utf-8'] = (3, None, 'utf-8')
+
+
+class MailMessage(object):
+    """ Mail message.
+    """
+
+    def __init__(self, subject='', content='',
+                 from_addr=None, to_addrs=None,
+                 cc_addrs=None, bcc_addrs=None, reply_to_addrs=None,
+                 content_type='text/plain', charset='us-ascii'):
+        self.subject = subject
+        self.content = content
+        self.from_addr = from_addr
+        self.to_addrs = to_addrs or []
+        self.cc_addrs = cc_addrs or []
+        self.bcc_addrs = bcc_addrs or []
+        self.reply_to_addrs = reply_to_addrs or []
+        self.content_type = content_type
+        self.charset = charset
+        self.date = time()
+        self.attachments = []
+        self.alternatives = []
+
+    def recipients(self):
+        return set(self.to_addrs + self.cc_addrs + self.bcc_addrs)
+
+
+class Attachment(object):
+    """ An attachment to mail message.
+    """
+
+    def __init__(self, name, content, content_type=None, disposition=None,
+                 name_charset=None, content_charset=None):
+        self.name = name
+        self.content = content
+        self.content_type = content_type
+        self.disposition = disposition
+        self.name_charset = name_charset
+        self.content_charset = content_charset
+
+    @classmethod
+    def from_file(cls, path):
+        """ Creates an attachment from file.
+        """
+        ignore, name = path_split(path)
+        print(open.__module__)
+        f = open(path, 'rb')
+        try:
+            return cls(name, f.read())
+        finally:
+            f.close()
+
+
+class Alternative(object):
+    """ Represents alternative mail message.
+    """
+
+    def __init__(self, content, content_type='text/html', charset=None):
+        self.content = content
+        self.content_type = content_type
+        self.charset = charset
+        self.related = []
+
+
+class Related(object):
+    """ A resource related to alternative mail message.
+    """
+
+    def __init__(self, content_id, content, content_type):
+        self.content_id = content_id
+        self.content = content
+        self.content_type = content_type
+
+    @classmethod
+    def from_file(cls, path):
+        """ Creates a related mail resource from file.
+        """
+        ignore, name = path_split(path)
+        content_type, ignore = guess_type(name)
+        if content_type is None:
+            content_type = 'application/octet-stream'
+        f = open(path, 'rb')
+        try:
+            return cls(name, f.read(), content_type)
+        finally:
+            f.close()
+
+
+class SMTPClient(object):
+    """ SMTP client that can be used to send mail.
+    """
+
+    def __init__(self, host='127.0.0.1', port=25, use_tls=False,
+                 username=None, password=None):
+        self.host = host
+        self.port = port
+        self.use_tls = use_tls
+        self.username = username
+        self.password = password
+
+    def send(self, message):
+        """ Sends a single mail message.
+        """
+        recepients = message.recipients()
+        content = ntob(mime(message).as_string(), message.charset)
+        # keep connection scope minimal
+        client = self.connect()
+        client.sendmail(message.from_addr, recepients, content)
+        client.quit()
+
+    def send_multi(self, messages):
+        """ Sends multiple mail messages.
+        """
+        args = [(message.from_addr, message.recipients(),
+                 ntob(mime(message).as_string(), message.charset))
+                for message in messages]
+        # keep connection scope minimal
+        client = self.connect()
+        for arg in args:
+            client.sendmail(*arg)
+        client.quit()
+
+    def connect(self):
+        smtp = SMTP()
+        #smtp.set_debuglevel(1)
+        smtp.connect(self.host, self.port)
+        if self.use_tls:
+            smtp.starttls()
+        if self.username:
+            smtp.login(self.username, self.password)
+        return smtp
+
+
+# region: internal details
+
+def mime(message):
+    m = mime_part(message.content, message.content_type, message.charset)
+    subparts = message.content and [m] or []
+    if message.alternatives:
+        subparts += [mime_alternative(a) for a in message.alternatives]
+        if len(subparts) > 1:
+            m = mime_multipart('multipart/alternative', subparts)
+            subparts = [m]
+        else:
+            m = subparts[0]
+    if message.attachments:
+        subparts += [mime_attachment(a) for a in message.attachments]
+        m = mime_multipart('multipart/mixed', subparts)
+    m['Message-ID'] = make_msgid()
+    m['Subject'] = message.subject
+    m['Date'] = formatdate(message.date, localtime=True)
+    m['From'] = message.from_addr
+    m['To'] = ', '.join(message.to_addrs)
+    if message.cc_addrs:
+        m['Cc'] = ', '.join(message.cc_addrs)
+    if message.bcc_addrs:
+        m['Bcc'] = ', '.join(message.bcc_addrs)
+    if message.reply_to_addrs:
+        m['Reply-To'] = ', '.join(message.reply_to_addrs)
+    return m
+
+
+def mime_part(content, content_type, content_charset=None):
+    m = Message()
+    m.add_header('Content-Type', content_type)
+    m.set_payload(content, content_charset)
+    if not content_type.startswith('text'):
+        encode_base64(m)
+    return m
+
+
+def mime_multipart(content_type, subparts):
+    m = Message()
+    m.add_header('Content-Type', content_type)
+    m.set_payload(subparts)
+    return m
+
+
+def mime_alternative(a):
+    m = mime_part(a.content, a.content_type, a.charset)
+    if a.related:
+        subparts = [m]
+        for r in a.related:
+            m = mime_part(r.content, r.content_type)
+            m.add_header('Content-ID', r.content_id)
+            subparts.append(m)
+        m = mime_multipart('multipart/related', subparts)
+    return m
+
+
+def mime_attachment(attachment):
+    content_type = attachment.content_type
+    if not content_type:
+        content_type, ignore = guess_type(attachment.name)
+        if content_type is None:
+            content_type = 'application/octet-stream'
+    m = mime_part(attachment.content, content_type,
+                  attachment.content_charset)
+    name = attachment.name
+    if attachment.name_charset:
+        name = (attachment.name_charset, '', name)
+    # see http://www.ietf.org/rfc/rfc2183.txt
+    m.add_header('Content-Disposition',
+                 attachment.disposition or 'attachment',
+                 filename=name)
+    return m

File src/wheezy/core/tests/test_mail.py

+
+""" Unit tests for ``wheezy.core.mail``.
+"""
+
+import unittest
+
+from mock import ANY
+from mock import patch
+
+from wheezy.core.comp import PY3
+
+
+if PY3:
+    b = lambda s: s.encode('ascii')
+else:
+    b = lambda s: s
+
+
+class MiscTestCase(unittest.TestCase):
+
+    def setUp(self):
+        from mock import mock_open
+        self.mock_open = mock_open()
+        self.patcher = patch('wheezy.core.mail.open', self.mock_open,
+                             create=True)
+        self.patcher.start()
+
+    def tearDown(self):
+        self.patcher.stop()
+
+    def test_recipients(self):
+        """ Ensure list is unique.
+        """
+        from wheezy.core.mail import MailMessage
+        m = MailMessage(from_addr='f',
+                        to_addrs=['a', 'b'],
+                        cc_addrs=['b', 'c'],
+                        bcc_addrs=['c', 'd'],
+                        reply_to_addrs=['e', 'f'])
+        assert ['a', 'b', 'c', 'd'] == sorted(m.recipients())
+
+    def test_attachment_from_file(self):
+        """ Ensure attachment can be created from file.
+        """
+        from wheezy.core.mail import Attachment
+        self.mock_open.return_value.read.return_value = 'hello'
+        a = Attachment.from_file('data/welcome.txt')
+        self.mock_open.assert_called_once_with('data/welcome.txt', 'rb')
+        self.mock_open.return_value.read.assert_called_once_with()
+        self.mock_open.return_value.close.assert_called_once_with()
+        assert 'welcome.txt' == a.name
+        assert 'hello' == a.content
+
+    def test_alternative(self):
+        """ Alternative default content type.
+        """
+        from wheezy.core.mail import Alternative
+        a = Alternative('content')
+        assert 'text/html' == a.content_type
+
+    @patch('wheezy.core.mail.guess_type')
+    def test_related_from_file(self, mock_guess_type):
+        """ Ensure related can be created from file.
+        """
+        from wheezy.core.mail import Related
+        mock_guess_type.return_value = ('text/css', None)
+        self.mock_open.return_value.read.return_value = 'a {}'
+        r = Related.from_file('css/style.css')
+        self.mock_open.assert_called_once_with('css/style.css', 'rb')
+        self.mock_open.return_value.read.assert_called_once_with()
+        self.mock_open.return_value.close.assert_called_once_with()
+        assert 'style.css' == r.content_id
+        assert 'text/css' == r.content_type
+        assert 'a {}' == r.content
+
+    @patch('wheezy.core.mail.guess_type')
+    def test_related_from_file_unknown_type(self, mock_guess_type):
+        """ Ensure default content type if its unknown.
+        """
+        from wheezy.core.mail import Related
+        mock_guess_type.return_value = (None, None)
+        r = Related.from_file('css/style.css')
+        self.mock_open.assert_called_once_with('css/style.css', 'rb')
+        assert 'application/octet-stream' == r.content_type
+
+
+class SMTPClientTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.patcher = patch('wheezy.core.mail.SMTP')
+        self.mock_smtp = self.patcher.start().return_value
+
+    def tearDown(self):
+        self.patcher.stop()
+
+    def test_connect(self):
+        """ Ensure connected to right host and port
+        """
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import SMTPClient
+        client = SMTPClient('mail.dev.local', 125)
+        client.send(MailMessage())
+        self.mock_smtp.connect.assert_called_once_with('mail.dev.local', 125)
+        assert not self.mock_smtp.starttls.called
+        assert not self.mock_smtp.login.called
+        self.mock_smtp.quit.assert_called_once_with()
+
+    def test_use_tls(self):
+        """ Ensure start tls command is issued.
+        """
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import SMTPClient
+        client = SMTPClient(use_tls=True)
+        client.send(MailMessage())
+        self.mock_smtp.starttls.assert_called_once_with()
+        assert not self.mock_smtp.login.called
+
+    def test_login(self):
+        """ Ensure credentials are used.
+        """
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import SMTPClient
+        client = SMTPClient(username='user', password='pass')
+        client.send(MailMessage())
+        self.mock_smtp.login.assert_called_once_with('user', 'pass')
+
+    def test_send(self):
+        """ Ensure from and to lists are valid in sending a single message.
+        """
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import SMTPClient
+        client = SMTPClient(username='user', password='pass')
+        message = MailMessage(
+            from_addr='one@dev.local',
+            to_addrs=['two@dev.local'])
+        client.send(message)
+        self.mock_smtp.sendmail.assert_called_once_with(
+            message.from_addr,
+            message.recipients(),
+            ANY)
+
+    def test_send_multi(self):
+        """ Ensure from and to lists are valid in sending multiple messages.
+        """
+        from mock import call
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import SMTPClient
+        client = SMTPClient(username='user', password='pass')
+        message1 = MailMessage(
+            from_addr='one@dev.local',
+            to_addrs=['two@dev.local'])
+        message2 = MailMessage(
+            from_addr='three@dev.local',
+            to_addrs=['four@dev.local'])
+        client.send_multi([message1, message2])
+        assert 2 == self.mock_smtp.sendmail.call_count
+        assert self.mock_smtp.sendmail.mock_calls == [
+            call(message1.from_addr, message1.recipients(), ANY),
+            call(message2.from_addr, message2.recipients(), ANY)]
+
+
+class MIMETestCase(unittest.TestCase):
+
+    @patch('wheezy.core.mail.formatdate')
+    def test_required_headers(self, mock_formatdate):
+        """ Ensure mail message information is included.
+        """
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import mime
+        mock_formatdate.return_value = 'x'
+        message = MailMessage(subject='s',
+                              content='c',
+                              charset='utf-8',
+                              from_addr='z',
+                              to_addrs=['a', 'b'],
+                              cc_addrs=['c', 'd'],
+                              bcc_addrs=['e', 'f'],
+                              reply_to_addrs=['x', 'y'])
+        message.date = 1354555373
+        m = mime(message)
+        assert m['Message-ID']
+        assert 'text/plain; charset="utf-8"' == m['Content-Type']
+        assert message.subject == m['Subject']
+        assert 'c' == m.get_payload()
+        mock_formatdate.assert_called_once_with(message.date, localtime=True)
+        assert 'x' == m['Date']
+        assert message.from_addr == m['From']
+        assert 'a, b' == m['To']
+        assert 'c, d' == m['Cc']
+        assert 'e, f' == m['Bcc']
+        assert 'x, y' == m['Reply-To']
+
+    def test_alternative(self):
+        """ Ensure alternative includes both plain and html.
+        """
+        from wheezy.core.mail import Alternative
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import mime
+        message = MailMessage(content='c')
+        a = Alternative('a')
+        message.alternatives.append(a)
+        m = mime(message)
+        assert 'multipart/alternative' == m['Content-Type']
+        subparts = m.get_payload()
+        assert 2 == len(subparts)
+        assert 'c' == subparts[0].get_payload()
+        assert 'a' == subparts[1].get_payload()
+
+    def test_alternative_no_plain(self):
+        """ Ensure if alternative available by plain is empty
+            only one included.
+        """
+        from wheezy.core.mail import Alternative
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import mime
+        message = MailMessage()
+        a = Alternative('a')
+        message.alternatives.append(a)
+        m = mime(message)
+        assert 'text/html' == m['Content-Type']
+        assert 'a' == m.get_payload()
+
+    def test_attachment(self):
+        """ Ensure attachments added.
+        """
+        from wheezy.core.mail import Attachment
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import mime
+        message = MailMessage(content='c')
+        a = Attachment('1.txt', 'a')
+        message.attachments.append(a)
+        m = mime(message)
+        assert 'multipart/mixed' == m['Content-Type']
+        subparts = m.get_payload()
+        assert 2 == len(subparts)
+        assert 'c' == subparts[0].get_payload()
+        assert 'a' == subparts[1].get_payload()
+
+    def test_everything(self):
+        """ Add plain, alternate and attachment.
+        """
+        from wheezy.core.mail import Alternative
+        from wheezy.core.mail import Attachment
+        from wheezy.core.mail import MailMessage
+        from wheezy.core.mail import mime
+        message = MailMessage(content='c')
+        a = Alternative('al')
+        message.alternatives.append(a)
+        a = Attachment('1.txt', 'at')
+        message.attachments.append(a)
+        m = mime(message)
+        assert 'multipart/mixed' == m['Content-Type']
+        subparts = m.get_payload()
+        assert 2 == len(subparts)
+        assert 'at' == subparts[1].get_payload()
+        subparts = subparts[0].get_payload()
+        assert 'c' == subparts[0].get_payload()
+        assert 'al' == subparts[1].get_payload()
+
+
+class MIMEPartsTestCase(unittest.TestCase):
+
+    def test_part(self):
+        """ Ensure base64 encoding for non text content.
+        """
+        from wheezy.core.mail import mime_part
+        m = mime_part('content', 'text/plain')
+        assert 'text/plain' == m['Content-Type']
+        assert 'content' == m.get_payload()
+        m = mime_part(b('content'), 'image/gif')
+        assert 'Y29udGVudA==' == m.get_payload().rstrip('\n')
+
+    def test_multipart(self):
+        """ Ensure subparts.
+        """
+        from wheezy.core.mail import mime_multipart
+        subparts = ['a']
+        m = mime_multipart('multipart/mixed', subparts)
+        assert 'multipart/mixed' == m['Content-Type']
+        assert subparts == m.get_payload()
+
+    @patch('wheezy.core.mail.mime_part')
+    def test_alternative(self, mock_mime_part):
+        """ Ensure alternative is built.
+        """
+        from wheezy.core.mail import Alternative
+        from wheezy.core.mail import mime_alternative
+        mock_mime_part.return_value = 'x'
+        a = Alternative('content', 'ct', 'cs')
+        assert 'x' == mime_alternative(a)
+        mock_mime_part.assert_called_once_with('content', 'ct', 'cs')
+
+    def test_related(self):
+        """ Ensure related is built.
+        """
+        from wheezy.core.mail import Alternative
+        from wheezy.core.mail import Related
+        from wheezy.core.mail import mime_alternative
+        a = Alternative('content', 'text/html', 'utf-8')
+        a.related.append(Related('cid', b('rc'), 'image/gif'))
+        m = mime_alternative(a)
+        assert 'multipart/related' == m['Content-Type']
+        subparts = m.get_payload()
+        assert 2 == len(subparts)
+        m = subparts[0]
+        assert 'text/html; charset="utf-8"' == m['Content-Type']
+        assert 'content' == m.get_payload()
+        m = subparts[1]
+        assert 'cid' == m['Content-ID']
+        assert 'image/gif' == m['Content-Type']
+        assert 'cmM=' == m.get_payload().rstrip('\n')
+
+    def test_attachment(self):
+        """ Ensure attachment is built.
+        """
+        from wheezy.core.mail import Attachment
+        from wheezy.core.mail import mime_attachment
+        a = Attachment('1.txt', 'c', 'text/plain')
+        m = mime_attachment(a)
+        assert 'text/plain' == m['Content-Type']
+        assert 'attachment; filename="1.txt"' == m['Content-Disposition']
+        a = Attachment('1.txt', 'c', 'text/plain', disposition='inline')
+        m = mime_attachment(a)
+        assert 'inline; filename="1.txt"' == m['Content-Disposition']
+        a = Attachment('1.txt', 'c', 'text/plain', content_charset='utf-8')
+        m = mime_attachment(a)
+        assert 'text/plain; charset="utf-8"' == m['Content-Type']
+        a = Attachment('1.txt', 'c', 'text/plain', name_charset='utf-8')
+        m = mime_attachment(a)
+        if PY3:
+            assert 'attachment; filename*=utf-8\'\'1.txt' == m[
+                'Content-Disposition']
+        else:
+            assert 'attachment; filename*="utf-8\'\'1.txt"' == m[
+                'Content-Disposition']
+        a = Attachment('1', b('c'))
+        m = mime_attachment(a)
+        assert 'application/octet-stream' == m['Content-Type']