2022-11-15 14:03:37 +00:00

879 lines
29 KiB
Python

from tinydb import Query, where
import config
import random
from fastapi import APIRouter, Request, Response
from core_common import core_process_request, core_prepare_response, E
from core_database import get_db
router = APIRouter(prefix="/local", tags=["local"])
router.model_whitelist = ["LDJ", "KDZ", "JDZ"]
def get_profile(cid):
return get_db().table("iidx_profile").get(where("card") == cid)
def get_profile_by_id(iidx_id):
return get_db().table("iidx_profile").get(where("iidx_id") == iidx_id)
def get_game_profile(cid, game_version):
profile = get_profile(cid)
return profile["version"].get(str(game_version), None)
def get_id_from_profile(cid):
profile = get_db().table("iidx_profile").get(where("card") == cid)
djid = "%08d" % profile["iidx_id"]
djid_split = "-".join([djid[:4], djid[4:]])
return profile["iidx_id"], djid_split
def calculate_folder_mask(profile):
return (
profile.get("_show_category_grade", 0) << 0
| (profile.get("_show_category_status", 0) << 1)
| (profile.get("_show_category_difficulty", 0) << 2)
| (profile.get("_show_category_alphabet", 0) << 3)
| (profile.get("_show_category_rival_play", 0) << 4)
| (profile.get("_show_category_rival_winlose", 0) << 6)
| (profile.get("_show_rival_shop_info", 0) << 7)
| (profile.get("_hide_play_count", 0) << 8)
| (profile.get("_hide_rival_info", 0) << 9)
)
@router.post("/{gameinfo}/pc/get")
async def pc_get(request: Request):
request_info = await core_process_request(request)
game_version = request_info["game_version"]
cid = request_info["root"][0].attrib["rid"]
profile = get_game_profile(cid, game_version)
djid, djid_split = get_id_from_profile(cid)
if game_version == 20:
response = E.response(
E.pc(
E.pcdata(
dach=profile["dach"],
dp_opt=profile["dp_opt"],
dp_opt2=profile["dp_opt2"],
dpnum=profile["dpnum"],
gno=profile["gno"],
gpos=profile["gpos"],
help=profile["help"],
hispeed=profile["hispeed"],
id=djid,
idstr=djid_split,
judge=profile["judge"],
judgeAdj=profile["judgeAdj"],
liflen=profile["lift"],
mode=profile["mode"],
name=profile["djname"],
notes=profile["notes"],
opstyle=profile["opstyle"],
pase=profile["pase"],
pid=profile["region"],
pmode=profile["pmode"],
sach=profile["sach"],
sdhd=profile["sdhd"],
sdtype=profile["sdtype"],
sp_opt=profile["sp_opt"],
spnum=profile["spnum"],
timing=profile["timing"],
),
E.qprodata(
[
profile["head"],
profile["hair"],
profile["face"],
profile["hand"],
profile["body"],
],
__type="u32",
__size=5 * 4,
),
E.skin(
[
0,
profile["turntable"],
profile["explosion"],
profile["bgm"],
calculate_folder_mask(profile),
profile["sudden"],
0,
profile["categoryvoice"],
profile["note"],
profile["fullcombo"],
profile["keybeam"],
profile["judgestring"],
-1,
profile["soundpreview"],
],
__type="s16",
),
E.rlist(),
E.commonboss(baron=0, deller=profile["deller"], orb=0),
E.secret(
E.flg1(profile.get("secret_flg1", [-1]), __type="s64"),
E.flg2(profile.get("secret_flg2", [-1]), __type="s64"),
E.flg3(profile.get("secret_flg3", [-1]), __type="s64"),
),
E.join_shop(
join_cflg=1, join_id=10, join_name=config.arcade, joinflg=1
),
E.grade(
*[E.g(x, __type="u8") for x in profile["grade_values"]],
dgid=profile["grade_double"],
sgid=profile["grade_single"],
),
E.redboss(
crush=profile.get("redboss_crush", 0),
open=profile.get("redboss_open", 0),
progress=profile.get("redboss_progress", 0),
),
E.blueboss(
column0=profile.get("blueboss_column0", 0),
column1=profile.get("blueboss_column1", 0),
first_flg=profile.get("blueboss_first_flg", 0),
gauge=profile.get("blueboss_gauge", 0),
general=profile.get("blueboss_general", 0),
item=profile.get("blueboss_item", 0),
item_flg=profile.get("blueboss_item_flg", 0),
level=profile.get("blueboss_level", 0),
row0=profile.get("blueboss_row0", 0),
row1=profile.get("blueboss_row1", 0),
sector=profile.get("blueboss_sector", 0),
),
E.yellowboss(
E.p_attack(
profile.get("yellowboss_p_attack", [0] * 7), __type="s32"
),
E.pbest_attack(
profile.get("yellowboss_pbest_attack", [0] * 7), __type="s32"
),
E.defeat(profile.get("yellowboss_defeat", [0] * 7), __type="bool"),
E.shop_damage(
profile.get("yellowboss_shop_damage", [0] * 7), __type="s32"
),
critical=profile.get("yellowboss_critical", 0),
destiny=profile.get("yellowboss_destiny", 0),
first_flg=profile.get("yellowboss_first_flg", 1),
heroic0=profile.get("yellowboss_heroic0", 0),
heroic1=profile.get("yellowboss_heroic1", 0),
join_num=profile.get("yellowboss_join_num", 0),
last_select=profile.get("yellowboss_last_select", 0),
level=profile.get("yellowboss_level", 1),
shop_message=profile.get("yellowboss_shop_message", ""),
special_move=profile.get("yellowboss_special_move", ""),
),
E.link5(
anisakis=1,
bad=1,
beachside=1,
beautiful=1,
broken=1,
castle=1,
china=1,
cuvelia=1,
exusia=1,
fallen=1,
flip=1,
glass=1,
glassflg=1,
qpro=1,
qproflg=1,
quaver=1,
reflec_data=1,
reunion=1,
sakura=1,
sampling=1,
second=1,
summer=1,
survival=1,
thunder=1,
titans=1,
treasure=1,
turii=1,
waxing=1,
whydidyou=1,
wuv=1,
),
E.cafe(
astraia=1,
bastie=1,
beachimp=1,
food=0,
holysnow=1,
is_first=0,
ledvsscu=1,
pastry=0,
rainbow=1,
service=0,
trueblue=1,
),
E.tricolettepark(
attack_rate=0,
boss0_damage=0,
boss0_stun=0,
boss1_damage=0,
boss1_stun=0,
boss2_damage=0,
boss2_stun=0,
boss3_damage=0,
boss3_stun=0,
is_union=0,
magic_gauge=0,
open_music=-1,
party=0,
),
E.weekly(
mid=-1,
wid=1,
),
E.packinfo(
music_0=-1,
music_1=-1,
music_2=-1,
pack_id=1,
),
E.visitor(anum=1, pnum=2, snum=1, vs_flg=1),
E.gakuen(music_list=-1),
E.achievements(
E.trophy(profile.get("achievements_trophy", [])[:10], __type="s64"),
last_weekly=profile.get("achievements_last_weekly", 0),
pack=profile.get("achievements_pack_id", 0),
pack_comp=profile.get("achievements_pack_comp", 0),
rival_crush=0,
visit_flg=profile.get("achievements_visit_flg", 0),
weekly_num=profile.get("achievements_weekly_num", 0),
),
E.step(
E.stamp(profile.get("stepup_stamp", ""), __type="bin"),
E.help(profile.get("stepup_help", ""), __type="bin"),
dp_ach=profile.get("stepup_dp_ach", 0),
dp_hdpt=profile.get("stepup_dp_hdpt", 0),
dp_level=profile.get("stepup_dp_level", 0),
dp_mplay=profile.get("stepup_dp_mplay", 0),
dp_round=profile.get("stepup_dp_round", 0),
review=profile.get("stepup_review", 0),
sp_ach=profile.get("stepup_sp_ach", 0),
sp_hdpt=profile.get("stepup_sp_hdpt", 0),
sp_level=profile.get("stepup_sp_level", 0),
sp_mplay=profile.get("stepup_sp_mplay", 0),
sp_round=profile.get("stepup_sp_round", 0),
),
)
)
elif game_version == 19:
response = E.response(
E.pc(
E.pcdata(
dach=profile["dach"],
dp_opt=profile["dp_opt"],
dp_opt2=profile["dp_opt2"],
dpnum=profile["dpnum"],
gno=profile["gno"],
help=profile["help"],
id=djid,
idstr=djid_split,
liflen=profile["lift"],
mode=profile["mode"],
name=profile["djname"],
notes=profile["notes"],
pase=profile["pase"],
pid=profile["region"],
pmode=profile["pmode"],
sach=profile["sach"],
sdhd=profile["sdhd"],
sdtype=profile["sdtype"],
sflg0=-1,
sflg1=-1,
sp_opt=profile["sp_opt"],
spnum=profile["spnum"],
timing=profile["timing"],
),
E.qprodata(
[
profile["head"],
profile["hair"],
profile["face"],
profile["hand"],
profile["body"],
],
__type="u32",
__size=5 * 4,
),
E.skin(
[
profile["frame"],
profile["turntable"],
profile["explosion"],
profile["bgm"],
calculate_folder_mask(profile),
profile["sudden"],
0,
profile["categoryvoice"],
profile["note"],
profile["fullcombo"],
profile["keybeam"],
profile["judgestring"],
0,
0,
],
__type="s16",
),
E.grade(
dgid=profile["grade_double"],
sgid=profile["grade_single"],
),
E.ex(),
E.ocrs(),
E.step(
E.sp_cflg("", __type="bin"),
E.dp_cflg("", __type="bin"),
dp_ach=0,
dp_dif=0,
sp_ach=0,
sp_dif=0,
),
E.lincle(comflg=1),
E.reflec(br=1, sg=1, sr=1, ssc=1, tb=1, tf=1, wu=1),
E.phase2(wonder=1, yellow=1),
E.event(knee=1, lethe=0, resist=0, jknee=1, jlethe=0, jresist=0),
E.phase4(
qpro=1,
glass=1,
treasure=1,
beautiful=1,
quaver=1,
castle=1,
flip=1,
titans=1,
exusia=1,
waxing=1,
sampling=1,
beachside=1,
cuvelia=1,
qproflg=1,
glassflg=1,
reunion=1,
bad=1,
turii=1,
anisakis=1,
second=1,
whydidyou=1,
china=1,
fallen=1,
broken=1,
summer=1,
sakura=1,
wuv=1,
survival=1,
thunder=1,
),
E.shop(
E.item([3, 3, 3], __type="u8"),
spitem=1,
),
E.rlist(),
)
)
elif game_version == 18:
response = E.response(
E.pc(
E.pcdata(
dach=profile["dach"],
dp_opt=profile["dp_opt"],
dp_opt2=profile["dp_opt2"],
dpnum=profile["dpnum"],
gno=profile["gno"],
id=djid,
idstr=djid_split,
liflen=profile["lift"],
mcomb=0,
mode=profile["mode"],
name=profile["djname"],
ncomb=0,
pid=profile["region"],
pmode=profile["pmode"],
sach=profile["sach"],
sdhd=profile["sdhd"],
sdtype=profile["sdtype"],
sflg0=-1,
sflg1=-1,
sp_opt=profile["sp_opt"],
spnum=profile["spnum"],
timing=profile["timing"],
),
E.skin(
[
profile["frame"],
profile["turntable"],
profile["explosion"],
profile["bgm"],
calculate_folder_mask(profile),
profile["sudden"],
0,
0,
0,
0,
0,
0,
],
__type="u16",
),
E.grade(
dgid="-1",
sgid="-1",
),
E.ex(),
E.ocrs(),
E.rlist(),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/pc/common")
async def pc_common(request: Request):
request_info = await core_process_request(request)
game_version = request_info["game_version"]
if game_version == 20:
response = E.response(
E.pc(
E.mranking(
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
__type="u16",
),
E.ir(beat=2),
E.boss(phase=0),
E.red(phase=2),
E.yellow(phase=4),
E.limit(phase=25),
E.cafe(open=1),
E.yellow_correct(
*[
E.detail(
avg_shop=7,
critical=2,
max_condition=18,
max_member=20,
max_resist=1,
min_condition=10,
min_member=1,
min_resist=1,
rival=2,
)
for detail in range(6)
],
E.detail(
avg_shop=7,
critical=2,
max_condition=144,
max_member=20,
max_resist=1,
min_condition=80,
min_member=1,
min_resist=1,
rival=2,
),
avg_shop=7,
),
expire=600,
)
)
elif game_version == 19:
response = E.response(
E.pc(
E.secret(
E.mid([1901, 1914, 1946, 1955, 1956, 1966], __type="u16"),
E.open([1, 1, 1, 1, 1, 1], __type="bool"),
),
E.boss(phase=2),
E.ir(beat=2),
E.travel(flg=1),
E.lincle(phase=4),
E.monex(no=3),
expire=600,
)
)
elif game_version == 18:
response = E.response(
E.pc(
E.cmd(
gmbl=1,
gmbla=1,
regl=1,
rndp=1,
hrnd=1,
alls=1,
),
E.lg(lea=1),
E.ir(beat=3),
E.ev(pha=2),
expire=600,
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/pc/save")
async def pc_save(request: Request):
request_info = await core_process_request(request)
game_version = request_info["game_version"]
root = request_info["root"][0]
xid = int(root.attrib["iidxid"])
clt = int(root.attrib["cltype"])
profile = get_profile_by_id(xid)
game_profile = profile["version"].get(str(game_version), {})
if clt == 0:
game_profile["sach"] = root.attrib["achi"]
game_profile["sp_opt"] = root.attrib["opt"]
elif clt == 1:
game_profile["dach"] = root.attrib["achi"]
game_profile["dp_opt"] = root.attrib["opt"]
game_profile["dp_opt2"] = root.attrib["opt2"]
for k in [
"gno",
"gpos",
"help",
"hispeed",
"judge",
"judgeAdj",
"lift",
"mode",
"notes",
"opstyle",
"pnum",
"sdhd",
"sdtype",
"timing",
]:
if k in root.attrib:
game_profile[k] = root.attrib[k]
secret = root.find("secret")
if secret is not None:
for k in ["flg1", "flg2", "flg3", "flg4"]:
flg = secret.find(k)
if flg is not None:
game_profile["secret_" + k] = [int(x) for x in flg.text.split(" ")]
step = root.find("step")
if step is not None:
for k in [
"dp_level",
"dp_mplay",
"enemy_damage",
"enemy_defeat_flg",
"mission_clear_num",
"progress",
"sp_level",
"sp_mplay",
"tips_read_list",
"total_point",
]:
game_profile["stepup_" + k] = int(step.attrib[k])
is_track_ticket = step.find("is_track_ticket")
if is_track_ticket is not None:
game_profile["stepup_is_track_ticket"] = int(is_track_ticket.text)
achievements = root.find("achievements")
if achievements is not None:
for k in [
"last_weekly",
"pack_comp",
"pack_flg",
"pack_id",
"play_pack",
"visit_flg",
"weekly_num",
]:
game_profile["achievements_" + k] = int(achievements.attrib[k])
trophy = achievements.find("trophy")
if trophy is not None:
game_profile["achievements_trophy"] = [
int(x) for x in trophy.text.split(" ")
]
grade = request_info["root"][0].find("grade")
if grade is not None:
grade_values = []
for g in grade.findall("g"):
grade_values.append([int(x) for x in g.text.split(" ")])
profile["grade_single"] = int(grade.attrib["sgid"])
profile["grade_double"] = int(grade.attrib["dgid"])
profile["grade_values"] = grade_values
deller_amount = game_profile.get("deller", 0)
commonboss = root.find("commonboss")
if commonboss is not None:
deller_amount = int(commonboss.attrib["deller"])
game_profile["deller"] = deller_amount
game_profile["spnum"] = game_profile.get("spnum", 0) + (1 if clt == 0 else 0)
game_profile["dpnum"] = game_profile.get("dpnum", 0) + (1 if clt == 1 else 0)
profile["version"][str(game_version)] = game_profile
get_db().table("iidx_profile").upsert(profile, where("iidx_id") == xid)
response = E.response(E.pc(iidxid=xid, cltype=clt))
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/pc/visit")
async def pc_visit(request: Request):
request_info = await core_process_request(request)
response = E.response(
E.pc(
aflg=1,
anum=1,
pflg=1,
pnum=1,
sflg=1,
snum=1,
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/pc/reg")
async def pc_reg(request: Request):
request_info = await core_process_request(request)
game_version = request_info["game_version"]
cid = request_info["root"][0].attrib["cid"]
name = request_info["root"][0].attrib["name"]
pid = request_info["root"][0].attrib["pid"]
db = get_db().table("iidx_profile")
all_profiles_for_card = db.get(Query().card == cid)
if all_profiles_for_card is None:
all_profiles_for_card = {"card": cid, "version": {}}
if "iidx_id" not in all_profiles_for_card:
iidx_id = random.randint(10000000, 99999999)
all_profiles_for_card["iidx_id"] = iidx_id
if game_version == 20:
all_profiles_for_card["version"][str(game_version)] = {
"game_version": game_version,
"djname": name,
"region": int(pid),
"head": 0,
"hair": 0,
"face": 0,
"hand": 0,
"body": 0,
"turntable": 0,
"explosion": 0,
"bgm": 0,
"folder_mask": 0,
"sudden": 0,
"categoryvoice": 0,
"note": 0,
"fullcombo": 0,
"keybeam": 0,
"judgestring": 0,
"soundpreview": 0,
"dach": 0,
"dp_opt": 0,
"dp_opt2": 0,
"dpnum": 0,
"gno": 0,
"gpos": 0,
"help": 0,
"hispeed": 0,
"judge": 0,
"judgeAdj": 0,
"lift": 0,
"mode": 0,
"notes": 0,
"opstyle": 0,
"pase": 0,
"pmode": 0,
"sach": 0,
"sdhd": 50,
"sdtype": 0,
"sp_opt": 0,
"spnum": 0,
"timing": 0,
"deller": 0,
# Step up mode
"stepup_stamp": "",
"stepup_help": "",
"stepup_dp_ach": 0,
"stepup_dp_hdpt": 0,
"stepup_dp_level": 0,
"stepup_dp_mplay": 0,
"stepup_dp_round": 0,
"stepup_review": 0,
"stepup_sp_ach": 0,
"stepup_sp_hdpt": 0,
"stepup_sp_level": 0,
"stepup_sp_mplay": 0,
"stepup_sp_round": 0,
# Grades
"grade_single": -1,
"grade_double": -1,
"grade_values": [],
# Achievements
"achievements_trophy": [0] * 80,
"achievements_last_weekly": 0,
"achievements_pack_comp": 0,
"achievements_pack_flg": 0,
"achievements_pack_id": 0,
"achievements_play_pack": 0,
"achievements_visit_flg": 0,
"achievements_weekly_num": 0,
# Web UI/Other options
"_show_category_grade": 0,
"_show_category_status": 1,
"_show_category_difficulty": 1,
"_show_category_alphabet": 1,
"_show_category_rival_play": 0,
"_show_category_rival_winlose": 0,
"_show_rival_shop_info": 0,
"_hide_play_count": 0,
"_hide_rival_info": 1,
}
elif game_version == 19:
all_profiles_for_card["version"][str(game_version)] = {
"game_version": game_version,
"djname": name,
"region": int(pid),
"head": 0,
"hair": 0,
"face": 0,
"hand": 0,
"body": 0,
"frame": 0,
"turntable": 0,
"explosion": 0,
"bgm": 0,
"folder_mask": 0,
"sudden": 0,
"categoryvoice": 0,
"note": 0,
"fullcombo": 0,
"keybeam": 0,
"judgestring": 0,
"dach": 0,
"dp_opt": 0,
"dp_opt2": 0,
"dpnum": 0,
"gno": 0,
"help": 0,
"lift": 0,
"mode": 0,
"notes": 0,
"pase": 0,
"pmode": 0,
"sach": 0,
"sdhd": 50,
"sdtype": 0,
"sp_opt": 0,
"spnum": 0,
"timing": 0,
# Grades
"grade_single": -1,
"grade_double": -1,
"grade_values": [],
# Web UI/Other options
"_show_category_grade": 0,
"_show_category_status": 1,
"_show_category_difficulty": 1,
"_show_category_alphabet": 1,
"_show_category_rival_play": 0,
"_show_category_rival_winlose": 0,
"_show_rival_shop_info": 0,
"_hide_play_count": 0,
"_hide_rival_info": 1,
}
elif game_version == 18:
all_profiles_for_card["version"][str(game_version)] = {
"game_version": game_version,
"djname": name,
"region": int(pid),
"frame": 0,
"turntable": 0,
"explosion": 0,
"bgm": 0,
"folder_mask": 0,
"sudden": 0,
"dach": 0,
"dp_opt": 0,
"dp_opt2": 0,
"dpnum": 0,
"gno": 0,
"lift": 0,
"mode": 0,
"pmode": 0,
"sach": 0,
"sdhd": 50,
"sdtype": 0,
"sp_opt": 0,
"spnum": 0,
"timing": 0,
# Grades
"grade_single": -1,
"grade_double": -1,
"grade_values": [],
# Web UI/Other options
"_show_category_grade": 0,
"_show_category_status": 1,
"_show_category_difficulty": 1,
"_show_category_alphabet": 1,
"_show_category_rival_play": 0,
"_show_category_rival_winlose": 0,
"_show_rival_shop_info": 0,
"_hide_play_count": 0,
"_hide_rival_info": 1,
}
db.upsert(all_profiles_for_card, where("card") == cid)
card, card_split = get_id_from_profile(cid)
response = E.response(E.pc(id=card, id_str=card_split))
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/pc/logout")
async def pc_logout(request: Request):
request_info = await core_process_request(request)
response = E.response(E.pc())
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)