1
0
mirror of synced 2024-11-28 07:50:51 +01:00
bemaniutils/bemani/client/jubeat/saucerfulfill.py

636 lines
25 KiB
Python

import random
import time
from typing import Any, Dict, List, Optional
from bemani.client.base import BaseClient
from bemani.protocol import Node
class JubeatSaucerFulfillClient(BaseClient):
NAME = 'TEST'
def verify_shopinfo_regist(self) -> None:
call = self.call_node()
# Construct node
shopinfo = Node.void('shopinfo')
shopinfo.set_attribute('method', 'regist')
call.add_child(shopinfo)
shop = Node.void('shop')
shopinfo.add_child(shop)
shop.add_child(Node.string('name', ''))
shop.add_child(Node.string('pref', 'JP-14'))
shop.add_child(Node.string('softwareid', ''))
shop.add_child(Node.string('systemid', self.pcbid))
shop.add_child(Node.string('hardwareid', '01020304050607080900'))
shop.add_child(Node.string('locationid', 'US-1'))
shop.add_child(Node.string('monitor', 'D26L155 6252 151'))
testmode = Node.void('testmode')
shop.add_child(testmode)
testmode.set_attribute('send', '0')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/shopinfo/data/cabid")
self.assert_path(resp, "response/shopinfo/data/locationid")
self.assert_path(resp, "response/shopinfo/data/is_send")
self.assert_path(resp, "response/shopinfo/data/white_music_list")
self.assert_path(resp, "response/shopinfo/data/tax_phase")
self.assert_path(resp, "response/shopinfo/data/lab/is_open")
self.assert_path(resp, "response/shopinfo/data/vocaloid_event/state")
self.assert_path(resp, "response/shopinfo/data/vocaloid_event/music_id")
self.assert_path(resp, "response/shopinfo/data/vocaloid_event2/state")
self.assert_path(resp, "response/shopinfo/data/vocaloid_event2/music_id")
self.assert_path(resp, "response/shopinfo/data/matching_off/is_open")
self.assert_path(resp, "response/shopinfo/data/tenka/is_participant")
def verify_demodata_get_news(self) -> None:
call = self.call_node()
# Construct node
demodata = Node.void('demodata')
call.add_child(demodata)
demodata.set_attribute('method', 'get_news')
# Swap with server
resp = self.exchange('', call)
# Verify that response is correct
self.assert_path(resp, "response/demodata/data/officialnews")
def __verify_profile(self, resp: Node) -> int:
for item in [
'jubility',
'jubility_yday',
'tune_cnt',
'save_cnt',
'saved_cnt',
'fc_cnt',
'ex_cnt',
'pf_cnt',
'clear_cnt',
'match_cnt',
'beat_cnt',
'mynews_cnt',
'extra_point',
'is_extra_played',
'inherit',
'mtg_entry_cnt',
'mtg_hold_cnt',
'mtg_result',
]:
self.assert_path(resp, f"response/gametop/data/player/info/{item}")
for item in [
'secret_list',
'title_list',
'theme_list',
'marker_list',
'parts_list',
'new/secret_list',
'new/title_list',
'new/theme_list',
'new/marker_list',
]:
self.assert_path(resp, f"response/gametop/data/player/item/{item}")
for item in [
'music_id',
'marker',
'title',
'theme',
'sort',
'rank_sort',
'combo_disp',
'seq_id',
'parts',
'category',
'play_time',
'expert_option',
'matching',
'hazard',
'hard',
'shopname',
'areaname',
]:
self.assert_path(resp, f"response/gametop/data/player/last/{item}")
# Misc stuff
self.assert_path(resp, "response/gametop/data/player/session_id")
self.assert_path(resp, "response/gametop/data/player/event_flag")
self.assert_path(resp, "response/gametop/data/player/only_now_music")
self.assert_path(resp, "response/gametop/data/player/lab_edit_seq")
self.assert_path(resp, "response/gametop/data/player/kac_music")
self.assert_path(resp, "response/gametop/data/player/rivallist")
self.assert_path(resp, "response/gametop/data/player/share_music")
self.assert_path(resp, "response/gametop/data/player/bonus_music")
self.assert_path(resp, "response/gametop/data/player/history")
self.assert_path(resp, "response/gametop/data/player/news/checked")
self.assert_path(resp, "response/gametop/data/player/group/group_id")
self.assert_path(resp, "response/gametop/data/player/bingo/reward/total")
self.assert_path(resp, "response/gametop/data/player/bingo/reward/point")
self.assert_path(resp, "response/gametop/data/player/challenge/today/music_id")
self.assert_path(resp, "response/gametop/data/player/challenge/today/state")
self.assert_path(resp, "response/gametop/data/player/challenge/whim/music_id")
self.assert_path(resp, "response/gametop/data/player/challenge/whim/state")
# Profile settings
self.assert_path(resp, "response/gametop/data/player/name")
self.assert_path(resp, "response/gametop/data/player/jid")
self.assert_path(resp, "response/gametop/data/player/refid")
# Non-player stuff
self.assert_path(resp, "response/gametop/data/termver")
self.assert_path(resp, "response/gametop/data/season_etime")
self.assert_path(resp, "response/gametop/data/white_music_list")
self.assert_path(resp, "response/gametop/data/open_music_list")
# Return the jid
return resp.child_value('gametop/data/player/jid')
def verify_gameend_regist(self, ref_id: str, jid: int, mode: int, scores: List[Dict[str, Any]], course: Dict[str, Any]) -> None:
call = self.call_node()
# Construct node
gameend = Node.void('gameend')
call.add_child(gameend)
gameend.set_attribute('method', 'regist')
gameend.add_child(Node.s32('retry', 0))
data = Node.void('data')
gameend.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.s8('mode', mode))
player.add_child(Node.string('refid', ref_id))
player.add_child(Node.s32('jid', jid))
player.add_child(Node.string('name', self.NAME))
result = Node.void('result')
data.add_child(result)
result.set_attribute('count', str(len(scores)))
# Send scores
scoreid = 0
for score in scores:
# Always played
bits = 0x1
if score['clear']:
bits |= 0x2
if score['fc']:
bits |= 0x4
if score['ex']:
bits |= 0x8
# Intentionally starting at 1 because that's what the game does
scoreid = scoreid + 1
tune = Node.void('tune')
result.add_child(tune)
tune.set_attribute('id', str(scoreid))
tune.set_attribute('count', '0')
tune.add_child(Node.s32('music', score['id']))
player_1 = Node.void('player')
tune.add_child(player_1)
player_1.set_attribute('rank', '1')
scorenode = Node.s32('score', score['score'])
player_1.add_child(scorenode)
scorenode.set_attribute('seq', str(score['chart']))
scorenode.set_attribute('clear', str(bits))
scorenode.set_attribute('combo', '69')
player_1.add_child(Node.u8_array('mbar', [239, 175, 170, 170, 190, 234, 187, 158, 153, 230, 170, 90, 102, 170, 85, 150, 150, 102, 85, 234, 171, 169, 157, 150, 170, 101, 230, 90, 214, 255]))
if len(course) > 0:
coursenode = Node.void('course')
data.add_child(coursenode)
coursenode.add_child(Node.s32('course_id', course['course_id']))
coursenode.add_child(Node.u8('rating', course['rating']))
index = 0
for coursescore in course['scores']:
music = Node.void('music')
coursenode.add_child(music)
music.set_attribute('index', str(index))
music.add_child(Node.s32('score', coursescore))
index = index + 1
# Swap with server
resp = self.exchange('', call)
self.assert_path(resp, "response/gameend/data/player/session_id")
def verify_gametop_regist(self, card_id: str, ref_id: str) -> int:
call = self.call_node()
# Construct node
gametop = Node.void('gametop')
call.add_child(gametop)
gametop.set_attribute('method', 'regist')
data = Node.void('data')
gametop.add_child(data)
player = Node.void('player')
data.add_child(player)
passnode = Node.void('pass')
player.add_child(passnode)
passnode.add_child(Node.string('refid', ref_id))
passnode.add_child(Node.string('datid', ref_id))
passnode.add_child(Node.string('uid', card_id))
passnode.add_child(Node.bool('inherit', True))
player.add_child(Node.string('name', self.NAME))
# Swap with server
resp = self.exchange('', call)
# Verify nodes that cause crashes if they don't exist
return self.__verify_profile(resp)
def verify_gametop_get_pdata(self, card_id: str, ref_id: str) -> int:
call = self.call_node()
# Construct node
gametop = Node.void('gametop')
call.add_child(gametop)
gametop.set_attribute('method', 'get_pdata')
retry = Node.s32('retry', 0)
gametop.add_child(retry)
data = Node.void('data')
gametop.add_child(data)
player = Node.void('player')
data.add_child(player)
passnode = Node.void('pass')
player.add_child(passnode)
passnode.add_child(Node.string('refid', ref_id))
passnode.add_child(Node.string('datid', ref_id))
passnode.add_child(Node.string('uid', card_id))
# Swap with server
resp = self.exchange('', call)
# Verify nodes that cause crashes if they don't exist
return self.__verify_profile(resp)
def verify_gametop_get_mdata(self, jid: int) -> Dict[str, List[Dict[str, Any]]]:
call = self.call_node()
# Construct node
gametop = Node.void('gametop')
call.add_child(gametop)
gametop.set_attribute('method', 'get_mdata')
retry = Node.s32('retry', 0)
gametop.add_child(retry)
data = Node.void('data')
gametop.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.s32('jid', jid))
# Technically the game sends this same packet 3 times, one with
# each value 1, 2, 3 here. Unclear why, but we won't emulate it.
player.add_child(Node.s8('mdata_ver', 1))
# Swap with server
resp = self.exchange('', call)
# Parse out scores
self.assert_path(resp, "response/gametop/data/player/playdata")
ret = {}
for musicdata in resp.child('gametop/data/player/playdata').children:
if musicdata.name != 'musicdata':
raise Exception('Unexpected node in playdata!')
music_id = musicdata.attribute('music_id')
scores_by_chart: List[Dict[str, int]] = [{}, {}, {}]
def extract_cnts(name: str, val: List[int]) -> None:
scores_by_chart[0][name] = val[0]
scores_by_chart[1][name] = val[1]
scores_by_chart[2][name] = val[2]
extract_cnts('plays', musicdata.child_value('play_cnt'))
extract_cnts('clears', musicdata.child_value('clear_cnt'))
extract_cnts('full_combos', musicdata.child_value('fc_cnt'))
extract_cnts('excellents', musicdata.child_value('ex_cnt'))
extract_cnts('score', musicdata.child_value('score'))
extract_cnts('medal', musicdata.child_value('clear'))
ret[music_id] = scores_by_chart
return ret
def verify_gametop_get_course(self, jid: int) -> List[Dict[str, Any]]:
call = self.call_node()
# Construct node
gametop = Node.void('gametop')
call.add_child(gametop)
gametop.set_attribute('method', 'get_course')
gametop.add_child(Node.s32('retry', 0))
data = Node.void('data')
gametop.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.s32('jid', jid))
# Swap with server
resp = self.exchange('', call)
# Verify expected nodes
self.assert_path(resp, "response/gametop/data/course_list")
self.assert_path(resp, "response/gametop/data/player_list")
self.assert_path(resp, "response/gametop/data/last_course_id")
playernode = None
for player in resp.child('gametop/data/player_list').children:
if player.child_value('jid') == jid:
playernode = player
break
if playernode is None:
raise Exception(f"Didn't find any scores for ExtID {jid}")
ret = []
for result in playernode.child('result_list').children:
if result.name != 'result':
raise Exception('Unexpected node in result_list!')
course_id = result.child_value('id')
rating = result.child_value('rating')
scores = result.child_value('score')
ret.append({'course_id': course_id, 'rating': rating, 'scores': scores})
return ret
def verify_gametop_get_meeting(self, jid: int) -> None:
call = self.call_node()
# Construct node
gametop = Node.void('gametop')
call.add_child(gametop)
gametop.set_attribute('method', 'get_meeting')
gametop.add_child(Node.s32('retry', 0))
data = Node.void('data')
gametop.add_child(data)
player = Node.void('player')
data.add_child(player)
player.add_child(Node.s32('jid', jid))
# Swap with server
resp = self.exchange('', call)
# Verify expected nodes
self.assert_path(resp, "response/gametop/data/meeting/single")
self.assert_path(resp, "response/gametop/data/meeting/tag")
self.assert_path(resp, "response/gametop/data/reward/total")
self.assert_path(resp, "response/gametop/data/reward/point")
def verify(self, cardid: Optional[str]) -> None:
# Verify boot sequence is okay
self.verify_services_get(
expected_services=[
'pcbtracker',
'pcbevent',
'local',
'message',
'facility',
'cardmng',
'package',
'posevent',
'pkglist',
'dlstatus',
'eacoin',
'lobby',
'ntp',
'keepalive'
]
)
paseli_enabled = self.verify_pcbtracker_alive()
self.verify_message_get()
self.verify_package_list()
self.verify_facility_get()
self.verify_pcbevent_put()
self.verify_shopinfo_regist()
self.verify_demodata_get_news()
# Verify card registration and profile lookup
if cardid is not None:
card = cardid
else:
card = self.random_card()
print(f"Generated random card ID {card} for use.")
if cardid is None:
self.verify_cardmng_inquire(card, msg_type='unregistered', paseli_enabled=paseli_enabled)
ref_id = self.verify_cardmng_getrefid(card)
if len(ref_id) != 16:
raise Exception(f'Invalid refid \'{ref_id}\' returned when registering card')
if ref_id != self.verify_cardmng_inquire(card, msg_type='new', paseli_enabled=paseli_enabled):
raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card')
self.verify_gametop_regist(card, ref_id)
else:
print("Skipping new card checks for existing card")
ref_id = self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled)
# Verify pin handling and return card handling
self.verify_cardmng_authpass(ref_id, correct=True)
self.verify_cardmng_authpass(ref_id, correct=False)
if ref_id != self.verify_cardmng_inquire(card, msg_type='query', paseli_enabled=paseli_enabled):
raise Exception(f'Invalid refid \'{ref_id}\' returned when querying card')
if cardid is None:
# Verify score handling
jid = self.verify_gametop_get_pdata(card, ref_id)
scores = self.verify_gametop_get_mdata(jid)
courses = self.verify_gametop_get_course(jid)
self.verify_gametop_get_meeting(jid)
if scores is None:
raise Exception('Expected to get scores back, didn\'t get anything!')
if courses is None:
raise Exception('Expected to get courses back, didn\'t get anything!')
if len(scores) > 0:
raise Exception('Got nonzero score count on a new card!')
if len(courses) > 0:
raise Exception('Got nonzero course count on a new card!')
for phase in [1, 2]:
if phase == 1:
dummyscores = [
# An okay score on a chart
{
'id': 40000059,
'chart': 2,
'clear': True,
'fc': False,
'ex': False,
'score': 800000,
'expected_medal': 0x3,
},
# A good score on an easier chart of the same song
{
'id': 40000059,
'chart': 1,
'clear': True,
'fc': True,
'ex': False,
'score': 990000,
'expected_medal': 0x5,
},
# A perfect score on an easiest chart of the same song
{
'id': 40000059,
'chart': 0,
'clear': True,
'fc': True,
'ex': True,
'score': 1000000,
'expected_medal': 0x9,
},
# A bad score on a hard chart
{
'id': 30000024,
'chart': 2,
'clear': False,
'fc': False,
'ex': False,
'score': 400000,
'expected_medal': 0x1,
},
# A terrible score on an easy chart
{
'id': 50000045,
'chart': 0,
'clear': False,
'fc': False,
'ex': False,
'score': 100000,
'expected_medal': 0x1,
},
]
if phase == 2:
dummyscores = [
# A better score on the same chart
{
'id': 50000045,
'chart': 0,
'clear': True,
'fc': False,
'ex': False,
'score': 850000,
'expected_medal': 0x3,
},
# A worse score on another same chart
{
'id': 40000059,
'chart': 1,
'clear': True,
'fc': False,
'ex': False,
'score': 925000,
'expected_score': 990000,
'expected_medal': 0x7,
},
]
self.verify_gameend_regist(ref_id, jid, 8, dummyscores, {})
jid = self.verify_gametop_get_pdata(card, ref_id)
scores = self.verify_gametop_get_mdata(jid)
courses = self.verify_gametop_get_course(jid)
if len(courses) > 0:
raise Exception('Got nonzero course count without playing any courses!')
for score in dummyscores:
newscore = scores[str(score['id'])][score['chart']]
if 'expected_score' in score:
expected_score = score['expected_score']
else:
expected_score = score['score']
if newscore['score'] != expected_score:
raise Exception(f'Expected a score of \'{expected_score}\' for song \'{score["id"]}\' chart \'{score["chart"]}\' but got score \'{newscore["score"]}\'')
if newscore['medal'] != score['expected_medal']:
raise Exception(f'Expected a medal of \'{score["expected_medal"]}\' for song \'{score["id"]}\' chart \'{score["chart"]}\' but got medal \'{newscore["medal"]}\'')
# Sleep so we don't end up putting in score history on the same second
time.sleep(1)
for phase in [1, 2]:
dummycourses: List[Dict[str, Any]] = []
if phase == 1:
dummycourses.extend([
{
'course_id': 1,
'rating': 1,
'scores': [123456, 123457, 123458, 123459, 123460],
},
{
'course_id': 2,
'rating': 2,
'scores': [123456, 123457, 123458, 123459, 123460],
},
])
else:
dummycourses.extend([
{
'course_id': 1,
'rating': 2,
'scores': [223456, 223457, 223458, 223459, 223460],
},
{
'course_id': 2,
'rating': 1,
'expected_rating': 2,
'scores': [23456, 23457, 23458, 23459, 23460],
'expected_scores': [123456, 123457, 123458, 123459, 123460],
},
])
for course in dummycourses:
self.verify_gameend_regist(ref_id, jid, 6, [], course)
jid = self.verify_gametop_get_pdata(card, ref_id)
courses = self.verify_gametop_get_course(jid)
for course in dummycourses:
# Find the course
foundcourses = [c for c in courses if c['course_id'] == course['course_id']]
if len(foundcourses) == 0:
raise Exception(f"Didn't find course by ID {course['course_id']}")
foundcourse = foundcourses[0]
if 'expected_rating' in course:
expected_rating = course['expected_rating']
else:
expected_rating = course['rating']
if 'expected_scores' in course:
expected_scores = course['expected_scores']
else:
expected_scores = course['scores']
if foundcourse['course_id'] != course['course_id']:
raise Exception("Logic error!")
if foundcourse['rating'] != expected_rating:
raise Exception(f'Expected a rating of \'{expected_rating}\' for course \'{course["course_id"]}\' but got rating \'{foundcourse["rating"]}\'')
for i in range(len(expected_scores)):
if foundcourse['scores'][i] != expected_scores[i]:
raise Exception(f'Expected a score of \'{expected_scores[i]}\' for course \'{course["course_id"]}\' song number \'{i}\' but got score \'{foundcourse["scores"][i]}\'')
# Sleep so we don't end up putting in score history on the same second
time.sleep(1)
else:
print("Skipping score checks for existing card")
# Verify paseli handling
if paseli_enabled:
print("PASELI enabled for this PCBID, executing PASELI checks")
else:
print("PASELI disabled for this PCBID, skipping PASELI checks")
return
sessid, balance = self.verify_eacoin_checkin(card)
if balance == 0:
print("Skipping PASELI consume check because card has 0 balance")
else:
self.verify_eacoin_consume(sessid, balance, random.randint(0, balance))
self.verify_eacoin_checkout(sessid)