153 lines
4.4 KiB
Python
153 lines
4.4 KiB
Python
import importlib
|
|
import logging
|
|
from base64 import b64decode
|
|
from datetime import datetime, timezone
|
|
from os import walk
|
|
from types import ModuleType
|
|
from typing import Any, Dict, Optional
|
|
|
|
import jwt
|
|
from starlette.requests import Request
|
|
|
|
from .config import CoreConfig
|
|
|
|
|
|
class _MissingSentinel:
|
|
__slots__: tuple[str, ...] = ()
|
|
|
|
def __eq__(self, other) -> bool:
|
|
return False
|
|
|
|
def __bool__(self) -> bool:
|
|
return False
|
|
|
|
def __hash__(self) -> int:
|
|
return 0
|
|
|
|
def __repr__(self):
|
|
return "..."
|
|
|
|
|
|
MISSING: Any = _MissingSentinel()
|
|
"""This is different from `None` in that its type is `Any`, and so it can be used
|
|
as a placeholder for values that are *definitely* going to be initialized,
|
|
so they don't have to be typed as `T | None`, which makes type checkers
|
|
angry when an attribute is accessed.
|
|
|
|
This can also be used for when `None` has actual meaning as a value, and so a
|
|
separate value is needed to mean "unset"."""
|
|
|
|
|
|
class Utils:
|
|
real_title_port = None
|
|
real_title_port_ssl = None
|
|
|
|
@classmethod
|
|
def get_all_titles(cls) -> Dict[str, ModuleType]:
|
|
ret: Dict[str, Any] = {}
|
|
|
|
for root, dirs, files in walk("titles"):
|
|
for dir in dirs:
|
|
if not dir.startswith("__"):
|
|
try:
|
|
mod = importlib.import_module(f"titles.{dir}")
|
|
if hasattr(mod, "game_codes") and hasattr(
|
|
mod, "index"
|
|
): # Minimum required to function
|
|
ret[dir] = mod
|
|
|
|
except ImportError as e:
|
|
logging.getLogger("core").error(f"get_all_titles: {dir} - {e}")
|
|
raise
|
|
return ret
|
|
|
|
@classmethod
|
|
def get_ip_addr(cls, req: Request) -> str:
|
|
ip = req.headers.get("x-forwarded-for", req.client.host)
|
|
return ip.split(", ")[0]
|
|
|
|
@classmethod
|
|
def get_title_port(cls, cfg: CoreConfig):
|
|
if cls.real_title_port is not None:
|
|
return cls.real_title_port
|
|
|
|
cls.real_title_port = (
|
|
cfg.server.proxy_port
|
|
if cfg.server.is_using_proxy and cfg.server.proxy_port
|
|
else cfg.server.port
|
|
)
|
|
|
|
return cls.real_title_port
|
|
|
|
@classmethod
|
|
def get_title_port_ssl(cls, cfg: CoreConfig):
|
|
if cls.real_title_port_ssl is not None:
|
|
return cls.real_title_port_ssl
|
|
|
|
cls.real_title_port_ssl = (
|
|
cfg.server.proxy_port_ssl
|
|
if cfg.server.is_using_proxy and cfg.server.proxy_port_ssl
|
|
else 443
|
|
)
|
|
|
|
return cls.real_title_port_ssl
|
|
|
|
|
|
def create_sega_auth_key(
|
|
aime_id: int,
|
|
game: str,
|
|
place_id: int,
|
|
keychip_id: str,
|
|
b64_secret: str,
|
|
exp_seconds: int = 86400,
|
|
err_logger: str = "aimedb",
|
|
) -> Optional[str]:
|
|
logger = logging.getLogger(err_logger)
|
|
try:
|
|
return jwt.encode(
|
|
{
|
|
"aime_id": aime_id,
|
|
"game": game,
|
|
"place_id": place_id,
|
|
"keychip_id": keychip_id,
|
|
"exp": int(datetime.now(tz=timezone.utc).timestamp()) + exp_seconds,
|
|
},
|
|
b64decode(b64_secret),
|
|
algorithm="HS256",
|
|
)
|
|
except jwt.InvalidKeyError:
|
|
logger.error("Failed to encode Sega Auth Key because the secret is invalid!")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Unknown exception occoured when encoding Sega Auth Key! {e}")
|
|
return None
|
|
|
|
|
|
def decode_sega_auth_key(
|
|
token: str, b64_secret: str, err_logger: str = "aimedb"
|
|
) -> Optional[Dict]:
|
|
logger = logging.getLogger(err_logger)
|
|
try:
|
|
return jwt.decode(
|
|
token,
|
|
"secret",
|
|
b64decode(b64_secret),
|
|
algorithms=["HS256"],
|
|
options={"verify_signature": True},
|
|
)
|
|
except jwt.ExpiredSignatureError:
|
|
logger.error("Sega Auth Key failed to validate due to an expired signature!")
|
|
return None
|
|
except jwt.InvalidSignatureError:
|
|
logger.error("Sega Auth Key failed to validate due to an invalid signature!")
|
|
return None
|
|
except jwt.DecodeError as e:
|
|
logger.error(f"Sega Auth Key failed to decode! {e}")
|
|
return None
|
|
except jwt.InvalidTokenError as e:
|
|
logger.error(f"Sega Auth Key is invalid! {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Unknown exception occoured when decoding Sega Auth Key! {e}")
|
|
return None
|