1
0
mirror of synced 2024-11-27 23:50:47 +01:00
bemaniutils/bemani/client/sdvx/booth.py

677 lines
27 KiB
Python

import random
import time
from typing import Any, Dict, List, Optional
from bemani.common import Time
from bemani.client.base import BaseClient
from bemani.protocol import Node
class SoundVoltexBoothClient(BaseClient):
NAME = "TEST"
def verify_eventlog_write(self, location: str) -> None:
call = self.call_node()
# Construct node
eventlog = Node.void("eventlog")
call.add_child(eventlog)
eventlog.set_attribute("method", "write")
eventlog.add_child(Node.u32("retrycnt", 0))
data = Node.void("data")
eventlog.add_child(data)
data.add_child(Node.string("eventid", "S_PWRON"))
data.add_child(Node.s32("eventorder", 0))
data.add_child(Node.u64("pcbtime", int(time.time() * 1000)))
data.add_child(Node.s64("gamesession", -1))
data.add_child(Node.string("strdata1", "1.7.6"))
data.add_child(Node.string("strdata2", ""))
data.add_child(Node.s64("numdata1", 1))
data.add_child(Node.s64("numdata2", 0))
data.add_child(Node.string("locationid", location))
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/eventlog/gamesession")
self.assert_path(resp, "response/eventlog/logsendflg")
self.assert_path(resp, "response/eventlog/logerrlevel")
self.assert_path(resp, "response/eventlog/evtidnosendflg")
def verify_game_hiscore(self, location: str) -> None:
call = self.call_node()
game = Node.void("game")
game.set_attribute("locid", location)
game.set_attribute("ver", "0")
game.set_attribute("method", "hiscore")
call.add_child(game)
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game/ranking/@id")
self.assert_path(resp, "response/game/hiscore/@type")
self.assert_path(resp, "response/game/hiscore/music/@id")
self.assert_path(resp, "response/game/hiscore/music/note/@name")
self.assert_path(resp, "response/game/hiscore/music/note/@score")
self.assert_path(resp, "response/game/hiscore/music/note/@type")
def verify_game_shop(self, location: str) -> None:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("method", "shop")
game.set_attribute("ver", "0")
game.add_child(Node.string("locid", location))
game.add_child(Node.string("regcode", "."))
game.add_child(Node.string("locname", ""))
game.add_child(Node.u8("loctype", 0))
game.add_child(Node.string("cstcode", ""))
game.add_child(Node.string("cpycode", ""))
game.add_child(Node.s32("latde", 0))
game.add_child(Node.s32("londe", 0))
game.add_child(Node.u8("accu", 0))
game.add_child(Node.string("linid", "."))
game.add_child(Node.u8("linclass", 0))
game.add_child(Node.ipv4("ipaddr", "0.0.0.0"))
game.add_child(Node.string("hadid", "00010203040506070809"))
game.add_child(Node.string("licid", "00010203040506070809"))
game.add_child(Node.string("actid", self.pcbid))
game.add_child(Node.s8("appstate", 0))
game.add_child(Node.s8("c_need", 1))
game.add_child(Node.s8("c_credit", 2))
game.add_child(Node.s8("s_credit", 2))
game.add_child(Node.bool("free_p", True))
game.add_child(Node.bool("close", True))
game.add_child(Node.s32("close_t", 1380))
game.add_child(Node.u32("playc", 0))
game.add_child(Node.u32("playn", 0))
game.add_child(Node.u32("playe", 0))
game.add_child(Node.u32("test_m", 0))
game.add_child(Node.u32("service", 0))
game.add_child(Node.bool("paseli", True))
game.add_child(Node.u32("update", 0))
game.add_child(Node.string("shopname", ""))
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game/nxt_time")
def verify_game_new(self, location: str, refid: str) -> None:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("name", self.NAME)
game.set_attribute("method", "new")
game.set_attribute("refid", refid)
game.set_attribute("locid", location)
game.set_attribute("dataid", refid)
game.set_attribute("ver", "0")
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify_game_frozen(self, refid: str, time: int) -> None:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("refid", refid)
game.set_attribute("method", "frozen")
game.set_attribute("ver", "0")
game.add_child(Node.u32("sec", time))
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game/@result")
def verify_game_save(
self, location: str, refid: str, packet: int, block: int, exp: int
) -> None:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("method", "save")
game.set_attribute("refid", refid)
game.set_attribute("locid", location)
game.set_attribute("ver", "0")
game.add_child(Node.u8("headphone", 0))
game.add_child(Node.u8("hispeed", 16))
game.add_child(Node.u16("appeal_id", 19))
game.add_child(Node.u16("frame0", 0))
game.add_child(Node.u16("frame1", 0))
game.add_child(Node.u16("frame2", 0))
game.add_child(Node.u16("frame3", 0))
game.add_child(Node.u16("frame4", 0))
last = Node.void("last")
game.add_child(last)
last.set_attribute("music_type", "1")
last.set_attribute("music_id", "29")
last.set_attribute("sort_type", "4")
game.add_child(Node.u32("earned_gamecoin_packet", packet))
game.add_child(Node.u32("earned_gamecoin_block", block))
game.add_child(Node.u32("gain_exp", exp))
game.add_child(Node.u32("m_user_cnt", 0))
game.add_child(Node.bool_array("have_item", [False] * 512))
game.add_child(Node.bool_array("have_note", [False] * 512))
tracking = Node.void("tracking")
game.add_child(tracking)
m0 = Node.void("m0")
tracking.add_child(m0)
m0.add_child(Node.u8("type", 2))
m0.add_child(Node.u32("id", 5))
m0.add_child(Node.u32("score", 774566))
tracking.add_child(Node.time("p_start", Time.now() - 300))
tracking.add_child(Node.time("p_end", Time.now()))
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify_game_common(self) -> None:
call = self.call_node()
game = Node.void("game")
game.set_attribute("ver", "0")
game.set_attribute("method", "common")
call.add_child(game)
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game/limited")
self.assert_path(resp, "response/game/event/info/@id")
self.assert_path(resp, "response/game/catalog/info/@currency")
self.assert_path(resp, "response/game/catalog/info/@id")
self.assert_path(resp, "response/game/catalog/info/@price")
self.assert_path(resp, "response/game/kacinfo/note00")
self.assert_path(resp, "response/game/kacinfo/note01")
self.assert_path(resp, "response/game/kacinfo/note02")
self.assert_path(resp, "response/game/kacinfo/note10")
self.assert_path(resp, "response/game/kacinfo/note11")
self.assert_path(resp, "response/game/kacinfo/note12")
self.assert_path(resp, "response/game/kacinfo/rabbeat0")
self.assert_path(resp, "response/game/kacinfo/rabbeat1")
def verify_game_buy(self, refid: str, catalogid: int, success: bool) -> None:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("refid", refid)
game.set_attribute("ver", "0")
game.set_attribute("method", "buy")
game.add_child(Node.u32("catalog_id", catalogid))
game.add_child(Node.u32("earned_gamecoin_packet", 0))
game.add_child(Node.u32("earned_gamecoin_block", 0))
game.add_child(Node.u32("open_index", 4))
game.add_child(Node.u32("currency_type", 1))
game.add_child(Node.u32("price", 10))
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game/gamecoin_packet")
self.assert_path(resp, "response/game/gamecoin_block")
self.assert_path(resp, "response/game/result")
if success:
if resp.child_value("game/result") != 0:
raise Exception("Failed to purchase!")
else:
if resp.child_value("game/result") == 0:
raise Exception("Purchased when shouldn't have!")
def verify_game_load(
self, cardid: str, refid: str, msg_type: str
) -> Dict[str, Any]:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("cardid", cardid)
game.set_attribute("dataid", refid)
game.set_attribute("ver", "0")
game.set_attribute("method", "load")
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
if msg_type == "new":
self.assert_path(resp, "response/game/@none")
return None
if msg_type == "existing":
self.assert_path(resp, "response/game/name")
self.assert_path(resp, "response/game/code")
self.assert_path(resp, "response/game/gamecoin_packet")
self.assert_path(resp, "response/game/gamecoin_block")
self.assert_path(resp, "response/game/exp_point")
self.assert_path(resp, "response/game/m_user_cnt")
self.assert_path(resp, "response/game/have_item")
self.assert_path(resp, "response/game/have_note")
self.assert_path(resp, "response/game/last/@appeal_id")
self.assert_path(resp, "response/game/last/@frame0")
self.assert_path(resp, "response/game/last/@frame1")
self.assert_path(resp, "response/game/last/@frame2")
self.assert_path(resp, "response/game/last/@frame3")
self.assert_path(resp, "response/game/last/@frame4")
self.assert_path(resp, "response/game/last/@headphone")
self.assert_path(resp, "response/game/last/@hispeed")
self.assert_path(resp, "response/game/last/@music_id")
self.assert_path(resp, "response/game/last/@music_type")
self.assert_path(resp, "response/game/last/@sort_type")
return {
"name": resp.child_value("game/name"),
"packet": resp.child_value("game/gamecoin_packet"),
"block": resp.child_value("game/gamecoin_block"),
"exp": resp.child_value("game/exp_point"),
}
else:
raise Exception(f"Invalid game load type {msg_type}")
def verify_game_lounge(self) -> None:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("method", "lounge")
game.set_attribute("ver", "0")
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game/interval")
def verify_game_entry_s(self) -> int:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("ver", "0")
game.set_attribute("method", "entry_s")
game.add_child(Node.u8("c_ver", 22))
game.add_child(Node.u8("p_num", 1))
game.add_child(Node.u8("p_rest", 1))
game.add_child(Node.u8("filter", 1))
game.add_child(Node.u32("mid", 5))
game.add_child(Node.u32("sec", 45))
game.add_child(Node.u16("port", 10007))
game.add_child(Node.fouru8("gip", [127, 0, 0, 1]))
game.add_child(Node.fouru8("lip", [10, 0, 5, 73]))
game.add_child(Node.bool("claim", True))
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game/entry_id")
return resp.child_value("game/entry_id")
def verify_game_entry_e(self, eid: int) -> None:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("method", "entry_e")
game.set_attribute("ver", "0")
game.add_child(Node.u32("eid", eid))
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify_game_load_m(self, refid: str) -> List[Dict[str, int]]:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("method", "load_m")
game.set_attribute("ver", "0")
game.set_attribute("all", "1")
game.set_attribute("dataid", refid)
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game")
scores = []
for child in resp.child("game").children:
if child.name != "music":
continue
musicid = int(child.attribute("music_id"))
for typenode in child.children:
if typenode.name != "type":
continue
chart = int(typenode.attribute("type_id"))
clear_type = int(typenode.attribute("clear_type"))
score = int(typenode.attribute("score"))
grade = int(typenode.attribute("score_grade"))
scores.append(
{
"id": musicid,
"chart": chart,
"clear_type": clear_type,
"score": score,
"grade": grade,
}
)
return scores
def verify_game_save_m(
self, location: str, refid: str, score: Dict[str, int]
) -> None:
call = self.call_node()
game = Node.void("game")
call.add_child(game)
game.set_attribute("method", "save_m")
game.set_attribute("max_chain", "0")
game.set_attribute("clear_type", str(score["clear_type"]))
game.set_attribute("music_type", str(score["chart"]))
game.set_attribute("score_grade", str(score["grade"]))
game.set_attribute("locid", location)
game.set_attribute("music_id", str(score["id"]))
game.set_attribute("dataid", refid)
game.set_attribute("ver", "0")
game.set_attribute("score", str(score["score"]))
# Swap with server
resp = self.exchange("", call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify(self, cardid: Optional[str]) -> None:
# Verify boot sequence is okay
self.verify_services_get(
expected_services=[
"pcbtracker",
"pcbevent",
"local",
"message",
"facility",
"cardmng",
"package",
"posevent",
"pkglist",
"dlstatus",
"eacoin",
"lobby",
"ntp",
"keepalive",
]
)
paseli_enabled = self.verify_pcbtracker_alive()
self.verify_message_get()
self.verify_package_list()
location = self.verify_facility_get()
self.verify_pcbevent_put()
self.verify_eventlog_write(location)
self.verify_game_shop(location)
self.verify_game_common()
# Verify card registration and profile lookup
if cardid is not None:
card = cardid
else:
card = self.random_card()
print(f"Generated random card ID {card} for use.")
if cardid is None:
self.verify_cardmng_inquire(
card, msg_type="unregistered", paseli_enabled=paseli_enabled
)
ref_id = self.verify_cardmng_getrefid(card)
if len(ref_id) != 16:
raise Exception(
f"Invalid refid '{ref_id}' returned when registering card"
)
if ref_id != self.verify_cardmng_inquire(
card, msg_type="new", paseli_enabled=paseli_enabled
):
raise Exception(f"Invalid refid '{ref_id}' returned when querying card")
# SDVX doesn't read the new profile, it asks for the profile itself after calling new
self.verify_game_load(card, ref_id, msg_type="new")
self.verify_game_new(location, ref_id)
self.verify_game_load(card, ref_id, msg_type="existing")
else:
print("Skipping new card checks for existing card")
ref_id = self.verify_cardmng_inquire(
card, msg_type="query", paseli_enabled=paseli_enabled
)
# Verify pin handling and return card handling
self.verify_cardmng_authpass(ref_id, correct=True)
self.verify_cardmng_authpass(ref_id, correct=False)
if ref_id != self.verify_cardmng_inquire(
card, msg_type="query", paseli_enabled=paseli_enabled
):
raise Exception(f"Invalid refid '{ref_id}' returned when querying card")
# Verify account freezing
self.verify_game_frozen(ref_id, 900)
self.verify_game_frozen(ref_id, 0)
# Verify lobby functionality
self.verify_game_lounge()
eid = self.verify_game_entry_s()
self.verify_game_entry_e(eid)
if cardid is None:
# Verify profile loading and saving
profile = self.verify_game_load(card, ref_id, msg_type="existing")
if profile["name"] != self.NAME:
raise Exception(
f'Profile has incorrect name {profile["name"]} associated with it!'
)
if profile["packet"] != 0:
raise Exception("Profile has nonzero blocks associated with it!")
if profile["block"] != 0:
raise Exception("Profile has nonzero packets associated with it!")
if profile["exp"] != 0:
raise Exception("Profile has nonzero exp associated with it!")
# Verify purchase failure
self.verify_game_buy(ref_id, 1004, False)
self.verify_game_save(location, ref_id, packet=123, block=234, exp=42)
profile = self.verify_game_load(card, ref_id, msg_type="existing")
if profile["name"] != self.NAME:
raise Exception(
f'Profile has incorrect name {profile["name"]} associated with it!'
)
if profile["packet"] != 123:
raise Exception("Profile has invalid blocks associated with it!")
if profile["block"] != 234:
raise Exception("Profile has invalid packets associated with it!")
if profile["exp"] != 42:
raise Exception("Profile has invalid exp associated with it!")
self.verify_game_save(location, ref_id, packet=1, block=2, exp=3)
profile = self.verify_game_load(card, ref_id, msg_type="existing")
if profile["name"] != self.NAME:
raise Exception(
f'Profile has incorrect name {profile["name"]} associated with it!'
)
if profile["packet"] != 124:
raise Exception("Profile has invalid blocks associated with it!")
if profile["block"] != 236:
raise Exception("Profile has invalid packets associated with it!")
if profile["exp"] != 45:
raise Exception("Profile has invalid exp associated with it!")
# Verify purchase success
self.verify_game_buy(ref_id, 1004, True)
profile = self.verify_game_load(card, ref_id, msg_type="existing")
if profile["name"] != self.NAME:
raise Exception(
f'Profile has incorrect name {profile["name"]} associated with it!'
)
if profile["packet"] != 124:
raise Exception("Profile has invalid blocks associated with it!")
if profile["block"] != 226:
raise Exception("Profile has invalid packets associated with it!")
if profile["exp"] != 45:
raise Exception("Profile has invalid exp associated with it!")
# Verify empty profile has no scores on it
scores = self.verify_game_load_m(ref_id)
if len(scores) > 0:
raise Exception("Score on an empty profile!")
# Verify score saving and updating
for phase in [1, 2]:
if phase == 1:
dummyscores = [
# An okay score on a chart
{
"id": 1,
"chart": 1,
"grade": 3,
"clear_type": 2,
"score": 765432,
},
# A good score on an easier chart of the same song
{
"id": 1,
"chart": 0,
"grade": 6,
"clear_type": 3,
"score": 7654321,
},
# A bad score on a hard chart
{
"id": 2,
"chart": 2,
"grade": 1,
"clear_type": 1,
"score": 12345,
},
# A terrible score on an easy chart
{
"id": 3,
"chart": 0,
"grade": 1,
"clear_type": 1,
"score": 123,
},
]
if phase == 2:
dummyscores = [
# A better score on the same chart
{
"id": 1,
"chart": 1,
"grade": 5,
"clear_type": 3,
"score": 8765432,
},
# A worse score on another same chart
{
"id": 1,
"chart": 0,
"grade": 4,
"clear_type": 2,
"score": 6543210,
"expected_score": 7654321,
"expected_clear_type": 3,
"expected_grade": 6,
},
]
for dummyscore in dummyscores:
self.verify_game_save_m(location, ref_id, dummyscore)
scores = self.verify_game_load_m(ref_id)
for expected in dummyscores:
actual = None
for received in scores:
if (
received["id"] == expected["id"]
and received["chart"] == expected["chart"]
):
actual = received
break
if actual is None:
raise Exception(
f"Didn't find song {expected['id']} chart {expected['chart']} in response!"
)
if "expected_score" in expected:
expected_score = expected["expected_score"]
else:
expected_score = expected["score"]
if "expected_grade" in expected:
expected_grade = expected["expected_grade"]
else:
expected_grade = expected["grade"]
if "expected_clear_type" in expected:
expected_clear_type = expected["expected_clear_type"]
else:
expected_clear_type = expected["clear_type"]
if actual["score"] != expected_score:
raise Exception(
f'Expected a score of \'{expected_score}\' for song \'{expected["id"]}\' chart \'{expected["chart"]}\' but got score \'{actual["score"]}\''
)
if actual["grade"] != expected_grade:
raise Exception(
f'Expected a grade of \'{expected_grade}\' for song \'{expected["id"]}\' chart \'{expected["chart"]}\' but got grade \'{actual["grade"]}\''
)
if actual["clear_type"] != expected_clear_type:
raise Exception(
f'Expected a clear_type of \'{expected_clear_type}\' for song \'{expected["id"]}\' chart \'{expected["chart"]}\' but got clear_type \'{actual["clear_type"]}\''
)
# Sleep so we don't end up putting in score history on the same second
time.sleep(1)
else:
print("Skipping score checks for existing card")
# Verify high score tables
self.verify_game_hiscore(location)
# Verify paseli handling
if paseli_enabled:
print("PASELI enabled for this PCBID, executing PASELI checks")
else:
print("PASELI disabled for this PCBID, skipping PASELI checks")
return
sessid, balance = self.verify_eacoin_checkin(card)
if balance == 0:
print("Skipping PASELI consume check because card has 0 balance")
else:
self.verify_eacoin_consume(sessid, balance, random.randint(0, balance))
self.verify_eacoin_checkout(sessid)