mirror of
https://github.com/drmext/MonkeyBusiness.git
synced 2025-02-13 09:02:37 +01:00
Implement DDR endpoints for ReactJS frontend
This commit is contained in:
parent
532cd3d9b9
commit
23841b1846
264
modules/ddr/api.py
Normal file
264
modules/ddr/api.py
Normal file
@ -0,0 +1,264 @@
|
||||
from fastapi import APIRouter, Request, Response, File, UploadFile
|
||||
|
||||
from core_common import core_process_request, core_prepare_response, E
|
||||
|
||||
from tinydb import Query, where
|
||||
from core_database import get_db
|
||||
from pydantic import BaseModel
|
||||
|
||||
import config
|
||||
import utils.card as conv
|
||||
from utils.lz77 import EamuseLZ77
|
||||
|
||||
import lxml.etree as ET
|
||||
import json
|
||||
import struct
|
||||
from typing import Dict, List, Tuple
|
||||
from os import path
|
||||
|
||||
|
||||
router = APIRouter(prefix="/ddr", tags=["api"])
|
||||
|
||||
|
||||
class DDR_Profile_Main_Items(BaseModel):
|
||||
card: str
|
||||
pin: str
|
||||
|
||||
|
||||
class DDR_Profile_Version_Items(BaseModel):
|
||||
game_version: int
|
||||
calories_disp: bool
|
||||
character: str
|
||||
arrow_skin: str
|
||||
filter: str
|
||||
guideline: str
|
||||
priority: str
|
||||
timing_disp: bool
|
||||
common: str
|
||||
option: str
|
||||
last: str
|
||||
rival: str
|
||||
rival_1_ddr_id: int
|
||||
rival_2_ddr_id: int
|
||||
rival_3_ddr_id: int
|
||||
single_grade: int
|
||||
double_grade: int
|
||||
|
||||
|
||||
@router.get("/profiles")
|
||||
async def ddr_profiles():
|
||||
return get_db().table("ddr_profile").all()
|
||||
|
||||
|
||||
@router.get("/profiles/{ddr_id}")
|
||||
async def ddr_profile_id(ddr_id: str):
|
||||
ddr_id = int("".join([i for i in ddr_id if i.isnumeric()]))
|
||||
return get_db().table("ddr_profile").get(where("ddr_id") == ddr_id)
|
||||
|
||||
|
||||
@router.patch("/profiles/{ddr_id}")
|
||||
async def ddr_profile_id_patch(ddr_id: str, item: DDR_Profile_Main_Items):
|
||||
ddr_id = int("".join([i for i in ddr_id if i.isnumeric()]))
|
||||
profile = get_db().table("ddr_profile").get(where("ddr_id") == ddr_id)
|
||||
|
||||
profile["card"] = item.card
|
||||
profile["pin"] = item.pin
|
||||
|
||||
get_db().table("ddr_profile").upsert(profile, where("ddr_id") == ddr_id)
|
||||
return Response(status_code=204)
|
||||
|
||||
|
||||
@router.patch("/profiles/{ddr_id}/{version}")
|
||||
async def ddr_profile_id_version_patch(
|
||||
ddr_id: str, version: int, item: DDR_Profile_Version_Items
|
||||
):
|
||||
ddr_id = int("".join([i for i in ddr_id if i.isnumeric()]))
|
||||
profile = get_db().table("ddr_profile").get(where("ddr_id") == ddr_id)
|
||||
game_profile = profile["version"].get(str(version), {})
|
||||
|
||||
if version >= 19:
|
||||
game_profile["game_version"] = item.game_version
|
||||
game_profile["calories_disp"] = "On" if item.calories_disp else "Off"
|
||||
game_profile["character"] = item.character
|
||||
game_profile["arrow_skin"] = item.arrow_skin
|
||||
game_profile["filter"] = item.filter
|
||||
game_profile["guideline"] = item.guideline
|
||||
game_profile["priority"] = item.priority
|
||||
game_profile["timing_disp"] = "On" if item.timing_disp else "Off"
|
||||
game_profile["common"] = item.common
|
||||
game_profile["option"] = item.option
|
||||
game_profile["last"] = item.last
|
||||
game_profile["rival"] = item.rival
|
||||
game_profile["rival_1_ddr_id"] = item.rival_1_ddr_id
|
||||
game_profile["rival_2_ddr_id"] = item.rival_2_ddr_id
|
||||
game_profile["rival_3_ddr_id"] = item.rival_3_ddr_id
|
||||
|
||||
profile["version"][str(version)] = game_profile
|
||||
get_db().table("ddr_profile").upsert(profile, where("ddr_id") == ddr_id)
|
||||
return Response(status_code=204)
|
||||
|
||||
|
||||
@router.get("/card/{card}")
|
||||
async def ddr_card_to_profile(card: str):
|
||||
card = card.upper()
|
||||
lookalike = {
|
||||
"I": "1",
|
||||
"O": "0",
|
||||
"Q": "0",
|
||||
"V": "U",
|
||||
}
|
||||
for k, v in lookalike.items():
|
||||
card = card.replace(k, v)
|
||||
if card.startswith("E004") or card.startswith("012E"):
|
||||
card = "".join([c for c in card if c in "0123456789ABCDEF"])
|
||||
uid = card
|
||||
kid = conv.to_konami_id(card)
|
||||
else:
|
||||
card = "".join([c for c in card if c in conv.valid_characters])
|
||||
uid = conv.to_uid(card)
|
||||
kid = card
|
||||
profile = get_db().table("ddr_profile").get(where("card") == uid)
|
||||
return profile
|
||||
|
||||
|
||||
@router.get("/scores")
|
||||
async def ddr_scores():
|
||||
return get_db().table("ddr_scores").all()
|
||||
|
||||
|
||||
@router.get("/scores/{ddr_id}")
|
||||
async def ddr_scores_id(ddr_id: str):
|
||||
ddr_id = int("".join([i for i in ddr_id if i.isnumeric()]))
|
||||
return get_db().table("ddr_scores").search((where("ddr_id") == ddr_id))
|
||||
|
||||
|
||||
@router.get("/scores_best")
|
||||
async def ddr_scores_best():
|
||||
return get_db().table("ddr_scores_best").all()
|
||||
|
||||
|
||||
@router.get("/scores_best/{ddr_id}")
|
||||
async def ddr_scores_best_id(ddr_id: str):
|
||||
ddr_id = int("".join([i for i in ddr_id if i.isnumeric()]))
|
||||
return get_db().table("ddr_scores_best").search((where("ddr_id") == ddr_id))
|
||||
|
||||
|
||||
@router.get("/mcode/{mcode}/all")
|
||||
async def ddr_scores_id(mcode: int):
|
||||
return get_db().table("ddr_scores").search((where("mcode") == mcode))
|
||||
|
||||
|
||||
@router.get("/mcode/{mcode}/best")
|
||||
async def ddr_scores_id(mcode: int):
|
||||
return get_db().table("ddr_scores_best").search((where("mcode") == mcode))
|
||||
|
||||
|
||||
class ARC:
|
||||
# https://github.com/DragonMinded/bemaniutils/blob/trunk/bemani/format/arc.py
|
||||
"""
|
||||
Class representing an `.arc` file. These are found in DDR Ace, and possibly
|
||||
other games that use ESS. Given a serires of bytes, this will allow you to
|
||||
query included filenames as well as read the contents of any file inside the
|
||||
archive.
|
||||
"""
|
||||
|
||||
def __init__(self, data: bytes) -> None:
|
||||
self.__files: Dict[str, Tuple[int, int, int]] = {}
|
||||
self.__data = data
|
||||
self.__parse_file(data)
|
||||
|
||||
def __parse_file(self, data: bytes) -> None:
|
||||
# Check file header
|
||||
if data[0:4] != bytes([0x20, 0x11, 0x75, 0x19]):
|
||||
# raise Exception("Unknown file format!")
|
||||
return Response(status_code=406)
|
||||
|
||||
# Grab header offsets
|
||||
(_, numfiles, _) = struct.unpack("<III", data[4:16])
|
||||
|
||||
for fno in range(numfiles):
|
||||
start = 16 + (16 * fno)
|
||||
end = start + 16
|
||||
(nameoffset, fileoffset, uncompressedsize, compressedsize) = struct.unpack(
|
||||
"<IIII", data[start:end]
|
||||
)
|
||||
name = ""
|
||||
|
||||
while data[nameoffset] != 0:
|
||||
name = name + data[nameoffset : (nameoffset + 1)].decode("ascii")
|
||||
nameoffset = nameoffset + 1
|
||||
|
||||
self.__files[name] = (fileoffset, uncompressedsize, compressedsize)
|
||||
|
||||
@property
|
||||
def filenames(self) -> List[str]:
|
||||
return [f for f in self.__files]
|
||||
|
||||
def read_file(self, filename: str) -> bytes:
|
||||
(fileoffset, uncompressedsize, compressedsize) = self.__files[filename]
|
||||
|
||||
if compressedsize == uncompressedsize:
|
||||
# Just stored
|
||||
return self.__data[fileoffset : (fileoffset + compressedsize)]
|
||||
else:
|
||||
# Compressed
|
||||
return EamuseLZ77.decode(
|
||||
self.__data[fileoffset : (fileoffset + compressedsize)]
|
||||
)
|
||||
|
||||
|
||||
@router.post("/parse_mdb/upload")
|
||||
async def ddr_receive_mdb(file: UploadFile = File(...)) -> bytes:
|
||||
data = await file.read()
|
||||
arc = ARC(data)
|
||||
try:
|
||||
mdb_new = ET.fromstring(
|
||||
arc.read_file("data/gamedata/musicdb.xml"),
|
||||
parser=ET.XMLParser(encoding="utf-8"),
|
||||
)
|
||||
except KeyError:
|
||||
return Response(status_code=406)
|
||||
|
||||
def get_attr(attrname):
|
||||
try:
|
||||
mdb[mcode][attrname] = attr.find(attrname).text.rstrip()
|
||||
except AttributeError:
|
||||
mdb[mcode][attrname] = ""
|
||||
|
||||
mdb = {}
|
||||
for attr in mdb_new:
|
||||
mcode = attr.find("mcode").text
|
||||
mdb[mcode] = {}
|
||||
|
||||
attributes = (
|
||||
"basename",
|
||||
"title",
|
||||
"title_yomi",
|
||||
"artist",
|
||||
"bpmmin",
|
||||
"bpmmax",
|
||||
"series",
|
||||
"eventno",
|
||||
"bemaniflag",
|
||||
"bgstage",
|
||||
"movie",
|
||||
"genreflag",
|
||||
"voice",
|
||||
)
|
||||
|
||||
for a in attributes:
|
||||
get_attr(a)
|
||||
|
||||
mdb[mcode]["diffLv"] = attr.find("diffLv").text.split(" ")
|
||||
|
||||
ddr_metadata = path.join("webui", "ddr.json")
|
||||
if path.exists(ddr_metadata):
|
||||
with open(ddr_metadata, "r", encoding="utf-8") as fp:
|
||||
mdb_old = json.load(fp)
|
||||
for mcode in mdb_old.keys():
|
||||
mdb[mcode] = mdb_old[mcode]
|
||||
|
||||
with open(ddr_metadata, "w", encoding="utf-8") as fp:
|
||||
json.dump(mdb, fp, indent=4, ensure_ascii=False)
|
||||
|
||||
return Response(status_code=201)
|
81
pyeamu.py
81
pyeamu.py
@ -2,10 +2,17 @@ from urllib.parse import urlunparse, urlencode
|
||||
|
||||
import uvicorn
|
||||
|
||||
import json
|
||||
from os import path
|
||||
|
||||
from fastapi import FastAPI, Request, Response
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from starlette.responses import RedirectResponse
|
||||
|
||||
import config
|
||||
import modules
|
||||
import utils.card as conv
|
||||
|
||||
from core_common import core_process_request, core_prepare_response, E
|
||||
|
||||
@ -20,13 +27,45 @@ server_services_url = urlunparse(
|
||||
)
|
||||
keepalive_address = "127.0.0.1"
|
||||
|
||||
settings = {}
|
||||
for s in (
|
||||
"ip",
|
||||
"port",
|
||||
"services_prefix",
|
||||
"verbose_log",
|
||||
"arcade",
|
||||
"paseli",
|
||||
"maintenance_mode",
|
||||
):
|
||||
settings[s] = getattr(config, s)
|
||||
|
||||
app = FastAPI()
|
||||
for router in modules.routers:
|
||||
app.include_router(router)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
if path.exists("webui"):
|
||||
webui = True
|
||||
with open(path.join("webui", "monkey.json"), "w") as f:
|
||||
json.dump(settings, f, indent=2)
|
||||
app.mount("/webui", StaticFiles(directory="webui", html=True), name="webui")
|
||||
else:
|
||||
webui = False
|
||||
|
||||
@app.get("/webui")
|
||||
async def redirect_to_config():
|
||||
return RedirectResponse(url="/config")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("https://github.com/drmext/MonkeyBusiness")
|
||||
print(" __ __ _ ")
|
||||
print("| \/ | ___ _ __ | | _____ _ _ ")
|
||||
print("| |\/| |/ _ \| '_ \| |/ / _ \ | | |")
|
||||
@ -39,9 +78,17 @@ if __name__ == "__main__":
|
||||
print("| |_) | |_| \__ \ | | | | __/\__ \__ \\")
|
||||
print("|____/ \__,_|___/_|_| |_|\___||___/___/")
|
||||
print()
|
||||
print("Game Config:")
|
||||
print(f"<services>{server_services_url}</services>")
|
||||
print('<url_slash __type="bool">1</url_slash>')
|
||||
print()
|
||||
if webui:
|
||||
print("Web Interface:")
|
||||
print(f"http://{server_address}/webui/")
|
||||
print()
|
||||
print("Source Repository:")
|
||||
print("https://github.com/drmext/MonkeyBusiness")
|
||||
print()
|
||||
uvicorn.run("pyeamu:app", host=config.ip, port=config.port, reload=True)
|
||||
|
||||
|
||||
@ -101,3 +148,35 @@ async def services_get(request: Request):
|
||||
|
||||
response_body, response_headers = await core_prepare_response(request, response)
|
||||
return Response(content=response_body, headers=response_headers)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def redirect_to_webui():
|
||||
return RedirectResponse(url="/webui")
|
||||
|
||||
|
||||
@app.get("/config")
|
||||
async def get_config():
|
||||
return settings
|
||||
|
||||
|
||||
@app.get("/conv/{card}")
|
||||
async def card_conv(card: str):
|
||||
card = card.upper()
|
||||
lookalike = {
|
||||
"I": "1",
|
||||
"O": "0",
|
||||
"Q": "0",
|
||||
"V": "U",
|
||||
}
|
||||
for k, v in lookalike.items():
|
||||
card = card.replace(k, v)
|
||||
if card.startswith("E004") or card.startswith("012E"):
|
||||
card = "".join([c for c in card if c in "0123456789ABCDEF"])
|
||||
uid = card
|
||||
kid = conv.to_konami_id(card)
|
||||
else:
|
||||
card = "".join([c for c in card if c in conv.valid_characters])
|
||||
uid = conv.to_uid(card)
|
||||
kid = card
|
||||
return {"uid": uid, "konami_id": kid}
|
||||
|
Loading…
x
Reference in New Issue
Block a user