1
0
mirror of synced 2024-11-24 14:30:09 +01:00

Add team support, rivals, and test function for getting playcounts

This commit is contained in:
EmmyHeart 2023-07-11 09:14:53 +00:00
parent c01d3f49f5
commit 043ff17008

View File

@ -637,3 +637,103 @@ class ChuniProfileData(BaseData):
if result is None:
return None
return result.fetchall()
def get_team_by_id(self, team_id: int) -> Optional[Row]:
sql = select(team).where(team.c.id == team_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_team_rank_actual(self, team_id: int) -> int:
# Normal ranking system, likely the one used in the real servers
# Query all teams sorted by 'teamPoint'
result = self.execute(
select(team.c.id).order_by(team.c.teamPoint.desc())
)
# Get the rank of the team with the given team_id
rank = None
for i, row in enumerate(result, start=1):
if row.id == team_id:
rank = i
break
# Return the rank if found, or a default rank otherwise
return rank if rank is not None else 0
def get_team_rank(self, team_id: int) -> int:
# Scaled ranking system, designed for smaller instances.
# Query all teams sorted by 'teamPoint'
result = self.execute(
select(team.c.id).order_by(team.c.teamPoint.desc())
)
# Count total number of teams
total_teams = self.execute(select(func.count()).select_from(team)).scalar()
# Get the rank of the team with the given team_id
rank = None
for i, row in enumerate(result, start=1):
if row.id == team_id:
rank = i
break
# If the team is not found, return default rank
if rank is None:
return 0
# Define rank tiers
tiers = {
1: range(1, int(total_teams * 0.1) + 1), # Rainbow
2: range(int(total_teams * 0.1) + 1, int(total_teams * 0.4) + 1), # Gold
3: range(int(total_teams * 0.4) + 1, int(total_teams * 0.7) + 1), # Silver
4: range(int(total_teams * 0.7) + 1, total_teams + 1), # Grey
}
# Assign rank based on tier
for tier_rank, tier_range in tiers.items():
if rank in tier_range:
return tier_rank
# Return default rank if not found in any tier
return 0
def update_team(self, team_id: int, team_data: Dict) -> bool:
team_data["id"] = team_id
sql = insert(team).values(**team_data)
conflict = sql.on_duplicate_key_update(**team_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(
f"update_team: Failed to update team! team id: {team_id}"
)
return False
return True
def get_rival(self, rival_id: int) -> Optional[Row]:
sql = select(profile).where(profile.c.user == rival_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_overview(self) -> Dict:
# Fetch and add up all the playcounts
playcount_sql = self.execute(select(profile.c.playCount))
if playcount_sql is None:
self.logger.warn(
f"get_overview: Couldn't pull playcounts"
)
return 0
total_play_count = 0;
for row in playcount_sql:
total_play_count += row[0]
return {
"total_play_count": total_play_count
}