Anonymous avatar Anonymous committed e74f7d9

Added exchange token system. GET requests that modify state now have a
token in the URL that is unique for a user. This allows us to guard
against CSRF attacks in most cases without having to invalidate a huge
number of CSRF tokens for each request. Also added user unbanning.

Comments (0)

Files changed (15)

solace/application.py

 from babel import UnknownLocaleError, Locale
 from werkzeug import Request as RequestBase, Response, cached_property, \
      import_string, redirect, SharedDataMiddleware, url_quote
-from werkzeug.exceptions import HTTPException, NotFound, Forbidden
+from werkzeug.exceptions import HTTPException, NotFound, BadRequest, Forbidden
 from werkzeug.routing import BuildError, RequestRedirect
 from werkzeug.contrib.securecookie import SecureCookie
 
 from solace.utils.ctxlocal import local, LocalProperty
 
 
+# already resolved and imported views
+_resolved_views = {}
+
+
 class Request(RequestBase):
     """The request class."""
 
             if self.match_exception is not None:
                 raise self.match_exception
             rv = self.view(self, **self.view_arguments)
+        except BadRequest, e:
+            rv = get_view('core.bad_request')(self)
+        except Forbidden, e:
+            rv = get_view('core.forbidden')(self)
         except NotFound, e:
             rv = get_view('core.not_found')(self)
         if isinstance(rv, basestring):
 
 
 def get_view(endpoint):
-    """Returns the view for the endpoint."""
+    """Returns the view for the endpoint.  It will cache both positive and
+    negative hits, so never pass untrusted values to it.  If a view does
+    not exist, `None` is returned.
+    """
+    view = _resolved_views.get(endpoint)
+    if view is not None:
+        return view
     try:
-        return import_string('solace.views.' + endpoint)
+        view = import_string('solace.views.' + endpoint)
     except (ImportError, AttributeError):
-        try:
-            return import_string(endpoint)
-        except (ImportError, AttributeError):
-            raise RuntimeError('could not locate view for %r' % endpoint)
+        view = import_string(endpoint, silent=True)
+    _resolved_views[endpoint] = view
+    return view
 
 
 def json_response(message=None, html=None, error=False, login_could_fix=False,
                                             force_external=external)
         except BuildError:
             continue
+        view = get_view(endpoint)
+        if is_exchange_token_protected(view):
+            xt = get_exchange_token(request)
+            url = '%s%s_xt=%s' % (url, '?' in url and '&' or '?', xt)
         if anchor is not None:
             url += '#' + url_quote(anchor)
         return url
      before_request_dispatch, after_request_dispatch, \
      after_request_shutdown, before_response_sent
 from solace.utils.remoting import remote_export_primitive
+from solace.utils.csrf import get_exchange_token, is_exchange_token_protected
 
 # remember to save the session
 before_response_sent.connect(save_session)

solace/static/layout.css

     padding: 0;
     margin: 0;
 }
+
+div.admin_panel ul.userlist span.action {
+    float: left;
+    display: block;
+    margin: 4px 0 0 20px;
+}

solace/templates/admin/bans.html

   <ul class="userlist">
   {%- for user in banned_users %}
     <li>{{ render_user(user, avatar_size=26) }}
+      <span class="action">[<a href="{{ url_for('admin.unban', user=user.username)
+        }}">{{ _('lift the ban') }}</a>]</span>
   {%- endfor %}
   </ul>
   {% endif %}

solace/templates/core/bad_request.html

+{% extends 'layout.html' %}
+{% set page_title = _('Bad Request') %}
+{% block body %}
+  <h1>{{ _('Bad Request') }}</h1>
+  <p>{{ _('Your client made a request the server could not understand.') }}
+{% endblock %}

solace/templates/core/forbidden.html

+{% extends 'layout.html' %}
+{% set page_title = _('Forbidden') %}
+{% block body %}
+  <h1>{{ _('Forbidden') }}</h1>
+  <p>{{ _('You don\'t have permission to access the requested resource.') }}
+{% endblock %}

solace/templates/kb/_boxes.html

   {%- macro button(direction, val, text) -%}
     <p class="{{ direction }}vote">
       {%- if user and user['has_' ~ direction ~ 'voted'](post) -%}
-        <a href="{{ url_for('kb.vote', post=post.id, val=0) }}" class="active">
+        <a href="{{ url_for('kb.vote', post=post.id, val=0)|e }}" class="active">
       {%- else -%}
-        <a href="{{ url_for('kb.vote', post=post.id, val=val) }}">
+        <a href="{{ url_for('kb.vote', post=post.id, val=val)|e }}">
       {%- endif -%}
       <span>{{ text }}</span></a>
   {%- endmacro %}

solace/templates/mails/user_unbanned.txt

+{% extends 'mails/layout.txt' -%}
+{% block body %}{% trans user=user.display_name,
+                         site=settings.WEBSITE_TITLE -%}
+Hi {{ user }}!
+
+Your ban on {{ site }} was lifted.
+
+In order to login again you have to follow the following
+link and pick a new password:
+
+{{ reset_url }}
+
+See you soon on {{ site }}
+{%- endtrans %}{% endblock %}

solace/tests/__init__.py

 import tempfile
 import unittest
 import warnings
+from simplejson import loads
 from email import message_from_string
 from lxml import etree
 from html5lib import HTMLParser
 from html5lib.treebuilders import getTreeBuilder
 
-from werkzeug import Client, Response, cached_property
+from werkzeug import Client, Response, cached_property, unquote_header_value
+from werkzeug.contrib.securecookie import SecureCookie
 
 
 BASE_URL = 'http://localhost/'
         database.refresh_engine()
         database.init()
         self.client = Client(application, TestResponse)
+        self.is_logged_in = False
+
+    def get_session(self):
+        from solace import settings
+        for cookie in self.client.cookie_jar:
+            if cookie.name == settings.COOKIE_NAME:
+                value = unquote_header_value(cookie.value)
+                return SecureCookie.unserialize(value, settings.SECRET_KEY)
+
+    def get_exchange_token(self):
+        return loads(self.client.get('/_request_exchange_token').data)['token']
 
     def get_mails(self):
         from solace import settings
                                 data=data, follow_redirects=follow_redirects)
 
     def login(self, username, password):
-        return self.submit_form('/login', {
-            'username':     username,
-            'password':     password
-        })
+        try:
+            return self.submit_form('/login', {
+                'username':     username,
+                'password':     password
+            })
+        finally:
+            self.is_logged_in = True
 
     def logout(self):
-        return self.client.get('/logout')
+        self.is_logged_in = False
+        return self.client.get('/logout?_xt=%s' % self.get_exchange_token())
 
     def tearDown(self):
         from solace import database, settings
             pass
         settings.__dict__.clear()
         settings.__dict__.update(self.__old_settings)
+        del self.is_logged_in
 
 
 def suite():

solace/tests/kb_views.py

             el = response.html.xpath('//div[@class="votebox"]/h4')
             return int(el[0].text)
 
+        vote_url = '/_vote/%s?val=%%d&_xt=%s' % (tquid, self.get_exchange_token())
+
         # the author should not be able to upvote
         self.login('user_0', 'default')
-        response = self.client.get('/_vote/%d?val=1' % tquid,
-                                   follow_redirects=True)
+        response = self.client.get(vote_url % 1, follow_redirects=True)
         self.assert_('cannot upvote your own post' in response.data)
 
         # by default the user should not be able to downvote, because
         # he does not have enough reputation
-        response = self.client.get('/_vote/%d?val=-1' % tquid,
-                                   follow_redirects=True)
+        response = self.client.get(vote_url % -1, follow_redirects=True)
         self.assert_('to downvote you need at least 100 reputation'
                      in response.data)
 
         session.commit()
 
         # and let him downvote
-        response = self.client.get('/_vote/%d?val=-1' % tquid,
-                                   follow_redirects=True)
+        response = self.client.get(vote_url % -1, follow_redirects=True)
         self.assertEqual(get_vote_count(response), -1)
 
         # and now let *all* users vote up, including the author, but his
         for num in xrange(5):
             self.logout()
             self.login('user_%d' % num, 'default')
-            response = self.client.get('/_vote/%d?val=1' % tquid,
-                                       follow_redirects=True)
+            response = self.client.get(vote_url % 1, follow_redirects=True)
 
         # we should be at 4, author -1 the other four +1
         self.assertEqual(get_vote_count(response), 3)
     Rule('/admin/') > 'admin.overview',
     Rule('/admin/status') > 'admin.status',
     Rule('/admin/bans') > 'admin.bans',
+    Rule('/admin/unban/<user>') > 'admin.unban',
 
     # AJAX
     Rule('/_set_language/<locale>') > 'core.set_language',
     Rule('/_get_tags/<lang_code>') > 'kb.get_tags',
     Rule('/_no_javascript') > 'core.no_javascript',
     Rule('/_update_csrf_token') > 'core.update_csrf_token',
+    Rule('/_request_exchange_token') > 'core.request_exchange_token',
     Rule('/_i18n/<lang>.js') > 'core.get_translations',
 
     # the API (version 1.0)

solace/utils/admin.py

     :license: BSD, see LICENSE for more details.
 """
 from solace.i18n import _
-from solace.utils.email import send_email
+from solace.application import url_for
+from solace.templating import render_template
+from solace.utils.mail import send_email
 from solace.models import User, session
 
 
                render_template('mails/user_banned.txt', user=user),
                user.email)
     session.commit()
+
+
+def unban_user(user):
+    """Unbans the user.  What this actually does is sending the user
+    an email with a link to reactivate his account.  For reactivation
+    he has to give himself a new password.
+    """
+    if not user.is_banned:
+        return
+
+    # special password value that will never validate but does not
+    # trigger a "user is deativated".
+    user.pw_hash = '!'
+    reset_url = url_for('core.reset_password', email=user.email,
+                        key=user.password_reset_key, _external=True)
+    send_email(_(u'Your ban was lifted'),
+               render_template('mails/user_unbanned.txt', user=user,
+                               reset_url=reset_url), user.email)
+    session.commit()

solace/utils/csrf.py

     :license: BSD, see LICENSE for more details.
 """
 import os
+import hmac
+from functools import update_wrapper
 from zlib import adler32
 try:
     from hashlib import sha1
 except ImportError:
     from sha import new as sha1
+from werkzeug.exceptions import BadRequest
+from solace import settings
 
 
 #: the maximum number of csrf tokens kept in the session.  After that, the
     return int(adler32(url) & 0xffffffff)
 
 
+def random_token():
+    """Creates a random token.  10 byte in size."""
+    return os.urandom(10)
+
+
+def exchange_token_protected(f):
+    """Applies an exchange token check for each request to this view.  Using
+    this also has the advantage that the URL generation system will
+    automatically put the exchange token into the URL.
+    """
+    def new_view(request, *args, **kwargs):
+        if request.values.get('_xt') != get_exchange_token(request):
+            raise BadRequest()
+        return f(request, *args, **kwargs)
+    f.is_exchange_token_protected = True
+    return update_wrapper(new_view, f)
+
+
+def is_exchange_token_protected(f):
+    """Is the given view function exchange token protected?"""
+    return getattr(f, 'is_exchange_token_protected', False)
+
+
+def get_exchange_token(request):
+    """Returns a unique hash for the request.  This hash will always be the
+    same as long as the user has not closed the session and can be used to
+    protect "dangerous" pages that are triggered by `GET` requests.
+
+    Exchange tokens have to be submitted as a URL or form parameter named
+    `_xt`.
+
+    This token is valid for one session only (it's based on the username
+    and login time).
+    """
+    xt = request.session.get('xt', None)
+    if xt is None:
+        xt = request.session['xt'] = random_token().encode('hex')
+    return xt
+
+
 def get_csrf_token(request, url, force_update=False):
     """Return a CSRF token."""
     url_hash = csrf_url_hash(url)
         if len(tokens) >= MAX_CSRF_TOKENS:
             tokens.pop(0)
 
-        token = sha1(os.urandom(12)).digest()[:10]
+        token = random_token()
         tokens.append((url_hash, token))
         request.session.modified = True
 
-    return token.encode('base64').strip('= \n').decode('ascii')
+    return token.encode('hex')
+
 
 def invalidate_csrf_token(request, url):
     """Clears the CSRF token for the given URL."""

solace/views/admin.py

     :license: BSD, see LICENSE for more details.
 """
 from werkzeug import redirect, Response
-from werkzeug.exceptions import Forbidden
+from werkzeug.exceptions import Forbidden, NotFound
 
 from solace.i18n import _
 from solace.application import require_admin, url_for
 from solace.settings import describe_settings
 from solace.templating import render_template
 from solace.utils.pagination import Pagination
-from solace.utils.admin import ban_user
+from solace.utils.admin import ban_user, unban_user
+from solace.utils.csrf import exchange_token_protected
 
 
 @require_admin
     return render_template('admin/bans.html', pagination=pagination,
                            banned_users=pagination.get_objects(),
                            form=form.as_widget())
+
+
+@exchange_token_protected
+@require_admin
+def unban(request, user):
+    """Unbans a given user."""
+    user = User.query.filter_by(username=user).first()
+    if user is None:
+        raise NotFound()
+    next = request.next_url or url_for('admin.bans')
+    if not user.is_banned:
+        request.flash(_(u'The user is not banned.'))
+        return redirect(next)
+    unban_user(user)
+    request.flash(_(u'The user was unbanned and notified.'))
+    return redirect(next)

solace/views/core.py

 from solace.models import User
 from solace.database import session
 from solace.utils.mail import send_email
-from solace.utils.csrf import get_csrf_token
+from solace.utils.csrf import get_csrf_token, exchange_token_protected, \
+     get_exchange_token
 
 
 def language_redirect(request):
                            can_reset_password=auth.can_reset_password)
 
 
+@exchange_token_protected
 def logout(request):
     """Logs the user out."""
     if request.is_logged_in:
     return json_response(token=token)
 
 
+def request_exchange_token(request):
+    """Return the exchange token."""
+    token = get_exchange_token(request)
+    return json_response(token=token)
+
+
 def get_translations(request, lang):
     """Returns the translations for the given language."""
     rv = get_js_translations(lang)
     """Shows a not found page."""
     return Response(render_template('core/not_found.html'), status=404,
                     mimetype='text/html')
+
+
+def bad_request(request):
+    """Shows a "bad request" page."""
+    return Response(render_template('core/bad_request.html'),
+                    status=400, mimetype='text/html')
+
+
+def forbidden(request):
+    """Shows a forbidden page."""
+    return Response(render_template('core/forbidden.html'),
+                    status=401, mimetype='text/html')

solace/views/kb.py

 from solace.forms import QuestionForm, ReplyForm, CommentForm
 from solace.utils.forms import Form as EmptyForm
 from solace.utils.formatting import format_creole_diff, format_creole
+from solace.utils.csrf import exchange_token_protected
 
 
 _topic_order = {
 
 
 @require_login
+@exchange_token_protected
 def vote(request, post):
     """Votes on a post."""
     post = Post.query.get(post)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.