Commits

Anonymous committed 78c0306

CSRF system now keeps a list of the most recently created tokens for
different URLs. Adler32 is used for URL hashing.

Comments (0)

Files changed (8)

solace/application.py

 from fnmatch import fnmatch
 from functools import update_wrapper
 from simplejson import dumps
-try:
-    from hashlib import sha1
-except ImportError:
-    from sha import new as sha1
 
 from babel import UnknownLocaleError, Locale
 from werkzeug import Request as RequestBase, Response, cached_property, \
             self._pulled_flash_messages = msgs
         return msgs
 
-    def get_csrf_token(self, force_generation=False):
-        """Return the CSRF token."""
-        token = self.session.get('csrf_token')
-        if token is None or force_generation:
-            token = sha1(os.urandom(12)).hexdigest()[:24]
-            self.session['csrf_token'] = token
-        return token
-
-    def clear_csrf_token(self):
-        """Clears the CSRF token."""
-        self.session.pop('csrf_token', None)
-
 
 def get_view(endpoint):
     """Returns the view for the endpoint."""

solace/static/solace.js

            the form another time, we send another HTTP request to
            get the updated CSRF token. */
         var token_field = $('input[name="_csrf_token"]');
-        if (token_field.length)
-          Solace.request('_get_csrf_token', null, 'GET', function(response) {
-            token_field.val(response.token);
+        if (token_field.length) {
+          var url = $(token_field).parent().parent().attr('action');
+          Solace.request('_update_csrf_token', {url: url}, 'POST', function(data) {
+            token_field.val(data.token);
           });
+        }
         return Solace._standardRemoteCallback(callback)(data);
       }
     });

solace/tests/__init__.py

 from werkzeug import Client, Response, cached_property
 
 
+BASE_URL = 'http://localhost/'
+
+
 # ignore lxml and html5lib warnings
 warnings.filterwarnings('ignore', message='lxml does not preserve')
 warnings.filterwarnings('ignore', message=r'object\.__init__.*?takes no parameters')
         settings.MAIL_LOG_FILE.seek(pos)
         return [message_from_string(x) for x in mails if x]
 
+    def normalize_local_path(self, path):
+        if path in ('', '.'):
+            path = path
+        elif path.startswith(BASE_URL):
+            path = path[len(BASE_URL) - 1:]
+        return path
+
     def submit_form(self, path, data, follow_redirects=False):
         response = self.client.get(path)
         try:
             raise RuntimeError('no form on page')
         csrf_token = form.xpath('//input[@name="_csrf_token"]')[0]
         data['_csrf_token'] = csrf_token.attrib['value']
-        action = form.attrib['action']
-        if action in ('', '.'):
-            action = path
+        action = self.normalize_local_path(form.attrib['action'])
         return self.client.post(action, method=form.attrib['method'].upper(),
                                 data=data, follow_redirects=follow_redirects)
 
     Rule('/_submit_comment/<post>') > 'kb.submit_comment',
     Rule('/_get_tags/<lang_code>') > 'kb.get_tags',
     Rule('/_no_javascript') > 'core.no_javascript',
-    Rule('/_get_csrf_token') > 'core.get_csrf_token',
+    Rule('/_update_csrf_token') > 'core.update_csrf_token',
     Rule('/_i18n/<lang>.js') > 'core.get_translations',
 
     # the API (version 1.0)

solace/utils/csrf.py

+# -*- coding: utf-8 -*-
+"""
+    solace.utils.csrf
+    ~~~~~~~~~~~~~~~~~
+
+    Implements helpers for the CSRF protection the form use.
+
+    :copyright: (c) 2009 by Plurk Inc., see AUTHORS for more details.
+    :license: BSD, see LICENSE for more details.
+"""
+import os
+from zlib import adler32
+try:
+    from hashlib import sha1
+except ImportError:
+    from sha import new as sha1
+
+
+#: the maximum number of csrf tokens kept in the session.  After that, the
+#: oldest item is deleted
+MAX_CSRF_TOKENS = 4
+
+
+def csrf_url_hash(url):
+    """A hash for a URL for the CSRF system."""
+    if isinstance(url, unicode):
+        url = url.encode('utf-8')
+    return int(adler32(url) & 0xffffffff)
+
+
+def get_csrf_token(request, url, force_update=False):
+    """Return a CSRF token."""
+    url_hash = csrf_url_hash(url)
+    tokens = request.session.setdefault('csrf_tokens', [])
+    token = None
+
+    if not force_update:
+        for stored_hash, stored_token in tokens:
+            if stored_hash == url_hash:
+                token = stored_token
+                break
+    if token is None:
+        if len(tokens) >= MAX_CSRF_TOKENS:
+            tokens.pop(0)
+
+        token = sha1(os.urandom(12)).digest()[:10]
+        tokens.append((url_hash, token))
+        request.session.modified = True
+
+    return token.encode('base64').strip('= \n').decode('ascii')
+
+def invalidate_csrf_token(request, url):
+    """Clears the CSRF token for the given URL."""
+    url_hash = csrf_url_hash(url)
+    tokens = request.session.get('csrf_tokens', None)
+    if not tokens:
+        return
+    request.session['csrf_tokens'] = [(h, t) for h, t in tokens
+                                      if h != url_hash]

solace/utils/formatting.py

 # -*- coding: utf-8 -*-
 """
-    solace.formatting
-    ~~~~~~~~~~~~~~~~~
+    solace.utils.formatting
+    ~~~~~~~~~~~~~~~~~~~~~~~
 
     Implements the formatting.  Uses creoleparser internally.
 

solace/utils/forms.py

 from itertools import chain
 from functools import update_wrapper
 from threading import Lock
+from urlparse import urljoin
 
 from werkzeug import html, escape, MultiDict, redirect, cached_property
 
 from solace.i18n import _, ngettext, lazy_gettext
 from solace.utils.support import OrderedDict
 from solace.utils.recaptcha import get_recaptcha_html, validate_recaptcha
+from solace.utils.csrf import get_csrf_token, invalidate_csrf_token
 
 
 _last_position_hint = -1
 
         if with_errors:
             body = self.errors() + body
-        return html.form(body, action=self._field.form.action or u'',
+        return html.form(body, action=self._field.form.action,
                          method=method, **attrs)
 
     def __call__(self, *args, **attrs):
         self.initial = initial
         self.action = action
         self.invalid_redirect_targets = set()
+
         if self.request is not None:
             self.csrf_protected = True
+            if self.action in (None, '', '.'):
+                self.action = request.url
+            else:
+                self.action = urljoin(request.url, self.action)
 
         self._root_field = _bind(self.__class__._root_field, self, {})
         self.reset()
         if not self.csrf_protected:
             raise AttributeError('no csrf token because form not '
                                  'csrf protected')
-        return self.request.get_csrf_token()
+        return get_csrf_token(self.request, self.action)
 
     @property
     def is_valid(self):
         # every time we validate, we invalidate the csrf token if there
         # was one.
         if self.csrf_protected:
-            self.request.clear_csrf_token()
+            invalidate_csrf_token(self.request, self.action)
             self.request.session.pop('csrf_token', None)
 
         if errors:

solace/views/core.py

     :license: BSD, see LICENSE for more details.
 """
 from werkzeug import redirect, Response
-from werkzeug.exceptions import NotFound
+from werkzeug.exceptions import NotFound, MethodNotAllowed
 from babel import Locale, UnknownLocaleError
 
 from solace.application import url_for, json_response
 from solace.models import User
 from solace.database import session
 from solace.utils.mail import send_email
+from solace.utils.csrf import get_csrf_token
 
 
 def language_redirect(request):
     return render_template('core/no_javascript.html')
 
 
-def get_csrf_token(request):
+def update_csrf_token(request):
     """Updates the CSRF token.  Required for forms that are submitted multiple
     times using JavaScript.  This updates the token.
     """
     if not request.is_xhr:
         raise BadRequest()
-    return json_response(token=request.get_csrf_token(force_generation=True))
+    elif not request.method == 'POST':
+        raise MethodNotAllowed(valid=['POST'])
+    token = get_csrf_token(request, request.form['url'], force_update=True)
+    return json_response(token=token)
 
 
 def get_translations(request, lang):