1
0
mirror of synced 2024-12-03 01:57:18 +01:00
bemaniutils/bemani/backend/iidx/base.py
Subject38 b2edbdce24
Older IIDX routing refactor. (#51)
* Older IIDX routing refactor.
* game_to_db_chart() pattern added
2022-08-17 18:16:47 -04:00

854 lines
32 KiB
Python

# vim: set fileencoding=utf-8
import struct
from typing import Optional, Dict, Any, List, Tuple
from typing_extensions import Final
from bemani.backend.base import Base
from bemani.backend.core import CoreHandler, CardManagerHandler, PASELIHandler
from bemani.common import Profile, ValidatedDict, Model, GameConstants, DBConstants, Parallel
from bemani.data import Config, Data, Score, Machine, UserID
from bemani.protocol import Node
class IIDXBase(CoreHandler, CardManagerHandler, PASELIHandler, Base):
"""
Base game class for all Beatmania IIDX versions. Handles common functionality for
getting profiles based on refid, creating new profiles, looking up and saving
scores.
"""
game: GameConstants = GameConstants.IIDX
paseli_padding: int = 15
CLEAR_TYPE_SINGLE: Final[int] = 1
CLEAR_TYPE_DOUBLE: Final[int] = 2
CLEAR_STATUS_NO_PLAY: Final[int] = DBConstants.IIDX_CLEAR_STATUS_NO_PLAY
CLEAR_STATUS_FAILED: Final[int] = DBConstants.IIDX_CLEAR_STATUS_FAILED
CLEAR_STATUS_ASSIST_CLEAR: Final[int] = DBConstants.IIDX_CLEAR_STATUS_ASSIST_CLEAR
CLEAR_STATUS_EASY_CLEAR: Final[int] = DBConstants.IIDX_CLEAR_STATUS_EASY_CLEAR
CLEAR_STATUS_CLEAR: Final[int] = DBConstants.IIDX_CLEAR_STATUS_CLEAR
CLEAR_STATUS_HARD_CLEAR: Final[int] = DBConstants.IIDX_CLEAR_STATUS_HARD_CLEAR
CLEAR_STATUS_EX_HARD_CLEAR: Final[int] = DBConstants.IIDX_CLEAR_STATUS_EX_HARD_CLEAR
CLEAR_STATUS_FULL_COMBO: Final[int] = DBConstants.IIDX_CLEAR_STATUS_FULL_COMBO
CHART_TYPE_N7: Final[int] = 0
CHART_TYPE_H7: Final[int] = 1
CHART_TYPE_A7: Final[int] = 2
CHART_TYPE_N14: Final[int] = 3
CHART_TYPE_H14: Final[int] = 4
CHART_TYPE_A14: Final[int] = 5
# Beginner charts only save status
CHART_TYPE_B7: Final[int] = 6
DAN_RANK_7_KYU: Final[int] = DBConstants.IIDX_DAN_RANK_7_KYU
DAN_RANK_6_KYU: Final[int] = DBConstants.IIDX_DAN_RANK_6_KYU
DAN_RANK_5_KYU: Final[int] = DBConstants.IIDX_DAN_RANK_5_KYU
DAN_RANK_4_KYU: Final[int] = DBConstants.IIDX_DAN_RANK_4_KYU
DAN_RANK_3_KYU: Final[int] = DBConstants.IIDX_DAN_RANK_3_KYU
DAN_RANK_2_KYU: Final[int] = DBConstants.IIDX_DAN_RANK_2_KYU
DAN_RANK_1_KYU: Final[int] = DBConstants.IIDX_DAN_RANK_1_KYU
DAN_RANK_1_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_1_DAN
DAN_RANK_2_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_2_DAN
DAN_RANK_3_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_3_DAN
DAN_RANK_4_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_4_DAN
DAN_RANK_5_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_5_DAN
DAN_RANK_6_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_6_DAN
DAN_RANK_7_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_7_DAN
DAN_RANK_8_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_8_DAN
DAN_RANK_9_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_9_DAN
DAN_RANK_10_DAN: Final[int] = DBConstants.IIDX_DAN_RANK_10_DAN
DAN_RANK_CHUDEN: Final[int] = DBConstants.IIDX_DAN_RANK_CHUDEN
DAN_RANK_KAIDEN: Final[int] = DBConstants.IIDX_DAN_RANK_KAIDEN
DAN_RANKING_SINGLE: Final[str] = 'sgrade'
DAN_RANKING_DOUBLE: Final[str] = 'dgrade'
GHOST_TYPE_NONE: Final[int] = 0
GHOST_TYPE_RIVAL: Final[int] = 100
GHOST_TYPE_GLOBAL_TOP: Final[int] = 200
GHOST_TYPE_GLOBAL_AVERAGE: Final[int] = 300
GHOST_TYPE_LOCAL_TOP: Final[int] = 400
GHOST_TYPE_LOCAL_AVERAGE: Final[int] = 500
GHOST_TYPE_DAN_TOP: Final[int] = 600
GHOST_TYPE_DAN_AVERAGE: Final[int] = 700
GHOST_TYPE_RIVAL_TOP: Final[int] = 800
GHOST_TYPE_RIVAL_AVERAGE: Final[int] = 900
# Return the local2 service so that Copula and above will send certain packets.
extra_services: List[str] = [
'local2',
]
def __init__(self, data: Data, config: Config, model: Model) -> None:
super().__init__(data, config, model)
if model.rev == 'X':
self.omnimix = True
else:
self.omnimix = False
@property
def music_version(self) -> int:
if self.omnimix:
return DBConstants.OMNIMIX_VERSION_BUMP + self.version
return self.version
def previous_version(self) -> Optional['IIDXBase']:
"""
Returns the previous version of the game, based on this game. Should
be overridden.
"""
return None
def format_profile(self, userid: UserID, profile: Profile) -> Node:
"""
Base handler for a profile. Given a userid and a profile dictionary,
return a Node representing a profile. Should be overridden.
"""
return Node.void('pc')
def unformat_profile(self, userid: UserID, request: Node, oldprofile: Profile) -> Profile:
"""
Base handler for profile parsing. Given a request and an old profile,
return a new profile that's been updated with the contents of the request.
Should be overridden.
"""
return oldprofile
def get_profile_by_refid(self, refid: Optional[str]) -> Optional[Node]:
"""
Given a RefID, return a formatted profile node. Basically every game
needs a profile lookup, even if it handles where that happens in
a different request. This is provided for code deduplication.
"""
if refid is None:
return None
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
if userid is None:
# User doesn't exist but should at this point
return None
# Trying to import from current version
profile = self.get_profile(userid)
if profile is None:
return None
return self.format_profile(userid, profile)
def new_profile_by_refid(self, refid: Optional[str], name: Optional[str], pid: Optional[int]) -> Profile:
"""
Given a RefID and an optional name, create a profile and then return
that newly created profile.
"""
if refid is None:
return None
if name is None:
name = 'なし'
if pid is None:
pid = self.get_machine_region()
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
profile = Profile(
self.game,
self.version,
refid,
0,
{
'name': name,
'pid': pid,
'settings': {
'flags': 223 # Default to turning on all optional folders
},
},
)
self.put_profile(userid, profile)
return profile
def put_profile_by_extid(self, extid: Optional[int], request: Node) -> None:
"""
Given an ExtID and a request node, unformat the profile and save it.
"""
userid = self.data.remote.user.from_extid(self.game, self.version, extid)
if userid is None:
return
oldprofile = self.get_profile(userid)
newprofile = self.unformat_profile(userid, request, oldprofile)
if newprofile is not None:
self.put_profile(userid, newprofile)
def get_machine_by_id(self, shop_id: int) -> Optional[Machine]:
pcbid = self.data.local.machine.from_machine_id(shop_id)
if pcbid is not None:
return self.data.local.machine.get_machine(pcbid)
else:
return None
def machine_joined_arcade(self) -> bool:
machine = self.data.local.machine.get_machine(self.config.machine.pcbid)
return machine.arcade is not None
def get_clear_rates(
self,
songid: Optional[int]=None,
songchart: Optional[int]=None,
) -> Dict[int, Dict[int, Dict[str, int]]]:
"""
Returns a dictionary similar to the following:
{
musicid: {
chart: {
total: total plays,
clears: total clears,
fcs: total full combos,
},
},
}
"""
all_attempts, remote_attempts = Parallel.execute([
lambda: self.data.local.music.get_all_attempts(
game=self.game,
version=self.music_version,
songid=songid,
songchart=songchart,
),
lambda: self.data.remote.music.get_clear_rates(
game=self.game,
version=self.music_version,
songid=songid,
songchart=songchart,
),
])
attempts: Dict[int, Dict[int, Dict[str, int]]] = {}
for (_, attempt) in all_attempts:
if attempt.data.get_int('clear_status') == self.CLEAR_STATUS_NO_PLAY:
# This attempt was outside of the clear infra, so don't bother with it.
continue
# Terrible temporary structure is terrible.
if attempt.id not in attempts:
attempts[attempt.id] = {}
if attempt.chart not in attempts[attempt.id]:
attempts[attempt.id][attempt.chart] = {
'total': 0,
'clears': 0,
'fcs': 0,
}
# We saw an attempt, keep the total attempts in sync.
attempts[attempt.id][attempt.chart]['total'] = attempts[attempt.id][attempt.chart]['total'] + 1
if attempt.data.get_int('clear_status', self.CLEAR_STATUS_FAILED) == self.CLEAR_STATUS_FAILED:
# This attempt was a failure, so don't count it against clears of full combos
continue
# It was at least a clear
attempts[attempt.id][attempt.chart]['clears'] = attempts[attempt.id][attempt.chart]['clears'] + 1
if attempt.data.get_int('clear_status') == self.CLEAR_STATUS_FULL_COMBO:
# This was a full combo clear, so it also counts here
attempts[attempt.id][attempt.chart]['fcs'] = attempts[attempt.id][attempt.chart]['fcs'] + 1
# Merge in remote attempts
for songid in remote_attempts:
if songid not in attempts:
attempts[songid] = {}
for songchart in remote_attempts[songid]:
if songchart not in attempts[songid]:
attempts[songid][songchart] = {
'total': 0,
'clears': 0,
'fcs': 0,
}
attempts[songid][songchart]['total'] += remote_attempts[songid][songchart]['plays']
attempts[songid][songchart]['clears'] += remote_attempts[songid][songchart]['clears']
attempts[songid][songchart]['fcs'] += remote_attempts[songid][songchart]['combos']
# If requesting a specific song/chart, make sure its in the dict
if songid is not None:
if songid not in attempts:
attempts[songid] = {}
if songchart is not None:
if songchart not in attempts[songid]:
attempts[songid][songchart] = {
'total': 0,
'clears': 0,
'fcs': 0,
}
return attempts
def update_score(
self,
userid: Optional[UserID],
songid: int,
chart: int,
clear_status: int,
pgreats: int,
greats: int,
miss_count: int,
ghost: Optional[bytes],
shop: Optional[int],
) -> None:
"""
Given various pieces of a score, update the user's high score and score
history in a controlled manner, so all games in IIDX series can expect
the same attributes in a score. Note that the medals passed here are
expected to be converted from game identifier to our internal identifier,
so that any game in the series may convert them back. In this way, a song
played on Pendual that exists in Tricoro will still have scores/medals
going back all versions.
"""
# Range check medals
if clear_status not in [
self.CLEAR_STATUS_NO_PLAY,
self.CLEAR_STATUS_FAILED,
self.CLEAR_STATUS_ASSIST_CLEAR,
self.CLEAR_STATUS_EASY_CLEAR,
self.CLEAR_STATUS_CLEAR,
self.CLEAR_STATUS_HARD_CLEAR,
self.CLEAR_STATUS_EX_HARD_CLEAR,
self.CLEAR_STATUS_FULL_COMBO,
]:
raise Exception(f"Invalid clear status value {clear_status}")
# Calculate ex score
ex_score = (2 * pgreats) + greats
if userid is not None:
if ghost is None:
raise Exception("Expected a ghost for user score save!")
oldscore = self.data.local.music.get_score(
self.game,
self.music_version,
userid,
songid,
chart,
)
else:
# Storing an anonymous attempt
if ghost is not None:
raise Exception("Expected no ghost for anonymous score save!")
oldscore = None
# Score history is verbatum, instead of highest score
history = ValidatedDict({
'clear_status': clear_status,
'miss_count': miss_count,
})
old_ex_score = ex_score
if ghost is not None:
history['ghost'] = ghost
if oldscore is None:
# If it is a new score, create a new dictionary to add to
scoredata = ValidatedDict({
'clear_status': clear_status,
'pgreats': pgreats,
'greats': greats,
})
if miss_count != -1:
scoredata.replace_int('miss_count', miss_count)
if ghost is not None:
scoredata['ghost'] = ghost
raised = True
highscore = True
else:
# Set the score to any new record achieved
raised = ex_score > oldscore.points
highscore = ex_score >= oldscore.points
ex_score = max(ex_score, oldscore.points)
scoredata = oldscore.data
scoredata.replace_int('clear_status', max(scoredata.get_int('clear_status'), clear_status))
if miss_count != -1:
if scoredata.get_int('miss_count', -1) == -1:
scoredata.replace_int('miss_count', miss_count)
else:
scoredata.replace_int('miss_count', min(scoredata.get_int('miss_count'), miss_count))
if raised:
scoredata.replace_int('pgreats', pgreats)
scoredata.replace_int('greats', greats)
if ghost is not None:
scoredata.replace_bytes('ghost', ghost)
if shop is not None:
history.replace_int('shop', shop)
scoredata.replace_int('shop', shop)
# Look up where this score was earned
lid = self.get_machine_id()
if userid is not None:
# Write the new score back
self.data.local.music.put_score(
self.game,
self.music_version,
userid,
songid,
chart,
lid,
ex_score,
scoredata,
highscore,
)
# Save the history of this score too
self.data.local.music.put_attempt(
self.game,
self.music_version,
userid,
songid,
chart,
lid,
old_ex_score,
history,
raised,
)
def update_rank(
self,
userid: UserID,
dantype: str,
rank: int,
percent: int,
cleared: bool,
stages_cleared: int,
) -> None:
# Range check type
if dantype not in [
self.DAN_RANKING_SINGLE,
self.DAN_RANKING_DOUBLE,
]:
raise Exception(f"Invalid dan rank type value {dantype}")
# Range check rank
if rank not in [
self.DAN_RANK_7_KYU,
self.DAN_RANK_6_KYU,
self.DAN_RANK_5_KYU,
self.DAN_RANK_4_KYU,
self.DAN_RANK_3_KYU,
self.DAN_RANK_2_KYU,
self.DAN_RANK_1_KYU,
self.DAN_RANK_1_DAN,
self.DAN_RANK_2_DAN,
self.DAN_RANK_3_DAN,
self.DAN_RANK_4_DAN,
self.DAN_RANK_5_DAN,
self.DAN_RANK_6_DAN,
self.DAN_RANK_7_DAN,
self.DAN_RANK_8_DAN,
self.DAN_RANK_9_DAN,
self.DAN_RANK_10_DAN,
self.DAN_RANK_CHUDEN,
self.DAN_RANK_KAIDEN,
]:
raise Exception(f"Invalid dan rank {rank}")
if cleared:
# Update profile if needed
profile = self.get_profile(userid)
if profile is None:
profile = Profile(
self.game,
self.version,
"",
0,
)
profile.replace_int(dantype, max(rank, profile.get_int(dantype, -1)))
self.put_profile(userid, profile)
# Update achievement to track pass rate
dan_score = self.data.local.user.get_achievement(
self.game,
self.version,
userid,
rank,
dantype,
)
if dan_score is None:
dan_score = ValidatedDict()
dan_score.replace_int('percent', max(percent, dan_score.get_int('percent')))
dan_score.replace_int('stages_cleared', max(stages_cleared, dan_score.get_int('stages_cleared')))
self.data.local.user.put_achievement(
self.game,
self.version,
userid,
rank,
dantype,
dan_score
)
def db_to_game_status(self, db_status: int) -> int:
"""
Given a DB status, translate to a game clear status.
"""
raise Exception('Implement in specific game class!')
def game_to_db_status(self, game_status: int) -> int:
"""
Given a game clear status, translate to DB status.
"""
raise Exception('Implement in specific game class!')
def game_to_db_chart(self, game_chart: int) -> int:
"""
Given a game's chart for a song, return the chart as defined above.
"""
raise Exception('Implement in sub-class!')
def db_to_game_chart(self, db_chart: int) -> int:
"""
Given a chart as defined above, return the game's chart constant.
"""
raise Exception('Implement in sub-class!')
def make_score_struct(self, scores: List[Score], cltype: int, index: int) -> List[List[int]]:
scorestruct: Dict[int, List[int]] = {}
for score in scores:
musicid = score.id
chart = score.chart
# Filter to only singles/doubles charts
if cltype == self.CLEAR_TYPE_SINGLE:
if chart not in [
self.CHART_TYPE_N7,
self.CHART_TYPE_H7,
self.CHART_TYPE_A7,
]:
continue
chartindex = {
self.CHART_TYPE_N7: 0,
self.CHART_TYPE_H7: 1,
self.CHART_TYPE_A7: 2,
}[chart]
if cltype == self.CLEAR_TYPE_DOUBLE:
if chart not in [
self.CHART_TYPE_N14,
self.CHART_TYPE_H14,
self.CHART_TYPE_A14,
]:
continue
chartindex = {
self.CHART_TYPE_N14: 0,
self.CHART_TYPE_H14: 1,
self.CHART_TYPE_A14: 2,
}[chart]
if musicid not in scorestruct:
scorestruct[musicid] = [
index, # -1 is our scores, positive is rival index
musicid, # Music ID!
0, # Normal status,
0, # Hyper status,
0, # Another status,
0, # EX score normal,
0, # EX score hyper,
0, # EX score another,
-1, # Miss count normal,
-1, # Miss count hyper,
-1, # Miss count another,
]
scorestruct[musicid][chartindex + 2] = self.db_to_game_status(score.data.get_int('clear_status'))
scorestruct[musicid][chartindex + 5] = score.points
scorestruct[musicid][chartindex + 8] = score.data.get_int('miss_count', -1)
return [scorestruct[s] for s in scorestruct]
def make_beginner_struct(self, scores: List[Score]) -> List[List[int]]:
scorelist: List[List[int]] = []
for score in scores:
musicid = score.id
chart = score.chart
# Filter to only beginner charts
if chart != self.CHART_TYPE_B7:
continue
scorelist.append([
musicid,
self.db_to_game_status(score.data.get_int('clear_status')),
])
return scorelist
def delta_score(
self,
scores: List[Score],
ghost_length: int,
) -> Tuple[Optional[int], Optional[bytes]]:
if len(scores) == 0:
return None, None
total_ghost = [0] * ghost_length
count = 0
# Sum up for each bucket
for score in scores:
ghost = score.data.get_bytes('ghost')
for i in range(len(ghost)):
total_ghost[i] = total_ghost[i] + ghost[i]
count = count + 1
# Calculate average for each bucket
total_ghost = [int(b / count) for b in total_ghost]
# Grab the ex score for this new ghost, being sure to reverse the scaling rate
new_ex_score = sum(total_ghost)
# Spread out into even buckets so we can compute deltas
reference_ghost = [int(new_ex_score / ghost_length)] * ghost_length
added_bucket = 0
try:
jump = max(1, int(ghost_length / (new_ex_score - sum(reference_ghost))))
except ZeroDivisionError:
jump = 1
while sum(reference_ghost) != new_ex_score:
reference_ghost[added_bucket] = reference_ghost[added_bucket] + 1
added_bucket = added_bucket + jump
# Calculate delta ghost
delta_ghost = [total_ghost[i] - reference_ghost[i] for i in range(ghost_length)]
# Return averages
return new_ex_score, struct.pack('b' * ghost_length, *delta_ghost)
def user_joined_arcade(self, machine: Machine, profile: Optional[Profile]) -> bool:
if profile is None:
return False
if 'shop_location' not in profile:
return False
machineid = profile.get_int('shop_location')
if machineid == machine.id:
# We can short-circuit arcade lookup because their machine
# is the current machine.
return True
their_machine = self.get_machine_by_id(machineid)
if their_machine is None:
return False
# The machine they joined matches the arcade of the current machine
return their_machine.arcade == machine.arcade
def get_ghost(
self,
ghost_type: int,
parameter: str,
ghost_length: int,
musicid: int,
chart: int,
userid: UserID,
) -> Optional[Dict[str, Any]]:
ghost_score: Dict[str, Any] = None
if ghost_type == self.GHOST_TYPE_RIVAL:
rival_extid = int(parameter)
rival_userid = self.data.remote.user.from_extid(self.game, self.version, rival_extid)
if rival_userid is not None:
rival_profile = self.get_profile(rival_userid)
rival_score = self.data.remote.music.get_score(self.game, self.music_version, rival_userid, musicid, chart)
if rival_score is not None and rival_profile is not None:
ghost_score = {
'score': rival_score.points,
'ghost': rival_score.data.get_bytes('ghost'),
'name': rival_profile.get_str('name'),
'pid': rival_profile.get_int('pid'),
}
if (
ghost_type == self.GHOST_TYPE_GLOBAL_TOP or
ghost_type == self.GHOST_TYPE_LOCAL_TOP or
ghost_type == self.GHOST_TYPE_GLOBAL_AVERAGE or
ghost_type == self.GHOST_TYPE_LOCAL_AVERAGE
):
if (
ghost_type == self.GHOST_TYPE_LOCAL_TOP or
ghost_type == self.GHOST_TYPE_LOCAL_AVERAGE
):
all_scores = sorted(
self.data.local.music.get_all_scores(game=self.game, version=self.music_version, songid=musicid, songchart=chart),
key=lambda s: s[1].points,
reverse=True,
)
# Figure out what arcade this user joined and filter scores by
# other users who have also joined that arcade.
my_profile = self.get_profile(userid)
if my_profile is None:
my_profile = Profile(
self.game,
self.version,
"",
0,
)
if 'shop_location' in my_profile:
shop_id = my_profile.get_int('shop_location')
machine = self.get_machine_by_id(shop_id)
else:
machine = None
if machine is not None:
all_scores = [
score for score in all_scores
if self.user_joined_arcade(machine, self.get_any_profile(score[0]))
]
else:
# Not joined an arcade, so nobody matches our scores
all_scores = []
else:
all_scores = sorted(
self.data.remote.music.get_all_scores(game=self.game, version=self.music_version, songid=musicid, songchart=chart),
key=lambda s: s[1].points,
reverse=True,
)
if (
ghost_type == self.GHOST_TYPE_GLOBAL_TOP or
ghost_type == self.GHOST_TYPE_LOCAL_TOP
):
for potential_top in all_scores:
top_userid = potential_top[0]
top_score = potential_top[1]
top_profile = self.get_any_profile(top_userid)
if top_profile is not None:
ghost_score = {
'score': top_score.points,
'ghost': top_score.data.get_bytes('ghost'),
'name': top_profile.get_str('name'),
'pid': top_profile.get_int('pid'),
'extid': top_profile.extid,
}
break
if (
ghost_type == self.GHOST_TYPE_GLOBAL_AVERAGE or
ghost_type == self.GHOST_TYPE_LOCAL_AVERAGE
):
average_score, delta_ghost = self.delta_score([score[1] for score in all_scores], ghost_length)
if average_score is not None and delta_ghost is not None:
ghost_score = {
'score': average_score,
'ghost': bytes([0] * ghost_length),
}
if (
ghost_type == self.GHOST_TYPE_DAN_TOP or
ghost_type == self.GHOST_TYPE_DAN_AVERAGE
):
is_dp = chart not in [
self.CHART_TYPE_N7,
self.CHART_TYPE_H7,
self.CHART_TYPE_A7,
]
my_profile = self.get_profile(userid)
if my_profile is None:
my_profile = Profile(
self.game,
self.version,
"",
0,
)
if is_dp:
dan_rank = my_profile.get_int(self.DAN_RANKING_DOUBLE, -1)
else:
dan_rank = my_profile.get_int(self.DAN_RANKING_SINGLE, -1)
if dan_rank != -1:
all_scores = sorted(
self.data.local.music.get_all_scores(game=self.game, version=self.music_version, songid=musicid, songchart=chart),
key=lambda s: s[1].points,
reverse=True,
)
all_profiles = self.data.local.user.get_all_profiles(self.game, self.version)
relevant_userids = {
profile[0] for profile in all_profiles
if profile[1].get_int(self.DAN_RANKING_DOUBLE if is_dp else self.DAN_RANKING_SINGLE) == dan_rank
}
relevant_scores = [
score for score in all_scores
if score[0] in relevant_userids
]
if ghost_type == self.GHOST_TYPE_DAN_TOP:
for potential_top in relevant_scores:
top_userid = potential_top[0]
top_score = potential_top[1]
top_profile = self.get_any_profile(top_userid)
if top_profile is not None:
ghost_score = {
'score': top_score.points,
'ghost': top_score.data.get_bytes('ghost'),
'name': top_profile.get_str('name'),
'pid': top_profile.get_int('pid'),
'extid': top_profile.extid,
}
break
if ghost_type == self.GHOST_TYPE_DAN_AVERAGE:
average_score, delta_ghost = self.delta_score([score[1] for score in relevant_scores], ghost_length)
if average_score is not None and delta_ghost is not None:
ghost_score = {
'score': average_score,
'ghost': bytes([0] * ghost_length),
}
if (
ghost_type == self.GHOST_TYPE_RIVAL_TOP or
ghost_type == self.GHOST_TYPE_RIVAL_AVERAGE
):
rival_extids = [int(e[1:-1]) for e in parameter.split(',')]
rival_userids = [
self.data.remote.user.from_extid(self.game, self.version, rival_extid)
for rival_extid in rival_extids
]
all_scores = sorted(
[
score for score in
self.data.remote.music.get_all_scores(game=self.game, version=self.music_version, songid=musicid, songchart=chart)
if score[0] in rival_userids
],
key=lambda s: s[1].points,
reverse=True,
)
if ghost_type == self.GHOST_TYPE_RIVAL_TOP:
for potential_top in all_scores:
top_userid = potential_top[0]
top_score = potential_top[1]
top_profile = self.get_any_profile(top_userid)
if top_profile is not None:
ghost_score = {
'score': top_score.points,
'ghost': top_score.data.get_bytes('ghost'),
'name': top_profile.get_str('name'),
'pid': top_profile.get_int('pid'),
'extid': top_profile.extid,
}
break
if ghost_type == self.GHOST_TYPE_RIVAL_AVERAGE:
average_score, delta_ghost = self.delta_score([score[1] for score in all_scores], ghost_length)
if average_score is not None and delta_ghost is not None:
ghost_score = {
'score': average_score,
'ghost': bytes([0] * ghost_length),
}
return ghost_score