824 lines
30 KiB
Python
824 lines
30 KiB
Python
# vim: set fileencoding=utf-8
|
|
import struct
|
|
from typing import Optional, Dict, Any, List, Tuple
|
|
|
|
from bemani.backend.base import Base
|
|
from bemani.backend.core import CoreHandler, CardManagerHandler, PASELIHandler
|
|
from bemani.common import ValidatedDict, Model, GameConstants, DBConstants, Parallel
|
|
from bemani.data import Data, Score, Machine, UserID
|
|
from bemani.protocol import Node
|
|
|
|
|
|
class IIDXBase(CoreHandler, CardManagerHandler, PASELIHandler, Base):
|
|
"""
|
|
Base game class for all Beatmania IIDX versions. Handles common functionality for
|
|
getting profiles based on refid, creating new profiles, looking up and saving
|
|
scores.
|
|
"""
|
|
|
|
game = GameConstants.IIDX
|
|
|
|
paseli_padding = 15
|
|
|
|
CLEAR_TYPE_SINGLE = 1
|
|
CLEAR_TYPE_DOUBLE = 2
|
|
|
|
CLEAR_STATUS_NO_PLAY = DBConstants.IIDX_CLEAR_STATUS_NO_PLAY
|
|
CLEAR_STATUS_FAILED = DBConstants.IIDX_CLEAR_STATUS_FAILED
|
|
CLEAR_STATUS_ASSIST_CLEAR = DBConstants.IIDX_CLEAR_STATUS_ASSIST_CLEAR
|
|
CLEAR_STATUS_EASY_CLEAR = DBConstants.IIDX_CLEAR_STATUS_EASY_CLEAR
|
|
CLEAR_STATUS_CLEAR = DBConstants.IIDX_CLEAR_STATUS_CLEAR
|
|
CLEAR_STATUS_HARD_CLEAR = DBConstants.IIDX_CLEAR_STATUS_HARD_CLEAR
|
|
CLEAR_STATUS_EX_HARD_CLEAR = DBConstants.IIDX_CLEAR_STATUS_EX_HARD_CLEAR
|
|
CLEAR_STATUS_FULL_COMBO = DBConstants.IIDX_CLEAR_STATUS_FULL_COMBO
|
|
|
|
CHART_TYPE_N7 = 0
|
|
CHART_TYPE_H7 = 1
|
|
CHART_TYPE_A7 = 2
|
|
CHART_TYPE_N14 = 3
|
|
CHART_TYPE_H14 = 4
|
|
CHART_TYPE_A14 = 5
|
|
# Beginner charts only save status
|
|
CHART_TYPE_B7 = 6
|
|
|
|
DAN_RANK_7_KYU = DBConstants.IIDX_DAN_RANK_7_KYU
|
|
DAN_RANK_6_KYU = DBConstants.IIDX_DAN_RANK_6_KYU
|
|
DAN_RANK_5_KYU = DBConstants.IIDX_DAN_RANK_5_KYU
|
|
DAN_RANK_4_KYU = DBConstants.IIDX_DAN_RANK_4_KYU
|
|
DAN_RANK_3_KYU = DBConstants.IIDX_DAN_RANK_3_KYU
|
|
DAN_RANK_2_KYU = DBConstants.IIDX_DAN_RANK_2_KYU
|
|
DAN_RANK_1_KYU = DBConstants.IIDX_DAN_RANK_1_KYU
|
|
DAN_RANK_1_DAN = DBConstants.IIDX_DAN_RANK_1_DAN
|
|
DAN_RANK_2_DAN = DBConstants.IIDX_DAN_RANK_2_DAN
|
|
DAN_RANK_3_DAN = DBConstants.IIDX_DAN_RANK_3_DAN
|
|
DAN_RANK_4_DAN = DBConstants.IIDX_DAN_RANK_4_DAN
|
|
DAN_RANK_5_DAN = DBConstants.IIDX_DAN_RANK_5_DAN
|
|
DAN_RANK_6_DAN = DBConstants.IIDX_DAN_RANK_6_DAN
|
|
DAN_RANK_7_DAN = DBConstants.IIDX_DAN_RANK_7_DAN
|
|
DAN_RANK_8_DAN = DBConstants.IIDX_DAN_RANK_8_DAN
|
|
DAN_RANK_9_DAN = DBConstants.IIDX_DAN_RANK_9_DAN
|
|
DAN_RANK_10_DAN = DBConstants.IIDX_DAN_RANK_10_DAN
|
|
DAN_RANK_CHUDEN = DBConstants.IIDX_DAN_RANK_CHUDEN
|
|
DAN_RANK_KAIDEN = DBConstants.IIDX_DAN_RANK_KAIDEN
|
|
|
|
DAN_RANKING_SINGLE = 'sgrade'
|
|
DAN_RANKING_DOUBLE = 'dgrade'
|
|
|
|
GHOST_TYPE_NONE = 0
|
|
GHOST_TYPE_RIVAL = 100
|
|
GHOST_TYPE_GLOBAL_TOP = 200
|
|
GHOST_TYPE_GLOBAL_AVERAGE = 300
|
|
GHOST_TYPE_LOCAL_TOP = 400
|
|
GHOST_TYPE_LOCAL_AVERAGE = 500
|
|
GHOST_TYPE_DAN_TOP = 600
|
|
GHOST_TYPE_DAN_AVERAGE = 700
|
|
GHOST_TYPE_RIVAL_TOP = 800
|
|
GHOST_TYPE_RIVAL_AVERAGE = 900
|
|
|
|
def __init__(self, data: Data, config: Dict[str, Any], model: Model) -> None:
|
|
super().__init__(data, config, model)
|
|
if model.rev == 'X':
|
|
self.omnimix = True
|
|
else:
|
|
self.omnimix = False
|
|
|
|
@property
|
|
def music_version(self) -> int:
|
|
if self.omnimix:
|
|
return DBConstants.OMNIMIX_VERSION_BUMP + self.version
|
|
return self.version
|
|
|
|
def previous_version(self) -> Optional['IIDXBase']:
|
|
"""
|
|
Returns the previous version of the game, based on this game. Should
|
|
be overridden.
|
|
"""
|
|
return None
|
|
|
|
def extra_services(self) -> List[str]:
|
|
"""
|
|
Return the local2 service so that Copula and above will send certain packets.
|
|
"""
|
|
return [
|
|
'local2',
|
|
]
|
|
|
|
def format_profile(self, userid: UserID, profile: ValidatedDict) -> Node:
|
|
"""
|
|
Base handler for a profile. Given a userid and a profile dictionary,
|
|
return a Node representing a profile. Should be overridden.
|
|
"""
|
|
return Node.void('pc')
|
|
|
|
def unformat_profile(self, userid: UserID, request: Node, oldprofile: ValidatedDict) -> ValidatedDict:
|
|
"""
|
|
Base handler for profile parsing. Given a request and an old profile,
|
|
return a new profile that's been updated with the contents of the request.
|
|
Should be overridden.
|
|
"""
|
|
return oldprofile
|
|
|
|
def get_profile_by_refid(self, refid: Optional[str]) -> Optional[Node]:
|
|
"""
|
|
Given a RefID, return a formatted profile node. Basically every game
|
|
needs a profile lookup, even if it handles where that happens in
|
|
a different request. This is provided for code deduplication.
|
|
"""
|
|
if refid is None:
|
|
return None
|
|
|
|
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
|
|
if userid is None:
|
|
# User doesn't exist but should at this point
|
|
return None
|
|
|
|
# Trying to import from current version
|
|
profile = self.get_profile(userid)
|
|
if profile is None:
|
|
return None
|
|
return self.format_profile(userid, profile)
|
|
|
|
def new_profile_by_refid(self, refid: Optional[str], name: Optional[str], pid: Optional[int]) -> ValidatedDict:
|
|
"""
|
|
Given a RefID and an optional name, create a profile and then return
|
|
that newly created profile.
|
|
"""
|
|
if refid is None:
|
|
return None
|
|
|
|
if name is None:
|
|
name = 'なし'
|
|
if pid is None:
|
|
pid = 51
|
|
|
|
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
|
|
defaultprofile = ValidatedDict({
|
|
'name': name,
|
|
'pid': pid,
|
|
'settings': {
|
|
'flags': 223 # Default to turning on all optional folders
|
|
},
|
|
})
|
|
self.put_profile(userid, defaultprofile)
|
|
profile = self.get_profile(userid)
|
|
return profile
|
|
|
|
def put_profile_by_extid(self, extid: Optional[int], request: Node) -> None:
|
|
"""
|
|
Given an ExtID and a request node, unformat the profile and save it.
|
|
"""
|
|
userid = self.data.remote.user.from_extid(self.game, self.version, extid)
|
|
if userid is None:
|
|
return
|
|
|
|
oldprofile = self.get_profile(userid)
|
|
newprofile = self.unformat_profile(userid, request, oldprofile)
|
|
if newprofile is not None:
|
|
self.put_profile(userid, newprofile)
|
|
|
|
def get_machine_by_id(self, shop_id: int) -> Optional[Machine]:
|
|
pcbid = self.data.local.machine.from_machine_id(shop_id)
|
|
if pcbid is not None:
|
|
return self.data.local.machine.get_machine(pcbid)
|
|
else:
|
|
return None
|
|
|
|
def machine_joined_arcade(self) -> bool:
|
|
machine = self.data.local.machine.get_machine(self.config['machine']['pcbid'])
|
|
return machine.arcade is not None
|
|
|
|
def get_clear_rates(
|
|
self,
|
|
songid: Optional[int]=None,
|
|
songchart: Optional[int]=None,
|
|
) -> Dict[int, Dict[int, Dict[str, int]]]:
|
|
"""
|
|
Returns a dictionary similar to the following:
|
|
|
|
{
|
|
musicid: {
|
|
chart: {
|
|
total: total plays,
|
|
clears: total clears,
|
|
fcs: total full combos,
|
|
},
|
|
},
|
|
}
|
|
"""
|
|
all_attempts, remote_attempts = Parallel.execute([
|
|
lambda: self.data.local.music.get_all_attempts(
|
|
game=self.game,
|
|
version=self.music_version,
|
|
songid=songid,
|
|
songchart=songchart,
|
|
),
|
|
lambda: self.data.remote.music.get_clear_rates(
|
|
game=self.game,
|
|
version=self.music_version,
|
|
songid=songid,
|
|
songchart=songchart,
|
|
),
|
|
])
|
|
|
|
attempts: Dict[int, Dict[int, Dict[str, int]]] = {}
|
|
for (_, attempt) in all_attempts:
|
|
if attempt.data.get_int('clear_status') == self.CLEAR_STATUS_NO_PLAY:
|
|
# This attempt was outside of the clear infra, so don't bother with it.
|
|
continue
|
|
|
|
# Terrible temporary structure is terrible.
|
|
if attempt.id not in attempts:
|
|
attempts[attempt.id] = {}
|
|
if attempt.chart not in attempts[attempt.id]:
|
|
attempts[attempt.id][attempt.chart] = {
|
|
'total': 0,
|
|
'clears': 0,
|
|
'fcs': 0,
|
|
}
|
|
|
|
# We saw an attempt, keep the total attempts in sync.
|
|
attempts[attempt.id][attempt.chart]['total'] = attempts[attempt.id][attempt.chart]['total'] + 1
|
|
|
|
if attempt.data.get_int('clear_status', self.CLEAR_STATUS_FAILED) == self.CLEAR_STATUS_FAILED:
|
|
# This attempt was a failure, so don't count it against clears of full combos
|
|
continue
|
|
|
|
# It was at least a clear
|
|
attempts[attempt.id][attempt.chart]['clears'] = attempts[attempt.id][attempt.chart]['clears'] + 1
|
|
|
|
if attempt.data.get_int('clear_status') == self.CLEAR_STATUS_FULL_COMBO:
|
|
# This was a full combo clear, so it also counts here
|
|
attempts[attempt.id][attempt.chart]['fcs'] = attempts[attempt.id][attempt.chart]['fcs'] + 1
|
|
|
|
# Merge in remote attempts
|
|
for songid in remote_attempts:
|
|
if songid not in attempts:
|
|
attempts[songid] = {}
|
|
|
|
for songchart in remote_attempts[songid]:
|
|
if songchart not in attempts[songid]:
|
|
attempts[songid][songchart] = {
|
|
'total': 0,
|
|
'clears': 0,
|
|
'fcs': 0,
|
|
}
|
|
|
|
attempts[songid][songchart]['total'] += remote_attempts[songid][songchart]['plays']
|
|
attempts[songid][songchart]['clears'] += remote_attempts[songid][songchart]['clears']
|
|
attempts[songid][songchart]['fcs'] += remote_attempts[songid][songchart]['combos']
|
|
|
|
# If requesting a specific song/chart, make sure its in the dict
|
|
if songid is not None:
|
|
if songid not in attempts:
|
|
attempts[songid] = {}
|
|
|
|
if songchart is not None:
|
|
if songchart not in attempts[songid]:
|
|
attempts[songid][songchart] = {
|
|
'total': 0,
|
|
'clears': 0,
|
|
'fcs': 0,
|
|
}
|
|
|
|
return attempts
|
|
|
|
def update_score(
|
|
self,
|
|
userid: Optional[UserID],
|
|
songid: int,
|
|
chart: int,
|
|
clear_status: int,
|
|
pgreats: int,
|
|
greats: int,
|
|
miss_count: int,
|
|
ghost: Optional[bytes],
|
|
shop: Optional[int],
|
|
) -> None:
|
|
"""
|
|
Given various pieces of a score, update the user's high score and score
|
|
history in a controlled manner, so all games in IIDX series can expect
|
|
the same attributes in a score. Note that the medals passed here are
|
|
expected to be converted from game identifier to our internal identifier,
|
|
so that any game in the series may convert them back. In this way, a song
|
|
played on Pendual that exists in Tricoro will still have scores/medals
|
|
going back all versions.
|
|
"""
|
|
# Range check medals
|
|
if clear_status not in [
|
|
self.CLEAR_STATUS_NO_PLAY,
|
|
self.CLEAR_STATUS_FAILED,
|
|
self.CLEAR_STATUS_ASSIST_CLEAR,
|
|
self.CLEAR_STATUS_EASY_CLEAR,
|
|
self.CLEAR_STATUS_CLEAR,
|
|
self.CLEAR_STATUS_HARD_CLEAR,
|
|
self.CLEAR_STATUS_EX_HARD_CLEAR,
|
|
self.CLEAR_STATUS_FULL_COMBO,
|
|
]:
|
|
raise Exception(f"Invalid clear status value {clear_status}")
|
|
|
|
# Calculate ex score
|
|
ex_score = (2 * pgreats) + greats
|
|
|
|
if userid is not None:
|
|
if ghost is None:
|
|
raise Exception("Expected a ghost for user score save!")
|
|
oldscore = self.data.local.music.get_score(
|
|
self.game,
|
|
self.music_version,
|
|
userid,
|
|
songid,
|
|
chart,
|
|
)
|
|
else:
|
|
# Storing an anonymous attempt
|
|
if ghost is not None:
|
|
raise Exception("Expected no ghost for anonymous score save!")
|
|
oldscore = None
|
|
|
|
# Score history is verbatum, instead of highest score
|
|
history = ValidatedDict({
|
|
'clear_status': clear_status,
|
|
'miss_count': miss_count,
|
|
})
|
|
old_ex_score = ex_score
|
|
|
|
if ghost is not None:
|
|
history['ghost'] = ghost
|
|
|
|
if oldscore is None:
|
|
# If it is a new score, create a new dictionary to add to
|
|
scoredata = ValidatedDict({
|
|
'clear_status': clear_status,
|
|
'pgreats': pgreats,
|
|
'greats': greats,
|
|
})
|
|
if miss_count != -1:
|
|
scoredata.replace_int('miss_count', miss_count)
|
|
if ghost is not None:
|
|
scoredata['ghost'] = ghost
|
|
raised = True
|
|
highscore = True
|
|
else:
|
|
# Set the score to any new record achieved
|
|
raised = ex_score > oldscore.points
|
|
highscore = ex_score >= oldscore.points
|
|
ex_score = max(ex_score, oldscore.points)
|
|
scoredata = oldscore.data
|
|
scoredata.replace_int('clear_status', max(scoredata.get_int('clear_status'), clear_status))
|
|
if miss_count != -1:
|
|
if scoredata.get_int('miss_count', -1) == -1:
|
|
scoredata.replace_int('miss_count', miss_count)
|
|
else:
|
|
scoredata.replace_int('miss_count', min(scoredata.get_int('miss_count'), miss_count))
|
|
if raised:
|
|
scoredata.replace_int('pgreats', pgreats)
|
|
scoredata.replace_int('greats', greats)
|
|
if ghost is not None:
|
|
scoredata.replace_bytes('ghost', ghost)
|
|
|
|
if shop is not None:
|
|
history.replace_int('shop', shop)
|
|
scoredata.replace_int('shop', shop)
|
|
|
|
# Look up where this score was earned
|
|
lid = self.get_machine_id()
|
|
|
|
if userid is not None:
|
|
# Write the new score back
|
|
self.data.local.music.put_score(
|
|
self.game,
|
|
self.music_version,
|
|
userid,
|
|
songid,
|
|
chart,
|
|
lid,
|
|
ex_score,
|
|
scoredata,
|
|
highscore,
|
|
)
|
|
|
|
# Save the history of this score too
|
|
self.data.local.music.put_attempt(
|
|
self.game,
|
|
self.music_version,
|
|
userid,
|
|
songid,
|
|
chart,
|
|
lid,
|
|
old_ex_score,
|
|
history,
|
|
raised,
|
|
)
|
|
|
|
def update_rank(
|
|
self,
|
|
userid: UserID,
|
|
dantype: str,
|
|
rank: int,
|
|
percent: int,
|
|
cleared: bool,
|
|
stages_cleared: int,
|
|
) -> None:
|
|
# Range check type
|
|
if dantype not in [
|
|
self.DAN_RANKING_SINGLE,
|
|
self.DAN_RANKING_DOUBLE,
|
|
]:
|
|
raise Exception(f"Invalid dan rank type value {dantype}")
|
|
|
|
# Range check rank
|
|
if rank not in [
|
|
self.DAN_RANK_7_KYU,
|
|
self.DAN_RANK_6_KYU,
|
|
self.DAN_RANK_5_KYU,
|
|
self.DAN_RANK_4_KYU,
|
|
self.DAN_RANK_3_KYU,
|
|
self.DAN_RANK_2_KYU,
|
|
self.DAN_RANK_1_KYU,
|
|
self.DAN_RANK_1_DAN,
|
|
self.DAN_RANK_2_DAN,
|
|
self.DAN_RANK_3_DAN,
|
|
self.DAN_RANK_4_DAN,
|
|
self.DAN_RANK_5_DAN,
|
|
self.DAN_RANK_6_DAN,
|
|
self.DAN_RANK_7_DAN,
|
|
self.DAN_RANK_8_DAN,
|
|
self.DAN_RANK_9_DAN,
|
|
self.DAN_RANK_10_DAN,
|
|
self.DAN_RANK_CHUDEN,
|
|
self.DAN_RANK_KAIDEN,
|
|
]:
|
|
raise Exception(f"Invalid dan rank {rank}")
|
|
|
|
if cleared:
|
|
# Update profile if needed
|
|
profile = self.get_profile(userid)
|
|
if profile is None:
|
|
profile = ValidatedDict()
|
|
|
|
profile.replace_int(dantype, max(rank, profile.get_int(dantype, -1)))
|
|
self.put_profile(userid, profile)
|
|
|
|
# Update achievement to track pass rate
|
|
dan_score = self.data.local.user.get_achievement(
|
|
self.game,
|
|
self.version,
|
|
userid,
|
|
rank,
|
|
dantype,
|
|
)
|
|
if dan_score is None:
|
|
dan_score = ValidatedDict()
|
|
dan_score.replace_int('percent', max(percent, dan_score.get_int('percent')))
|
|
dan_score.replace_int('stages_cleared', max(stages_cleared, dan_score.get_int('stages_cleared')))
|
|
self.data.local.user.put_achievement(
|
|
self.game,
|
|
self.version,
|
|
userid,
|
|
rank,
|
|
dantype,
|
|
dan_score
|
|
)
|
|
|
|
def db_to_game_status(self, db_status: int) -> int:
|
|
"""
|
|
Given a DB status, translate to a game clear status.
|
|
"""
|
|
raise Exception('Implement in specific game class!')
|
|
|
|
def game_to_db_status(self, game_status: int) -> int:
|
|
"""
|
|
Given a game clear status, translate to DB status.
|
|
"""
|
|
raise Exception('Implement in specific game class!')
|
|
|
|
def make_score_struct(self, scores: List[Score], cltype: int, index: int) -> List[List[int]]:
|
|
scorestruct: Dict[int, List[int]] = {}
|
|
|
|
for score in scores:
|
|
musicid = score.id
|
|
chart = score.chart
|
|
|
|
# Filter to only singles/doubles charts
|
|
if cltype == self.CLEAR_TYPE_SINGLE:
|
|
if chart not in [
|
|
self.CHART_TYPE_N7,
|
|
self.CHART_TYPE_H7,
|
|
self.CHART_TYPE_A7,
|
|
]:
|
|
continue
|
|
chartindex = {
|
|
self.CHART_TYPE_N7: 0,
|
|
self.CHART_TYPE_H7: 1,
|
|
self.CHART_TYPE_A7: 2,
|
|
}[chart]
|
|
if cltype == self.CLEAR_TYPE_DOUBLE:
|
|
if chart not in [
|
|
self.CHART_TYPE_N14,
|
|
self.CHART_TYPE_H14,
|
|
self.CHART_TYPE_A14,
|
|
]:
|
|
continue
|
|
chartindex = {
|
|
self.CHART_TYPE_N14: 0,
|
|
self.CHART_TYPE_H14: 1,
|
|
self.CHART_TYPE_A14: 2,
|
|
}[chart]
|
|
|
|
if musicid not in scorestruct:
|
|
scorestruct[musicid] = [
|
|
index, # -1 is our scores, positive is rival index
|
|
musicid, # Music ID!
|
|
0, # Normal status,
|
|
0, # Hyper status,
|
|
0, # Another status,
|
|
0, # EX score normal,
|
|
0, # EX score hyper,
|
|
0, # EX score another,
|
|
-1, # Miss count normal,
|
|
-1, # Miss count hyper,
|
|
-1, # Miss count another,
|
|
]
|
|
|
|
scorestruct[musicid][chartindex + 2] = self.db_to_game_status(score.data.get_int('clear_status'))
|
|
scorestruct[musicid][chartindex + 5] = score.points
|
|
scorestruct[musicid][chartindex + 8] = score.data.get_int('miss_count', -1)
|
|
|
|
return [scorestruct[s] for s in scorestruct]
|
|
|
|
def make_beginner_struct(self, scores: List[Score]) -> List[List[int]]:
|
|
scorelist: List[List[int]] = []
|
|
|
|
for score in scores:
|
|
musicid = score.id
|
|
chart = score.chart
|
|
|
|
# Filter to only beginner charts
|
|
if chart != self.CHART_TYPE_B7:
|
|
continue
|
|
|
|
scorelist.append([
|
|
musicid,
|
|
self.db_to_game_status(score.data.get_int('clear_status')),
|
|
])
|
|
|
|
return scorelist
|
|
|
|
def delta_score(
|
|
self,
|
|
scores: List[Score],
|
|
ghost_length: int,
|
|
) -> Tuple[Optional[int], Optional[bytes]]:
|
|
if len(scores) == 0:
|
|
return None, None
|
|
|
|
total_ghost = [0] * ghost_length
|
|
count = 0
|
|
|
|
# Sum up for each bucket
|
|
for score in scores:
|
|
ghost = score.data.get_bytes('ghost')
|
|
for i in range(len(ghost)):
|
|
total_ghost[i] = total_ghost[i] + ghost[i]
|
|
count = count + 1
|
|
|
|
# Calculate average for each bucket
|
|
total_ghost = [int(b / count) for b in total_ghost]
|
|
|
|
# Grab the ex score for this new ghost, being sure to reverse the scaling rate
|
|
new_ex_score = sum(total_ghost)
|
|
|
|
# Spread out into even buckets so we can compute deltas
|
|
reference_ghost = [int(new_ex_score / ghost_length)] * ghost_length
|
|
|
|
added_bucket = 0
|
|
try:
|
|
jump = max(1, int(ghost_length / (new_ex_score - sum(reference_ghost))))
|
|
except ZeroDivisionError:
|
|
jump = 1
|
|
while sum(reference_ghost) != new_ex_score:
|
|
reference_ghost[added_bucket] = reference_ghost[added_bucket] + 1
|
|
added_bucket = added_bucket + jump
|
|
|
|
# Calculate delta ghost
|
|
delta_ghost = [total_ghost[i] - reference_ghost[i] for i in range(ghost_length)]
|
|
|
|
# Return averages
|
|
return new_ex_score, struct.pack('b' * ghost_length, *delta_ghost)
|
|
|
|
def user_joined_arcade(self, machine: Machine, profile: Optional[ValidatedDict]) -> bool:
|
|
if profile is None:
|
|
return False
|
|
|
|
if 'shop_location' not in profile:
|
|
return False
|
|
|
|
machineid = profile.get_int('shop_location')
|
|
if machineid == machine.id:
|
|
# We can short-circuit arcade lookup because their machine
|
|
# is the current machine.
|
|
return True
|
|
|
|
their_machine = self.get_machine_by_id(machineid)
|
|
if their_machine is None:
|
|
return False
|
|
|
|
# The machine they joined matches the arcade of the current machine
|
|
return their_machine.arcade == machine.arcade
|
|
|
|
def get_ghost(
|
|
self,
|
|
ghost_type: int,
|
|
parameter: str,
|
|
ghost_length: int,
|
|
musicid: int,
|
|
chart: int,
|
|
userid: UserID,
|
|
) -> Optional[Dict[str, Any]]:
|
|
ghost_score: Dict[str, Any] = None
|
|
|
|
if ghost_type == self.GHOST_TYPE_RIVAL:
|
|
rival_extid = int(parameter)
|
|
rival_userid = self.data.remote.user.from_extid(self.game, self.version, rival_extid)
|
|
if rival_userid is not None:
|
|
rival_profile = self.get_profile(rival_userid)
|
|
rival_score = self.data.remote.music.get_score(self.game, self.music_version, rival_userid, musicid, chart)
|
|
if rival_score is not None and rival_profile is not None:
|
|
ghost_score = {
|
|
'score': rival_score.points,
|
|
'ghost': rival_score.data.get_bytes('ghost'),
|
|
'name': rival_profile.get_str('name'),
|
|
'pid': rival_profile.get_int('pid'),
|
|
}
|
|
|
|
if (
|
|
ghost_type == self.GHOST_TYPE_GLOBAL_TOP or
|
|
ghost_type == self.GHOST_TYPE_LOCAL_TOP or
|
|
ghost_type == self.GHOST_TYPE_GLOBAL_AVERAGE or
|
|
ghost_type == self.GHOST_TYPE_LOCAL_AVERAGE
|
|
):
|
|
if (
|
|
ghost_type == self.GHOST_TYPE_LOCAL_TOP or
|
|
ghost_type == self.GHOST_TYPE_LOCAL_AVERAGE
|
|
):
|
|
all_scores = sorted(
|
|
self.data.local.music.get_all_scores(game=self.game, version=self.music_version, songid=musicid, songchart=chart),
|
|
key=lambda s: s[1].points,
|
|
reverse=True,
|
|
)
|
|
|
|
# Figure out what arcade this user joined and filter scores by
|
|
# other users who have also joined that arcade.
|
|
my_profile = self.get_profile(userid)
|
|
if my_profile is None:
|
|
my_profile = ValidatedDict()
|
|
|
|
if 'shop_location' in my_profile:
|
|
shop_id = my_profile.get_int('shop_location')
|
|
machine = self.get_machine_by_id(shop_id)
|
|
else:
|
|
machine = None
|
|
|
|
if machine is not None:
|
|
all_scores = [
|
|
score for score in all_scores
|
|
if self.user_joined_arcade(machine, self.get_any_profile(score[0]))
|
|
]
|
|
else:
|
|
# Not joined an arcade, so nobody matches our scores
|
|
all_scores = []
|
|
else:
|
|
all_scores = sorted(
|
|
self.data.remote.music.get_all_scores(game=self.game, version=self.music_version, songid=musicid, songchart=chart),
|
|
key=lambda s: s[1].points,
|
|
reverse=True,
|
|
)
|
|
|
|
if (
|
|
ghost_type == self.GHOST_TYPE_GLOBAL_TOP or
|
|
ghost_type == self.GHOST_TYPE_LOCAL_TOP
|
|
):
|
|
for potential_top in all_scores:
|
|
top_userid = potential_top[0]
|
|
top_score = potential_top[1]
|
|
top_profile = self.get_any_profile(top_userid)
|
|
if top_profile is not None:
|
|
ghost_score = {
|
|
'score': top_score.points,
|
|
'ghost': top_score.data.get_bytes('ghost'),
|
|
'name': top_profile.get_str('name'),
|
|
'pid': top_profile.get_int('pid'),
|
|
'extid': top_profile.get_int('extid'),
|
|
}
|
|
break
|
|
|
|
if (
|
|
ghost_type == self.GHOST_TYPE_GLOBAL_AVERAGE or
|
|
ghost_type == self.GHOST_TYPE_LOCAL_AVERAGE
|
|
):
|
|
average_score, delta_ghost = self.delta_score([score[1] for score in all_scores], ghost_length)
|
|
if average_score is not None and delta_ghost is not None:
|
|
ghost_score = {
|
|
'score': average_score,
|
|
'ghost': bytes([0] * ghost_length),
|
|
}
|
|
|
|
if (
|
|
ghost_type == self.GHOST_TYPE_DAN_TOP or
|
|
ghost_type == self.GHOST_TYPE_DAN_AVERAGE
|
|
):
|
|
is_dp = chart not in [
|
|
self.CHART_TYPE_N7,
|
|
self.CHART_TYPE_H7,
|
|
self.CHART_TYPE_A7,
|
|
]
|
|
my_profile = self.get_profile(userid)
|
|
if my_profile is None:
|
|
my_profile = ValidatedDict()
|
|
if is_dp:
|
|
dan_rank = my_profile.get_int(self.DAN_RANKING_DOUBLE, -1)
|
|
else:
|
|
dan_rank = my_profile.get_int(self.DAN_RANKING_SINGLE, -1)
|
|
|
|
if dan_rank != -1:
|
|
all_scores = sorted(
|
|
self.data.local.music.get_all_scores(game=self.game, version=self.music_version, songid=musicid, songchart=chart),
|
|
key=lambda s: s[1].points,
|
|
reverse=True,
|
|
)
|
|
all_profiles = self.data.local.user.get_all_profiles(self.game, self.version)
|
|
relevant_userids = {
|
|
profile[0] for profile in all_profiles
|
|
if profile[1].get_int(self.DAN_RANKING_DOUBLE if is_dp else self.DAN_RANKING_SINGLE) == dan_rank
|
|
}
|
|
relevant_scores = [
|
|
score for score in all_scores
|
|
if score[0] in relevant_userids
|
|
]
|
|
if ghost_type == self.GHOST_TYPE_DAN_TOP:
|
|
for potential_top in relevant_scores:
|
|
top_userid = potential_top[0]
|
|
top_score = potential_top[1]
|
|
top_profile = self.get_any_profile(top_userid)
|
|
if top_profile is not None:
|
|
ghost_score = {
|
|
'score': top_score.points,
|
|
'ghost': top_score.data.get_bytes('ghost'),
|
|
'name': top_profile.get_str('name'),
|
|
'pid': top_profile.get_int('pid'),
|
|
'extid': top_profile.get_int('extid'),
|
|
}
|
|
break
|
|
|
|
if ghost_type == self.GHOST_TYPE_DAN_AVERAGE:
|
|
average_score, delta_ghost = self.delta_score([score[1] for score in relevant_scores], ghost_length)
|
|
if average_score is not None and delta_ghost is not None:
|
|
ghost_score = {
|
|
'score': average_score,
|
|
'ghost': bytes([0] * ghost_length),
|
|
}
|
|
|
|
if (
|
|
ghost_type == self.GHOST_TYPE_RIVAL_TOP or
|
|
ghost_type == self.GHOST_TYPE_RIVAL_AVERAGE
|
|
):
|
|
rival_extids = [int(e[1:-1]) for e in parameter.split(',')]
|
|
rival_userids = [
|
|
self.data.remote.user.from_extid(self.game, self.version, rival_extid)
|
|
for rival_extid in rival_extids
|
|
]
|
|
|
|
all_scores = sorted(
|
|
[
|
|
score for score in
|
|
self.data.remote.music.get_all_scores(game=self.game, version=self.music_version, songid=musicid, songchart=chart)
|
|
if score[0] in rival_userids
|
|
],
|
|
key=lambda s: s[1].points,
|
|
reverse=True,
|
|
)
|
|
if ghost_type == self.GHOST_TYPE_RIVAL_TOP:
|
|
for potential_top in all_scores:
|
|
top_userid = potential_top[0]
|
|
top_score = potential_top[1]
|
|
top_profile = self.get_any_profile(top_userid)
|
|
if top_profile is not None:
|
|
ghost_score = {
|
|
'score': top_score.points,
|
|
'ghost': top_score.data.get_bytes('ghost'),
|
|
'name': top_profile.get_str('name'),
|
|
'pid': top_profile.get_int('pid'),
|
|
'extid': top_profile.get_int('extid'),
|
|
}
|
|
break
|
|
|
|
if ghost_type == self.GHOST_TYPE_RIVAL_AVERAGE:
|
|
average_score, delta_ghost = self.delta_score([score[1] for score in all_scores], ghost_length)
|
|
if average_score is not None and delta_ghost is not None:
|
|
ghost_score = {
|
|
'score': average_score,
|
|
'ghost': bytes([0] * ghost_length),
|
|
}
|
|
|
|
return ghost_score
|