1
0
mirror of synced 2025-01-18 22:24:04 +01:00

384 lines
14 KiB
Python
Raw Normal View History

# vim: set fileencoding=utf-8
from typing import Dict, Optional, Sequence
from typing_extensions import Final
from bemani.backend.base import Base
from bemani.backend.core import CoreHandler, CardManagerHandler, PASELIHandler
from bemani.common import (
Profile,
ValidatedDict,
Time,
GameConstants,
DBConstants,
BroadcastConstants,
Model,
)
from bemani.data import UserID, Achievement, ScoreSaveException, Config, Data
from bemani.protocol import Node
class PopnMusicBase(CoreHandler, CardManagerHandler, PASELIHandler, Base):
"""
Base game class for all Pop'n Music versions. Handles common functionality for
getting profiles based on refid, creating new profiles, looking up and saving
scores.
"""
game: GameConstants = GameConstants.POPN_MUSIC
# Play medals, as saved into/loaded from the DB
PLAY_MEDAL_NO_PLAY: Final[int] = DBConstants.POPN_MUSIC_PLAY_MEDAL_NO_PLAY
PLAY_MEDAL_CIRCLE_FAILED: Final[
int
] = DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_FAILED
PLAY_MEDAL_DIAMOND_FAILED: Final[
int
] = DBConstants.POPN_MUSIC_PLAY_MEDAL_DIAMOND_FAILED
PLAY_MEDAL_STAR_FAILED: Final[int] = DBConstants.POPN_MUSIC_PLAY_MEDAL_STAR_FAILED
PLAY_MEDAL_EASY_CLEAR: Final[int] = DBConstants.POPN_MUSIC_PLAY_MEDAL_EASY_CLEAR
PLAY_MEDAL_CIRCLE_CLEARED: Final[
int
] = DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_CLEARED
PLAY_MEDAL_DIAMOND_CLEARED: Final[
int
] = DBConstants.POPN_MUSIC_PLAY_MEDAL_DIAMOND_CLEARED
PLAY_MEDAL_STAR_CLEARED: Final[int] = DBConstants.POPN_MUSIC_PLAY_MEDAL_STAR_CLEARED
PLAY_MEDAL_CIRCLE_FULL_COMBO: Final[
int
] = DBConstants.POPN_MUSIC_PLAY_MEDAL_CIRCLE_FULL_COMBO
PLAY_MEDAL_DIAMOND_FULL_COMBO: Final[
int
] = DBConstants.POPN_MUSIC_PLAY_MEDAL_DIAMOND_FULL_COMBO
PLAY_MEDAL_STAR_FULL_COMBO: Final[
int
] = DBConstants.POPN_MUSIC_PLAY_MEDAL_STAR_FULL_COMBO
PLAY_MEDAL_PERFECT: Final[int] = DBConstants.POPN_MUSIC_PLAY_MEDAL_PERFECT
# Chart type, as saved into/loaded from the DB, and returned to game
CHART_TYPE_EASY: Final[int] = 0
CHART_TYPE_NORMAL: Final[int] = 1
CHART_TYPE_HYPER: Final[int] = 2
CHART_TYPE_EX: Final[int] = 3
# Old profile lookup type, for loading profile by ID
NEW_PROFILE_ONLY: Final[int] = 0
OLD_PROFILE_ONLY: Final[int] = 1
OLD_PROFILE_FALLTHROUGH: Final[int] = 2
# Pop'n Music in particular requires non-expired profiles to do conversions
# properly.
supports_expired_profiles: bool = False
def __init__(self, data: Data, config: Config, model: Model) -> None:
super().__init__(data, config, model)
if model.rev == "X":
self.omnimix = True
else:
self.omnimix = False
@property
def music_version(self) -> int:
if self.omnimix:
return DBConstants.OMNIMIX_VERSION_BUMP + self.version
return self.version
def previous_version(self) -> Optional["PopnMusicBase"]:
"""
Returns the previous version of the game, based on this game. Should
be overridden.
"""
return None
def format_profile(self, userid: UserID, profile: Profile) -> Node:
"""
Base handler for a profile. Given a userid and a profile dictionary,
return a Node representing a profile. Should be overridden.
"""
return Node.void("playerdata")
def format_conversion(self, userid: UserID, profile: Profile) -> Node:
"""
Base handler for profile conversion. Given a userid and a profile
dictionary, return a node which represents the converted profile for
the next version of this game. Games will call previous_version to get
a game class of their previous game version, and then will call
format_conversion on that previous version to get the profile to
migrate.
"""
return Node.void("playerdata")
def unformat_profile(
self, userid: UserID, request: Node, oldprofile: Profile
) -> Profile:
"""
Base handler for profile parsing. Given a request and an old profile,
return a new profile that's been updated with the contents of the request.
Should be overridden.
"""
return oldprofile
def get_profile_by_refid(
self, refid: Optional[str], load_mode: int
) -> Optional[Node]:
"""
Given a RefID, return a formatted profile node. Basically every game
needs a profile lookup, even if it handles where that happens in
a different request. This is provided for code deduplication. This
method handles delegating to either format_profile, or looking up
the previous game and calling format_conversion, whenever necessary.
"""
if refid is None:
return None
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
if userid is None:
# User doesn't exist but should at this point
return None
if load_mode == self.OLD_PROFILE_ONLY:
# Trying to import from older version
oldversion = self.previous_version()
profile = oldversion.get_profile(userid)
if profile is None:
return None
return self.format_conversion(userid, profile)
elif load_mode == self.NEW_PROFILE_ONLY:
# Trying to import from current version
profile = self.get_profile(userid)
if profile is None:
return None
return self.format_profile(userid, profile)
elif load_mode == self.OLD_PROFILE_FALLTHROUGH:
# Try to load from current, if that fails try to load previous
profile = self.get_profile(userid)
if profile is not None:
return self.format_profile(userid, profile)
oldversion = self.previous_version()
oldprofile = oldversion.get_profile(userid)
if oldprofile is not None:
return self.format_conversion(userid, oldprofile)
return None
else:
# Unknown value
raise Exception("Unrecognized value for get profile!")
def new_profile_by_refid(
self,
refid: Optional[str],
name: Optional[str],
chara: Optional[int] = None,
achievements: Sequence[Achievement] = (),
) -> Node:
"""
Given a RefID and an optional name, create a profile and then return
a formatted profile node. Similar rationale to get_profile_by_refid.
"""
if refid is None:
return None
if name is None:
name = "なし"
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
if userid is None:
raise Exception("Logic error! Didn't find user to tie profile to!")
profile = Profile(
self.game,
self.version,
refid,
0,
{
"name": name,
},
)
if chara is not None:
profile.replace_int("chara", chara)
self.put_profile(userid, profile)
for achievement in achievements:
self.data.local.user.put_achievement(
self.game,
self.version,
userid,
achievement.id,
achievement.type,
achievement.data,
)
return self.format_profile(userid, profile)
def update_score(
self,
userid: UserID,
songid: int,
chart: int,
points: int,
medal: int,
combo: Optional[int] = None,
stats: Optional[Dict[str, int]] = None,
) -> None:
"""
Given various pieces of a score, update the user's high score and score
history in a controlled manner, so all games in Pop'n series can expect
the same attributes in a score. Note that the medals passed here are
expected to be converted from game identifier to our internal identifier,
so that any game in the series may convert them back. In this way, a song
played on Pop'n 22 that exists in Pop'n 19 will still have scores/medals
going back all versions.
"""
# Range check medals
if medal not in [
self.PLAY_MEDAL_NO_PLAY,
self.PLAY_MEDAL_CIRCLE_FAILED,
self.PLAY_MEDAL_DIAMOND_FAILED,
self.PLAY_MEDAL_STAR_FAILED,
self.PLAY_MEDAL_EASY_CLEAR,
self.PLAY_MEDAL_CIRCLE_CLEARED,
self.PLAY_MEDAL_DIAMOND_CLEARED,
self.PLAY_MEDAL_STAR_CLEARED,
self.PLAY_MEDAL_CIRCLE_FULL_COMBO,
self.PLAY_MEDAL_DIAMOND_FULL_COMBO,
self.PLAY_MEDAL_STAR_FULL_COMBO,
self.PLAY_MEDAL_PERFECT,
]:
raise Exception(f"Invalid medal value {medal}")
oldscore = self.data.local.music.get_score(
self.game,
self.music_version,
userid,
songid,
chart,
)
# Score history is verbatum, instead of highest score
history = ValidatedDict({})
oldpoints = points
if oldscore is None:
# If it is a new score, create a new dictionary to add to
scoredata = ValidatedDict({})
raised = True
highscore = True
else:
# Set the score to any new record achieved
raised = points > oldscore.points
highscore = points >= oldscore.points
points = max(points, oldscore.points)
scoredata = oldscore.data
# Replace medal with highest value
scoredata.replace_int("medal", max(scoredata.get_int("medal"), medal))
history.replace_int("medal", medal)
if stats is not None:
if raised:
# We have stats, and there's a new high score, update the stats
scoredata.replace_dict("stats", stats)
history.replace_dict("stats", stats)
if combo is not None:
# If we have a combo, replace it
scoredata.replace_int("combo", max(scoredata.get_int("combo"), combo))
history.replace_int("combo", combo)
# Look up where this score was earned
lid = self.get_machine_id()
# Pop'n Music for all versions before Lapistoria sends all of the songs
# a player played at the end of the round. It doesn't send timestamps
# for those songs (Jubeat does). So, if a user plays the same song/chart
# more than once in a round, we will end up failing to store the attempt
# since we don't allow two of the same attempt at the same time for the
# same user and song/chart. So, bump the timestamp by one second and retry
# well past the maximum number of songs.
now = Time.now()
for bump in range(10):
timestamp = now + bump
# Write the new score back
self.data.local.music.put_score(
self.game,
self.music_version,
userid,
songid,
chart,
lid,
points,
scoredata,
highscore,
timestamp=timestamp,
)
try:
# Save the history of this score too
self.data.local.music.put_attempt(
self.game,
self.music_version,
userid,
songid,
chart,
lid,
oldpoints,
history,
raised,
timestamp=timestamp,
)
except ScoreSaveException:
# Try again one second in the future
continue
# We saved successfully
break
2021-09-04 18:06:00 +02:00
def broadcast_score(
self,
userid: UserID,
songid: int,
chart: int,
medal: int,
points: int,
combo: int,
stats: Dict[str, int],
) -> None:
2021-09-04 18:06:00 +02:00
# Generate scorecard
profile = self.get_profile(userid)
song = self.data.local.music.get_song(self.game, self.music_version, songid, chart)
2021-09-04 18:06:00 +02:00
card_medal = {
self.PLAY_MEDAL_CIRCLE_FAILED: "Failed",
self.PLAY_MEDAL_DIAMOND_FAILED: "Failed",
self.PLAY_MEDAL_STAR_FAILED: "Failed",
self.PLAY_MEDAL_EASY_CLEAR: "Cleared",
self.PLAY_MEDAL_CIRCLE_CLEARED: "Cleared",
self.PLAY_MEDAL_DIAMOND_CLEARED: "Cleared",
self.PLAY_MEDAL_STAR_CLEARED: "Cleared",
self.PLAY_MEDAL_CIRCLE_FULL_COMBO: "Full Combo",
self.PLAY_MEDAL_DIAMOND_FULL_COMBO: "Full Combo",
self.PLAY_MEDAL_STAR_FULL_COMBO: "Full Combo",
self.PLAY_MEDAL_PERFECT: "Perfect",
2021-09-04 18:06:00 +02:00
}[medal]
card_chart = {
self.CHART_TYPE_EASY: "Easy",
self.CHART_TYPE_NORMAL: "Normal",
self.CHART_TYPE_HYPER: "Hyper",
self.CHART_TYPE_EX: "Ex",
2021-09-04 18:06:00 +02:00
}[chart]
# Construct the dictionary for the broadcast
card_data = {
BroadcastConstants.PLAYER_NAME: profile.get_str("name", "なし"),
2021-09-04 18:06:00 +02:00
BroadcastConstants.SONG_NAME: song.name,
BroadcastConstants.ARTIST_NAME: song.artist,
BroadcastConstants.DIFFICULTY: card_chart,
2021-09-06 19:35:05 +02:00
BroadcastConstants.SCORE: str(points),
2021-09-04 18:06:00 +02:00
BroadcastConstants.MEDAL: card_medal,
BroadcastConstants.COOLS: str(stats["cool"]),
BroadcastConstants.GREATS: str(stats["great"]),
BroadcastConstants.GOODS: str(stats["good"]),
BroadcastConstants.BADS: str(stats["bad"]),
2021-09-06 19:35:05 +02:00
BroadcastConstants.COMBO: str(combo),
2021-09-04 18:06:00 +02:00
}
# Try to broadcast out the score to our webhook(s)
self.data.triggers.broadcast_score(card_data, self.game, song)