aimedb: soft impl of auth ids
This commit is contained in:
parent
e7fb9ce07d
commit
46f61325cb
@ -52,6 +52,7 @@ class ADBFelicaLookup2Response(ADBBaseResponse):
|
||||
self.access_code = access_code if access_code is not None else "00000000000000000000"
|
||||
self.company = CompanyCodes.SEGA
|
||||
self.portal_status = PortalRegStatus.NO_REG
|
||||
self.auth_key = [0] * 256
|
||||
|
||||
@classmethod
|
||||
def from_req(cls, req: ADBHeader, user_id: Union[int, None] = None, access_code: Union[str, None] = None) -> "ADBFelicaLookup2Response":
|
||||
@ -76,7 +77,7 @@ class ADBFelicaLookup2Response(ADBBaseResponse):
|
||||
access_code = bytes.fromhex(self.access_code),
|
||||
portal_status = self.portal_status.value,
|
||||
company_code = self.company.value,
|
||||
auth_key = [0] * 256 # Unsupported
|
||||
auth_key = self.auth_key
|
||||
))
|
||||
|
||||
self.head.length = HEADER_SIZE + len(resp_struct)
|
||||
|
@ -52,6 +52,7 @@ class ADBLookupExResponse(ADBBaseResponse):
|
||||
super().__init__(code, length, status, game_id, store_id, keychip_id)
|
||||
self.user_id = user_id if user_id is not None else -1
|
||||
self.portal_reg = PortalRegStatus.NO_REG
|
||||
self.auth_key = [0] * 256
|
||||
|
||||
@classmethod
|
||||
def from_req(cls, req: ADBHeader, user_id: Union[int, None]) -> "ADBLookupExResponse":
|
||||
@ -72,7 +73,7 @@ class ADBLookupExResponse(ADBBaseResponse):
|
||||
body = resp_struct.build(dict(
|
||||
user_id = self.user_id,
|
||||
portal_reg = self.portal_reg.value,
|
||||
auth_key = [0] * 256,
|
||||
auth_key = self.auth_key,
|
||||
relation1 = -1,
|
||||
relation2 = -1
|
||||
))
|
||||
|
@ -7,6 +7,7 @@ from typing_extensions import Final
|
||||
from logging.handlers import TimedRotatingFileHandler
|
||||
|
||||
from core.config import CoreConfig
|
||||
from core.utils import create_sega_auth_key
|
||||
from core.data import Data
|
||||
from .adb_handlers import *
|
||||
|
||||
@ -179,6 +180,14 @@ class AimedbProtocol(Protocol):
|
||||
self.logger.info(
|
||||
f"access_code {req.access_code} -> user_id {ret.user_id}"
|
||||
)
|
||||
|
||||
if user_id > 0 and self.config.aimedb.id_secret:
|
||||
auth_key = create_sega_auth_key(user_id, req.head.game_id, req.head.store_id, self.config.aimedb.id_secret)
|
||||
if auth_key is not None:
|
||||
auth_key_extra_len = 256 - len(auth_key)
|
||||
auth_key_full = auth_key.encode() + (b"\0" * auth_key_extra_len) # TODO: impl
|
||||
self.logger.debug(f"Generated auth token {auth_key}")
|
||||
|
||||
return ret
|
||||
|
||||
def handle_felica_lookup(self, data: bytes, resp_code: int) -> bytes:
|
||||
@ -241,7 +250,16 @@ class AimedbProtocol(Protocol):
|
||||
f"idm {req.idm} ipm {req.pmm} -> access_code {access_code} user_id {user_id}"
|
||||
)
|
||||
|
||||
return ADBFelicaLookup2Response.from_req(req.head, user_id, access_code)
|
||||
resp = ADBFelicaLookup2Response.from_req(req.head, user_id, access_code)
|
||||
|
||||
if user_id > 0 and self.config.aimedb.id_secret:
|
||||
auth_key = create_sega_auth_key(user_id, req.head.game_id, req.head.store_id, self.config.aimedb.id_secret)
|
||||
if auth_key is not None:
|
||||
auth_key_extra_len = 256 - len(auth_key)
|
||||
auth_key_full = auth_key.encode() + (b"\0" * auth_key_extra_len) # TODO: impl
|
||||
self.logger.debug(f"Generated auth token {auth_key}")
|
||||
|
||||
return resp
|
||||
|
||||
def handle_campaign_clear(self, data: bytes, resp_code: int) -> ADBBaseResponse:
|
||||
req = ADBCampaignClearRequest(data)
|
||||
|
@ -314,6 +314,12 @@ class AimedbConfig:
|
||||
self.__config, "core", "aimedb", "key", default=""
|
||||
)
|
||||
|
||||
@property
|
||||
def id_secret(self) -> str:
|
||||
return CoreConfig.get_config_field(
|
||||
self.__config, "core", "aimedb", "id_secret", default=""
|
||||
)
|
||||
|
||||
|
||||
class MuchaConfig:
|
||||
def __init__(self, parent_config: "CoreConfig") -> None:
|
||||
|
@ -64,10 +64,10 @@ class Utils:
|
||||
|
||||
return cls.real_title_port_ssl
|
||||
|
||||
def create_sega_auth_key(aime_id: int, game: str, ver: float, place_id: str, b64_secret: str, err_logger: str = 'aimedb') -> Optional[str]:
|
||||
def create_sega_auth_key(aime_id: int, game: str, place_id: str, b64_secret: str, err_logger: str = 'aimedb') -> Optional[str]:
|
||||
logger = logging.getLogger(err_logger)
|
||||
try:
|
||||
return jwt.encode({ "aime_id": aime_id, "game": game, "ver": ver, "place_id": place_id, "exp": int(datetime.now(tz=timezone.utc).timestamp()) + 86400 }, b64decode(b64_secret), algorithm="HS256")
|
||||
return jwt.encode({ "aime_id": aime_id, "game": game, "place_id": place_id, "exp": int(datetime.now(tz=timezone.utc).timestamp()) + 86400 }, b64decode(b64_secret), algorithm="HS256")
|
||||
except jwt.InvalidKeyError:
|
||||
logger.error("Failed to encode Sega Auth Key because the secret is invalid!")
|
||||
return None
|
||||
|
@ -56,6 +56,7 @@ aimedb:
|
||||
loglevel: "info"
|
||||
port: 22345
|
||||
key: ""
|
||||
id_secret: ""
|
||||
|
||||
mucha:
|
||||
enable: False
|
||||
|
Loading…
x
Reference in New Issue
Block a user