# Code-viewer page header captured with this file (not part of the module):
# 265 lines, 7.9 KiB, Python, "Raw / Normal View / History".

from fastapi import APIRouter, Request, Response, File, UploadFile
from core_common import core_process_request, core_prepare_response, E
from tinydb import Query, where
from core_database import get_db
from pydantic import BaseModel
import config
import utils.card as conv
from utils.lz77 import EamuseLZ77
import lxml.etree as ET
# Code-viewer timestamp captured with this file (not part of the module): 2022-12-17 23:08:29 +00:00
import ujson as json
import struct
from typing import Dict, List, Tuple
from os import path
# Code-viewer timestamp captured with this file (not part of the module): 2023-03-02 11:47:04 +00:00
# Router that the handlers in this module register on; every path is served
# under the "/ddr" prefix and tagged "api_ddr".
router = APIRouter(prefix="/ddr", tags=["api_ddr"])
class DDR_Profile_Main_Items(BaseModel):
    """Request body holding the ``card`` and ``pin`` values written to a DDR profile."""

    # Stored under the profile's "card" key.
    card: str
    # Stored under the profile's "pin" key.
    pin: str
class DDR_Profile_Version_Items(BaseModel):
    """Request body holding the per-game-version settings of a DDR profile."""

    game_version: int

    # Booleans here are stored as the strings "On"/"Off" by the version PATCH handler.
    calories_disp: bool
    character: str
    arrow_skin: str
    filter: str
    guideline: str
    priority: str
    timing_disp: bool

    # Opaque strings copied into the stored profile as-is.
    common: str
    option: str
    last: str
    rival: str

    # DDR IDs of up to three rivals.
    rival_1_ddr_id: int
    rival_2_ddr_id: int
    rival_3_ddr_id: int

    # Accepted in the request; the version PATCH handler in this file does not store them.
    single_grade: int
    double_grade: int
@router.get("/profiles")
async def ddr_profiles():
    """Return every document stored in the ``ddr_profile`` table."""
    profile_table = get_db().table("ddr_profile")
    return profile_table.all()
@router.get("/profiles/{ddr_id}")
async def ddr_profile_id(ddr_id: str):
    """Return the profile whose ``ddr_id`` equals the digits found in the path value.

    Characters that are not decimal digits (for example a separator) are
    dropped before the value is converted to an integer.

    Returns:
        The matching profile document, or ``None`` when no profile matches or
        the path value contains no decimal digits.
    """
    # isdecimal() accepts exactly the characters int() can parse; isnumeric()
    # also admits characters such as "½" that make int() raise ValueError.
    digits = "".join(ch for ch in ddr_id if ch.isdecimal())
    if not digits:
        # int("") would raise; treat an ID without digits like an unknown ID.
        return None
    return get_db().table("ddr_profile").get(where("ddr_id") == int(digits))
@router.patch("/profiles/{ddr_id}")
async def ddr_profile_id_patch(ddr_id: str, item: DDR_Profile_Main_Items):
    """Overwrite the ``card`` and ``pin`` of the profile identified by ``ddr_id``.

    Characters of ``ddr_id`` that are not decimal digits are ignored.

    Returns:
        An empty 204 response after the profile has been written back, or an
        empty 404 response when ``ddr_id`` contains no digits or no profile
        with that ID exists.
    """
    # isdecimal() accepts exactly the characters int() can parse.
    digits = "".join(ch for ch in ddr_id if ch.isdecimal())
    if not digits:
        return Response(status_code=404)
    profile_id = int(digits)

    profile_table = get_db().table("ddr_profile")
    profile = profile_table.get(where("ddr_id") == profile_id)
    if profile is None:
        # Without this guard the item assignments below fail on None.
        return Response(status_code=404)

    profile["card"] = item.card
    profile["pin"] = item.pin
    profile_table.upsert(profile, where("ddr_id") == profile_id)
    return Response(status_code=204)
@router.patch("/profiles/{ddr_id}/{version}")
async def ddr_profile_id_version_patch(
    ddr_id: str, version: int, item: DDR_Profile_Version_Items
):
    """Update the settings stored for one game version of a DDR profile.

    Characters of ``ddr_id`` that are not decimal digits are ignored. The
    settings live under ``profile["version"][str(version)]``; an entry that
    does not exist yet starts out as an empty dict.

    Returns:
        An empty 204 response after the profile has been written back, or an
        empty 404 response when ``ddr_id`` contains no digits or no profile
        with that ID exists.
    """
    # isdecimal() accepts exactly the characters int() can parse.
    digits = "".join(ch for ch in ddr_id if ch.isdecimal())
    if not digits:
        return Response(status_code=404)
    profile_id = int(digits)

    profile_table = get_db().table("ddr_profile")
    profile = profile_table.get(where("ddr_id") == profile_id)
    if profile is None:
        # Without this guard the lookups below fail on None.
        return Response(status_code=404)

    version_key = str(version)
    game_profile = profile["version"].get(version_key, {})

    # NOTE(review): the extent of this branch is reconstructed (the file's
    # indentation was lost); it is assumed that only the field assignments
    # are gated on the version number - confirm.
    if version >= 19:
        game_profile.update(
            {
                "game_version": item.game_version,
                # Display toggles are stored as the strings "On"/"Off".
                "calories_disp": "On" if item.calories_disp else "Off",
                "character": item.character,
                "arrow_skin": item.arrow_skin,
                "filter": item.filter,
                "guideline": item.guideline,
                "priority": item.priority,
                "timing_disp": "On" if item.timing_disp else "Off",
                "common": item.common,
                "option": item.option,
                "last": item.last,
                "rival": item.rival,
                "rival_1_ddr_id": item.rival_1_ddr_id,
                "rival_2_ddr_id": item.rival_2_ddr_id,
                "rival_3_ddr_id": item.rival_3_ddr_id,
            }
        )

    profile["version"][version_key] = game_profile
    profile_table.upsert(profile, where("ddr_id") == profile_id)
    return Response(status_code=204)
@router.get("/card/{card}")
async def ddr_card_to_profile(card: str):
    """Look up the profile whose ``card`` field matches the given card value.

    The value is upper-cased and the look-alike characters I, O, Q and V are
    replaced by 1, 0, 0 and U. A value starting with ``E004`` or ``012E`` is
    reduced to its hexadecimal digits and used as the lookup key directly;
    any other value is reduced to ``conv.valid_characters`` and converted
    with ``conv.to_uid`` first.

    Returns:
        The matching profile document, or ``None`` when there is no match.
    """
    lookalike_table = str.maketrans({"I": "1", "O": "0", "Q": "0", "V": "U"})
    normalized = card.upper().translate(lookalike_table)

    if normalized.startswith(("E004", "012E")):
        uid = "".join(ch for ch in normalized if ch in "0123456789ABCDEF")
        # The converted value is not used below; the call itself is kept.
        kid = conv.to_konami_id(uid)
    else:
        kid = "".join(ch for ch in normalized if ch in conv.valid_characters)
        uid = conv.to_uid(kid)

    return get_db().table("ddr_profile").get(where("card") == uid)
@router.get("/scores")
async def ddr_scores():
    """Return every document stored in the ``ddr_scores`` table."""
    score_table = get_db().table("ddr_scores")
    return score_table.all()
@router.get("/scores/{ddr_id}")
async def ddr_scores_id(ddr_id: str):
    """Return all ``ddr_scores`` documents whose ``ddr_id`` equals the digits of the path value.

    Characters that are not decimal digits are dropped before conversion.

    Returns:
        A list of matching score documents; empty when nothing matches or the
        path value contains no decimal digits.
    """
    # isdecimal() accepts exactly the characters int() can parse; isnumeric()
    # also admits characters such as "½" that make int() raise ValueError.
    digits = "".join(ch for ch in ddr_id if ch.isdecimal())
    if not digits:
        # int("") would raise; an ID without digits simply has no scores.
        return []
    return get_db().table("ddr_scores").search(where("ddr_id") == int(digits))
@router.get("/scores_best")
async def ddr_scores_best():
    """Return every document stored in the ``ddr_scores_best`` table."""
    best_table = get_db().table("ddr_scores_best")
    return best_table.all()
@router.get("/scores_best/{ddr_id}")
async def ddr_scores_best_id(ddr_id: str):
    """Return all ``ddr_scores_best`` documents whose ``ddr_id`` equals the digits of the path value.

    Characters that are not decimal digits are dropped before conversion.

    Returns:
        A list of matching documents; empty when nothing matches or the path
        value contains no decimal digits.
    """
    # isdecimal() accepts exactly the characters int() can parse; isnumeric()
    # also admits characters such as "½" that make int() raise ValueError.
    digits = "".join(ch for ch in ddr_id if ch.isdecimal())
    if not digits:
        # int("") would raise; an ID without digits simply has no scores.
        return []
    return get_db().table("ddr_scores_best").search(where("ddr_id") == int(digits))
@router.get("/mcode/{mcode}/all")
async def ddr_scores_id(mcode: int):
    """Return every ``ddr_scores`` document recorded for the given ``mcode``."""
    # NOTE(review): this function reuses the name of the "/scores/{ddr_id}"
    # handler, so the module-level name ends up bound to this one.
    matches_mcode = where("mcode") == mcode
    return get_db().table("ddr_scores").search(matches_mcode)
@router.get("/mcode/{mcode}/best")
async def ddr_scores_id_best(mcode: int):
    """Return every ``ddr_scores_best`` document recorded for the given ``mcode``."""
    matches_mcode = where("mcode") == mcode
    return get_db().table("ddr_scores_best").search(matches_mcode)
class ARC:
    # https://github.com/DragonMinded/bemaniutils/blob/trunk/bemani/format/arc.py
    """
    Class representing an `.arc` file. These are found in DDR Ace, and possibly
    other games that use ESS. Given a series of bytes, this will allow you to
    query included filenames as well as read the contents of any file inside the
    archive.

    Data that does not start with the expected magic bytes is accepted
    silently and yields an archive without any files.
    """

    def __init__(self, data: bytes) -> None:
        """Index the files contained in ``data``.

        Raises:
            struct.error: the header or a directory entry is truncated.
            IndexError: a file name is not NUL-terminated inside ``data``.
            UnicodeDecodeError: a file name is not ASCII.
        """
        # name -> (file offset, uncompressed size, compressed size)
        self.__files: Dict[str, Tuple[int, int, int]] = {}
        self.__data = data
        self.__parse_file(data)

    def __parse_file(self, data: bytes) -> None:
        """Fill the file index from the archive directory in ``data``."""
        # Check file header
        if data[0:4] != bytes([0x20, 0x11, 0x75, 0x19]):
            # Unknown format: keep the index empty, so every read_file()
            # call raises KeyError for the caller to handle.
            return

        # Grab header offsets; only the file count is used.
        (_, numfiles, _) = struct.unpack_from("<III", data, 4)

        for fno in range(numfiles):
            # 16-byte directory entries follow the 16-byte header.
            (nameoffset, fileoffset, uncompressedsize, compressedsize) = (
                struct.unpack_from("<IIII", data, 16 + (16 * fno))
            )

            # The name runs from nameoffset up to (not including) the next NUL.
            terminator = data.find(b"\x00", nameoffset)
            if terminator < 0:
                raise IndexError("unterminated file name in ARC directory")
            name = data[nameoffset:terminator].decode("ascii")

            self.__files[name] = (fileoffset, uncompressedsize, compressedsize)

    @property
    def filenames(self) -> List[str]:
        """Names of all files in the archive, in directory order."""
        return list(self.__files)

    def read_file(self, filename: str) -> bytes:
        """Return the contents of ``filename``.

        Raises:
            KeyError: the archive contains no file with that name.
        """
        (fileoffset, uncompressedsize, compressedsize) = self.__files[filename]
        stored = self.__data[fileoffset : (fileoffset + compressedsize)]
        if compressedsize == uncompressedsize:
            # Just stored
            return stored
        # Compressed
        return EamuseLZ77.decode(stored)
@router.post("/parse_mdb/upload")
async def ddr_receive_mdb(file: UploadFile = File(...)) -> Response:
    """Extract the music database from an uploaded archive and store it as JSON.

    The upload is parsed as an ARC archive and ``data/gamedata/musicdb.xml``
    is read from it. Each child element of the XML root becomes one entry
    keyed by its ``mcode`` text. The result is merged with the existing
    ``webui/ddr.json`` (if any) and written back to that path.

    Returns:
        An empty 201 response on success, or an empty 406 response when the
        upload is not a readable archive, lacks the music database file, or
        the file is not well-formed XML.
    """
    data = await file.read()

    try:
        arc = ARC(data)
        mdb_new = ET.fromstring(
            arc.read_file("data/gamedata/musicdb.xml"),
            parser=ET.XMLParser(encoding="utf-8"),
        )
    except (
        KeyError,  # wrong magic or no musicdb.xml in the archive
        IndexError,  # file name runs past the end of the data
        UnicodeDecodeError,  # non-ASCII file name
        struct.error,  # truncated header or directory entry
        ET.XMLSyntaxError,  # musicdb.xml is not well-formed
    ):
        return Response(status_code=406)

    # Child elements copied as right-stripped text; missing or empty ones become "".
    text_fields = (
        "basename",
        "title",
        "title_yomi",
        "artist",
        "bpmmin",
        "bpmmax",
        "series",
        "eventno",
        "bemaniflag",
        "bgstage",
        "movie",
        "genreflag",
        "voice",
    )

    mdb = {}
    for song in mdb_new:
        mcode = song.find("mcode").text
        entry = {}
        for field in text_fields:
            node = song.find(field)
            text = node.text if node is not None else None
            entry[field] = text.rstrip() if text is not None else ""
        entry["diffLv"] = song.find("diffLv").text.split(" ")
        mdb[mcode] = entry

    ddr_metadata = path.join("webui", "ddr.json")

    if path.exists(ddr_metadata):
        with open(ddr_metadata, "r", encoding="utf-8") as fp:
            mdb_old = json.load(fp)
        # Entries already on disk replace freshly parsed ones with the same mcode.
        mdb.update(mdb_old)

    with open(ddr_metadata, "w", encoding="utf-8") as fp:
        json.dump(mdb, fp, indent=4, ensure_ascii=False, escape_forward_slashes=False)

    return Response(status_code=201)