import random import time from typing import Any, Dict, Optional, Tuple from bemani.client.base import BaseClient from bemani.protocol import Node class IIDXTricoroClient(BaseClient): NAME = 'TEST' def verify_shop_getname(self, lid: str) -> str: call = self.call_node() # Construct node IIDX21shop = Node.void('shop') call.add_child(IIDX21shop) IIDX21shop.set_attribute('method', 'getname') IIDX21shop.set_attribute('lid', lid) # Swap with server resp = self.exchange('', call) # Verify that response is correct self.assert_path(resp, "response/shop/@opname") self.assert_path(resp, "response/shop/@pid") self.assert_path(resp, "response/shop/@cls_opt") return resp.child('shop').attribute('opname') def verify_shop_savename(self, lid: str, name: str) -> None: call = self.call_node() # Construct node IIDX21shop = Node.void('shop') IIDX21shop.set_attribute('lid', lid) IIDX21shop.set_attribute('pid', '51') IIDX21shop.set_attribute('method', 'savename') IIDX21shop.set_attribute('cls_opt', '0') IIDX21shop.set_attribute('ccode', 'US') IIDX21shop.set_attribute('opname', name) IIDX21shop.set_attribute('rcode', '.') call.add_child(IIDX21shop) # Swap with server resp = self.exchange('', call) # Verify that response is correct self.assert_path(resp, "response/shop") def verify_pc_common(self) -> None: call = self.call_node() # Construct node IIDX21pc = Node.void('pc') call.add_child(IIDX21pc) IIDX21pc.set_attribute('method', 'common') # Swap with server resp = self.exchange('', call) # Verify that response is correct self.assert_path(resp, "response/pc/ir/@beat") self.assert_path(resp, "response/pc/limit/@phase") self.assert_path(resp, "response/pc/boss/@phase") self.assert_path(resp, "response/pc/red/@phase") self.assert_path(resp, "response/pc/yellow/@phase") self.assert_path(resp, "response/pc/medal/@phase") self.assert_path(resp, "response/pc/tricolettepark/@open") self.assert_path(resp, "response/pc/cafe/@open") def verify_music_crate(self) -> None: call = self.call_node() # Construct node IIDX21pc = Node.void('music') call.add_child(IIDX21pc) IIDX21pc.set_attribute('method', 'crate') # Swap with server resp = self.exchange('', call) self.assert_path(resp, "response/music") for child in resp.child("music").children: if child.name != 'c': raise Exception(f'Invalid node {child} in clear rate response!') if len(child.value) != 12: raise Exception(f'Invalid node data {child} in clear rate response!') for v in child.value: if v < 0 or v > 101: raise Exception(f'Invalid clear percent {child} in clear rate response!') def verify_shop_getconvention(self, lid: str) -> None: call = self.call_node() # Construct node IIDX21pc = Node.void('shop') call.add_child(IIDX21pc) IIDX21pc.set_attribute('method', 'getconvention') IIDX21pc.set_attribute('lid', lid) # Swap with server resp = self.exchange('', call) # Verify that response is correct self.assert_path(resp, "response/shop/valid") self.assert_path(resp, "response/shop/@music_0") self.assert_path(resp, "response/shop/@music_1") self.assert_path(resp, "response/shop/@music_2") self.assert_path(resp, "response/shop/@music_3") def verify_pc_visit(self, extid: int, lid: str) -> None: call = self.call_node() # Construct node IIDX21pc = Node.void('pc') call.add_child(IIDX21pc) IIDX21pc.set_attribute('iidxid', str(extid)) IIDX21pc.set_attribute('lid', lid) IIDX21pc.set_attribute('method', 'visit') IIDX21pc.set_attribute('pid', '51') # Swap with server resp = self.exchange('', call) # Verify that response is correct self.assert_path(resp, "response/pc/@aflg") self.assert_path(resp, "response/pc/@anum") self.assert_path(resp, "response/pc/@pflg") self.assert_path(resp, "response/pc/@pnum") self.assert_path(resp, "response/pc/@sflg") self.assert_path(resp, "response/pc/@snum") def verify_ranking_getranker(self, lid: str) -> None: for clid in [0, 1, 2, 3, 4, 5, 6]: call = self.call_node() # Construct node IIDX21pc = Node.void('ranking') call.add_child(IIDX21pc) IIDX21pc.set_attribute('method', 'getranker') IIDX21pc.set_attribute('lid', lid) IIDX21pc.set_attribute('clid', str(clid)) # Swap with server resp = self.exchange('', call) # Verify that response is correct self.assert_path(resp, "response/ranking") def verify_shop_sentinfo(self, lid: str) -> None: call = self.call_node() # Construct node IIDX21pc = Node.void('shop') call.add_child(IIDX21pc) IIDX21pc.set_attribute('method', 'sentinfo') IIDX21pc.set_attribute('lid', lid) IIDX21pc.set_attribute('bflg', '1') IIDX21pc.set_attribute('bnum', '2') IIDX21pc.set_attribute('ioid', '0') IIDX21pc.set_attribute('tax_phase', '0') # Swap with server resp = self.exchange('', call) # Verify that response is correct self.assert_path(resp, "response/shop") def verify_pc_get(self, ref_id: str, card_id: str, lid: str) -> Dict[str, Any]: call = self.call_node() # Construct node IIDX21pc = Node.void('pc') call.add_child(IIDX21pc) IIDX21pc.set_attribute('rid', ref_id) IIDX21pc.set_attribute('did', ref_id) IIDX21pc.set_attribute('pid', '51') IIDX21pc.set_attribute('lid', lid) IIDX21pc.set_attribute('cid', card_id) IIDX21pc.set_attribute('method', 'get') IIDX21pc.set_attribute('ctype', '1') # Swap with server resp = self.exchange('', call) # Verify that the response is correct self.assert_path(resp, "response/pc/pcdata/@name") self.assert_path(resp, "response/pc/pcdata/@pid") self.assert_path(resp, "response/pc/pcdata/@id") self.assert_path(resp, "response/pc/pcdata/@idstr") self.assert_path(resp, "response/pc/packinfo") self.assert_path(resp, "response/pc/commonboss/@deller") self.assert_path(resp, "response/pc/commonboss/@orb") self.assert_path(resp, "response/pc/commonboss/@baron") self.assert_path(resp, "response/pc/secret/flg1") self.assert_path(resp, "response/pc/secret/flg2") self.assert_path(resp, "response/pc/secret/flg3") self.assert_path(resp, "response/pc/achievements/trophy") self.assert_path(resp, "response/pc/skin") self.assert_path(resp, "response/pc/grade") self.assert_path(resp, "response/pc/rlist") self.assert_path(resp, "response/pc/step") name = resp.child('pc/pcdata').attribute('name') if name != self.NAME: raise Exception(f'Invalid name \'{name}\' returned for Ref ID \'{ref_id}\'') return { 'extid': int(resp.child('pc/pcdata').attribute('id')), 'sp_dan': int(resp.child('pc/grade').attribute('sgid')), 'dp_dan': int(resp.child('pc/grade').attribute('dgid')), 'deller': int(resp.child('pc/commonboss').attribute('deller')), } def verify_music_getrank(self, extid: int) -> Dict[int, Dict[int, Dict[str, int]]]: scores: Dict[int, Dict[int, Dict[str, int]]] = {} for cltype in [0, 1]: # singles, doubles call = self.call_node() # Construct node IIDX21music = Node.void('music') call.add_child(IIDX21music) IIDX21music.set_attribute('method', 'getrank') IIDX21music.set_attribute('iidxid', str(extid)) IIDX21music.set_attribute('cltype', str(cltype)) # Swap with server resp = self.exchange('', call) self.assert_path(resp, "response/music/style") if int(resp.child('music/style').attribute('type')) != cltype: raise Exception('Returned wrong clear type for IIDX21music.getrank!') for child in resp.child('music').children: if child.name == 'm': if child.value[0] != -1: raise Exception('Got non-self score back when requesting only our scores!') music_id = child.value[1] normal_clear_status = child.value[2] hyper_clear_status = child.value[3] another_clear_status = child.value[4] normal_ex_score = child.value[5] hyper_ex_score = child.value[6] another_ex_score = child.value[7] normal_miss_count = child.value[8] hyper_miss_count = child.value[9] another_miss_count = child.value[10] if cltype == 0: normal = 0 hyper = 1 another = 2 else: normal = 3 hyper = 4 another = 5 if music_id not in scores: scores[music_id] = {} scores[music_id][normal] = { 'clear_status': normal_clear_status, 'ex_score': normal_ex_score, 'miss_count': normal_miss_count, } scores[music_id][hyper] = { 'clear_status': hyper_clear_status, 'ex_score': hyper_ex_score, 'miss_count': hyper_miss_count, } scores[music_id][another] = { 'clear_status': another_clear_status, 'ex_score': another_ex_score, 'miss_count': another_miss_count, } elif child.name == 'b': music_id = child.value[0] clear_status = child.value[1] scores[music_id][6] = { 'clear_status': clear_status, 'ex_score': -1, 'miss_count': -1, } return scores def verify_pc_save(self, extid: int, card: str, lid: str) -> None: call = self.call_node() # Construct node IIDX21pc = Node.void('pc') call.add_child(IIDX21pc) IIDX21pc.set_attribute('achi', '449') IIDX21pc.set_attribute('opt', '8208') IIDX21pc.set_attribute('gpos', '0') IIDX21pc.set_attribute('gno', '8') IIDX21pc.set_attribute('timing', '0') IIDX21pc.set_attribute('help', '0') IIDX21pc.set_attribute('sdhd', '0') IIDX21pc.set_attribute('sdtype', '0') IIDX21pc.set_attribute('notes', '31.484070') IIDX21pc.set_attribute('pase', '0') IIDX21pc.set_attribute('judge', '0') IIDX21pc.set_attribute('opstyle', '1') IIDX21pc.set_attribute('hispeed', '5.771802') IIDX21pc.set_attribute('mode', '6') IIDX21pc.set_attribute('pmode', '0') IIDX21pc.set_attribute('lift', '60') IIDX21pc.set_attribute('judgeAdj', '0') IIDX21pc.set_attribute('method', 'save') IIDX21pc.set_attribute('iidxid', str(extid)) IIDX21pc.set_attribute('lid', lid) IIDX21pc.set_attribute('cid', card) IIDX21pc.set_attribute('cltype', '0') IIDX21pc.set_attribute('ctype', '1') pyramid = Node.void('pyramid') IIDX21pc.add_child(pyramid) pyramid.set_attribute('point', '290') destiny_catharsis = Node.void('destiny_catharsis') IIDX21pc.add_child(destiny_catharsis) destiny_catharsis.set_attribute('point', '290') bemani_summer_collabo = Node.void('bemani_summer_collabo') IIDX21pc.add_child(bemani_summer_collabo) bemani_summer_collabo.set_attribute('point', '290') deller = Node.void('deller') IIDX21pc.add_child(deller) deller.set_attribute('deller', '150') # Swap with server resp = self.exchange('', call) self.assert_path(resp, "response/pc") def verify_music_reg(self, extid: int, lid: str, score: Dict[str, Any]) -> None: call = self.call_node() # Construct node IIDX21music = Node.void('music') call.add_child(IIDX21music) IIDX21music.set_attribute('convid', '-1') IIDX21music.set_attribute('iidxid', str(extid)) IIDX21music.set_attribute('pgnum', str(score['pgnum'])) IIDX21music.set_attribute('pid', '51') IIDX21music.set_attribute('rankside', '1') IIDX21music.set_attribute('cflg', str(score['clear_status'])) IIDX21music.set_attribute('method', 'reg') IIDX21music.set_attribute('gnum', str(score['gnum'])) IIDX21music.set_attribute('clid', str(score['chart'])) IIDX21music.set_attribute('mnum', str(score['mnum'])) IIDX21music.set_attribute('is_death', '0') IIDX21music.set_attribute('theory', '0') IIDX21music.set_attribute('shopconvid', lid) IIDX21music.set_attribute('mid', str(score['id'])) IIDX21music.set_attribute('shopflg', '1') IIDX21music.add_child(Node.binary('ghost', bytes([1] * 64))) # Swap with server resp = self.exchange('', call) self.assert_path(resp, "response/music/shopdata/@rank") self.assert_path(resp, "response/music/ranklist/data") def verify_music_appoint(self, extid: int, musicid: int, chart: int) -> Tuple[int, bytes]: call = self.call_node() # Construct node IIDX21music = Node.void('music') call.add_child(IIDX21music) IIDX21music.set_attribute('clid', str(chart)) IIDX21music.set_attribute('method', 'appoint') IIDX21music.set_attribute('ctype', '0') IIDX21music.set_attribute('iidxid', str(extid)) IIDX21music.set_attribute('subtype', '') IIDX21music.set_attribute('mid', str(musicid)) # Swap with server resp = self.exchange('', call) self.assert_path(resp, "response/music/mydata/@score") return ( int(resp.child('music/mydata').attribute('score')), resp.child_value('music/mydata'), ) def verify_pc_reg(self, ref_id: str, card_id: str, lid: str) -> int: call = self.call_node() # Construct node IIDX21pc = Node.void('pc') call.add_child(IIDX21pc) IIDX21pc.set_attribute('lid', lid) IIDX21pc.set_attribute('pid', '51') IIDX21pc.set_attribute('method', 'reg') IIDX21pc.set_attribute('cid', card_id) IIDX21pc.set_attribute('did', ref_id) IIDX21pc.set_attribute('rid', ref_id) IIDX21pc.set_attribute('name', self.NAME) # Swap with server resp = self.exchange('', call) # Verify nodes that cause crashes if they don't exist self.assert_path(resp, "response/pc/@id") self.assert_path(resp, "response/pc/@id_str") return int(resp.child('pc').attribute('id')) def verify_pc_playstart(self) -> None: call = self.call_node() # Construct node IIDX21pc = Node.void('pc') IIDX21pc.set_attribute('method', 'playstart') IIDX21pc.set_attribute('side', '1') call.add_child(IIDX21pc) # Swap with server resp = self.exchange('', call) # Verify nodes that cause crashes if they don't exist self.assert_path(resp, "response/pc") def verify_music_play(self, score: Dict[str, int]) -> None: call = self.call_node() # Construct node IIDX21music = Node.void('music') IIDX21music.set_attribute('opt', '64') IIDX21music.set_attribute('clid', str(score['chart'])) IIDX21music.set_attribute('mid', str(score['id'])) IIDX21music.set_attribute('gnum', str(score['gnum'])) IIDX21music.set_attribute('cflg', str(score['clear_status'])) IIDX21music.set_attribute('pgnum', str(score['pgnum'])) IIDX21music.set_attribute('pid', '51') IIDX21music.set_attribute('method', 'play') call.add_child(IIDX21music) # Swap with server resp = self.exchange('', call) # Verify nodes that cause crashes if they don't exist self.assert_path(resp, "response/music/@clid") self.assert_path(resp, "response/music/@crate") self.assert_path(resp, "response/music/@frate") self.assert_path(resp, "response/music/@mid") def verify_pc_playend(self) -> None: call = self.call_node() # Construct node IIDX21pc = Node.void('pc') IIDX21pc.set_attribute('cltype', '0') IIDX21pc.set_attribute('bookkeep', '0') IIDX21pc.set_attribute('mode', '1') IIDX21pc.set_attribute('method', 'playend') call.add_child(IIDX21pc) # Swap with server resp = self.exchange('', call) # Verify nodes that cause crashes if they don't exist self.assert_path(resp, "response/pc") def verify_music_breg(self, iidxid: int, score: Dict[str, int]) -> None: call = self.call_node() # Construct node IIDX21music = Node.void('music') IIDX21music.set_attribute('gnum', str(score['gnum'])) IIDX21music.set_attribute('iidxid', str(iidxid)) IIDX21music.set_attribute('mid', str(score['id'])) IIDX21music.set_attribute('method', 'breg') IIDX21music.set_attribute('pgnum', str(score['pgnum'])) IIDX21music.set_attribute('cflg', str(score['clear_status'])) call.add_child(IIDX21music) # Swap with server resp = self.exchange('', call) # Verify nodes that cause crashes if they don't exist self.assert_path(resp, "response/music") def verify_grade_raised(self, iidxid: int, shop_name: str, dantype: str) -> None: call = self.call_node() # Construct node IIDX21grade = Node.void('grade') IIDX21grade.set_attribute('opname', shop_name) IIDX21grade.set_attribute('is_mirror', '0') IIDX21grade.set_attribute('oppid', '51') IIDX21grade.set_attribute('achi', '50') IIDX21grade.set_attribute('cflg', '4' if dantype == 'sp' else '3') IIDX21grade.set_attribute('gid', '5') IIDX21grade.set_attribute('iidxid', str(iidxid)) IIDX21grade.set_attribute('gtype', '0' if dantype == 'sp' else '1') IIDX21grade.set_attribute('is_ex', '0') IIDX21grade.set_attribute('pside', '0') IIDX21grade.set_attribute('method', 'raised') call.add_child(IIDX21grade) # Swap with server resp = self.exchange('', call) # Verify nodes that cause crashes if they don't exist self.assert_path(resp, "response/grade/@pnum") def verify(self, cardid: Optional[str]) -> None: # Verify boot sequence is okay self.verify_services_get( expected_services=[ 'pcbtracker', 'pcbevent', 'local', 'message', 'facility', 'cardmng', 'package', 'posevent', 'pkglist', 'dlstatus', 'eacoin', 'lobby', 'ntp', 'keepalive' ] ) paseli_enabled = self.verify_pcbtracker_alive() self.verify_package_list() self.verify_message_get() lid = self.verify_facility_get() self.verify_pcbevent_put() self.verify_shop_getname(lid) self.verify_pc_common() self.verify_music_crate() self.verify_shop_getconvention(lid) self.verify_ranking_getranker(lid) self.verify_shop_sentinfo(lid) # Verify card registration and profile lookup if cardid is not None: card = cardid else: card = self.random_card() print(f"Generated random card ID {card} for use.") if cardid is None: self.verify_cardmng_inquire(card, msg_type='unregistered', paseli_enabled=paseli_enabled) ref_id = self.verify_cardmng_getrefid(card) if len(ref_id) != 16: raise Exception(f'Invalid refid \'{ref_id}\' returned when registering card') if ref_id != self.verify_cardmng_inquire(card, msg_type='new', paseli_enabled=paseli_enabled): raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card') self.verify_pc_reg(ref_id, card, lid) self.verify_pc_get(ref_id, card, lid) else: print("Skipping new card checks for existing card") ref_id = self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled) # Verify pin handling and return card handling self.verify_cardmng_authpass(ref_id, correct=True) self.verify_cardmng_authpass(ref_id, correct=False) if ref_id != self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled): raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card') if cardid is None: # Verify score handling profile = self.verify_pc_get(ref_id, card, lid) if profile['sp_dan'] != -1: raise Exception('Somehow has SP DAN ranking on new profile!') if profile['dp_dan'] != -1: raise Exception('Somehow has DP DAN ranking on new profile!') if profile['deller'] != 0: raise Exception('Somehow has deller on new profile!') scores = self.verify_music_getrank(profile['extid']) if len(scores.keys()) > 0: raise Exception('Somehow have scores on a new profile!') for phase in [1, 2]: if phase == 1: dummyscores = [ # An okay score on a chart { 'id': 1000, 'chart': 2, 'clear_status': 4, 'pgnum': 123, 'gnum': 123, 'mnum': 5, }, # A good score on an easier chart of the same song { 'id': 1000, 'chart': 0, 'clear_status': 7, 'pgnum': 246, 'gnum': 0, 'mnum': 0, }, # A bad score on a hard chart { 'id': 1003, 'chart': 2, 'clear_status': 1, 'pgnum': 10, 'gnum': 20, 'mnum': 50, }, # A terrible score on an easy chart { 'id': 1003, 'chart': 0, 'clear_status': 1, 'pgnum': 2, 'gnum': 5, 'mnum': 75, }, ] if phase == 2: dummyscores = [ # A better score on the same chart { 'id': 1000, 'chart': 2, 'clear_status': 5, 'pgnum': 234, 'gnum': 234, 'mnum': 3, }, # A worse score on another same chart { 'id': 1000, 'chart': 0, 'clear_status': 4, 'pgnum': 123, 'gnum': 123, 'mnum': 35, 'expected_clear_status': 7, 'expected_ex_score': 492, 'expected_miss_count': 0, }, ] for dummyscore in dummyscores: self.verify_music_reg(profile['extid'], lid, dummyscore) self.verify_pc_visit(profile['extid'], lid) self.verify_pc_save(profile['extid'], card, lid) scores = self.verify_music_getrank(profile['extid']) for score in dummyscores: data = scores.get(score['id'], {}).get(score['chart'], None) if data is None: raise Exception(f'Expected to get score back for song {score["id"]} chart {score["chart"]}!') if 'expected_ex_score' in score: expected_score = score['expected_ex_score'] else: expected_score = (score['pgnum'] * 2) + score['gnum'] if 'expected_clear_status' in score: expected_clear_status = score['expected_clear_status'] else: expected_clear_status = score['clear_status'] if 'expected_miss_count' in score: expected_miss_count = score['expected_miss_count'] else: expected_miss_count = score['mnum'] if data['ex_score'] != expected_score: raise Exception(f'Expected a score of \'{expected_score}\' for song \'{score["id"]}\' chart \'{score["chart"]}\' but got score \'{data["ex_score"]}\'') if data['clear_status'] != expected_clear_status: raise Exception(f'Expected a clear status of \'{expected_clear_status}\' for song \'{score["id"]}\' chart \'{score["chart"]}\' but got clear status \'{data["clear_status"]}\'') if data['miss_count'] != expected_miss_count: raise Exception(f'Expected a miss count of \'{expected_miss_count}\' for song \'{score["id"]}\' chart \'{score["chart"]}\' but got miss count \'{data["miss_count"]}\'') # Verify we can fetch our own ghost ex_score, ghost = self.verify_music_appoint(profile['extid'], score['id'], score['chart']) if ex_score != expected_score: raise Exception(f'Expected a score of \'{expected_score}\' for song \'{score["id"]}\' chart \'{score["chart"]}\' but got score \'{data["ex_score"]}\'') if len(ghost) != 64: raise Exception(f'Wrong ghost length {len(ghost)} for ghost!') for g in ghost: if g != 0x01: raise Exception(f'Got back wrong ghost data for song \'{score["id"]}\' chart \'{score["chart"]}\'') # Sleep so we don't end up putting in score history on the same second time.sleep(1) # Verify that a player without a card can play self.verify_pc_playstart() self.verify_music_play({ 'id': 1000, 'chart': 2, 'clear_status': 4, 'pgnum': 123, 'gnum': 123, }) self.verify_pc_playend() # Verify shop name change setting self.verify_shop_savename(lid, 'newname1') newname = self.verify_shop_getname(lid) if newname != 'newname1': raise Exception('Invalid shop name returned after change!') self.verify_shop_savename(lid, 'newname2') newname = self.verify_shop_getname(lid) if newname != 'newname2': raise Exception('Invalid shop name returned after change!') # Verify beginner score saving self.verify_music_breg(profile['extid'], { 'id': 1000, 'clear_status': 4, 'pgnum': 123, 'gnum': 123, }) scores = self.verify_music_getrank(profile['extid']) if 1000 not in scores: raise Exception(f'Didn\'t get expected scores back for song {1000} beginner chart!') if 6 not in scores[1000]: raise Exception(f'Didn\'t get beginner score back for song {1000}!') if scores[1000][6] != {'clear_status': 4, 'ex_score': -1, 'miss_count': -1}: raise Exception('Didn\'t get correct status back from beginner save!') # Verify DAN score saving and loading self.verify_grade_raised(profile['extid'], newname, 'sp') self.verify_grade_raised(profile['extid'], newname, 'dp') profile = self.verify_pc_get(ref_id, card, lid) if profile['sp_dan'] != 5: raise Exception('Got wrong DAN score back for SP!') if profile['dp_dan'] != 5: raise Exception('Got wrong DAN score back for DP!') else: print("Skipping score checks for existing card") # Verify paseli handling if paseli_enabled: print("PASELI enabled for this PCBID, executing PASELI checks") else: print("PASELI disabled for this PCBID, skipping PASELI checks") return sessid, balance = self.verify_eacoin_checkin(card) if balance == 0: print("Skipping PASELI consume check because card has 0 balance") else: self.verify_eacoin_consume(sessid, balance, random.randint(0, balance)) self.verify_eacoin_checkout(sessid)