600 lines
21 KiB
Python
600 lines
21 KiB
Python
from typing import Any, List, Dict
|
|
from datetime import datetime, timedelta
|
|
import pytz
|
|
import json
|
|
from random import randint
|
|
|
|
from core.config import CoreConfig
|
|
from titles.mai2.base import Mai2Base
|
|
from titles.mai2.config import Mai2Config
|
|
from titles.mai2.const import Mai2Constants
|
|
|
|
|
|
class Mai2DX(Mai2Base):
|
|
def __init__(self, cfg: CoreConfig, game_cfg: Mai2Config) -> None:
|
|
super().__init__(cfg, game_cfg)
|
|
self.version = Mai2Constants.VER_MAIMAI_DX
|
|
|
|
async def handle_get_game_setting_api_request(self, data: Dict):
|
|
return {
|
|
"gameSetting": {
|
|
"isMaintenance": False,
|
|
"requestInterval": 1800,
|
|
"rebootStartTime": "2020-01-01 07:00:00.0",
|
|
"rebootEndTime": "2020-01-01 07:59:59.0",
|
|
"movieUploadLimit": 100,
|
|
"movieStatus": 1,
|
|
"movieServerUri": self.old_server + "movie/",
|
|
"deliverServerUri": self.old_server + "deliver/" if self.can_deliver and self.game_config.deliver.enable else "",
|
|
"oldServerUri": self.old_server + "old",
|
|
"usbDlServerUri": self.old_server + "usbdl/" if self.can_deliver and self.game_config.deliver.udbdl_enable else "",
|
|
"rebootInterval": 0,
|
|
},
|
|
"isAouAccession": False,
|
|
}
|
|
|
|
async def handle_get_user_preview_api_request(self, data: Dict) -> Dict:
|
|
p = await self.data.profile.get_profile_detail(data["userId"], self.version)
|
|
o = await self.data.profile.get_profile_option(data["userId"], self.version)
|
|
if p is None or o is None:
|
|
return {} # Register
|
|
profile = p._asdict()
|
|
option = o._asdict()
|
|
|
|
return {
|
|
"userId": data["userId"],
|
|
"userName": profile["userName"],
|
|
"isLogin": False,
|
|
"lastGameId": profile["lastGameId"],
|
|
"lastDataVersion": profile["lastDataVersion"],
|
|
"lastRomVersion": profile["lastRomVersion"],
|
|
"lastLoginDate": profile["lastLoginDate"],
|
|
"lastPlayDate": profile["lastPlayDate"],
|
|
"playerRating": profile["playerRating"],
|
|
"nameplateId": 0, # Unused
|
|
"iconId": profile["iconId"],
|
|
"trophyId": 0, # Unused
|
|
"partnerId": profile["partnerId"],
|
|
"frameId": profile["frameId"],
|
|
"dispRate": option[
|
|
"dispRate"
|
|
], # 0: all/begin, 1: disprate, 2: dispDan, 3: hide, 4: end
|
|
"totalAwake": profile["totalAwake"],
|
|
"isNetMember": profile["isNetMember"],
|
|
"dailyBonusDate": profile["dailyBonusDate"],
|
|
"headPhoneVolume": option["headPhoneVolume"],
|
|
"isInherit": False, # Not sure what this is or does??
|
|
"banState": profile["banState"]
|
|
if profile["banState"] is not None
|
|
else 0, # New with uni+
|
|
}
|
|
|
|
async def handle_upload_user_playlog_api_request(self, data: Dict) -> Dict:
|
|
user_id = data["userId"]
|
|
playlog = data["userPlaylog"]
|
|
|
|
await self.data.score.put_playlog(user_id, playlog)
|
|
|
|
return {"returnCode": 1, "apiName": "UploadUserPlaylogApi"}
|
|
|
|
async def handle_upsert_user_chargelog_api_request(self, data: Dict) -> Dict:
|
|
user_id = data["userId"]
|
|
charge = data["userCharge"]
|
|
|
|
# remove the ".0" from the date string, festival only?
|
|
charge["purchaseDate"] = charge["purchaseDate"].replace(".0", "")
|
|
await self.data.item.put_charge(
|
|
user_id,
|
|
charge["chargeId"],
|
|
charge["stock"],
|
|
datetime.strptime(charge["purchaseDate"], Mai2Constants.DATE_TIME_FORMAT),
|
|
datetime.strptime(charge["validDate"], Mai2Constants.DATE_TIME_FORMAT),
|
|
)
|
|
|
|
return {"returnCode": 1, "apiName": "UpsertUserChargelogApi"}
|
|
|
|
async def handle_upsert_user_all_api_request(self, data: Dict) -> Dict:
|
|
user_id = data["userId"]
|
|
upsert = data["upsertUserAll"]
|
|
|
|
if int(user_id) & 1000000000001 == 1000000000001:
|
|
self.logger.info("Guest play, ignoring.")
|
|
return {"returnCode": 1, "apiName": "UpsertUserAllApi"}
|
|
|
|
if "userData" in upsert and len(upsert["userData"]) > 0:
|
|
upsert["userData"][0]["isNetMember"] = 1
|
|
upsert["userData"][0].pop("accessCode")
|
|
await self.data.profile.put_profile_detail(
|
|
user_id, self.version, upsert["userData"][0]
|
|
)
|
|
|
|
if "userExtend" in upsert and len(upsert["userExtend"]) > 0:
|
|
await self.data.profile.put_profile_extend(
|
|
user_id, self.version, upsert["userExtend"][0]
|
|
)
|
|
|
|
if "userGhost" in upsert:
|
|
for ghost in upsert["userGhost"]:
|
|
await self.data.profile.put_profile_ghost(user_id, self.version, ghost)
|
|
|
|
if "userOption" in upsert and len(upsert["userOption"]) > 0:
|
|
await self.data.profile.put_profile_option(
|
|
user_id, self.version, upsert["userOption"][0]
|
|
)
|
|
|
|
if "userRatingList" in upsert and len(upsert["userRatingList"]) > 0:
|
|
await self.data.profile.put_profile_rating(
|
|
user_id, self.version, upsert["userRatingList"][0]
|
|
)
|
|
|
|
if "userActivityList" in upsert and len(upsert["userActivityList"]) > 0:
|
|
for k, v in upsert["userActivityList"][0].items():
|
|
for act in v:
|
|
await self.data.profile.put_profile_activity(user_id, act)
|
|
|
|
if "userChargeList" in upsert and len(upsert["userChargeList"]) > 0:
|
|
for charge in upsert["userChargeList"]:
|
|
# remove the ".0" from the date string, festival only?
|
|
charge["purchaseDate"] = charge["purchaseDate"].replace(".0", "")
|
|
await self.data.item.put_charge(
|
|
user_id,
|
|
charge["chargeId"],
|
|
charge["stock"],
|
|
datetime.strptime(
|
|
charge["purchaseDate"], Mai2Constants.DATE_TIME_FORMAT
|
|
),
|
|
datetime.strptime(
|
|
charge["validDate"], Mai2Constants.DATE_TIME_FORMAT
|
|
),
|
|
)
|
|
|
|
if "userCharacterList" in upsert and len(upsert["userCharacterList"]) > 0:
|
|
for char in upsert["userCharacterList"]:
|
|
await self.data.item.put_character(
|
|
user_id,
|
|
char["characterId"],
|
|
char["level"],
|
|
char["awakening"],
|
|
char["useCount"],
|
|
)
|
|
|
|
if "userItemList" in upsert and len(upsert["userItemList"]) > 0:
|
|
for item in upsert["userItemList"]:
|
|
await self.data.item.put_item(
|
|
user_id,
|
|
int(item["itemKind"]),
|
|
item["itemId"],
|
|
item["stock"],
|
|
item["isValid"],
|
|
)
|
|
|
|
if "userLoginBonusList" in upsert and len(upsert["userLoginBonusList"]) > 0:
|
|
for login_bonus in upsert["userLoginBonusList"]:
|
|
await self.data.item.put_login_bonus(
|
|
user_id,
|
|
login_bonus["bonusId"],
|
|
login_bonus["point"],
|
|
login_bonus["isCurrent"],
|
|
login_bonus["isComplete"],
|
|
)
|
|
|
|
if "userMapList" in upsert and len(upsert["userMapList"]) > 0:
|
|
for map in upsert["userMapList"]:
|
|
await self.data.item.put_map(
|
|
user_id,
|
|
map["mapId"],
|
|
map["distance"],
|
|
map["isLock"],
|
|
map["isClear"],
|
|
map["isComplete"],
|
|
)
|
|
|
|
if "userMusicDetailList" in upsert and len(upsert["userMusicDetailList"]) > 0:
|
|
for music in upsert["userMusicDetailList"]:
|
|
await self.data.score.put_best_score(user_id, music)
|
|
|
|
if "userCourseList" in upsert and len(upsert["userCourseList"]) > 0:
|
|
for course in upsert["userCourseList"]:
|
|
await self.data.score.put_course(user_id, course)
|
|
|
|
if "userFavoriteList" in upsert and len(upsert["userFavoriteList"]) > 0:
|
|
for fav in upsert["userFavoriteList"]:
|
|
await self.data.item.put_favorite(user_id, fav["kind"], fav["itemIdList"])
|
|
|
|
if (
|
|
"userFriendSeasonRankingList" in upsert
|
|
and len(upsert["userFriendSeasonRankingList"]) > 0
|
|
):
|
|
for fsr in upsert["userFriendSeasonRankingList"]:
|
|
fsr["recordDate"] = (
|
|
datetime.strptime(
|
|
fsr["recordDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
|
|
),
|
|
)
|
|
await self.data.item.put_friend_season_ranking(user_id, fsr)
|
|
|
|
return {"returnCode": 1, "apiName": "UpsertUserAllApi"}
|
|
|
|
async def handle_get_user_data_api_request(self, data: Dict) -> Dict:
|
|
profile = await self.data.profile.get_profile_detail(data["userId"], self.version)
|
|
if profile is None:
|
|
return
|
|
|
|
profile_dict = profile._asdict()
|
|
profile_dict.pop("id")
|
|
profile_dict.pop("user")
|
|
profile_dict.pop("version")
|
|
|
|
return {"userId": data["userId"], "userData": profile_dict}
|
|
|
|
async def handle_get_user_extend_api_request(self, data: Dict) -> Dict:
|
|
extend = await self.data.profile.get_profile_extend(data["userId"], self.version)
|
|
if extend is None:
|
|
return
|
|
|
|
extend_dict = extend._asdict()
|
|
extend_dict.pop("id")
|
|
extend_dict.pop("user")
|
|
extend_dict.pop("version")
|
|
|
|
return {"userId": data["userId"], "userExtend": extend_dict}
|
|
|
|
async def handle_get_user_option_api_request(self, data: Dict) -> Dict:
|
|
options = await self.data.profile.get_profile_option(data["userId"], self.version)
|
|
if options is None:
|
|
return
|
|
|
|
options_dict = options._asdict()
|
|
options_dict.pop("id")
|
|
options_dict.pop("user")
|
|
options_dict.pop("version")
|
|
|
|
return {"userId": data["userId"], "userOption": options_dict}
|
|
|
|
async def handle_get_user_card_api_request(self, data: Dict) -> Dict:
|
|
user_cards = await self.data.item.get_cards(data["userId"])
|
|
if user_cards is None:
|
|
return {"userId": data["userId"], "nextIndex": 0, "userCardList": []}
|
|
|
|
max_ct = data["maxCount"]
|
|
next_idx = data["nextIndex"]
|
|
start_idx = next_idx
|
|
end_idx = max_ct + start_idx
|
|
|
|
if len(user_cards[start_idx:]) > max_ct:
|
|
next_idx += max_ct
|
|
else:
|
|
next_idx = 0
|
|
|
|
card_list = []
|
|
for card in user_cards:
|
|
tmp = card._asdict()
|
|
tmp.pop("id")
|
|
tmp.pop("user")
|
|
tmp["startDate"] = datetime.strftime(
|
|
tmp["startDate"], Mai2Constants.DATE_TIME_FORMAT
|
|
)
|
|
tmp["endDate"] = datetime.strftime(
|
|
tmp["endDate"], Mai2Constants.DATE_TIME_FORMAT
|
|
)
|
|
card_list.append(tmp)
|
|
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": next_idx,
|
|
"userCardList": card_list[start_idx:end_idx],
|
|
}
|
|
|
|
async def handle_get_user_item_api_request(self, data: Dict) -> Dict:
|
|
kind = int(data["nextIndex"] / 10000000000)
|
|
next_idx = int(data["nextIndex"] % 10000000000)
|
|
user_item_list = await self.data.item.get_items(data["userId"], kind)
|
|
|
|
items: List[Dict[str, Any]] = []
|
|
for i in range(next_idx, len(user_item_list)):
|
|
tmp = user_item_list[i]._asdict()
|
|
tmp.pop("user")
|
|
tmp.pop("id")
|
|
items.append(tmp)
|
|
if len(items) >= int(data["maxCount"]):
|
|
break
|
|
|
|
xout = kind * 10000000000 + next_idx + len(items)
|
|
|
|
if len(items) < int(data["maxCount"]):
|
|
next_idx = 0
|
|
else:
|
|
next_idx = xout
|
|
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": next_idx,
|
|
"itemKind": kind,
|
|
"userItemList": items,
|
|
}
|
|
|
|
async def handle_get_user_character_api_request(self, data: Dict) -> Dict:
|
|
characters = await self.data.item.get_characters(data["userId"])
|
|
|
|
chara_list = []
|
|
for chara in characters:
|
|
tmp = chara._asdict()
|
|
tmp.pop("id")
|
|
tmp.pop("user")
|
|
chara_list.append(tmp)
|
|
|
|
return {"userId": data["userId"], "userCharacterList": chara_list}
|
|
|
|
async def handle_get_user_favorite_api_request(self, data: Dict) -> Dict:
|
|
favorites = await self.data.item.get_favorites(data["userId"], data["itemKind"])
|
|
if favorites is None:
|
|
return
|
|
|
|
userFavs = []
|
|
for fav in favorites:
|
|
userFavs.append(
|
|
{
|
|
"userId": data["userId"],
|
|
"itemKind": fav["itemKind"],
|
|
"itemIdList": fav["itemIdList"],
|
|
}
|
|
)
|
|
|
|
return {"userId": data["userId"], "userFavoriteData": userFavs}
|
|
|
|
async def handle_get_user_ghost_api_request(self, data: Dict) -> Dict:
|
|
ghost = await self.data.profile.get_profile_ghost(data["userId"], self.version)
|
|
if ghost is None:
|
|
return
|
|
|
|
ghost_dict = ghost._asdict()
|
|
ghost_dict.pop("user")
|
|
ghost_dict.pop("id")
|
|
ghost_dict.pop("version_int")
|
|
|
|
return {"userId": data["userId"], "userGhost": ghost_dict}
|
|
|
|
async def handle_get_user_rating_api_request(self, data: Dict) -> Dict:
|
|
rating = await self.data.profile.get_profile_rating(data["userId"], self.version)
|
|
if rating is None:
|
|
return
|
|
|
|
rating_dict = rating._asdict()
|
|
rating_dict.pop("user")
|
|
rating_dict.pop("id")
|
|
rating_dict.pop("version")
|
|
|
|
return {"userId": data["userId"], "userRating": rating_dict}
|
|
|
|
async def handle_get_user_activity_api_request(self, data: Dict) -> Dict:
|
|
"""
|
|
kind 1 is playlist, kind 2 is music list
|
|
"""
|
|
playlist = await self.data.profile.get_profile_activity(data["userId"], 1)
|
|
musiclist = await self.data.profile.get_profile_activity(data["userId"], 2)
|
|
if playlist is None or musiclist is None:
|
|
return
|
|
|
|
plst = []
|
|
mlst = []
|
|
|
|
for play in playlist:
|
|
tmp = play._asdict()
|
|
tmp["id"] = tmp["activityId"]
|
|
tmp.pop("activityId")
|
|
tmp.pop("user")
|
|
plst.append(tmp)
|
|
|
|
for music in musiclist:
|
|
tmp = music._asdict()
|
|
tmp["id"] = tmp["activityId"]
|
|
tmp.pop("activityId")
|
|
tmp.pop("user")
|
|
mlst.append(tmp)
|
|
|
|
return {"userActivity": {"playList": plst, "musicList": mlst}}
|
|
|
|
async def handle_get_user_course_api_request(self, data: Dict) -> Dict:
|
|
user_courses = await self.data.score.get_courses(data["userId"])
|
|
if user_courses is None:
|
|
return {"userId": data["userId"], "nextIndex": 0, "userCourseList": []}
|
|
|
|
course_list = []
|
|
for course in user_courses:
|
|
tmp = course._asdict()
|
|
tmp.pop("user")
|
|
tmp.pop("id")
|
|
course_list.append(tmp)
|
|
|
|
return {"userId": data["userId"], "nextIndex": 0, "userCourseList": course_list}
|
|
|
|
async def handle_get_user_portrait_api_request(self, data: Dict) -> Dict:
|
|
# No support for custom pfps
|
|
return {"length": 0, "userPortraitList": []}
|
|
|
|
async def handle_get_user_friend_season_ranking_api_request(self, data: Dict) -> Dict:
|
|
friend_season_ranking = await self.data.item.get_friend_season_ranking(data["userId"])
|
|
if friend_season_ranking is None:
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": 0,
|
|
"userFriendSeasonRankingList": [],
|
|
}
|
|
|
|
friend_season_ranking_list = []
|
|
next_idx = int(data["nextIndex"])
|
|
max_ct = int(data["maxCount"])
|
|
|
|
for x in range(next_idx, len(friend_season_ranking)):
|
|
tmp = friend_season_ranking[x]._asdict()
|
|
tmp.pop("user")
|
|
tmp.pop("id")
|
|
tmp["recordDate"] = datetime.strftime(
|
|
tmp["recordDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
|
|
)
|
|
friend_season_ranking_list.append(tmp)
|
|
|
|
if len(friend_season_ranking_list) >= max_ct:
|
|
break
|
|
|
|
if len(friend_season_ranking) >= next_idx + max_ct:
|
|
next_idx += max_ct
|
|
else:
|
|
next_idx = 0
|
|
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": next_idx,
|
|
"userFriendSeasonRankingList": friend_season_ranking_list,
|
|
}
|
|
|
|
async def handle_get_user_map_api_request(self, data: Dict) -> Dict:
|
|
maps = await self.data.item.get_maps(data["userId"])
|
|
if maps is None:
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": 0,
|
|
"userMapList": [],
|
|
}
|
|
|
|
map_list = []
|
|
next_idx = int(data["nextIndex"])
|
|
max_ct = int(data["maxCount"])
|
|
|
|
for x in range(next_idx, len(maps)):
|
|
tmp = maps[x]._asdict()
|
|
tmp.pop("user")
|
|
tmp.pop("id")
|
|
map_list.append(tmp)
|
|
|
|
if len(map_list) >= max_ct:
|
|
break
|
|
|
|
if len(maps) >= next_idx + max_ct:
|
|
next_idx += max_ct
|
|
else:
|
|
next_idx = 0
|
|
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": next_idx,
|
|
"userMapList": map_list,
|
|
}
|
|
|
|
async def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
|
|
login_bonuses = await self.data.item.get_login_bonuses(data["userId"])
|
|
if login_bonuses is None:
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": 0,
|
|
"userLoginBonusList": [],
|
|
}
|
|
|
|
login_bonus_list = []
|
|
next_idx = int(data["nextIndex"])
|
|
max_ct = int(data["maxCount"])
|
|
|
|
for x in range(next_idx, len(login_bonuses)):
|
|
tmp = login_bonuses[x]._asdict()
|
|
tmp.pop("user")
|
|
tmp.pop("id")
|
|
login_bonus_list.append(tmp)
|
|
|
|
if len(login_bonus_list) >= max_ct:
|
|
break
|
|
|
|
if len(login_bonuses) >= next_idx + max_ct:
|
|
next_idx += max_ct
|
|
else:
|
|
next_idx = 0
|
|
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": next_idx,
|
|
"userLoginBonusList": login_bonus_list,
|
|
}
|
|
|
|
async def handle_get_user_region_api_request(self, data: Dict) -> Dict:
|
|
"""
|
|
class UserRegionList:
|
|
regionId: int
|
|
playCount: int
|
|
created: str
|
|
"""
|
|
return {"userId": data["userId"], "length": 0, "userRegionList": []}
|
|
|
|
async def handle_get_user_rival_data_api_request(self, data: Dict) -> Dict:
|
|
user_id = data["userId"]
|
|
rival_id = data["rivalId"]
|
|
|
|
"""
|
|
class UserRivalData:
|
|
rivalId: int
|
|
rivalName: str
|
|
"""
|
|
return {"userId": user_id, "userRivalData": {}}
|
|
|
|
async def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict:
|
|
user_id = data["userId"]
|
|
rival_id = data["rivalId"]
|
|
next_idx = data["nextIndex"]
|
|
rival_music_levels = data["userRivalMusicLevelList"]
|
|
|
|
"""
|
|
class UserRivalMusicList:
|
|
class UserRivalMusicDetailList:
|
|
level: int
|
|
achievement: int
|
|
deluxscoreMax: int
|
|
|
|
musicId: int
|
|
userRivalMusicDetailList: list[UserRivalMusicDetailList]
|
|
"""
|
|
return {"userId": user_id, "nextIndex": 0, "userRivalMusicList": []}
|
|
|
|
async def handle_get_user_music_api_request(self, data: Dict) -> Dict:
|
|
user_id = data.get("userId", 0)
|
|
next_index = data.get("nextIndex", 0)
|
|
max_ct = data.get("maxCount", 50)
|
|
upper_lim = next_index + max_ct
|
|
music_detail_list = []
|
|
|
|
if user_id <= 0:
|
|
self.logger.warning("handle_get_user_music_api_request: Could not find userid in data, or userId is 0")
|
|
return {}
|
|
|
|
songs = await self.data.score.get_best_scores(user_id)
|
|
if songs is None:
|
|
self.logger.debug("handle_get_user_music_api_request: get_best_scores returned None!")
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": 0,
|
|
"userMusicList": [],
|
|
}
|
|
|
|
num_user_songs = len(songs)
|
|
|
|
for x in range(next_index, upper_lim):
|
|
if num_user_songs <= x:
|
|
break
|
|
|
|
tmp = songs[x]._asdict()
|
|
tmp.pop("id")
|
|
tmp.pop("user")
|
|
music_detail_list.append(tmp)
|
|
|
|
next_index = 0 if len(music_detail_list) < max_ct or num_user_songs == upper_lim else upper_lim
|
|
self.logger.info(f"Send songs {next_index}-{upper_lim} ({len(music_detail_list)}) out of {num_user_songs} for user {user_id} (next idx {next_index})")
|
|
return {
|
|
"userId": data["userId"],
|
|
"nextIndex": next_index,
|
|
"userMusicList": [{"userMusicDetailList": music_detail_list}],
|
|
}
|
|
|
|
async def handle_user_login_api_request(self, data: Dict) -> Dict:
|
|
ret = await super().handle_user_login_api_request(data)
|
|
if ret is None or not ret:
|
|
return ret
|
|
ret['loginId'] = ret.get('loginCount', 0)
|
|
return ret
|