from tinydb import Query, where import config import random import time from fastapi import APIRouter, Request, Response from core_common import core_process_request, core_prepare_response, E from core_database import get_db router = APIRouter(prefix="/local2", tags=["local2"]) router.model_whitelist = ["KFC"] def get_profile(cid): return get_db().table('sdvx_profile').get( where('card') == cid ) def get_game_profile(cid, game_version): profile = get_profile(cid) return profile['version'].get(str(game_version), None) def get_id_from_profile(cid): profile = get_db().table('sdvx_profile').get( where('card') == cid ) djid = "%08d" % profile['sdvx_id'] djid_split = '-'.join([djid[:4], djid[4:]]) return profile['sdvx_id'], djid_split @router.post('/{gameinfo}/game/sv6_common') async def game_sv6_common(request: Request): request_info = await core_process_request(request) event = [ 'DEMOGAME_PLAY', 'MATCHING_MODE', 'MATCHING_MODE_FREE_IP', 'LEVEL_LIMIT_EASING', 'ACHIEVEMENT_ENABLE', 'APICAGACHADRAW\t30', 'VOLFORCE_ENABLE', 'AKANAME_ENABLE', 'PAUSE_ONLINEUPDATE', 'CONTINUATION', 'TENKAICHI_MODE', 'QC_MODE', 'KAC_MODE', 'APPEAL_CARD_GEN_PRICE\t100', 'APPEAL_CARD_GEN_NEW_PRICE\t200', 'APPEAL_CARD_UNLOCK\t0,20170914,0,20171014,0,20171116,0,20180201,0,20180607,0,20181206,0,20200326,0,20200611,4,10140732,6,10150431', 'FAVORITE_APPEALCARD_MAX\t200', 'FAVORITE_MUSIC_MAX\t200', 'EVENTDATE_APRILFOOL', 'KONAMI_50TH_LOGO', 'OMEGA_ARS_ENABLE', 'DISABLE_MONITOR_ID_CHECK', 'SKILL_ANALYZER_ABLE', 'BLASTER_ABLE', 'STANDARD_UNLOCK_ENABLE', 'PLAYERJUDGEADJ_ENABLE', 'MIXID_INPUT_ENABLE', 'EVENTDATE_ONIGO', 'EVENTDATE_GOTT', 'GENERATOR_ABLE', 'CREW_SELECT_ABLE', 'PREMIUM_TIME_ENABLE', 'OMEGA_ENABLE\t1,2,3,4,5,6,7,8,9', 'HEXA_ENABLE\t1,2,3,4,5', 'MEGAMIX_ENABLE', 'VALGENE_ENABLE', 'ARENA_ENABLE', 'DISP_PASELI_BANNER', ] unlock = [] for i in range(2000): for j in range(0, 5): unlock.append([i, j]) response = E.response( E.game( E.event( *[E.info( E.event_id(s, __type="str"), )for s in event], ), E.music_limited( *[E.info( E.music_id(s[0], __type="s32"), E.music_type(s[1], __type="u8"), E.limited(3, __type="u8"), )for s in unlock], ), ) ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post('/{gameinfo}/game/sv6_new') async def game_sv6_new(request: Request): request_info = await core_process_request(request) game_version = request_info['game_version'] root = request_info['root'][0] dataid = root.find("dataid").text cardno = root.find("cardno").text name = root.find("name").text db = get_db().table('sdvx_profile') all_profiles_for_card = db.get(Query().card == dataid) if all_profiles_for_card is None: all_profiles_for_card = { 'card': dataid, 'version': {} } if 'sdvx_id' not in all_profiles_for_card: sdvx_id = random.randint(10000000, 99999999) all_profiles_for_card['sdvx_id'] = sdvx_id all_profiles_for_card['version'][str(game_version)] = { 'game_version': game_version, 'name': name, 'appeal_id': 0, 'skill_level': 0, 'skill_base_id': 0, 'skill_name_id': 0, 'earned_gamecoin_packet': 0, 'earned_gamecoin_block': 0, 'earned_blaster_energy': 0, 'earned_extrack_energy': 0, 'used_packet_booster': 0, 'used_block_booster': 0, 'hispeed': 0, 'lanespeed': 0, 'gauge_option': 0, 'ars_option': 0, 'notes_option': 0, 'early_late_disp': 0, 'draw_adjust': 0, 'eff_c_left': 0, 'eff_c_right': 1, 'music_id': 0, 'music_type': 0, 'sort_type': 0, 'narrow_down': 0, 'headphone': 1, 'print_count': 0, 'start_option': 0, 'bgm': 0, 'submonitor': 0, 'nemsys': 0, 'stampA': 0, 'stampB': 0, 'stampC': 0, 'stampD': 0, 'items': [], 'params': [], } db.upsert(all_profiles_for_card, where('card') == dataid) response = E.response( E.game( E.result(0, __type="u8"), ), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post('/{gameinfo}/game/sv6_load') async def game_sv6_load(request: Request): request_info = await core_process_request(request) game_version = request_info['game_version'] dataid = request_info['root'][0].find("dataid").text profile = get_game_profile(dataid, game_version) if profile: djid, djid_split = get_id_from_profile(dataid) unlock = [] for i in range(301): unlock.append([i, 11, 15]) for i in range(6001): unlock.append([i, 1, 1]) unlock.append([599, 4, 10]) for item in profile['items']: unlock.append(item) customize = [[2, 2, [profile['bgm'], profile['submonitor'], profile['nemsys'], profile['stampA'], profile['stampB'], profile['stampC'], profile['stampD']]]] for item in profile['params']: customize.append(item) response = E.response( E.game( E.result(0, __type="u8"), E.name(profile['name'], __type="str"), E.code(djid_split, __type="str"), E.sdvx_id(djid_split, __type="str"), E.appeal_id(profile['appeal_id'], __type="u16"), E.skill_level(profile['skill_level'], __type="s16"), E.skill_base_id(profile['skill_base_id'], __type="s16"), E.skill_name_id(profile['skill_name_id'], __type="s16"), E.gamecoin_packet(profile['earned_gamecoin_packet'], __type="u32"), E.gamecoin_block(profile['earned_gamecoin_block'], __type="u32"), E.blaster_energy(profile['earned_blaster_energy'], __type="u32"), E.blaster_count(9999, __type="u32"), E.extrack_energy(profile['earned_extrack_energy'], __type="u16"), E.play_count(1001, __type="u32"), E.day_count(301, __type="u32"), E.today_count(21, __type="u32"), E.play_chain(31, __type="u32"), E.max_play_chain(31, __type="u32"), E.week_count(9, __type="u32"), E.week_play_count(101, __type="u32"), E.week_chain(31, __type="u32"), E.max_week_chain(1001, __type="u32"), E.creator_id(1, __type="u32"), E.eaappli( E.relation(1, __type="s8") ), E.ea_shop( E.blaster_pass_enable(1, __type="bool"), E.blaster_pass_limit_date(1605871200, __type="u64"), ), E.kac_id(profile['name'], __type="str"), E.block_no(0, __type="s32"), E.volte_factory( *[E.info( E.goods_id(s, __type="s32"), E.status(1, __type="s32"), )for s in range(1, 999)], ), *[E.campaign( E.campaign_id(s, __type="s32"), E.jackpot_flg(1, __type="bool"), )for s in range(99)], E.cloud( E.relation(1, __type="s8") ), E.something( *[E.info( E.ranking_id(s[0], __type="s32"), E.value(s[1], __type="s64"), )for s in [[1402, 20000]]], ), E.festival( E.fes_id(1, __type="s32"), E.live_energy(1000000, __type="s32"), *[E.bonus( E.energy_type(s, __type="s32"), E.live_energy(1000000, __type="s32"), )for s in range(1, 6)], ), E.valgene_ticket( E.ticket_num(0, __type="s32"), E.limit_date(1605871200, __type="u64"), ), E.arena( E.last_play_season(0, __type="s32"), E.rank_point(0, __type="s32"), E.shop_point(0, __type="s32"), E.ultimate_rate(0, __type="s32"), E.ultimate_rank_num(0, __type="s32"), E.rank_play_cnt(0, __type="s32"), E.ultimate_play_cnt(0, __type="s32"), ), E.hispeed(profile['hispeed'], __type="s32"), E.lanespeed(profile['lanespeed'], __type="u32"), E.gauge_option(profile['gauge_option'], __type="u8"), E.ars_option(profile['ars_option'], __type="u8"), E.notes_option(profile['notes_option'], __type="u8"), E.early_late_disp(profile['early_late_disp'], __type="u8"), E.draw_adjust(profile['draw_adjust'], __type="s32"), E.eff_c_left(profile['eff_c_left'], __type="u8"), E.eff_c_right(profile['eff_c_right'], __type="u8"), E.last_music_id(profile['music_id'], __type="s32"), E.last_music_type(profile['music_type'], __type="u8"), E.sort_type(profile['sort_type'], __type="u8"), E.narrow_down(profile['narrow_down'], __type="u8"), E.headphone(profile['headphone'], __type="u8"), E.item( *[E.info( E.id(s[0], __type="u32"), E.type(s[1], __type="u8"), E.param(s[2], __type="u32"), )for s in unlock], ), E.param( *[E.info( E.type(s[0], __type="s32"), E.id(s[1], __type="s32"), E.param(s[2], __type="s32", __count=len(s[2])), )for s in customize], ), ), ) else: response = E.response( E.game( E.result(1, __type="u8"), ) ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post('/{gameinfo}/game/sv6_load_m') async def game_sv6_load_m(request: Request): request_info = await core_process_request(request) game_version = request_info['game_version'] dataid = request_info['root'][0].find("refid").text profile = get_game_profile(dataid, game_version) djid, djid_split = get_id_from_profile(dataid) best_scores = [] db = get_db() for record in db.table('sdvx_scores_best').search( (where('game_version') == game_version) & (where('sdvx_id') == djid) ): best_scores.append([ record['music_id'], record['music_type'], record['score'], record['exscore'], record['clear_type'], record['score_grade'], 0, 0, record['btn_rate'], record['long_rate'], record['vol_rate'], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ]) response = E.response( E.game( E.music( *[E.info( E.param(x, __type="u32"), )for x in best_scores], ), ), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post('/{gameinfo}/game/sv6_save') async def game_sv6_save(request: Request): request_info = await core_process_request(request) game_version = request_info['game_version'] dataid = request_info['root'][0].find("refid").text profile = get_profile(dataid) game_profile = profile['version'].get(str(game_version), {}) root = request_info['root'][0] game_profile['appeal_id'] = int(root.find('appeal_id').text) nodes = [ 'appeal_id', 'skill_level', 'skill_base_id', 'skill_name_id', 'earned_gamecoin_packet', 'earned_gamecoin_block', 'earned_blaster_energy', 'earned_extrack_energy', 'hispeed', 'lanespeed', 'gauge_option', 'ars_option', 'notes_option', 'early_late_disp', 'draw_adjust', 'eff_c_left', 'eff_c_right', 'music_id', 'music_type', 'sort_type', 'narrow_down', 'headphone', 'start_option', ] for node in nodes: game_profile[node] = int(root.find(node).text) game_profile['used_packet_booster'] = int(root.find('ea_shop')[0].text) game_profile['used_block_booster'] = int(root.find('ea_shop')[1].text) game_profile['print_count'] = int(root.find('print')[0].text) items = [] for info in root.find('item'): items.append([int(info.find('id').text), int(info.find('type').text), int(info.find('param').text)]) game_profile['items'] = items params = [] for info in root.find('param'): p = info.find('param') params.append([int(info.find('type').text), int(info.find('id').text), [int(x) for x in p.text.split(' ')]]) game_profile['params'] = params profile['version'][str(game_version)] = game_profile get_db().table('sdvx_profile').upsert(profile, where('card') == dataid) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post('/{gameinfo}/game/sv6_save_m') async def game_sv6_save_m(request: Request): request_info = await core_process_request(request) game_version = request_info['game_version'] timestamp = time.time() root = request_info['root'][0] dataid = root.find("dataid").text profile = get_game_profile(dataid, game_version) djid, djid_split = get_id_from_profile(dataid) track = root.find("track") play_id = int(track.find("play_id").text) music_id = int(track.find("music_id").text) music_type = int(track.find("music_type").text) score = int(track.find("score").text) exscore = int(track.find("exscore").text) clear_type = int(track.find("clear_type").text) score_grade = int(track.find("score_grade").text) max_chain = int(track.find("max_chain").text) just = int(track.find("just").text) critical = int(track.find("critical").text) near = int(track.find("near").text) error = int(track.find("error").text) effective_rate = int(track.find("effective_rate").text) btn_rate = int(track.find("btn_rate").text) long_rate = int(track.find("long_rate").text) vol_rate = int(track.find("vol_rate").text) mode = int(track.find("mode").text) gauge_type = int(track.find("gauge_type").text) notes_option = int(track.find("notes_option").text) online_num = int(track.find("online_num").text) local_num = int(track.find("local_num").text) challenge_type = int(track.find("challenge_type").text) retry_cnt = int(track.find("retry_cnt").text) judge = [int(x) for x in track.find("judge").text.split(' ')] db = get_db() db.table('sdvx_scores').insert( { 'timestamp': timestamp, 'game_version': game_version, 'sdvx_id': djid, 'play_id': play_id, 'music_id': music_id, 'music_type': music_type, 'score': score, 'exscore': exscore, 'clear_type': clear_type, 'score_grade': score_grade, 'max_chain': max_chain, 'just': just, 'critical': critical, 'near': near, 'error': error, 'effective_rate': effective_rate, 'btn_rate': btn_rate, 'long_rate': long_rate, 'vol_rate': vol_rate, 'mode': mode, 'gauge_type': gauge_type, 'notes_option': notes_option, 'online_num': online_num, 'local_num': local_num, 'challenge_type': challenge_type, 'retry_cnt': retry_cnt, 'judge': judge, }, ) best = db.table('sdvx_scores_best').get( (where('sdvx_id') == djid) & (where('game_version') == game_version) & (where('music_id') == music_id) & (where('music_type') == music_type) ) best = {} if best is None else best best_score_data = { 'game_version': game_version, 'sdvx_id': djid, 'name': profile['name'], 'music_id': music_id, 'music_type': music_type, 'score': max(score, best.get('score', score)), 'exscore': max(exscore, best.get('exscore', exscore)), 'clear_type': max(clear_type, best.get('clear_type', clear_type)), 'score_grade': max(score_grade, best.get('score_grade', score_grade)), 'btn_rate': max(btn_rate, best.get('btn_rate', btn_rate)), 'long_rate': max(long_rate, best.get('long_rate', long_rate)), 'vol_rate': max(vol_rate, best.get('vol_rate', vol_rate)), } db.table('sdvx_scores_best').upsert( best_score_data, (where('sdvx_id') == djid) & (where('game_version') == game_version) & (where('music_id') == music_id) & (where('music_type') == music_type) ) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post('/{gameinfo}/game/sv6_hiscore') async def game_sv6_hiscore(request: Request): request_info = await core_process_request(request) game_version = request_info['game_version'] best_scores = [] db = get_db() for record in db.table('sdvx_scores_best').search( (where('game_version') == game_version) ): best_scores.append([ record['music_id'], record['music_type'], record['sdvx_id'], record['name'], record['score'], ]) response = E.response( E.game( E.sc( *[E.d( E.id(s[0], __type="u32"), E.ty(s[1], __type="u32"), E.a_sq(s[2], __type="str"), E.a_nm(s[3], __type="str"), E.a_sc(s[4], __type="u32"), E.l_sq(s[2], __type="str"), E.l_nm(s[3], __type="str"), E.l_sc(s[4], __type="u32"), )for s in best_scores], ), ), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post('/{gameinfo}/game/sv6_lounge') async def game_sv6_lounge(request: Request): request_info = await core_process_request(request) response = E.response( E.game( E.interval(30, __type="u32") ), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post('/{gameinfo}/game/sv6_shop') async def game_sv6_shop(request: Request): request_info = await core_process_request(request) response = E.response( E.game( E.nxt_time(1000 * 5 * 60, __type="u32") ), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) for stub in [ 'load_r', 'frozen', 'save_e', 'save_mega', 'play_e', 'play_s', 'entry_s', 'entry_e', 'log' ]: @router.post(f'/{{gameinfo}}/game/sv6_{stub}') async def game_sv6_stub(request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers)