Snippets

ASD Technologies Winter intensive promo (Poland 2021)

You are viewing an old version of this snippet. View the current version.
Revised by Alexander Tryastsyn 520c3b7
from decimal import Decimal, ROUND_HALF_UP

import pendulum
from sqlalchemy import func

from novakid.backend.authentication.interfaces import UserGroup
from novakid.backend.authentication.models import UserModel
from novakid.backend.billing.models import PaymentModel
from novakid.backend.classes.interfaces import ClassStatus, ClassResult
from novakid.backend.classes.models import ClassModel
from novakid.backend.db.utils import exists
from novakid.backend.exceptions import UnexpectedException
from novakid.backend.promos.bonuses.models import BonusTransactionModel
from novakid.backend.promos.bonuses.utils import get_promo_bonus_transactions
from novakid.backend.promos.promo_events.interfaces import PromoEventType
from novakid.backend.promos.promo_events.models import PromoEventLogModel
from novakid.backend.students.models import StudentModel
from novakid.backend.utils.other import log2console

BONUS_COEFFICIENT = Decimal('1.0')
BONUS_EXPIRES = pendulum.interval(days=90)
BONUS_THRESHOLD = 4
EVENT_KEY = 'winter_intensive'
PROMO_REGIONS = {'PL'}
PROMO_WEEK_START_TIMES = [
    pendulum.create(2020, 12, 28),
    pendulum.create(2021, 1, 4),
    pendulum.create(2021, 1, 11),
]


def winter_intensive_promo(request, dry_run=True, logprint=False):

    def _log(log_str):
        if logprint:
            log2console(log_str)

    db_session = request.db_session
    user_ids = []
    users_all = 0
    users_processed = 0
    exception_count = 0
    bonus_all = 0
    cost_total = Decimal('0.0')
    # Apply bonus to users from certain regions
    with request.tm:
        start_time_gte = PROMO_WEEK_START_TIMES[0]
        start_time_lt = PROMO_WEEK_START_TIMES[2].add(weeks=1)
        q = db_session.query(
            UserModel.id
        ).join(
            StudentModel, StudentModel.parent_id == UserModel.id
        ).join(
            ClassModel, ClassModel.student_id == StudentModel.id
        ).filter(
            UserModel.region_code.in_(PROMO_REGIONS),
            UserModel.group == UserGroup.users,
            ClassModel.status == ClassStatus.done,
            ClassModel.result == ClassResult.completed,
            ClassModel.start_time >= start_time_gte,
            ClassModel.start_time < start_time_lt
        ).group_by(
            UserModel.id
        ).having(
            func.count(ClassModel.id) >= BONUS_THRESHOLD
        )
        for x in q:
            user_ids.append(x.id)

    log2console(f'{len(user_ids)} users found.')

    user_query = db_session.query(
        UserModel
    )

    for user_id in user_ids:
        users_all += 1
        _log(80*'-')
        if users_all % 1000 == 0:
            log2console('\n')
            log2console(f'Users: all - {users_all} | processed - {users_processed} | errors - {exception_count}')
            log2console('\n')
        with request.tm:
            user_model = user_query.get(user_id)
            _log(f'User:  id - {user_id} | currency - {user_model.region.currency_code}')
            user_bonuses_count = 0
            user_bonuses_per_week = 0
            for start_time_gte in PROMO_WEEK_START_TIMES:
                start_time_lt = start_time_gte.add(weeks=1)
                class_query = ClassModel.query_classes_of_parent(db_session, user_id)
                class_query = ClassModel.filter_classes_in_time_interval(class_query,
                                                                         start_time_gte=start_time_gte,
                                                                         start_time_lt=start_time_lt)
                class_query = class_query.filter(
                    ClassModel.status == ClassStatus.done,
                    ClassModel.result == ClassResult.completed
                )
                classes_count = class_query.count()
                # One bonus per every BONUS_THRESHOLD-th successful class
                user_bonuses_per_week = classes_count // BONUS_THRESHOLD
                user_bonuses_count += user_bonuses_per_week
                _log(f'Week: {start_time_gte} - {start_time_lt} | classes done/completed - {classes_count} | bonuses - {user_bonuses_per_week}')

            if user_bonuses_count > 0:
                try:
                    # Get bonus cost as one native class cost in user's currency multiplied by BONUS_COEFFICIENT
                    last_payment = PaymentModel.last_paid_for_user(db_session, user_model)
                    if last_payment and last_payment.cost and last_payment.value:
                        native_class_cost = last_payment.cost / last_payment.value
                        user_bonus_cost = native_class_cost * user_bonuses_count * BONUS_COEFFICIENT
                        user_bonus_cost = user_bonus_cost.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
                        user_bonus_currency_code = last_payment.currency_code
                        if user_bonus_cost > 0:
                            _log(f'Last payment: cost - {last_payment.cost} | '
                                 f'value: {last_payment.value} | '
                                 f'currency - {last_payment.currency_code}')
                            _log(f'User bonus: count - {user_bonuses_count} | '
                                 f'cost - {user_bonus_cost} | '
                                 f'currency - {user_bonus_currency_code}')
                            result = apply_bonus(request, user_model, user_bonus_cost, user_bonus_currency_code, dry_run)
                            _log(result)
                            users_processed += 1
                            bonus_all += user_bonuses_count
                            cost_total += user_bonus_cost
                except UnexpectedException as err:
                    exception_count += 1
                    msg = 'Unexpected exception on Winter Intensive promo'
                    _log(f'{msg}: {err}')

    _log(80*'-')
    log2console(f'Users: all - {users_all} | processed - {users_processed} | errors - {exception_count}')
    log2console(f'Bonuses: count - {bonus_all} | cost - {cost_total}')


def apply_bonus(request, user_model, cost, currency_code, dry_run=True) -> str:
    db_session = request.db_session
    user_id = user_model.id

    # Check user region
    if user_model.region.currency_code != currency_code:
        return f'Region currency {user_model.region.currency_code} not equal user {user_id} currency {currency_code}'

    # Check existing bonus transaction
    query_bonus = db_session.query(BonusTransactionModel).filter(
        BonusTransactionModel.user_id == user_id,
    ).join(
        PromoEventLogModel, PromoEventLogModel.bonus_transaction_id == BonusTransactionModel.id,
    ).filter(
        PromoEventLogModel.key == EVENT_KEY,
        PromoEventLogModel.event_type == PromoEventType.other,
    )
    if exists(query_bonus):
        return f'User {user_id} has already received a bonus'

    # create bonus transaction
    transactions = get_promo_bonus_transactions(request.root)
    transaction_params = {
        'user_id': user_id,
        'value': cost,
        'currency_code': currency_code,
        'ttl': BONUS_EXPIRES,
        'event_log': {
            'event_type': PromoEventType.other,
            'key': EVENT_KEY,
        },
    }
    if not dry_run:
        transaction, _ = transactions.http_post(request, params=transaction_params)
    return f'Bonus {cost} {currency_code} applied successfully'
HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.