Commits

Mathias Köhler committed 9cb8eeb

Initial commit

Comments (0)

Files changed (14)

+*.pyc
+#!/usr/bin/env python
+# coding: utf8
+
+from flask_script import Manager
+from wetube import app, db
+
+
+manager = Manager(app)
+
+
+@manager.command
+def runserver():
+    from socketio.server import SocketIOServer
+    SocketIOServer(('0.0.0.0', 5000), app,
+        namespace="socket.io",  policy_server=False).serve_forever()
+
+
+if __name__ == '__main__':
+    manager.run()

wetube/__init__.py

+# -*- coding: utf-8 -*-
+
+from flask import Flask, render_template, request, session, redirect, \
+    url_for
+from passlib.hash import pbkdf2_sha512 as hasher
+from redis import StrictRedis
+from socketio import socketio_manage
+
+from forms import RegisterForm, LoginForm
+from .session_interface import RedisSessionInterface
+from .utils import login_required, logged_in
+
+
+app = Flask(__name__)
+app.config.from_pyfile("config.cfg")
+app.session_interface = RedisSessionInterface()
+db = StrictRedis()
+
+
+from .websockets import WetubeSocket
+
+
+@app.context_processor
+def inject_user():
+    logged_in = "user" in session and session["user"] is not None
+    user =  db.hgetall("users:%s" % session["user"]) if logged_in else None
+    return dict(user=user, logged_in=logged_in)
+
+
+@app.route('/login', methods=['GET', 'POST'])
+def login():
+    form = LoginForm(request.form)
+    if form.validate_on_submit():
+        session["user"] = form.username.data
+        return form.redirect('index')
+    return render_template('user/login.html', login_form=form,
+                           register_form=RegisterForm())
+
+
+@app.route("/")
+@login_required
+def index():
+    return render_template('index.html')
+
+
+@app.route('/logout')
+def logout():
+    del session["user"]
+    return redirect(url_for('index'))
+
+
+@app.route("/register", methods=["POST"])
+def register():
+    form = RegisterForm(request.form)
+    if form.validate_on_submit():
+        name = form.username.data
+        key = "users:%s" % name
+
+        pipe = db.pipeline()
+        pipe.hset(key, "hash", hasher.encrypt(form.password.data))
+        pipe.hset(key, "email", form.email.data)
+        pipe.hset(key, "name", name)
+        pipe.hset(key, "role", "user")
+        pipe.sadd("users", name)
+        pipe.execute()
+
+        session["user"] = name
+        return redirect(url_for("index"))
+    return render_template("user/login.html", login_form=LoginForm(),
+                           register_form=form)
+
+
+@app.route('/socket.io/<path:remaining>')
+def socketio(remaining):
+    try:
+        socketio_manage(request.environ, {'/app': WetubeSocket})
+    except:
+        app.logger.error("Exception while handling socketio connection",
+                         exc_info=True)
+    return ""

wetube/config.cfg

+SECRET_KEY="1234567"
+DEBUG=True
+# -*- coding: utf8 -*-
+
+from uuid import UUID, uuid4
+
+from flask import redirect, request, url_for, session
+from urlparse import urlparse, urljoin
+from wtforms import Form, TextField, PasswordField, HiddenField
+from wtforms.fields.html5 import EmailField
+from wtforms.validators import InputRequired, EqualTo, Optional, Email
+from wtforms.csrf.core import CSRF
+from passlib.hash import pbkdf2_sha512 as hasher
+
+from .utils import compress_uuid, decompress_uuid
+
+
+def get_redirect_target():
+    next = request.args.get('next')
+    if next and is_safe_url(next):
+        return next
+
+
+def is_safe_url(target):
+    ref_url = urlparse(request.host_url)
+    test_url = urlparse(urljoin(request.host_url, target))
+    return test_url.scheme in ('http', 'https') and \
+           ref_url.netloc == test_url.netloc
+
+
+class UUID_CSRF(CSRF):
+
+    def setup_form(self, form):
+        return super(UUID_CSRF, self).setup_form(form)
+
+    def generate_csrf_token(self, csrf_token):
+        if "csrf" not in session:
+            session["csrf"] = compress_uuid(uuid4())
+        return session["csrf"]
+
+    def validate_csrf_token(self, form, field):
+        token = session["csrf"]
+        del session["csrf"]
+        if field.data != token:
+            raise ValueError('Invalid CSRF')
+
+
+class BaseForm(Form):
+    class Meta:
+        csrf = True
+        csrf_class = UUID_CSRF
+
+    def validate_on_submit(self):
+        submitted = request and request.method in ("PUT", "POST")
+        return submitted and self.validate()
+
+
+class RedirectForm(BaseForm):
+    next = HiddenField()
+
+    def __init__(self, *args, **kwargs):
+        Form.__init__(self, *args, **kwargs)
+        if not self.next.data:
+            self.next.data = get_redirect_target()
+
+    def redirect(self, endpoint='index', **values):
+        if self.next.data and is_safe_url(self.next.data):
+            return redirect(self.next.data)
+        return redirect(url_for(endpoint, **values))
+
+
+class UserForm(BaseForm):
+    username = TextField(u'Username', validators=[InputRequired()])
+    password = PasswordField(u'Passwort', validators=[InputRequired()])
+
+
+class LoginForm(RedirectForm, UserForm):
+
+    def validate(self):
+        from . import db
+        rv = Form.validate(self)
+        if not rv:
+            return False
+
+        user = db.hgetall("users/%s" % self.username.data)
+        if not user:
+            self.username.errors.append(u'Unbekannter Username')
+            return False
+
+        passw = self.password.data
+        if not ("hash" in user or hasher.verify(passw, user["hash"])):
+            self.password.errors.append(u'Falsches Passwort')
+            return False
+
+        self.user = user
+        return True
+
+
+class RegisterForm(UserForm):
+    email = EmailField(u'Email', validators=[Optional(),Email()])
+
+    def validate(self):
+        from . import db
+        rv = Form.validate(self)
+        if not rv:
+            return False
+
+        if db.exists("users/%s" % self.username.data):
+            self.username.errors.append(u"Name bereits vergeben")
+            return False
+
+        return True

wetube/session_interface.py

+from datetime import timedelta, datetime
+from itertools import chain
+from uuid import uuid4
+from redis import StrictRedis
+from werkzeug.datastructures import CallbackDict
+from flask.sessions import SessionInterface, SessionMixin
+from utils import compress_uuid, decompress_uuid
+
+
+class RedisSession(CallbackDict, SessionMixin):
+
+    def __init__(self, initial=None, sid=None, new=False):
+        def on_update(self):
+            self.modified = True
+        CallbackDict.__init__(self, initial, on_update)
+        self.sid = sid
+        self.new = new
+        self.modified = False
+
+
+class RedisSessionInterface(SessionInterface):
+    session_class = RedisSession
+
+    def __init__(self, redis=None, prefix='session:'):
+        if redis is None:
+            redis = StrictRedis()
+        self.redis = redis
+        self.prefix = prefix
+
+    def generate_sid(self):
+        return compress_uuid(uuid4())
+
+    def get_expiration_timedelta(self, app, session):
+        if session.permanent:
+            return app.permanent_session_lifetime
+        return timedelta(days=1)
+
+    def get_expiration_time(self, app, session):
+        expire_time = self.get_expiration_timedelta(app, session)
+        return datetime.utcnow() + expire_time
+
+    def open_session(self, app, request):
+        sid = request.cookies.get(app.session_cookie_name)
+        if not sid:
+            sid = self.generate_sid()
+            return self.session_class(sid=sid, new=True)
+
+        val = self.redis.hgetall(self.prefix + sid)
+        if val is not None:
+            sid = self.generate_sid()
+            return self.session_class(val, sid=sid)
+
+        return self.session_class(sid=sid, new=True)
+
+    def save_session(self, app, session, response):
+        domain = self.get_cookie_domain(app)
+        name = app.session_cookie_name
+        if not session:
+            self.redis.delete(self.prefix + session.sid)
+            if session.modified:
+                response.delete_cookie(name, domain=domain)
+            return
+
+        time = self.get_expiration_timedelta(app, session)
+        expires = self.get_expiration_time(app, session)
+
+        self.redis.hmset(self.prefix + session.sid, session)
+        response.set_cookie(name, session.sid, expires=expires,
+                            httponly=True, domain=domain)

wetube/static/js/app.js

+var app = angular.module('wetube', [])
+  .factory('socket', function ($rootScope) {
+    var socket = io.connect("/app");
+    return {
+      on: function (eventName, callback) {
+        socket.on(eventName, function () {  
+          var args = arguments;
+          $rootScope.$apply(function () {
+            callback.apply(socket, args);
+          });
+        });
+      },
+      emit: function (eventName, data, callback) {
+        socket.emit(eventName, data, function () {
+          var args = arguments;
+          $rootScope.$apply(function () {
+            if (callback) {
+              callback.apply(socket, args);
+            }
+          });
+        });
+      }
+    }
+  })
+  .controller('VideoController', function($scope, socket) {
+
+  })
+  .controller('PlaylistController', function($scope, socket) {
+  
+  })
+  .controller('ChatController', function($scope, socket) {
+    socket.on('connected_users', function(data) {
+      $scope.users = data;
+    });
+    socket.on('user_leaved', function(username) {
+      $scope.users = $scope.users.filter(function(user) {
+        return user.name !== username;
+      });
+    });
+    socket.on('user_joined', function(user) {
+      $scope.users.push(user);
+    });
+  })
+  .run(function($rootScope, socket) {
+    var request_time, time_diff, round_trip, response_diff, now, index;
+    var time_diff_queue = [];
+
+    socket.on('sync_time', function(request_diff, response_time) {
+      now = (new Date()).getTime();
+      response_diff = now - response_time;
+
+      round_trip = Math.ceil((request_diff + response_diff) / 2);
+      time_diff = response_diff - round_trip;
+
+      time_diff_queue.push(time_diff);
+      if (time_diff_queue.length > 300) {
+        time_diff_queue.shift();
+      }
+
+      index = Math.ceil(time_diff_queue.length / 2) - 1;
+      $rootScope.time_diff = time_diff_queue.slice(0).sort()[index];
+      $rootScope.latency = round_trip;
+      
+      setTimeout(function(){
+        request_time = (new Date()).getTime();
+        socket.emit('sync_time', request_time);
+      }, 1000);
+    });
+
+    socket.emit('join');
+    request_time = (new Date()).getTime();
+    socket.emit('sync_time', request_time);
+  });
+

wetube/templates/base.html

+{%- from 'macros/helpers.html' import link -%}
+<!doctype html>
+<html ng-app="wetube">
+<meta charset=utf-8>
+<link rel=stylesheet type=text/css href="{{ url_for('static', filename='main.css') }}">
+
+<title>{% block title %}Wetube{% endblock %}</title>
+
+<header>
+  <h1>{{ self.title() }}</h1>
+
+  <nav>
+    <ul>
+      {%- if logged_in %}
+      <li>{{ link("logout", "Logout") }}</li>
+      {%- else %}
+      <li>{{ link("login", "Login/Register") }}</li>
+      {%- endif %}
+    </ul>
+  </nav>
+</header>
+
+<main>
+  {% block content %}{% endblock %}
+</main>
+
+<script src="//ajax.googleapis.com/ajax/libs/angularjs/1.2.8/angular.js"></script>
+<script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/0.9.16/socket.io.min.js"></script>
+<script src="//www.youtube.com/iframe_api"></script>
+<script src="{{ url_for('static', filename='js/app.js') }}"></script>
+</html>

wetube/templates/index.html

+{% extends "base.html" %}
+{% from 'macros/forms.html' import render_form %}
+
+{% block title %}{{ super() }}{% endblock %}
+
+{% block content -%}
+  <section id=party>
+    {%- raw %}
+    <section ng-controller="VideoController">
+      {{ video }}
+    </section>
+    <section ng-controller="PlaylistController">
+      {{ latency }} || {{ time_diff }}
+      <ul id=playlist>
+        <li ng-repeat="video in playlist">
+          {{ video.name }}<span class=duration>{{ video.duration }}</span>
+        </li>
+      </ul>
+    </section>
+    <section ng-controller="ChatController">
+      <div id=messages>
+        <h1>Nachrichten</h1>
+        <ul>
+          <li ng-repeat="message in messages">
+            <span class=user>{{ message.user }}</span>{{ message.content }}
+          </li>
+        </ul>
+      </div>
+      <div id=users>
+        <h1>Users</h1>
+        <ul>
+          <li ng-repeat="user in users">{{ user.name }}</li>
+        </ul>
+      </div>
+    </section>
+    {%- endraw %}
+  </section>
+{%- endblock %}

wetube/templates/macros/forms.html

+{%- macro render_field(field, _label=True) -%}
+<p{% if field.errors %} class="error"{% endif %}>
+  {% if _label -%}{{ field.label()|safe }}{%- endif %}
+  {{ field(**kwargs)|safe }}
+  {%- if field.errors -%}
+  <small>{{ field.errors|join('. ') }}</small>
+  {%- endif %}
+</p>
+{% endmacro %}
+
+{%- macro render_form_fields(fields) %}
+  {% set hidden = ['CSRFTokenField', 'HiddenField'] -%}
+  {%- for field in fields if field.type in hidden %}
+  {{- field(**kwargs)|safe }}
+  {% endfor %}
+  {%- for field in fields if field.type not in hidden %}
+  {{- render_field(field)|indent(2)|safe }}
+  {% endfor -%}
+{%- endmacro %}
+
+
+
+{%- macro render_form(form, endpoint, save, multipart=False) %}
+<form method=post action="{{ url_for(endpoint, **kwargs) }}" 
+{%- if multipart %} enctype="multipart/form-data"{% endif %}>
+  {{- render_form_fields(form) -}}
+<button type=submit>{{ save }}</button>
+</form>
+{%- endmacro %}

wetube/templates/macros/helpers.html

+{% macro link(endpoint, caption, endpoint_active=False) -%}
+  <a {%  if request.endpoint == endpoint
+         and (endpoint_active or request.view_args == kwargs) 
+  -%}class="active"{%- endif %} href="{{ url_for(endpoint, **kwargs)}}">
+    {{- caption -}}
+  </a>
+{%- endmacro %}

wetube/templates/user/login.html

+{% extends "base.html" %}
+{% from 'macros/forms.html' import render_form %}
+
+{% block title %}{{ super() }} | Login/Register{% endblock %}
+
+{% block content -%}
+  <div id=user>
+    <div id=login>
+      {{- render_form(login_form, "login", "Login")|indent(6)|safe }}
+    </div>
+    <div id=register>
+      {{- render_form(register_form, "register", "Registrieren")|indent(6)|safe }}
+    </div>
+  </div>
+{%- endblock %}
+# -*- coding: utf-8 -*-
+
+import re
+import string
+from translitcodec import long_encode
+from uuid import UUID
+from functools import wraps
+
+from flask import abort, session, redirect, url_for
+
+
+_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')
+_string_inc_re = re.compile(r'(\d+)$')
+
+
+def increment_string(string):
+    """Increment a string by one:
+
+    >>> increment_string(u'test')
+    u'test2'
+    >>> increment_string(u'test2')
+    u'test3'
+    """
+    match = _string_inc_re.search(string)
+    if match is None:
+        return string + u'2'
+    return string[:match.start()] + unicode(int(match.group(1)) + 1)
+
+
+def slugify(text, delim=u'-'):
+    """Generates an ASCII-only slug."""
+    result = []
+    for word in _punct_re.split(text.lower()):
+        word = long_encode(word)[0] # codec does not work in pypy?!
+        if word:
+            result.append(word)
+    return unicode(delim.join(result))
+
+
+def compress_uuid(uuid):
+    characters = string.letters + string.digits
+    uuid_int = getattr(uuid, 'int') or uuid
+
+    output = []
+    while uuid_int:
+        uuid_int, digit = divmod(uuid_int, len(characters))
+        output.append(characters[digit])
+    return ''.join(output)
+
+
+def decompress_uuid(uuid):
+    characters = string.letters + string.digits
+    length = len(characters)
+
+    number = 0
+    for char in reversed(uuid):
+        number = number * length + characters.index(char)
+    return UUID(int=number)
+
+
+def logged_in():
+    return "user" in session and session["user"] is not None
+
+
+def login_required(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        if logged_in():
+            return func(*args, **kwargs)
+        return redirect(url_for('login'))
+    return wrapper

wetube/websockets.py

+#!/usr/bin/env python
+# encoding: utf-8
+
+import math
+from gevent import monkey; monkey.patch_all()
+from gevent.pool import Pool
+from time import time, sleep
+
+from socketio import socketio_manage
+from socketio.server import SocketIOServer
+from socketio.namespace import BaseNamespace
+from socketio.mixins import RoomsMixin, BroadcastMixin
+
+from flask import request
+from redis import StrictRedis
+from . import app
+
+
+# TODO: Remove global state
+sockets = []
+worker_tasks = Pool(1)
+
+
+def playback_worker():
+    db = StrictRedis()
+    id = db.lindex('playlist', 0)
+    while not id:
+        id = db.lindex('playlist', 0)
+        sleep(2)
+    video = db.hgetall('video:%s' % id)
+    while True:
+        if time() > video['started'] + video['duration']:
+            app.logger("Automatic skip to next video")
+            # Sort the current playlist
+            db.lpop('playlist')
+            db.rpush('playlist', id)
+
+            # Get current playlist and set start
+            # time of the first (current) video
+            video_ids = db.lrange('playlist', 0, -1)
+            id = video_ids[0]
+            key = 'video:{}'.format(id)
+            db.hset(key, 'started', time() * 1000)
+            video = db.hgetall(key)
+
+            # Send full playlist with metadata and current video
+            videos = []
+            for video_id in video_ids:
+                videos.append(db.hgetall('video:%s' % video_id))
+            for socket in sockets:
+                if len(videos) > 1:
+                    socket.emit('playlist', videos[1:])
+                if videos:
+                    socket.emit('playback', videos[0])
+        sleep(0.3)
+
+
+class WetubeSocket(BaseNamespace, BroadcastMixin):
+
+    def __init__(self, *args, **kwargs):
+        self.db = StrictRedis()
+        super(WetubeSocket, self).__init__(*args, **kwargs)
+
+    def get_initial_acl(self):
+        return set(['recv_connect'])
+
+    @property
+    def sockets(self):
+        return self.socket.server.sockets.values()
+
+    def recv_connect(self):
+        try:
+            with app.app_context():
+                with app.request_context(self.environ):
+                    sid = request.cookies.get('session')
+            data = self.db.hgetall('session:' + sid)
+            key = 'users:' + data['user']
+            self.session['user'] = user = self.db.hgetall(key)
+            if 'role' in user and user['role'] == 'mod':
+                self.lift_acl_restrictions()
+            else:
+                self.allowed_methods = set(['recv_connect', 'on_join'])
+        except:
+            self.disconnect()
+
+    def on_join(self, data):
+        """Join the party and send initial data"""
+        self.allowed_methods.update(('on_message', 'on_sync_time'))
+        print self.allowed_methods
+        joined = {k: self.session['user'][k] for k in ('name', 'role')}
+        self.broadcast_event_not_me('user_joined', joined)
+        if self not in sockets:
+            sockets.append(self)
+
+        # Send client who is connected
+        connected = []
+        for socket in sockets:
+            if not socket.socket.connected:
+                continue
+            user = {k: socket.session["user"][k] for k in ('name', 'role')}
+            if user not in connected:
+                connected.append(user)
+        self.emit('connected_users', connected)
+
+        # Send playlist and current video
+        videos = []
+        ids = self.db.lrange('playlist', 0, -1)
+        for id in ids:
+            videos.append(self.db.hgetall('video:%s' % id))
+        if len(videos) > 1:
+            self.emit('playlist', videos[1:])
+        if videos:
+            self.emit('playback', videos[0])
+
+        # Start video_worker if neccessary
+        if not worker_tasks.full():
+            worker_tasks.spawn(playback_worker)
+
+    def recv_disconnect(self):
+        connected = []
+        sockets.remove(self)
+        username = self.session['user']['name']
+        # TODO: Create a new method for that
+        for socket in sockets:
+            if not socket.socket.connected:
+                continue
+            user = socket.session["user"]["name"]
+            if user not in connected:
+                connected.append(user)
+        if username not in connected:
+            self.broadcast_event_not_me('user_leaved', username)
+        self.emit('connected_users', connected)
+
+        if not len(sockets):
+            worker_tasks.kill()
+        self.disconnect(silent=True)
+
+    def on_message(self, msg):
+        username = self.session['user']['name']
+        self.broadcast_event('message', msg, username)
+
+    def on_sync_time(self, client_time):
+        server_time = int(math.ceil(time() * 1000))
+        difference = server_time - client_time
+        self.emit('sync_time', difference, server_time)
+
+    def on_ban(self, user):
+        # TODO: Implement Admin Functions
+        pass
+
+    def recv_message(self, message):
+        print 'PING!!!', message