1. Fabio Michelini
  2. django-swingcms

Source

django-swingcms / swingcms / mailserver / code.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import settings, time, threading

from swingmail.swingmailclient import Client

from django.core.mail.message import EmailMessage, sanitize_address
from django.core.mail.backends.base import BaseEmailBackend
from django.utils.translation import ugettext as _

from cms.code import search_content
from cms.models import ds



__all__ = ('EmailBackend',

           'request_email_subscribe_confirm',
           'request_email_unsubscribe_confirm',
           'request_email_user_confirm',

           'get_link_to_privacy_policy')


class EmailBackend(BaseEmailBackend):
    """
    A wrapper around swingmail client.
    """
    def __init__(self, host=None, port=None, username=None, password=None, use_tls=None,
                 send_block_size=None, send_pause=None, fail_silently=False, **kwargs):
        super(EmailBackend, self).__init__(fail_silently=fail_silently)
        self.connection = Client(settings.DOMAIN)
        self._lock = threading.RLock()

    def open(self):
        """
        Connection to swingmail server is opened only when sending operations and
        is closed immediately after.
        """
        return False

    def close(self):
        """
        Idem.
        """
        return None

    def send_messages(self, email_messages):
        """
        Sends one or more EmailMessage objects and returns the number of email
        messages sent.
        """
        if not email_messages:
            return
        self._lock.acquire()
        try:
            num_sent = 0
            for message in email_messages:
                sent = self._send(message)
                if sent:
                    num_sent += 1
        finally:
            self._lock.release()
        return num_sent

    def _send(self, email_message):
        """
        A helper method that does the actual sending.
        """
        if not email_message.recipients():
            return False
        from_email = sanitize_address(email_message.from_email, email_message.encoding)
        recipients = [sanitize_address(addr, email_message.encoding)
                      for addr in email_message.recipients()]
        try:
            self.connection.sendmail(from_email,
                                     recipients,
                                     email_message.message().as_string(),
                                     host=settings.EMAIL_HOST,
                                     port=settings.EMAIL_PORT,
                                     username=settings.EMAIL_HOST_USER,
                                     password=settings.EMAIL_HOST_PASSWORD,
                                     use_tls=settings.EMAIL_USE_TLS,
                                     send_block_size=settings.SEND_BLOCK_SIZE,
                                     send_pause=settings.SEND_PAUSE)
        except:
            if not self.fail_silently:
                raise
            return False
        return True


def _request_email_confirm(raw_email, subject, message):

    # build message
    email_message = EmailMessage(subject=subject,
                                 body=message,
                                 from_email=ds.DEFAULT_FROM_EMAIL,
                                 to=(raw_email,),
                                 bcc=None,
                                 cc=None,
                                 connection=None,
                                 attachments=None,
                                 headers={'sender':ds.DEFAULT_SENDER_EMAIL})
    # send email message
    email_message.send()


def request_email_subscribe_confirm(raw_email, token, categories=None):

    subject = u"confirm subscribe to %s" % settings.DOMAIN

    # test user identity before changing assigned categories
    if categories:
        message = _(settings.SUBSCRIBE_CONFIRM_WITH_CATEGORIES) % {'domain':settings.DOMAIN,
                                                                   'slug_categs':'-'.join((c.name for c in categories)),
                                                                   'raw_email':raw_email,
                                                                   'token':token}
    # test email existence and user identity
    else:
        message =  _(settings.SUBSCRIBE_CONFIRM) % {'domain':settings.DOMAIN,
                                                    'raw_email':raw_email,
                                                    'token':token}
    _request_email_confirm(raw_email, subject, message)


def request_email_unsubscribe_confirm(raw_email, token, categories=None):

    subject = u'confirm unsubscribe from %s' % (settings.DOMAIN)

    # test user identity before changing assigned categories
    if categories:
        message = _(settings.UNSUBSCRIBE_CONFIRM_WITH_CATEGORIES) % {'domain':settings.DOMAIN,
                                                                     'slug_categs':'-'.join((c.name for c in categories)),
                                                                     'raw_email':raw_email,
                                                                     'token':token}

    # test user identity before delete email
    else:
        message = _(settings.UNSUBSCRIBE_CONFIRM) % {'domain':settings.DOMAIN,
                                                     'raw_email':raw_email,
                                                     'token':token}
    _request_email_confirm(raw_email, subject, message)


def request_email_user_confirm(user, raw_email, token):

    subject = 'confirm %s email for %s' % (settings.DOMAIN, user.username)
    message = _(settings.USER_CONFIRM) % {'domain':settings.DOMAIN,
                                          'username':user.username,
                                          'token':token}
    _request_email_confirm(raw_email, subject, message)


def get_link_to_privacy_policy(language):

    privacy_policy = search_content(model_names=("article",),
                                    name="privacy policy",
                                    language=language,
                                    check_alternative=True)
    return privacy_policy and "/view/article/%s" % privacy_policy.id or None