Snippets
Revised by
Alexander Tryastsyn
70b9727
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 | from decimal import Decimal, ROUND_HALF_UP
from collections import namedtuple
import pendulum
from sqlalchemy import func
from novakid.backend.authentication.interfaces import UserGroup
from novakid.backend.authentication.models import UserModel
from novakid.backend.billing.models import PaymentModel
from novakid.backend.classes.interfaces import ClassStatus, ClassResult
from novakid.backend.classes.models import ClassModel
from novakid.backend.db.utils import exists
from novakid.backend.exceptions import UnexpectedException
from novakid.backend.promos.bonuses.models import BonusTransactionModel
from novakid.backend.promos.bonuses.utils import get_promo_bonus_transactions
from novakid.backend.promos.promo_events.interfaces import PromoEventType
from novakid.backend.promos.promo_events.models import PromoEventLogModel
from novakid.backend.students.models import StudentModel
from novakid.backend.utils.other import log2console
BONUS_COEFFICIENT = Decimal('1.0')
BONUS_EXPIRES = pendulum.interval(days=90)
BONUS_THRESHOLD = 4
EVENT_KEY = 'winter_intensive'
PROMO_REGIONS = {'PL'}
PROMO_WEEK_START_TIMES = [
pendulum.create(2020, 12, 28),
pendulum.create(2021, 1, 4),
pendulum.create(2021, 1, 11),
]
PromoApplyResult = namedtuple('PromoApplyResult', 'successful message')
def winter_intensive_promo(request, dry_run=True, logprint=False):
def _log(log_str):
if logprint:
log2console(log_str)
db_session = request.db_session
user_ids = []
totals = []
users_all = 0
users_processed = 0
exception_count = 0
bonus_all = 0
# Apply bonus to users from certain regions
with request.tm:
start_time_gte = PROMO_WEEK_START_TIMES[0]
start_time_lt = PROMO_WEEK_START_TIMES[2].add(weeks=1)
q = db_session.query(
UserModel.id
).join(
StudentModel, StudentModel.parent_id == UserModel.id
).join(
ClassModel, ClassModel.student_id == StudentModel.id
).filter(
UserModel.region_code.in_(PROMO_REGIONS),
UserModel.group == UserGroup.users,
ClassModel.status == ClassStatus.done,
ClassModel.result == ClassResult.completed,
ClassModel.start_time >= start_time_gte,
ClassModel.start_time < start_time_lt
).group_by(
UserModel.id
).having(
func.count(ClassModel.id) >= BONUS_THRESHOLD
)
for x in q:
user_ids.append(x.id)
log2console(f'{len(user_ids)} users found.')
user_query = db_session.query(
UserModel
)
for user_id in user_ids:
users_all += 1
_log(80*'-')
if users_all % 1000 == 0:
log2console('\n')
log2console(f'Users: all - {users_all} | processed - {users_processed} | errors - {exception_count}')
log2console('\n')
with request.tm:
user_model = user_query.get(user_id)
_log(f'User: id - {user_id} | currency - {user_model.region.currency_code}')
user_bonuses_count = 0
user_bonuses_per_week = 0
for start_time_gte in PROMO_WEEK_START_TIMES:
start_time_lt = start_time_gte.add(weeks=1)
class_query = ClassModel.query_classes_of_parent(db_session, user_id)
class_query = ClassModel.filter_classes_in_time_interval(class_query,
start_time_gte=start_time_gte,
start_time_lt=start_time_lt)
class_query = class_query.filter(
ClassModel.status == ClassStatus.done,
ClassModel.result == ClassResult.completed
)
classes_count = class_query.count()
# One bonus per every BONUS_THRESHOLD-th successful class
user_bonuses_per_week = classes_count // BONUS_THRESHOLD
user_bonuses_count += user_bonuses_per_week
_log(f'Week: {start_time_gte} - {start_time_lt} | classes done/completed - {classes_count} | bonuses - {user_bonuses_per_week}')
if user_bonuses_count > 0:
try:
# Get bonus cost as one native class cost in user's currency multiplied by BONUS_COEFFICIENT
last_payment = PaymentModel.last_paid_for_user(db_session, user_model)
if last_payment and last_payment.cost and last_payment.value:
native_class_cost = last_payment.cost / last_payment.value
user_bonus_cost = native_class_cost * user_bonuses_count * BONUS_COEFFICIENT
user_bonus_cost = user_bonus_cost.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
user_bonus_currency_code = last_payment.currency_code
if user_bonus_cost > 0:
_log(f'Last payment: cost - {last_payment.cost} | '
f'value: {last_payment.value} | '
f'currency - {last_payment.currency_code}')
_log(f'User bonus: count - {user_bonuses_count} | '
f'cost - {user_bonus_cost} | '
f'currency - {user_bonus_currency_code}')
result = apply_bonus(request, user_model, user_bonus_cost, user_bonus_currency_code, dry_run)
if result.successful:
users_processed += 1
bonus_all += user_bonuses_count
totals.append((user_id, user_bonus_cost, user_bonus_currency_code,))
_log(result.message)
except UnexpectedException as err:
exception_count += 1
msg = 'Unexpected exception on Winter Intensive promo'
_log(f'{msg}: {err}')
log2console(80*'-')
for row in totals:
message = ', '.join([str(item) for item in row])
print(message)
log2console(80*'-')
log2console(f'Users: all - {users_all} | processed - {users_processed} | errors - {exception_count}')
def apply_bonus(request, user_model, cost, currency_code, dry_run=True) -> PromoApplyResult:
db_session = request.db_session
user_id = user_model.id
# Check user region
if user_model.region.currency_code != currency_code:
return PromoApplyResult(False, f'Region currency {user_model.region.currency_code} not equal user {user_id} currency {currency_code}')
# Check existing bonus transaction
query_bonus = db_session.query(BonusTransactionModel).filter(
BonusTransactionModel.user_id == user_id,
).join(
PromoEventLogModel, PromoEventLogModel.bonus_transaction_id == BonusTransactionModel.id,
).filter(
PromoEventLogModel.key == EVENT_KEY,
PromoEventLogModel.event_type == PromoEventType.other,
)
if exists(query_bonus):
return PromoApplyResult(False, f'User {user_id} has already received a bonus')
# create bonus transaction
transactions = get_promo_bonus_transactions(request.root)
transaction_params = {
'user_id': user_id,
'value': cost,
'currency_code': currency_code,
'ttl': BONUS_EXPIRES,
'event_log': {
'event_type': PromoEventType.other,
'key': EVENT_KEY,
},
}
if not dry_run:
transaction, _ = transactions.http_post(request, params=transaction_params)
return PromoApplyResult(True, f'Bonus {cost} {currency_code} applied successfully')
|
You can clone a snippet to your computer for local editing. Learn more.