Source

gevent_cometd / gevent_cometd / app_views.py

from gevent import monkey; monkey.patch_all()
import gevent
import geventwebsocket
import logging
import datetime
import gevent_cometd
import gevent_cometd.util as util
import gevent_cometd.channel as channel
import gevent_cometd.user as user
from webob import Request, Response, exc
from mako.template import Template
from gevent_cometd import config, users_lock, channels_lock, statics, stats

try:
    import json
except ImportError:
    import simplejson as json

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

def garbage_control():
    """garbage control loop"""
    while True:
        gevent.sleep(1)
        # lock both users and channels
        with users_lock:
            with channels_lock:
            # garbage control connections
                try:
                    gc_conns = user.Connection.gc_pass()
                    for user_name, conns in gc_conns.items():
                        # now remove conn ids from channels
                        for channel_inst in gevent_cometd.channels.values():
                            for conn_id in conns:
                                if (user_name in channel_inst.connections and
                                    conn_id in channel_inst.connections[user_name]):
                                    channel_inst.connections[user_name].remove(conn_id)
                            # remove empty user key from channel conns
                            # first remove the channel name from user object
                            user_inst = user.User.by_name(user_name)
                            if user_inst and channel_inst.name in user_inst.channels:
                                user_inst.channels.remove(channel_inst.name)
                            # remove user key from channel connections
                            if user_name in channel_inst.connections and not channel_inst.connections[user_name]:
                                channel_inst.connections.pop(user_name, None)
                    # now make pass over users and remove user objects that weren't active for at least 1 day
                    for key, user_inst in gevent_cometd.users.items():
                        curr_time = datetime.datetime.utcnow()
                        delta = curr_time - user_inst.last_active
                        if delta.days >= 1:
                            gevent_cometd.users.pop(key)
                except Exception, e:
                    raise
                    print "EXCEPTION, but not critical hopefully ;-)", e
                except KeyboardInterrupt, e:
                    raise

# start our gc loop
gevent.spawn(garbage_control)


def connect(request, *args):
    """return the id of connected users - will be secured with password string
    for webapp to internally call the server - we combine conn string with user id,
    and we tell which channels the user is allowed to subscribe to"""
    request_data, res = util.process_request(request)
    if res:
        return res

    user_name = request_data.get('user')
    user_status = int(request_data.get('status',
                      gevent_cometd.status_codes['online']))
    conn_id = request_data.get('conn_id')
    subscribe_to_channels = request_data.get('channels')
    if user_name is None:
        res = Response(json.dumps({'error': "No username specified"}),
                       request=request)
        res.status = 400
    elif not subscribe_to_channels:
        res = Response(json.dumps({'error': "No channels specified"}),
                       request=request)
        res.status = 400
    else:
        with users_lock:
            with channels_lock:
                # everything is ok so lets add new connection to channel and connection list
                user_inst, connection = user.User.add_connection(user_name, conn_id, user_status)
                for channel_name in subscribe_to_channels:
                    # user gets assigned to a channel
                    channel.Channel.get_channel(channel_name).add_connection(user_inst, connection)
        res = Response(json.dumps({'conn_id':connection.id, 'status':user_inst.status}), request=request)
        log.info('connecting %s with uuid %s' % (user_name, connection.id))
    return res

def disconnect(request, *args):
    """disconnect connection with specific id """
    request_data, res = util.process_request(request, require_authed_request=False)
    if res:
        return res
    conn_id = request_data['conn_id'] if request_data else request.GET.get('conn_id')
    log.info('recycling %s' % conn_id)
    connection = user.Connection.by_id(conn_id)
    if connection:
        connection.mark_for_gc()
    res = Response('', request=request)
    return res

def subscribe(request, *args):
    """ call this to subscribe specific connection to new channels """
    request_data, res = util.process_request(request)
    if res:
        return res
    conn_id = request_data['conn_id'] if request_data else request.GET.get('conn_id')
    connection = user.Connection.by_id(conn_id)
    subscribe_to_channels = request_data.get('channels')
    if not connection:
        res = Response(json.dumps({'error':"Unknown connection"}),
                       request=request)
        res.status = 403
    if not subscribe_to_channels:
        res = Response(json.dumps({'error':"No channels specified"}),
                       request=request)
        res.status = 400
    if not res:
        # everything is ok so lets add new connection to channel and connection list
        # lets lock it just in case
        with users_lock:
            with channels_lock:
                # find the right user
                user_inst = user.User.by_name(connection.user)
                if user_inst:
                    for channel_name in subscribe_to_channels:
                        channel.Channel.get_channel(channel_name).add_connection(user_inst, connection)
        res = Response('{}', request=request)
    return res

def user_status(request, *args):
    """ set the status of specific user """
    request_data, res = util.process_request(request)
    if res:
        return res

    user_name = request_data.get('user')
    user_status = int(request_data.get('status',
                      gevent_cometd.status_codes['online']))
    if user_name is None:
        res = Response(json.dumps({'error': "No username specified"}),
                       request=request)
        res.status = 400
    else:
        user_inst = user.User.by_name(user_name)
        if user_inst:
            user_inst.status = user_status
            # mark active
            user_inst.last_active = datetime.datetime.utcnow()
    res = Response('{}', request=request)
    return res

def pass_message(msg):
    if msg.get('timestamp'):
        # if present lets use timestamp provided in the message
        if '.' in msg['timestamp']:
            timestmp = datetime.datetime.strptime(msg['timestamp'],
                                      '%Y-%m-%dT%H:%M:%S.%f')
        else:
            timestmp = datetime.datetime.strptime(msg['timestamp'],
                                      '%Y-%m-%dT%H:%M:%S')
    else:
        timestmp = datetime.datetime.utcnow()
    message = {'user': msg.get('user'),
               'message': msg['message'],
               'type':'message',
               'timestamp': timestmp
               }
    pm_users = msg.get('pm_users', [])
    if msg.get('channel'):
        channel_inst = channel.Channel.get_channel(msg['channel'])
        channel_inst.add_message(message, pm_users=pm_users)
    elif pm_users:
        # if pm then iterate over all users and notify about new message hiyoo!!
        for user_id in pm_users:
            user_inst = user.User.by_name(user_id)
            if user_inst:
                user_inst.add_message(message)

def message(request, *args):
    """sends out message"""
    request_data, res = util.process_request(request)
    if res:
        return res
    for msg in request_data:
        if not msg.get('channel') and not msg.get('pm_users', []):
            res = Response(json.dumps({'error':"No channels or usernames specified"}),
                           request=request)
            res.status = 400
            return res
        gevent.spawn(pass_message, msg)
    res = Response(u"{}", request=request)
    return res


def channel_config(request, *args):
    """ call this to subscribe specific connection to new channels """
    request_data, res = util.process_request(request)
    if res:
        return res
    channel_data = request_data
    if not channel_data:
        res = Response(json.dumps({'error':"No channels specified"}),
                       request=request)
        res.status = 400
        return res

    json_data = []
    with channels_lock:
        for channel_name, config in channel_data:
            channel_inst = channel.Channel.get_channel(channel_name)
            for k, v in config.iteritems():
                setattr(channel_inst, k, v)
            json_data.append({'name':channel_inst.name,
                                     'long_name':channel_inst.long_name,
                                     'presence':channel_inst.presence,
                                     'salvagable':channel_inst.salvagable,
                                     'store_history':channel_inst.store_history,
                                     'history_size':channel_inst.history_size
                                     })
    res = Response(json.dumps(json_data, cls=util.DateTimeEncoder), request=request)
    return res

def index(request, *args):
    res = Response(u"Nothing to see here, carry on...", request=request)
    return res

def siege(request, *args):
    raise Exception('disabled')
    # siege test
    import uuid
    import random
    with users_lock:
        with channels_lock:
            user_inst, connection = user.User.add_connection(random.randint(1, 999), str(uuid.uuid4()))
            channel.Channel.get_channel('pub_chan').add_connection(user_inst, connection)
    res = Response(u"ok", request=request)
    return res

def admin(request, *args):
    tmpl = Template(statics['templates']['admin.mak'])
    uptime = datetime.datetime.utcnow() - stats['started_on']
    html = tmpl.render(stats=gevent_cometd.stats,
                       channels=gevent_cometd.channels,
                       users=gevent_cometd.users, uptime=uptime)
    res = Response(html, request=request)
    return res

def info(request, *args):
    request_data, res = util.process_request(request)
    json_data = {"channels":{}, "unique_users": len(gevent_cometd.users)}
    if res:
        return res
    # select everything for empty list
    if not request_data or not request_data.get('channels'):
        req_channels = gevent_cometd.channels.keys()
    else:
        req_channels = request_data['channels']
    # return requested channel info
    for channel_inst in [chan for chan in gevent_cometd.channels.values() if chan.name in req_channels]:
        json_data["channels"][channel_inst.name] = {}
        json_data["channels"][channel_inst.name]['total_users'] = len(channel_inst.connections)
        json_data["channels"][channel_inst.name]['total_connections'] = sum([len(conns) for conns in channel_inst.connections.values()])
        json_data["channels"][channel_inst.name]['users'] = []
        for user_name in channel_inst.connections.keys():
            user_inst = user.User.by_name(user_name)
            udata = {'user':user_inst.user, 'status':user_inst.status, "connections":channel_inst.connections[user_name]}
            json_data["channels"][channel_inst.name]['users'].append(udata)
        json_data["channels"][channel_inst.name]['last_active'] = channel_inst.last_active
    res = Response(json.dumps(json_data, cls=util.DateTimeEncoder), request=request)
    return res

def listen(request, *args):
    """this is where connections listen for our responses"""
    request_data, res = util.process_request(request, require_authed_request=False)
    conn_id = request_data['conn_id'] if request_data else request.GET.get('conn_id')
    connection = user.Connection.by_id(conn_id)
    if not connection:
        res = Response(json.dumps({'error': "Unknown connection"}),
                       request=request)
        res.status = 403
        return res

    # mark as last active
    user_inst = user.User.by_name(connection.user)
    user_inst.last_active = datetime.datetime.utcnow()
    connection.last_active = datetime.datetime.utcnow()

    websocket = request.environ.get("wsgi.websocket")
    if websocket is None:
        if request.method.upper() == 'OPTIONS':
            # preflight
            res = Response(' ' * 2048, request=request,
                           content_type='application/json')
            util.add_cors_headers(res)
            return res
        def yield_response():
            # for chrome issues
            # yield ' ' * 1024
            # wait for this to wake up
            connection.event.wait(config['wake_connections_after'])
            gevent.sleep(0.2)
            if request.params.get('callback'):
                yield request.params.get('callback') + '(' + json.dumps(connection.messages, cls=util.DateTimeEncoder) + ')'
            else:
                yield json.dumps(connection.messages, cls=util.DateTimeEncoder)
            connection.messages = []
        res = Response(app_iter=yield_response(), request=request,
                       content_type='application/json')
        util.add_cors_headers(res)
        return res
    else:
        connection.websocket = websocket
        try:
            while True:
                connection.event.wait(10)
                # mark as last active
                # ensures they dont get GC'd even if we are not sending anything
                to_send = connection.messages
                connection.messages = []
                websocket.send(json.dumps(to_send, cls=util.DateTimeEncoder))
                user_inst = user.User.by_name(connection.user)
                user_inst.last_active = datetime.datetime.utcnow()
                connection.last_active = datetime.datetime.utcnow()
        except geventwebsocket.WebSocketError as e:
            print 'PROBLEM'
            connection.mark_for_gc()
        return Response('[]', request=request)

def static(request, *args):
    """serve static stuff"""
    if len(args) and args[0] in statics['statics']:
        res = Response(statics['statics'][args[0]], request=request)
        return res
    raise exc.HTTPNotFound()