338 lines
13 KiB
Python
338 lines
13 KiB
Python
# vim: set fileencoding=utf-8
|
|
from typing import Dict, Optional, Sequence
|
|
|
|
from bemani.backend.base import Base
|
|
from bemani.backend.core import CoreHandler, CardManagerHandler, PASELIHandler
|
|
from bemani.common import Profile, ValidatedDict, Time, GameConstants, DBConstants, BroadcastConstants
|
|
from bemani.data import UserID, Achievement, ScoreSaveException
|
|
from bemani.protocol import Node
|
|
|
|
|
|
class PopnMusicBase(CoreHandler, CardManagerHandler, PASELIHandler, Base):
    """
    Base game class for all Pop'n Music versions. Handles common functionality for
    getting profiles based on refid, creating new profiles, looking up and saving
    scores.
    """

    game = GameConstants.POPN_MUSIC

    # Play medals, as saved into/loaded from the DB
    PLAY_MEDAL_NO_PLAY = DBConstants.POPN_MUSIC_PLAY_MEDAL_NO_PLAY
    PLAY_MEDAL_CIRCLE_FAILED = DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_FAILED
    PLAY_MEDAL_DIAMOND_FAILED = DBConstants.POPN_MUSIC_PLAY_MEDAL_DIAMOND_FAILED
    PLAY_MEDAL_STAR_FAILED = DBConstants.POPN_MUSIC_PLAY_MEDAL_STAR_FAILED
    PLAY_MEDAL_EASY_CLEAR = DBConstants.POPN_MUSIC_PLAY_MEDAL_EASY_CLEAR
    PLAY_MEDAL_CIRCLE_CLEARED = DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_CLEARED
    PLAY_MEDAL_DIAMOND_CLEARED = DBConstants.POPN_MUSIC_PLAY_MEDAL_DIAMOND_CLEARED
    PLAY_MEDAL_STAR_CLEARED = DBConstants.POPN_MUSIC_PLAY_MEDAL_STAR_CLEARED
    PLAY_MEDAL_CIRCLE_FULL_COMBO = DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_FULL_COMBO
    PLAY_MEDAL_DIAMOND_FULL_COMBO = DBConstants.POPN_MUSIC_PLAY_MEDAL_DIAMOND_FULL_COMBO
    PLAY_MEDAL_STAR_FULL_COMBO = DBConstants.POPN_MUSIC_PLAY_MEDAL_STAR_FULL_COMBO
    PLAY_MEDAL_PERFECT = DBConstants.POPN_MUSIC_PLAY_MEDAL_PERFECT

    # Chart type, as saved into/loaded from the DB, and returned to game
    CHART_TYPE_EASY = 0
    CHART_TYPE_NORMAL = 1
    CHART_TYPE_HYPER = 2
    CHART_TYPE_EX = 3

    # Old profile lookup type, for loading profile by ID. These are the
    # accepted values for the load_mode argument of get_profile_by_refid.
    NEW_PROFILE_ONLY = 0
    OLD_PROFILE_ONLY = 1
    OLD_PROFILE_FALLTHROUGH = 2

    def previous_version(self) -> Optional['PopnMusicBase']:
        """
        Returns the previous version of the game, based on this game. Should
        be overridden. The base implementation returns None, meaning there is
        no previous version to fall back to.
        """
        return None

    def supports_expired_profiles(self) -> bool:
        """
        Pop'n Music in particular requires non-expired profiles to do conversions
        properly.
        """
        return False

    def format_profile(self, userid: UserID, profile: Profile) -> Node:
        """
        Base handler for a profile. Given a userid and a profile dictionary,
        return a Node representing a profile. Should be overridden. The base
        implementation returns an empty 'playerdata' node.
        """
        return Node.void('playerdata')

    def format_conversion(self, userid: UserID, profile: Profile) -> Node:
        """
        Base handler for profile conversion. Given a userid and a profile
        dictionary, return a node which represents the converted profile for
        the next version of this game. Games will call previous_version to get
        a game class of their previous game version, and then will call
        format_conversion on that previous version to get the profile to
        migrate. The base implementation returns an empty 'playerdata' node.
        """
        return Node.void('playerdata')

    def unformat_profile(self, userid: UserID, request: Node, oldprofile: Profile) -> Profile:
        """
        Base handler for profile parsing. Given a request and an old profile,
        return a new profile that's been updated with the contents of the request.
        Should be overridden. The base implementation returns the old profile
        untouched.
        """
        return oldprofile

    def _format_previous_profile(self, userid: UserID) -> Optional[Node]:
        """
        Load the given user's profile from the previous game version and return
        it formatted as a conversion node. Returns None when there is no previous
        version or the user has no profile on it.
        """
        oldversion = self.previous_version()
        if oldversion is None:
            # previous_version() is Optional and the base implementation returns
            # None, so there may be nothing to convert from.
            return None

        oldprofile = oldversion.get_profile(userid)
        if oldprofile is None:
            return None

        # NOTE(review): the format_conversion docstring describes calling it on
        # the previous version's class, but this has always been invoked on self.
        # Behaviour is preserved here; confirm which one subclasses expect.
        return self.format_conversion(userid, oldprofile)

    def get_profile_by_refid(self, refid: Optional[str], load_mode: int) -> Optional[Node]:
        """
        Given a RefID, return a formatted profile node. Basically every game
        needs a profile lookup, even if it handles where that happens in
        a different request. This is provided for code deduplication. This
        method handles delegating to either format_profile, or looking up
        the previous game and calling format_conversion, whenever necessary.

        Parameters:
            refid - The RefID to look up, or None.
            load_mode - One of NEW_PROFILE_ONLY (current version only),
                        OLD_PROFILE_ONLY (previous version only) or
                        OLD_PROFILE_FALLTHROUGH (current version, falling back
                        to the previous version).

        Returns:
            The formatted profile node, or None if the refid is None, no user
            is tied to it, or no profile was found for the requested mode.

        Raises:
            Exception - if load_mode is not one of the recognized values.
        """
        if refid is None:
            return None

        userid = self.data.remote.user.from_refid(self.game, self.version, refid)
        if userid is None:
            # User doesn't exist but should at this point
            return None

        if load_mode == self.OLD_PROFILE_ONLY:
            # Trying to import from older version
            return self._format_previous_profile(userid)

        if load_mode == self.NEW_PROFILE_ONLY:
            # Trying to import from current version
            profile = self.get_profile(userid)
            if profile is None:
                return None
            return self.format_profile(userid, profile)

        if load_mode == self.OLD_PROFILE_FALLTHROUGH:
            # Try to load from current, if that fails try to load previous
            profile = self.get_profile(userid)
            if profile is not None:
                return self.format_profile(userid, profile)
            return self._format_previous_profile(userid)

        # Unknown value
        raise Exception("Unrecognized value for get profile!")

    def new_profile_by_refid(
        self,
        refid: Optional[str],
        name: Optional[str],
        chara: Optional[int] = None,
        achievements: Sequence[Achievement] = (),
    ) -> Optional[Node]:
        """
        Given a RefID and an optional name, create a profile and then return
        a formatted profile node. Similar rationale to get_profile_by_refid.

        Parameters:
            refid - The RefID to create a profile for, or None.
            name - The profile name; 'なし' is used when None.
            chara - Optional character value stored under the 'chara' key.
            achievements - Achievements to save for the user alongside the
                           newly created profile.

        Returns:
            The new profile as formatted by format_profile, or None if the
            refid is None.

        Raises:
            Exception - if no user is tied to the given refid.
        """
        if refid is None:
            return None

        if name is None:
            name = 'なし'

        userid = self.data.remote.user.from_refid(self.game, self.version, refid)
        if userid is None:
            raise Exception("Logic error! Didn't find user to tie profile to!")

        profile = Profile(
            self.game,
            self.version,
            refid,
            0,
            {
                'name': name,
            },
        )
        if chara is not None:
            profile.replace_int('chara', chara)
        self.put_profile(userid, profile)

        for achievement in achievements:
            self.data.local.user.put_achievement(
                self.game,
                self.version,
                userid,
                achievement.id,
                achievement.type,
                achievement.data,
            )

        return self.format_profile(userid, profile)

    def update_score(
        self,
        userid: UserID,
        songid: int,
        chart: int,
        points: int,
        medal: int,
        combo: Optional[int] = None,
        stats: Optional[Dict[str, int]] = None,
    ) -> None:
        """
        Given various pieces of a score, update the user's high score and score
        history in a controlled manner, so all games in Pop'n series can expect
        the same attributes in a score. Note that the medals passed here are
        expected to be converted from game identifier to our internal identifier,
        so that any game in the series may convert them back. In this way, a song
        played on Pop'n 22 that exists in Pop'n 19 will still have scores/medals
        going back all versions.

        Parameters:
            userid - The user who earned the score.
            songid - The song that was played.
            chart - The chart that was played.
            points - The points earned on this attempt.
            medal - One of the PLAY_MEDAL_* constants.
            combo - Optional combo earned on this attempt.
            stats - Optional stats dictionary for this attempt.

        Raises:
            Exception - if medal is not one of the PLAY_MEDAL_* constants.
        """
        # Range check medals
        if medal not in (
            self.PLAY_MEDAL_NO_PLAY,
            self.PLAY_MEDAL_CIRCLE_FAILED,
            self.PLAY_MEDAL_DIAMOND_FAILED,
            self.PLAY_MEDAL_STAR_FAILED,
            self.PLAY_MEDAL_EASY_CLEAR,
            self.PLAY_MEDAL_CIRCLE_CLEARED,
            self.PLAY_MEDAL_DIAMOND_CLEARED,
            self.PLAY_MEDAL_STAR_CLEARED,
            self.PLAY_MEDAL_CIRCLE_FULL_COMBO,
            self.PLAY_MEDAL_DIAMOND_FULL_COMBO,
            self.PLAY_MEDAL_STAR_FULL_COMBO,
            self.PLAY_MEDAL_PERFECT,
        ):
            raise Exception(f"Invalid medal value {medal}")

        oldscore = self.data.local.music.get_score(
            self.game,
            self.version,
            userid,
            songid,
            chart,
        )

        # Score history is verbatim, instead of highest score. Remember the
        # points of this attempt before they are folded into the best score.
        history = ValidatedDict({})
        attempt_points = points

        if oldscore is None:
            # If it is a new score, create a new dictionary to add to
            scoredata = ValidatedDict({})
            raised = True
            highscore = True
        else:
            # Set the score to any new record achieved. 'raised' means the old
            # best was strictly beaten, 'highscore' also covers tying it.
            raised = points > oldscore.points
            highscore = points >= oldscore.points
            points = max(points, oldscore.points)
            scoredata = oldscore.data

        # Replace medal with highest value
        scoredata.replace_int('medal', max(scoredata.get_int('medal'), medal))
        history.replace_int('medal', medal)

        if stats is not None:
            if raised:
                # We have stats, and there's a new high score, update the stats
                scoredata.replace_dict('stats', stats)
            history.replace_dict('stats', stats)

        if combo is not None:
            # If we have a combo, keep the best one on the score and the
            # actual one on the attempt
            scoredata.replace_int('combo', max(scoredata.get_int('combo'), combo))
            history.replace_int('combo', combo)

        # Look up where this score was earned
        lid = self.get_machine_id()

        # Pop'n Music for all versions before Lapistoria sends all of the songs
        # a player played at the end of the round. It doesn't send timestamps
        # for those songs (Jubeat does). So, if a user plays the same song/chart
        # more than once in a round, we will end up failing to store the attempt
        # since we don't allow two of the same attempt at the same time for the
        # same user and song/chart. So, bump the timestamp by one second and retry
        # well past the maximum number of songs.
        #
        # Note that the score itself is rewritten on every retry so that it
        # carries the same timestamp as the attempt, and that if all retries
        # fail the attempt is dropped without raising.
        now = Time.now()
        for bump in range(10):
            timestamp = now + bump

            # Write the new score back
            self.data.local.music.put_score(
                self.game,
                self.version,
                userid,
                songid,
                chart,
                lid,
                points,
                scoredata,
                highscore,
                timestamp=timestamp,
            )

            try:
                # Save the history of this score too
                self.data.local.music.put_attempt(
                    self.game,
                    self.version,
                    userid,
                    songid,
                    chart,
                    lid,
                    attempt_points,
                    history,
                    raised,
                    timestamp=timestamp,
                )
            except ScoreSaveException:
                # Try again one second in the future
                continue

            # We saved successfully
            break

    def broadcast_score(self, userid: UserID, songid: int, chart: int, medal: int, points: int, combo: int, stats: Dict[str, int]) -> None:
        """
        Build a scorecard for the given play and hand it to the broadcast
        triggers. Nothing is broadcast if the song cannot be found.

        Parameters:
            userid - The user who earned the score.
            songid - The song that was played.
            chart - One of the CHART_TYPE_* constants.
            medal - One of the PLAY_MEDAL_* constants other than
                    PLAY_MEDAL_NO_PLAY.
            points - The points earned.
            combo - The combo earned.
            stats - Stats dictionary with 'cool', 'great', 'good' and 'bad' keys.

        Raises:
            KeyError - if medal or chart has no scorecard label, or if stats is
                       missing one of the keys listed above.
        """
        # Generate scorecard
        profile = self.get_profile(userid)
        song = self.data.local.music.get_song(self.game, self.version, songid, chart)
        if song is None:
            # Without song metadata there is no card to build, so skip the
            # broadcast instead of failing the request that triggered it.
            return

        card_medal = {
            self.PLAY_MEDAL_CIRCLE_FAILED: 'Failed',
            self.PLAY_MEDAL_DIAMOND_FAILED: 'Failed',
            self.PLAY_MEDAL_STAR_FAILED: 'Failed',
            self.PLAY_MEDAL_EASY_CLEAR: 'Cleared',
            self.PLAY_MEDAL_CIRCLE_CLEARED: 'Cleared',
            self.PLAY_MEDAL_DIAMOND_CLEARED: 'Cleared',
            self.PLAY_MEDAL_STAR_CLEARED: 'Cleared',
            self.PLAY_MEDAL_CIRCLE_FULL_COMBO: 'Full Combo',
            self.PLAY_MEDAL_DIAMOND_FULL_COMBO: 'Full Combo',
            self.PLAY_MEDAL_STAR_FULL_COMBO: 'Full Combo',
            self.PLAY_MEDAL_PERFECT: 'Perfect',
        }[medal]

        card_chart = {
            self.CHART_TYPE_EASY: 'Easy',
            self.CHART_TYPE_NORMAL: 'Normal',
            self.CHART_TYPE_HYPER: 'Hyper',
            self.CHART_TYPE_EX: 'Ex',
        }[chart]

        # A missing profile gets the same placeholder name that a profile
        # without a 'name' entry would get.
        player_name = profile.get_str('name', 'なし') if profile is not None else 'なし'

        # Construct the dictionary for the broadcast
        card_data = {
            BroadcastConstants.PLAYER_NAME: player_name,
            BroadcastConstants.SONG_NAME: song.name,
            BroadcastConstants.ARTIST_NAME: song.artist,
            BroadcastConstants.DIFFICULTY: card_chart,
            BroadcastConstants.SCORE: str(points),
            BroadcastConstants.MEDAL: card_medal,
            BroadcastConstants.COOLS: str(stats['cool']),
            BroadcastConstants.GREATS: str(stats['great']),
            BroadcastConstants.GOODS: str(stats['good']),
            BroadcastConstants.BADS: str(stats['bad']),
            BroadcastConstants.COMBO: str(combo),
        }

        # Try to broadcast out the score to our webhook(s)
        self.data.triggers.broadcast_score(card_data, self.game, song)
|