MonkeyBusiness/modules/sdvx/game.py
2022-08-26 10:39:11 +00:00

649 lines
21 KiB
Python

from tinydb import Query, where
import config
import random
import time
from fastapi import APIRouter, Request, Response
from core_common import core_process_request, core_prepare_response, E
from core_database import get_db
router = APIRouter(prefix="/local2", tags=["local2"])
router.model_whitelist = ["KFC"]
def get_profile(cid):
return get_db().table('sdvx_profile').get(
where('card') == cid
)
def get_game_profile(cid, game_version):
profile = get_profile(cid)
return profile['version'].get(str(game_version), None)
def get_id_from_profile(cid):
profile = get_db().table('sdvx_profile').get(
where('card') == cid
)
djid = "%08d" % profile['sdvx_id']
djid_split = '-'.join([djid[:4], djid[4:]])
return profile['sdvx_id'], djid_split
@router.post('/{gameinfo}/game/sv6_common')
async def game_sv6_common(request: Request):
request_info = await core_process_request(request)
event = [
'DEMOGAME_PLAY',
'MATCHING_MODE',
'MATCHING_MODE_FREE_IP',
'LEVEL_LIMIT_EASING',
'ACHIEVEMENT_ENABLE',
'APICAGACHADRAW\t30',
'VOLFORCE_ENABLE',
'AKANAME_ENABLE',
'PAUSE_ONLINEUPDATE',
'CONTINUATION',
'TENKAICHI_MODE',
'QC_MODE',
'KAC_MODE',
'APPEAL_CARD_GEN_PRICE\t100',
'APPEAL_CARD_GEN_NEW_PRICE\t200',
'APPEAL_CARD_UNLOCK\t0,20170914,0,20171014,0,20171116,0,20180201,0,20180607,0,20181206,0,20200326,0,20200611,4,10140732,6,10150431',
'FAVORITE_APPEALCARD_MAX\t200',
'FAVORITE_MUSIC_MAX\t200',
'EVENTDATE_APRILFOOL',
'KONAMI_50TH_LOGO',
'OMEGA_ARS_ENABLE',
'DISABLE_MONITOR_ID_CHECK',
'SKILL_ANALYZER_ABLE',
'BLASTER_ABLE',
'STANDARD_UNLOCK_ENABLE',
'PLAYERJUDGEADJ_ENABLE',
'MIXID_INPUT_ENABLE',
'EVENTDATE_ONIGO',
'EVENTDATE_GOTT',
'GENERATOR_ABLE',
'CREW_SELECT_ABLE',
'PREMIUM_TIME_ENABLE',
'OMEGA_ENABLE\t1,2,3,4,5,6,7,8,9',
'HEXA_ENABLE\t1,2,3,4,5',
'MEGAMIX_ENABLE',
'VALGENE_ENABLE',
'ARENA_ENABLE',
'DISP_PASELI_BANNER',
]
unlock = []
for i in range(2000):
for j in range(0, 5):
unlock.append([i, j])
response = E.response(
E.game(
E.event(
*[E.info(
E.event_id(s, __type="str"),
)for s in event],
),
E.music_limited(
*[E.info(
E.music_id(s[0], __type="s32"),
E.music_type(s[1], __type="u8"),
E.limited(3, __type="u8"),
)for s in unlock],
),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post('/{gameinfo}/game/sv6_new')
async def game_sv6_new(request: Request):
request_info = await core_process_request(request)
game_version = request_info['game_version']
root = request_info['root'][0]
dataid = root.find("dataid").text
cardno = root.find("cardno").text
name = root.find("name").text
db = get_db().table('sdvx_profile')
all_profiles_for_card = db.get(Query().card == dataid)
if all_profiles_for_card is None:
all_profiles_for_card = {
'card': dataid,
'version': {}
}
if 'sdvx_id' not in all_profiles_for_card:
sdvx_id = random.randint(10000000, 99999999)
all_profiles_for_card['sdvx_id'] = sdvx_id
all_profiles_for_card['version'][str(game_version)] = {
'game_version': game_version,
'name': name,
'appeal_id': 0,
'skill_level': 0,
'skill_base_id': 0,
'skill_name_id': 0,
'earned_gamecoin_packet': 0,
'earned_gamecoin_block': 0,
'earned_blaster_energy': 0,
'earned_extrack_energy': 0,
'used_packet_booster': 0,
'used_block_booster': 0,
'hispeed': 0,
'lanespeed': 0,
'gauge_option': 0,
'ars_option': 0,
'notes_option': 0,
'early_late_disp': 0,
'draw_adjust': 0,
'eff_c_left': 0,
'eff_c_right': 1,
'music_id': 0,
'music_type': 0,
'sort_type': 0,
'narrow_down': 0,
'headphone': 1,
'print_count': 0,
'start_option': 0,
'bgm': 0,
'submonitor': 0,
'nemsys': 0,
'stampA': 0,
'stampB': 0,
'stampC': 0,
'stampD': 0,
'items': [],
'params': [],
}
db.upsert(all_profiles_for_card, where('card') == dataid)
response = E.response(
E.game(
E.result(0, __type="u8"),
),
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post('/{gameinfo}/game/sv6_load')
async def game_sv6_load(request: Request):
request_info = await core_process_request(request)
game_version = request_info['game_version']
dataid = request_info['root'][0].find("dataid").text
profile = get_game_profile(dataid, game_version)
if profile:
djid, djid_split = get_id_from_profile(dataid)
unlock = []
for i in range(301):
unlock.append([i, 11, 15])
for i in range(6001):
unlock.append([i, 1, 1])
unlock.append([599, 4, 10])
for item in profile['items']:
unlock.append(item)
customize = [[2, 2, [profile['bgm'], profile['submonitor'], profile['nemsys'], profile['stampA'], profile['stampB'], profile['stampC'], profile['stampD']]]]
for item in profile['params']:
customize.append(item)
response = E.response(
E.game(
E.result(0, __type="u8"),
E.name(profile['name'], __type="str"),
E.code(djid_split, __type="str"),
E.sdvx_id(djid_split, __type="str"),
E.appeal_id(profile['appeal_id'], __type="u16"),
E.skill_level(profile['skill_level'], __type="s16"),
E.skill_base_id(profile['skill_base_id'], __type="s16"),
E.skill_name_id(profile['skill_name_id'], __type="s16"),
E.gamecoin_packet(profile['earned_gamecoin_packet'], __type="u32"),
E.gamecoin_block(profile['earned_gamecoin_block'], __type="u32"),
E.blaster_energy(profile['earned_blaster_energy'], __type="u32"),
E.blaster_count(9999, __type="u32"),
E.extrack_energy(profile['earned_extrack_energy'], __type="u16"),
E.play_count(1001, __type="u32"),
E.day_count(301, __type="u32"),
E.today_count(21, __type="u32"),
E.play_chain(31, __type="u32"),
E.max_play_chain(31, __type="u32"),
E.week_count(9, __type="u32"),
E.week_play_count(101, __type="u32"),
E.week_chain(31, __type="u32"),
E.max_week_chain(1001, __type="u32"),
E.creator_id(1, __type="u32"),
E.eaappli(
E.relation(1, __type="s8")
),
E.ea_shop(
E.blaster_pass_enable(1, __type="bool"),
E.blaster_pass_limit_date(1605871200, __type="u64"),
),
E.kac_id(profile['name'], __type="str"),
E.block_no(0, __type="s32"),
E.volte_factory(
*[E.info(
E.goods_id(s, __type="s32"),
E.status(1, __type="s32"),
)for s in range(1, 999)],
),
*[E.campaign(
E.campaign_id(s, __type="s32"),
E.jackpot_flg(1, __type="bool"),
)for s in range(99)],
E.cloud(
E.relation(1, __type="s8")
),
E.something(
*[E.info(
E.ranking_id(s[0], __type="s32"),
E.value(s[1], __type="s64"),
)for s in [[1402, 20000]]],
),
E.festival(
E.fes_id(1, __type="s32"),
E.live_energy(1000000, __type="s32"),
*[E.bonus(
E.energy_type(s, __type="s32"),
E.live_energy(1000000, __type="s32"),
)for s in range(1, 6)],
),
E.valgene_ticket(
E.ticket_num(0, __type="s32"),
E.limit_date(1605871200, __type="u64"),
),
E.arena(
E.last_play_season(0, __type="s32"),
E.rank_point(0, __type="s32"),
E.shop_point(0, __type="s32"),
E.ultimate_rate(0, __type="s32"),
E.ultimate_rank_num(0, __type="s32"),
E.rank_play_cnt(0, __type="s32"),
E.ultimate_play_cnt(0, __type="s32"),
),
E.hispeed(profile['hispeed'], __type="s32"),
E.lanespeed(profile['lanespeed'], __type="u32"),
E.gauge_option(profile['gauge_option'], __type="u8"),
E.ars_option(profile['ars_option'], __type="u8"),
E.notes_option(profile['notes_option'], __type="u8"),
E.early_late_disp(profile['early_late_disp'], __type="u8"),
E.draw_adjust(profile['draw_adjust'], __type="s32"),
E.eff_c_left(profile['eff_c_left'], __type="u8"),
E.eff_c_right(profile['eff_c_right'], __type="u8"),
E.last_music_id(profile['music_id'], __type="s32"),
E.last_music_type(profile['music_type'], __type="u8"),
E.sort_type(profile['sort_type'], __type="u8"),
E.narrow_down(profile['narrow_down'], __type="u8"),
E.headphone(profile['headphone'], __type="u8"),
E.item(
*[E.info(
E.id(s[0], __type="u32"),
E.type(s[1], __type="u8"),
E.param(s[2], __type="u32"),
)for s in unlock],
),
E.param(
*[E.info(
E.type(s[0], __type="s32"),
E.id(s[1], __type="s32"),
E.param(s[2], __type="s32", __count=len(s[2])),
)for s in customize],
),
),
)
else:
response = E.response(
E.game(
E.result(1, __type="u8"),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post('/{gameinfo}/game/sv6_load_m')
async def game_sv6_load_m(request: Request):
request_info = await core_process_request(request)
game_version = request_info['game_version']
dataid = request_info['root'][0].find("refid").text
profile = get_game_profile(dataid, game_version)
djid, djid_split = get_id_from_profile(dataid)
best_scores = []
db = get_db()
for record in db.table('sdvx_scores_best').search(
(where('game_version') == game_version)
& (where('sdvx_id') == djid)
):
best_scores.append([
record['music_id'],
record['music_type'],
record['score'],
record['exscore'],
record['clear_type'],
record['score_grade'],
0,
0,
record['btn_rate'],
record['long_rate'],
record['vol_rate'],
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
])
response = E.response(
E.game(
E.music(
*[E.info(
E.param(x, __type="u32"),
)for x in best_scores],
),
),
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post('/{gameinfo}/game/sv6_save')
async def game_sv6_save(request: Request):
request_info = await core_process_request(request)
game_version = request_info['game_version']
dataid = request_info['root'][0].find("refid").text
profile = get_profile(dataid)
game_profile = profile['version'].get(str(game_version), {})
root = request_info['root'][0]
game_profile['appeal_id'] = int(root.find('appeal_id').text)
nodes = [
'appeal_id',
'skill_level',
'skill_base_id',
'skill_name_id',
'earned_gamecoin_packet',
'earned_gamecoin_block',
'earned_blaster_energy',
'earned_extrack_energy',
'hispeed',
'lanespeed',
'gauge_option',
'ars_option',
'notes_option',
'early_late_disp',
'draw_adjust',
'eff_c_left',
'eff_c_right',
'music_id',
'music_type',
'sort_type',
'narrow_down',
'headphone',
'start_option',
]
for node in nodes:
game_profile[node] = int(root.find(node).text)
game_profile['used_packet_booster'] = int(root.find('ea_shop')[0].text)
game_profile['used_block_booster'] = int(root.find('ea_shop')[1].text)
game_profile['print_count'] = int(root.find('print')[0].text)
items = []
for info in root.find('item'):
items.append([int(info.find('id').text), int(info.find('type').text), int(info.find('param').text)])
game_profile['items'] = items
params = []
for info in root.find('param'):
p = info.find('param')
params.append([int(info.find('type').text), int(info.find('id').text), [int(x) for x in p.text.split(' ')]])
game_profile['params'] = params
profile['version'][str(game_version)] = game_profile
get_db().table('sdvx_profile').upsert(profile, where('card') == dataid)
response = E.response(
E.game(),
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post('/{gameinfo}/game/sv6_save_m')
async def game_sv6_save_m(request: Request):
request_info = await core_process_request(request)
game_version = request_info['game_version']
timestamp = time.time()
root = request_info['root'][0]
dataid = root.find("dataid").text
profile = get_game_profile(dataid, game_version)
djid, djid_split = get_id_from_profile(dataid)
track = root.find("track")
play_id = int(track.find("play_id").text)
music_id = int(track.find("music_id").text)
music_type = int(track.find("music_type").text)
score = int(track.find("score").text)
exscore = int(track.find("exscore").text)
clear_type = int(track.find("clear_type").text)
score_grade = int(track.find("score_grade").text)
max_chain = int(track.find("max_chain").text)
just = int(track.find("just").text)
critical = int(track.find("critical").text)
near = int(track.find("near").text)
error = int(track.find("error").text)
effective_rate = int(track.find("effective_rate").text)
btn_rate = int(track.find("btn_rate").text)
long_rate = int(track.find("long_rate").text)
vol_rate = int(track.find("vol_rate").text)
mode = int(track.find("mode").text)
gauge_type = int(track.find("gauge_type").text)
notes_option = int(track.find("notes_option").text)
online_num = int(track.find("online_num").text)
local_num = int(track.find("local_num").text)
challenge_type = int(track.find("challenge_type").text)
retry_cnt = int(track.find("retry_cnt").text)
judge = [int(x) for x in track.find("judge").text.split(' ')]
db = get_db()
db.table('sdvx_scores').insert(
{
'timestamp': timestamp,
'game_version': game_version,
'sdvx_id': djid,
'play_id': play_id,
'music_id': music_id,
'music_type': music_type,
'score': score,
'exscore': exscore,
'clear_type': clear_type,
'score_grade': score_grade,
'max_chain': max_chain,
'just': just,
'critical': critical,
'near': near,
'error': error,
'effective_rate': effective_rate,
'btn_rate': btn_rate,
'long_rate': long_rate,
'vol_rate': vol_rate,
'mode': mode,
'gauge_type': gauge_type,
'notes_option': notes_option,
'online_num': online_num,
'local_num': local_num,
'challenge_type': challenge_type,
'retry_cnt': retry_cnt,
'judge': judge,
},
)
best = db.table('sdvx_scores_best').get(
(where('sdvx_id') == djid)
& (where('game_version') == game_version)
& (where('music_id') == music_id)
& (where('music_type') == music_type)
)
best = {} if best is None else best
best_score_data = {
'game_version': game_version,
'sdvx_id': djid,
'name': profile['name'],
'music_id': music_id,
'music_type': music_type,
'score': max(score, best.get('score', score)),
'exscore': max(exscore, best.get('exscore', exscore)),
'clear_type': max(clear_type, best.get('clear_type', clear_type)),
'score_grade': max(score_grade, best.get('score_grade', score_grade)),
'btn_rate': max(btn_rate, best.get('btn_rate', btn_rate)),
'long_rate': max(long_rate, best.get('long_rate', long_rate)),
'vol_rate': max(vol_rate, best.get('vol_rate', vol_rate)),
}
db.table('sdvx_scores_best').upsert(
best_score_data,
(where('sdvx_id') == djid)
& (where('game_version') == game_version)
& (where('music_id') == music_id)
& (where('music_type') == music_type)
)
response = E.response(
E.game(),
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post('/{gameinfo}/game/sv6_hiscore')
async def game_sv6_hiscore(request: Request):
request_info = await core_process_request(request)
game_version = request_info['game_version']
best_scores = []
db = get_db()
for record in db.table('sdvx_scores_best').search(
(where('game_version') == game_version)
):
best_scores.append([
record['music_id'],
record['music_type'],
record['sdvx_id'],
record['name'],
record['score'],
])
response = E.response(
E.game(
E.sc(
*[E.d(
E.id(s[0], __type="u32"),
E.ty(s[1], __type="u32"),
E.a_sq(s[2], __type="str"),
E.a_nm(s[3], __type="str"),
E.a_sc(s[4], __type="u32"),
E.l_sq(s[2], __type="str"),
E.l_nm(s[3], __type="str"),
E.l_sc(s[4], __type="u32"),
)for s in best_scores],
),
),
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post('/{gameinfo}/game/sv6_lounge')
async def game_sv6_lounge(request: Request):
request_info = await core_process_request(request)
response = E.response(
E.game(
E.interval(30, __type="u32")
),
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post('/{gameinfo}/game/sv6_shop')
async def game_sv6_shop(request: Request):
request_info = await core_process_request(request)
response = E.response(
E.game(
E.nxt_time(1000 * 5 * 60, __type="u32")
),
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
for stub in [
'load_r',
'frozen',
'save_e',
'save_mega',
'play_e',
'play_s',
'entry_s',
'entry_e',
'log'
]:
@router.post(f'/{{gameinfo}}/game/sv6_{stub}')
async def game_sv6_stub(request: Request):
request_info = await core_process_request(request)
response = E.response(
E.game(),
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)