Snippets
Revised by
Alexander Tryastsyn
588eb35
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | from decimal import Decimal, ROUND_HALF_UP
import pendulum
from novakid.backend.authentication.interfaces import UserGroup
from novakid.backend.authentication.models import UserModel
from novakid.backend.billing.models import PaymentModel
from novakid.backend.classes.interfaces import ClassStatus, ClassResult
from novakid.backend.classes.models import ClassModel
from novakid.backend.db.utils import exists
from novakid.backend.exceptions import UnexpectedException
from novakid.backend.promos.bonuses.models import BonusTransactionModel
from novakid.backend.promos.bonuses.utils import get_promo_bonus_transactions
from novakid.backend.promos.promo_events.interfaces import PromoEventType
from novakid.backend.promos.promo_events.models import PromoEventLogModel
from novakid.backend.utils.other import log2console
BONUS_COEFFICIENT = Decimal('1.0')
BONUS_EXPIRES = pendulum.interval(days=90)
BONUS_THRESHOLD = 4
EVENT_KEY = 'winter_intensive'
PROMO_REGIONS = {'PL'}
PROMO_WEEK_START_TIMES = [
pendulum.create(2020, 12, 28),
pendulum.create(2021, 1, 4),
pendulum.create(2021, 1, 11),
]
def winter_intensive_promo(request, dry_run=True, logprint=False):
def _log(log_str):
if logprint:
log2console(log_str)
db_session = request.db_session
users_all = 0
users_processed = 0
exception_count = 0
bonus_all = 0
cost_total = Decimal('0.0')
# Apply bonus to users from certain regions
user_query = db_session.query(
UserModel
).filter(
UserModel.region_code.in_(PROMO_REGIONS),
UserModel.group == UserGroup.users,
)
with request.tm:
for user_model in user_query:
users_all += 1
user_id = user_model.id
_log(f'User: id - {user_id} | currency - {user_model.region.currency_code}')
user_bonuses_count = 0
user_bonuses_per_week = 0
for start_time_gte in PROMO_WEEK_START_TIMES:
start_time_lt = start_time_gte.add(weeks=1)
class_query = ClassModel.query_classes_of_parent(db_session, user_id)
class_query = ClassModel.filter_classes_in_time_interval(class_query,
start_time_gte=start_time_gte,
start_time_lt=start_time_lt)
class_query = class_query.filter(
ClassModel.status == ClassStatus.done,
ClassModel.result == ClassResult.completed
)
classes_count = class_query.count()
# One bonus per every BONUS_THRESHOLD-th successful class
user_bonuses_per_week = classes_count // BONUS_THRESHOLD
user_bonuses_count += user_bonuses_per_week
_log(f'Week: {start_time_gte} - {start_time_lt} | classes done/completed - {classes_count} | bonuses - {user_bonuses_per_week}')
if user_bonuses_count > 0:
try:
# Get bonus cost as one native class cost in user's currency multiplied by BONUS_COEFFICIENT
last_payment = PaymentModel.last_paid_for_user(db_session, user_model)
if last_payment and last_payment.cost and last_payment.value:
native_class_cost = last_payment.cost / last_payment.value
user_bonus_cost = native_class_cost * user_bonuses_count * BONUS_COEFFICIENT
user_bonus_cost = user_bonus_cost.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
user_bonus_currency_code = last_payment.currency_code
if user_bonus_cost > 0:
_log(f'Last payment: cost - {last_payment.cost} | '
f'value: {last_payment.value} | '
f'currency - {last_payment.currency_code}')
_log(f'User bonus: count - {user_bonuses_count} | '
f'cost - {user_bonus_cost} | '
f'currency - {user_bonus_currency_code}')
result = apply_bonus(request, user_model, user_bonus_cost, user_bonus_currency_code, dry_run)
_log(result)
users_processed += 1
bonus_all += user_bonuses_count
cost_total += user_bonus_cost
except UnexpectedException as err:
exception_count += 1
msg = 'Unexpected exception on Winter Intensive promo'
_log(f'{msg}: {err}')
_log(80*'-')
if users_all % 100 == 0:
log2console('\n')
log2console(f'Users: all - {users_all} | processed - {users_processed} | errors - {exception_count}')
log2console('\n')
log2console(f'Users: all - {users_all} | processed - {users_processed} | errors - {exception_count}')
log2console(f'Bonuses: count - {bonus_all} | cost - {cost_total}')
def apply_bonus(request, user_model, cost, currency_code, dry_run=True) -> str:
db_session = request.db_session
user_id = user_model.id
# Check user region
if user_model.region.currency_code != currency_code:
return f'Region currency {user_model.region.currency_code} not equal user {user_id} currency {currency_code}'
# Check existing bonus transaction
query_bonus = db_session.query(BonusTransactionModel).filter(
BonusTransactionModel.user_id == user_id,
).join(
PromoEventLogModel, PromoEventLogModel.bonus_transaction_id == BonusTransactionModel.id,
).filter(
PromoEventLogModel.key == EVENT_KEY,
PromoEventLogModel.event_type == PromoEventType.other,
)
if exists(query_bonus):
return f'User {user_id} has already received a bonus'
# create bonus transaction
transactions = get_promo_bonus_transactions(request.root)
transaction_params = {
'user_id': user_id,
'value': cost,
'currency_code': currency_code,
'ttl': BONUS_EXPIRES,
'event_log': {
'event_type': PromoEventType.other,
'key': EVENT_KEY,
},
}
if not dry_run:
transaction, _ = transactions.http_post(request, params=transaction_params)
return f'Bonus {cost} {currency_code} applied successfully'
|
You can clone a snippet to your computer for local editing. Learn more.