1
0
mirror of synced 2025-02-19 03:24:42 +01:00
artemis/core/utils.py

153 lines
4.4 KiB
Python
Raw Normal View History

2023-02-16 17:13:41 -05:00
import importlib
2024-11-14 12:36:22 +07:00
import logging
from base64 import b64decode
from datetime import datetime, timezone
2024-11-14 12:36:22 +07:00
from os import walk
from types import ModuleType
from typing import Any, Dict, Optional
import jwt
from starlette.requests import Request
2023-02-16 17:13:41 -05:00
from .config import CoreConfig
2023-03-09 11:38:58 -05:00
2024-11-14 12:36:22 +07:00
class _MissingSentinel:
    """Type of the `MISSING` placeholder: falsy and never equal to anything."""

    __slots__: tuple[str, ...] = ()

    def __repr__(self) -> str:
        return "..."

    def __bool__(self) -> bool:
        # Falsy, so a plain truthiness check treats the placeholder as absent.
        return False

    def __hash__(self) -> int:
        # Defined explicitly because overriding __eq__ alone would make the
        # class unhashable.
        return 0

    def __eq__(self, other: object) -> bool:
        # Compares unequal to every object, including itself.
        return False


MISSING: Any = _MissingSentinel()
"""Placeholder meaning "not set yet".

Unlike `None`, this is typed as `Any`, so an attribute that is *definitely*
going to be initialized later does not have to be declared as `T | None`
(which makes type checkers complain on every attribute access).

It is also useful where `None` is itself a meaningful value and a separate
marker is needed to mean "unset"."""
2023-02-16 17:13:41 -05:00
class Utils:
    """Class-level helpers for title discovery, client IP lookup and title ports."""

    # Values stored by get_title_port / get_title_port_ssl on their first
    # call; None means "not resolved yet".
    real_title_port = None
    real_title_port_ssl = None

    @classmethod
    def get_all_titles(cls) -> Dict[str, ModuleType]:
        """Import every usable title package found directly under ``titles``.

        A directory is skipped when its name starts with ``__``. An imported
        module is kept only if it exposes both ``game_codes`` and ``index``.

        Returns:
            Mapping of directory name to the imported module. Empty when the
            ``titles`` directory yields nothing to walk.

        Raises:
            ImportError: Re-raised (after being logged to the ``core`` logger)
                when importing a title package fails.
        """
        ret: Dict[str, Any] = {}

        # Modules are imported as "titles.<name>", which only makes sense for
        # the immediate children of "titles", so take just the first entry
        # produced by walk() instead of descending into nested directories.
        _root, title_dirs, _files = next(walk("titles"), ("titles", [], []))

        for title_dir in title_dirs:
            if title_dir.startswith("__"):
                continue

            try:
                mod = importlib.import_module(f"titles.{title_dir}")
                # Minimum required to function
                if hasattr(mod, "game_codes") and hasattr(mod, "index"):
                    ret[title_dir] = mod
            except ImportError as e:
                logging.getLogger("core").error(f"get_all_titles: {title_dir} - {e}")
                raise

        return ret

    @classmethod
    def get_ip_addr(cls, req: Request) -> str:
        """Return the client address for ``req``.

        The first entry of the ``x-forwarded-for`` header wins when the header
        is present; otherwise the host of ``req.client`` is used. An empty
        string is returned when neither source is available.
        """
        forwarded = req.headers.get("x-forwarded-for")
        if forwarded is not None:
            # Entries are comma separated and the space after the comma is
            # optional, so split on "," alone and trim the whitespace.
            return forwarded.split(",")[0].strip()

        # Only touch req.client when the header is absent: it may be None, and
        # reading .host unconditionally would fail even for proxied requests.
        client = req.client
        return client.host if client is not None else ""

    @classmethod
    def get_title_port(cls, cfg: CoreConfig):
        """Return the title port, resolving and storing it on first use.

        Uses ``cfg.server.proxy_port`` when ``cfg.server.is_using_proxy`` is
        set and the proxy port is truthy, otherwise ``cfg.server.port``. Once
        a non-None value has been stored on the class, ``cfg`` is no longer
        consulted.
        """
        if cls.real_title_port is None:
            cls.real_title_port = (
                cfg.server.proxy_port
                if cfg.server.is_using_proxy and cfg.server.proxy_port
                else cfg.server.port
            )
        return cls.real_title_port

    @classmethod
    def get_title_port_ssl(cls, cfg: CoreConfig):
        """Return the SSL title port, resolving and storing it on first use.

        Uses ``cfg.server.proxy_port_ssl`` when ``cfg.server.is_using_proxy``
        is set and that port is truthy, otherwise 443. Once a non-None value
        has been stored on the class, ``cfg`` is no longer consulted.
        """
        if cls.real_title_port_ssl is None:
            cls.real_title_port_ssl = (
                cfg.server.proxy_port_ssl
                if cfg.server.is_using_proxy and cfg.server.proxy_port_ssl
                else 443
            )
        return cls.real_title_port_ssl
2024-11-14 12:36:22 +07:00
def create_sega_auth_key(
    aime_id: int,
    game: str,
    place_id: int,
    keychip_id: str,
    b64_secret: str,
    exp_seconds: int = 86400,
    err_logger: str = "aimedb",
) -> Optional[str]:
    """Build an HS256-signed JWT carrying the given Sega auth claims.

    Args:
        aime_id: Stored under the ``aime_id`` claim.
        game: Stored under the ``game`` claim.
        place_id: Stored under the ``place_id`` claim.
        keychip_id: Stored under the ``keychip_id`` claim.
        b64_secret: Base64 text; its decoded bytes are the signing key.
        exp_seconds: Added to the current UTC timestamp to form ``exp``.
        err_logger: Name of the logger that receives failure messages.

    Returns:
        The value produced by ``jwt.encode``, or ``None`` when anything in the
        process raises (the failure is logged instead of propagated).
    """
    logger = logging.getLogger(err_logger)

    try:
        expires_at = int(datetime.now(tz=timezone.utc).timestamp()) + exp_seconds
        claims = {
            "aime_id": aime_id,
            "game": game,
            "place_id": place_id,
            "keychip_id": keychip_id,
            "exp": expires_at,
        }
        # Decoded inside the try so a malformed secret is logged, not raised.
        signing_key = b64decode(b64_secret)
        token = jwt.encode(claims, signing_key, algorithm="HS256")
    except jwt.InvalidKeyError:
        logger.error("Failed to encode Sega Auth Key because the secret is invalid!")
        return None
    except Exception as e:
        logger.error(f"Unknown exception occoured when encoding Sega Auth Key! {e}")
        return None

    return token
2024-11-14 12:36:22 +07:00
def decode_sega_auth_key(
    token: str, b64_secret: str, err_logger: str = "aimedb"
) -> Optional[Dict]:
    """Verify and decode an HS256-signed Sega auth JWT.

    Args:
        token: The encoded JWT to validate.
        b64_secret: Base64 text; its decoded bytes are the verification key.
        err_logger: Name of the logger that receives failure messages.

    Returns:
        The decoded claims, or ``None`` when the token is expired, has a bad
        signature, cannot be decoded, is otherwise invalid, or any other
        error occurs (each case is logged instead of propagated).
    """
    logger = logging.getLogger(err_logger)

    try:
        # The key must be the decoded secret itself. Passing a placeholder
        # string as the key pushes the real secret into the positional
        # `algorithms` slot, which clashes with the keyword below and makes
        # every call fail.
        return jwt.decode(
            token,
            b64decode(b64_secret),
            algorithms=["HS256"],
            options={"verify_signature": True},
        )
    except jwt.ExpiredSignatureError:
        logger.error("Sega Auth Key failed to validate due to an expired signature!")
        return None
    except jwt.InvalidSignatureError:
        logger.error("Sega Auth Key failed to validate due to an invalid signature!")
        return None
    except jwt.DecodeError as e:
        logger.error(f"Sega Auth Key failed to decode! {e}")
        return None
    except jwt.InvalidTokenError as e:
        logger.error(f"Sega Auth Key is invalid! {e}")
        return None
    except Exception as e:
        logger.error(f"Unknown exception occoured when decoding Sega Auth Key! {e}")
        return None