1
0
mirror of synced 2024-12-01 09:07:15 +01:00
artemis/titles/ongeki/index.py

167 lines
6.0 KiB
Python
Raw Normal View History

from twisted.web.http import Request
import json
import inflection
import yaml
import string
import logging
import coloredlogs
import zlib
from logging.handlers import TimedRotatingFileHandler
2023-03-05 03:39:38 +01:00
from os import path
from typing import Tuple
from core.config import CoreConfig
from core.utils import Utils
from titles.ongeki.config import OngekiConfig
from titles.ongeki.const import OngekiConstants
from titles.ongeki.base import OngekiBase
from titles.ongeki.plus import OngekiPlus
from titles.ongeki.summer import OngekiSummer
from titles.ongeki.summerplus import OngekiSummerPlus
from titles.ongeki.red import OngekiRed
from titles.ongeki.redplus import OngekiRedPlus
from titles.ongeki.bright import OngekiBright
2023-03-03 06:00:22 +01:00
from titles.ongeki.brightmemory import OngekiBrightMemory
2023-03-09 17:38:58 +01:00
class OngekiServlet:
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = OngekiConfig()
if path.exists(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"):
2023-03-09 17:38:58 +01:00
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"))
)
self.versions = [
OngekiBase(core_cfg, self.game_cfg),
OngekiPlus(core_cfg, self.game_cfg),
OngekiSummer(core_cfg, self.game_cfg),
OngekiSummerPlus(core_cfg, self.game_cfg),
OngekiRed(core_cfg, self.game_cfg),
OngekiRedPlus(core_cfg, self.game_cfg),
OngekiBright(core_cfg, self.game_cfg),
2023-03-03 06:00:22 +01:00
OngekiBrightMemory(core_cfg, self.game_cfg),
]
self.logger = logging.getLogger("ongeki")
log_fmt_str = "[%(asctime)s] Ongeki | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
2023-03-09 17:38:58 +01:00
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "ongeki"),
encoding="utf8",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
2023-03-09 17:38:58 +01:00
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
2023-03-09 17:38:58 +01:00
self.logger.setLevel(self.game_cfg.server.loglevel)
2023-03-09 17:38:58 +01:00
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
2023-03-05 03:39:38 +01:00
@classmethod
2023-03-09 17:38:58 +01:00
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
2023-03-05 03:39:38 +01:00
game_cfg = OngekiConfig()
if path.exists(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"):
2023-03-09 17:38:58 +01:00
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"))
)
2023-03-05 03:39:38 +01:00
if not game_cfg.server.enable:
return (False, "", "")
2023-03-09 17:38:58 +01:00
2023-03-05 03:39:38 +01:00
if core_cfg.server.is_develop:
2023-03-09 17:38:58 +01:00
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
f"{core_cfg.title.hostname}:{core_cfg.title.port}/",
)
return (
True,
f"http://{core_cfg.title.hostname}/{game_code}/$v/",
f"{core_cfg.title.hostname}/",
)
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
if url_path.lower() == "ping":
return zlib.compress(b'{"returnCode": 1}')
req_raw = request.content.getvalue()
url_split = url_path.split("/")
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)
2023-03-09 17:38:58 +01:00
if version < 105: # 1.0
internal_ver = OngekiConstants.VER_ONGEKI
2023-03-09 17:38:58 +01:00
elif version >= 105 and version < 110: # Plus
internal_ver = OngekiConstants.VER_ONGEKI_PLUS
2023-03-09 17:38:58 +01:00
elif version >= 110 and version < 115: # Summer
internal_ver = OngekiConstants.VER_ONGEKI_SUMMER
2023-03-09 17:38:58 +01:00
elif version >= 115 and version < 120: # Summer Plus
internal_ver = OngekiConstants.VER_ONGEKI_SUMMER_PLUS
2023-03-09 17:38:58 +01:00
elif version >= 120 and version < 125: # Red
internal_ver = OngekiConstants.VER_ONGEKI_RED
2023-03-09 17:38:58 +01:00
elif version >= 125 and version < 130: # Red Plus
internal_ver = OngekiConstants.VER_ONGEKI_RED_PLUS
2023-03-09 17:38:58 +01:00
elif version >= 130 and version < 135: # Bright
internal_ver = OngekiConstants.VER_ONGEKI_BRIGHT
2023-03-09 17:38:58 +01:00
elif version >= 135 and version < 140: # Bright Memory
2023-03-03 06:00:22 +01:00
internal_ver = OngekiConstants.VER_ONGEKI_BRIGHT_MEMORY
if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32:
2023-03-09 17:38:58 +01:00
# If we get a 32 character long hex string, it's a hash and we're
# doing encrypted. The likelyhood of false positives is low but
# technically not 0
self.logger.error("Encryption not supported at this time")
return b""
2023-03-09 17:38:58 +01:00
try:
unzip = zlib.decompress(req_raw)
2023-03-09 17:38:58 +01:00
except zlib.error as e:
2023-03-09 17:38:58 +01:00
self.logger.error(
f"Failed to decompress v{version} {endpoint} request -> {e}"
)
return zlib.compress(b'{"stat": "0"}')
2023-03-09 17:38:58 +01:00
req_data = json.loads(unzip)
2023-03-09 17:38:58 +01:00
self.logger.info(
f"v{version} {endpoint} request from {client_ip}"
)
self.logger.debug(req_data)
func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
if not hasattr(self.versions[internal_ver], func_to_find):
self.logger.warning(f"Unhandled v{version} request {endpoint}")
return zlib.compress(b'{"returnCode": 1}')
try:
handler = getattr(self.versions[internal_ver], func_to_find)
resp = handler(req_data)
except Exception as e:
self.logger.error(f"Error handling v{version} method {endpoint} - {e}")
return zlib.compress(b'{"stat": "0"}')
2023-03-09 17:38:58 +01:00
if resp == None:
2023-03-09 17:38:58 +01:00
resp = {"returnCode": 1}
self.logger.debug(f"Response {resp}")
2023-03-09 17:38:58 +01:00
return zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8"))