Commits

Anonymous committed 4e0f2ed

package added

  • Participants
  • Parent commits 8028c06

Comments (0)

Files changed (5)

File pyramid_lamson/__init__.py

+# -*- coding: utf-8 -*-
+
+
+from pyramid_lamson.mail.connection import Connection
+from pyramid_lamson.mail.message import Message
+from pyramid_lamson.mail_message import Attachment
+from pyramid_lamson.mail_message import BadHeaderError
+from pyramid_lamson.mail.signals import email_dispatched
+
+
+class Mailer(object):
+    """
+    Manages email messaging
+
+    Usage:
+
+    config.registry['mailer'] = Mailer(config.settings)
+
+    """
+
+    def __init__(self, settings):
+        
+        """
+        Initializes your mail settings from the application
+        settings.
+
+        You can use this if you want to set up your Mail instance
+        at configuration time.
+
+        :param app: Flask application instance
+        """
+
+        self.server = settings.get('mail_server', '127.0.0.1')
+        self.username = settings.get('mail_username')
+        self.password = settings.get('mail_password')
+        self.port = settings.get('mail_port', 25)
+        self.use_tls = settings.get('mail_use_tls', false)
+        self.use_ssl = settings.get('mail_use_ssl', false)
+        self.debug = int(settings.get('mail_debug', app.debug))
+        self.max_emails = settings.get('default_max_emails')
+        self.suppress = settings.get('mail_suppress_send', false)
+        self.fail_silently = settings.get('mail_fail_silently', true)
+
+    def send(self, message):
+        """
+        Sends a single message instance. If TESTING is True
+        the message will not actually be sent.
+
+        :param message: a Message instance.
+        """
+
+        with self.connect() as connection:
+            message.send(connection)
+
+    def send_message(self, *args, **kwargs):
+        """
+        Shortcut for send(msg). 
+
+        Takes same arguments as Message constructor.
+    
+        :versionadded: 0.3.5
+        """
+
+        self.send(Message(*args, **kwargs))
+
+    def connect(self, max_emails=None):
+        """
+        Opens a connection to the mail host.
+        
+        :param max_emails: the maximum number of emails that can 
+                           be sent in a single connection. If this 
+                           number is exceeded the Connection instance 
+                           will reconnect to the mail server. The
+                           DEFAULT_MAX_EMAILS config setting is used 
+                           if this is None.
+        """
+        return Connection(self, max_emails) 
+                          
+

File pyramid_lamson/connection.py

+import smtplib
+import socket
+
+from pyramid_lamson.mail.message import Message
+from pyramid_lamson.mail.signals import email_dispatched
+
+class Connection(object):
+
+    """Handles connection to host."""
+
+    def __init__(self, mail, max_emails=None):
+
+        self.mail = mail
+        self.app = self.mail.app
+        self.suppress = self.mail.suppress
+        self.max_emails = max_emails or self.mail.max_emails or 0
+        self.fail_silently = self.mail.fail_silently
+
+    def __enter__(self):
+
+        if self.suppress:
+            self.host = None
+        else:
+            self.host = self.configure_host()
+        
+        self.num_emails = 0
+
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        if self.host:
+            self.host.quit()
+    
+    def configure_host(self):
+        
+        try:
+            if self.mail.use_ssl:
+                host = smtplib.SMTP_SSL(self.mail.server, self.mail.port)
+            else:
+                host = smtplib.SMTP(self.mail.server, self.mail.port)
+        except socket.error:
+            if self.fail_silently:
+                return
+            raise
+
+        host.set_debuglevel(int(self.app.debug))
+
+        if self.mail.use_tls:
+            host.starttls()
+        if self.mail.username and self.mail.password:
+            host.login(self.mail.username, self.mail.password)
+
+        return host
+
+    def send(self, message):
+        """
+        Sends message.
+        
+        :param message: Message instance.
+        """
+
+        if self.host:
+            self.host.sendmail(message.sender,
+                               message.send_to,
+                               str(message.get_response()))
+
+        if email_dispatched:
+            email_dispatched.send(message, app=self.app)
+
+        self.num_emails += 1
+
+        if self.num_emails == self.max_emails:
+            
+            self.num_emails = 0
+            if self.host:
+                self.host.quit()
+                self.host = self.configure_host()
+
+    def send_message(self, *args, **kwargs):
+        """
+        Shortcut for send(msg). 
+
+        Takes same arguments as Message constructor.
+    
+        :versionadded: 0.3.5
+
+        """
+
+        self.send(Message(*args, **kwargs))
+
+

File pyramid_lamson/message.py

+
+from lamson.mail import MailResponse
+
+class BadHeaderError(Exception): pass
+
+
+class Attachment(object):
+
+    """
+    Encapsulates file attachment information.
+
+    :versionadded: 0.3.5
+
+    :param filename: filename of attachment
+    :param content_type: file mimetype
+    :param data: the raw file data
+    :param disposition: content-disposition (if any)
+ 
+    """
+
+    def __init__(self, filename=None, content_type=None, data=None,
+        disposition=None): 
+
+        self.filename = filename
+        self.content_type = content_type
+        self.data = data
+        self.disposition = disposition or 'attachment'
+
+
+class Message(object):
+    
+    """
+    Encapsulates an email message.
+
+    :param subject: email subject header
+    :param recipients: list of email addresses
+    :param body: plain text message
+    :param html: HTML message
+    :param sender: email sender address, or **DEFAULT_MAIL_SENDER** by default
+    :param cc: CC list
+    :param bcc: BCC list
+    :param attachments: list of Attachment instances
+    """
+
+    def __init__(self, subject, 
+                 recipients=None, 
+                 body=None, 
+                 html=None, 
+                 sender=None,
+                 cc=None,
+                 bcc=None,
+                 attachments=None):
+
+
+        self.subject = subject
+        self.sender = sender
+        self.body = body
+        self.html = html
+
+        self.cc = cc
+        self.bcc = bcc 
+
+        if recipients is None:
+            recipients = []
+
+        self.recipients = list(recipients)
+        
+        if attachments is None:
+            attachments = []
+
+        self.attachments = attachments
+
+    @property
+    def send_to(self):
+        return set(self.recipients) | set(self.bcc or ()) | set(self.cc or ())
+
+    def get_response(self):
+        """
+        Creates a Lamson MailResponse instance
+        """
+
+        response = MailResponse(Subject=self.subject, 
+                                To=self.recipients,
+                                From=self.sender,
+                                Body=self.body,
+                                Html=self.html)
+
+        if self.bcc:
+            response.base['Bcc'] = self.bcc
+
+        if self.cc:
+            response.base['Cc'] = self.cc
+
+        for attachment in self.attachments:
+
+            response.attach(attachment.filename, 
+                            attachment.content_type, 
+                            attachment.data, 
+                            attachment.disposition)
+
+        return response
+    
+    def is_bad_headers(self):
+        """
+        Checks for bad headers i.e. newlines in subject, sender or recipients.
+        """
+       
+        for val in [self.subject, self.sender] + self.recipients:
+            for c in '\r\n':
+                if c in val:
+                    return True
+        return False
+        
+    def send(self, connection):
+        """
+        Verifies and sends the message.
+        """
+        
+        assert self.recipients, "No recipients have been added"
+        assert self.body or self.html, "No body or HTML has been set"
+        assert self.sender, "No sender address has been set"
+
+        if self.is_bad_headers():
+            raise BadHeaderError
+
+        connection.send(self)
+
+    def add_recipient(self, recipient):
+        """
+        Adds another recipient to the message.
+        
+        :param recipient: email address of recipient.
+        """
+        
+        self.recipients.append(recipient)
+
+    def attach(self, 
+               filename=None, 
+               content_type=None, 
+               data=None,
+               disposition=None):
+        
+        """
+        Adds an attachment to the message.
+        
+        :param filename: filename of attachment
+        :param content_type: file mimetype
+        :param data: the raw file data
+        :param disposition: content-disposition (if any)
+        """
+
+        self.attachments.append(
+            Attachment(filename, content_type, data, disposition))
+

File pyramid_lamson/subscribers.py

+
+try:
+    
+    import blinker
+    signals = blinker.Namespace()
+    
+    email_dispatched = signals.signal("email-dispatched", doc="""
+Signal sent when an email is dispatched. This signal will also be sent
+in testing mode, even though the email will not actually be sent.
+    """)
+
+except ImportError:
+    email_dispatched = None
+

File pyramid_lamson/tests.py

+# -*- coding: utf-8 -*-
+
+
+import unittest
+import mailbox
+
+from email import encoders
+
+from nose.tools import assert_equal
+
+class TestCase(unittest.TestCase):
+
+    TESTING = True
+    DEFAULT_MAIL_SENDER = "support@mysite.com"
+
+    def setUp(self):
+
+        self.app = Flask(__name__)
+        self.app.config.from_object(self)
+        
+        assert self.app.testing
+
+        self.mail = Mail(self.app)
+
+        self.ctx = self.app.test_request_context()
+        self.ctx.push()
+
+    def tearDown(self):
+
+        self.ctx.pop()
+
+
+class TestMessage(TestCase):
+
+    def test_initialize(self):
+
+        msg = Message(subject="subject",
+                      recipients=["to@example.com"])
+
+
+        assert msg.sender == "support@mysite.com"
+        assert msg.recipients == ["to@example.com"]
+
+    def test_recipients_properly_initialized(self):
+
+        msg = Message(subject="subject")
+
+        assert msg.recipients == []
+
+        msg2 = Message(subject="subject")
+        msg2.add_recipient("somebody@here.com")
+
+        assert len(msg.recipients) == 0
+
+        msg3 = Message(subject="subject")
+        msg3.add_recipient("somebody@here.com")
+
+        assert len(msg.recipients) == 0
+
+    def test_add_recipient(self):
+
+        msg = Message("testing")
+        msg.add_recipient("to@example.com")
+
+        assert msg.recipients == ["to@example.com"]
+
+    
+    def test_sender_as_tuple(self):
+
+        msg = Message(subject="testing",
+                      sender=("tester", "tester@example.com"))
+
+    
+    def test_send_without_sender(self):
+
+        del self.app.config['DEFAULT_MAIL_SENDER']
+
+        msg = Message(subject="testing",
+                      recipients=["to@example.com"],
+                      body="testing")
+
+        self.assertRaises(AssertionError, self.mail.send, msg)
+
+    def test_send_without_recipients(self):
+
+        msg = Message(subject="testing",
+                      recipients=[],
+                      body="testing")
+
+        self.assertRaises(AssertionError, self.mail.send, msg)
+
+    def test_send_without_body(self):
+
+        msg = Message(subject="testing",
+                      recipients=["to@example.com"])
+
+        self.assertRaises(AssertionError, self.mail.send, msg)
+
+        msg.html = "<b>test</b>"
+
+        self.mail.send(msg)
+
+    def test_normal_send(self):
+        """
+        This will not actually send a message unless the mail server
+        is set up. The error will be logged but test should still 
+        pass.
+        """
+
+        self.app.config['TESTING'] = False
+        self.mail.init_app(self.app)
+
+        with self.mail.record_messages() as outbox:
+
+            msg = Message(subject="testing",
+                          recipients=["to@example.com"],
+                          body="testing")
+
+            self.mail.send(msg)
+            
+            assert len(outbox) == 1
+        
+        self.app.config['TESTING'] = True
+
+    def test_bcc(self):
+
+        msg = Message(subject="testing",
+                      recipients=["to@example.com"],
+                      body="testing",
+                      bcc=["tosomeoneelse@example.com"])
+
+        response = msg.get_response()
+        assert "Bcc: tosomeoneelse@example.com" in str(response)
+
+    def test_cc(self):
+
+        msg = Message(subject="testing",
+                      recipients=["to@example.com"],
+                      body="testing",
+                      cc=["tosomeoneelse@example.com"])
+
+        response = msg.get_response()
+        assert "Cc: tosomeoneelse@example.com" in str(response)
+
+    def test_attach(self):
+
+        msg = Message(subject="testing",
+                      recipients=["to@example.com"],
+                      body="testing")
+        
+        msg.attach(data="this is a test", 
+                   content_type="text/plain")
+        
+
+        a = msg.attachments[0]
+        
+        assert a.filename is None
+        assert a.disposition == 'attachment'
+        assert a.content_type == "text/plain"
+        assert a.data == "this is a test"
+ 
+
+    def test_bad_header_subject(self):
+
+        msg = Message(subject="testing\n\r",
+                      sender="from@example.com",
+                      body="testing",
+                      recipients=["to@example.com"])
+
+        self.assertRaises(BadHeaderError, self.mail.send, msg)
+
+    def test_bad_header_sender(self):
+
+        msg = Message(subject="testing",
+                      sender="from@example.com\n\r",
+                      recipients=["to@example.com"],
+                      body="testing")
+
+        self.assertRaises(BadHeaderError, self.mail.send, msg)
+
+    def test_bad_header_recipient(self):
+
+        msg = Message(subject="testing",
+                      sender="from@example.com",
+                      recipients=[
+                          "to@example.com",
+                          "to\r\n@example.com"],
+                      body="testing")
+
+        self.assertRaises(BadHeaderError, self.mail.send, msg)
+
+
+class TestMail(TestCase):
+
+    def test_send(self):
+
+        with self.mail.record_messages() as outbox:
+            msg = Message(subject="testing",
+                          recipients=["tester@example.com"],
+                          body="test")
+
+            self.mail.send(msg)
+
+            assert len(outbox) == 1 
+
+    def test_send_message(self):
+
+        with self.mail.record_messages() as outbox:
+            self.mail.send_message(subject="testing",
+                                   recipients=["tester@example.com"],
+                                   body="test")
+
+            assert len(outbox) == 1
+
+            msg = outbox[0]
+
+            assert msg.subject == "testing"
+            assert msg.recipients == ["tester@example.com"]
+            assert msg.body == "test"
+
+
+class TestConnection(TestCase):
+
+    def test_send_message(self):
+
+        with self.mail.record_messages() as outbox:
+            with self.mail.connect() as conn:
+                conn.send_message(subject="testing",
+                                  recipients=["to@example.com"],
+                                  body="testing")
+
+            assert len(outbox) == 1
+
+    def test_send_single(self):
+
+        with self.mail.record_messages() as outbox:
+            with self.mail.connect() as conn:
+                msg = Message(subject="testing",
+                              recipients=["to@example.com"],
+                              body="testing")
+
+                conn.send(msg)
+
+            assert len(outbox) == 1
+
+    def test_send_many(self):
+        
+        messages = []
+
+        with self.mail.record_messages() as outbox:
+            with self.mail.connect() as conn:
+                for i in xrange(100):
+                    msg = Message(subject="testing",
+                                  recipients=["to@example.com"],
+                                  body="testing")
+        
+                    conn.send(msg)
+
+            assert len(outbox) == 100
+
+    def test_max_emails(self):
+        
+        messages = []
+
+        with self.mail.record_messages() as outbox:
+            with self.mail.connect(max_emails=10) as conn:
+                for i in xrange(100):
+                    msg = Message(subject="testing",
+                                  recipients=["to@example.com"],
+                                  body="testing")
+        
+                    conn.send(msg)
+
+                    print conn.num_emails
+                    if i % 10 == 0:
+                        assert conn.num_emails == 1
+
+            assert len(outbox) == 100
+