Commits

Anonymous committed 23b7846

0.12.3dev: Generate the notification message using `'\n'` instead of `CRLF`, as the conversion is done in the respective `IEmailSender` implementations.

Also, factored out EOL-fixing into a separate function, with unit tests.

Closes #10377.

Comments (0)

Files changed (4)

 from trac import __version__
 from trac.config import BoolOption, ExtensionOption, IntOption, Option
 from trac.core import *
-from trac.util.text import CRLF
+from trac.util.text import CRLF, fix_eol
 from trac.util.translation import _, deactivate, reactivate
 
 MAXHEADERLEN = 76
         '(?:[a-zA-Z0-9_-]+\.)+' # labels (but also allow '_')
         '[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?' # TLD
         )
-eol_re = re.compile('\r?\n')
 
 
 class IEmailSender(Interface):
     
     def send(self, from_addr, recipients, message):
         # Ensure the message complies with RFC2822: use CRLF line endings
-        message = CRLF.join(eol_re.split(message))
+        message = fix_eol(message, CRLF)
         
         self.log.info("Sending notification through SMTP at %s:%d to %s"
                       % (self.smtp_server, self.smtp_port, recipients))
 
     def send(self, from_addr, recipients, message):
         # Use native line endings in message
-        message = os.linesep.join(eol_re.split(message))
+        message = fix_eol(message, os.linesep)
 
         self.log.info("Sending notification through sendmail at %s to %s"
                       % (self.sendmail_path, recipients))

trac/ticket/notification.py

 from trac.ticket.api import TicketSystem
 from trac.util import md5
 from trac.util.datefmt import to_utimestamp
-from trac.util.text import CRLF, wrap, obfuscate_email_address, text_width
+from trac.util.text import obfuscate_email_address, text_width, wrap
 from trac.util.translation import deactivate, reactivate
 
 class TicketNotificationSystem(Component):
                 change_data.update({
                     'author': obfuscate_email_address(change['author']),
                     'comment': wrap(change['comment'], self.COLS, ' ', ' ',
-                                    CRLF, self.ambiwidth)
+                                    '\n', self.ambiwidth)
                     })
                 link += '#comment:%s' % str(change.get('cnum', ''))
                 for field, values in change['fields'].iteritems():
                     new = values['new']
                     newv = ''
                     if field == 'description':
-                        new_descr = wrap(new, self.COLS, ' ', ' ', CRLF,
+                        new_descr = wrap(new, self.COLS, ' ', ' ', '\n',
                                          self.ambiwidth)
-                        old_descr = wrap(old, self.COLS, '> ', '> ', CRLF,
+                        old_descr = wrap(old, self.COLS, '> ', '> ', '\n',
                                          self.ambiwidth)
-                        old_descr = old_descr.replace(2 * CRLF, CRLF + '>' + \
-                                                      CRLF)
-                        cdescr = CRLF
-                        cdescr += 'Old description:' + 2 * CRLF + old_descr + \
-                                  2 * CRLF
-                        cdescr += 'New description:' + 2 * CRLF + new_descr + \
-                                  CRLF
+                        old_descr = old_descr.replace(2 * '\n', '\n' + '>' + \
+                                                      '\n')
+                        cdescr = '\n'
+                        cdescr += 'Old description:' + 2 * '\n' + old_descr + \
+                                  2 * '\n'
+                        cdescr += 'New description:' + 2 * '\n' + new_descr + \
+                                  '\n'
                         changes_descr = cdescr
                     elif field == 'summary':
                         summary = "%s (was: %s)" % (new, old)
                         if delcc:
                             chgcc += wrap(" * cc: %s (removed)" %
                                           ', '.join(delcc), 
-                                          self.COLS, ' ', ' ', CRLF,
-                                          self.ambiwidth) + CRLF
+                                          self.COLS, ' ', ' ', '\n',
+                                          self.ambiwidth) + '\n'
                         if addcc:
                             chgcc += wrap(" * cc: %s (added)" %
                                           ', '.join(addcc), 
-                                          self.COLS, ' ', ' ', CRLF,
-                                          self.ambiwidth) + CRLF
+                                          self.COLS, ' ', ' ', '\n',
+                                          self.ambiwidth) + '\n'
                         if chgcc:
                             changes_body += chgcc
                         self.prev_cc += old and self.parse_cc(old) or []
                         if len(old + new) + length > self.COLS:
                             length = 5
                             if len(old) + length > self.COLS:
-                                spacer_old = CRLF
+                                spacer_old = '\n'
                             if len(new) + length > self.COLS:
-                                spacer_new = CRLF
+                                spacer_new = '\n'
                         chg = '* %s: %s%s%s=>%s%s' % (field, spacer_old, old,
                                                       spacer_old, spacer_new,
                                                       new)
-                        chg = chg.replace(CRLF, CRLF + length * ' ')
-                        chg = wrap(chg, self.COLS, '', length * ' ', CRLF,
+                        chg = chg.replace('\n', '\n' + length * ' ')
+                        chg = wrap(chg, self.COLS, '', length * ' ', '\n',
                                    self.ambiwidth)
-                        changes_body += ' %s%s' % (chg, CRLF)
+                        changes_body += ' %s%s' % (chg, '\n')
                     if newv:
                         change_data[field] = {'oldvalue': old, 'newvalue': new}
         
         ticket_values['id'] = ticket.id
         ticket_values['description'] = wrap(
             ticket_values.get('description', ''), self.COLS,
-            initial_indent=' ', subsequent_indent=' ', linesep=CRLF,
+            initial_indent=' ', subsequent_indent=' ', linesep='\n',
             ambiwidth=self.ambiwidth)
         ticket_values['new'] = self.newticket
         ticket_values['link'] = link
                 width_r = min((self.COLS - 1) * 2 / 3, width_r)         
                 width_l = self.COLS - width_r - 1
         sep = width_l * '-' + '+' + width_r * '-'
-        txt = sep + CRLF
+        txt = sep + '\n'
         cell_tmp = [u'', u'']
         big = []
         i = 0
             if fname in ['owner', 'reporter']:
                 fval = obfuscate_email_address(fval)
             if f['type'] == 'textarea' or '\n' in unicode(fval):
-                big.append((f['label'], CRLF.join(fval.splitlines())))
+                big.append((f['label'], '\n'.join(fval.splitlines())))
             else:
                 # Note: f['label'] is a Babel's LazyObject, make sure its
                 # __str__ method won't be called.
                                       (width[2 * idx]
                                        - self.get_text_width(f['label'])
                                        + 2 * idx) * ' ',
-                                      2 * ' ', CRLF, self.ambiwidth)
-                cell_tmp[idx] += CRLF
+                                      2 * ' ', '\n', self.ambiwidth)
+                cell_tmp[idx] += '\n'
                 i += 1
         cell_l = cell_tmp[0].splitlines()
         cell_r = cell_tmp[1].splitlines()
                 cell_r.append('')
             fmt_width = width_l - self.get_text_width(cell_l[i]) \
                         + len(cell_l[i])
-            txt += u'%-*s|%s%s' % (fmt_width, cell_l[i], cell_r[i], CRLF)
+            txt += u'%-*s|%s%s' % (fmt_width, cell_l[i], cell_r[i], '\n')
         if big:
             txt += sep
             for name, value in big:
-                txt += CRLF.join(['', name + ':', value, '', ''])
+                txt += '\n'.join(['', name + ':', value, '', ''])
         txt += sep
         return txt
 
 
     def format_hdr(self):
         return '#%s: %s' % (self.ticket.id, wrap(self.ticket['summary'],
-                                                 self.COLS, linesep=CRLF,
+                                                 self.COLS, linesep='\n',
                                                  ambiwidth=self.ambiwidth))
 
     def format_subj(self, summary):

trac/util/tests/text.py

 import unittest
 from StringIO import StringIO
 
-from trac.util.text import empty, expandtabs, javascript_quote, \
+from trac.util.text import empty, expandtabs, fix_eol, javascript_quote, \
                            normalize_whitespace, to_unicode, \
                            text_width, print_table, unicode_quote, \
                            unicode_quote_plus, unicode_unquote, \
                                        ambiwidth=2))
 
 
+class FixEolTestCase(unittest.TestCase):
+    def test_mixed_eol(self):
+        text = u'\nLine 2\rLine 3\r\nLine 4\n\r'
+        self.assertEqual(u'\nLine 2\nLine 3\nLine 4\n\n',
+                         fix_eol(text, '\n'))
+        self.assertEqual(u'\rLine 2\rLine 3\rLine 4\r\r',
+                         fix_eol(text, '\r'))
+        self.assertEqual(u'\r\nLine 2\r\nLine 3\r\nLine 4\r\n\r\n',
+                         fix_eol(text, '\r\n'))
+
+
 def suite():
     suite = unittest.TestSuite()
     suite.addTest(unittest.makeSuite(ToUnicodeTestCase, 'test'))
     suite.addTest(unittest.makeSuite(TextWidthTestCase, 'test'))
     suite.addTest(unittest.makeSuite(PrintTableTestCase, 'test'))
     suite.addTest(unittest.makeSuite(WrapTestCase, 'test'))
+    suite.addTest(unittest.makeSuite(FixEolTestCase, 'test'))
     return suite
 
 if __name__ == '__main__':
             return ' '.join([to_unicode(arg) for arg in text.args])
     return unicode(text)
 
+
 def exception_to_unicode(e, traceback=False):
     message = '%s: %s' % (e.__class__.__name__, to_unicode(e))
     if traceback:
         message = '\n%s\n%s' % (to_unicode('\n'.join(traceback_only)), message)
     return message
 
+
 _js_quote = {'\\': '\\\\', '"': '\\"', '\b': '\\b', '\f': '\\f',
              '\n': '\\n', '\r': '\\r', '\t': '\\t', "'": "\\'"}
 for i in range(0x20) + [ord(c) for c in '&<>']:
     _js_quote.setdefault(chr(i), '\\u%04x' % i)
 _js_quote_re = re.compile(r'[\x00-\x1f\\"\b\f\n\r\t\'&<>]')
 
+
 def javascript_quote(text):
     """Quote strings for inclusion in javascript"""
     if not text:
         return _js_quote[match.group(0)]
     return _js_quote_re.sub(replace, text)
 
+
 def unicode_quote(value, safe='/'):
     """A unicode aware version of `urllib.quote`
 
     return quote(isinstance(value, unicode) and value.encode('utf-8') or
                  str(value), safe)
 
+
 def unicode_quote_plus(value, safe=''):
     """A unicode aware version of `urllib.quote_plus`.
 
     return quote_plus(isinstance(value, unicode) and value.encode('utf-8') or
                       str(value), safe)
 
+
 def unicode_unquote(value):
     """A unicode aware version of `urllib.unquote`.
     
     """
     return unquote(value).decode('utf-8')
 
+
 def unicode_urlencode(params, safe=''):
     """A unicode aware version of `urllib.urlencode`.
     
                      unicode_quote_plus(v, safe))
     return '&'.join(l)
 
+
 def to_utf8(text, charset='iso-8859-15'):
     """Convert a string to UTF-8, assuming the encoding is either UTF-8, ISO
     Latin-1, or as specified by the optional `charset` parameter.
     def __repr__(self):
         return '*******'
 
+
 def console_print(out, *args, **kwargs):
     cons_charset = getattr(out, 'encoding', None)
     # Windows returns 'cp0' to indicate no encoding
     if kwargs.get('newline', True):
         out.write('\n')
 
+
 def printout(*args, **kwargs):
     console_print(sys.stdout, *args, **kwargs)
 
+
 def printerr(*args, **kwargs):
     console_print(sys.stderr, *args, **kwargs)
 
+
 def raw_input(prompt):
     printout(prompt, newline=False)
     return to_unicode(__builtin__.raw_input(), sys.stdin.encoding)
     if re.match(r'zh|ja|kr', os.environ.get('LANG') or '', re.IGNORECASE):
         _default_ambiwidth = 2
 
+
 def print_table(data, headers=None, sep='  ', out=None, ambiwidth=None):
     """Print `data` as a table in the terminal.
 
 
     out.write('\n')
 
+
 def shorten_line(text, maxlen=75):
     if len(text or '') < maxlen:
         return text
         cut = maxlen
     return text[:cut] + ' ...'
 
+
 class UnicodeTextWrapper(textwrap.TextWrapper):
     breakable_char_ranges = [
         (0x1100, 0x11FF),   # Hangul Jamo
 
         return lines
 
+
 def wrap(t, cols=75, initial_indent='', subsequent_indent='',
          linesep=os.linesep, ambiwidth=1):
     """Wraps the single paragraph in `t`, which contains unicode characters.
         wrappedLines += wrapper.wrap(line.rstrip()) or ['']
     return linesep.join(wrappedLines)
 
+
 def obfuscate_email_address(address):
     if address:
         at = address.find('@')
                    (address[-1] == '>' and '>' or '')
     return address
 
+
 def breakable_path(path):
     """Make a path breakable after path separators, and conversely, avoid
     breaking at spaces.
     return prefix + path.replace('/', u'/\u200b').replace('\\', u'\\\u200b') \
                         .replace(' ', u'\u00a0')
 
+
 def normalize_whitespace(text, to_space=u'\u00a0', remove=u'\u200b'):
     """Normalize whitespace in a string, by replacing special spaces by normal
     spaces and removing zero-width spaces."""
 
     return (format + ' %s') % (size, units[i - 1])
 
+
 def expandtabs(s, tabstop=8, ignoring=None):
     if '\t' not in s:
         return s
         outlines.append(''.join(s))
     return '\n'.join(outlines)
 
+
+def fix_eol(text, eol):
+    """Fix end-of-lines in a text."""
+    lines = text.splitlines()
+    lines.append('')
+    return eol.join(lines)