diff --git a/readme.md b/readme.md index 0564c70..e863e52 100644 --- a/readme.md +++ b/readme.md @@ -84,3 +84,6 @@ Read [Games specific info](docs/game_specific_info.md) for all supported games, ## Production guide See the [production guide](docs/prod.md) for running a production server. + +## Text User Interface +Invoke `tui.py` (with optional `-c ` parameter) for an interactive TUI to perform management actions (add, edit or delete users, cards, arcades and machines) without needing to spin up the frontend. Requires installing asciimatics via `pip install asciimatics` diff --git a/tui.py b/tui.py new file mode 100644 index 0000000..06c51e9 --- /dev/null +++ b/tui.py @@ -0,0 +1,501 @@ +#!/usr/bin/env +from typing import Optional, List +import asyncio +import argparse +from os import path, mkdir, W_OK, access +import yaml +import bcrypt +import secrets +import string +from sqlalchemy.engine import Row + +from core.data import Data +from core.config import CoreConfig + +try: + from asciimatics.widgets import Frame, Layout, Text, Button, RadioButtons, CheckBox, Divider + from asciimatics.scene import Scene + from asciimatics.screen import Screen + from asciimatics.exceptions import ResizeScreenError, NextScene, StopApplication +except: + print("Artemis TUI requires asciimatics, please install it using pip") + exit(1) + + +class State: + class SelectedUser: + def __init__(self, id: Optional[int] = None, name: Optional[str] = None): + self.id = id + self.name = name + + def __str__(self): + if self.id is not None: + return f"{self.name} ({self.id})" if self.name else f"User {self.id}" + return "None" + + def __int__(self): + return self.id if self.id else 0 + + class SelectedCard: + def __init__(self, id: Optional[int] = None, access_code: Optional[str] = None): + self.id = id + self.access_code = access_code + + def __str__(self): + if self.id is not None and self.access_code: + return f"{self.access_code} ({self.id})" + return "None" + + def __int__(self): + return self.id if self.id else 0 + + class SelectedArcade: + def __init__(self, id: Optional[int] = None, country: Optional[str] = None, name: Optional[str] = None): + self.id = id + self.country = country + self.name = name + + def __str__(self): + if self.id is not None: + return f"{self.name} ({self.country}{self.id:05d})" if self.name else f"{self.country}{self.id:05d}" + return "None" + + def __int__(self): + return self.id if self.id else 0 + + class SelectedMachine: + def __init__(self, id: Optional[int] = None, serial: Optional[str] = None): + self.id = id + self.serial = serial + + def __str__(self): + if self.id is not None: + return f"{self.serial} ({self.id})" + return "None" + + def __int__(self): + return self.id if self.id else 0 + + def __init__(self): + self.selected_user: self.SelectedUser = self.SelectedUser() + self.selected_card: self.SelectedCard = self.SelectedCard() + self.selected_arcade: self.SelectedArcade = self.SelectedArcade() + self.selected_machine: self.SelectedMachine = self.SelectedMachine() + self.last_err: str = "" + self.search_results: List[Row] = [] + self.search_type: str = "" + + def set_user(self, id: int, username: Optional[str]) -> None: + self.selected_user = self.SelectedUser(id, username) + + def clear_user(self) -> None: + self.selected_user = self.SelectedUser() + + def set_card(self, id: int, access_code: Optional[str]) -> None: + self.selected_card = self.SelectedCard(id, access_code) + + def clear_card(self) -> None: + self.selected_card = self.SelectedCard() + + def set_arcade(self, id: int, country: str = "JPN", name: Optional[str] = None) -> None: + self.selected_arcade = self.SelectedArcade(id, country, name) + + def clear_arcade(self) -> None: + self.selected_arcade = self.SelectedArcade() + + def set_machine(self, id: int, serial: Optional[str]) -> None: + self.selected_machine = self.SelectedMachine(id, serial) + + def clear_machine(self) -> None: + self.selected_machine = self.SelectedMachine() + + def set_last_err(self, err: str) -> None: + self.last_err = err + + def clear_last_err(self) -> None: + self.last_err = "" + + def clear_search_results(self) -> None: + self.search_results = [] + +state = State() +data: Data = None +loop: asyncio.AbstractEventLoop = asyncio.new_event_loop() + +class MainView(Frame): + def __init__(self, screen: Screen): + super(MainView, self).__init__( + screen, + screen.height * 2 // 3, + screen.width * 2 // 3, + hover_focus=True, + can_scroll=False, + title="ARTEMiS TUI" + ) + + layout = Layout([100], True) + self.add_layout(layout) + layout.add_widget(Button("User Management", self._user_mgmt)) + layout.add_widget(Button("Card Management", self._card_mgmt)) + layout.add_widget(Button("Arcade Management", self._arcade_mgmt)) + layout.add_widget(Button("Machine Management", self._mech_mgmt)) + layout.add_widget(Button("Quit", self._quit)) + + self.fix() + + def _user_mgmt(self): + self.save() + raise NextScene("User Management") + + def _card_mgmt(self): + self.save() + raise NextScene("Card Management") + + def _arcade_mgmt(self): + self.save() + raise NextScene("Arcade Management") + + def _mech_mgmt(self): + self.save() + raise NextScene("Mech Management") + + @staticmethod + def _quit(): + raise StopApplication("User pressed quit") + +class ManageUser(Frame): + def __init__(self, screen: Screen): + super(ManageUser, self).__init__( + screen, + screen.height * 2 // 3, + screen.width * 2 // 3, + hover_focus=True, + can_scroll=False, + title="User Management" + ) + + layout = Layout([3]) + self.add_layout(layout) + layout.add_widget(Button("Create User", self._create_user)) + layout.add_widget(Button("Lookup User", self._lookup)) + layout.add_widget(Button("Edit User", self._edit_user, disabled=state.selected_user.id != 0)) + layout.add_widget(Button("Delete User", self._del_user, disabled=state.selected_user.id != 0)) + + usr_cards = [] + #if state.selected_user.id != 0: + #cards = data.card.get_user_cards(state.selected_user.id) + #for card in cards: + #usr_cards.append(card._asdict()) + + layout3 = Layout([100], True) + self.add_layout(layout3) + if len(usr_cards) > 0: + layout.add_widget(Divider()) + layout3.add_widget(RadioButtons( + [(f"{card['id']}\t{card['access_code']}\t{card['status']}", card['id']) for card in usr_cards], + "Cards:", + "usr_cards" + )) + layout3.add_widget(Divider()) + + layout2 = Layout([1, 1, 1, 1]) + self.add_layout(layout2) + a = Text("", f"status", readonly=True, disabled=True) + a.value = f"Selected User: {state.selected_user}" + layout2.add_widget(a) + layout2.add_widget(Button("Back", self._back), 3) + + self.fix() + + def _create_user(self): + self.save() + raise NextScene("Create User") + + def _lookup(self): + self.save() + raise NextScene("Lookup User") + + def _edit_user(self): + self.save() + raise NextScene("Lookup User") + + def _del_user(self): + self.save() + raise NextScene("Lookup User") + + def _back(self): + self.save() + raise NextScene("Main") + +class CreateUserView(Frame): + def __init__(self, screen: Screen): + super(CreateUserView, self).__init__( + screen, + screen.height * 2 // 3, + screen.width * 2 // 3, + hover_focus=True, + can_scroll=False, + title="Create User" + ) + + layout = Layout([100], fill_frame=True) + self.add_layout(layout) + layout.add_widget(Text("Username:", "username")) + layout.add_widget(Text("Email:", "email")) + layout.add_widget(Text("Password:", "passwd")) + layout.add_widget(CheckBox("", "Add Card:", "is_add_card", )) + layout.add_widget(RadioButtons([ + ("User", "1"), + ("User Manager", "2"), + ("Arcde Manager", "4"), + ("Sysadmin", "8"), + ("Owner", "255"), + ], "Role:", "role")) + + layout3 = Layout([100]) + self.add_layout(layout3) + layout3.add_widget(Text("", f"status", readonly=True, disabled=True)) + + layout2 = Layout([1, 1, 1, 1]) + self.add_layout(layout2) + layout2.add_widget(Button("OK", self._ok), 0) + layout2.add_widget(Button("Cancel", self._cancel), 3) + + self.fix() + + def _ok(self): + self.save() + if not self.data.get("username"): + state.set_last_err("Username cannot be blank") + self.find_widget('status').value = state.last_err + self.screen.reset() + return + + state.clear_last_err() + self.find_widget('status').value = state.last_err + + if not self.data.get("passwd"): + pw = "".join( + secrets.choice(string.ascii_letters + string.digits) for i in range(20) + ) + else: + pw = self.data.get("passwd") + + hash = bcrypt.hashpw(pw.encode(), bcrypt.gensalt()) + + loop.run_until_complete(self._create_user_async(self.data.get("username"), hash.decode(), self.data.get("email"), self.data.get('role'))) + + raise NextScene("User Management") + + async def _create_user_async(self, username: str, password: str, email: Optional[str], role: str): + usr_id = await data.user.create_user( + username=username, + email=email if email else None, + password=password, + permission=int(role) + ) + + state.set_user(usr_id, username) + + def _cancel(self): + state.clear_last_err() + self.find_widget('status').value = state.last_err + raise NextScene("User Management") + +class SearchResultsView(Frame): + def __init__(self, screen: Screen): + super(CreateUserView, self).__init__( + screen, + screen.height * 2 // 3, + screen.width * 2 // 3, + hover_focus=True, + can_scroll=False, + title="Search Results" + ) + + layout = Layout([100], fill_frame=True) + self.add_layout(layout) + layout.add_widget(Text("Username:", "username")) + + self.fix() + + def _ok(self): + self.save() + if not self.data.get("username"): + state.set_last_err("Username cannot be blank") + self.find_widget('status').value = state.last_err + self.screen.reset() + return + + state.clear_last_err() + self.find_widget('status').value = state.last_err + + if not self.data.get("passwd"): + pw = "".join( + secrets.choice(string.ascii_letters + string.digits) for i in range(20) + ) + else: + pw = self.data.get("passwd") + + hash = bcrypt.hashpw(pw.encode(), bcrypt.gensalt()) + + loop.run_until_complete(self._create_user_async(self.data.get("username"), hash.decode(), self.data.get("email"), self.data.get('role'))) + + raise NextScene("User Management") + + async def _create_user_async(self, username: str, password: str, email: Optional[str], role: str): + usr_id = await data.user.create_user( + username=username, + email=email if email else None, + password=password, + permission=int(role) + ) + + state.set_user(usr_id, username) + + def _cancel(self): + state.clear_last_err() + self.find_widget('status').value = state.last_err + raise NextScene("User Management") + +class LookupUserView(Frame): + def __init__(self, screen): + super(LookupUserView, self).__init__( + screen, + screen.height * 2 // 3, + screen.width * 2 // 3, + hover_focus=True, + can_scroll=False, + title="Lookup User" + ) + + layout = Layout([1, 1], fill_frame=True) + self.add_layout(layout) + layout.add_widget(RadioButtons([ + ("Username", "1"), + ("Email", "2"), + ("Access Code", "3"), + ("User ID", "4"), + ], "Search By:", "search_type")) + layout.add_widget(Text("Search:", "search_str"), 1) + + layout3 = Layout([100]) + self.add_layout(layout3) + layout3.add_widget(Text("", f"status", readonly=True, disabled=True)) + + layout2 = Layout([1, 1, 1, 1]) + self.add_layout(layout2) + layout2.add_widget(Button("Search", self._lookup), 0) + layout2.add_widget(Button("Cancel", self._cancel), 3) + + self.fix() + + def _lookup(self): + self.save() + if not self.data.get("search_str"): + state.set_last_err("Search cannot be blank") + self.find_widget('status').value = state.last_err + self.screen.reset() + return + + state.clear_last_err() + self.find_widget('status').value = state.last_err + + search_type = self.data.get("search_type") + if search_type == "1": + loop.run_until_complete(self._lookup_user_by_username(self.data.get("search_str"))) + elif search_type == "2": + loop.run_until_complete(self._lookup_user_by_email(self.data.get("search_str"))) + elif search_type == "3": + loop.run_until_complete(self._lookup_user_by_access_code(self.data.get("search_str"))) + elif search_type == "4": + loop.run_until_complete(self._lookup_user_by_id(self.data.get("search_str"))) + else: + state.set_last_err("Unknown search type") + self.find_widget('status').value = state.last_err + self.screen.reset() + return + + if len(state.search_results) < 1: + state.set_last_err("Search returned no results") + self.find_widget('status').value = state.last_err + self.screen.reset() + return + + state.search_type = "user" + raise NextScene("Search Results") + + async def _lookup_user_by_id(self, user_id: str): + usr = await data.user.get_user(user_id) + + if usr is not None: + state.search_results = [usr] + + async def _lookup_user_by_username(self, username: str): + usr = await data.user.find_user_by_username(username) + + if usr is not None: + state.search_results = usr + + async def _lookup_user_by_email(self, email: str): + usr = await data.user.find_user_by_email(email) + + if usr is not None: + state.search_results = usr + + async def _lookup_user_by_access_code(self, access_code: str): + card = await data.card.get_card_by_access_code(access_code) + + if card is not None: + usr = await data.user.get_user(card['user']) + if usr is not None: + state.search_results = [usr] + + def _cancel(self): + state.clear_last_err() + self.find_widget('status').value = state.last_err + raise NextScene("User Management") + +def demo(screen:Screen, scene: Scene): + scenes = [ + Scene([MainView(screen)], -1, name="Main"), + Scene([ManageUser(screen)], -1, name="User Management"), + Scene([CreateUserView(screen)], -1, name="Create User"), + Scene([LookupUserView(screen)], -1, name="Lookup User"), + Scene([SearchResultsView(screen)], -1, name="Search Results"), + ] + + screen.play(scenes, stop_on_resize=False, start_scene=scene, allow_int=True) + +last_scene = None + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Database utilities") + parser.add_argument( + "--config", "-c", type=str, help="Config folder to use", default="config" + ) + args = parser.parse_args() + + cfg = CoreConfig() + if path.exists(f"{args.config}/core.yaml"): + cfg_dict = yaml.safe_load(open(f"{args.config}/core.yaml")) + cfg_dict.get("database", {})["loglevel"] = "info" + cfg.update(cfg_dict) + + if not path.exists(cfg.server.log_dir): + mkdir(cfg.server.log_dir) + + if not access(cfg.server.log_dir, W_OK): + print( + f"Log directory {cfg.server.log_dir} NOT writable, please check permissions" + ) + exit(1) + + data = Data(cfg) + + while True: + try: + Screen.wrapper(demo, catch_interrupt=True, arguments=[last_scene]) + exit(0) + except ResizeScreenError as e: + last_scene = e.scene