1
0
mirror of synced 2024-12-17 00:35:52 +01:00
bemaniutils/bemani/client/sdvx/booth.py

640 lines
26 KiB
Python
Raw Normal View History

import random
import time
from typing import Any, Dict, List, Optional
from bemani.common import Time
from bemani.client.base import BaseClient
from bemani.protocol import Node
class SoundVoltexBoothClient(BaseClient):
NAME = 'TEST'
def verify_eventlog_write(self, location: str) -> None:
call = self.call_node()
# Construct node
eventlog = Node.void('eventlog')
call.add_child(eventlog)
eventlog.set_attribute('method', 'write')
eventlog.add_child(Node.u32('retrycnt', 0))
data = Node.void('data')
eventlog.add_child(data)
data.add_child(Node.string('eventid', 'S_PWRON'))
data.add_child(Node.s32('eventorder', 0))
data.add_child(Node.u64('pcbtime', int(time.time() * 1000)))
data.add_child(Node.s64('gamesession', -1))
data.add_child(Node.string('strdata1', '1.7.6'))
data.add_child(Node.string('strdata2', ''))
data.add_child(Node.s64('numdata1', 1))
data.add_child(Node.s64('numdata2', 0))
data.add_child(Node.string('locationid', location))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/eventlog/gamesession")
self.assert_path(resp, "response/eventlog/logsendflg")
self.assert_path(resp, "response/eventlog/logerrlevel")
self.assert_path(resp, "response/eventlog/evtidnosendflg")
def verify_game_hiscore(self, location: str) -> None:
call = self.call_node()
game = Node.void('game')
game.set_attribute('locid', location)
game.set_attribute('ver', '0')
game.set_attribute('method', 'hiscore')
call.add_child(game)
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/ranking/@id")
self.assert_path(resp, "response/game/hiscore/@type")
self.assert_path(resp, "response/game/hiscore/music/@id")
self.assert_path(resp, "response/game/hiscore/music/note/@name")
self.assert_path(resp, "response/game/hiscore/music/note/@score")
self.assert_path(resp, "response/game/hiscore/music/note/@type")
def verify_game_shop(self, location: str) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'shop')
game.set_attribute('ver', '0')
game.add_child(Node.string('locid', location))
game.add_child(Node.string('regcode', '.'))
game.add_child(Node.string('locname', ''))
game.add_child(Node.u8('loctype', 0))
game.add_child(Node.string('cstcode', ''))
game.add_child(Node.string('cpycode', ''))
game.add_child(Node.s32('latde', 0))
game.add_child(Node.s32('londe', 0))
game.add_child(Node.u8('accu', 0))
game.add_child(Node.string('linid', '.'))
game.add_child(Node.u8('linclass', 0))
game.add_child(Node.ipv4('ipaddr', '0.0.0.0'))
game.add_child(Node.string('hadid', '00010203040506070809'))
game.add_child(Node.string('licid', '00010203040506070809'))
game.add_child(Node.string('actid', self.pcbid))
game.add_child(Node.s8('appstate', 0))
game.add_child(Node.s8('c_need', 1))
game.add_child(Node.s8('c_credit', 2))
game.add_child(Node.s8('s_credit', 2))
game.add_child(Node.bool('free_p', True))
game.add_child(Node.bool('close', True))
game.add_child(Node.s32('close_t', 1380))
game.add_child(Node.u32('playc', 0))
game.add_child(Node.u32('playn', 0))
game.add_child(Node.u32('playe', 0))
game.add_child(Node.u32('test_m', 0))
game.add_child(Node.u32('service', 0))
game.add_child(Node.bool('paseli', True))
game.add_child(Node.u32('update', 0))
game.add_child(Node.string('shopname', ''))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/nxt_time")
def verify_game_new(self, location: str, refid: str) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('name', self.NAME)
game.set_attribute('method', 'new')
game.set_attribute('refid', refid)
game.set_attribute('locid', location)
game.set_attribute('dataid', refid)
game.set_attribute('ver', '0')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify_game_frozen(self, refid: str, time: int) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('refid', refid)
game.set_attribute('method', 'frozen')
game.set_attribute('ver', '0')
game.add_child(Node.u32('sec', time))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/@result")
def verify_game_save(self, location: str, refid: str, packet: int, block: int, exp: int) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'save')
game.set_attribute('refid', refid)
game.set_attribute('locid', location)
game.set_attribute('ver', '0')
game.add_child(Node.u8('headphone', 0))
game.add_child(Node.u8('hispeed', 16))
game.add_child(Node.u16('appeal_id', 19))
game.add_child(Node.u16('frame0', 0))
game.add_child(Node.u16('frame1', 0))
game.add_child(Node.u16('frame2', 0))
game.add_child(Node.u16('frame3', 0))
game.add_child(Node.u16('frame4', 0))
last = Node.void('last')
game.add_child(last)
last.set_attribute('music_type', '1')
last.set_attribute('music_id', '29')
last.set_attribute('sort_type', '4')
game.add_child(Node.u32('earned_gamecoin_packet', packet))
game.add_child(Node.u32('earned_gamecoin_block', block))
game.add_child(Node.u32('gain_exp', exp))
game.add_child(Node.u32('m_user_cnt', 0))
game.add_child(Node.bool_array('have_item', [False] * 512))
game.add_child(Node.bool_array('have_note', [False] * 512))
tracking = Node.void('tracking')
game.add_child(tracking)
m0 = Node.void('m0')
tracking.add_child(m0)
m0.add_child(Node.u8('type', 2))
m0.add_child(Node.u32('id', 5))
m0.add_child(Node.u32('score', 774566))
tracking.add_child(Node.time('p_start', Time.now() - 300))
tracking.add_child(Node.time('p_end', Time.now()))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify_game_common(self) -> None:
call = self.call_node()
game = Node.void('game')
game.set_attribute('ver', '0')
game.set_attribute('method', 'common')
call.add_child(game)
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/limited")
self.assert_path(resp, "response/game/event/info/@id")
self.assert_path(resp, "response/game/catalog/info/@currency")
self.assert_path(resp, "response/game/catalog/info/@id")
self.assert_path(resp, "response/game/catalog/info/@price")
self.assert_path(resp, "response/game/kacinfo/note00")
self.assert_path(resp, "response/game/kacinfo/note01")
self.assert_path(resp, "response/game/kacinfo/note02")
self.assert_path(resp, "response/game/kacinfo/note10")
self.assert_path(resp, "response/game/kacinfo/note11")
self.assert_path(resp, "response/game/kacinfo/note12")
self.assert_path(resp, "response/game/kacinfo/rabbeat0")
self.assert_path(resp, "response/game/kacinfo/rabbeat1")
def verify_game_buy(self, refid: str, catalogid: int, success: bool) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('refid', refid)
game.set_attribute('ver', '0')
game.set_attribute('method', 'buy')
game.add_child(Node.u32('catalog_id', catalogid))
game.add_child(Node.u32('earned_gamecoin_packet', 0))
game.add_child(Node.u32('earned_gamecoin_block', 0))
game.add_child(Node.u32('open_index', 4))
game.add_child(Node.u32('currency_type', 1))
game.add_child(Node.u32('price', 10))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/gamecoin_packet")
self.assert_path(resp, "response/game/gamecoin_block")
self.assert_path(resp, "response/game/result")
if success:
if resp.child_value('game/result') != 0:
raise Exception('Failed to purchase!')
else:
if resp.child_value('game/result') == 0:
raise Exception('Purchased when shouldn\'t have!')
def verify_game_load(self, cardid: str, refid: str, msg_type: str) -> Dict[str, Any]:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('cardid', cardid)
game.set_attribute('dataid', refid)
game.set_attribute('ver', '0')
game.set_attribute('method', 'load')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
if msg_type == "new":
self.assert_path(resp, "response/game/@none")
return None
if msg_type == "existing":
self.assert_path(resp, "response/game/name")
self.assert_path(resp, "response/game/code")
self.assert_path(resp, "response/game/gamecoin_packet")
self.assert_path(resp, "response/game/gamecoin_block")
self.assert_path(resp, "response/game/exp_point")
self.assert_path(resp, "response/game/m_user_cnt")
self.assert_path(resp, "response/game/have_item")
self.assert_path(resp, "response/game/have_note")
self.assert_path(resp, "response/game/last/@appeal_id")
self.assert_path(resp, "response/game/last/@frame0")
self.assert_path(resp, "response/game/last/@frame1")
self.assert_path(resp, "response/game/last/@frame2")
self.assert_path(resp, "response/game/last/@frame3")
self.assert_path(resp, "response/game/last/@frame4")
self.assert_path(resp, "response/game/last/@headphone")
self.assert_path(resp, "response/game/last/@hispeed")
self.assert_path(resp, "response/game/last/@music_id")
self.assert_path(resp, "response/game/last/@music_type")
self.assert_path(resp, "response/game/last/@sort_type")
return {
'name': resp.child_value('game/name'),
'packet': resp.child_value('game/gamecoin_packet'),
'block': resp.child_value('game/gamecoin_block'),
'exp': resp.child_value('game/exp_point'),
}
else:
raise Exception(f"Invalid game load type {msg_type}")
def verify_game_lounge(self) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'lounge')
game.set_attribute('ver', '0')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/interval")
def verify_game_entry_s(self) -> int:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('ver', '0')
game.set_attribute('method', 'entry_s')
game.add_child(Node.u8('c_ver', 22))
game.add_child(Node.u8('p_num', 1))
game.add_child(Node.u8('p_rest', 1))
game.add_child(Node.u8('filter', 1))
game.add_child(Node.u32('mid', 5))
game.add_child(Node.u32('sec', 45))
game.add_child(Node.u16('port', 10007))
game.add_child(Node.fouru8('gip', [127, 0, 0, 1]))
game.add_child(Node.fouru8('lip', [10, 0, 5, 73]))
game.add_child(Node.bool('claim', True))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/entry_id")
return resp.child_value('game/entry_id')
def verify_game_entry_e(self, eid: int) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'entry_e')
game.set_attribute('ver', '0')
game.add_child(Node.u32('eid', eid))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify_game_load_m(self, refid: str) -> List[Dict[str, int]]:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'load_m')
game.set_attribute('ver', '0')
game.set_attribute('all', '1')
game.set_attribute('dataid', refid)
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
scores = []
for child in resp.child('game').children:
if child.name != 'music':
continue
musicid = int(child.attribute('music_id'))
for typenode in child.children:
if typenode.name != 'type':
continue
chart = int(typenode.attribute('type_id'))
clear_type = int(typenode.attribute('clear_type'))
score = int(typenode.attribute('score'))
grade = int(typenode.attribute('score_grade'))
scores.append({
'id': musicid,
'chart': chart,
'clear_type': clear_type,
'score': score,
'grade': grade,
})
return scores
def verify_game_save_m(self, location: str, refid: str, score: Dict[str, int]) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'save_m')
game.set_attribute('max_chain', '0')
game.set_attribute('clear_type', str(score['clear_type']))
game.set_attribute('music_type', str(score['chart']))
game.set_attribute('score_grade', str(score['grade']))
game.set_attribute('locid', location)
game.set_attribute('music_id', str(score['id']))
game.set_attribute('dataid', refid)
game.set_attribute('ver', '0')
game.set_attribute('score', str(score['score']))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify(self, cardid: Optional[str]) -> None:
# Verify boot sequence is okay
self.verify_services_get(
expected_services=[
'pcbtracker',
'pcbevent',
'local',
'message',
'facility',
'cardmng',
'package',
'posevent',
'pkglist',
'dlstatus',
'eacoin',
'lobby',
'ntp',
'keepalive'
]
)
paseli_enabled = self.verify_pcbtracker_alive()
self.verify_message_get()
self.verify_package_list()
location = self.verify_facility_get()
self.verify_pcbevent_put()
self.verify_eventlog_write(location)
self.verify_game_shop(location)
self.verify_game_common()
# Verify card registration and profile lookup
if cardid is not None:
card = cardid
else:
card = self.random_card()
print(f"Generated random card ID {card} for use.")
if cardid is None:
self.verify_cardmng_inquire(card, msg_type='unregistered', paseli_enabled=paseli_enabled)
ref_id = self.verify_cardmng_getrefid(card)
if len(ref_id) != 16:
raise Exception(f'Invalid refid \'{ref_id}\' returned when registering card')
if ref_id != self.verify_cardmng_inquire(card, msg_type='new', paseli_enabled=paseli_enabled):
raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card')
# SDVX doesn't read the new profile, it asks for the profile itself after calling new
self.verify_game_load(card, ref_id, msg_type='new')
self.verify_game_new(location, ref_id)
self.verify_game_load(card, ref_id, msg_type='existing')
else:
print("Skipping new card checks for existing card")
ref_id = self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled)
# Verify pin handling and return card handling
self.verify_cardmng_authpass(ref_id, correct=True)
self.verify_cardmng_authpass(ref_id, correct=False)
if ref_id != self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled):
raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card')
# Verify account freezing
self.verify_game_frozen(ref_id, 900)
self.verify_game_frozen(ref_id, 0)
# Verify lobby functionality
self.verify_game_lounge()
eid = self.verify_game_entry_s()
self.verify_game_entry_e(eid)
if cardid is None:
# Verify profile loading and saving
profile = self.verify_game_load(card, ref_id, msg_type='existing')
if profile['name'] != self.NAME:
raise Exception(f'Profile has incorrect name {profile["name"]} associated with it!')
if profile['packet'] != 0:
raise Exception('Profile has nonzero blocks associated with it!')
if profile['block'] != 0:
raise Exception('Profile has nonzero packets associated with it!')
if profile['exp'] != 0:
raise Exception('Profile has nonzero exp associated with it!')
# Verify purchase failure
self.verify_game_buy(ref_id, 1004, False)
self.verify_game_save(location, ref_id, packet=123, block=234, exp=42)
profile = self.verify_game_load(card, ref_id, msg_type='existing')
if profile['name'] != self.NAME:
raise Exception(f'Profile has incorrect name {profile["name"]} associated with it!')
if profile['packet'] != 123:
raise Exception('Profile has invalid blocks associated with it!')
if profile['block'] != 234:
raise Exception('Profile has invalid packets associated with it!')
if profile['exp'] != 42:
raise Exception('Profile has invalid exp associated with it!')
self.verify_game_save(location, ref_id, packet=1, block=2, exp=3)
profile = self.verify_game_load(card, ref_id, msg_type='existing')
if profile['name'] != self.NAME:
raise Exception(f'Profile has incorrect name {profile["name"]} associated with it!')
if profile['packet'] != 124:
raise Exception('Profile has invalid blocks associated with it!')
if profile['block'] != 236:
raise Exception('Profile has invalid packets associated with it!')
if profile['exp'] != 45:
raise Exception('Profile has invalid exp associated with it!')
# Verify purchase success
self.verify_game_buy(ref_id, 1004, True)
profile = self.verify_game_load(card, ref_id, msg_type='existing')
if profile['name'] != self.NAME:
raise Exception(f'Profile has incorrect name {profile["name"]} associated with it!')
if profile['packet'] != 124:
raise Exception('Profile has invalid blocks associated with it!')
if profile['block'] != 226:
raise Exception('Profile has invalid packets associated with it!')
if profile['exp'] != 45:
raise Exception('Profile has invalid exp associated with it!')
# Verify empty profile has no scores on it
scores = self.verify_game_load_m(ref_id)
if len(scores) > 0:
raise Exception('Score on an empty profile!')
# Verify score saving and updating
for phase in [1, 2]:
if phase == 1:
dummyscores = [
# An okay score on a chart
{
'id': 1,
'chart': 1,
'grade': 3,
'clear_type': 2,
'score': 765432,
},
# A good score on an easier chart of the same song
{
'id': 1,
'chart': 0,
'grade': 6,
'clear_type': 3,
'score': 7654321,
},
# A bad score on a hard chart
{
'id': 2,
'chart': 2,
'grade': 1,
'clear_type': 1,
'score': 12345,
},
# A terrible score on an easy chart
{
'id': 3,
'chart': 0,
'grade': 1,
'clear_type': 1,
'score': 123,
},
]
if phase == 2:
dummyscores = [
# A better score on the same chart
{
'id': 1,
'chart': 1,
'grade': 5,
'clear_type': 3,
'score': 8765432,
},
# A worse score on another same chart
{
'id': 1,
'chart': 0,
'grade': 4,
'clear_type': 2,
'score': 6543210,
'expected_score': 7654321,
'expected_clear_type': 3,
'expected_grade': 6,
},
]
for dummyscore in dummyscores:
self.verify_game_save_m(location, ref_id, dummyscore)
scores = self.verify_game_load_m(ref_id)
for expected in dummyscores:
actual = None
for received in scores:
if received['id'] == expected['id'] and received['chart'] == expected['chart']:
actual = received
break
if actual is None:
raise Exception(f"Didn't find song {expected['id']} chart {expected['chart']} in response!")
if 'expected_score' in expected:
expected_score = expected['expected_score']
else:
expected_score = expected['score']
if 'expected_grade' in expected:
expected_grade = expected['expected_grade']
else:
expected_grade = expected['grade']
if 'expected_clear_type' in expected:
expected_clear_type = expected['expected_clear_type']
else:
expected_clear_type = expected['clear_type']
if actual['score'] != expected_score:
raise Exception(f'Expected a score of \'{expected_score}\' for song \'{expected["id"]}\' chart \'{expected["chart"]}\' but got score \'{actual["score"]}\'')
if actual['grade'] != expected_grade:
raise Exception(f'Expected a grade of \'{expected_grade}\' for song \'{expected["id"]}\' chart \'{expected["chart"]}\' but got grade \'{actual["grade"]}\'')
if actual['clear_type'] != expected_clear_type:
raise Exception(f'Expected a clear_type of \'{expected_clear_type}\' for song \'{expected["id"]}\' chart \'{expected["chart"]}\' but got clear_type \'{actual["clear_type"]}\'')
# Sleep so we don't end up putting in score history on the same second
time.sleep(1)
else:
print("Skipping score checks for existing card")
# Verify high score tables
self.verify_game_hiscore(location)
# Verify paseli handling
if paseli_enabled:
print("PASELI enabled for this PCBID, executing PASELI checks")
else:
print("PASELI disabled for this PCBID, skipping PASELI checks")
return
sessid, balance = self.verify_eacoin_checkin(card)
if balance == 0:
print("Skipping PASELI consume check because card has 0 balance")
else:
self.verify_eacoin_consume(sessid, balance, random.randint(0, balance))
self.verify_eacoin_checkout(sessid)