375 lines
15 KiB
Python
375 lines
15 KiB
Python
# vim: set fileencoding=utf-8
|
|
import binascii
|
|
import base64
|
|
|
|
try:
|
|
# Python <= 3.9
|
|
from collections import Iterable
|
|
except ImportError:
|
|
# Python > 3.9
|
|
from collections.abc import Iterable
|
|
from typing import Any, Dict, List, Sequence, Union
|
|
|
|
from bemani.backend.bishi.base import BishiBashiBase
|
|
from bemani.backend.ess import EventLogHandler
|
|
from bemani.common import Profile, GameConstants, VersionConstants, Time
|
|
from bemani.data import UserID
|
|
from bemani.protocol import Node
|
|
|
|
|
|
class TheStarBishiBashi(
|
|
EventLogHandler,
|
|
BishiBashiBase,
|
|
):
|
|
name: str = "The★BishiBashi"
|
|
version: int = VersionConstants.BISHI_BASHI_TSBB
|
|
|
|
@classmethod
|
|
def get_settings(cls) -> Dict[str, Any]:
|
|
"""
|
|
Return all of our front-end modifiably settings.
|
|
"""
|
|
return {
|
|
"bools": [
|
|
{
|
|
"name": "Force Unlock All Characters",
|
|
"tip": "Force unlock all characters on select screen.",
|
|
"category": "game_config",
|
|
"setting": "force_unlock_characters",
|
|
},
|
|
{
|
|
"name": "Unlock Non-Gacha Characters",
|
|
"tip": "Unlock characters that require playing a different game to unlock.",
|
|
"category": "game_config",
|
|
"setting": "force_unlock_eamuse_characters",
|
|
},
|
|
{
|
|
"name": "Enable DLC levels",
|
|
"tip": "Enable extra DLC levels on newer cabinets.",
|
|
"category": "game_config",
|
|
"setting": "enable_dlc_levels",
|
|
},
|
|
],
|
|
"strs": [
|
|
{
|
|
"name": "Scrolling Announcement",
|
|
"tip": "An announcement that scrolls by in attract mode.",
|
|
"category": "game_config",
|
|
"setting": "big_announcement",
|
|
},
|
|
],
|
|
"longstrs": [
|
|
{
|
|
"name": "Bulletin Board Announcement",
|
|
"tip": "An announcement displayed on a bulletin board in attract mode.",
|
|
"category": "game_config",
|
|
"setting": "bb_announcement",
|
|
},
|
|
],
|
|
}
|
|
|
|
def __update_shop_name(self, profiledata: bytes) -> None:
|
|
# Figure out the profile type
|
|
csvs = profiledata.split(b",")
|
|
if len(csvs) < 2:
|
|
# Not long enough to care about
|
|
return
|
|
datatype = csvs[1].decode("ascii")
|
|
if datatype != "IBBDAT00":
|
|
# Not the right profile type requested
|
|
return
|
|
|
|
# Grab the shop name
|
|
try:
|
|
shopname = csvs[30].decode("shift-jis")
|
|
except Exception:
|
|
return
|
|
self.update_machine_name(shopname)
|
|
|
|
def __escape_string(self, data: Union[int, str]) -> str:
|
|
data = str(data)
|
|
data = data.replace("#", "##")
|
|
data = data.replace("\r\n", "#n")
|
|
data = data.replace("\r", "#n")
|
|
data = data.replace("\n", "#n")
|
|
data = data.replace(" ", "#s")
|
|
data = data.replace(",", "#,")
|
|
data = data.replace("=", "#=")
|
|
data = data.replace(";", "#;")
|
|
return data
|
|
|
|
def __generate_setting(
|
|
self, key: str, values: Union[int, str, Sequence[int], Sequence[str]]
|
|
) -> str:
|
|
if isinstance(values, Iterable) and not isinstance(values, str):
|
|
values = ",".join(self.__escape_string(x) for x in values)
|
|
else:
|
|
values = self.__escape_string(values)
|
|
key = self.__escape_string(key)
|
|
return f"{key}={values}"
|
|
|
|
def handle_system_getmaster_request(self, request: Node) -> Node:
|
|
# See if we can grab the request
|
|
data = request.child("data")
|
|
if not data:
|
|
root = Node.void("system")
|
|
root.add_child(Node.s32("result", 0))
|
|
return root
|
|
|
|
# Figure out what type of messsage this is
|
|
reqtype = data.child_value("datatype")
|
|
reqkey = data.child_value("datakey")
|
|
|
|
# System message
|
|
root = Node.void("system")
|
|
|
|
if reqtype == "S_SRVMSG" and reqkey == "INFO":
|
|
# Settings that we can tweak from the server.
|
|
# There's a variety of settings that the game supports, not all of them are figured
|
|
# out. They are documented below.
|
|
#
|
|
# "MAL": 1 - Unlock all DLC levels.
|
|
# "MO": [<levelnum>, <levelnum>, ...] - unlock certain DLC levels by ID. The four
|
|
# DLC levels are as follows:
|
|
# 14 - Morse Code
|
|
# 51 - PiroPiro
|
|
# 60 - Pop'n Music
|
|
# 61 - Love Drop
|
|
# "CM": "Arbitrary String" - Scroll the message "Arbitrary String" in attract mode.
|
|
# "IM": "Arbitrary Message" - Display "Arbitrary Message" on a new bulletin in attract mode.
|
|
# "ALL": 1 - Force-unlock all non-gacha characters.
|
|
# "MD": [<int>, <int>, ...] - Unknown setting related to demo mode. Possibly allows server-selection of
|
|
# which levels show up?
|
|
# "MQ": 0/1 - Unknown boolean setting that enables recommendation weights I think?
|
|
# "MR": [<int>, <int>, ...] - Unknown setting related to recommendation weights. Only appears to be used
|
|
# if "MQ" is set to 1.
|
|
#
|
|
# Additionally, there are a series of settings that are related to character unlocks and BGM selection.
|
|
# I haven't figured out what this setting does, but it might enable gacha-pulls of characters that otherwise
|
|
# require eAmusement plays to unlock? The settings are all in the form of "<key>": <str>. I am not sure what
|
|
# the str value should be. They are reproduced here:
|
|
# "ABB" = "BishiBashi"
|
|
# "ASF" = "Spin Fever"
|
|
# "AEK" = "Eternal Knights 2"
|
|
# "AOD" = "Otomedius"
|
|
# "ABM" = "Beatmania IIDX"
|
|
# "APM" = "pop'n music"
|
|
# "ATB" = "Twinbee"
|
|
# "AGG" = "Good Luck Goemon!"
|
|
# "AGK" = "Ga-Ko Kerotan"
|
|
# "AQM" = "Quiz Magic Academy"
|
|
# "AMF" = "Mahjong Fight Club"
|
|
# "AGF" = "Guitar Freaks"
|
|
# "ADM" = "DrumMania"
|
|
# "AJB" = "Jubeat"
|
|
# "ACL" = "Brain Development Institute Kurukuru Lab"
|
|
# "ASH" = "Silent Hill THE ARCADE"
|
|
# "AHR" = "Horse Riders"
|
|
# "AAD" = "Action Detective"
|
|
# "AWE" = "Winning Eleven"
|
|
# "ACV" = "Ajumajo Dracula (Castlevania)"
|
|
# "AGT" = "GTI Club"
|
|
# "ABH" = "Baseball Heroes"
|
|
# "ADR" = "DanceDanceRevolution"
|
|
# "AGD" = "Gradius"
|
|
# "APD" = "Parodius"
|
|
# "AGC" = "GrandCross Premium"
|
|
# "AXX" = "XeXeX"
|
|
# "ATK" = "TokiMeki Memorial"
|
|
# "AKK" = "Konami"
|
|
# "A--" = "Original"
|
|
settings: Dict[str, Union[int, str, Sequence[int], Sequence[str]]] = {}
|
|
|
|
game_config = self.get_game_config()
|
|
enable_dlc_levels = game_config.get_bool("enable_dlc_levels")
|
|
if enable_dlc_levels:
|
|
settings["MAL"] = 1
|
|
force_unlock_characters = game_config.get_bool(
|
|
"force_unlock_eamuse_characters"
|
|
)
|
|
if force_unlock_characters:
|
|
settings["ALL"] = 1
|
|
scrolling_message = game_config.get_str("big_announcement")
|
|
if scrolling_message:
|
|
settings["CM"] = scrolling_message
|
|
bb_message = game_config.get_str("bb_announcement")
|
|
if bb_message:
|
|
settings["IM"] = bb_message
|
|
|
|
# Generate system message
|
|
settings_str = ";".join(
|
|
self.__generate_setting(key, vals) for key, vals in settings.items()
|
|
)
|
|
|
|
# Send it to the client, making sure to inform the client that it was valid.
|
|
root.add_child(
|
|
Node.string(
|
|
"strdata1",
|
|
base64.b64encode(settings_str.encode("ascii")).decode("ascii"),
|
|
)
|
|
)
|
|
root.add_child(Node.string("strdata2", ""))
|
|
root.add_child(Node.u64("updatedate", Time.now() * 1000))
|
|
root.add_child(Node.s32("result", 1))
|
|
else:
|
|
# Unknown message.
|
|
root.add_child(Node.s32("result", 0))
|
|
|
|
return root
|
|
|
|
def handle_playerdata_usergamedata_send_request(self, request: Node) -> Node:
|
|
# Look up user by refid
|
|
refid = request.child_value("data/eaid")
|
|
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
|
|
if userid is None:
|
|
root = Node.void("playerdata")
|
|
root.add_child(
|
|
Node.s32("result", 1)
|
|
) # Unclear if this is the right thing to do here.
|
|
return root
|
|
|
|
# Extract new profile info from old profile
|
|
oldprofile = self.get_profile(userid)
|
|
is_new = False
|
|
if oldprofile is None:
|
|
oldprofile = Profile(self.game, self.version, refid, 0)
|
|
is_new = True
|
|
newprofile = self.unformat_profile(userid, request, oldprofile, is_new)
|
|
|
|
# Write new profile
|
|
self.put_profile(userid, newprofile)
|
|
|
|
# Return success!
|
|
root = Node.void("playerdata")
|
|
root.add_child(Node.s32("result", 0))
|
|
return root
|
|
|
|
def handle_playerdata_usergamedata_recv_request(self, request: Node) -> Node:
|
|
# Look up user by refid
|
|
refid = request.child_value("data/eaid")
|
|
profiletype = request.child_value("data/recv_csv").split(",")[0]
|
|
profile = None
|
|
userid = None
|
|
if refid is not None:
|
|
userid = self.data.remote.user.from_refid(self.game, self.version, refid)
|
|
if userid is not None:
|
|
profile = self.get_profile(userid)
|
|
if profile is not None:
|
|
return self.format_profile(userid, profiletype, profile)
|
|
else:
|
|
root = Node.void("playerdata")
|
|
root.add_child(
|
|
Node.s32("result", 1)
|
|
) # Unclear if this is the right thing to do here.
|
|
return root
|
|
|
|
def format_profile(
|
|
self, userid: UserID, profiletype: str, profile: Profile
|
|
) -> Node:
|
|
root = Node.void("playerdata")
|
|
root.add_child(Node.s32("result", 0))
|
|
player = Node.void("player")
|
|
root.add_child(player)
|
|
records = 0
|
|
|
|
for i in range(len(profile["strdatas"])):
|
|
strdata = profile["strdatas"][i]
|
|
bindata = profile["bindatas"][i]
|
|
|
|
# Figure out the profile type
|
|
csvs = strdata.split(b",")
|
|
if len(csvs) < 2:
|
|
# Not long enough to care about
|
|
continue
|
|
datatype = csvs[1].decode("ascii")
|
|
if datatype != profiletype:
|
|
# Not the right profile type requested
|
|
continue
|
|
|
|
game_config = self.get_game_config()
|
|
force_unlock_characters = game_config.get_bool("force_unlock_characters")
|
|
if force_unlock_characters:
|
|
csvs[11] = b"3ffffffffffff"
|
|
else:
|
|
# Reward characters based on playing other games on the network
|
|
hexdata = csvs[11].decode("ascii")
|
|
while (len(hexdata) & 1) != 0:
|
|
hexdata = "0" + hexdata
|
|
unlock_bits = [b for b in binascii.unhexlify(hexdata)]
|
|
while len(unlock_bits) < 7:
|
|
unlock_bits.insert(0, 0)
|
|
|
|
# Reverse the array, so indexing makes more sense
|
|
unlock_bits = unlock_bits[::-1]
|
|
|
|
# Figure out what other games were played by this user
|
|
profiles = self.data.local.user.get_games_played(userid)
|
|
|
|
# IIDX
|
|
if len([p for p in profiles if p[0] == GameConstants.IIDX]) > 0:
|
|
unlock_bits[1] = unlock_bits[1] | 0x10
|
|
|
|
# Pop'n
|
|
if len([p for p in profiles if p[0] == GameConstants.POPN_MUSIC]) > 0:
|
|
unlock_bits[1] = unlock_bits[1] | 0x60
|
|
|
|
# Jubeat
|
|
if len([p for p in profiles if p[0] == GameConstants.JUBEAT]) > 0:
|
|
unlock_bits[2] = unlock_bits[2] | 0x02
|
|
|
|
# DDR
|
|
if len([p for p in profiles if p[0] == GameConstants.DDR]) > 0:
|
|
unlock_bits[6] = unlock_bits[6] | 0x03
|
|
|
|
# GFDM characters exist, but this network has no support for
|
|
# GFDM or Gitadora, so the bits were never added.
|
|
|
|
# Reconstruct table
|
|
unlock_bits = unlock_bits[::-1]
|
|
csvs[11] = "".join([f"{x:02x}" for x in unlock_bits]).encode("ascii")
|
|
|
|
# This is a valid profile node for this type, lets return only the profile values
|
|
strdata = b",".join(csvs[2:])
|
|
record = Node.void("record")
|
|
player.add_child(record)
|
|
d = Node.string("d", base64.b64encode(strdata).decode("ascii"))
|
|
record.add_child(d)
|
|
d.add_child(Node.string("bin1", base64.b64encode(bindata).decode("ascii")))
|
|
|
|
# Remember that we had this record
|
|
records = records + 1
|
|
|
|
player.add_child(Node.u32("record_num", records))
|
|
return root
|
|
|
|
def unformat_profile(
|
|
self, userid: UserID, request: Node, oldprofile: Profile, is_new: bool
|
|
) -> Profile:
|
|
# Profile save request, data values are base64 encoded.
|
|
# d is a CSV, and bin1 is binary data.
|
|
newprofile = oldprofile.clone()
|
|
strdatas: List[bytes] = []
|
|
bindatas: List[bytes] = []
|
|
|
|
record = request.child("data/record")
|
|
for node in record.children:
|
|
if node.name != "d":
|
|
continue
|
|
|
|
profile = base64.b64decode(node.value)
|
|
# Update the shop name if this is a new profile, since we know it came
|
|
# from this cabinet. This is the only source of truth for what the
|
|
# cabinet shop name is set to.
|
|
if is_new:
|
|
self.__update_shop_name(profile)
|
|
strdatas.append(profile)
|
|
bindatas.append(base64.b64decode(node.child_value("bin1")))
|
|
|
|
newprofile["strdatas"] = strdatas
|
|
newprofile["bindatas"] = bindatas
|
|
|
|
# Keep track of play statistics across all versions
|
|
self.update_play_statistics(userid)
|
|
|
|
return newprofile
|