1
0
mirror of synced 2024-11-27 23:40:48 +01:00

adding partial Sword Art Online Arcade support

This commit is contained in:
Midorica 2023-05-26 13:45:20 -04:00
parent 7ed294e9f7
commit 72594fef31
13 changed files with 2709 additions and 0 deletions

6
example_config/sao.yaml Normal file
View File

@ -0,0 +1,6 @@
server:
hostname: "localhost"
enable: True
loglevel: "info"
port: 9000
auto_register: True

10
titles/sao/__init__.py Normal file
View File

@ -0,0 +1,10 @@
from .index import SaoServlet
from .const import SaoConstants
from .database import SaoData
from .read import SaoReader
index = SaoServlet
database = SaoData
reader = SaoReader
game_codes = [SaoConstants.GAME_CODE]
current_schema_version = 1

226
titles/sao/base.py Normal file
View File

@ -0,0 +1,226 @@
from datetime import datetime, timedelta
import json, logging
from typing import Any, Dict
import random
import struct
from core.data import Data
from core import CoreConfig
from .config import SaoConfig
from .database import SaoData
from titles.sao.handlers.base import *
class SaoBase:
def __init__(self, core_cfg: CoreConfig, game_cfg: SaoConfig) -> None:
self.core_cfg = core_cfg
self.game_cfg = game_cfg
self.core_data = Data(core_cfg)
self.game_data = SaoData(core_cfg)
self.version = 0
self.logger = logging.getLogger("sao")
def handle_noop(self, request: Any) -> bytes:
sao_request = request
sao_id = int(sao_request[:4],16) + 1
ret = struct.pack("!HHIIIIIIb", sao_id, 0, 0, 5, 1, 1, 5, 0x01000000, 0).hex()
return bytes.fromhex(ret)
def handle_c122(self, request: Any) -> bytes:
#common/get_maintenance_info
resp = SaoGetMaintResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c12e(self, request: Any) -> bytes:
#common/ac_cabinet_boot_notification
resp = SaoCommonAcCabinetBootNotificationResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c100(self, request: Any) -> bytes:
#common/get_app_versions
resp = SaoCommonGetAppVersionsRequest(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c102(self, request: Any) -> bytes:
#common/master_data_version_check
resp = SaoMasterDataVersionCheckResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c10a(self, request: Any) -> bytes:
#common/paying_play_start
resp = SaoCommonPayingPlayStartRequest(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_ca02(self, request: Any) -> bytes:
#quest_multi_play_room/get_quest_scene_multi_play_photon_server
resp = SaoGetQuestSceneMultiPlayPhotonServerResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c11e(self, request: Any) -> bytes:
#common/get_auth_card_data
#Check authentication
access_code = bytes.fromhex(request[188:268]).decode("utf-16le")
user_id = self.core_data.card.get_user_id_from_card( access_code )
if not user_id:
user_id = self.core_data.user.create_user() #works
card_id = self.core_data.card.create_card(user_id, access_code)
if card_id is None:
user_id = -1
self.logger.error("Failed to register card!")
profile_id = self.game_data.profile.create_profile(user_id)
self.logger.info(f"User Authenticated: { access_code } | { user_id }")
#Grab values from profile
profile_data = self.game_data.profile.get_profile(user_id)
if user_id and not profile_data:
profile_id = self.game_data.profile.create_profile(user_id)
profile_data = self.game_data.profile.get_profile(user_id)
resp = SaoGetAuthCardDataResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, profile_data)
return resp.make()
def handle_c40c(self, request: Any) -> bytes:
#home/check_ac_login_bonus
resp = SaoHomeCheckAcLoginBonusResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c104(self, request: Any) -> bytes:
#common/login
access_code = bytes.fromhex(request[228:308]).decode("utf-16le")
user_id = self.core_data.card.get_user_id_from_card( access_code )
profile_data = self.game_data.profile.get_profile(user_id)
resp = SaoCommonLoginResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, profile_data)
return resp.make()
def handle_c404(self, request: Any) -> bytes:
#home/check_comeback_event
resp = SaoCheckComebackEventRequest(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c000(self, request: Any) -> bytes:
#ticket/ticket
resp = SaoTicketResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c500(self, request: Any) -> bytes:
#user_info/get_user_basic_data
user_id = bytes.fromhex(request[88:112]).decode("utf-16le")
profile_data = self.game_data.profile.get_profile(user_id)
resp = SaoGetUserBasicDataResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, profile_data)
return resp.make()
def handle_c600(self, request: Any) -> bytes:
#have_object/get_hero_log_user_data_list
heroIdsData = self.game_data.static.get_hero_ids(0, True)
resp = SaoGetHeroLogUserDataListResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, heroIdsData)
return resp.make()
def handle_c602(self, request: Any) -> bytes:
#have_object/get_equipment_user_data_list
equipmentIdsData = self.game_data.static.get_equipment_ids(0, True)
resp = SaoGetEquipmentUserDataListResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, equipmentIdsData)
return resp.make()
def handle_c604(self, request: Any) -> bytes:
#have_object/get_item_user_data_list
itemIdsData = self.game_data.static.get_item_ids(0, True)
resp = SaoGetItemUserDataListResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, itemIdsData)
return resp.make()
def handle_c606(self, request: Any) -> bytes:
#have_object/get_support_log_user_data_list
supportIdsData = self.game_data.static.get_support_log_ids(0, True)
resp = SaoGetSupportLogUserDataListResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, supportIdsData)
return resp.make()
def handle_c800(self, request: Any) -> bytes:
#custom/get_title_user_data_list
titleIdsData = self.game_data.static.get_title_ids(0, True)
resp = SaoGetTitleUserDataListResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, titleIdsData)
return resp.make()
def handle_c608(self, request: Any) -> bytes:
#have_object/get_episode_append_data_list
user_id = bytes.fromhex(request[88:112]).decode("utf-16le")
profile_data = self.game_data.profile.get_profile(user_id)
resp = SaoGetEpisodeAppendDataListResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, profile_data)
return resp.make()
def handle_c804(self, request: Any) -> bytes:
#custom/get_party_data_list
resp = SaoGetPartyDataListResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c902(self, request: Any) -> bytes: # for whatever reason, having all entries empty or filled changes nothing
#quest/get_quest_scene_prev_scan_profile_card
resp = SaoGetQuestScenePrevScanProfileCardResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c124(self, request: Any) -> bytes:
#common/get_resource_path_info
resp = SaoGetResourcePathInfoResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c900(self, request: Any) -> bytes:
#quest/get_quest_scene_user_data_list // QuestScene.csv
questIdsData = self.game_data.static.get_quests_ids(0, True)
resp = SaoGetQuestSceneUserDataListResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, questIdsData)
return resp.make()
def handle_c400(self, request: Any) -> bytes:
#home/check_yui_medal_get_condition
resp = SaoCheckYuiMedalGetConditionResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c402(self, request: Any) -> bytes:
#home/get_yui_medal_bonus_user_data
resp = SaoGetYuiMedalBonusUserDataResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c40a(self, request: Any) -> bytes:
#home/check_profile_card_used_reward
resp = SaoCheckProfileCardUsedRewardResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c904(self, request: Any) -> bytes:
#quest/episode_play_start
user_id = bytes.fromhex(request[100:124]).decode("utf-16le")
profile_data = self.game_data.profile.get_profile(user_id)
resp = SaoEpisodePlayStartResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, profile_data)
return resp.make()
def handle_c908(self, request: Any) -> bytes: # function not working yet, tired of this
#quest/episode_play_end
resp = SaoEpisodePlayEndResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()
def handle_c914(self, request: Any) -> bytes:
#quest/trial_tower_play_start
user_id = bytes.fromhex(request[100:124]).decode("utf-16le")
floor_id = int(request[130:132], 16) # not required but nice to know
profile_data = self.game_data.profile.get_profile(user_id)
resp = SaoEpisodePlayStartResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1, profile_data)
return resp.make()
def handle_c90a(self, request: Any) -> bytes: #should be tweaked for proper item unlock
#quest/episode_play_end_unanalyzed_log_fixed
resp = SaoEpisodePlayEndUnanalyzedLogFixedResponse(int.from_bytes(bytes.fromhex(request[:4]), "big")+1)
return resp.make()

47
titles/sao/config.py Normal file
View File

@ -0,0 +1,47 @@
from core.config import CoreConfig
class SaoServerConfig:
def __init__(self, parent_config: "SaoConfig"):
self.__config = parent_config
@property
def hostname(self) -> str:
return CoreConfig.get_config_field(
self.__config, "sao", "server", "hostname", default="localhost"
)
@property
def enable(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "sao", "server", "enable", default=True
)
@property
def loglevel(self) -> int:
return CoreConfig.str_to_loglevel(
CoreConfig.get_config_field(
self.__config, "sao", "server", "loglevel", default="info"
)
)
@property
def port(self) -> int:
return CoreConfig.get_config_field(
self.__config, "sao", "server", "port", default=9000
)
@property
def auto_register(self) -> bool:
"""
Automatically register users in `aime_user` on first carding in with sao
if they don't exist already. Set to false to display an error instead.
"""
return CoreConfig.get_config_field(
self.__config, "sao", "server", "auto_register", default=True
)
class SaoConfig(dict):
def __init__(self) -> None:
self.server = SaoServerConfig(self)

15
titles/sao/const.py Normal file
View File

@ -0,0 +1,15 @@
from enum import Enum
class SaoConstants:
GAME_CODE = "SDEW"
CONFIG_NAME = "sao.yaml"
VER_SAO = 0
VERSION_NAMES = ("Sword Art Online Arcade")
@classmethod
def game_ver_to_string(cls, ver: int):
return cls.VERSION_NAMES[ver]

12
titles/sao/database.py Normal file
View File

@ -0,0 +1,12 @@
from core.data import Data
from core.config import CoreConfig
from .schema import *
class SaoData(Data):
def __init__(self, cfg: CoreConfig) -> None:
super().__init__(cfg)
self.profile = SaoProfileData(cfg, self.session)
self.static = SaoStaticData(cfg, self.session)

View File

@ -0,0 +1 @@
from titles.sao.handlers.base import *

1698
titles/sao/handlers/base.py Normal file

File diff suppressed because it is too large Load Diff

117
titles/sao/index.py Normal file
View File

@ -0,0 +1,117 @@
from typing import Tuple
from twisted.web.http import Request
from twisted.web import resource
import json, ast
from datetime import datetime
import yaml
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
import inflection
from os import path
from core import CoreConfig, Utils
from titles.sao.config import SaoConfig
from titles.sao.const import SaoConstants
from titles.sao.base import SaoBase
from titles.sao.handlers.base import *
class SaoServlet(resource.Resource):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.isLeaf = True
self.core_cfg = core_cfg
self.config_dir = cfg_dir
self.game_cfg = SaoConfig()
if path.exists(f"{cfg_dir}/sao.yaml"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/sao.yaml")))
self.logger = logging.getLogger("sao")
if not hasattr(self.logger, "inited"):
log_fmt_str = "[%(asctime)s] SAO | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "sao"),
encoding="utf8",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.inited = True
self.base = SaoBase(core_cfg, self.game_cfg)
@classmethod
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
game_cfg = SaoConfig()
if path.exists(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return (False, "", "")
return (
True,
f"http://{game_cfg.server.hostname}:{game_cfg.server.port}/{game_code}/$v/",
f"{game_cfg.server.hostname}/SDEW/$v/",
)
@classmethod
def get_mucha_info(
cls, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
game_cfg = SaoConfig()
if path.exists(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return (False, "")
return (True, "SAO1")
def setup(self) -> None:
pass
def render_POST(
self, request: Request, version: int = 0, endpoints: str = ""
) -> bytes:
req_url = request.uri.decode()
if req_url == "/matching":
self.logger.info("Matching request")
request.responseHeaders.addRawHeader(b"content-type", b"text/html; charset=utf-8")
sao_request = request.content.getvalue().hex()
#sao_request = sao_request[:32]
handler = getattr(self.base, f"handle_{sao_request[:4]}", None)
if handler is None:
self.logger.info(f"Generic Handler for {req_url} - {sao_request[:4]}")
#self.logger.debug(f"Request: {request.content.getvalue().hex()}")
resp = SaoNoopResponse(int.from_bytes(bytes.fromhex(sao_request[:4]), "big")+1)
self.logger.debug(f"Response: {resp.make().hex()}")
return resp.make()
self.logger.info(f"Handler {req_url} - {sao_request[:4]} request")
self.logger.debug(f"Request: {request.content.getvalue().hex()}")
self.logger.debug(f"Response: {handler(sao_request).hex()}")
return handler(sao_request)

230
titles/sao/read.py Normal file
View File

@ -0,0 +1,230 @@
from typing import Optional, Dict, List
from os import walk, path
import urllib
import csv
from read import BaseReader
from core.config import CoreConfig
from titles.sao.database import SaoData
from titles.sao.const import SaoConstants
class SaoReader(BaseReader):
def __init__(
self,
config: CoreConfig,
version: int,
bin_arg: Optional[str],
opt_arg: Optional[str],
extra: Optional[str],
) -> None:
super().__init__(config, version, bin_arg, opt_arg, extra)
self.data = SaoData(config)
try:
self.logger.info(
f"Start importer for {SaoConstants.game_ver_to_string(version)}"
)
except IndexError:
self.logger.error(f"Invalid project SAO version {version}")
exit(1)
def read(self) -> None:
pull_bin_ram = True
if not path.exists(f"{self.bin_dir}"):
self.logger.warn(f"Couldn't find csv file in {self.bin_dir}, skipping")
pull_bin_ram = False
if pull_bin_ram:
self.read_csv(f"{self.bin_dir}")
def read_csv(self, bin_dir: str) -> None:
self.logger.info(f"Read csv from {bin_dir}")
self.logger.info("Now reading QuestScene.csv")
try:
fullPath = bin_dir + "/QuestScene.csv"
with open(fullPath, encoding="UTF-8") as fp:
reader = csv.DictReader(fp)
for row in reader:
questSceneId = row["QuestSceneId"]
sortNo = row["SortNo"]
name = row["Name"]
enabled = True
self.logger.info(f"Added quest {questSceneId} | Name: {name}")
try:
self.data.static.put_quest(
questSceneId,
0,
sortNo,
name,
enabled
)
except Exception as err:
print(err)
except:
self.logger.warn(f"Couldn't read csv file in {self.bin_dir}, skipping")
self.logger.info("Now reading HeroLog.csv")
try:
fullPath = bin_dir + "/HeroLog.csv"
with open(fullPath, encoding="UTF-8") as fp:
reader = csv.DictReader(fp)
for row in reader:
heroLogId = row["HeroLogId"]
name = row["Name"]
nickname = row["Nickname"]
rarity = row["Rarity"]
skillTableSubId = row["SkillTableSubId"]
awakeningExp = row["AwakeningExp"]
flavorText = row["FlavorText"]
enabled = True
self.logger.info(f"Added hero {heroLogId} | Name: {name}")
try:
self.data.static.put_hero(
0,
heroLogId,
name,
nickname,
rarity,
skillTableSubId,
awakeningExp,
flavorText,
enabled
)
except Exception as err:
print(err)
except:
self.logger.warn(f"Couldn't read csv file in {self.bin_dir}, skipping")
self.logger.info("Now reading Equipment.csv")
try:
fullPath = bin_dir + "/Equipment.csv"
with open(fullPath, encoding="UTF-8") as fp:
reader = csv.DictReader(fp)
for row in reader:
equipmentId = row["EquipmentId"]
equipmentType = row["EquipmentType"]
weaponTypeId = row["WeaponTypeId"]
name = row["Name"]
rarity = row["Rarity"]
flavorText = row["FlavorText"]
enabled = True
self.logger.info(f"Added equipment {equipmentId} | Name: {name}")
try:
self.data.static.put_equipment(
0,
equipmentId,
name,
equipmentType,
weaponTypeId,
rarity,
flavorText,
enabled
)
except Exception as err:
print(err)
except:
self.logger.warn(f"Couldn't read csv file in {self.bin_dir}, skipping")
self.logger.info("Now reading Item.csv")
try:
fullPath = bin_dir + "/Item.csv"
with open(fullPath, encoding="UTF-8") as fp:
reader = csv.DictReader(fp)
for row in reader:
itemId = row["ItemId"]
itemTypeId = row["ItemTypeId"]
name = row["Name"]
rarity = row["Rarity"]
flavorText = row["FlavorText"]
enabled = True
self.logger.info(f"Added item {itemId} | Name: {name}")
try:
self.data.static.put_item(
0,
itemId,
name,
itemTypeId,
rarity,
flavorText,
enabled
)
except Exception as err:
print(err)
except:
self.logger.warn(f"Couldn't read csv file in {self.bin_dir}, skipping")
self.logger.info("Now reading SupportLog.csv")
try:
fullPath = bin_dir + "/SupportLog.csv"
with open(fullPath, encoding="UTF-8") as fp:
reader = csv.DictReader(fp)
for row in reader:
supportLogId = row["SupportLogId"]
charaId = row["CharaId"]
name = row["Name"]
rarity = row["Rarity"]
salePrice = row["SalePrice"]
skillName = row["SkillName"]
enabled = True
self.logger.info(f"Added support log {supportLogId} | Name: {name}")
try:
self.data.static.put_support_log(
0,
supportLogId,
charaId,
name,
rarity,
salePrice,
skillName,
enabled
)
except Exception as err:
print(err)
except:
self.logger.warn(f"Couldn't read csv file in {self.bin_dir}, skipping")
self.logger.info("Now reading Title.csv")
try:
fullPath = bin_dir + "/Title.csv"
with open(fullPath, encoding="UTF-8") as fp:
reader = csv.DictReader(fp)
for row in reader:
titleId = row["TitleId"]
displayName = row["DisplayName"]
requirement = row["Requirement"]
rank = row["Rank"]
imageFilePath = row["ImageFilePath"]
enabled = True
self.logger.info(f"Added title {titleId} | Name: {displayName}")
if len(titleId) > 5:
try:
self.data.static.put_title(
0,
titleId,
displayName,
requirement,
rank,
imageFilePath,
enabled
)
except Exception as err:
print(err)
elif len(titleId) < 6: # current server code cannot have multiple lengths for the id
continue
except:
self.logger.warn(f"Couldn't read csv file in {self.bin_dir}, skipping")

View File

@ -0,0 +1,2 @@
from .profile import SaoProfileData
from .static import SaoStaticData

View File

@ -0,0 +1,48 @@
from typing import Optional, Dict, List
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_, case
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select, update, delete
from sqlalchemy.engine import Row
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
from ..const import SaoConstants
profile = Table(
"sao_profile",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
unique=True,
),
Column("user_type", Integer, server_default="1"),
Column("nick_name", String(16), server_default="PLAYER"),
Column("rank_num", Integer, server_default="1"),
Column("rank_exp", Integer, server_default="0"),
Column("own_col", Integer, server_default="0"),
Column("own_vp", Integer, server_default="0"),
Column("own_yui_medal", Integer, server_default="0"),
Column("setting_title_id", Integer, server_default="20005"),
)
class SaoProfileData(BaseData):
def create_profile(self, user_id: int) -> Optional[int]:
sql = insert(profile).values(user=user_id)
conflict = sql.on_duplicate_key_update(user=user_id)
result = self.execute(conflict)
if result is None:
self.logger.error(f"Failed to create SAO profile for user {user_id}!")
return None
return result.lastrowid
def get_profile(self, user_id: int) -> Optional[Row]:
sql = profile.select(profile.c.user == user_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()

297
titles/sao/schema/static.py Normal file
View File

@ -0,0 +1,297 @@
from typing import Dict, List, Optional
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON, Float
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine import Row
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
quest = Table(
"sao_static_quest",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer),
Column("questSceneId", Integer),
Column("sortNo", Integer),
Column("name", String(255)),
Column("enabled", Boolean),
UniqueConstraint(
"version", "questSceneId", name="sao_static_quest_uk"
),
mysql_charset="utf8mb4",
)
hero = Table(
"sao_static_hero_list",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer),
Column("heroLogId", Integer),
Column("name", String(255)),
Column("nickname", String(255)),
Column("rarity", Integer),
Column("skillTableSubId", Integer),
Column("awakeningExp", Integer),
Column("flavorText", String(255)),
Column("enabled", Boolean),
UniqueConstraint(
"version", "heroLogId", name="sao_static_hero_list_uk"
),
mysql_charset="utf8mb4",
)
equipment = Table(
"sao_static_equipment_list",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer),
Column("equipmentId", Integer),
Column("equipmentType", Integer),
Column("weaponTypeId", Integer),
Column("name", String(255)),
Column("rarity", Integer),
Column("flavorText", String(255)),
Column("enabled", Boolean),
UniqueConstraint(
"version", "equipmentId", name="sao_static_equipment_list_uk"
),
mysql_charset="utf8mb4",
)
item = Table(
"sao_static_item_list",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer),
Column("itemId", Integer),
Column("itemTypeId", Integer),
Column("name", String(255)),
Column("rarity", Integer),
Column("flavorText", String(255)),
Column("enabled", Boolean),
UniqueConstraint(
"version", "itemId", name="sao_static_item_list_uk"
),
mysql_charset="utf8mb4",
)
support = Table(
"sao_static_support_log_list",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer),
Column("supportLogId", Integer),
Column("charaId", Integer),
Column("name", String(255)),
Column("rarity", Integer),
Column("salePrice", Integer),
Column("skillName", String(255)),
Column("enabled", Boolean),
UniqueConstraint(
"version", "supportLogId", name="sao_static_support_log_list_uk"
),
mysql_charset="utf8mb4",
)
title = Table(
"sao_static_title_list",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer),
Column("titleId", Integer),
Column("displayName", String(255)),
Column("requirement", Integer),
Column("rank", Integer),
Column("imageFilePath", String(255)),
Column("enabled", Boolean),
UniqueConstraint(
"version", "titleId", name="sao_static_title_list_uk"
),
mysql_charset="utf8mb4",
)
class SaoStaticData(BaseData):
def put_quest( self, questSceneId: int, version: int, sortNo: int, name: str, enabled: bool ) -> Optional[int]:
sql = insert(quest).values(
questSceneId=questSceneId,
version=version,
sortNo=sortNo,
name=name,
tutorial=tutorial,
)
conflict = sql.on_duplicate_key_update(
name=name, questSceneId=questSceneId, version=version
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def put_hero( self, version: int, heroLogId: int, name: str, nickname: str, rarity: int, skillTableSubId: int, awakeningExp: int, flavorText: str, enabled: bool ) -> Optional[int]:
sql = insert(hero).values(
version=version,
heroLogId=heroLogId,
name=name,
nickname=nickname,
rarity=rarity,
skillTableSubId=skillTableSubId,
awakeningExp=awakeningExp,
flavorText=flavorText,
enabled=enabled
)
conflict = sql.on_duplicate_key_update(
name=name, heroLogId=heroLogId
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def put_equipment( self, version: int, equipmentId: int, name: str, equipmentType: int, weaponTypeId:int, rarity: int, flavorText: str, enabled: bool ) -> Optional[int]:
sql = insert(equipment).values(
version=version,
equipmentId=equipmentId,
name=name,
equipmentType=equipmentType,
weaponTypeId=weaponTypeId,
rarity=rarity,
flavorText=flavorText,
enabled=enabled
)
conflict = sql.on_duplicate_key_update(
name=name, equipmentId=equipmentId
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def put_item( self, version: int, itemId: int, name: str, itemTypeId: int, rarity: int, flavorText: str, enabled: bool ) -> Optional[int]:
sql = insert(item).values(
version=version,
itemId=itemId,
name=name,
itemTypeId=itemTypeId,
rarity=rarity,
flavorText=flavorText,
enabled=enabled
)
conflict = sql.on_duplicate_key_update(
name=name, itemId=itemId
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def put_support_log( self, version: int, supportLogId: int, charaId: int, name: str, rarity: int, salePrice: int, skillName: str, enabled: bool ) -> Optional[int]:
sql = insert(support).values(
version=version,
supportLogId=supportLogId,
charaId=charaId,
name=name,
rarity=rarity,
salePrice=salePrice,
skillName=skillName,
enabled=enabled
)
conflict = sql.on_duplicate_key_update(
name=name, supportLogId=supportLogId
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def put_title( self, version: int, titleId: int, displayName: str, requirement: int, rank: int, imageFilePath: str, enabled: bool ) -> Optional[int]:
sql = insert(title).values(
version=version,
titleId=titleId,
displayName=displayName,
requirement=requirement,
rank=rank,
imageFilePath=imageFilePath,
enabled=enabled
)
conflict = sql.on_duplicate_key_update(
displayName=displayName, titleId=titleId
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def get_quests_ids(self, version: int, enabled: bool) -> Optional[List[Dict]]:
sql = quest.select(quest.c.version == version and quest.c.enabled == enabled).order_by(
quest.c.questSceneId.asc()
)
result = self.execute(sql)
if result is None:
return None
return [list[2] for list in result.fetchall()]
def get_hero_ids(self, version: int, enabled: bool) -> Optional[List[Dict]]:
sql = hero.select(hero.c.version == version and hero.c.enabled == enabled).order_by(
hero.c.heroLogId.asc()
)
result = self.execute(sql)
if result is None:
return None
return [list[2] for list in result.fetchall()]
def get_equipment_ids(self, version: int, enabled: bool) -> Optional[List[Dict]]:
sql = equipment.select(equipment.c.version == version and equipment.c.enabled == enabled).order_by(
equipment.c.equipmentId.asc()
)
result = self.execute(sql)
if result is None:
return None
return [list[2] for list in result.fetchall()]
def get_item_ids(self, version: int, enabled: bool) -> Optional[List[Dict]]:
sql = item.select(item.c.version == version and item.c.enabled == enabled).order_by(
item.c.itemId.asc()
)
result = self.execute(sql)
if result is None:
return None
return [list[2] for list in result.fetchall()]
def get_support_log_ids(self, version: int, enabled: bool) -> Optional[List[Dict]]:
sql = support.select(support.c.version == version and support.c.enabled == enabled).order_by(
support.c.supportLogId.asc()
)
result = self.execute(sql)
if result is None:
return None
return [list[2] for list in result.fetchall()]
def get_title_ids(self, version: int, enabled: bool) -> Optional[List[Dict]]:
sql = title.select(title.c.version == version and title.c.enabled == enabled).order_by(
title.c.titleId.asc()
)
result = self.execute(sql)
if result is None:
return None
return [list[2] for list in result.fetchall()]