223 lines
7.9 KiB
Python
223 lines
7.9 KiB
Python
import base64
|
||
import time
|
||
from typing import Optional
|
||
|
||
from bemani.client.base import BaseClient
|
||
from bemani.protocol import Node
|
||
|
||
|
||
class TheStarBishiBashiClient(BaseClient):
    """Scripted test client that sends a fixed series of requests and checks the replies.

    Each ``verify_*`` method defined here builds one request tree out of
    ``Node`` objects, swaps it with the server through ``self.exchange`` and
    then checks the reply with ``self.assert_path``. ``verify`` is the entry
    point that chains the individual checks together.
    """

    # Profile name written by the "existing" profile save and expected back
    # from the subsequent profile read.
    NAME = "TEST"

    def verify_eventlog_write(self, location: str) -> None:
        """Send an ``eventlog`` ``write`` request and check the reply layout.

        Args:
            location: Value sent as the ``locationid`` field of the logged event.
        """
        request = self.call_node()

        # Construct the eventlog node with a single "S_PWRON" event under it.
        log_node = Node.void("eventlog")
        request.add_child(log_node)
        log_node.set_attribute("method", "write")
        log_node.add_child(Node.u32("retrycnt", 0))

        payload = Node.void("data")
        log_node.add_child(payload)
        # Children are appended in exactly this order.
        for field in (
            Node.string("eventid", "S_PWRON"),
            Node.s32("eventorder", 0),
            # Current wall-clock time in milliseconds.
            Node.u64("pcbtime", int(time.time() * 1000)),
            Node.s64("gamesession", -1),
            Node.string("strdata1", "1.7.6"),
            Node.string("strdata2", ""),
            Node.s64("numdata1", 1),
            Node.s64("numdata2", 0),
            Node.string("locationid", location),
        ):
            payload.add_child(field)

        # Swap with server
        reply = self.exchange("", request)

        # Every one of these paths has to be present in the reply.
        for expected in (
            "response/eventlog/gamesession",
            "response/eventlog/logsendflg",
            "response/eventlog/logerrlevel",
            "response/eventlog/evtidnosendflg",
        ):
            self.assert_path(reply, expected)

    def verify_system_getmaster(self) -> None:
        """Send a ``system`` ``getmaster`` request and check that a result is returned."""
        request = self.call_node()

        # Construct the system node and its lookup parameters.
        system_node = Node.void("system")
        request.add_child(system_node)
        system_node.set_attribute("method", "getmaster")

        payload = Node.void("data")
        system_node.add_child(payload)
        for field in (
            Node.string("gamekind", "IBB"),
            Node.string("datatype", "S_SRVMSG"),
            Node.string("datakey", "INFO"),
        ):
            payload.add_child(field)

        # Swap with server
        reply = self.exchange("", request)

        # Verify that response is correct
        self.assert_path(reply, "response/system/result")

    def verify_usergamedata_send(self, ref_id: str, msg_type: str) -> None:
        """Write a profile as base64-encoded CSV via ``playerdata`` ``usergamedata_send``.

        Args:
            ref_id: Value sent as the ``eaid`` field of the request.
            msg_type: ``"new"`` writes a blank name, ``"existing"`` writes
                ``NAME``. Any other value leaves the template untouched, which
                also has a blank name.
        """
        request = self.call_node()

        # Template of the comma-separated profile fields, grouped by run of
        # identical values. The name lives at index 27 (first blank after the
        # eight "0.000000" entries).
        fields = [
            b"ffffffff",
            b"IBBDAT00",
            b"1",
            b"0", b"0", b"0", b"0", b"0",
            b"e474c1b",
            b"0", b"0",
            b"ff",
            b"0", b"0", b"0", b"0", b"0", b"0", b"0",
            b"0.000000", b"0.000000", b"0.000000", b"0.000000",
            b"0.000000", b"0.000000", b"0.000000", b"0.000000",
            b"", b"", b"",
            b"\x96\xa2\x90\xdd\x92\xe8",
            b"\x8d\x81\x8d`",
            b"", b"", b"",
        ]
        name_slot = 27

        if msg_type == "new":
            # New profile gets blank name, because we save over it at the end of the round.
            fields[name_slot] = b""
        elif msg_type == "existing":
            # Existing profile gets our hardcoded name saved.
            fields[name_slot] = self.NAME.encode("shift-jis")

        # Construct the playerdata node.
        player = Node.void("playerdata")
        request.add_child(player)
        player.set_attribute("method", "usergamedata_send")
        player.add_child(Node.u32("retrycnt", 0))

        payload = Node.void("data")
        player.add_child(payload)
        for field in (
            Node.string("eaid", ref_id),
            Node.string("gamekind", "IBB"),
            Node.u32("datanum", 1),
        ):
            payload.add_child(field)

        record = Node.void("record")
        payload.add_child(record)

        # The profile travels as one base64 string of the comma-joined fields.
        csv_blob = b",".join(fields)
        encoded = base64.b64encode(csv_blob).decode("ascii")
        csv_node = Node.string("d", encoded)
        record.add_child(csv_node)
        csv_node.add_child(Node.string("bin1", ""))

        # Swap with server
        reply = self.exchange("", request)
        self.assert_path(reply, "response/playerdata/result")

    def verify_usergamedata_recv(self, ref_id: str) -> str:
        """Read a profile via ``playerdata`` ``usergamedata_recv`` and return its name.

        Args:
            ref_id: Value sent as the ``eaid`` field of the request.

        Returns:
            The name field of the returned profile, decoded from Shift-JIS.
        """
        request = self.call_node()

        # Construct the playerdata node.
        player = Node.void("playerdata")
        request.add_child(player)
        player.set_attribute("method", "usergamedata_recv")

        payload = Node.void("data")
        player.add_child(payload)
        for field in (
            Node.string("eaid", ref_id),
            Node.string("gamekind", "IBB"),
            Node.u32("recv_num", 1),
            Node.string("recv_csv", "IBBDAT00,3fffffffff"),
        ):
            payload.add_child(field)

        # Swap with server
        reply = self.exchange("", request)
        for expected in (
            "response/playerdata/result",
            "response/playerdata/player/record/d/bin1",
            "response/playerdata/player/record_num",
        ):
            self.assert_path(reply, expected)

        # Grab the base64 CSV payload and split it back into its fields.
        encoded = reply.child_value("playerdata/player/record/d")
        columns = base64.b64decode(encoded).split(b",")

        # The first two values are lopped off in the returned profile, so the
        # name sits two slots earlier than in the data that was sent.
        raw_name = columns[25]
        return raw_name.decode("shift-jis")

    def verify(self, cardid: Optional[str]) -> None:
        """Run the full scripted check: boot sequence, card handling and profile saving.

        Args:
            cardid: Card to use. When ``None`` a random card is generated and
                the new-card and profile-saving checks are run as well;
                otherwise those checks are skipped and the card is only queried.

        Raises:
            Exception: If a returned refid is not 16 characters long, if a
                card lookup returns a different refid, or if a profile name
                read back does not match what is expected at that point.
        """
        # Verify boot sequence is okay
        self.verify_services_get(
            expected_services=[
                "pcbtracker",
                "pcbevent",
                "local",
                "message",
                "facility",
                "cardmng",
                "package",
                "posevent",
                "pkglist",
                "dlstatus",
                "eacoin",
                "lobby",
                "ntp",
                "keepalive",
            ]
        )
        paseli_enabled = self.verify_pcbtracker_alive()
        self.verify_message_get()
        self.verify_package_list()
        location = self.verify_facility_get()
        self.verify_pcbevent_put()
        self.verify_eventlog_write(location)
        self.verify_system_getmaster()

        # Verify card registration and profile lookup
        if cardid is None:
            card = self.random_card()
            print(f"Generated random card ID {card} for use.")

            self.verify_cardmng_inquire(card, msg_type="unregistered", paseli_enabled=paseli_enabled)
            ref_id = self.verify_cardmng_getrefid(card)
            if len(ref_id) != 16:
                raise Exception(f"Invalid refid '{ref_id}' returned when registering card")

            registered_ref = self.verify_cardmng_inquire(card, msg_type="new", paseli_enabled=paseli_enabled)
            if ref_id != registered_ref:
                raise Exception(f"Invalid refid '{ref_id}' returned when querying card")

            # Bishi doesn't read a new profile, it just writes out CSV for a blank one
            self.verify_usergamedata_send(ref_id, msg_type="new")
        else:
            card = cardid
            print("Skipping new card checks for existing card")
            ref_id = self.verify_cardmng_inquire(card, msg_type="query", paseli_enabled=paseli_enabled)

        # Verify pin handling and return card handling
        self.verify_cardmng_authpass(ref_id, correct=True)
        self.verify_cardmng_authpass(ref_id, correct=False)
        queried_ref = self.verify_cardmng_inquire(card, msg_type="query", paseli_enabled=paseli_enabled)
        if ref_id != queried_ref:
            raise Exception(f"Invalid refid '{ref_id}' returned when querying card")

        if cardid is not None:
            print("Skipping score checks for existing card")
            return

        # Verify profile saving: the blank profile written above must read
        # back with no name.
        name = self.verify_usergamedata_recv(ref_id)
        if name != "":
            raise Exception("New profile has a name associated with it!")

        # After saving with the hardcoded name, that name must read back.
        self.verify_usergamedata_send(ref_id, msg_type="existing")
        name = self.verify_usergamedata_recv(ref_id)
        if name != self.NAME:
            raise Exception("Existing profile has no name associated with it!")
|