1
0
mirror of synced 2025-01-07 09:41:33 +01:00
bemaniutils/bemani/backend/reflec/reflecbeat.py

544 lines
22 KiB
Python
Raw Normal View History

import copy
from typing import Any, Dict, Tuple
from bemani.backend.reflec.base import ReflecBeatBase
from bemani.common import ValidatedDict, VersionConstants, ID, Time
from bemani.data import UserID
from bemani.protocol import Node
class ReflecBeat(ReflecBeatBase):
name = "REFLEC BEAT"
version = VersionConstants.REFLEC_BEAT
# Clear types according to the game
GAME_CLEAR_TYPE_NO_PLAY = 0
GAME_CLEAR_TYPE_PLAYED = 2
GAME_CLEAR_TYPE_FULL_COMBO = 3
@classmethod
def get_settings(cls) -> Dict[str, Any]:
"""
Return all of our front-end modifiably settings.
"""
return {
'bools': [
{
'name': 'Force Song Unlock',
'tip': 'Force unlock all songs.',
'category': 'game_config',
'setting': 'force_unlock_songs',
},
],
'ints': [],
}
def __db_to_game_clear_type(self, db_clear_type: int, db_combo_type: int) -> int:
if db_clear_type == self.CLEAR_TYPE_NO_PLAY:
return self.GAME_CLEAR_TYPE_NO_PLAY
if db_clear_type == self.CLEAR_TYPE_FAILED:
return self.GAME_CLEAR_TYPE_PLAYED
if db_clear_type in [
self.CLEAR_TYPE_CLEARED,
self.CLEAR_TYPE_HARD_CLEARED,
self.CLEAR_TYPE_S_HARD_CLEARED,
]:
if db_combo_type in [
self.COMBO_TYPE_NONE,
self.COMBO_TYPE_ALMOST_COMBO,
]:
return self.GAME_CLEAR_TYPE_PLAYED
if db_combo_type in [
self.COMBO_TYPE_FULL_COMBO,
self.COMBO_TYPE_FULL_COMBO_ALL_JUST,
]:
return self.GAME_CLEAR_TYPE_FULL_COMBO
raise Exception(f'Invalid db_combo_type {db_combo_type}')
raise Exception(f'Invalid db_clear_type {db_clear_type}')
def __game_to_db_clear_type(self, game_clear_type: int, game_achievement_rate: int) -> Tuple[int, int]:
if game_clear_type == self.GAME_CLEAR_TYPE_NO_PLAY:
return (self.CLEAR_TYPE_NO_PLAY, self.COMBO_TYPE_NONE)
if game_clear_type == self.GAME_CLEAR_TYPE_PLAYED:
if game_achievement_rate >= 7000:
return (self.CLEAR_TYPE_CLEARED, self.COMBO_TYPE_NONE)
else:
return (self.CLEAR_TYPE_FAILED, self.COMBO_TYPE_NONE)
if game_clear_type == self.GAME_CLEAR_TYPE_FULL_COMBO:
return (self.CLEAR_TYPE_CLEARED, self.COMBO_TYPE_FULL_COMBO)
raise Exception(f'Invalid game_clear_type {game_clear_type}')
def handle_log_pcb_status_request(self, request: Node) -> Node:
return Node.void('log')
def handle_log_opsetting_request(self, request: Node) -> Node:
return Node.void('log')
def handle_log_play_request(self, request: Node) -> Node:
return Node.void('log')
def handle_pcbinfo_get_request(self, request: Node) -> Node:
shop_id = ID.parse_machine_id(request.child_value('lid'))
machine = self.get_machine_by_id(shop_id)
if machine is not None:
machine_name = machine.name
close = machine.data.get_bool('close')
hour = machine.data.get_int('hour')
minute = machine.data.get_int('minute')
pref = machine.data.get_int('pref', 51)
else:
machine_name = ''
close = False
hour = 0
minute = 0
pref = 51
root = Node.void('pcbinfo')
info = Node.void('info')
root.add_child(info)
info.add_child(Node.string('name', machine_name))
info.add_child(Node.s16('pref', pref))
info.add_child(Node.bool('close', close))
info.add_child(Node.u8('hour', hour))
info.add_child(Node.u8('min', minute))
return root
def handle_pcbinfo_set_request(self, request: Node) -> Node:
self.update_machine_name(request.child_value('info/name'))
self.update_machine_data({
'close': request.child_value('info/close'),
'hour': request.child_value('info/hour'),
'minute': request.child_value('info/min'),
'pref': request.child_value('info/pref'),
})
return Node.void('pcbinfo')
def __add_event_info(self, request: Node) -> None:
events: Dict[int, int] = {}
for (_eventid, _phase) in events.items():
data = Node.void('data')
request.add_child(data)
data.add_child(Node.s32('type', -1))
data.add_child(Node.s32('value', -1))
def handle_sysinfo_get_request(self, request: Node) -> Node:
root = Node.void('sysinfo')
trd = Node.void('trd')
root.add_child(trd)
# Add event info
self.__add_event_info(trd)
return root
def handle_sysinfo_fan_request(self, request: Node) -> Node:
sysinfo = Node.void('sysinfo')
sysinfo.add_child(Node.u8('pref', 51))
sysinfo.add_child(Node.string('lid', request.child_value('lid')))
return sysinfo
def handle_lobby_entry_request(self, request: Node) -> Node:
root = Node.void('lobby')
# Create a lobby entry for this user
extid = request.child_value('e/uid')
userid = self.data.remote.user.from_extid(self.game, self.version, extid)
if userid is not None:
profile = self.get_profile(userid)
self.data.local.lobby.put_lobby(
self.game,
self.version,
userid,
{
'mid': request.child_value('e/mid'),
'ng': request.child_value('e/ng'),
'lid': request.child_value('e/lid'),
'sn': request.child_value('e/sn'),
'pref': request.child_value('e/pref'),
'ga': request.child_value('e/ga'),
'gp': request.child_value('e/gp'),
'la': request.child_value('e/la'),
}
)
lobby = self.data.local.lobby.get_lobby(
self.game,
self.version,
userid,
)
root.add_child(Node.s32('eid', lobby.get_int('id')))
e = Node.void('e')
root.add_child(e)
e.add_child(Node.s32('eid', lobby.get_int('id')))
e.add_child(Node.u16('mid', lobby.get_int('mid')))
e.add_child(Node.u8('ng', lobby.get_int('ng')))
e.add_child(Node.s32('uid', profile.get_int('extid')))
e.add_child(Node.string('pn', profile.get_str('name')))
e.add_child(Node.s32('exp', profile.get_int('exp')))
e.add_child(Node.u8('mg', profile.get_int('mg')))
e.add_child(Node.s32('tid', lobby.get_int('tid')))
e.add_child(Node.string('tn', lobby.get_str('tn')))
e.add_child(Node.string('lid', lobby.get_str('lid')))
e.add_child(Node.string('sn', lobby.get_str('sn')))
e.add_child(Node.u8('pref', lobby.get_int('pref')))
e.add_child(Node.u8_array('ga', lobby.get_int_array('ga', 4)))
e.add_child(Node.u16('gp', lobby.get_int('gp')))
e.add_child(Node.u8_array('la', lobby.get_int_array('la', 4)))
return root
def handle_lobby_read_request(self, request: Node) -> Node:
root = Node.void('lobby')
# Look up all lobbies matching the criteria specified
mg = request.child_value('m_grade') # noqa: F841
extid = request.child_value('uid')
limit = request.child_value('max')
userid = self.data.remote.user.from_extid(self.game, self.version, extid)
if userid is not None:
lobbies = self.data.local.lobby.get_all_lobbies(self.game, self.version)
for (user, lobby) in lobbies:
if limit <= 0:
break
if user == userid:
# If we have our own lobby, don't return it
continue
profile = self.get_profile(user)
if profile is None:
# No profile info, don't return this lobby
continue
e = Node.void('e')
root.add_child(e)
e.add_child(Node.s32('eid', lobby.get_int('id')))
e.add_child(Node.u16('mid', lobby.get_int('mid')))
e.add_child(Node.u8('ng', lobby.get_int('ng')))
e.add_child(Node.s32('uid', profile.get_int('extid')))
e.add_child(Node.string('pn', profile.get_str('name')))
e.add_child(Node.s32('exp', profile.get_int('exp')))
e.add_child(Node.u8('mg', profile.get_int('mg')))
e.add_child(Node.s32('tid', lobby.get_int('tid')))
e.add_child(Node.string('tn', lobby.get_str('tn')))
e.add_child(Node.string('lid', lobby.get_str('lid')))
e.add_child(Node.string('sn', lobby.get_str('sn')))
e.add_child(Node.u8('pref', lobby.get_int('pref')))
e.add_child(Node.u8_array('ga', lobby.get_int_array('ga', 4)))
e.add_child(Node.u16('gp', lobby.get_int('gp')))
e.add_child(Node.u8_array('la', lobby.get_int_array('la', 4)))
limit = limit - 1
return root
def handle_lobby_delete_request(self, request: Node) -> Node:
eid = request.child_value('eid')
self.data.local.lobby.destroy_lobby(eid)
return Node.void('lobby')
def handle_player_start_request(self, request: Node) -> Node:
# Add a dummy entry into the lobby setup so we can clean up on end play
refid = request.child_value('rid')
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
if userid is not None:
self.data.local.lobby.put_play_session_info(
self.game,
self.version,
userid,
{}
)
root = Node.void('player')
root.add_child(Node.bool('is_suc', True))
# Add event info
self.__add_event_info(root)
return root
def handle_player_delete_request(self, request: Node) -> Node:
return Node.void('player')
def handle_player_end_request(self, request: Node) -> Node:
# Destroy play session based on info from the request
refid = request.child_value('rid')
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
if userid is not None:
# Kill any lingering lobbies by this user
lobby = self.data.local.lobby.get_lobby(
self.game,
self.version,
userid,
)
if lobby is not None:
self.data.local.lobby.destroy_lobby(lobby.get_int('id'))
self.data.local.lobby.destroy_play_session_info(self.game, self.version, userid)
return Node.void('player')
def handle_player_read_request(self, request: Node) -> Node:
refid = request.child_value('rid')
profile = self.get_profile_by_refid(refid)
if profile:
return profile
return Node.void('player')
def handle_player_write_request(self, request: Node) -> Node:
refid = request.child_value('rid')
profile = self.put_profile_by_refid(refid, request)
root = Node.void('player')
if profile is None:
root.add_child(Node.s32('uid', 0))
else:
root.add_child(Node.s32('uid', profile.get_int('extid')))
root.add_child(Node.s32('time', Time.now()))
return root
def format_profile(self, userid: UserID, profile: ValidatedDict) -> Node:
statistics = self.get_play_statistics(userid)
game_config = self.get_game_config()
achievements = self.data.local.user.get_achievements(self.game, self.version, userid)
scores = self.data.remote.music.get_scores(self.game, self.version, userid)
root = Node.void('player')
pdata = Node.void('pdata')
root.add_child(pdata)
base = Node.void('base')
pdata.add_child(base)
base.add_child(Node.s32('uid', profile.get_int('extid')))
base.add_child(Node.string('name', profile.get_str('name')))
base.add_child(Node.s16('lv', profile.get_int('lvl')))
base.add_child(Node.s32('exp', profile.get_int('exp')))
base.add_child(Node.s16('mg', profile.get_int('mg')))
base.add_child(Node.s16('ap', profile.get_int('ap')))
base.add_child(Node.s32('flag', profile.get_int('flag')))
last_play_date = statistics.get_int_array('last_play_date', 3)
today_play_date = Time.todays_date()
if (
last_play_date[0] == today_play_date[0] and
last_play_date[1] == today_play_date[1] and
last_play_date[2] == today_play_date[2]
):
today_count = statistics.get_int('today_plays', 0)
else:
today_count = 0
con = Node.void('con')
pdata.add_child(con)
con.add_child(Node.s32('day', today_count))
con.add_child(Node.s32('cnt', statistics.get_int('total_plays')))
con.add_child(Node.s32('last', statistics.get_int('last_play_timestamp')))
con.add_child(Node.s32('now', Time.now()))
team = Node.void('team')
pdata.add_child(team)
team.add_child(Node.s32('id', -1))
team.add_child(Node.string('name', ''))
custom = Node.void('custom')
customdict = profile.get_dict('custom')
pdata.add_child(custom)
custom.add_child(Node.u8('bgm_m', customdict.get_int('bgm_m')))
custom.add_child(Node.u8('st_f', customdict.get_int('st_f')))
custom.add_child(Node.u8('st_bg', customdict.get_int('st_bg')))
custom.add_child(Node.u8('st_bg_b', customdict.get_int('st_bg_b')))
custom.add_child(Node.u8('eff_e', customdict.get_int('eff_e')))
custom.add_child(Node.u8('se_s', customdict.get_int('se_s')))
custom.add_child(Node.u8('se_s_v', customdict.get_int('se_s_v')))
released = Node.void('released')
pdata.add_child(released)
for item in achievements:
if item.type[:5] != 'item_':
continue
itemtype = int(item.type[5:])
if game_config.get_bool('force_unlock_songs') and itemtype == 0:
# Don't echo unlocks when we're force unlocking, we'll do it later
continue
info = Node.void('info')
released.add_child(info)
info.add_child(Node.u8('type', itemtype))
info.add_child(Node.u16('id', item.id))
if game_config.get_bool('force_unlock_songs'):
songs = {song.id for song in self.data.local.music.get_all_songs(self.game, self.version)}
for songid in songs:
info = Node.void('info')
released.add_child(info)
info.add_child(Node.u8('type', 0))
info.add_child(Node.u16('id', songid))
# Scores
record = Node.void('record')
pdata.add_child(record)
for score in scores:
rec = Node.void('rec')
record.add_child(rec)
rec.add_child(Node.u16('mid', score.id))
rec.add_child(Node.u8('ng', score.chart))
rec.add_child(Node.s32('win', score.data.get_dict('stats').get_int('win')))
rec.add_child(Node.s32('lose', score.data.get_dict('stats').get_int('lose')))
rec.add_child(Node.s32('draw', score.data.get_dict('stats').get_int('draw')))
rec.add_child(Node.u8('ct', self.__db_to_game_clear_type(score.data.get_int('clear_type'), score.data.get_int('combo_type'))))
rec.add_child(Node.s16('ar', int(score.data.get_int('achievement_rate') / 10)))
rec.add_child(Node.s16('bs', score.points))
rec.add_child(Node.s16('mc', score.data.get_int('combo')))
rec.add_child(Node.s16('bmc', score.data.get_int('miss_count')))
# In original ReflecBeat, the entire battle log was returned for each battle.
# We don't support storing all of that info, so don't return anything here.
blog = Node.void('blog')
pdata.add_child(blog)
# Comment (seems unused?)
pdata.add_child(Node.string('cmnt', ''))
return root
def unformat_profile(self, userid: UserID, request: Node, oldprofile: ValidatedDict) -> ValidatedDict:
game_config = self.get_game_config()
newprofile = copy.deepcopy(oldprofile)
newprofile.replace_int('lid', ID.parse_machine_id(request.child_value('lid')))
newprofile.replace_str('name', request.child_value('pdata/base/name'))
newprofile.replace_int('lvl', request.child_value('pdata/base/lv'))
newprofile.replace_int('exp', request.child_value('pdata/base/exp'))
newprofile.replace_int('mg', request.child_value('pdata/base/mg'))
newprofile.replace_int('ap', request.child_value('pdata/base/ap'))
newprofile.replace_int('flag', request.child_value('pdata/base/flag'))
customdict = newprofile.get_dict('custom')
custom = request.child('pdata/custom')
if custom:
customdict.replace_int('bgm_m', custom.child_value('bgm_m'))
customdict.replace_int('st_f', custom.child_value('st_f'))
customdict.replace_int('st_bg', custom.child_value('st_bg'))
customdict.replace_int('st_bg_b', custom.child_value('st_bg_b'))
customdict.replace_int('eff_e', custom.child_value('eff_e'))
customdict.replace_int('se_s', custom.child_value('se_s'))
customdict.replace_int('se_s_v', custom.child_value('se_s_v'))
newprofile.replace_dict('custom', customdict)
# Music unlocks and other stuff
released = request.child('pdata/released')
if released:
for child in released.children:
if child.name != 'info':
continue
item_id = child.child_value('id')
item_type = child.child_value('type')
if game_config.get_bool('force_unlock_songs') and item_type == 0:
# Don't save unlocks when we're force unlocking
continue
self.data.local.user.put_achievement(
self.game,
self.version,
userid,
item_id,
f'item_{item_type}',
{},
)
# Grab any new records set during this play session. Reflec Beat original only sends
# the top record back for songs that were played at least once during the session.
# Note that it sends the top record, so if you play the song twice, it will return
# only one record. Also, if you get a lower score than a previous try, it will return
# the previous try. So, we must also look at the battle log for the actual play scores,
# and combine the data if we can.
savedrecords: Dict[int, Dict[int, Dict[str, int]]] = {}
songplays = request.child('pdata/record')
if songplays:
for child in songplays.children:
if child.name != 'rec':
continue
songid = child.child_value('mid')
chart = child.child_value('ng')
# These don't get sent with the battle logs, so we try to construct
# the values here.
if songid not in savedrecords:
savedrecords[songid] = {}
savedrecords[songid][chart] = {
'achievement_rate': child.child_value('ar') * 10,
'points': child.child_value('bs'),
'combo': child.child_value('mc'),
'miss_count': child.child_value('bmc'),
'win': child.child_value('win'),
'lose': child.child_value('lose'),
'draw': child.child_value('draw'),
}
# Now, see the actual battles that were played. If we can, unify the data with a record.
# We only do that when the record achievement rate and score matches the battle achievement
# rate and score, so we know for a fact that that record was generated by this battle.
battlelogs = request.child('pdata/blog')
if battlelogs:
for child in battlelogs.children:
if child.name != 'log':
continue
songid = child.child_value('mid')
chart = child.child_value('ng')
clear_type = child.child_value('myself/ct')
achievement_rate = child.child_value('myself/ar') * 10
points = child.child_value('myself/s')
clear_type, combo_type = self.__game_to_db_clear_type(clear_type, achievement_rate)
combo = None
miss_count = -1
stats = None
if songid in savedrecords:
if chart in savedrecords[songid]:
data = savedrecords[songid][chart]
if (
data['achievement_rate'] == achievement_rate and
data['points'] == points
):
# This is the same record! Use the stats from it to update our
# internal representation.
combo = data['combo']
miss_count = data['miss_count']
stats = {
'win': data['win'],
'lose': data['lose'],
'draw': data['draw'],
}
self.update_score(
userid,
songid,
chart,
points,
achievement_rate,
clear_type,
combo_type,
miss_count,
combo=combo,
stats=stats,
)
# Keep track of play statistics
self.update_play_statistics(userid)
return newprofile