1
0
mirror of synced 2025-01-18 22:24:04 +01:00

Merge remote-tracking branch 'origin/develop' into fork_develop

This commit is contained in:
Dniel97 2023-03-06 17:08:50 +01:00
commit 78b2a81c79
No known key found for this signature in database
GPG Key ID: 6180B3C768FB2E08
38 changed files with 487 additions and 243 deletions

8
contributing.md Normal file
View File

@ -0,0 +1,8 @@
# Contributing to ARTEMiS
If you would like to contribute to artemis, either by adding features, games, or fixing bugs, you can do so by forking the repo and submitting a pull request [here](https://gitea.tendokyu.moe/Hay1tsme/artemis/pulls). Please make sure, if you're submitting a PR for a game or game version, that you're following the n-0/y-1 guidelines, or it will be rejected.
## Adding games
Guide WIP
## Adding game versions
Guide WIP

View File

@ -48,49 +48,13 @@ class AllnetServlet:
self.logger.error("No games detected!")
for _, mod in plugins.items():
for code in mod.game_codes:
if hasattr(mod, "use_default_title") and mod.use_default_title:
if hasattr(mod, "include_protocol") and mod.include_protocol:
if hasattr(mod, "title_secure") and mod.title_secure:
uri = "https://"
else:
uri = "http://"
if hasattr(mod.index, "get_allnet_info"):
for code in mod.game_codes:
enabled, uri, host = mod.index.get_allnet_info(code, self.config, self.config_folder)
if enabled:
self.uri_registry[code] = (uri, host)
else:
uri = ""
if core_cfg.server.is_develop:
uri += f"{core_cfg.title.hostname}:{core_cfg.title.port}"
else:
uri += f"{core_cfg.title.hostname}"
uri += f"/{code}/$v"
if hasattr(mod, "trailing_slash") and mod.trailing_slash:
uri += "/"
else:
if hasattr(mod, "uri"):
uri = mod.uri
else:
uri = ""
if hasattr(mod, "host"):
host = mod.host
elif hasattr(mod, "use_default_host") and mod.use_default_host:
if core_cfg.server.is_develop:
host = f"{core_cfg.title.hostname}:{core_cfg.title.port}"
else:
host = f"{core_cfg.title.hostname}"
else:
host = ""
self.uri_registry[code] = (uri, host)
self.logger.info(f"Allnet serving {len(self.uri_registry)} games on port {core_cfg.allnet.port}")
def handle_poweron(self, request: Request, _: Dict):

View File

@ -1,12 +1,12 @@
import logging, coloredlogs
from typing import Any, Dict, List
from typing import Optional
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import create_engine
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime
import importlib, os, json
import importlib, os
import secrets, string
import bcrypt
from hashlib import sha256
from core.config import CoreConfig
@ -31,7 +31,7 @@ class Data:
self.arcade = ArcadeData(self.config, self.session)
self.card = CardData(self.config, self.session)
self.base = BaseData(self.config, self.session)
self.schema_ver_latest = 2
self.schema_ver_latest = 4
log_fmt_str = "[%(asctime)s] %(levelname)s | Database | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
@ -138,3 +138,61 @@ class Data:
return None
self.logger.info(f"Successfully migrated {game} to schema version {version}")
def create_owner(self, email: Optional[str] = None) -> None:
pw = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(20))
hash = bcrypt.hashpw(pw.encode(), bcrypt.gensalt())
user_id = self.user.create_user(email=email, permission=255, password=hash)
if user_id is None:
self.logger.error(f"Failed to create owner with email {email}")
return
card_id = self.card.create_card(user_id, "00000000000000000000")
if card_id is None:
self.logger.error(f"Failed to create card for owner with id {user_id}")
return
self.logger.warn(f"Successfully created owner with email {email}, access code 00000000000000000000, and password {pw} Make sure to change this password and assign a real card ASAP!")
def migrate_card(self, old_ac: str, new_ac: str, should_force: bool) -> None:
if old_ac == new_ac:
self.logger.error("Both access codes are the same!")
return
new_card = self.card.get_card_by_access_code(new_ac)
if new_card is None:
self.card.update_access_code(old_ac, new_ac)
return
if not should_force:
self.logger.warn(f"Card already exists for access code {new_ac} (id {new_card['id']}). If you wish to continue, rerun with the '--force' flag."\
f" All exiting data on the target card {new_ac} will be perminently erased and replaced with data from card {old_ac}.")
return
self.logger.info(f"All exiting data on the target card {new_ac} will be perminently erased and replaced with data from card {old_ac}.")
self.card.delete_card(new_card["id"])
self.card.update_access_code(old_ac, new_ac)
hanging_user = self.user.get_user(new_card["user"])
if hanging_user["password"] is None:
self.logger.info(f"Delete hanging user {hanging_user['id']}")
self.user.delete_user(hanging_user['id'])
def delete_hanging_users(self) -> None:
"""
Finds and deletes users that have not registered for the webui that have no cards assocated with them.
"""
unreg_users = self.user.get_unregistered_users()
if unreg_users is None:
self.logger.error("Error occoured finding unregistered users")
for user in unreg_users:
cards = self.card.get_user_cards(user['id'])
if cards is None:
self.logger.error(f"Error getting cards for user {user['id']}")
continue
if not cards:
self.logger.info(f"Delete hanging user {user['id']}")
self.user.delete_user(user['id'])

View File

@ -29,6 +29,7 @@ event_log = Table(
Column("system", String(255), nullable=False),
Column("type", String(255), nullable=False),
Column("severity", Integer, nullable=False),
Column("message", String(1000), nullable=False),
Column("details", JSON, nullable=False),
Column("when_logged", TIMESTAMP, nullable=False, server_default=func.now()),
mysql_charset='utf8mb4'
@ -85,7 +86,12 @@ class BaseData():
result = self.execute(sql)
if result is None:
return None
return result.fetchone()["version"]
row = result.fetchone()
if row is None:
return None
return row["version"]
def set_schema_ver(self, ver: int, game: str = "CORE") -> Optional[int]:
sql = insert(schema_ver).values(game = game, version = ver)
@ -97,12 +103,12 @@ class BaseData():
return None
return result.lastrowid
def log_event(self, system: str, type: str, severity: int, details: Dict) -> Optional[int]:
sql = event_log.insert().values(system = system, type = type, severity = severity, details = json.dumps(details))
def log_event(self, system: str, type: str, severity: int, message: str, details: Dict = {}) -> Optional[int]:
sql = event_log.insert().values(system = system, type = type, severity = severity, message = message, details = json.dumps(details))
result = self.execute(sql)
if result is None:
self.logger.error(f"{__name__}: Failed to insert event into event log! system = {system}, type = {type}, severity = {severity}, details = {details}")
self.logger.error(f"{__name__}: Failed to insert event into event log! system = {system}, type = {type}, severity = {severity}, message = {message}")
return None
return result.lastrowid

View File

@ -3,6 +3,7 @@ from sqlalchemy import Table, Column, UniqueConstraint
from sqlalchemy.types import Integer, String, Boolean, TIMESTAMP
from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.engine import Row
from core.data.schema.base import BaseData, metadata
@ -21,21 +22,44 @@ aime_card = Table(
)
class CardData(BaseData):
def get_user_id_from_card(self, access_code: str) -> Optional[int]:
"""
Given a 20 digit access code as a string, get the user id associated with that card
"""
def get_card_by_access_code(self, access_code: str) -> Optional[Row]:
sql = aime_card.select(aime_card.c.access_code == access_code)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
card = result.fetchone()
def get_card_by_id(self, card_id: int) -> Optional[Row]:
sql = aime_card.select(aime_card.c.id == card_id)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def update_access_code(self, old_ac: str, new_ac: str) -> None:
sql = aime_card.update(aime_card.c.access_code == old_ac).values(access_code = new_ac)
result = self.execute(sql)
if result is None:
self.logger.error(f"Failed to change card access code from {old_ac} to {new_ac}")
def get_user_id_from_card(self, access_code: str) -> Optional[int]:
"""
Given a 20 digit access code as a string, get the user id associated with that card
"""
card = self.get_card_by_access_code(access_code)
if card is None: return None
return int(card["user"])
def delete_card(self, card_id: int) -> None:
sql = aime_card.delete(aime_card.c.id == card_id)
def get_user_cards(self, aime_id: int) -> Optional[List[Dict]]:
result = self.execute(sql)
if result is None:
self.logger.error(f"Failed to delete card with id {card_id}")
def get_user_cards(self, aime_id: int) -> Optional[List[Row]]:
"""
Returns all cards owned by a user
"""

View File

@ -1,14 +1,12 @@
from enum import Enum
from typing import Dict, Optional
from sqlalchemy import Table, Column, and_
from typing import Optional, List
from sqlalchemy import Table, Column
from sqlalchemy.types import Integer, String, TIMESTAMP
from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.sql import func, select, Delete
from uuid import uuid4
from datetime import datetime, timedelta
from sqlalchemy.sql import func, select
from sqlalchemy.engine import Row
import bcrypt
from core.data.schema.base import BaseData, metadata
@ -26,17 +24,6 @@ aime_user = Table(
mysql_charset='utf8mb4'
)
frontend_session = Table(
"frontend_session",
metadata,
Column("id", Integer, primary_key=True, unique=True),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("ip", String(15)),
Column('session_cookie', String(32), nullable=False, unique=True),
Column("expires", TIMESTAMP, nullable=False),
mysql_charset='utf8mb4'
)
class PermissionBits(Enum):
PermUser = 1
PermMod = 2
@ -44,9 +31,6 @@ class PermissionBits(Enum):
class UserData(BaseData):
def create_user(self, id: int = None, username: str = None, email: str = None, password: str = None, permission: int = 1) -> Optional[int]:
if email is None:
permission = 1
if id is None:
sql = insert(aime_user).values(
username=username,
@ -74,52 +58,40 @@ class UserData(BaseData):
if result is None: return None
return result.lastrowid
def login(self, user_id: int, passwd: bytes = None, ip: str = "0.0.0.0") -> Optional[str]:
sql = select(aime_user).where(and_(aime_user.c.id == user_id, aime_user.c.password == passwd))
result = self.execute(sql)
if result is None: return None
usr = result.fetchone()
if usr is None: return None
return self.create_session(user_id, ip)
def check_session(self, cookie: str, ip: str = "0.0.0.0") -> Optional[Row]:
sql = select(frontend_session).where(
and_(
frontend_session.c.session_cookie == cookie,
frontend_session.c.ip == ip
)
)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def delete_session(self, session_id: int) -> bool:
sql = Delete(frontend_session).where(frontend_session.c.id == session_id)
def get_user(self, user_id: int) -> Optional[Row]:
sql = select(aime_user).where(aime_user.c.id == user_id)
result = self.execute(sql)
if result is None: return False
return True
return result.fetchone()
def check_password(self, user_id: int, passwd: bytes = None) -> bool:
usr = self.get_user(user_id)
if usr is None: return False
def create_session(self, user_id: int, ip: str = "0.0.0.0", expires: datetime = datetime.now() + timedelta(days=1)) -> Optional[str]:
cookie = uuid4().hex
if usr['password'] is None:
return False
sql = insert(frontend_session).values(
user = user_id,
ip = ip,
session_cookie = cookie,
expires = expires
)
result = self.execute(sql)
if result is None:
return None
return cookie
return bcrypt.checkpw(passwd, usr['password'].encode())
def reset_autoincrement(self, ai_value: int) -> None:
# ALTER TABLE isn't in sqlalchemy so we do this the ugly way
sql = f"ALTER TABLE aime_user AUTO_INCREMENT={ai_value}"
self.execute(sql)
self.execute(sql)
def delete_user(self, user_id: int) -> None:
sql = aime_user.delete(aime_user.c.id == user_id)
result = self.execute(sql)
if result is None:
self.logger.error(f"Failed to delete user with id {user_id}")
def get_unregistered_users(self) -> List[Row]:
"""
Returns a list of users who have not registered with the webui. They may or may not have cards.
"""
sql = select(aime_user).where(aime_user.c.password == None)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()

View File

@ -0,0 +1 @@
ALTER TABLE `event_log` DROP COLUMN `message`;

View File

@ -0,0 +1,12 @@
CREATE TABLE `frontend_session` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user` int(11) NOT NULL,
`ip` varchar(15) DEFAULT NULL,
`session_cookie` varchar(32) NOT NULL,
`expires` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
PRIMARY KEY (`id`),
UNIQUE KEY `id` (`id`),
UNIQUE KEY `session_cookie` (`session_cookie`),
KEY `user` (`user`),
CONSTRAINT `frontend_session_ibfk_1` FOREIGN KEY (`user`) REFERENCES `aime_user` (`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;

View File

@ -0,0 +1 @@
ALTER TABLE `event_log` ADD COLUMN `message` VARCHAR(1000) NOT NULL AFTER `severity`;

View File

@ -0,0 +1 @@
DROP TABLE `frontend_session`;

View File

@ -4,6 +4,9 @@ from twisted.web import resource
from twisted.web.util import redirectTo
from twisted.web.http import Request
from logging.handlers import TimedRotatingFileHandler
from twisted.web.server import Session
from zope.interface import Interface, Attribute, implementer
from twisted.python.components import registerAdapter
import jinja2
import bcrypt
@ -11,6 +14,18 @@ from core.config import CoreConfig
from core.data import Data
from core.utils import Utils
class IUserSession(Interface):
userId = Attribute("User's ID")
current_ip = Attribute("User's current ip address")
permissions = Attribute("User's permission level")
@implementer(IUserSession)
class UserSession(object):
def __init__(self, session):
self.userId = 0
self.current_ip = "0.0.0.0"
self.permissions = 0
class FrontendServlet(resource.Resource):
def getChild(self, name: bytes, request: Request):
self.logger.debug(f"{request.getClientIP()} -> {name.decode()}")
@ -38,6 +53,7 @@ class FrontendServlet(resource.Resource):
self.logger.setLevel(cfg.frontend.loglevel)
coloredlogs.install(level=cfg.frontend.loglevel, logger=self.logger, fmt=log_fmt_str)
registerAdapter(UserSession, Session, IUserSession)
fe_game = FE_Game(cfg, self.environment)
games = Utils.get_all_titles()
@ -59,8 +75,8 @@ class FrontendServlet(resource.Resource):
def render_GET(self, request):
self.logger.debug(f"{request.getClientIP()} -> {request.uri.decode()}")
template = self.environment.get_template("core/frontend/index.jinja")
return template.render(server_name=self.config.server.name, title=self.config.server.name, game_list=self.game_list).encode("utf-16")
template = self.environment.get_template("core/frontend/index.jinja")
return template.render(server_name=self.config.server.name, title=self.config.server.name, game_list=self.game_list, sesh=vars(IUserSession(request.getSession()))).encode("utf-16")
class FE_Base(resource.Resource):
"""
@ -80,6 +96,12 @@ class FE_Gate(FE_Base):
def render_GET(self, request: Request):
self.logger.debug(f"{request.getClientIP()} -> {request.uri.decode()}")
uri: str = request.uri.decode()
sesh = request.getSession()
usr_sesh = IUserSession(sesh)
if usr_sesh.userId > 0:
return redirectTo(b"/user", request)
if uri.startswith("/gate/create"):
return self.create_user(request)
@ -92,7 +114,7 @@ class FE_Gate(FE_Base):
else: err = 0
template = self.environment.get_template("core/frontend/gate/gate.jinja")
return template.render(title=f"{self.core_config.server.name} | Login Gate", error=err).encode("utf-16")
return template.render(title=f"{self.core_config.server.name} | Login Gate", error=err, sesh=vars(usr_sesh)).encode("utf-16")
def render_POST(self, request: Request):
uri = request.uri.decode()
@ -100,7 +122,7 @@ class FE_Gate(FE_Base):
if uri == "/gate/gate.login":
access_code: str = request.args[b"access_code"][0].decode()
passwd: str = request.args[b"passwd"][0]
passwd: bytes = request.args[b"passwd"][0]
if passwd == b"":
passwd = None
@ -109,20 +131,22 @@ class FE_Gate(FE_Base):
return redirectTo(b"/gate?e=1", request)
if passwd is None:
sesh = self.data.user.login(uid, ip=ip)
sesh = self.data.user.check_password(uid)
if sesh is not None:
return redirectTo(f"/gate/create?ac={access_code}".encode(), request)
return redirectTo(b"/gate?e=1", request)
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(passwd, salt)
sesh = self.data.user.login(uid, hashed, ip)
if sesh is None:
if not self.data.user.check_password(uid, passwd):
return redirectTo(b"/gate?e=1", request)
request.addCookie('session', sesh)
self.logger.info(f"Successful login of user {uid} at {ip}")
sesh = request.getSession()
usr_sesh = IUserSession(sesh)
usr_sesh.userId = uid
usr_sesh.current_ip = ip
return redirectTo(b"/user", request)
elif uri == "/gate/gate.create":
@ -142,10 +166,8 @@ class FE_Gate(FE_Base):
if result is None:
return redirectTo(b"/gate?e=3", request)
sesh = self.data.user.login(uid, hashed, ip)
if sesh is None:
if not self.data.user.check_password(uid, passwd.encode()):
return redirectTo(b"/gate", request)
request.addCookie('session', sesh)
return redirectTo(b"/user", request)
@ -159,14 +181,18 @@ class FE_Gate(FE_Base):
ac = request.args[b'ac'][0].decode()
template = self.environment.get_template("core/frontend/gate/create.jinja")
return template.render(title=f"{self.core_config.server.name} | Create User", code=ac).encode("utf-16")
return template.render(title=f"{self.core_config.server.name} | Create User", code=ac, sesh={"userId": 0}).encode("utf-16")
class FE_User(FE_Base):
def render_GET(self, request: Request):
template = self.environment.get_template("core/frontend/user/index.jinja")
return template.render().encode("utf-16")
if b'session' not in request.cookies:
sesh: Session = request.getSession()
usr_sesh = IUserSession(sesh)
if usr_sesh.userId == 0:
return redirectTo(b"/gate", request)
return template.render(title=f"{self.core_config.server.name} | Account", sesh=vars(usr_sesh)).encode("utf-16")
class FE_Game(FE_Base):
isLeaf = False

View File

@ -2,10 +2,23 @@
{% block content %}
<h1>Gate</h1>
{% include "core/frontend/widgets/err_banner.jinja" %}
<style>
/* Chrome, Safari, Edge, Opera */
input::-webkit-outer-spin-button,
input::-webkit-inner-spin-button {
-webkit-appearance: none;
margin: 0;
}
/* Firefox */
input[type=number] {
-moz-appearance: textfield;
}
</style>
<form id="login" style="max-width: 240px; min-width: 10%;" action="/gate/gate.login" method="post">
<div class="form-group row">
<label for="access_code">Card Access Code</label><br>
<input form="login" class="form-control" name="access_code" id="access_code" type="text" placeholder="00000000000000000000" maxlength="20" required>
<input form="login" class="form-control" name="access_code" id="access_code" type="number" placeholder="00000000000000000000" maxlength="20" required>
</div>
<div class="form-group row">
<label for="passwd">Password</label><br>
@ -14,4 +27,6 @@
<p></p>
<input id="submit" class="btn btn-primary" style="display: block; margin: 0 auto;" form="login" type="submit" value="Login">
</form>
<h6>*To register for the webui, type in the access code of your card, as shown in a game, and leave the password field blank.</h6>
<h6>*If you have not registered a card with this server, you cannot create a webui account.</h6>
{% endblock content %}

View File

@ -9,5 +9,10 @@
</div>
</div>
<div style="background: #333; color: #f9f9f9; width: 10%; height: 50px; line-height: 50px; text-align: center; float: left;">
{% if sesh is defined and sesh["userId"] > 0 %}
<a href="/user"><button class="btn btn-primary">Account</button></a>
{% else %}
<a href="/gate"><button class="btn btn-primary">Gate</button></a>
{% endif %}
</div>

View File

@ -1,4 +1,4 @@
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
from twisted.web import resource
@ -7,10 +7,13 @@ from datetime import datetime
import pytz
from core.config import CoreConfig
from core.utils import Utils
class MuchaServlet:
def __init__(self, cfg: CoreConfig) -> None:
def __init__(self, cfg: CoreConfig, cfg_dir: str) -> None:
self.config = cfg
self.config_dir = cfg_dir
self.mucha_registry: List[str] = []
self.logger = logging.getLogger('mucha')
log_fmt_str = "[%(asctime)s] Mucha | %(levelname)s | %(message)s"
@ -28,6 +31,16 @@ class MuchaServlet:
self.logger.setLevel(logging.INFO)
coloredlogs.install(level=logging.INFO, logger=self.logger, fmt=log_fmt_str)
all_titles = Utils.get_all_titles()
for _, mod in all_titles.items():
if hasattr(mod, "index") and hasattr(mod.index, "get_mucha_info"):
enabled, game_cd = mod.index.get_mucha_info(self.config, self.config_dir)
if enabled:
self.mucha_registry.append(game_cd)
self.logger.info(f"Serving {len(self.mucha_registry)} games on port {self.config.mucha.port}")
def handle_boardauth(self, request: Request, _: Dict) -> bytes:
req_dict = self.mucha_preprocess(request.content.getvalue())
if req_dict is None:
@ -36,11 +49,15 @@ class MuchaServlet:
req = MuchaAuthRequest(req_dict)
self.logger.debug(f"Mucha request {vars(req)}")
self.logger.info(f"Boardauth request from {request.getClientAddress().host} for {req.gameVer}")
if self.config.server.is_develop:
resp = MuchaAuthResponse(mucha_url=f"{self.config.mucha.hostname}:{self.config.mucha.port}")
else:
resp = MuchaAuthResponse(mucha_url=f"{self.config.mucha.hostname}")
if req.gameCd not in self.mucha_registry:
self.logger.warn(f"Unknown gameCd {req.gameCd}")
return b""
# TODO: Decrypt S/N
resp = MuchaAuthResponse(f"{self.config.mucha.hostname}{':' + self.config.mucha.port if self.config.server.is_develop else ''}")
self.logger.debug(f"Mucha response {vars(resp)}")
@ -54,11 +71,13 @@ class MuchaServlet:
req = MuchaUpdateRequest(req_dict)
self.logger.debug(f"Mucha request {vars(req)}")
self.logger.info(f"Updatecheck request from {request.getClientAddress().host} for {req.gameVer}")
if self.config.server.is_develop:
resp = MuchaUpdateResponse(mucha_url=f"{self.config.mucha.hostname}:{self.config.mucha.port}")
else:
resp = MuchaUpdateResponse(mucha_url=f"{self.config.mucha.hostname}")
if req.gameCd not in self.mucha_registry:
self.logger.warn(f"Unknown gameCd {req.gameCd}")
return b""
resp = MuchaUpdateResponseStub(req.gameVer)
self.logger.debug(f"Mucha response {vars(resp)}")
@ -93,12 +112,13 @@ class MuchaServlet:
class MuchaAuthRequest():
def __init__(self, request: Dict) -> None:
self.gameVer = "" if "gameVer" not in request else request["gameVer"]
self.sendDate = "" if "sendDate" not in request else request["sendDate"]
self.gameVer = "" if "gameVer" not in request else request["gameVer"] # gameCd + boardType + countryCd + version
self.sendDate = "" if "sendDate" not in request else request["sendDate"] # %Y%m%d
self.serialNum = "" if "serialNum" not in request else request["serialNum"]
self.gameCd = "" if "gameCd" not in request else request["gameCd"]
self.boardType = "" if "boardType" not in request else request["boardType"]
self.boardId = "" if "boardId" not in request else request["boardId"]
self.mac = "" if "mac" not in request else request["mac"]
self.placeId = "" if "placeId" not in request else request["placeId"]
self.storeRouterIp = "" if "storeRouterIp" not in request else request["storeRouterIp"]
self.countryCd = "" if "countryCd" not in request else request["countryCd"]
@ -106,7 +126,7 @@ class MuchaAuthRequest():
self.allToken = "" if "allToken" not in request else request["allToken"]
class MuchaAuthResponse():
def __init__(self, mucha_url: str = "localhost") -> None:
def __init__(self, mucha_url: str) -> None:
self.RESULTS = "001"
self.AUTH_INTERVAL = "86400"
self.SERVER_TIME = datetime.strftime(datetime.now(), "%Y%m%d%H%M")
@ -159,7 +179,7 @@ class MuchaUpdateRequest():
self.storeRouterIp = "" if "storeRouterIp" not in request else request["storeRouterIp"]
class MuchaUpdateResponse():
def __init__(self, game_ver: str = "PKFN0JPN01.01", mucha_url: str = "localhost") -> None:
def __init__(self, game_ver: str, mucha_url: str) -> None:
self.RESULTS = "001"
self.UPDATE_VER_1 = game_ver
self.UPDATE_URL_1 = f"https://{mucha_url}/updUrl1/"
@ -171,3 +191,10 @@ class MuchaUpdateResponse():
self.COM_SIZE_1 = "0"
self.COM_TIME_1 = "0"
self.LAN_INFO_SIZE_1 = "0"
self.USER_ID = ""
self.PASSWORD = ""
class MuchaUpdateResponseStub():
def __init__(self, game_ver: str) -> None:
self.RESULTS = "001"
self.UPDATE_VER_1 = game_ver

View File

@ -37,16 +37,27 @@ class TitleServlet():
for folder, mod in plugins.items():
if hasattr(mod, "game_codes") and hasattr(mod, "index"):
handler_cls = mod.index(self.config, self.config_folder)
if hasattr(handler_cls, "setup"):
handler_cls.setup()
should_call_setup = True
for code in mod.game_codes:
self.title_registry[code] = handler_cls
if hasattr(mod.index, "get_allnet_info"):
for code in mod.game_codes:
enabled, _, _ = mod.index.get_allnet_info(code, self.config, self.config_folder)
if enabled:
handler_cls = mod.index(self.config, self.config_folder)
if hasattr(handler_cls, "setup") and should_call_setup:
handler_cls.setup()
should_call_setup = False
self.title_registry[code] = handler_cls
else:
self.logger.warn(f"Game {folder} has no get_allnet_info")
else:
self.logger.error(f"{folder} missing game_code or index in __init__.py")
self.logger.info(f"Serving {len(self.title_registry)} game codes on port {core_cfg.title.port}")
def render_GET(self, request: Request, endpoints: dict) -> bytes:

View File

@ -2,17 +2,23 @@ import yaml
import argparse
from core.config import CoreConfig
from core.data import Data
from os import path
if __name__=='__main__':
parser = argparse.ArgumentParser(description="Database utilities")
parser.add_argument("--config", "-c", type=str, help="Config folder to use", default="config")
parser.add_argument("--version", "-v", type=str, help="Version of the database to upgrade/rollback to")
parser.add_argument("--game", "-g", type=str, help="Game code of the game who's schema will be updated/rolled back. Ex. SDFE")
parser.add_argument("--email", "-e", type=str, help="Email for the new user")
parser.add_argument("--old_ac", "-o", type=str, help="Access code to transfer from")
parser.add_argument("--new_ac", "-n", type=str, help="Access code to transfer to")
parser.add_argument("--force", "-f", type=bool, help="Force the action to happen")
parser.add_argument("action", type=str, help="DB Action, create, recreate, upgrade, or rollback")
args = parser.parse_args()
cfg = CoreConfig()
cfg.update(yaml.safe_load(open(f"{args.config}/core.yaml")))
if path.exists(f"{args.config}/core.yaml"):
cfg.update(yaml.safe_load(open(f"{args.config}/core.yaml")))
data = Data(cfg)
if args.action == "create":
@ -28,9 +34,18 @@ if __name__=='__main__':
if args.game is None:
data.logger.info("No game set, upgrading core schema")
data.migrate_database("CORE", int(args.version))
data.migrate_database("CORE", int(args.version), args.action)
else:
data.migrate_database(args.game, int(args.version), args.action)
elif args.action == "create-owner":
data.create_owner(args.email)
elif args.action == "migrate-card":
data.migrate_card(args.old_ac, args.new_ac, args.force)
elif args.action == "cleanup":
data.delete_hanging_users()
data.logger.info("Done")

View File

@ -1,7 +1,11 @@
server:
hostname: "localhost"
enable: True
loglevel: "info"
port: 9000
port_matching: 9001
port_stun: 9002
port_turn: 9003
port_admission: 9004
ssl_cert: cert/pokken.crt
ssl_key: cert/pokken.key

View File

@ -23,7 +23,7 @@ class HttpDispatcher(resource.Resource):
self.allnet = AllnetServlet(cfg, config_dir)
self.title = TitleServlet(cfg, config_dir)
self.mucha = MuchaServlet(cfg)
self.mucha = MuchaServlet(cfg, config_dir)
self.map_post.connect('allnet_ping', '/naomitest.html', controller="allnet", action='handle_naomitest', conditions=dict(method=['GET']))
self.map_post.connect('allnet_poweron', '/sys/servlet/PowerOn', controller="allnet", action='handle_poweron', conditions=dict(method=['POST']))
@ -90,7 +90,8 @@ if __name__ == "__main__":
exit(1)
cfg: CoreConfig = CoreConfig()
cfg.update(yaml.safe_load(open(f"{args.config}/core.yaml")))
if path.exists(f"{args.config}/core.yaml"):
cfg.update(yaml.safe_load(open(f"{args.config}/core.yaml")))
logger = logging.getLogger("core")
log_fmt_str = "[%(asctime)s] Core | %(levelname)s | %(message)s"

View File

@ -3,7 +3,7 @@ import argparse
import re
import os
import yaml
import importlib
from os import path
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
@ -79,7 +79,8 @@ if __name__ == "__main__":
args = parser.parse_args()
config = CoreConfig()
config.update(yaml.safe_load(open(f"{args.config}/core.yaml")))
if path.exists(f"{args.config}/core.yaml"):
config.update(yaml.safe_load(open(f"{args.config}/core.yaml")))
log_fmt_str = "[%(asctime)s] Reader | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)

View File

@ -6,13 +6,5 @@ from titles.chuni.read import ChuniReader
index = ChuniServlet
database = ChuniData
reader = ChuniReader
use_default_title = True
include_protocol = True
title_secure = False
game_codes = [ChuniConstants.GAME_CODE, ChuniConstants.GAME_CODE_NEW]
trailing_slash = True
use_default_host = False
host = ""
current_schema_version = 1

View File

@ -2,6 +2,8 @@ class ChuniConstants():
GAME_CODE = "SDBT"
GAME_CODE_NEW = "SDHD"
CONFIG_NAME = "chuni.yaml"
VER_CHUNITHM = 0
VER_CHUNITHM_PLUS = 1
VER_CHUNITHM_AIR = 2

View File

@ -8,6 +8,8 @@ import inflection
import string
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from os import path
from typing import Tuple
from core import CoreConfig
from titles.chuni.config import ChuniConfig
@ -30,7 +32,8 @@ class ChuniServlet():
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = ChuniConfig()
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/chuni.yaml")))
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}")))
self.versions = [
ChuniBase(core_cfg, self.game_cfg),
@ -68,6 +71,20 @@ class ChuniServlet():
coloredlogs.install(level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str)
self.logger.inited = True
@classmethod
def get_allnet_info(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str, str]:
game_cfg = ChuniConfig()
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}")))
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return (True, f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/", "")
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
req_raw = request.content.getvalue()
url_split = url_path.split("/")

View File

@ -6,16 +6,5 @@ from titles.cxb.read import CxbReader
index = CxbServlet
database = CxbData
reader = CxbReader
use_default_title = False
include_protocol = True
title_secure = True
game_codes = [CxbConstants.GAME_CODE]
trailing_slash = True
use_default_host = False
include_port = True
uri = "http://$h:$p/" # If you care about the allnet response you're probably running with no SSL
host = ""
current_schema_version = 1

View File

@ -7,7 +7,8 @@ import re
import inflection
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
from typing import Dict
from typing import Dict, Tuple
from os import path
from core.config import CoreConfig
from titles.cxb.config import CxbConfig
@ -22,7 +23,8 @@ class CxbServlet(resource.Resource):
self.cfg_dir = cfg_dir
self.core_cfg = core_cfg
self.game_cfg = CxbConfig()
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/cxb.yaml")))
if path.exists(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}")))
self.logger = logging.getLogger("cxb")
if not hasattr(self.logger, "inited"):
@ -49,6 +51,20 @@ class CxbServlet(resource.Resource):
CxbRevSunriseS2(core_cfg, self.game_cfg),
]
@classmethod
def get_allnet_info(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str, str]:
game_cfg = CxbConfig()
if path.exists(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}"):
game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}")))
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return (True, f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/", "")
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def setup(self):
if self.game_cfg.server.enable:
endpoints.serverFromString(reactor, f"tcp:{self.game_cfg.server.port}:interface={self.core_cfg.server.listen_address}")\

View File

@ -6,13 +6,5 @@ from titles.diva.read import DivaReader
index = DivaServlet
database = DivaData
reader = DivaReader
use_default_title = True
include_protocol = True
title_secure = False
game_codes = [DivaConstants.GAME_CODE]
trailing_slash = True
use_default_host = False
host = ""
current_schema_version = 1

View File

@ -1,6 +1,8 @@
class DivaConstants():
GAME_CODE = "SBZV"
CONFIG_NAME = "diva.yaml"
VER_PROJECT_DIVA_ARCADE = 0
VER_PROJECT_DIVA_ARCADE_FUTURE_TONE = 1

View File

@ -6,16 +6,20 @@ import zlib
import json
import urllib.parse
import base64
from os import path
from typing import Tuple
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
from titles.diva.base import DivaBase
class DivaServlet():
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = DivaConfig()
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/diva.yaml")))
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}")))
self.base = DivaBase(core_cfg, self.game_cfg)
@ -36,6 +40,20 @@ class DivaServlet():
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str)
@classmethod
def get_allnet_info(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str, str]:
game_cfg = DivaConfig()
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}")))
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return (True, f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/", "")
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
req_raw = req.content.getvalue()
url_header = req.getAllHeaders()

View File

@ -6,13 +6,5 @@ from titles.mai2.read import Mai2Reader
index = Mai2Servlet
database = Mai2Data
reader = Mai2Reader
use_default_title = True
include_protocol = True
title_secure = False
game_codes = [Mai2Constants.GAME_CODE]
trailing_slash = True
use_default_host = False
host = ""
current_schema_version = 2

View File

@ -6,6 +6,8 @@ import string
import logging, coloredlogs
import zlib
from logging.handlers import TimedRotatingFileHandler
from os import path
from typing import Tuple
from core.config import CoreConfig
from titles.mai2.config import Mai2Config
@ -22,7 +24,8 @@ class Mai2Servlet():
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = Mai2Config()
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}")))
if path.exists(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}")))
self.versions = [
Mai2Base(core_cfg, self.game_cfg),
@ -50,6 +53,21 @@ class Mai2Servlet():
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str)
@classmethod
def get_allnet_info(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str, str]:
game_cfg = Mai2Config()
if path.exists(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}"):
game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}")))
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return (True, f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/", f"{core_cfg.title.hostname}:{core_cfg.title.port}/")
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", f"{core_cfg.title.hostname}/")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
req_raw = request.content.getvalue()
url = request.uri.decode()

View File

@ -6,13 +6,5 @@ from titles.ongeki.read import OngekiReader
index = OngekiServlet
database = OngekiData
reader = OngekiReader
use_default_title = True
include_protocol = True
title_secure = False
game_codes = [OngekiConstants.GAME_CODE]
trailing_slash = True
use_default_host = False
host = ""
current_schema_version = 2

View File

@ -3,6 +3,8 @@ from enum import Enum
class OngekiConstants():
GAME_CODE = "SDDT"
CONFIG_NAME = "ongeki.yaml"
VER_ONGEKI = 0
VER_ONGEKI_PLUS = 1
VER_ONGEKI_SUMMER = 2

View File

@ -6,6 +6,8 @@ import string
import logging, coloredlogs
import zlib
from logging.handlers import TimedRotatingFileHandler
from os import path
from typing import Tuple
from core.config import CoreConfig
from titles.ongeki.config import OngekiConfig
@ -23,7 +25,8 @@ class OngekiServlet():
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = OngekiConfig()
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/ongeki.yaml")))
if path.exists(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}")))
self.versions = [
OngekiBase(core_cfg, self.game_cfg),
@ -52,6 +55,21 @@ class OngekiServlet():
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str)
@classmethod
def get_allnet_info(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str, str]:
game_cfg = OngekiConfig()
if path.exists(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"):
game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}")))
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return (True, f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/", f"{core_cfg.title.hostname}:{core_cfg.title.port}/")
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", f"{core_cfg.title.hostname}/")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
req_raw = request.content.getvalue()

View File

@ -4,16 +4,5 @@ from titles.pokken.database import PokkenData
index = PokkenServlet
database = PokkenData
use_default_title = True
include_protocol = True
title_secure = True
game_codes = [PokkenConstants.GAME_CODE]
trailing_slash = True
use_default_host = False
include_port = True
uri="https://$h:$p/"
host="$h:$p/"
current_schema_version = 1

View File

@ -35,19 +35,19 @@ class PokkenBase():
regist_pcb.server_time = int(datetime.now().timestamp() / 1000)
biwa_setting = {
"MatchingServer": {
"host": f"https://{self.core_cfg.title.hostname}",
"port": 9000,
"host": f"https://{self.game_cfg.server.hostname}",
"port": self.game_cfg.server.port_matching,
"url": "/matching"
},
"StunServer": {
"addr": self.core_cfg.title.hostname,
"port": 3333
"addr": self.game_cfg.server.hostname,
"port": self.game_cfg.server.port_stun
},
"TurnServer": {
"addr": self.core_cfg.title.hostname,
"port": 4444
"addr": self.game_cfg.server.hostname,
"port": self.game_cfg.server.port_turn
},
"AdmissionUrl": f"ws://{self.core_cfg.title.hostname}:1111",
"AdmissionUrl": f"ws://{self.game_cfg.server.hostname}:{self.game_cfg.server.port_admission}",
"locationId": 123,
"logfilename": "JackalMatchingLibrary.log",
"biwalogfilename": "./biwa.log"

View File

@ -4,6 +4,10 @@ class PokkenServerConfig():
def __init__(self, parent_config: "PokkenConfig"):
self.__config = parent_config
@property
def hostname(self) -> str:
return CoreConfig.get_config_field(self.__config, 'pokken', 'server', 'hostname', default="localhost")
@property
def enable(self) -> bool:
return CoreConfig.get_config_field(self.__config, 'pokken', 'server', 'enable', default=True)
@ -18,7 +22,19 @@ class PokkenServerConfig():
@property
def port_matching(self) -> int:
return CoreConfig.get_config_field(self.__config, 'pokken', 'server', 'port', default=9001)
return CoreConfig.get_config_field(self.__config, 'pokken', 'server', 'port_matching', default=9001)
@property
def port_stun(self) -> int:
return CoreConfig.get_config_field(self.__config, 'pokken', 'server', 'port_stun', default=9002)
@property
def port_turn(self) -> int:
return CoreConfig.get_config_field(self.__config, 'pokken', 'server', 'port_turn', default=9003)
@property
def port_admission(self) -> int:
return CoreConfig.get_config_field(self.__config, 'pokken', 'server', 'port_admission', default=9004)
@property
def ssl_cert(self) -> str:

View File

@ -1,3 +1,4 @@
from typing import Tuple
from twisted.web.http import Request
from twisted.web import resource, server
from twisted.internet import reactor, endpoints
@ -11,6 +12,7 @@ from google.protobuf.message import DecodeError
from core.config import CoreConfig
from titles.pokken.config import PokkenConfig
from titles.pokken.base import PokkenBase
from titles.pokken.const import PokkenConstants
class PokkenServlet(resource.Resource):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
@ -18,7 +20,8 @@ class PokkenServlet(resource.Resource):
self.core_cfg = core_cfg
self.config_dir = cfg_dir
self.game_cfg = PokkenConfig()
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/pokken.yaml")))
if path.exists(f"{cfg_dir}/pokken.yaml"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/pokken.yaml")))
self.logger = logging.getLogger("pokken")
if not hasattr(self.logger, "inited"):
@ -40,6 +43,33 @@ class PokkenServlet(resource.Resource):
self.logger.inited = True
self.base = PokkenBase(core_cfg, self.game_cfg)
@classmethod
def get_allnet_info(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str, str]:
game_cfg = PokkenConfig()
if path.exists(f"{cfg_dir}/{PokkenConstants.CONFIG_NAME}"):
game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{PokkenConstants.CONFIG_NAME}")))
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return (True, f"https://{game_cfg.server.hostname}:{game_cfg.server.port}/{game_code}/$v/", f"{game_cfg.server.hostname}:{game_cfg.server.port}/")
return (True, f"https://{game_cfg.server.hostname}/{game_code}/$v/", f"{game_cfg.server.hostname}/")
@classmethod
def get_mucha_info(cls, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str, str]:
game_cfg = PokkenConfig()
if path.exists(f"{cfg_dir}/{PokkenConstants.CONFIG_NAME}"):
game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{PokkenConstants.CONFIG_NAME}")))
if not game_cfg.server.enable:
return (False, "", "")
return (True, "PKFN")
def setup(self):
"""

View File

@ -8,13 +8,5 @@ index = WaccaServlet
database = WaccaData
reader = WaccaReader
frontend = WaccaFrontend
use_default_title = True
include_protocol = True
title_secure = False
game_codes = [WaccaConstants.GAME_CODE]
trailing_slash = False
use_default_host = False
host = ""
current_schema_version = 3

View File

@ -3,10 +3,10 @@ import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
import logging
import json
from datetime import datetime
from hashlib import md5
from twisted.web.http import Request
from typing import Dict
from typing import Dict, Tuple
from os import path
from core.config import CoreConfig
from titles.wacca.config import WaccaConfig
@ -24,7 +24,8 @@ class WaccaServlet():
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = WaccaConfig()
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/wacca.yaml")))
if path.exists(f"{cfg_dir}/{WaccaConstants.CONFIG_NAME}"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{WaccaConstants.CONFIG_NAME}")))
self.versions = [
WaccaBase(core_cfg, self.game_cfg),
@ -51,6 +52,20 @@ class WaccaServlet():
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str)
@classmethod
def get_allnet_info(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str, str]:
game_cfg = WaccaConfig()
if path.exists(f"{cfg_dir}/{WaccaConstants.CONFIG_NAME}"):
game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{WaccaConstants.CONFIG_NAME}")))
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return (True, f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v", "")
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v", "")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
def end(resp: Dict) -> bytes:
hash = md5(json.dumps(resp, ensure_ascii=False).encode()).digest()