Snippets

Oleg Korsak Autobahn + asyncio + aiopg + SQLAlchemy

Created by Oleg Korsak last modified
import asyncio
from os import environ
from multiprocessing import Pool, cpu_count

from aiopg.sa import create_engine
from autobahn.wamp.types import RegisterOptions
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

from my_package import auth, db
from my_package.conf import settings
from my_package.models import User


class MyComponent(ApplicationSession):
    async def onJoin(self, details):
        engine = self.config.extra['db_engine']
        db.setup(engine)

        async with engine.acquire() as conn:
            try:
                print(User.__table__)
                async for user in conn.execute(User.__table__.select()):
                    print(user.email)
            except Exception as e:
                print(e)

                await auth.register(self, 'com.my.system.auth.', options=RegisterOptions(invoke='roundrobin'))


async def join_to_router(component_class):
    asyncio.set_event_loop(asyncio.new_event_loop())

    async with create_engine(dsn=settings.DB_DSN) as engine:
        extra = {
            'db_engine': engine
        }

        runner = ApplicationRunner(
            environ.get('AUTOBAHN_MARKET_ROUTER', 'ws://my.com:8090/system'),
            'systemweb',
            debug_app=True,
            debug=True,
            extra=extra
        )

        runner.run(component_class)


if __name__ == '__main__':
    is_webscale = environ.get('WEBSCALE', '0')

    if is_webscale == '1':
        process_count = cpu_count() + 1
        pool = Pool(processes=process_count)

        pool.imap_unordered(join_to_router, (MyComponent for _ in range(process_count)))
        pool.close()
        pool.join()
    else:
        join_to_router(MyComponent)

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.