from typing import Dict, List, Tuple from typing_extensions import Final from bemani.backend.reflec.base import ReflecBeatBase from bemani.common import ID, Time, Profile from bemani.data import Attempt, UserID from bemani.protocol import Node class ReflecBeatVolzzaBase(ReflecBeatBase): # Clear types according to the game GAME_CLEAR_TYPE_NO_PLAY: Final[int] = 0 GAME_CLEAR_TYPE_EARLY_FAILED: Final[int] = 1 GAME_CLEAR_TYPE_FAILED: Final[int] = 2 GAME_CLEAR_TYPE_CLEARED: Final[int] = 9 GAME_CLEAR_TYPE_HARD_CLEARED: Final[int] = 10 GAME_CLEAR_TYPE_S_HARD_CLEARED: Final[int] = 11 # Combo types according to the game (actually a bitmask, where bit 0 is # full combo status, and bit 2 is just reflec status). But we don't support # saving just reflec without full combo, so we downgrade it. GAME_COMBO_TYPE_NONE: Final[int] = 0 GAME_COMBO_TYPE_ALL_JUST: Final[int] = 2 GAME_COMBO_TYPE_FULL_COMBO: Final[int] = 1 GAME_COMBO_TYPE_FULL_COMBO_ALL_JUST: Final[int] = 3 def _db_to_game_clear_type(self, db_status: int) -> int: return { self.CLEAR_TYPE_NO_PLAY: self.GAME_CLEAR_TYPE_NO_PLAY, self.CLEAR_TYPE_FAILED: self.GAME_CLEAR_TYPE_FAILED, self.CLEAR_TYPE_CLEARED: self.GAME_CLEAR_TYPE_CLEARED, self.CLEAR_TYPE_HARD_CLEARED: self.GAME_CLEAR_TYPE_HARD_CLEARED, self.CLEAR_TYPE_S_HARD_CLEARED: self.GAME_CLEAR_TYPE_S_HARD_CLEARED, }[db_status] def _game_to_db_clear_type(self, status: int) -> int: return { self.GAME_CLEAR_TYPE_NO_PLAY: self.CLEAR_TYPE_NO_PLAY, self.GAME_CLEAR_TYPE_EARLY_FAILED: self.CLEAR_TYPE_FAILED, self.GAME_CLEAR_TYPE_FAILED: self.CLEAR_TYPE_FAILED, self.GAME_CLEAR_TYPE_CLEARED: self.CLEAR_TYPE_CLEARED, self.GAME_CLEAR_TYPE_HARD_CLEARED: self.CLEAR_TYPE_HARD_CLEARED, self.GAME_CLEAR_TYPE_S_HARD_CLEARED: self.CLEAR_TYPE_S_HARD_CLEARED, }[status] def _db_to_game_combo_type(self, db_combo: int) -> int: return { self.COMBO_TYPE_NONE: self.GAME_COMBO_TYPE_NONE, self.COMBO_TYPE_ALMOST_COMBO: self.GAME_COMBO_TYPE_NONE, self.COMBO_TYPE_FULL_COMBO: self.GAME_COMBO_TYPE_FULL_COMBO, self.COMBO_TYPE_FULL_COMBO_ALL_JUST: self.GAME_COMBO_TYPE_FULL_COMBO_ALL_JUST, }[db_combo] def _game_to_db_combo_type(self, game_combo: int, miss_count: int) -> int: if game_combo in [ self.GAME_COMBO_TYPE_NONE, self.GAME_COMBO_TYPE_ALL_JUST, ]: if miss_count >= 0 and miss_count <= 2: return self.COMBO_TYPE_ALMOST_COMBO else: return self.COMBO_TYPE_NONE if game_combo == self.GAME_COMBO_TYPE_FULL_COMBO: return self.COMBO_TYPE_FULL_COMBO if game_combo == self.GAME_COMBO_TYPE_FULL_COMBO_ALL_JUST: return self.COMBO_TYPE_FULL_COMBO_ALL_JUST raise Exception(f'Invalid game_combo value {game_combo}') def _add_event_info(self, root: Node) -> None: # Overridden in subclasses pass def _add_shop_score(self, root: Node) -> None: shop_score = Node.void('shop_score') root.add_child(shop_score) today = Node.void('today') shop_score.add_child(today) yesterday = Node.void('yesterday') shop_score.add_child(yesterday) all_profiles = self.data.local.user.get_all_profiles(self.game, self.version) all_attempts = self.data.local.music.get_all_attempts(self.game, self.version, timelimit=(Time.beginning_of_today() - Time.SECONDS_IN_DAY)) machine = self.data.local.machine.get_machine(self.config.machine.pcbid) if machine.arcade is not None: lids = [ machine.id for machine in self.data.local.machine.get_all_machines(machine.arcade) ] else: lids = [machine.id] relevant_profiles = [ profile for profile in all_profiles if profile[1].get_int('lid', -1) in lids ] for (rootnode, timeoffset) in [ (today, 0), (yesterday, Time.SECONDS_IN_DAY), ]: # Grab all attempts made in the relevant day relevant_attempts = [ attempt for attempt in all_attempts if ( attempt[1].timestamp >= (Time.beginning_of_today() - timeoffset) and attempt[1].timestamp <= (Time.end_of_today() - timeoffset) ) ] # Calculate scores based on attempt scores_by_user: Dict[UserID, Dict[int, Dict[int, Attempt]]] = {} for (userid, attempt) in relevant_attempts: if userid not in scores_by_user: scores_by_user[userid] = {} if attempt.id not in scores_by_user[userid]: scores_by_user[userid][attempt.id] = {} if attempt.chart not in scores_by_user[userid][attempt.id]: # No high score for this yet, just use this attempt scores_by_user[userid][attempt.id][attempt.chart] = attempt else: # If this attempt is better than the stored one, replace it if scores_by_user[userid][attempt.id][attempt.chart].points < attempt.points: scores_by_user[userid][attempt.id][attempt.chart] = attempt # Calculate points earned by user in the day points_by_user: Dict[UserID, int] = {} for userid in scores_by_user: points_by_user[userid] = 0 for mid in scores_by_user[userid]: for chart in scores_by_user[userid][mid]: points_by_user[userid] = points_by_user[userid] + scores_by_user[userid][mid][chart].points # Output that day's earned points for (userid, profile) in relevant_profiles: data = Node.void('data') rootnode.add_child(data) data.add_child(Node.s16('day_id', int((Time.now() - timeoffset) / Time.SECONDS_IN_DAY))) data.add_child(Node.s32('user_id', profile.extid)) data.add_child(Node.s16('icon_id', profile.get_dict('config').get_int('icon_id'))) data.add_child(Node.s16('point', min(points_by_user.get(userid, 0), 32767))) data.add_child(Node.s32('update_time', Time.now())) data.add_child(Node.string('name', profile.get_str('name'))) rootnode.add_child(Node.s32('time', Time.beginning_of_today() - timeoffset)) def handle_info_rb5_info_read_request(self, request: Node) -> Node: root = Node.void('info') self._add_event_info(root) return root def handle_info_rb5_info_read_hit_chart_request(self, request: Node) -> Node: version = request.child_value('ver') root = Node.void('info') root.add_child(Node.s32('ver', version)) ranking = Node.void('ranking') root.add_child(ranking) def add_hitchart(name: str, start: int, end: int, hitchart: List[Tuple[int, int]]) -> None: base = Node.void(name) ranking.add_child(base) base.add_child(Node.s32('bt', start)) base.add_child(Node.s32('et', end)) new = Node.void('new') base.add_child(new) for (mid, plays) in hitchart: d = Node.void('d') new.add_child(d) d.add_child(Node.s16('mid', mid)) d.add_child(Node.s32('cnt', plays)) # Weekly hit chart add_hitchart( 'weekly', Time.now() - Time.SECONDS_IN_WEEK, Time.now(), self.data.local.music.get_hit_chart(self.game, self.version, 1024, 7), ) # Monthly hit chart add_hitchart( 'monthly', Time.now() - Time.SECONDS_IN_DAY * 30, Time.now(), self.data.local.music.get_hit_chart(self.game, self.version, 1024, 30), ) # All time hit chart add_hitchart( 'total', Time.now() - Time.SECONDS_IN_DAY * 365, Time.now(), self.data.local.music.get_hit_chart(self.game, self.version, 1024, 365), ) return root def handle_info_rb5_info_read_shop_ranking_request(self, request: Node) -> Node: start_music_id = request.child_value('min') end_music_id = request.child_value('max') root = Node.void('info') shop_score = Node.void('shop_score') root.add_child(shop_score) shop_score.add_child(Node.s32('time', Time.now())) profiles: Dict[UserID, Profile] = {} for songid in range(start_music_id, end_music_id + 1): allscores = self.data.local.music.get_all_scores( self.game, self.version, songid=songid, ) for ng in [ self.CHART_TYPE_BASIC, self.CHART_TYPE_MEDIUM, self.CHART_TYPE_HARD, self.CHART_TYPE_SPECIAL, ]: scores = sorted( [score for score in allscores if score[1].chart == ng], key=lambda score: score[1].points, reverse=True, ) for i in range(len(scores)): userid, score = scores[i] if userid not in profiles: profiles[userid] = self.get_any_profile(userid) profile = profiles[userid] data = Node.void('data') shop_score.add_child(data) data.add_child(Node.s32('rank', i + 1)) data.add_child(Node.s16('music_id', songid)) data.add_child(Node.s8('note_grade', score.chart)) data.add_child(Node.s8('clear_type', self._db_to_game_clear_type(score.data.get_int('clear_type')))) data.add_child(Node.s32('user_id', profile.extid)) data.add_child(Node.s16('icon_id', profile.get_dict('config').get_int('icon_id'))) data.add_child(Node.s32('score', score.points)) data.add_child(Node.s32('time', score.timestamp)) data.add_child(Node.string('name', profile.get_str('name'))) return root def handle_lobby_rb5_lobby_entry_request(self, request: Node) -> Node: root = Node.void('lobby') root.add_child(Node.s32('interval', 120)) root.add_child(Node.s32('interval_p', 120)) # Create a lobby entry for this user extid = request.child_value('e/uid') userid = self.data.remote.user.from_extid(self.game, self.version, extid) if userid is not None: profile = self.get_profile(userid) info = self.data.local.lobby.get_play_session_info(self.game, self.version, userid) if profile is None or info is None: return root self.data.local.lobby.put_lobby( self.game, self.version, userid, { 'mid': request.child_value('e/mid'), 'ng': request.child_value('e/ng'), 'mopt': request.child_value('e/mopt'), 'lid': request.child_value('e/lid'), 'sn': request.child_value('e/sn'), 'pref': request.child_value('e/pref'), 'stg': request.child_value('e/stg'), 'pside': request.child_value('e/pside'), 'eatime': request.child_value('e/eatime'), 'ga': request.child_value('e/ga'), 'gp': request.child_value('e/gp'), 'la': request.child_value('e/la'), 'ver': request.child_value('e/ver'), } ) lobby = self.data.local.lobby.get_lobby( self.game, self.version, userid, ) root.add_child(Node.s32('eid', lobby.get_int('id'))) e = Node.void('e') root.add_child(e) e.add_child(Node.s32('eid', lobby.get_int('id'))) e.add_child(Node.u16('mid', lobby.get_int('mid'))) e.add_child(Node.u8('ng', lobby.get_int('ng'))) e.add_child(Node.s32('uid', profile.extid)) e.add_child(Node.s32('uattr', profile.get_int('uattr'))) e.add_child(Node.string('pn', profile.get_str('name'))) e.add_child(Node.s32('plyid', info.get_int('id'))) e.add_child(Node.s16('mg', profile.get_int('mg'))) e.add_child(Node.s32('mopt', lobby.get_int('mopt'))) e.add_child(Node.string('lid', lobby.get_str('lid'))) e.add_child(Node.string('sn', lobby.get_str('sn'))) e.add_child(Node.u8('pref', lobby.get_int('pref'))) e.add_child(Node.s8('stg', lobby.get_int('stg'))) e.add_child(Node.s8('pside', lobby.get_int('pside'))) e.add_child(Node.s16('eatime', lobby.get_int('eatime'))) e.add_child(Node.u8_array('ga', lobby.get_int_array('ga', 4))) e.add_child(Node.u16('gp', lobby.get_int('gp'))) e.add_child(Node.u8_array('la', lobby.get_int_array('la', 4))) e.add_child(Node.u8('ver', lobby.get_int('ver'))) return root def handle_lobby_rb5_lobby_read_request(self, request: Node) -> Node: root = Node.void('lobby') root.add_child(Node.s32('interval', 120)) root.add_child(Node.s32('interval_p', 120)) # Look up all lobbies matching the criteria specified ver = request.child_value('var') mg = request.child_value('m_grade') # noqa: F841 extid = request.child_value('uid') limit = request.child_value('max') userid = self.data.remote.user.from_extid(self.game, self.version, extid) if userid is not None: lobbies = self.data.local.lobby.get_all_lobbies(self.game, self.version) for (user, lobby) in lobbies: if limit <= 0: break if user == userid: # If we have our own lobby, don't return it continue if ver != lobby.get_int('ver'): # Don't return lobby data for different versions continue profile = self.get_profile(user) info = self.data.local.lobby.get_play_session_info(self.game, self.version, userid) if profile is None or info is None: # No profile info, don't return this lobby return root e = Node.void('e') root.add_child(e) e.add_child(Node.s32('eid', lobby.get_int('id'))) e.add_child(Node.u16('mid', lobby.get_int('mid'))) e.add_child(Node.u8('ng', lobby.get_int('ng'))) e.add_child(Node.s32('uid', profile.extid)) e.add_child(Node.s32('uattr', profile.get_int('uattr'))) e.add_child(Node.string('pn', profile.get_str('name'))) e.add_child(Node.s32('plyid', info.get_int('id'))) e.add_child(Node.s16('mg', profile.get_int('mg'))) e.add_child(Node.s32('mopt', lobby.get_int('mopt'))) e.add_child(Node.string('lid', lobby.get_str('lid'))) e.add_child(Node.string('sn', lobby.get_str('sn'))) e.add_child(Node.u8('pref', lobby.get_int('pref'))) e.add_child(Node.s8('stg', lobby.get_int('stg'))) e.add_child(Node.s8('pside', lobby.get_int('pside'))) e.add_child(Node.s16('eatime', lobby.get_int('eatime'))) e.add_child(Node.u8_array('ga', lobby.get_int_array('ga', 4))) e.add_child(Node.u16('gp', lobby.get_int('gp'))) e.add_child(Node.u8_array('la', lobby.get_int_array('la', 4))) e.add_child(Node.u8('ver', lobby.get_int('ver'))) limit = limit - 1 return root def handle_lobby_rb5_lobby_delete_entry_request(self, request: Node) -> Node: eid = request.child_value('eid') self.data.local.lobby.destroy_lobby(eid) return Node.void('lobby') def handle_pcb_rb5_pcb_boot_request(self, request: Node) -> Node: shop_id = ID.parse_machine_id(request.child_value('lid')) machine = self.get_machine_by_id(shop_id) if machine is not None: machine_name = machine.name close = machine.data.get_bool('close') hour = machine.data.get_int('hour') minute = machine.data.get_int('minute') else: machine_name = '' close = False hour = 0 minute = 0 root = Node.void('pcb') sinfo = Node.void('sinfo') root.add_child(sinfo) sinfo.add_child(Node.string('nm', machine_name)) sinfo.add_child(Node.bool('cl_enbl', close)) sinfo.add_child(Node.u8('cl_h', hour)) sinfo.add_child(Node.u8('cl_m', minute)) sinfo.add_child(Node.bool('shop_flag', True)) return root def handle_pcb_rb5_pcb_error_request(self, request: Node) -> Node: return Node.void('pcb') def handle_pcb_rb5_pcb_update_request(self, request: Node) -> Node: return Node.void('pcb') def handle_shop_rb5_shop_write_setting_request(self, request: Node) -> Node: return Node.void('shop') def handle_shop_rb5_shop_write_info_request(self, request: Node) -> Node: self.update_machine_name(request.child_value('sinfo/nm')) self.update_machine_data({ 'close': request.child_value('sinfo/cl_enbl'), 'hour': request.child_value('sinfo/cl_h'), 'minute': request.child_value('sinfo/cl_m'), 'pref': request.child_value('sinfo/prf'), }) return Node.void('shop') def handle_player_rb5_player_start_request(self, request: Node) -> Node: root = Node.void('player') # Create a new play session based on info from the request refid = request.child_value('rid') userid = self.data.remote.user.from_refid(self.game, self.version, refid) if userid is not None: self.data.local.lobby.put_play_session_info( self.game, self.version, userid, { 'ga': request.child_value('ga'), 'gp': request.child_value('gp'), 'la': request.child_value('la'), 'pnid': request.child_value('pnid'), }, ) info = self.data.local.lobby.get_play_session_info( self.game, self.version, userid, ) if info is not None: play_id = info.get_int('id') else: play_id = 0 else: play_id = 0 # Session stuff, and resend global defaults root.add_child(Node.s32('plyid', play_id)) root.add_child(Node.u64('start_time', Time.now() * 1000)) self._add_event_info(root) return root def handle_player_rb5_player_end_request(self, request: Node) -> Node: # Destroy play session based on info from the request refid = request.child_value('rid') userid = self.data.remote.user.from_refid(self.game, self.version, refid) if userid is not None: # Kill any lingering lobbies by this user lobby = self.data.local.lobby.get_lobby( self.game, self.version, userid, ) if lobby is not None: self.data.local.lobby.destroy_lobby(lobby.get_int('id')) self.data.local.lobby.destroy_play_session_info(self.game, self.version, userid) return Node.void('player') def handle_player_rb5_player_delete_request(self, request: Node) -> Node: return Node.void('player') def handle_player_rb5_player_succeed_request(self, request: Node) -> Node: refid = request.child_value('rid') userid = self.data.remote.user.from_refid(self.game, self.version, refid) if userid is not None: previous_version = self.previous_version() profile = previous_version.get_profile(userid) else: profile = None root = Node.void('player') if profile is None: # Return empty succeed to say this is new root.add_child(Node.string('name', '')) root.add_child(Node.s32('grd', -1)) root.add_child(Node.s32('ap', -1)) root.add_child(Node.s32('uattr', 0)) else: # Return previous profile formatted to say this is data succession root.add_child(Node.string('name', profile.get_str('name'))) root.add_child(Node.s32('grd', profile.get_int('mg'))) # This is a guess root.add_child(Node.s32('ap', profile.get_int('ap'))) root.add_child(Node.s32('uattr', profile.get_int('uattr'))) return root def handle_player_rb5_player_read_request(self, request: Node) -> Node: refid = request.child_value('rid') profile = self.get_profile_by_refid(refid) if profile: return profile return Node.void('player')