diff --git a/modules/iidx/api.py b/modules/iidx/api.py new file mode 100644 index 0000000..921c9fb --- /dev/null +++ b/modules/iidx/api.py @@ -0,0 +1,319 @@ +from fastapi import APIRouter, Request, Response, File, UploadFile + +from core_common import core_process_request, core_prepare_response, E + +from tinydb import Query, where +from core_database import get_db +from pydantic import BaseModel +from typing import Optional + +import config +import utils.card as conv +import utils.musicdata_tool as mdt +from utils.lz77 import EamuseLZ77 + +import xml.etree.ElementTree as ET +import ujson as json +from os import path + + +router = APIRouter(prefix="/iidx", tags=["api_iidx"]) + + +class IIDX_Profile_Main_Items(BaseModel): + card: str + pin: str + + +class IIDX_Profile_Version_Items(BaseModel): + djname: Optional[str] + region: Optional[int] + head: Optional[int] + hair: Optional[int] + face: Optional[int] + hand: Optional[int] + body: Optional[int] + frame: Optional[int] + turntable: Optional[int] + explosion: Optional[int] + bgm: Optional[int] + sudden: Optional[int] + categoryvoice: Optional[int] + note: Optional[int] + fullcombo: Optional[int] + keybeam: Optional[int] + judgestring: Optional[int] + soundpreview: Optional[int] + grapharea: Optional[int] + effector_lock: Optional[int] + effector_type: Optional[int] + explosion_size: Optional[int] + alternate_hcn: Optional[int] + kokokara_start: Optional[int] + show_category_grade: Optional[int] + show_category_status: Optional[int] + show_category_difficulty: Optional[int] + show_category_alphabet: Optional[int] + show_category_rival_play: Optional[int] + show_category_rival_winlose: Optional[int] + show_category_all_rival_play: Optional[int] + show_category_arena_winlose: Optional[int] + show_rival_shop_info: Optional[int] + hide_play_count: Optional[int] + show_score_graph_cutin: Optional[int] + hide_iidx_id: Optional[int] + classic_hispeed: Optional[int] + beginner_option_swap: Optional[int] + show_lamps_as_no_play_in_arena: Optional[int] + skin_customize_flag_frame: Optional[int] + skin_customize_flag_bgm: Optional[int] + skin_customize_flag_lane: Optional[int] + sp_rival_1_iidx_id: Optional[int] + sp_rival_2_iidx_id: Optional[int] + sp_rival_3_iidx_id: Optional[int] + sp_rival_4_iidx_id: Optional[int] + sp_rival_5_iidx_id: Optional[int] + sp_rival_6_iidx_id: Optional[int] + dp_rival_1_iidx_id: Optional[int] + dp_rival_2_iidx_id: Optional[int] + dp_rival_3_iidx_id: Optional[int] + dp_rival_4_iidx_id: Optional[int] + dp_rival_5_iidx_id: Optional[int] + dp_rival_6_iidx_id: Optional[int] + + +@router.get("/profiles") +async def iidx_profiles(): + return get_db().table("iidx_profile").all() + + +@router.get("/profiles/{iidx_id}") +async def iidx_profile_id(iidx_id: str): + iidx_id = int("".join([i for i in iidx_id if i.isnumeric()])) + return get_db().table("iidx_profile").get(where("iidx_id") == iidx_id) + + +@router.patch("/profiles/{iidx_id}") +async def iidx_profile_id_patch(iidx_id: str, item: IIDX_Profile_Main_Items): + iidx_id = int("".join([i for i in iidx_id if i.isnumeric()])) + profile = get_db().table("iidx_profile").get(where("iidx_id") == iidx_id) + + profile["card"] = item.card + profile["pin"] = item.pin + + get_db().table("iidx_profile").upsert(profile, where("iidx_id") == iidx_id) + return Response(status_code=204) + + +@router.patch("/profiles/{iidx_id}/{version}") +async def iidx_profile_id_version_patch( + iidx_id: str, version: int, item: IIDX_Profile_Version_Items +): + if version != 30: + # TODO: differentiate 18, 19, 20, 29, 30 + return Response(status_code=406) + iidx_id = int("".join([i for i in iidx_id if i.isnumeric()])) + profile = get_db().table("iidx_profile").get(where("iidx_id") == iidx_id) + game_profile = profile["version"].get(str(version), {}) + + game_profile["djname"] = item.djname + game_profile["region"] = item.region + game_profile["head"] = item.head + game_profile["hair"] = item.hair + game_profile["face"] = item.face + game_profile["hand"] = item.hand + game_profile["body"] = item.body + game_profile["frame"] = item.frame + game_profile["turntable"] = item.turntable + game_profile["explosion"] = item.explosion + game_profile["bgm"] = item.bgm + game_profile["sudden"] = item.sudden + game_profile["categoryvoice"] = item.categoryvoice + game_profile["note"] = item.note + game_profile["fullcombo"] = item.fullcombo + game_profile["keybeam"] = item.keybeam + game_profile["judgestring"] = item.judgestring + game_profile["soundpreview"] = item.soundpreview + game_profile["grapharea"] = item.grapharea + game_profile["effector_lock"] = item.effector_lock + game_profile["effector_type"] = item.effector_type + game_profile["explosion_size"] = item.explosion_size + game_profile["alternate_hcn"] = item.alternate_hcn + game_profile["kokokara_start"] = item.kokokara_start + game_profile["_show_category_grade"] = item.show_category_grade + game_profile["_show_category_status"] = item.show_category_status + game_profile["_show_category_difficulty"] = item.show_category_difficulty + game_profile["_show_category_alphabet"] = item.show_category_alphabet + game_profile["_show_category_rival_play"] = item.show_category_rival_play + game_profile["_show_category_rival_winlose"] = item.show_category_rival_winlose + game_profile["_show_category_all_rival_play"] = item.show_category_all_rival_play + game_profile["_show_category_arena_winlose"] = item.show_category_arena_winlose + game_profile["_show_rival_shop_info"] = item.show_rival_shop_info + game_profile["_hide_play_count"] = item.hide_play_count + game_profile["_show_score_graph_cutin"] = item.show_score_graph_cutin + game_profile["_hide_iidx_id"] = item.hide_iidx_id + game_profile["_classic_hispeed"] = item.classic_hispeed + game_profile["_beginner_option_swap"] = item.beginner_option_swap + game_profile[ + "_show_lamps_as_no_play_in_arena" + ] = item.show_lamps_as_no_play_in_arena + game_profile["skin_customize_flag_frame"] = item.skin_customize_flag_frame + game_profile["skin_customize_flag_bgm"] = item.skin_customize_flag_bgm + game_profile["skin_customize_flag_lane"] = item.skin_customize_flag_lane + game_profile["sp_rival_1_iidx_id"] = item.sp_rival_1_iidx_id + game_profile["sp_rival_2_iidx_id"] = item.sp_rival_2_iidx_id + game_profile["sp_rival_3_iidx_id"] = item.sp_rival_3_iidx_id + game_profile["sp_rival_4_iidx_id"] = item.sp_rival_4_iidx_id + game_profile["sp_rival_5_iidx_id"] = item.sp_rival_5_iidx_id + game_profile["sp_rival_6_iidx_id"] = item.sp_rival_6_iidx_id + game_profile["dp_rival_1_iidx_id"] = item.dp_rival_1_iidx_id + game_profile["dp_rival_2_iidx_id"] = item.dp_rival_2_iidx_id + game_profile["dp_rival_3_iidx_id"] = item.dp_rival_3_iidx_id + game_profile["dp_rival_4_iidx_id"] = item.dp_rival_4_iidx_id + game_profile["dp_rival_5_iidx_id"] = item.dp_rival_5_iidx_id + game_profile["dp_rival_6_iidx_id"] = item.dp_rival_6_iidx_id + + profile["version"][str(version)] = game_profile + get_db().table("iidx_profile").upsert(profile, where("iidx_id") == iidx_id) + return Response(status_code=204) + + +@router.get("/card/{card}") +async def iidx_card_to_profile(card: str): + card = card.upper() + lookalike = { + "I": "1", + "O": "0", + "Q": "0", + "V": "U", + } + for k, v in lookalike.items(): + card = card.replace(k, v) + if card.startswith("E004") or card.startswith("012E"): + card = "".join([c for c in card if c in "0123456789ABCDEF"]) + uid = card + kid = conv.to_konami_id(card) + else: + card = "".join([c for c in card if c in conv.valid_characters]) + uid = conv.to_uid(card) + kid = card + profile = get_db().table("iidx_profile").get(where("card") == uid) + return profile + + +@router.get("/scores") +async def iidx_scores(): + return get_db().table("iidx_scores").all() + + +@router.get("/scores/{iidx_id}") +async def iidx_scores_id(iidx_id: str): + iidx_id = int("".join([i for i in iidx_id if i.isnumeric()])) + return get_db().table("iidx_scores").search((where("iidx_id") == iidx_id)) + + +@router.get("/scores_best") +async def iidx_scores_best(): + return get_db().table("iidx_scores_best").all() + + +@router.get("/scores_best/{iidx_id}") +async def iidx_scores_best_id(iidx_id: str): + iidx_id = int("".join([i for i in iidx_id if i.isnumeric()])) + return get_db().table("iidx_scores_best").search((where("iidx_id") == iidx_id)) + + +@router.get("/music_id/{music_id}/all") +async def iidx_scores_id(music_id: int): + return get_db().table("iidx_scores").search((where("music_id") == music_id)) + + +@router.get("/music_id/{music_id}/best") +async def iidx_scores_id_best(music_id: int): + return get_db().table("iidx_scores_best").search((where("music_id") == music_id)) + + +@router.get("/class_best/{iidx_id}") +async def iidx_class_best(iidx_id: str): + iidx_id = int("".join([i for i in iidx_id if i.isnumeric()])) + return get_db().table("iidx_class_best").search((where("iidx_id") == iidx_id)) + + +@router.get("/score_stats/all") +async def iidx_score_stats(): + return get_db().table("iidx_score_stats").all() + + +@router.get("/score_stats/{music_id}") +async def iidx_score_stats_song(music_id: int): + return get_db().table("iidx_score_stats").search((where("music_id") == music_id)) + + +@router.post("/parse_mdb/upload") +async def iidx_receive_mdb(file: UploadFile = File(...)) -> bytes: + data = await file.read() + + iidx_bin = path.join("webui", "music_data.bin") + iidx_vid = path.join("webui", "video_music_list.xml") + iidx_metadata = path.join("webui", "iidx.json") + + if data[0:4] == b"IIDX": + # data_ver = int.from_bytes(data[4:8], "little") + with open(iidx_bin, "wb") as output: + output.write(data) + try: + mdt.extract_file(iidx_bin, iidx_metadata) + return Response(status_code=201) + except Exception as e: + print(e) + return Response(status_code=422) + else: + # video_music_list.xml to fix broken characters in title/artist + # (this should be a seperate route) + try: + with open(iidx_metadata, "r", encoding="utf-8") as f: + music_data = json.load(f) + + with open(iidx_vid, "wb") as output: + output.write(data) + + with open(iidx_vid, "r", encoding="utf-8") as fp: + tree = ET.parse(fp, ET.XMLParser()) + root = tree.getroot() + + proper_names = {} + for entry in root: + mid = int(entry.get("id")) + proper_names[mid] = {} + proper_names[mid]["title"] = entry.find("info/title_name").text + proper_names[mid]["artist"] = entry.find("info/artist_name").text + + for m in music_data["data"]: + try: + mid = m["song_id"] + vid_title = proper_names[mid]["title"] + bin_title = m["title"] + if vid_title != bin_title: + m["title"] = vid_title + # print(vid_title, bin_title) + vid_artist = proper_names[mid]["artist"] + bin_artist = m["artist"] + if vid_artist != bin_artist: + m["artist"] = vid_artist + # print(vid_artist, bin_artist) + except KeyError: + continue + + json.dump( + music_data, + open(iidx_metadata, "w", encoding="utf8"), + indent=4, + ensure_ascii=False, + escape_forward_slashes=False, + ) + return Response(status_code=201) + except Exception as e: + print(e) + return Response(status_code=422) + + return Response(status_code=406) diff --git a/utils/musicdata_tool.py b/utils/musicdata_tool.py new file mode 100644 index 0000000..9dd3bbc --- /dev/null +++ b/utils/musicdata_tool.py @@ -0,0 +1,699 @@ +import argparse +import ctypes +import ujson as json +import sys +import struct + + +def read_string(infile, length, encoding="cp932"): + string_data = infile.read(length) + try: + return string_data.decode(encoding).strip("\0") + except UnicodeDecodeError: + # Cannot decode truncated string with half of a multibyte sequence appended (0x83) + return string_data[:-1].decode(encoding).strip("\0") + + +def write_string(outfile, input, length, fill="\0", encoding="cp932"): + string_data = input[:length].encode(encoding) + outfile.write(string_data) + + if len(input) < length: + outfile.write("".join([fill] * (length - len(string_data))).encode(encoding)) + + +def reader(data_ver, infile, song_count): + song_entries = [] + + for i in range(song_count): + title = read_string(infile, 0x40) + title_ascii = read_string(infile, 0x40) + genre = read_string(infile, 0x40) + artist = read_string(infile, 0x40) + + ( + texture_title, + texture_artist, + texture_genre, + texture_load, + texture_list, + ) = struct.unpack("= 27: + ( + SPB_level, + SPN_level, + SPH_level, + SPA_level, + SPL_level, + DPB_level, + DPN_level, + DPH_level, + DPA_level, + DPL_level, + ) = struct.unpack("= 27: + unk_sect1 = infile.read(0x286) + else: + unk_sect1 = infile.read(0xA0) + + song_id, volume = struct.unpack("= 27: + ( + SPB_ident, + SPN_ident, + SPH_ident, + SPA_ident, + SPL_ident, + DPB_ident, + DPN_ident, + DPH_ident, + DPA_ident, + DPL_ident, + ) = struct.unpack("= 22: + afp_data = [] + for x in range(10): + afp_data.append(infile.read(0x20).hex()) + else: + afp_data = [] + for x in range(9): + afp_data.append(infile.read(0x20).hex()) + + if data_ver >= 26: + unk_sect4 = infile.read(4) + + entries = { + "song_id": song_id, + "title": title, + "title_ascii": title_ascii, + "genre": genre, + "artist": artist, + "texture_title": texture_title, + "texture_artist": texture_artist, + "texture_genre": texture_genre, + "texture_load": texture_load, + "texture_list": texture_list, + "font_idx": font_idx, + "game_version": game_version, + "other_folder": other_folder, + "bemani_folder": bemani_folder, + "splittable_diff": splittable_diff, + "SPB_level": SPB_level, + "SPN_level": SPN_level, + "SPH_level": SPH_level, + "SPA_level": SPA_level, + "SPL_level": SPL_level, + "DPB_level": DPB_level, + "DPN_level": DPN_level, + "DPH_level": DPH_level, + "DPA_level": DPA_level, + "DPL_level": DPL_level, + "volume": volume, + "SPB_ident": SPB_ident, + "SPN_ident": SPN_ident, + "SPH_ident": SPH_ident, + "SPA_ident": SPA_ident, + "SPL_ident": SPL_ident, + "DPB_ident": DPB_ident, + "DPN_ident": DPN_ident, + "DPH_ident": DPH_ident, + "DPA_ident": DPA_ident, + "DPL_ident": DPL_ident, + "bga_filename": bga_filename, + "bga_delay": bga_delay, + "afp_flag": afp_flag, + "afp_data": afp_data, + } + + # if data_ver == 80: + # unk = { + # 'unk_sect1': unk_sect1.hex(), + # 'unk_sect2': unk_sect2.hex(), + # 'unk_sect3': unk_sect3.hex(), + # 'unk_sect4': unk_sect4.hex(), + # } + # elif data_ver < 80 and data_ver >= 27: + # unk = { + # 'unk_sect1': unk_sect1.hex(), + # 'unk_sect4': unk_sect4.hex(), + # } + # elif data_ver == 26: + # unk = { + # 'unk_sect1': unk_sect1.hex(), + # 'unk_sect2': unk_sect2.hex(), + # 'unk_sect4': unk_sect4.hex(), + # } + # elif data_ver <= 25: + # unk = { + # 'unk_sect1': unk_sect1.hex(), + # 'unk_sect2': unk_sect2.hex(), + # } + # + # entries.update(unk) + + song_entries.append(entries) + + return song_entries + + +def writer(data_ver, outfile, data): + DATA_VERSION = data_ver + MAX_ENTRIES = data_ver * 1000 + 1000 + CUR_STYLE_ENTRIES = MAX_ENTRIES - 1000 + + # Write header + outfile.write(b"IIDX") + if data_ver == 80: + outfile.write(struct.pack("= CUR_STYLE_ENTRIES: + outfile.write(struct.pack("= 27: + outfile.write( + struct.pack( + "= 27: + outfile.write( + bytes.fromhex( + "00000000000001000000020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000300000004000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + ) + ) + else: + outfile.write( + bytes.fromhex( + "00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + ) + ) + + outfile.write(struct.pack("= 27: + outfile.write( + struct.pack( + "= 22: + for afp_data in song_data["afp_data"]: + outfile.write(bytes.fromhex(afp_data)) + if len(song_data["afp_data"]) == 9: + outfile.write( + bytes.fromhex( + "0000000000000000000000000000000000000000000000000000000000000000" + ) + ) + elif len(song_data["afp_data"]) == 10 and data_ver <= 21: + for afp_data in song_data["afp_data"][:9]: + outfile.write(bytes.fromhex(afp_data)) + elif len(song_data["afp_data"]) == 9 and data_ver <= 21: + for afp_data in song_data["afp_data"]: + outfile.write(bytes.fromhex(afp_data)) + + if data_ver >= 26: + outfile.write(bytes.fromhex("00000000")) + + +def course_reader(infile, total_entries): + course_entries = [] + + for i in range(total_entries): + is_DP, course_num, stages = struct.unpack("