from typing import List, Optional, Dict, Any, Tuple, Set from bemani.common import APIConstants, GameConstants, VersionConstants, DBConstants, Parallel from bemani.data.interfaces import APIProviderInterface from bemani.data.api.base import BaseGlobalData from bemani.data.mysql.user import UserData from bemani.data.mysql.music import MusicData from bemani.data.remoteuser import RemoteUser from bemani.data.types import UserID, Score, Song class GlobalMusicData(BaseGlobalData): def __init__(self, api: APIProviderInterface, user: UserData, music: MusicData) -> None: super().__init__(api) self.user = user self.music = music def __get_cardids(self, userid: UserID) -> List[str]: if RemoteUser.is_remote(userid): return [RemoteUser.userid_to_card(userid)] else: return self.user.get_cards(userid) def __min(self, int1: int, int2: int) -> int: # -1 is used as a 'no value' so it should not overwrite a 0 if int1 == -1: return int2 if int2 == -1: return int1 return min(int1, int2) def __max(self, int1: int, int2: int) -> int: return max(int1, int2) def __format_ddr_score(self, version: int, songid: int, songchart: int, data: Dict[str, Any]) -> Score: halo = { 'none': DBConstants.DDR_HALO_NONE, 'gfc': DBConstants.DDR_HALO_GOOD_FULL_COMBO, 'fc': DBConstants.DDR_HALO_GREAT_FULL_COMBO, 'pfc': DBConstants.DDR_HALO_PERFECT_FULL_COMBO, 'mfc': DBConstants.DDR_HALO_MARVELOUS_FULL_COMBO, }.get(data.get('halo'), DBConstants.DDR_HALO_NONE) rank = { "AAA": DBConstants.DDR_RANK_AAA, "AA+": DBConstants.DDR_RANK_AA_PLUS, "AA": DBConstants.DDR_RANK_AA, "AA-": DBConstants.DDR_RANK_AA_MINUS, "A+": DBConstants.DDR_RANK_A_PLUS, "A": DBConstants.DDR_RANK_A, "A-": DBConstants.DDR_RANK_A_MINUS, "B+": DBConstants.DDR_RANK_B_PLUS, "B": DBConstants.DDR_RANK_B, "B-": DBConstants.DDR_RANK_B_MINUS, "C+": DBConstants.DDR_RANK_C_PLUS, "C": DBConstants.DDR_RANK_C, "C-": DBConstants.DDR_RANK_C_MINUS, "D+": DBConstants.DDR_RANK_D_PLUS, "D": DBConstants.DDR_RANK_D, "E": DBConstants.DDR_RANK_E, }.get(data.get('rank'), DBConstants.DDR_RANK_E) ghost = '' trace: List[int] = [] if version == VersionConstants.DDR_ACE: # DDR Ace is specia ghost = ''.join([str(x) for x in data.get('ghost', [])]) else: trace = [int(x) for x in data.get('ghost', [])] return Score( -1, songid, songchart, int(data.get('points', 0)), int(data.get('timestamp', -1)), self.__max(int(data.get('timestamp', -1)), int(data.get('updated', -1))), -1, # No location for remote play 1, # No play info for remote play { 'combo': int(data.get('combo', -1)), 'rank': rank, 'halo': halo, 'ghost': ghost, 'trace': trace, }, ) def __format_iidx_score(self, version: int, songid: int, songchart: int, data: Dict[str, Any]) -> Score: status = { 'np': DBConstants.IIDX_CLEAR_STATUS_NO_PLAY, 'failed': DBConstants.IIDX_CLEAR_STATUS_FAILED, 'ac': DBConstants.IIDX_CLEAR_STATUS_ASSIST_CLEAR, 'ec': DBConstants.IIDX_CLEAR_STATUS_EASY_CLEAR, 'nc': DBConstants.IIDX_CLEAR_STATUS_CLEAR, 'hc': DBConstants.IIDX_CLEAR_STATUS_HARD_CLEAR, 'exhc': DBConstants.IIDX_CLEAR_STATUS_EX_HARD_CLEAR, 'fc': DBConstants.IIDX_CLEAR_STATUS_FULL_COMBO, }.get(data.get('status'), DBConstants.IIDX_CLEAR_STATUS_NO_PLAY) return Score( -1, songid, songchart, int(data.get('points', 0)), int(data.get('timestamp', -1)), self.__max(int(data.get('timestamp', -1)), int(data.get('updated', -1))), -1, # No location for remote play 1, # No play info for remote play { 'clear_status': status, 'ghost': bytes([int(b) for b in data.get('ghost', [])]), 'miss_count': int(data.get('miss', -1)), 'pgreats': int(data.get('pgreat', -1)), 'greats': int(data.get('great', -1)), }, ) def __format_jubeat_score(self, version: int, songid: int, songchart: int, data: Dict[str, Any]) -> Score: status = { 'failed': DBConstants.JUBEAT_PLAY_MEDAL_FAILED, 'cleared': DBConstants.JUBEAT_PLAY_MEDAL_CLEARED, 'nfc': DBConstants.JUBEAT_PLAY_MEDAL_NEARLY_FULL_COMBO, 'fc': DBConstants.JUBEAT_PLAY_MEDAL_FULL_COMBO, 'nec': DBConstants.JUBEAT_PLAY_MEDAL_NEARLY_EXCELLENT, 'exc': DBConstants.JUBEAT_PLAY_MEDAL_EXCELLENT, }.get(data.get('status'), DBConstants.JUBEAT_PLAY_MEDAL_FAILED) return Score( -1, songid, songchart, int(data.get('points', 0)), int(data.get('timestamp', -1)), self.__max(int(data.get('timestamp', -1)), int(data.get('updated', -1))), -1, # No location for remote play 1, # No play info for remote play { 'medal': status, 'combo': int(data.get('combo', -1)), 'ghost': [int(x) for x in data.get('ghost', [])] }, ) def __format_museca_score(self, version: int, songid: int, songchart: int, data: Dict[str, Any]) -> Score: rank = { 'death': DBConstants.MUSECA_GRADE_DEATH, 'poor': DBConstants.MUSECA_GRADE_POOR, 'mediocre': DBConstants.MUSECA_GRADE_MEDIOCRE, 'good': DBConstants.MUSECA_GRADE_GOOD, 'great': DBConstants.MUSECA_GRADE_GREAT, 'excellent': DBConstants.MUSECA_GRADE_EXCELLENT, 'superb': DBConstants.MUSECA_GRADE_SUPERB, 'masterpiece': DBConstants.MUSECA_GRADE_MASTERPIECE, 'perfect': DBConstants.MUSECA_GRADE_PERFECT, }.get(data.get('rank'), DBConstants.MUSECA_GRADE_DEATH) status = { 'failed': DBConstants.MUSECA_CLEAR_TYPE_FAILED, 'cleared': DBConstants.MUSECA_CLEAR_TYPE_CLEARED, 'fc': DBConstants.MUSECA_CLEAR_TYPE_FULL_COMBO, }.get(data.get('status'), DBConstants.MUSECA_CLEAR_TYPE_FAILED) return Score( -1, songid, songchart, int(data.get('points', 0)), int(data.get('timestamp', -1)), self.__max(int(data.get('timestamp', -1)), int(data.get('updated', -1))), -1, # No location for remote play 1, # No play info for remote play { 'grade': rank, 'clear_type': status, 'combo': int(data.get('combo', -1)), 'stats': { 'btn_rate': int(data.get('buttonrate', -1)), 'long_rate': int(data.get('longrate', -1)), 'vol_rate': int(data.get('volrate', -1)), }, }, ) def __format_popn_score(self, version: int, songid: int, songchart: int, data: Dict[str, Any]) -> Score: status = { 'cf': DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_FAILED, 'df': DBConstants.POPN_MUSIC_PLAY_MEDAL_DIAMOND_FAILED, 'sf': DBConstants.POPN_MUSIC_PLAY_MEDAL_STAR_FAILED, 'ec': DBConstants.POPN_MUSIC_PLAY_MEDAL_EASY_CLEAR, 'cc': DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_CLEARED, 'dc': DBConstants.POPN_MUSIC_PLAY_MEDAL_DIAMOND_CLEARED, 'sc': DBConstants.POPN_MUSIC_PLAY_MEDAL_STAR_CLEARED, 'cfc': DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_FULL_COMBO, 'dfc': DBConstants.POPN_MUSIC_PLAY_MEDAL_DIAMOND_FULL_COMBO, 'sfc': DBConstants.POPN_MUSIC_PLAY_MEDAL_STAR_FULL_COMBO, 'p': DBConstants.POPN_MUSIC_PLAY_MEDAL_PERFECT, }.get(data.get('status'), DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_FAILED) return Score( -1, songid, songchart, int(data.get('points', 0)), int(data.get('timestamp', -1)), self.__max(int(data.get('timestamp', -1)), int(data.get('updated', -1))), -1, # No location for remote play 1, # No play info for remote play { 'medal': status, 'combo': int(data.get('combo', -1)), }, ) def __format_reflec_score(self, version: int, songid: int, songchart: int, data: Dict[str, Any]) -> Score: status = { 'np': DBConstants.REFLEC_BEAT_CLEAR_TYPE_NO_PLAY, 'failed': DBConstants.REFLEC_BEAT_CLEAR_TYPE_FAILED, 'cleared': DBConstants.REFLEC_BEAT_CLEAR_TYPE_CLEARED, 'hc': DBConstants.REFLEC_BEAT_CLEAR_TYPE_HARD_CLEARED, 'shc': DBConstants.REFLEC_BEAT_CLEAR_TYPE_S_HARD_CLEARED, }.get(data.get('status'), DBConstants.REFLEC_BEAT_CLEAR_TYPE_NO_PLAY) halo = { 'none': DBConstants.REFLEC_BEAT_COMBO_TYPE_NONE, 'ac': DBConstants.REFLEC_BEAT_COMBO_TYPE_ALMOST_COMBO, 'fc': DBConstants.REFLEC_BEAT_COMBO_TYPE_FULL_COMBO, 'fcaj': DBConstants.REFLEC_BEAT_COMBO_TYPE_FULL_COMBO_ALL_JUST, }.get(data.get('halo'), DBConstants.REFLEC_BEAT_COMBO_TYPE_NONE) return Score( -1, songid, songchart, int(data.get('points', 0)), int(data.get('timestamp', -1)), self.__max(int(data.get('timestamp', -1)), int(data.get('updated', -1))), -1, # No location for remote play 1, # No play info for remote play { 'achievement_rate': int(data.get('rate', -1)), 'clear_type': status, 'combo_type': halo, 'miss_count': int(data.get('miss', -1)), 'combo': int(data.get('combo', -1)), }, ) def __format_sdvx_score(self, version: int, songid: int, songchart: int, data: Dict[str, Any]) -> Score: status = { 'np': DBConstants.SDVX_CLEAR_TYPE_NO_PLAY, 'failed': DBConstants.SDVX_CLEAR_TYPE_FAILED, 'cleared': DBConstants.SDVX_CLEAR_TYPE_CLEAR, 'hc': DBConstants.SDVX_CLEAR_TYPE_HARD_CLEAR, 'uc': DBConstants.SDVX_CLEAR_TYPE_ULTIMATE_CHAIN, 'puc': DBConstants.SDVX_CLEAR_TYPE_PERFECT_ULTIMATE_CHAIN, }.get(data.get('status'), DBConstants.SDVX_CLEAR_TYPE_NO_PLAY) rank = { 'E': DBConstants.SDVX_GRADE_NO_PLAY, 'D': DBConstants.SDVX_GRADE_D, 'C': DBConstants.SDVX_GRADE_C, 'B': DBConstants.SDVX_GRADE_B, 'A': DBConstants.SDVX_GRADE_A, 'A+': DBConstants.SDVX_GRADE_A_PLUS, 'AA': DBConstants.SDVX_GRADE_AA, 'AA+': DBConstants.SDVX_GRADE_AA_PLUS, 'AAA': DBConstants.SDVX_GRADE_AAA, 'AAA+': DBConstants.SDVX_GRADE_AAA_PLUS, 'S': DBConstants.SDVX_GRADE_S, }.get(data.get('rank'), DBConstants.SDVX_GRADE_NO_PLAY) return Score( -1, songid, songchart, int(data.get('points', 0)), int(data.get('timestamp', -1)), self.__max(int(data.get('timestamp', -1)), int(data.get('updated', -1))), -1, # No location for remote play 1, # No play info for remote play { 'grade': rank, 'clear_type': status, 'combo': int(data.get('combo', -1)), 'stats': { 'btn_rate': int(data.get('buttonrate', -1)), 'long_rate': int(data.get('longrate', -1)), 'vol_rate': int(data.get('volrate', -1)), }, }, ) def __format_score(self, game: GameConstants, version: int, songid: int, songchart: int, data: Dict[str, Any]) -> Optional[Score]: if game == GameConstants.DDR: return self.__format_ddr_score(version, songid, songchart, data) if game == GameConstants.IIDX: return self.__format_iidx_score(version, songid, songchart, data) if game == GameConstants.JUBEAT: return self.__format_jubeat_score(version, songid, songchart, data) if game == GameConstants.MUSECA: return self.__format_museca_score(version, songid, songchart, data) if game == GameConstants.POPN_MUSIC: return self.__format_popn_score(version, songid, songchart, data) if game == GameConstants.REFLEC_BEAT: return self.__format_reflec_score(version, songid, songchart, data) if game == GameConstants.SDVX: return self.__format_sdvx_score(version, songid, songchart, data) return None def __merge_ddr_score(self, version: int, oldscore: Score, newscore: Score) -> Score: return Score( -1, oldscore.id, oldscore.chart, self.__max(oldscore.points, newscore.points), self.__max(oldscore.timestamp, newscore.timestamp), self.__max(self.__max(oldscore.update, newscore.update), self.__max(oldscore.timestamp, newscore.timestamp)), oldscore.location, # Always propagate location from local setup if possible oldscore.plays + newscore.plays, { 'rank': self.__max(oldscore.data['rank'], newscore.data['rank']), 'halo': self.__max(oldscore.data['halo'], newscore.data['halo']), 'ghost': oldscore.data.get('ghost') if oldscore.points > newscore.points else newscore.data.get('ghost'), 'trace': oldscore.data.get('trace') if oldscore.points > newscore.points else newscore.data.get('trace'), 'combo': self.__max(oldscore.data['combo'], newscore.data['combo']), }, ) def __merge_iidx_score(self, version: int, oldscore: Score, newscore: Score) -> Score: return Score( -1, oldscore.id, oldscore.chart, self.__max(oldscore.points, newscore.points), self.__max(oldscore.timestamp, newscore.timestamp), self.__max(self.__max(oldscore.update, newscore.update), self.__max(oldscore.timestamp, newscore.timestamp)), oldscore.location, # Always propagate location from local setup if possible oldscore.plays + newscore.plays, { 'clear_status': self.__max(oldscore.data['clear_status'], newscore.data['clear_status']), 'ghost': oldscore.data.get('ghost') if oldscore.points > newscore.points else newscore.data.get('ghost'), 'miss_count': self.__min(oldscore.data.get_int('miss_count', -1), newscore.data.get_int('miss_count', -1)), 'pgreats': oldscore.data.get_int('pgreats', -1) if oldscore.points > newscore.points else newscore.data.get_int('pgreats', -1), 'greats': oldscore.data.get_int('greats', -1) if oldscore.points > newscore.points else newscore.data.get_int('greats', -1), }, ) def __merge_jubeat_score(self, version: int, oldscore: Score, newscore: Score) -> Score: return Score( -1, oldscore.id, oldscore.chart, self.__max(oldscore.points, newscore.points), self.__max(oldscore.timestamp, newscore.timestamp), self.__max(self.__max(oldscore.update, newscore.update), self.__max(oldscore.timestamp, newscore.timestamp)), oldscore.location, # Always propagate location from local setup if possible oldscore.plays + newscore.plays, { 'ghost': oldscore.data.get('ghost') if oldscore.points > newscore.points else newscore.data.get('ghost'), 'combo': self.__max(oldscore.data['combo'], newscore.data['combo']), 'medal': self.__max(oldscore.data['medal'], newscore.data['medal']), }, ) def __merge_museca_score(self, version: int, oldscore: Score, newscore: Score) -> Score: return Score( -1, oldscore.id, oldscore.chart, self.__max(oldscore.points, newscore.points), self.__max(oldscore.timestamp, newscore.timestamp), self.__max(self.__max(oldscore.update, newscore.update), self.__max(oldscore.timestamp, newscore.timestamp)), oldscore.location, # Always propagate location from local setup if possible oldscore.plays + newscore.plays, { 'grade': self.__max(oldscore.data['grade'], newscore.data['grade']), 'clear_type': self.__max(oldscore.data['clear_type'], newscore.data['clear_type']), 'combo': self.__max(oldscore.data['combo'], newscore.data['combo']), 'stats': oldscore.data['stats'] if oldscore.points > newscore.points else newscore.data['stats'], }, ) def __merge_popn_score(self, version: int, oldscore: Score, newscore: Score) -> Score: return Score( -1, oldscore.id, oldscore.chart, self.__max(oldscore.points, newscore.points), self.__max(oldscore.timestamp, newscore.timestamp), self.__max(self.__max(oldscore.update, newscore.update), self.__max(oldscore.timestamp, newscore.timestamp)), oldscore.location, # Always propagate location from local setup if possible oldscore.plays + newscore.plays, { 'combo': self.__max(oldscore.data['combo'], newscore.data['combo']), 'medal': self.__max(oldscore.data['medal'], newscore.data['medal']), }, ) def __merge_reflec_score(self, version: int, oldscore: Score, newscore: Score) -> Score: return Score( -1, oldscore.id, oldscore.chart, self.__max(oldscore.points, newscore.points), self.__max(oldscore.timestamp, newscore.timestamp), self.__max(self.__max(oldscore.update, newscore.update), self.__max(oldscore.timestamp, newscore.timestamp)), oldscore.location, # Always propagate location from local setup if possible oldscore.plays + newscore.plays, { 'clear_type': self.__max(oldscore.data['clear_type'], newscore.data['clear_type']), 'combo_type': self.__max(oldscore.data['combo_type'], newscore.data['combo_type']), 'miss_count': self.__min(oldscore.data.get_int('miss_count', -1), newscore.data.get_int('miss_count', -1)), 'combo': self.__max(oldscore.data['combo'], newscore.data['combo']), 'achievement_rate': self.__max(oldscore.data['achievement_rate'], newscore.data['achievement_rate']), }, ) def __merge_sdvx_score(self, version: int, oldscore: Score, newscore: Score) -> Score: return Score( -1, oldscore.id, oldscore.chart, self.__max(oldscore.points, newscore.points), self.__max(oldscore.timestamp, newscore.timestamp), self.__max(self.__max(oldscore.update, newscore.update), self.__max(oldscore.timestamp, newscore.timestamp)), oldscore.location, # Always propagate location from local setup if possible oldscore.plays + newscore.plays, { 'grade': self.__max(oldscore.data['grade'], newscore.data['grade']), 'clear_type': self.__max(oldscore.data['clear_type'], newscore.data['clear_type']), 'combo': self.__max(oldscore.data.get_int('combo', 1), newscore.data.get_int('combo', -1)), 'stats': oldscore.data['stats'] if oldscore.points > newscore.points else newscore.data['stats'], }, ) def __merge_score(self, game: GameConstants, version: int, oldscore: Score, newscore: Score) -> Score: if oldscore.id != newscore.id or oldscore.chart != newscore.chart: raise Exception('Logic error! Tried to merge scores from different song/charts!') if game == GameConstants.DDR: return self.__merge_ddr_score(version, oldscore, newscore) if game == GameConstants.IIDX: return self.__merge_iidx_score(version, oldscore, newscore) if game == GameConstants.JUBEAT: return self.__merge_jubeat_score(version, oldscore, newscore) if game == GameConstants.MUSECA: return self.__merge_museca_score(version, oldscore, newscore) if game == GameConstants.POPN_MUSIC: return self.__merge_popn_score(version, oldscore, newscore) if game == GameConstants.REFLEC_BEAT: return self.__merge_reflec_score(version, oldscore, newscore) if game == GameConstants.SDVX: return self.__merge_sdvx_score(version, oldscore, newscore) return oldscore def get_score(self, game: GameConstants, version: int, userid: UserID, songid: int, songchart: int) -> Optional[Score]: # Helper function so we can iterate over all servers for a single card def get_scores_for_card(cardid: str) -> List[Score]: return Parallel.flatten(Parallel.call( [client.get_records for client in self.clients], game, version, APIConstants.ID_TYPE_INSTANCE, [songid, songchart, cardid], )) relevant_cards = self.__get_cardids(userid) if RemoteUser.is_remote(userid): # No need to look up local score for this user scores = Parallel.flatten(Parallel.map( get_scores_for_card, relevant_cards, )) localscore = None else: localscore, scores = Parallel.execute([ lambda: self.music.get_score(game, version, userid, songid, songchart), lambda: Parallel.flatten(Parallel.map( get_scores_for_card, relevant_cards, )), ]) topscore = localscore for score in scores: if int(score['song']) != songid: continue if int(score['chart']) != songchart: continue newscore = self.__format_score(game, version, songid, songchart, score) if topscore is None: # No merging needed topscore = newscore continue topscore = self.__merge_score(game, version, topscore, newscore) return topscore def get_scores( self, game: GameConstants, version: int, userid: UserID, since: Optional[int]=None, until: Optional[int]=None, ) -> List[Score]: relevant_cards = self.__get_cardids(userid) if RemoteUser.is_remote(userid): # No need to look up local score for this user scores = Parallel.flatten(Parallel.call( [client.get_records for client in self.clients], game, version, APIConstants.ID_TYPE_CARD, relevant_cards, since, until, )) localscores: List[Score] = [] else: localscores, scores = Parallel.execute([ lambda: self.music.get_scores(game, version, userid, since, until), lambda: Parallel.flatten(Parallel.call( [client.get_records for client in self.clients], game, version, APIConstants.ID_TYPE_CARD, relevant_cards, since, until, )), ]) allscores: Dict[int, Dict[int, Score]] = {} def add_score(score: Score) -> None: if score.id not in allscores: allscores[score.id] = {} allscores[score.id][score.chart] = score def get_score(songid: int, songchart: int) -> Optional[Score]: return allscores.get(songid, {}).get(songchart) # First, seed with local scores for score in localscores: add_score(score) # Second, merge in remote scorse for remotescore in scores: songid = int(remotescore['song']) chart = int(remotescore['chart']) newscore = self.__format_score(game, version, songid, chart, remotescore) oldscore = get_score(songid, chart) if oldscore is None: add_score(newscore) else: add_score(self.__merge_score(game, version, oldscore, newscore)) # Finally, flatten and return finalscores: List[Score] = [] for songid in allscores: for chart in allscores[songid]: finalscores.append(allscores[songid][chart]) return finalscores def __merge_global_scores( self, game: GameConstants, version: int, localcards: List[Tuple[str, UserID]], localscores: List[Tuple[UserID, Score]], remotescores: List[Dict[str, Any]], ) -> List[Tuple[UserID, Score]]: card_to_id = {cardid: userid for (cardid, userid) in localcards} allscores: Dict[UserID, Dict[int, Dict[int, Score]]] = {} def add_score(userid: UserID, score: Score) -> None: if userid not in allscores: allscores[userid] = {} if score.id not in allscores[userid]: allscores[userid][score.id] = {} allscores[userid][score.id][score.chart] = score def get_score(userid: UserID, songid: int, songchart: int) -> Optional[Score]: return allscores.get(userid, {}).get(songid, {}).get(songchart) # First, seed with local scores for (userid, score) in localscores: add_score(userid, score) # Second, merge in remote scorse for remotescore in remotescores: # Figure out the userid of this score cardids = sorted([card.upper() for card in remotescore.get('cards', [])]) if len(cardids) == 0: continue for cardid in cardids: if cardid in card_to_id: userid = card_to_id[cardid] break else: userid = RemoteUser.card_to_userid(cardids[0]) songid = int(remotescore['song']) chart = int(remotescore['chart']) newscore = self.__format_score(game, version, songid, chart, remotescore) oldscore = get_score(userid, songid, chart) if oldscore is None: add_score(userid, newscore) else: add_score(userid, self.__merge_score(game, version, oldscore, newscore)) # Finally, flatten and return finalscores: List[Tuple[UserID, Score]] = [] for userid in allscores: for songid in allscores[userid]: for chart in allscores[userid][songid]: finalscores.append((userid, allscores[userid][songid][chart])) return finalscores def get_all_scores( self, game: GameConstants, version: Optional[int]=None, userid: Optional[UserID]=None, songid: Optional[int]=None, songchart: Optional[int]=None, since: Optional[int]=None, until: Optional[int]=None, ) -> List[Tuple[UserID, Score]]: # First, pass off to local-only if this was called with parameters we don't support if ( version is None or userid is not None or songid is None ): return self.music.get_all_scores(game, version, userid, songid, songchart, since, until) # Now, figure out the request key based on passed in parameters if songchart is None: songkey = [songid] else: songkey = [songid, songchart] # Now, fetch all the scores remotely and locally localcards, localscores, remotescores = Parallel.execute([ self.user.get_all_cards, lambda: self.music.get_all_scores(game, version, userid, songid, songchart, since, until), lambda: Parallel.flatten(Parallel.call( [client.get_records for client in self.clients], game, version, APIConstants.ID_TYPE_SONG, songkey, since, until, )), ]) return self.__merge_global_scores(game, version, localcards, localscores, remotescores) def __merge_global_records( self, game: GameConstants, version: int, localcards: List[Tuple[str, UserID]], localscores: List[Tuple[UserID, Score]], remotescores: List[Dict[str, Any]], ) -> List[Tuple[UserID, Score]]: card_to_id = {cardid: userid for (cardid, userid) in localcards} allscores: Dict[int, Dict[int, Tuple[UserID, Score]]] = {} def add_score(userid: UserID, score: Score) -> None: if score.id not in allscores: allscores[score.id] = {} allscores[score.id][score.chart] = (userid, score) def get_score(songid: int, songchart: int) -> Tuple[Optional[UserID], Optional[Score]]: return allscores.get(songid, {}).get(songchart, (None, None)) # First, seed with local records for (userid, score) in localscores: add_score(userid, score) # Second, merge in remote records for remotescore in remotescores: # Figure out the userid of this score cardids = sorted([card.upper() for card in remotescore.get('cards', [])]) if len(cardids) == 0: continue for cardid in cardids: if cardid in card_to_id: userid = card_to_id[cardid] break else: userid = RemoteUser.card_to_userid(cardids[0]) songid = int(remotescore['song']) chart = int(remotescore['chart']) newscore = self.__format_score(game, version, songid, chart, remotescore) oldid, oldscore = get_score(songid, chart) if oldscore is None: add_score(userid, newscore) else: # if IDs are the same then we should merge them if oldid == userid: add_score(userid, self.__merge_score(game, version, oldscore, newscore)) else: # if the IDs are different we need to check which score actually belongs if newscore.points > oldscore.points: add_score(userid, newscore) # Finally, flatten and return finalscores: List[Tuple[UserID, Score]] = [] for songid in allscores: for chart in allscores[songid]: finalscores.append((allscores[songid][chart][0], allscores[songid][chart][1])) return finalscores def get_all_records( self, game: GameConstants, version: Optional[int]=None, userlist: Optional[List[UserID]]=None, locationlist: Optional[List[int]]=None, ) -> List[Tuple[UserID, Score]]: # First, pass off to local-only if this was called with parameters we don't support if ( version is None or userlist is not None or locationlist is not None ): return self.music.get_all_records(game, version, userlist, locationlist) # Now, fetch all records remotely and locally localcards, localscores, remotescores = Parallel.execute([ self.user.get_all_cards, lambda: self.music.get_all_records(game, version, userlist, locationlist), lambda: Parallel.flatten(Parallel.call( [client.get_records for client in self.clients], game, version, APIConstants.ID_TYPE_SERVER, [], )), ]) return self.__merge_global_records(game, version, localcards, localscores, remotescores) def get_clear_rates( self, game: GameConstants, version: int, songid: Optional[int]=None, songchart: Optional[int]=None, ) -> Dict[int, Dict[int, Dict[str, int]]]: """ Given an optional songid, or optional songid and songchart, looks up clear rates in remote servers that are connected to us. If neither id or chart is given, looks up global clear rates. If songid is given, looks up clear rates for each chart for the song. If songid and chart is given, looks up clear rates for that song/chart. Returns a dictionary keyed by songid, whos values are a dictionary keyed by chart, whos values are a dictionary containing integer counts keyed by 'plays', 'clears', and 'combos'. An example is as follows: { musicid: { chart: { plays: total plays, clears: total clears, combos: total full combos, }, }, } """ if songid is None and songchart is None: statistics = Parallel.flatten(Parallel.call( [client.get_statistics for client in self.clients], game, version, APIConstants.ID_TYPE_SERVER, [], )) elif songid is not None: if songchart is None: ids = [songid] else: ids = [songid, songchart] statistics = Parallel.flatten(Parallel.call( [client.get_statistics for client in self.clients], game, version, APIConstants.ID_TYPE_SONG, ids, )) else: statistics = [] retval: Dict[int, Dict[int, Dict[str, int]]] = {} for stat in statistics: songid = stat.get('song') songchart = stat.get('chart') if songid is None or songchart is None: continue songid = int(songid) songchart = int(songchart) if songid not in retval: retval[songid] = {} if songchart not in retval[songid]: retval[songid][songchart] = { 'plays': 0, 'clears': 0, 'combos': 0, } def get_val(v: str) -> int: out = stat.get(v, -1) if out < 0: out = 0 return out retval[songid][songchart]['plays'] += get_val('plays') retval[songid][songchart]['clears'] += get_val('clears') retval[songid][songchart]['combos'] += get_val('combos') return retval def __format_ddr_song( self, version: int, songid: int, songchart: int, name: Optional[str], artist: Optional[str], genre: Optional[str], data: Dict[str, Any], ) -> Song: return Song( game=GameConstants.DDR, version=version, songid=songid, songchart=songchart, name=name, artist=artist, genre=genre, data={ 'groove': { 'air': int(data['groove']['air']), 'chaos': int(data['groove']['chaos']), 'freeze': int(data['groove']['freeze']), 'stream': int(data['groove']['stream']), 'voltage': int(data['groove']['voltage']), }, 'bpm_min': int(data['bpm_min']), 'bpm_max': int(data['bpm_max']), 'category': int(data['category']), 'difficulty': int(data['difficulty']), 'edit_id': int(data['editid']), }, ) def __format_iidx_song( self, version: int, songid: int, songchart: int, name: Optional[str], artist: Optional[str], genre: Optional[str], data: Dict[str, Any], ) -> Song: return Song( game=GameConstants.IIDX, version=version, songid=songid, songchart=songchart, name=name, artist=artist, genre=genre, data={ 'bpm_min': int(data['bpm_min']), 'bpm_max': int(data['bpm_max']), 'notecount': int(data['notecount']), 'difficulty': int(data['difficulty']), }, ) def __format_jubeat_song( self, version: int, songid: int, songchart: int, name: Optional[str], artist: Optional[str], genre: Optional[str], data: Dict[str, Any], ) -> Song: return Song( game=GameConstants.JUBEAT, version=version, songid=songid, songchart=songchart, name=name, artist=artist, genre=genre, data={ 'bpm_min': int(data['bpm_min']), 'bpm_max': int(data['bpm_max']), 'difficulty': int(data['difficulty']), 'version': int(data.get('category', int(songid / 10000000))), }, ) def __format_museca_song( self, version: int, songid: int, songchart: int, name: Optional[str], artist: Optional[str], genre: Optional[str], data: Dict[str, Any], ) -> Song: return Song( game=GameConstants.MUSECA, version=version, songid=songid, songchart=songchart, name=name, artist=artist, genre=genre, data={ 'bpm_min': int(data['bpm_min']), 'bpm_max': int(data['bpm_max']), 'limited': int(data['limited']), 'difficulty': int(data['difficulty']), }, ) def __format_popn_song( self, version: int, songid: int, songchart: int, name: Optional[str], artist: Optional[str], genre: Optional[str], data: Dict[str, Any], ) -> Song: return Song( game=GameConstants.POPN_MUSIC, version=version, songid=songid, songchart=songchart, name=name, artist=artist, genre=genre, data={ 'difficulty': int(data['difficulty']), 'category': str(data['category']), }, ) def __format_reflec_song( self, version: int, songid: int, songchart: int, name: Optional[str], artist: Optional[str], genre: Optional[str], data: Dict[str, Any], ) -> Song: return Song( game=GameConstants.REFLEC_BEAT, version=version, songid=songid, songchart=songchart, name=name, artist=artist, genre=genre, data={ 'difficulty': int(data['difficulty']), 'folder': int(data['category']), 'chart_id': str(data['musicid']), }, ) def __format_sdvx_song( self, version: int, songid: int, songchart: int, name: Optional[str], artist: Optional[str], genre: Optional[str], data: Dict[str, Any], ) -> Song: return Song( game=GameConstants.SDVX, version=version, songid=songid, songchart=songchart, name=name, artist=artist, genre=genre, data={ 'bpm_min': int(data['bpm_min']), 'bpm_max': int(data['bpm_max']), 'limited': int(data['limited']), 'difficulty': int(data['difficulty']), }, ) def __format_song( self, game: GameConstants, version: int, songid: int, songchart: int, name: Optional[str], artist: Optional[str], genre: Optional[str], data: Dict[str, Any], ) -> Optional[Song]: if game == GameConstants.DDR: return self.__format_ddr_song(version, songid, songchart, name, artist, genre, data) if game == GameConstants.IIDX: return self.__format_iidx_song(version, songid, songchart, name, artist, genre, data) if game == GameConstants.JUBEAT: return self.__format_jubeat_song(version, songid, songchart, name, artist, genre, data) if game == GameConstants.MUSECA: return self.__format_museca_song(version, songid, songchart, name, artist, genre, data) if game == GameConstants.POPN_MUSIC: return self.__format_popn_song(version, songid, songchart, name, artist, genre, data) if game == GameConstants.REFLEC_BEAT: return self.__format_reflec_song(version, songid, songchart, name, artist, genre, data) if game == GameConstants.SDVX: return self.__format_sdvx_song(version, songid, songchart, name, artist, genre, data) return None def get_all_songs( self, game: GameConstants, version: Optional[int]=None, ) -> List[Song]: """ Given a game and a version, look up all song/chart combos associated with that game. Parameters: game - Enum value representing a game series. version - Integer representing which version of the game. Returns: A list of Song objects detailing the song information for each song. """ if version is None: # We could do a ton of work to support this by iterating over all versions # and combining, but this isn't going to be used in that manner, so lets # skip that for now. return [] catalogs: List[Dict[str, List[Dict[str, Any]]]] = Parallel.call( [client.get_catalog for client in self.clients], game, version ) retval: List[Song] = [] seen: Set[str] = set() for catalog in catalogs: for entry in catalog.get('songs', []): song = self.__format_song( game, version, int(entry['song']), int(entry['chart']), str(entry['title'] if entry['title'] is not None else "") or None, str(entry['artist'] if entry['artist'] is not None else "") or None, str(entry['genre'] if entry['genre'] is not None else "") or None, entry, ) if song is None: continue key = f"{song.id}_{song.chart}" if key in seen: continue retval.append(song) seen.add(key) return retval