import base64 import time from typing import Optional from bemani.client.base import BaseClient from bemani.protocol import Node class TheStarBishiBashiClient(BaseClient): NAME = "TEST" def verify_eventlog_write(self, location: str) -> None: call = self.call_node() # Construct node eventlog = Node.void("eventlog") call.add_child(eventlog) eventlog.set_attribute("method", "write") eventlog.add_child(Node.u32("retrycnt", 0)) data = Node.void("data") eventlog.add_child(data) data.add_child(Node.string("eventid", "S_PWRON")) data.add_child(Node.s32("eventorder", 0)) data.add_child(Node.u64("pcbtime", int(time.time() * 1000))) data.add_child(Node.s64("gamesession", -1)) data.add_child(Node.string("strdata1", "1.7.6")) data.add_child(Node.string("strdata2", "")) data.add_child(Node.s64("numdata1", 1)) data.add_child(Node.s64("numdata2", 0)) data.add_child(Node.string("locationid", location)) # Swap with server resp = self.exchange("", call) # Verify that response is correct self.assert_path(resp, "response/eventlog/gamesession") self.assert_path(resp, "response/eventlog/logsendflg") self.assert_path(resp, "response/eventlog/logerrlevel") self.assert_path(resp, "response/eventlog/evtidnosendflg") def verify_system_getmaster(self) -> None: call = self.call_node() # Construct node system = Node.void("system") call.add_child(system) system.set_attribute("method", "getmaster") data = Node.void("data") system.add_child(data) data.add_child(Node.string("gamekind", "IBB")) data.add_child(Node.string("datatype", "S_SRVMSG")) data.add_child(Node.string("datakey", "INFO")) # Swap with server resp = self.exchange("", call) # Verify that response is correct self.assert_path(resp, "response/system/result") def verify_usergamedata_send(self, ref_id: str, msg_type: str) -> None: call = self.call_node() # Set up profile write profiledata = [ b"ffffffff", b"IBBDAT00", b"1", b"0", b"0", b"0", b"0", b"0", b"e474c1b", b"0", b"0", b"ff", b"0", b"0", b"0", b"0", b"0", b"0", b"0", b"0.000000", b"0.000000", b"0.000000", b"0.000000", b"0.000000", b"0.000000", b"0.000000", b"0.000000", b"", b"", b"", b"\x96\xa2\x90\xdd\x92\xe8", b"\x8d\x81\x8d`", b"", b"", b"", ] if msg_type == "new": # New profile gets blank name, because we save over it at the end of the round. profiledata[27] = b"" elif msg_type == "existing": # Exiting profile gets our hardcoded name saved. profiledata[27] = self.NAME.encode("shift-jis") # Construct node playerdata = Node.void("playerdata") call.add_child(playerdata) playerdata.set_attribute("method", "usergamedata_send") playerdata.add_child(Node.u32("retrycnt", 0)) data = Node.void("data") playerdata.add_child(data) data.add_child(Node.string("eaid", ref_id)) data.add_child(Node.string("gamekind", "IBB")) data.add_child(Node.u32("datanum", 1)) record = Node.void("record") data.add_child(record) d = Node.string("d", base64.b64encode(b",".join(profiledata)).decode("ascii")) record.add_child(d) d.add_child(Node.string("bin1", "")) # Swap with server resp = self.exchange("", call) self.assert_path(resp, "response/playerdata/result") def verify_usergamedata_recv(self, ref_id: str) -> str: call = self.call_node() # Construct node playerdata = Node.void("playerdata") call.add_child(playerdata) playerdata.set_attribute("method", "usergamedata_recv") data = Node.void("data") playerdata.add_child(data) data.add_child(Node.string("eaid", ref_id)) data.add_child(Node.string("gamekind", "IBB")) data.add_child(Node.u32("recv_num", 1)) data.add_child(Node.string("recv_csv", "IBBDAT00,3fffffffff")) # Swap with server resp = self.exchange("", call) self.assert_path(resp, "response/playerdata/result") self.assert_path(resp, "response/playerdata/player/record/d/bin1") self.assert_path(resp, "response/playerdata/player/record_num") # Grab binary data, parse out name bindata = resp.child_value("playerdata/player/record/d") profiledata = base64.b64decode(bindata).split(b",") # We lob off the first two values in returning profile, so the name is offset by two return profiledata[25].decode("shift-jis") def verify(self, cardid: Optional[str]) -> None: # Verify boot sequence is okay self.verify_services_get( expected_services=[ "pcbtracker", "pcbevent", "local", "message", "facility", "cardmng", "package", "posevent", "pkglist", "dlstatus", "eacoin", "lobby", "ntp", "keepalive", ] ) paseli_enabled = self.verify_pcbtracker_alive() self.verify_message_get() self.verify_package_list() location = self.verify_facility_get() self.verify_pcbevent_put() self.verify_eventlog_write(location) self.verify_system_getmaster() # Verify card registration and profile lookup if cardid is not None: card = cardid else: card = self.random_card() print(f"Generated random card ID {card} for use.") if cardid is None: self.verify_cardmng_inquire(card, msg_type="unregistered", paseli_enabled=paseli_enabled) ref_id = self.verify_cardmng_getrefid(card) if len(ref_id) != 16: raise Exception(f"Invalid refid '{ref_id}' returned when registering card") if ref_id != self.verify_cardmng_inquire(card, msg_type="new", paseli_enabled=paseli_enabled): raise Exception(f"Invalid refid '{ref_id}' returned when querying card") # Bishi doesn't read a new profile, it just writes out CSV for a blank one self.verify_usergamedata_send(ref_id, msg_type="new") else: print("Skipping new card checks for existing card") ref_id = self.verify_cardmng_inquire(card, msg_type="query", paseli_enabled=paseli_enabled) # Verify pin handling and return card handling self.verify_cardmng_authpass(ref_id, correct=True) self.verify_cardmng_authpass(ref_id, correct=False) if ref_id != self.verify_cardmng_inquire(card, msg_type="query", paseli_enabled=paseli_enabled): raise Exception(f"Invalid refid '{ref_id}' returned when querying card") if cardid is None: # Verify profile saving name = self.verify_usergamedata_recv(ref_id) if name != "": raise Exception("New profile has a name associated with it!") self.verify_usergamedata_send(ref_id, msg_type="existing") name = self.verify_usergamedata_recv(ref_id) if name != self.NAME: raise Exception("Existing profile has no name associated with it!") else: print("Skipping score checks for existing card")