1
0
mirror of synced 2024-11-28 07:50:51 +01:00
bemaniutils/bemani/client/sdvx/booth.py
Jennifer Taylor 509cb4f0d9 Convert most of the format() string calls to f-strings using libcst.
Exact commands run were:

  python3 -m libcst.tool codemod convert_format_to_fstring.ConvertFormatStringCommand . --no-format
  python3 setup.py build_ext --inplace
2020-01-07 21:29:07 +00:00

640 lines
26 KiB
Python

import random
import time
from typing import Any, Dict, List, Optional
from bemani.common import Time
from bemani.client.base import BaseClient
from bemani.protocol import Node
class SoundVoltexBoothClient(BaseClient):
NAME = 'TEST'
def verify_eventlog_write(self, location: str) -> None:
call = self.call_node()
# Construct node
eventlog = Node.void('eventlog')
call.add_child(eventlog)
eventlog.set_attribute('method', 'write')
eventlog.add_child(Node.u32('retrycnt', 0))
data = Node.void('data')
eventlog.add_child(data)
data.add_child(Node.string('eventid', 'S_PWRON'))
data.add_child(Node.s32('eventorder', 0))
data.add_child(Node.u64('pcbtime', int(time.time() * 1000)))
data.add_child(Node.s64('gamesession', -1))
data.add_child(Node.string('strdata1', '1.7.6'))
data.add_child(Node.string('strdata2', ''))
data.add_child(Node.s64('numdata1', 1))
data.add_child(Node.s64('numdata2', 0))
data.add_child(Node.string('locationid', location))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/eventlog/gamesession")
self.assert_path(resp, "response/eventlog/logsendflg")
self.assert_path(resp, "response/eventlog/logerrlevel")
self.assert_path(resp, "response/eventlog/evtidnosendflg")
def verify_game_hiscore(self, location: str) -> None:
call = self.call_node()
game = Node.void('game')
game.set_attribute('locid', location)
game.set_attribute('ver', '0')
game.set_attribute('method', 'hiscore')
call.add_child(game)
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/ranking/@id")
self.assert_path(resp, "response/game/hiscore/@type")
self.assert_path(resp, "response/game/hiscore/music/@id")
self.assert_path(resp, "response/game/hiscore/music/note/@name")
self.assert_path(resp, "response/game/hiscore/music/note/@score")
self.assert_path(resp, "response/game/hiscore/music/note/@type")
def verify_game_shop(self, location: str) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'shop')
game.set_attribute('ver', '0')
game.add_child(Node.string('locid', location))
game.add_child(Node.string('regcode', '.'))
game.add_child(Node.string('locname', ''))
game.add_child(Node.u8('loctype', 0))
game.add_child(Node.string('cstcode', ''))
game.add_child(Node.string('cpycode', ''))
game.add_child(Node.s32('latde', 0))
game.add_child(Node.s32('londe', 0))
game.add_child(Node.u8('accu', 0))
game.add_child(Node.string('linid', '.'))
game.add_child(Node.u8('linclass', 0))
game.add_child(Node.ipv4('ipaddr', '0.0.0.0'))
game.add_child(Node.string('hadid', '00010203040506070809'))
game.add_child(Node.string('licid', '00010203040506070809'))
game.add_child(Node.string('actid', self.pcbid))
game.add_child(Node.s8('appstate', 0))
game.add_child(Node.s8('c_need', 1))
game.add_child(Node.s8('c_credit', 2))
game.add_child(Node.s8('s_credit', 2))
game.add_child(Node.bool('free_p', True))
game.add_child(Node.bool('close', True))
game.add_child(Node.s32('close_t', 1380))
game.add_child(Node.u32('playc', 0))
game.add_child(Node.u32('playn', 0))
game.add_child(Node.u32('playe', 0))
game.add_child(Node.u32('test_m', 0))
game.add_child(Node.u32('service', 0))
game.add_child(Node.bool('paseli', True))
game.add_child(Node.u32('update', 0))
game.add_child(Node.string('shopname', ''))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/nxt_time")
def verify_game_new(self, location: str, refid: str) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('name', self.NAME)
game.set_attribute('method', 'new')
game.set_attribute('refid', refid)
game.set_attribute('locid', location)
game.set_attribute('dataid', refid)
game.set_attribute('ver', '0')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify_game_frozen(self, refid: str, time: int) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('refid', refid)
game.set_attribute('method', 'frozen')
game.set_attribute('ver', '0')
game.add_child(Node.u32('sec', time))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/@result")
def verify_game_save(self, location: str, refid: str, packet: int, block: int, exp: int) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'save')
game.set_attribute('refid', refid)
game.set_attribute('locid', location)
game.set_attribute('ver', '0')
game.add_child(Node.u8('headphone', 0))
game.add_child(Node.u8('hispeed', 16))
game.add_child(Node.u16('appeal_id', 19))
game.add_child(Node.u16('frame0', 0))
game.add_child(Node.u16('frame1', 0))
game.add_child(Node.u16('frame2', 0))
game.add_child(Node.u16('frame3', 0))
game.add_child(Node.u16('frame4', 0))
last = Node.void('last')
game.add_child(last)
last.set_attribute('music_type', '1')
last.set_attribute('music_id', '29')
last.set_attribute('sort_type', '4')
game.add_child(Node.u32('earned_gamecoin_packet', packet))
game.add_child(Node.u32('earned_gamecoin_block', block))
game.add_child(Node.u32('gain_exp', exp))
game.add_child(Node.u32('m_user_cnt', 0))
game.add_child(Node.bool_array('have_item', [False] * 512))
game.add_child(Node.bool_array('have_note', [False] * 512))
tracking = Node.void('tracking')
game.add_child(tracking)
m0 = Node.void('m0')
tracking.add_child(m0)
m0.add_child(Node.u8('type', 2))
m0.add_child(Node.u32('id', 5))
m0.add_child(Node.u32('score', 774566))
tracking.add_child(Node.time('p_start', Time.now() - 300))
tracking.add_child(Node.time('p_end', Time.now()))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify_game_common(self) -> None:
call = self.call_node()
game = Node.void('game')
game.set_attribute('ver', '0')
game.set_attribute('method', 'common')
call.add_child(game)
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/limited")
self.assert_path(resp, "response/game/event/info/@id")
self.assert_path(resp, "response/game/catalog/info/@currency")
self.assert_path(resp, "response/game/catalog/info/@id")
self.assert_path(resp, "response/game/catalog/info/@price")
self.assert_path(resp, "response/game/kacinfo/note00")
self.assert_path(resp, "response/game/kacinfo/note01")
self.assert_path(resp, "response/game/kacinfo/note02")
self.assert_path(resp, "response/game/kacinfo/note10")
self.assert_path(resp, "response/game/kacinfo/note11")
self.assert_path(resp, "response/game/kacinfo/note12")
self.assert_path(resp, "response/game/kacinfo/rabbeat0")
self.assert_path(resp, "response/game/kacinfo/rabbeat1")
def verify_game_buy(self, refid: str, catalogid: int, success: bool) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('refid', refid)
game.set_attribute('ver', '0')
game.set_attribute('method', 'buy')
game.add_child(Node.u32('catalog_id', catalogid))
game.add_child(Node.u32('earned_gamecoin_packet', 0))
game.add_child(Node.u32('earned_gamecoin_block', 0))
game.add_child(Node.u32('open_index', 4))
game.add_child(Node.u32('currency_type', 1))
game.add_child(Node.u32('price', 10))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/gamecoin_packet")
self.assert_path(resp, "response/game/gamecoin_block")
self.assert_path(resp, "response/game/result")
if success:
if resp.child_value('game/result') != 0:
raise Exception('Failed to purchase!')
else:
if resp.child_value('game/result') == 0:
raise Exception('Purchased when shouldn\'t have!')
def verify_game_load(self, cardid: str, refid: str, msg_type: str) -> Dict[str, Any]:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('cardid', cardid)
game.set_attribute('dataid', refid)
game.set_attribute('ver', '0')
game.set_attribute('method', 'load')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
if msg_type == "new":
self.assert_path(resp, "response/game/@none")
return None
if msg_type == "existing":
self.assert_path(resp, "response/game/name")
self.assert_path(resp, "response/game/code")
self.assert_path(resp, "response/game/gamecoin_packet")
self.assert_path(resp, "response/game/gamecoin_block")
self.assert_path(resp, "response/game/exp_point")
self.assert_path(resp, "response/game/m_user_cnt")
self.assert_path(resp, "response/game/have_item")
self.assert_path(resp, "response/game/have_note")
self.assert_path(resp, "response/game/last/@appeal_id")
self.assert_path(resp, "response/game/last/@frame0")
self.assert_path(resp, "response/game/last/@frame1")
self.assert_path(resp, "response/game/last/@frame2")
self.assert_path(resp, "response/game/last/@frame3")
self.assert_path(resp, "response/game/last/@frame4")
self.assert_path(resp, "response/game/last/@headphone")
self.assert_path(resp, "response/game/last/@hispeed")
self.assert_path(resp, "response/game/last/@music_id")
self.assert_path(resp, "response/game/last/@music_type")
self.assert_path(resp, "response/game/last/@sort_type")
return {
'name': resp.child_value('game/name'),
'packet': resp.child_value('game/gamecoin_packet'),
'block': resp.child_value('game/gamecoin_block'),
'exp': resp.child_value('game/exp_point'),
}
else:
raise Exception(f"Invalid game load type {msg_type}")
def verify_game_lounge(self) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'lounge')
game.set_attribute('ver', '0')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/interval")
def verify_game_entry_s(self) -> int:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('ver', '0')
game.set_attribute('method', 'entry_s')
game.add_child(Node.u8('c_ver', 22))
game.add_child(Node.u8('p_num', 1))
game.add_child(Node.u8('p_rest', 1))
game.add_child(Node.u8('filter', 1))
game.add_child(Node.u32('mid', 5))
game.add_child(Node.u32('sec', 45))
game.add_child(Node.u16('port', 10007))
game.add_child(Node.fouru8('gip', [127, 0, 0, 1]))
game.add_child(Node.fouru8('lip', [10, 0, 5, 73]))
game.add_child(Node.bool('claim', True))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game/entry_id")
return resp.child_value('game/entry_id')
def verify_game_entry_e(self, eid: int) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'entry_e')
game.set_attribute('ver', '0')
game.add_child(Node.u32('eid', eid))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify_game_load_m(self, refid: str) -> List[Dict[str, int]]:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'load_m')
game.set_attribute('ver', '0')
game.set_attribute('all', '1')
game.set_attribute('dataid', refid)
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
scores = []
for child in resp.child('game').children:
if child.name != 'music':
continue
musicid = int(child.attribute('music_id'))
for typenode in child.children:
if typenode.name != 'type':
continue
chart = int(typenode.attribute('type_id'))
clear_type = int(typenode.attribute('clear_type'))
score = int(typenode.attribute('score'))
grade = int(typenode.attribute('score_grade'))
scores.append({
'id': musicid,
'chart': chart,
'clear_type': clear_type,
'score': score,
'grade': grade,
})
return scores
def verify_game_save_m(self, location: str, refid: str, score: Dict[str, int]) -> None:
call = self.call_node()
game = Node.void('game')
call.add_child(game)
game.set_attribute('method', 'save_m')
game.set_attribute('max_chain', '0')
game.set_attribute('clear_type', str(score['clear_type']))
game.set_attribute('music_type', str(score['chart']))
game.set_attribute('score_grade', str(score['grade']))
game.set_attribute('locid', location)
game.set_attribute('music_id', str(score['id']))
game.set_attribute('dataid', refid)
game.set_attribute('ver', '0')
game.set_attribute('score', str(score['score']))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/game")
def verify(self, cardid: Optional[str]) -> None:
# Verify boot sequence is okay
self.verify_services_get(
expected_services=[
'pcbtracker',
'pcbevent',
'local',
'message',
'facility',
'cardmng',
'package',
'posevent',
'pkglist',
'dlstatus',
'eacoin',
'lobby',
'ntp',
'keepalive'
]
)
paseli_enabled = self.verify_pcbtracker_alive()
self.verify_message_get()
self.verify_package_list()
location = self.verify_facility_get()
self.verify_pcbevent_put()
self.verify_eventlog_write(location)
self.verify_game_shop(location)
self.verify_game_common()
# Verify card registration and profile lookup
if cardid is not None:
card = cardid
else:
card = self.random_card()
print(f"Generated random card ID {card} for use.")
if cardid is None:
self.verify_cardmng_inquire(card, msg_type='unregistered', paseli_enabled=paseli_enabled)
ref_id = self.verify_cardmng_getrefid(card)
if len(ref_id) != 16:
raise Exception(f'Invalid refid \'{ref_id}\' returned when registering card')
if ref_id != self.verify_cardmng_inquire(card, msg_type='new', paseli_enabled=paseli_enabled):
raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card')
# SDVX doesn't read the new profile, it asks for the profile itself after calling new
self.verify_game_load(card, ref_id, msg_type='new')
self.verify_game_new(location, ref_id)
self.verify_game_load(card, ref_id, msg_type='existing')
else:
print("Skipping new card checks for existing card")
ref_id = self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled)
# Verify pin handling and return card handling
self.verify_cardmng_authpass(ref_id, correct=True)
self.verify_cardmng_authpass(ref_id, correct=False)
if ref_id != self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled):
raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card')
# Verify account freezing
self.verify_game_frozen(ref_id, 900)
self.verify_game_frozen(ref_id, 0)
# Verify lobby functionality
self.verify_game_lounge()
eid = self.verify_game_entry_s()
self.verify_game_entry_e(eid)
if cardid is None:
# Verify profile loading and saving
profile = self.verify_game_load(card, ref_id, msg_type='existing')
if profile['name'] != self.NAME:
raise Exception(f'Profile has incorrect name {profile["name"]} associated with it!')
if profile['packet'] != 0:
raise Exception('Profile has nonzero blocks associated with it!')
if profile['block'] != 0:
raise Exception('Profile has nonzero packets associated with it!')
if profile['exp'] != 0:
raise Exception('Profile has nonzero exp associated with it!')
# Verify purchase failure
self.verify_game_buy(ref_id, 1004, False)
self.verify_game_save(location, ref_id, packet=123, block=234, exp=42)
profile = self.verify_game_load(card, ref_id, msg_type='existing')
if profile['name'] != self.NAME:
raise Exception(f'Profile has incorrect name {profile["name"]} associated with it!')
if profile['packet'] != 123:
raise Exception('Profile has invalid blocks associated with it!')
if profile['block'] != 234:
raise Exception('Profile has invalid packets associated with it!')
if profile['exp'] != 42:
raise Exception('Profile has invalid exp associated with it!')
self.verify_game_save(location, ref_id, packet=1, block=2, exp=3)
profile = self.verify_game_load(card, ref_id, msg_type='existing')
if profile['name'] != self.NAME:
raise Exception(f'Profile has incorrect name {profile["name"]} associated with it!')
if profile['packet'] != 124:
raise Exception('Profile has invalid blocks associated with it!')
if profile['block'] != 236:
raise Exception('Profile has invalid packets associated with it!')
if profile['exp'] != 45:
raise Exception('Profile has invalid exp associated with it!')
# Verify purchase success
self.verify_game_buy(ref_id, 1004, True)
profile = self.verify_game_load(card, ref_id, msg_type='existing')
if profile['name'] != self.NAME:
raise Exception(f'Profile has incorrect name {profile["name"]} associated with it!')
if profile['packet'] != 124:
raise Exception('Profile has invalid blocks associated with it!')
if profile['block'] != 226:
raise Exception('Profile has invalid packets associated with it!')
if profile['exp'] != 45:
raise Exception('Profile has invalid exp associated with it!')
# Verify empty profile has no scores on it
scores = self.verify_game_load_m(ref_id)
if len(scores) > 0:
raise Exception('Score on an empty profile!')
# Verify score saving and updating
for phase in [1, 2]:
if phase == 1:
dummyscores = [
# An okay score on a chart
{
'id': 1,
'chart': 1,
'grade': 3,
'clear_type': 2,
'score': 765432,
},
# A good score on an easier chart of the same song
{
'id': 1,
'chart': 0,
'grade': 6,
'clear_type': 3,
'score': 7654321,
},
# A bad score on a hard chart
{
'id': 2,
'chart': 2,
'grade': 1,
'clear_type': 1,
'score': 12345,
},
# A terrible score on an easy chart
{
'id': 3,
'chart': 0,
'grade': 1,
'clear_type': 1,
'score': 123,
},
]
if phase == 2:
dummyscores = [
# A better score on the same chart
{
'id': 1,
'chart': 1,
'grade': 5,
'clear_type': 3,
'score': 8765432,
},
# A worse score on another same chart
{
'id': 1,
'chart': 0,
'grade': 4,
'clear_type': 2,
'score': 6543210,
'expected_score': 7654321,
'expected_clear_type': 3,
'expected_grade': 6,
},
]
for dummyscore in dummyscores:
self.verify_game_save_m(location, ref_id, dummyscore)
scores = self.verify_game_load_m(ref_id)
for expected in dummyscores:
actual = None
for received in scores:
if received['id'] == expected['id'] and received['chart'] == expected['chart']:
actual = received
break
if actual is None:
raise Exception(f"Didn't find song {expected['id']} chart {expected['chart']} in response!")
if 'expected_score' in expected:
expected_score = expected['expected_score']
else:
expected_score = expected['score']
if 'expected_grade' in expected:
expected_grade = expected['expected_grade']
else:
expected_grade = expected['grade']
if 'expected_clear_type' in expected:
expected_clear_type = expected['expected_clear_type']
else:
expected_clear_type = expected['clear_type']
if actual['score'] != expected_score:
raise Exception(f'Expected a score of \'{expected_score}\' for song \'{expected["id"]}\' chart \'{expected["chart"]}\' but got score \'{actual["score"]}\'')
if actual['grade'] != expected_grade:
raise Exception(f'Expected a grade of \'{expected_grade}\' for song \'{expected["id"]}\' chart \'{expected["chart"]}\' but got grade \'{actual["grade"]}\'')
if actual['clear_type'] != expected_clear_type:
raise Exception(f'Expected a clear_type of \'{expected_clear_type}\' for song \'{expected["id"]}\' chart \'{expected["chart"]}\' but got clear_type \'{actual["clear_type"]}\'')
# Sleep so we don't end up putting in score history on the same second
time.sleep(1)
else:
print("Skipping score checks for existing card")
# Verify high score tables
self.verify_game_hiscore(location)
# Verify paseli handling
if paseli_enabled:
print("PASELI enabled for this PCBID, executing PASELI checks")
else:
print("PASELI disabled for this PCBID, skipping PASELI checks")
return
sessid, balance = self.verify_eacoin_checkin(card)
if balance == 0:
print("Skipping PASELI consume check because card has 0 balance")
else:
self.verify_eacoin_consume(sessid, balance, random.randint(0, balance))
self.verify_eacoin_checkout(sessid)