1
0
mirror of synced 2024-11-12 01:00:46 +01:00

Add traffic test for festo, fix peace/kaimei referencing removed song and causing traffic failures.

This commit is contained in:
Jennifer Taylor 2022-10-03 02:10:43 +00:00
parent f203239098
commit e076757640
8 changed files with 951 additions and 14 deletions

View File

@ -2287,18 +2287,19 @@ class JubeatFesto(
# Grab categories stuff
fill_in_category = player.child('fill_in_category')
fill_in_category_normal = fill_in_category.child('normal')
if fill_in_category is not None:
newprofile.replace_int_array('normal_no_gray_flag_list', 16, fill_in_category_normal.child_value('no_gray_flag_list'))
newprofile.replace_int_array('normal_all_yellow_flag_list', 16, fill_in_category_normal.child_value('all_yellow_flag_list'))
newprofile.replace_int_array('normal_full_combo_flag_list', 16, fill_in_category_normal.child_value('full_combo_flag_list'))
newprofile.replace_int_array('normal_excellent_flag_list', 16, fill_in_category_normal.child_value('excellent_flag_list'))
fill_in_category_hard = fill_in_category.child('hard')
if fill_in_category is not None:
newprofile.replace_int_array('hard_no_gray_flag_list', 16, fill_in_category_hard.child_value('no_gray_flag_list'))
newprofile.replace_int_array('hard_all_yellow_flag_list', 16, fill_in_category_hard.child_value('all_yellow_flag_list'))
newprofile.replace_int_array('hard_full_combo_flag_list', 16, fill_in_category_hard.child_value('full_combo_flag_list'))
newprofile.replace_int_array('hard_excellent_flag_list', 16, fill_in_category_hard.child_value('excellent_flag_list'))
fill_in_category_normal = fill_in_category.child('normal')
if fill_in_category_normal is not None:
newprofile.replace_int_array('normal_no_gray_flag_list', 16, fill_in_category_normal.child_value('no_gray_flag_list'))
newprofile.replace_int_array('normal_all_yellow_flag_list', 16, fill_in_category_normal.child_value('all_yellow_flag_list'))
newprofile.replace_int_array('normal_full_combo_flag_list', 16, fill_in_category_normal.child_value('full_combo_flag_list'))
newprofile.replace_int_array('normal_excellent_flag_list', 16, fill_in_category_normal.child_value('excellent_flag_list'))
fill_in_category_hard = fill_in_category.child('hard')
if fill_in_category_hard is not None:
newprofile.replace_int_array('hard_no_gray_flag_list', 16, fill_in_category_hard.child_value('no_gray_flag_list'))
newprofile.replace_int_array('hard_all_yellow_flag_list', 16, fill_in_category_hard.child_value('all_yellow_flag_list'))
newprofile.replace_int_array('hard_full_combo_flag_list', 16, fill_in_category_hard.child_value('full_combo_flag_list'))
newprofile.replace_int_array('hard_excellent_flag_list', 16, fill_in_category_hard.child_value('excellent_flag_list'))
# jbox stuff
jbox = player.child('jbox')

View File

@ -90,7 +90,7 @@ class BaseClient:
if not self.__assert_path(root, path):
raise Exception(f'Path \'{path}\' not found in root node:\n{root}')
def verify_services_get(self, expected_services: List[str]=[]) -> None:
def verify_services_get(self, expected_services: List[str]=[], include_net: bool = False) -> None:
call = self.call_node()
# Construct node
@ -105,6 +105,21 @@ class BaseClient:
info.add_child(Node.string('AVS2', self.config['avs']))
if include_net:
net = Node.void('net')
services.add_child(net)
iface = Node.void('if')
net.add_child(iface)
iface.add_child(Node.u8('id', 0))
iface.add_child(Node.bool('valid', True))
iface.add_child(Node.u8('type', 1))
iface.add_child(Node.u8_array('mac', [1, 2, 3, 4, 5, 6]))
iface.add_child(Node.ipv4('addr', '10.0.0.100'))
iface.add_child(Node.ipv4('bcast', '10.0.0.255'))
iface.add_child(Node.ipv4('netmask', '255.255.255.0'))
iface.add_child(Node.ipv4('gateway', '10.0.0.1'))
iface.add_child(Node.ipv4('dhcp', '10.0.0.1'))
# Swap with server
resp = self.exchange('core/services', call)

View File

@ -3,6 +3,7 @@ from bemani.client.jubeat.saucerfulfill import JubeatSaucerFulfillClient
from bemani.client.jubeat.prop import JubeatPropClient
from bemani.client.jubeat.qubell import JubeatQubellClient
from bemani.client.jubeat.clan import JubeatClanClient
from bemani.client.jubeat.festo import JubeatFestoClient
__all__ = [
@ -11,4 +12,5 @@ __all__ = [
"JubeatPropClient",
"JubeatQubellClient",
"JubeatClanClient",
"JubeatFestoClient",
]

View File

@ -0,0 +1,905 @@
import random
import time
from typing import Any, Dict, List, Optional
from bemani.client.base import BaseClient
from bemani.common import CardCipher, Time
from bemani.protocol import Node
class JubeatFestoClient(BaseClient):
NAME = 'TEST'
def __verify_info(self, resp: Node, base: str) -> None:
# Verify that response is correct
self.assert_path(resp, f"response/{base}/data/info/event_info")
self.assert_path(resp, f"response/{base}/data/info/genre_def_music")
self.assert_path(resp, f"response/{base}/data/info/black_jacket_list")
self.assert_path(resp, f"response/{base}/data/info/white_music_list")
self.assert_path(resp, f"response/{base}/data/info/white_marker_list")
self.assert_path(resp, f"response/{base}/data/info/white_theme_list")
self.assert_path(resp, f"response/{base}/data/info/open_music_list")
self.assert_path(resp, f"response/{base}/data/info/shareable_music_list")
self.assert_path(resp, f"response/{base}/data/info/hot_music_list")
self.assert_path(resp, f"response/{base}/data/info/jbox/point")
self.assert_path(resp, f"response/{base}/data/info/jbox/emblem/normal/index")
self.assert_path(resp, f"response/{base}/data/info/jbox/emblem/premium/index")
self.assert_path(resp, f"response/{base}/data/info/born/status")
self.assert_path(resp, f"response/{base}/data/info/born/year")
self.assert_path(resp, f"response/{base}/data/info/konami_logo_50th/is_available")
self.assert_path(resp, f"response/{base}/data/info/expert_option/is_available")
self.assert_path(resp, f"response/{base}/data/info/all_music_matching/is_available")
self.assert_path(resp, f"response/{base}/data/info/department/shop_list")
self.assert_path(resp, f"response/{base}/data/info/question_list")
# Don't bother asserting on actual courses, this is highly specific.
self.assert_path(resp, f"response/{base}/data/info/course_list")
self.assert_path(resp, f"response/{base}/data/info/share_music")
self.assert_path(resp, f"response/{base}/data/info/weekly_music/value")
self.assert_path(resp, f"response/{base}/data/info/weekly_music/music_list")
self.assert_path(resp, f"response/{base}/data/info/add_default_music_list")
# These below I'm not sure are needed, and I think some of them mess with stone tablet.
self.assert_path(resp, f"response/{base}/data/info/team_battle")
self.assert_path(resp, f"response/{base}/data/info/emo_list")
self.assert_path(resp, f"response/{base}/data/info/hike_event")
self.assert_path(resp, f"response/{base}/data/info/tip_list")
self.assert_path(resp, f"response/{base}/data/info/travel")
self.assert_path(resp, f"response/{base}/data/info/stamp")
def verify_shopinfo_regist(self) -> None:
call = self.call_node()
# Construct node
shopinfo = Node.void('shopinfo')
shopinfo.set_attribute('method', 'regist')
call.add_child(shopinfo)
shop = Node.void('shop')
shopinfo.add_child(shop)
shop.add_child(Node.string('name', ''))
shop.add_child(Node.string('pref', 'JP-14'))
shop.add_child(Node.string('softwareid', ''))
shop.add_child(Node.string('systemid', self.pcbid))
shop.add_child(Node.string('hardwareid', '01020304050607080900'))
shop.add_child(Node.string('locationid', 'US-1'))
shop.add_child(Node.string('monitor', 'D26L155 6252 151'))
testmode = Node.void('testmode')
shop.add_child(testmode)
sound = Node.void('sound')
testmode.add_child(sound)
sound.add_child(Node.u8('volume_in_attract', 0))
game = Node.void('game')
testmode.add_child(game)
play_settings = Node.void('play_settings')
game.add_child(play_settings)
play_settings.add_child(Node.u8('max_member', 1))
game_settings = Node.void('game_settings')
game.add_child(game_settings)
game_settings.add_child(Node.u8('close_set', 0))
game_settings.add_child(Node.s32('close_time', 0))
display_type_settings = Node.void('display_type_settings')
game.add_child(display_type_settings)
display_type_settings.add_child(Node.u8('display_type', 2))
coin = Node.void('coin')
testmode.add_child(coin)
coin.add_child(Node.u8('free_play', 0))
coin.add_child(Node.u8('free_first_play', 1))
coin.add_child(Node.u8('coin_slot', 8))
coin.add_child(Node.u8('start', 1))
network = Node.void('network')
testmode.add_child(network)
network.add_child(Node.u8('cabinet_id', 1))
bookkeeping = Node.void('bookkeeping')
testmode.add_child(bookkeeping)
bookkeeping.add_child(Node.u8('enable', 0))
clock = Node.void('clock')
testmode.add_child(clock)
clock.add_child(Node.u8('enable', 1))
clock.add_child(Node.s32('offset', 0))
virtual_coin = Node.void('virtual_coin')
testmode.add_child(virtual_coin)
pattern1 = Node.void('pattern1')
virtual_coin.add_child(pattern1)
pattern1.add_child(Node.u16('basic_rate', 1000))
pattern1.add_child(Node.u8('balance_of_credit', 0))
pattern1.add_child(Node.u8('is_premium_start', 0))
pattern1.add_child(Node.u8('service_value', 10))
pattern1.add_child(Node.u8('service_limit', 10))
pattern1.add_child(Node.u8('service_time_start_h', 7))
pattern1.add_child(Node.u8('service_time_start_m', 0))
pattern1.add_child(Node.u8('service_time_end_h', 11))
pattern1.add_child(Node.u8('service_time_end_m', 0))
pattern2 = Node.void('pattern2')
virtual_coin.add_child(pattern2)
pattern2.add_child(Node.u16('basic_rate', 1000))
pattern2.add_child(Node.u8('balance_of_credit', 0))
pattern2.add_child(Node.u8('is_premium_start', 0))
pattern2.add_child(Node.u8('service_value', 10))
pattern2.add_child(Node.u8('service_limit', 10))
pattern2.add_child(Node.u8('service_time_start_h', 7))
pattern2.add_child(Node.u8('service_time_start_m', 0))
pattern2.add_child(Node.u8('service_time_end_h', 11))
pattern2.add_child(Node.u8('service_time_end_m', 0))
pattern3 = Node.void('pattern3')
virtual_coin.add_child(pattern3)
pattern3.add_child(Node.u16('basic_rate', 1000))
pattern3.add_child(Node.u8('balance_of_credit', 0))
pattern3.add_child(Node.u8('is_premium_start', 0))
pattern3.add_child(Node.u8('service_value', 10))
pattern3.add_child(Node.u8('service_limit', 10))
pattern3.add_child(Node.u8('service_time_start_h', 7))
pattern3.add_child(Node.u8('service_time_start_m', 0))
pattern3.add_child(Node.u8('service_time_end_h', 11))
pattern3.add_child(Node.u8('service_time_end_m', 0))
schedule = Node.void('schedule')
virtual_coin.add_child(schedule)
schedule.add_child(Node.u8('mon', 0))
schedule.add_child(Node.u8('tue', 0))
schedule.add_child(Node.u8('wed', 0))
schedule.add_child(Node.u8('thu', 0))
schedule.add_child(Node.u8('fri', 0))
schedule.add_child(Node.u8('sat', 0))
schedule.add_child(Node.u8('sun', 0))
schedule.add_child(Node.u8('holi', 0))
tax = Node.void('tax')
testmode.add_child(tax)
tax.add_child(Node.u8('tax_phase', 0))
tax.add_child(Node.u8('tax_mode', 0))
# Swap with server
resp = self.exchange('', call)
self.assert_path(resp, "response/shopinfo/data/cabid")
self.assert_path(resp, "response/shopinfo/data/locationid")
self.assert_path(resp, "response/shopinfo/data/tax_phase")
self.assert_path(resp, "response/shopinfo/data/facility/exist")
# Verify server flags for events and stuff.
self.__verify_info(resp, 'shopinfo')
def verify_logger_report(self) -> None:
call = self.call_node()
# Construct node
logger = Node.void('logger')
call.add_child(logger)
logger.set_attribute('method', 'report')
logger.add_child(Node.s32('retry', 0))
data = Node.void('data')
logger.add_child(data)
data.add_child(Node.string('code', 'pcbinfo_01'))
data.add_child(Node.string('information', 'u can literally put anything here lmao'))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/logger/@status")
def verify_demodata_get_info(self) -> None:
call = self.call_node()
# Construct node
demodata = Node.void('demodata')
call.add_child(demodata)
demodata.set_attribute('method', 'get_info')
pcbinfo = Node.void('pcbinfo')
demodata.add_child(pcbinfo)
pcbinfo.set_attribute('client_data_version', '0')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/demodata/data/info/black_jacket_list")
def verify_demodata_get_news(self) -> None:
call = self.call_node()
# Construct node
demodata = Node.void('demodata')
call.add_child(demodata)
demodata.set_attribute('method', 'get_news')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/demodata/data/officialnews/@count")
def verify_demodata_get_jbox_list(self) -> None:
call = self.call_node()
# Construct node
demodata = Node.void('demodata')
call.add_child(demodata)
demodata.set_attribute('method', 'get_jbox_list')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/demodata/@status")
def verify_lab_get_ranking(self) -> None:
call = self.call_node()
# Construct node
lab = Node.void('lab')
call.add_child(lab)
lab.set_attribute('method', 'get_ranking')
lab.add_child(Node.s32('retry', 0))
lab.add_child(Node.s8('category', 1))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/lab/category")
self.assert_path(resp, "response/lab/entries/@count")
# Category should be the same as when we requested it.
if resp.child_value("lab/category") != 1:
raise Exception("Lab category did not get round-tripped!")
def __verify_profile(self, resp: Node, should_inherit: bool) -> int:
for item in [
'tune_cnt',
'save_cnt',
'saved_cnt',
'fc_cnt',
'ex_cnt',
'clear_cnt',
'match_cnt',
'beat_cnt',
'mynews_cnt',
'bonus_tune_points',
'is_bonus_tune_played',
'inherit',
'mtg_entry_cnt',
'mtg_hold_cnt',
'mtg_result',
]:
self.assert_path(resp, f"response/gametop/data/player/info/{item}")
# Technically we could use this to check profile succession doesn't show the
# tutorial, but we don't go that far yet.
if resp.child_value("gametop/data/player/info/inherit") != should_inherit:
raise Exception("Inherit flag wrong for profile!")
for item in [
'music_list',
'secret_list',
'theme_list',
'marker_list',
'title_list',
'parts_list',
'emblem_list',
'commu_list',
'new/secret_list',
'new/theme_list',
'new/marker_list',
]:
self.assert_path(resp, f"response/gametop/data/player/item/{item}")
for item in [
'play_time',
'shopname',
'areaname',
'music_id',
'seq_id',
'sort',
'category',
'expert_option',
]:
self.assert_path(resp, f"response/gametop/data/player/last/{item}")
for item in [
'marker',
'theme',
'title',
'parts',
'rank_sort',
'combo_disp',
'emblem',
'matching',
'hard',
'hazard',
]:
self.assert_path(resp, f"response/gametop/data/player/last/settings/{item}")
# Misc stuff
self.assert_path(resp, "response/gametop/data/player/session_id")
self.assert_path(resp, "response/gametop/data/player/event_flag")
# Profile settings
self.assert_path(resp, "response/gametop/data/player/name")
self.assert_path(resp, "response/gametop/data/player/jid")
if resp.child_value('gametop/data/player/name') != self.NAME:
raise Exception("Unexpected name received from server!")
# Required nodes for events and stuff
self.assert_path(resp, "response/gametop/data/player/rivallist")
self.assert_path(resp, "response/gametop/data/player/lab_edit_seq")
self.assert_path(resp, "response/gametop/data/player/fc_challenge/today/music_id")
self.assert_path(resp, "response/gametop/data/player/fc_challenge/today/state")
self.assert_path(resp, "response/gametop/data/player/fc_challenge/whim/music_id")
self.assert_path(resp, "response/gametop/data/player/fc_challenge/whim/state")
self.assert_path(resp, "response/gametop/data/player/official_news/news_list")
self.assert_path(resp, "response/gametop/data/player/history/@count")
self.assert_path(resp, "response/gametop/data/player/free_first_play/is_available")
self.assert_path(resp, "response/gametop/data/player/event_info")
self.assert_path(resp, "response/gametop/data/player/jbox/point")
self.assert_path(resp, "response/gametop/data/player/jbox/emblem/normal/index")
self.assert_path(resp, "response/gametop/data/player/jbox/emblem/premium/index")
self.assert_path(resp, "response/gametop/data/player/new_music")
self.assert_path(resp, "response/gametop/data/player/navi/flag")
self.assert_path(resp, "response/gametop/data/player/gift_list")
self.assert_path(resp, "response/gametop/data/player/born/status")
self.assert_path(resp, "response/gametop/data/player/born/year")
self.assert_path(resp, "response/gametop/data/player/question_list")
self.assert_path(resp, "response/gametop/data/player/emo_list")
self.assert_path(resp, "response/gametop/data/player/server")
self.assert_path(resp, "response/gametop/data/player/course_list")
self.assert_path(resp, "response/gametop/data/player/course_list/category_list")
self.assert_path(resp, "response/gametop/data/player/fill_in_category/normal/no_gray_flag_list")
self.assert_path(resp, "response/gametop/data/player/fill_in_category/normal/all_yellow_flag_list")
self.assert_path(resp, "response/gametop/data/player/fill_in_category/normal/full_combo_flag_list")
self.assert_path(resp, "response/gametop/data/player/fill_in_category/normal/excellent_flag_list")
self.assert_path(resp, "response/gametop/data/player/fill_in_category/hard/no_gray_flag_list")
self.assert_path(resp, "response/gametop/data/player/fill_in_category/hard/all_yellow_flag_list")
self.assert_path(resp, "response/gametop/data/player/fill_in_category/hard/full_combo_flag_list")
self.assert_path(resp, "response/gametop/data/player/fill_in_category/hard/excellent_flag_list")
self.assert_path(resp, "response/gametop/data/player/department/shop_list")
self.assert_path(resp, "response/gametop/data/player/stamp/sheet_list")
self.assert_path(resp, "response/gametop/data/player/festo_dungeon/phase")
self.assert_path(resp, "response/gametop/data/player/festo_dungeon/clear_flag")
# Return the jid
return resp.child_value('gametop/data/player/jid')
def verify_gameend_regist(
self,
ref_id: str,
jid: int,
scores: List[Dict[str, Any]],
) -> None:
call = self.call_node()
# Construct node
gameend = Node.void('gameend')
call.add_child(gameend)
gameend.set_attribute('method', 'regist')
gameend.add_child(Node.s32('retry', 0))
pcbinfo = Node.void('pcbinfo')
gameend.add_child(pcbinfo)
pcbinfo.set_attribute('client_data_version', '0')
data = Node.void('data')
gameend.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.string('refid', ref_id))
player.add_child(Node.s32('jid', jid))
player.add_child(Node.string('name', self.NAME))
result = Node.void('result')
data.add_child(result)
result.set_attribute('count', str(len(scores)))
# Send scores
scoreid = 0
for score in scores:
# Always played
bits = 0x1
if score['clear']:
bits |= 0x2
if score['fc']:
bits |= 0x4
if score['ex']:
bits |= 0x8
# Intentionally starting at 1 because that's what the game does
scoreid = scoreid + 1
tune = Node.void('tune')
result.add_child(tune)
tune.set_attribute('id', str(scoreid))
tune.add_child(Node.s32('music', score['id']))
tune.add_child(Node.s64('timestamp', Time.now() * 1000))
player_1 = Node.void('player')
tune.add_child(player_1)
player_1.set_attribute('rank', '1')
scorenode = Node.s32('score', score['score'])
player_1.add_child(scorenode)
scorenode.set_attribute('seq', str(score['chart']))
scorenode.set_attribute('clear', str(bits))
scorenode.set_attribute('combo', '69')
player_1.add_child(Node.u8_array('mbar', [239, 175, 170, 170, 190, 234, 187, 158, 153, 230, 170, 90, 102, 170, 85, 150, 150, 102, 85, 234, 171, 169, 157, 150, 170, 101, 230, 90, 214, 255]))
player_1.add_child(Node.bool('is_hard_mode', score['hard']))
player_1.add_child(Node.s32('music_rate', score['rate']))
# Swap with server
resp = self.exchange('', call)
self.assert_path(resp, "response/gameend/data/player/session_id")
self.assert_path(resp, "response/gameend/data/player/end_final_session_id")
def verify_gameend_final(
self,
ref_id: str,
jid: int,
) -> None:
call = self.call_node()
# Construct node
gameend = Node.void('gameend')
call.add_child(gameend)
gameend.set_attribute('method', 'final')
gameend.add_child(Node.s32('retry', 0))
pcbinfo = Node.void('pcbinfo')
gameend.add_child(pcbinfo)
pcbinfo.set_attribute('client_data_version', '0')
data = Node.void('data')
gameend.add_child(data)
info = Node.void('info')
data.add_child(info)
born = Node.void('born')
info.add_child(born)
born.add_child(Node.s8('status', 3))
born.add_child(Node.s16('year', 0))
info.add_child(Node.void('question_list'))
player = Node.void('player')
data.add_child(player)
player.add_child(Node.string('refid', ref_id))
player.add_child(Node.s32('jid', jid))
jbox = Node.void('jbox')
player.add_child(jbox)
jbox.add_child(Node.s32('point', 0))
emblem = Node.void('emblem')
jbox.add_child(emblem)
emblem.add_child(Node.u8('type', 0))
emblem.add_child(Node.s16('index', 0))
# Swap with server
resp = self.exchange('', call)
self.assert_path(resp, "response/gameend/@status")
def verify_gametop_regist(self, card_id: str, ref_id: str) -> int:
call = self.call_node()
# Construct node
gametop = Node.void('gametop')
call.add_child(gametop)
gametop.set_attribute('method', 'regist')
data = Node.void('data')
gametop.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.string('refid', ref_id))
player.add_child(Node.string('datid', ref_id))
player.add_child(Node.string('uid', card_id))
player.add_child(Node.bool('inherit', True))
player.add_child(Node.string('name', self.NAME))
# Swap with server
resp = self.exchange('', call)
# Verify server flags for events and stuff.
self.__verify_info(resp, 'gametop')
# Verify nodes that cause crashes or failed card-ins if they don't exist
return self.__verify_profile(resp, False)
def verify_gametop_get_pdata(self, card_id: str, ref_id: str) -> int:
call = self.call_node()
# Construct node
gametop = Node.void('gametop')
call.add_child(gametop)
gametop.set_attribute('method', 'get_pdata')
retry = Node.s32('retry', 0)
gametop.add_child(retry)
data = Node.void('data')
gametop.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.string('refid', ref_id))
player.add_child(Node.string('datid', ref_id))
player.add_child(Node.string('uid', card_id))
player.add_child(Node.string('card_no', CardCipher.encode(card_id)))
# Swap with server
resp = self.exchange('', call)
# Verify nodes that cause crashes if they don't exist
return self.__verify_profile(resp, False)
def verify_gametop_get_mdata(self, jid: int) -> Dict[str, List[Dict[str, Any]]]:
call = self.call_node()
# Construct node
gametop = Node.void('gametop')
call.add_child(gametop)
gametop.set_attribute('method', 'get_mdata')
retry = Node.s32('retry', 0)
gametop.add_child(retry)
data = Node.void('data')
gametop.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.s32('jid', jid))
# Technically the game sends this same packet 3 times, one with
# each value 1, 2, 3 here. This is for sharding across 3 requests,
# and the game will combine all 3 responses. Its up to the server to
# handle this the way it wants, and we just send everything back in the
# first request and ignore the rest.
player.add_child(Node.s8('mdata_ver', 1))
player.add_child(Node.bool('rival', False))
# Swap with server
resp = self.exchange('', call)
# Parse out scores
self.assert_path(resp, "response/gametop/data/player/jid")
self.assert_path(resp, "response/gametop/data/player/mdata_list")
if resp.child_value("gametop/data/player/jid") != jid:
raise Exception("Unexpected jid received from server!")
ret = {}
for musicdata in resp.child('gametop/data/player/mdata_list').children:
if musicdata.name != 'musicdata':
raise Exception('Unexpected node in playdata!')
music_id = musicdata.attribute('music_id')
scores_by_chart: List[Dict[str, int]] = [{}, {}, {}, {}, {}, {}]
def extract_cnts(name: str, offset: int, val: List[int]) -> None:
scores_by_chart[offset + 0][name] = val[0]
scores_by_chart[offset + 1][name] = val[1]
scores_by_chart[offset + 2][name] = val[2]
for subdata in musicdata.children:
if subdata.name == "normal":
offset = 0
elif subdata.name == "hard":
offset = 3
else:
raise Exception(f"Unexpected chart type {subdata.name}!")
extract_cnts('plays', offset, subdata.child_value('play_cnt'))
extract_cnts('clears', offset, subdata.child_value('clear_cnt'))
extract_cnts('full_combos', offset, subdata.child_value('fc_cnt'))
extract_cnts('excellents', offset, subdata.child_value('ex_cnt'))
extract_cnts('score', offset, subdata.child_value('score'))
extract_cnts('medal', offset, subdata.child_value('clear'))
extract_cnts('rate', offset, subdata.child_value('music_rate'))
ret[music_id] = scores_by_chart
return ret
def verify_gametop_get_meeting(self, jid: int) -> None:
call = self.call_node()
# Construct node
gametop = Node.void('gametop')
call.add_child(gametop)
gametop.set_attribute('method', 'get_meeting')
gametop.add_child(Node.s32('retry', 0))
data = Node.void('data')
gametop.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.s32('jid', jid))
pcbinfo = Node.void('pcbinfo')
gametop.add_child(pcbinfo)
pcbinfo.set_attribute('client_data_version', '0')
# Swap with server
resp = self.exchange('', call)
# Verify expected nodes
self.assert_path(resp, "response/gametop/data/meeting/single/@count")
self.assert_path(resp, "response/gametop/data/meeting/tag/@count")
self.assert_path(resp, "response/gametop/data/reward/total")
self.assert_path(resp, "response/gametop/data/reward/point")
def verify_recommend_get_recommend(self, jid: int) -> None:
call = self.call_node()
# Construct node
recommend = Node.void('recommend')
call.add_child(recommend)
recommend.set_attribute('method', 'get_recommend')
recommend.add_child(Node.s32('retry', 0))
player = Node.void('player')
recommend.add_child(player)
player.add_child(Node.s32('jid', jid))
player.add_child(Node.void('music_list'))
# Swap with server
resp = self.exchange('', call)
# Verify expected nodes
self.assert_path(resp, "response/recommend/data/player/music_list")
def verify_demodata_get_hitchart(self) -> None:
call = self.call_node()
# Construct node
gametop = Node.void('demodata')
call.add_child(gametop)
gametop.set_attribute('method', 'get_hitchart')
# Swap with server
resp = self.exchange('', call)
# Verify expected nodes
self.assert_path(resp, "response/demodata/data/update")
self.assert_path(resp, "response/demodata/data/hitchart_lic/@count")
self.assert_path(resp, "response/demodata/data/hitchart_org/@count")
def verify_jbox_get_list(self, jid: int) -> None:
call = self.call_node()
# Construct node
jbox = Node.void('jbox')
call.add_child(jbox)
jbox.set_attribute('method', 'get_list')
data = Node.void('data')
jbox.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.s32('jid', jid))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/jbox/selection_list")
def verify_jbox_get_agreement(self, jid: int) -> None:
call = self.call_node()
# Construct node
jbox = Node.void('jbox')
call.add_child(jbox)
jbox.set_attribute('method', 'get_agreement')
data = Node.void('data')
jbox.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.s32('jid', jid))
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/jbox/is_agreement")
def verify(self, cardid: Optional[str]) -> None:
# Verify boot sequence is okay
self.verify_services_get(
expected_services=[
'pcbtracker',
'pcbevent',
'local',
'message',
'facility',
'cardmng',
'package',
'posevent',
'pkglist',
'dlstatus',
'eacoin',
'lobby',
'netlog',
'slocal',
'ntp',
'keepalive'
],
include_net=True,
)
paseli_enabled = self.verify_pcbtracker_alive(ecflag=3)
self.verify_package_list()
self.verify_message_get()
self.verify_facility_get(encoding='Shift-JIS')
self.verify_pcbevent_put()
self.verify_logger_report()
self.verify_shopinfo_regist()
self.verify_demodata_get_info()
self.verify_demodata_get_jbox_list()
self.verify_demodata_get_news()
self.verify_demodata_get_hitchart()
self.verify_lab_get_ranking()
# Verify card registration and profile lookup
if cardid is not None:
card = cardid
else:
card = self.random_card()
print(f"Generated random card ID {card} for use.")
if cardid is None:
self.verify_cardmng_inquire(card, msg_type='unregistered', paseli_enabled=paseli_enabled)
ref_id = self.verify_cardmng_getrefid(card)
if len(ref_id) != 16:
raise Exception(f'Invalid refid \'{ref_id}\' returned when registering card')
if ref_id != self.verify_cardmng_inquire(card, msg_type='new', paseli_enabled=paseli_enabled):
raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card')
self.verify_gametop_regist(card, ref_id)
else:
print("Skipping new card checks for existing card")
ref_id = self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled)
# Verify pin handling and return card handling
self.verify_cardmng_authpass(ref_id, correct=True)
self.verify_cardmng_authpass(ref_id, correct=False)
if ref_id != self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled):
raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card')
if cardid is None:
# Verify score handling
jid = self.verify_gametop_get_pdata(card, ref_id)
self.verify_recommend_get_recommend(jid)
scores = self.verify_gametop_get_mdata(jid)
self.verify_gametop_get_meeting(jid)
if scores is None:
raise Exception('Expected to get scores back, didn\'t get anything!')
if len(scores) > 0:
raise Exception('Got nonzero score count on a new card!')
# Verify end of game behavior
self.verify_jbox_get_list(jid)
self.verify_jbox_get_agreement(jid)
self.verify_gameend_final(ref_id, jid)
for phase in [1, 2]:
if phase == 1:
dummyscores = [
# An okay score on a chart
{
'id': 40000059,
'chart': 2,
'hard': False,
'clear': True,
'fc': False,
'ex': False,
'score': 800000,
'rate': 567,
'expected_medal': 0x3,
},
# A good score on an easier chart of the same song
{
'id': 40000059,
'chart': 1,
'hard': False,
'clear': True,
'fc': True,
'ex': False,
'score': 990000,
'rate': 456,
'expected_medal': 0x5,
},
# A perfect score on an easiest chart of the same song
{
'id': 40000059,
'chart': 0,
'hard': False,
'clear': True,
'fc': True,
'ex': True,
'score': 1000000,
'rate': 678,
'expected_medal': 0x9,
},
# A bad score on a hard chart
{
'id': 30000027,
'chart': 2,
'hard': False,
'clear': False,
'fc': False,
'ex': False,
'score': 400000,
'rate': 123,
'expected_medal': 0x1,
},
# A terrible score on an easy chart
{
'id': 50000045,
'chart': 0,
'hard': False,
'clear': False,
'fc': False,
'ex': False,
'score': 100000,
'rate': 69,
'expected_medal': 0x1,
},
# A good score on a hard chart to make sure
# it doesn't pollute regular charts.
{
'id': 40000059,
'chart': 2,
'hard': True,
'clear': True,
'fc': False,
'ex': False,
'score': 812300,
'rate': 666,
'expected_medal': 0x3,
},
]
if phase == 2:
dummyscores = [
# A better score on the same chart
{
'id': 50000045,
'chart': 0,
'hard': False,
'clear': True,
'fc': False,
'ex': False,
'score': 850000,
'rate': 555,
'expected_medal': 0x3,
},
# A worse score on another same chart
{
'id': 40000059,
'chart': 1,
'hard': False,
'clear': True,
'fc': False,
'ex': False,
'score': 925000,
'rate': 432,
'expected_score': 990000,
'expected_rate': 456,
'expected_medal': 0x7,
},
]
self.verify_gameend_regist(ref_id, jid, dummyscores)
jid = self.verify_gametop_get_pdata(card, ref_id)
scores = self.verify_gametop_get_mdata(jid)
for score in dummyscores:
chart = score['chart'] + (3 if score['hard'] else 0)
newscore = scores[str(score['id'])][chart]
if 'expected_score' in score:
expected_score = score['expected_score']
else:
expected_score = score['score']
if 'expected_rate' in score:
expected_rate = score['expected_rate']
else:
expected_rate = score['rate']
if newscore['score'] != expected_score:
raise Exception(f'Expected a score of \'{expected_score}\' for song \'{score["id"]}\' chart \'{chart}\' but got score \'{newscore["score"]}\'')
if newscore['rate'] != expected_rate:
raise Exception(f'Expected a rate of \'{expected_rate}\' for song \'{score["id"]}\' chart \'{chart}\' but got rate \'{newscore["rate"]}\'')
if newscore['medal'] != score['expected_medal']:
raise Exception(f'Expected a medal of \'{score["expected_medal"]}\' for song \'{score["id"]}\' chart \'{chart}\' but got medal \'{newscore["medal"]}\'')
# Sleep so we don't end up putting in score history on the same second
time.sleep(1)
else:
print("Skipping score checks for existing card")
# Verify paseli handling
if paseli_enabled:
print("PASELI enabled for this PCBID, executing PASELI checks")
else:
print("PASELI disabled for this PCBID, skipping PASELI checks")
return
sessid, balance = self.verify_eacoin_checkin(card)
if balance == 0:
print("Skipping PASELI consume check because card has 0 balance")
else:
self.verify_eacoin_consume(sessid, balance, random.randint(0, balance))
self.verify_eacoin_checkout(sessid)

View File

@ -581,7 +581,7 @@ class PopnMusicKaimeiClient(BaseClient):
},
]
# Random score to add in
songid = random.randint(907, 950)
songid = random.randint(920, 950)
chartid = random.randint(0, 3)
score = random.randint(0, 100000)
medal = random.randint(1, 11)

View File

@ -581,7 +581,7 @@ class PopnMusicPeaceClient(BaseClient):
},
]
# Random score to add in
songid = random.randint(907, 950)
songid = random.randint(920, 950)
chartid = random.randint(0, 3)
score = random.randint(0, 100000)
medal = random.randint(1, 11)

View File

@ -19,6 +19,7 @@ from bemani.client.jubeat import (
JubeatPropClient,
JubeatQubellClient,
JubeatClanClient,
JubeatFestoClient,
)
from bemani.client.popn import (
PopnMusicTuneStreetClient,
@ -139,6 +140,12 @@ def get_client(proto: ClientProtocol, pcbid: str, game: str, config: Dict[str, A
pcbid,
config,
)
if game == 'jubeat-festo':
return JubeatFestoClient(
proto,
pcbid,
config,
)
if game == 'iidx-rootage':
return IIDXRootageClient(
proto,
@ -380,6 +387,11 @@ def mainloop(address: str, port: int, configfile: str, action: str, game: str, c
'model': "L44:J:E:A:2018070901",
'avs': "2.17.3 r8311",
},
'jubeat-festo': {
'name': "Jubeat Festo",
'model': "L44:J:B:A:2022052400",
'avs': "2.17.3 r8311",
},
'iidx-rootage': {
'name': "Beatmania IIDX ROOTAGE",
'model': "LDJ:J:A:A:2019090200",
@ -586,6 +598,7 @@ def main() -> None:
'jubeat-6': 'jubeat-prop',
'jubeat-7': 'jubeat-qubell',
'jubeat-8': 'jubeat-clan',
'jubeat-9': 'jubeat-festo',
'ddr-12': 'ddr-x2',
'ddr-13': 'ddr-x3',
'ddr-14': 'ddr-2013',

View File

@ -23,6 +23,7 @@ declare -a arr=(
"jubeat-6"
"jubeat-7"
"jubeat-8"
"jubeat-9"
"ddr-12"
"ddr-13"
"ddr-2013"