1
0
mirror of synced 2024-11-25 06:40:11 +01:00
artemis/core/data/schema/user.py
2023-02-19 15:40:25 -05:00

125 lines
4.1 KiB
Python

from enum import Enum
from typing import Dict, Optional
from sqlalchemy import Table, Column, and_
from sqlalchemy.types import Integer, String, TIMESTAMP
from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.sql import func, select, Delete
from uuid import uuid4
from datetime import datetime, timedelta
from sqlalchemy.engine import Row
from core.data.schema.base import BaseData, metadata
aime_user = Table(
"aime_user",
metadata,
Column("id", Integer, nullable=False, primary_key=True, autoincrement=True),
Column("username", String(25), unique=True),
Column("email", String(255), unique=True),
Column("password", String(255)),
Column("permissions", Integer),
Column("created_date", TIMESTAMP, server_default=func.now()),
Column("last_login_date", TIMESTAMP, onupdate=func.now()),
Column("suspend_expire_time", TIMESTAMP),
mysql_charset='utf8mb4'
)
frontend_session = Table(
"frontend_session",
metadata,
Column("id", Integer, primary_key=True, unique=True),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("ip", String(15)),
Column('session_cookie', String(32), nullable=False, unique=True),
Column("expires", TIMESTAMP, nullable=False),
mysql_charset='utf8mb4'
)
class PermissionBits(Enum):
PermUser = 1
PermMod = 2
PermSysAdmin = 4
class UserData(BaseData):
def create_user(self, id: int = None, username: str = None, email: str = None, password: str = None, permission: int = 1) -> Optional[int]:
if email is None:
permission = 1
if id is None:
sql = insert(aime_user).values(
username=username,
email=email,
password=password,
permissions=permission
)
else:
sql = insert(aime_user).values(
id=id,
username=username,
email=email,
password=password,
permissions=permission
)
conflict = sql.on_duplicate_key_update(
username=username,
email=email,
password=password,
permissions=permission
)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def login(self, user_id: int, passwd: bytes = None, ip: str = "0.0.0.0") -> Optional[str]:
sql = select(aime_user).where(and_(aime_user.c.id == user_id, aime_user.c.password == passwd))
result = self.execute(sql)
if result is None: return None
usr = result.fetchone()
if usr is None: return None
return self.create_session(user_id, ip)
def check_session(self, cookie: str, ip: str = "0.0.0.0") -> Optional[Row]:
sql = select(frontend_session).where(
and_(
frontend_session.c.session_cookie == cookie,
frontend_session.c.ip == ip
)
)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def delete_session(self, session_id: int) -> bool:
sql = Delete(frontend_session).where(frontend_session.c.id == session_id)
result = self.execute(sql)
if result is None: return False
return True
def create_session(self, user_id: int, ip: str = "0.0.0.0", expires: datetime = datetime.now() + timedelta(days=1)) -> Optional[str]:
cookie = uuid4().hex
sql = insert(frontend_session).values(
user = user_id,
ip = ip,
session_cookie = cookie,
expires = expires
)
result = self.execute(sql)
if result is None:
return None
return cookie
def reset_autoincrement(self, ai_value: int) -> None:
# ALTER TABLE isn't in sqlalchemy so we do this the ugly way
sql = f"ALTER TABLE aime_user AUTO_INCREMENT={ai_value}"
self.execute(sql)