1
0
mirror of synced 2024-12-18 16:25:53 +01:00
artemis/titles/mai2/index.py
beerpsi a8daa0344a [mai2] Support encryption (#130)
Similar to O.N.G.E.K.I. and CHUNITHM, with the caveat that the obfuscated endpoint is created using `md5(endpoint + salt)` instead of using PBKDF2 like other games.

Tested and confirmed working on FESTiVAL+.

The current implementation is also affected by #129, so I'm open to ideas.

Reviewed-on: https://gitea.tendokyu.moe/Hay1tsme/artemis/pulls/130
Co-authored-by: beerpsi <beerpsi@duck.com>
Co-committed-by: beerpsi <beerpsi@duck.com>
2024-04-24 16:59:33 +00:00

428 lines
16 KiB
Python

from starlette.requests import Request
from starlette.responses import Response, JSONResponse
from starlette.routing import Route
import json
import inflection
import yaml
import logging, coloredlogs
import zlib
import string
from logging.handlers import TimedRotatingFileHandler
from os import path, mkdir
from typing import Tuple, List, Dict
from Crypto.Hash import MD5
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from core.config import CoreConfig
from core.utils import Utils
from core.title import BaseServlet
from .config import Mai2Config
from .const import Mai2Constants
from .base import Mai2Base
from .finale import Mai2Finale
from .dx import Mai2DX
from .dxplus import Mai2DXPlus
from .splash import Mai2Splash
from .splashplus import Mai2SplashPlus
from .universe import Mai2Universe
from .universeplus import Mai2UniversePlus
from .festival import Mai2Festival
from .festivalplus import Mai2FestivalPlus
from .buddies import Mai2Buddies
class Mai2Servlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
super().__init__(core_cfg, cfg_dir)
self.game_cfg = Mai2Config()
self.hash_table: Dict[int, Dict[str, str]] = {}
if path.exists(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}"):
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}"))
)
self.versions = [
Mai2Base,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
Mai2Finale,
Mai2DX,
Mai2DXPlus,
Mai2Splash,
Mai2SplashPlus,
Mai2Universe,
Mai2UniversePlus,
Mai2Festival,
Mai2FestivalPlus,
Mai2Buddies
]
self.logger = logging.getLogger("mai2")
if not hasattr(self.logger, "initted"):
log_fmt_str = "[%(asctime)s] Mai2 | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "mai2"),
encoding="utf8",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.initted = True
for version, keys in self.game_cfg.crypto.keys.items():
if version < Mai2Constants.VER_MAIMAI_DX:
continue
if len(keys) < 3:
continue
self.hash_table[version] = {}
method_list = [
method
for method in dir(self.versions[version])
if not method.startswith("__")
]
for method in method_list:
# handle_method_api_request -> HandleMethodApiRequest
# remove the first 6 chars and the final 7 chars to get the canonical
# endpoint name.
method_fixed = inflection.camelize(method)[6:-7]
hash = MD5.new((method_fixed + keys[2]).encode())
# truncate unused bytes like the game does
hashed_name = hash.hexdigest()
self.hash_table[version][hashed_name] = method_fixed
self.logger.debug(
"Hashed v%s method %s with %s to get %s",
version, method_fixed, keys[2], hashed_name
)
@classmethod
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> bool:
game_cfg = Mai2Config()
if path.exists(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return False
return True
def get_routes(self) -> List[Route]:
return [
Route("/{version:int}/MaimaiServlet/api/movie/{endpoint:str}", self.handle_movie, methods=['GET', 'POST']),
Route("/{version:int}/MaimaiServlet/old/{endpoint:str}", self.handle_old_srv),
Route("/{version:int}/MaimaiServlet/old/{endpoint:str}/{placeid:str}/{keychip:str}/{userid:int}", self.handle_old_srv_userdata),
Route("/{version:int}/MaimaiServlet/old/{endpoint:str}/{userid:int}", self.handle_old_srv_userdata),
Route("/{version:int}/MaimaiServlet/old/{endpoint:str}/{userid:int}", self.handle_old_srv_userdata),
Route("/{version:int}/MaimaiServlet/usbdl/{endpoint:str}", self.handle_usbdl),
Route("/{version:int}/MaimaiServlet/deliver/{endpoint:str}", self.handle_deliver),
Route("/{version:int}/MaimaiServlet/{endpoint:str}", self.handle_mai, methods=['POST']),
Route("/{game:str}/{version:int}/Maimai2Servlet/{endpoint:str}", self.handle_mai2, methods=['POST']),
]
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if game_code in {Mai2Constants.GAME_CODE_DX, Mai2Constants.GAME_CODE_DX_INT}:
path = f"{game_code}/{game_ver}"
else:
path = game_ver
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (
f"http://{self.core_cfg.server.hostname}:{Utils.get_title_port(self.core_cfg)}/{path}/",
f"{self.core_cfg.server.hostname}",
)
return (
f"http://{self.core_cfg.server.hostname}/{path}/",
f"{self.core_cfg.server.hostname}",
)
def setup(self):
if (
self.game_cfg.uploads.photos
and self.game_cfg.uploads.photos_dir
and not path.exists(self.game_cfg.uploads.photos_dir)
):
try:
mkdir(self.game_cfg.uploads.photos_dir)
except Exception:
self.logger.error(
f"Failed to make photo upload directory at {self.game_cfg.uploads.photos_dir}"
)
if (
self.game_cfg.uploads.movies
and self.game_cfg.uploads.movies_dir
and not path.exists(self.game_cfg.uploads.movies_dir)
):
try:
mkdir(self.game_cfg.uploads.movies_dir)
except Exception:
self.logger.error(
f"Failed to make movie upload directory at {self.game_cfg.uploads.movies_dir}"
)
async def handle_movie(self, request: Request):
return JSONResponse()
async def handle_usbdl(self, request: Request):
return Response("OK")
async def handle_deliver(self, request: Request):
return Response(status_code=404)
async def handle_mai(self, request: Request) -> bytes:
endpoint: str = request.path_params.get('endpoint')
version: int = request.path_params.get('version')
if endpoint.lower() == "ping":
return Response(zlib.compress(b'{"returnCode": "1"}'))
req_raw = await request.body()
internal_ver = 0
client_ip = Utils.get_ip_addr(request)
if version < 110: # 1.0
internal_ver = Mai2Constants.VER_MAIMAI
elif version >= 110 and version < 120: # Plus
internal_ver = Mai2Constants.VER_MAIMAI_PLUS
elif version >= 120 and version < 130: # Green
internal_ver = Mai2Constants.VER_MAIMAI_GREEN
elif version >= 130 and version < 140: # Green Plus
internal_ver = Mai2Constants.VER_MAIMAI_GREEN_PLUS
elif version >= 140 and version < 150: # Orange
internal_ver = Mai2Constants.VER_MAIMAI_ORANGE
elif version >= 150 and version < 160: # Orange Plus
internal_ver = Mai2Constants.VER_MAIMAI_ORANGE_PLUS
elif version >= 160 and version < 170: # Pink
internal_ver = Mai2Constants.VER_MAIMAI_PINK
elif version >= 170 and version < 180: # Pink Plus
internal_ver = Mai2Constants.VER_MAIMAI_PINK_PLUS
elif version >= 180 and version < 185: # Murasaki
internal_ver = Mai2Constants.VER_MAIMAI_MURASAKI
elif version >= 185 and version < 190: # Murasaki Plus
internal_ver = Mai2Constants.VER_MAIMAI_MURASAKI_PLUS
elif version >= 190 and version < 195: # Milk
internal_ver = Mai2Constants.VER_MAIMAI_MILK
elif version >= 195 and version < 197: # Milk Plus
internal_ver = Mai2Constants.VER_MAIMAI_MILK_PLUS
elif version >= 197: # Finale
internal_ver = Mai2Constants.VER_MAIMAI_FINALE
try:
unzip = zlib.decompress(req_raw)
except zlib.error as e:
self.logger.error(
f"Failed to decompress v{version} {endpoint} request -> {e}"
)
return Response(zlib.compress(b'{"stat": "0"}'))
req_data = json.loads(unzip)
self.logger.info(f"v{version} {endpoint} request from {client_ip}")
self.logger.debug(req_data)
func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg)
if not hasattr(handler_cls, func_to_find):
self.logger.warning(f"Unhandled v{version} request {endpoint}")
resp = {"returnCode": 1}
else:
try:
handler = getattr(handler_cls, func_to_find)
resp = await handler(req_data)
except Exception as e:
self.logger.error(f"Error handling v{version} method {endpoint} - {e}")
return Response(zlib.compress(b'{"returnCode": "0"}'))
if resp is None:
resp = {"returnCode": 1}
self.logger.debug(f"Response {resp}")
return Response(zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8")))
async def handle_mai2(self, request: Request) -> bytes:
endpoint: str = request.path_params.get('endpoint')
version: int = request.path_params.get('version')
game_code = request.path_params.get('game')
if endpoint.lower() == "ping":
return Response(zlib.compress(b'{"returnCode": "1"}'))
req_raw = await request.body()
internal_ver = 0
client_ip = Utils.get_ip_addr(request)
encrypted = False
if version < 105: # 1.0
internal_ver = Mai2Constants.VER_MAIMAI_DX
elif version >= 105 and version < 110: # PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_PLUS
elif version >= 110 and version < 115: # Splash
internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH
elif version >= 115 and version < 120: # Splash PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH_PLUS
elif version >= 120 and version < 125: # UNiVERSE
internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE
elif version >= 125 and version < 130: # UNiVERSE PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE_PLUS
elif version >= 130 and version < 135: # FESTiVAL
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
elif version >= 135 and version < 140: # FESTiVAL PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS
elif version >= 140: # BUDDiES
internal_ver = Mai2Constants.VER_MAIMAI_DX_BUDDIES
if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32:
# If we get a 32 character long hex string, it's a hash and we're
# dealing with an encrypted request. False positives shouldn't happen
# as long as requests are suffixed with `Api`.
if internal_ver not in self.hash_table:
self.logger.error(
"v%s does not support encryption or no keys entered",
version,
)
return Response(zlib.compress(b'{"stat": "0"}'))
elif endpoint.lower() not in self.hash_table[internal_ver]:
self.logger.error(
"No hash found for v%s endpoint %s",
version, endpoint
)
return Response(zlib.compress(b'{"stat": "0"}'))
endpoint = self.hash_table[internal_ver][endpoint.lower()]
try:
crypt = AES.new(
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
)
req_raw = crypt.decrypt(req_raw)
except Exception as e:
self.logger.error(
"Failed to decrypt v%s request to %s",
version, endpoint,
exc_info=e,
)
return Response(zlib.compress(b'{"stat": "0"}'))
encrypted = True
if (
not encrypted
and self.game_cfg.crypto.encrypted_only
and version >= 110
):
self.logger.error(
"Unencrypted v%s %s request, but config is set to encrypted only: %r",
version, endpoint, req_raw
)
return Response(zlib.compress(b'{"stat": "0"}'))
try:
unzip = zlib.decompress(req_raw)
except zlib.error as e:
self.logger.error(
f"Failed to decompress v{version} {endpoint} request -> {e}"
)
return Response(zlib.compress(b'{"stat": "0"}'))
req_data = json.loads(unzip)
self.logger.info(f"v{version} {endpoint} request from {client_ip}")
self.logger.debug(req_data)
endpoint = (
endpoint.replace("MaimaiExp", "")
if game_code == Mai2Constants.GAME_CODE_DX_INT
else endpoint
)
func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg)
if not hasattr(handler_cls, func_to_find):
self.logger.warning(f"Unhandled v{version} request {endpoint}")
resp = {"returnCode": 1}
else:
try:
handler = getattr(handler_cls, func_to_find)
resp = await handler(req_data)
except Exception as e:
self.logger.error(f"Error handling v{version} method {endpoint} - {e}")
return Response(zlib.compress(b'{"stat": "0"}'))
if resp is None:
resp = {"returnCode": 1}
self.logger.debug(f"Response {resp}")
zipped = zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8"))
if not encrypted or version < 110:
return Response(zipped)
padded = pad(zipped, 16)
crypt = AES.new(
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
)
return Response(crypt.encrypt(padded))
async def handle_old_srv(self, request: Request) -> bytes:
endpoint = request.path_params.get('endpoint')
version = request.path_params.get('version')
self.logger.info(f"v{version} old server {endpoint}")
return Response(zlib.compress(b"ok"))
async def handle_old_srv_userdata(self, request: Request) -> bytes:
endpoint = request.path_params.get('endpoint')
version = request.path_params.get('version')
self.logger.info(f"v{version} old server {endpoint}")
return Response(zlib.compress(b"{}"))