Snippets

ASD Technologies Winter intensive promo (Poland 2021)

Created by Alexander Tryastsyn last modified
# Revision tag of this script.
# NOTE(review): looks like "<revision>.<YYYY_MM_DD_HHMM>" — confirm the convention.
version = "003.2021_01_18_2045"
from decimal import Decimal, ROUND_HALF_UP
from collections import namedtuple

import pendulum
from sqlalchemy import func, orm

from novakid.backend.authentication.interfaces import UserGroup
from novakid.backend.authentication.models import UserModel
from novakid.backend.billing.models import PaymentModel, TransactionModel
from novakid.backend.classes.interfaces import ClassStatus, ClassResult
from novakid.backend.classes.models import ClassModel
from novakid.backend.db.utils import exists
from novakid.backend.exceptions import UnexpectedException
from novakid.backend.promos.bonuses.models import BonusTransactionModel
from novakid.backend.promos.bonuses.utils import get_promo_bonus_transactions
from novakid.backend.promos.promo_events.interfaces import PromoEventType
from novakid.backend.promos.promo_events.models import PromoEventLogModel
from novakid.backend.students.models import StudentModel
from novakid.backend.utils.other import log2console

# Multiplier applied to the computed bonus cost in winter_intensive_promo().
BONUS_COEFFICIENT = Decimal('1.0')
# Passed as the 'ttl' of the bonus transaction created in apply_bonus().
BONUS_EXPIRES = pendulum.interval(days=90)
# Minimum number of completed classes over the whole promo period for a user
# to be considered, and the size of the class group that earns one bonus
# within a single promo week.
BONUS_THRESHOLD = 4
# Promo event log key; also used to detect an already granted bonus.
EVENT_KEY = 'winter_intensive'
# Region codes of users taking part in the promo.
PROMO_REGIONS = {'PL'}
# Start of each promo week; every week runs from its start time up to
# (but excluding) start + 1 week.
# NOTE(review): no timezone is passed to pendulum.create — confirm the
# library default is the intended one.
PROMO_WEEK_START_TIMES = [
    pendulum.create(2020, 12, 28),
    pendulum.create(2021, 1, 4),
    pendulum.create(2021, 1, 11),
]

# Outcome of apply_bonus(): a success flag and a human-readable message.
PromoApplyResult = namedtuple('PromoApplyResult', 'successful message')


def _weekly_bonus_credits(db_session, user_id, week_start):
    """Compute one parent's bonus credits for a single promo week.

    Classes of the parent with status ``done`` and result ``completed`` are
    queried for the interval that starts at ``week_start`` and ends one week
    later, ordered by start time. A class counts as ``0.6`` credit when
    ``class_model.teacher_is_near_native`` is truthy and ``1.0`` otherwise.
    Every ``BONUS_THRESHOLD`` consecutive classes earn one bonus worth the
    mean value of those classes; a trailing group smaller than
    ``BONUS_THRESHOLD`` earns nothing.

    Args:
        db_session: Database session used to build the class query.
        user_id: Id of the parent whose classes are inspected.
        week_start: Start of the promo week (pendulum datetime).

    Returns:
        Tuple ``(week_end, classes_count, bonuses_count, credits)`` where
        ``credits`` is a ``Decimal``.
    """
    week_end = week_start.add(weeks=1)
    class_query = ClassModel.query_classes_of_parent(db_session, user_id)
    class_query = ClassModel.filter_classes_in_time_interval(
        class_query,
        start_time_gte=week_start,
        start_time_lt=week_end,
    )
    class_query = class_query.filter(
        ClassModel.status == ClassStatus.done,
        ClassModel.result == ClassResult.completed,
    ).options(
        orm.joinedload(ClassModel.teacher).load_only("teacher_is_near_native"),
    ).order_by(
        ClassModel.start_time,
    )

    bonuses_count = 0
    credits = Decimal('0.0')
    group_value = Decimal('0.0')
    # Count rows while iterating instead of issuing a second COUNT query.
    classes_count = 0
    for classes_count, class_model in enumerate(class_query, start=1):
        group_value += Decimal('0.6') if class_model.teacher_is_near_native else Decimal('1.0')
        # One bonus per every BONUS_THRESHOLD-th successful class
        if classes_count % BONUS_THRESHOLD == 0:
            bonuses_count += 1
            credits += group_value / BONUS_THRESHOLD
            group_value = Decimal('0.0')

    return week_end, classes_count, bonuses_count, credits


def winter_intensive_promo(request, dry_run=True, logprint=False):
    """Grant Winter Intensive promo bonuses to eligible users.

    Users are pre-selected by region (``PROMO_REGIONS``), group
    (``UserGroup.users``) and by having at least ``BONUS_THRESHOLD`` completed
    classes within the whole promo period. For each of them the bonus credits
    are summed over all promo weeks, converted to money using the per-credit
    price of the user's last payment (``cost / value``) multiplied by
    ``BONUS_COEFFICIENT``, rounded half-up to two decimals and handed to
    ``apply_bonus``. Each user is processed inside its own ``request.tm``
    block.

    A summary line and one ``user_id, cost, currency`` row per successfully
    handled user are printed at the end.

    Args:
        request: Request-like object providing ``db_session`` and ``tm``.
        dry_run: Forwarded to ``apply_bonus``.
        logprint: When true, per-user details are written via ``log2console``.
    """

    def _log(log_str):
        if logprint:
            log2console(log_str)

    db_session = request.db_session
    totals = []
    users_all = 0
    users_processed = 0
    exception_count = 0

    # Pre-select users from the promo regions with enough completed classes
    with request.tm:
        promo_start = PROMO_WEEK_START_TIMES[0]
        # Derive the end from the last configured week, not a fixed index.
        promo_end = PROMO_WEEK_START_TIMES[-1].add(weeks=1)
        q = db_session.query(
            UserModel.id
        ).join(
            StudentModel, StudentModel.parent_id == UserModel.id
        ).join(
            ClassModel, ClassModel.student_id == StudentModel.id
        ).filter(
            UserModel.region_code.in_(PROMO_REGIONS),
            UserModel.group == UserGroup.users,
            ClassModel.status == ClassStatus.done,
            ClassModel.result == ClassResult.completed,
            ClassModel.start_time >= promo_start,
            ClassModel.start_time < promo_end
        ).group_by(
            UserModel.id
        ).having(
            func.count(ClassModel.id) >= BONUS_THRESHOLD
        ).order_by(
            UserModel.id
        )
        user_ids = [row.id for row in q]

    log2console(f'{len(user_ids)} users found.')

    user_query = db_session.query(
        UserModel
    )

    for user_id in user_ids:
        users_all += 1
        _log(80*'-')
        # Periodic progress report for long runs
        if users_all % 1000 == 0:
            log2console('\n')
            log2console(f'Users: all - {users_all} | processed - {users_processed} | errors - {exception_count}')
            log2console('\n')
        with request.tm:
            user_model = user_query.get(user_id)
            _log(f'User:  id - {user_id} | currency - {user_model.region.currency_code}')

            user_bonuses_value = Decimal('0.0')
            for week_start in PROMO_WEEK_START_TIMES:
                week_end, classes_count, week_bonuses, week_credits = _weekly_bonus_credits(
                    db_session, user_id, week_start)
                user_bonuses_value += week_credits
                _log(f'Week: {week_start} - {week_end} | '
                     f'classes done/completed - {classes_count} | '
                     f'classes to bonuses - {week_bonuses} | '
                     f'credits to bonuses - {week_credits}')

            if user_bonuses_value <= 0:
                continue

            try:
                # Get bonus cost as one native class cost in user's currency multiplied by BONUS_COEFFICIENT
                last_payment = PaymentModel.last_paid_for_user(db_session, user_model)
                if not (last_payment and last_payment.cost and last_payment.value):
                    # Without a priced payment the credit price is unknown.
                    _log(f'User {user_id} skipped: no last payment with non-zero cost and value')
                    continue

                native_class_cost = last_payment.cost / last_payment.value
                user_bonus_cost = native_class_cost * user_bonuses_value * BONUS_COEFFICIENT
                user_bonus_cost = user_bonus_cost.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
                user_bonus_currency_code = last_payment.currency_code
                if user_bonus_cost <= 0:
                    _log(f'User {user_id} skipped: bonus cost is {user_bonus_cost}')
                    continue

                _log(f'Last payment: value - {last_payment.value} | '
                     f'cost - {last_payment.cost} | '
                     f'currency - {last_payment.currency_code}')
                _log(f'User bonus: value - {user_bonuses_value} | '
                     f'cost - {user_bonus_cost} | '
                     f'currency - {user_bonus_currency_code}')
                result = apply_bonus(request, user_model, user_bonus_cost, user_bonus_currency_code, dry_run)
                if result.successful:
                    users_processed += 1
                    totals.append((user_id, user_bonus_cost, user_bonus_currency_code,))
                _log(result.message)
            except UnexpectedException as err:
                exception_count += 1
                msg = 'Unexpected exception on Winter Intensive promo'
                # Always report errors, even when per-user logging is off.
                log2console(f'{msg} (user {user_id}): {err}')

    log2console(80*'-')
    for row in totals:
        message = ', '.join([str(item) for item in row])
        print(message)
    log2console(80*'-')
    log2console(f'Users: all - {users_all} | processed - {users_processed} | errors - {exception_count}')


def apply_bonus(request, user_model, cost, currency_code, dry_run=True) -> PromoApplyResult:
    """Create the Winter Intensive bonus transaction for one user.

    The bonus is refused when the currency of the user's region differs from
    ``currency_code`` or when the user already has a bonus transaction linked
    to a promo event log entry with key ``EVENT_KEY`` and type
    ``PromoEventType.other``.

    Args:
        request: Request-like object providing ``db_session`` and ``root``.
        user_model: User receiving the bonus; ``id`` and
            ``region.currency_code`` are read.
        cost: Bonus amount, stored as the transaction ``value``.
        currency_code: Currency of the bonus.
        dry_run: When true, all checks run but no transaction is posted.

    Returns:
        ``PromoApplyResult`` with ``successful`` set to ``False`` when a check
        failed, ``True`` otherwise (including dry runs), plus a message.
    """
    db_session = request.db_session
    user_id = user_model.id

    # Check user region
    if user_model.region.currency_code != currency_code:
        return PromoApplyResult(False, f'Region currency {user_model.region.currency_code} not equal user {user_id} currency {currency_code}')

    # Check existing bonus transaction
    query_bonus = db_session.query(BonusTransactionModel).filter(
        BonusTransactionModel.user_id == user_id,
    ).join(
        PromoEventLogModel, PromoEventLogModel.bonus_transaction_id == BonusTransactionModel.id,
    ).filter(
        PromoEventLogModel.key == EVENT_KEY,
        PromoEventLogModel.event_type == PromoEventType.other,
    )
    if exists(query_bonus):
        return PromoApplyResult(False, f'User {user_id} has already received a bonus')

    # create bonus transaction
    transactions = get_promo_bonus_transactions(request.root)
    transaction_params = {
        'user_id': user_id,
        'value': cost,
        'currency_code': currency_code,
        'ttl': BONUS_EXPIRES,
        'event_log': {
            'event_type': PromoEventType.other,
            'key': EVENT_KEY,
        },
    }
    if dry_run:
        # Nothing is posted in a dry run, so do not claim the bonus was applied.
        return PromoApplyResult(True, f'Bonus {cost} {currency_code} would be applied (dry run)')

    transactions.http_post(request, params=transaction_params)
    return PromoApplyResult(True, f'Bonus {cost} {currency_code} applied successfully')

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.