MonkeyBusiness/pyeamu.py

220 lines
5.8 KiB
Python
Raw Normal View History

2023-09-23 11:25:52 +02:00
from urllib.parse import urlparse, urlunparse, urlencode
2022-08-26 12:39:11 +02:00
import uvicorn
2022-12-18 00:08:29 +01:00
import ujson as json
2023-05-05 18:24:50 +02:00
from os import name, path
2023-09-23 11:25:52 +02:00
from typing import Optional
2022-08-26 12:39:11 +02:00
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from starlette.responses import RedirectResponse
2022-08-26 12:39:11 +02:00
import config
import modules
import utils.card as conv
2022-08-26 12:39:11 +02:00
from core_common import core_process_request, core_prepare_response, E
2023-09-23 11:25:52 +02:00
import socket
2022-08-26 12:39:11 +02:00
2022-11-15 15:03:37 +01:00
def urlpathjoin(parts, sep="/"):
2022-08-26 12:39:11 +02:00
return sep + sep.join([x.lstrip(sep) for x in parts])
2023-09-23 11:25:52 +02:00
loopback = "127.0.0.1"
server_addresses = []
for host in ("localhost", config.ip, socket.gethostname()):
server_addresses.append(f"{host}:{config.port}")
server_services_urls = []
for server_address in server_addresses:
server_services_urls.append(
urlunparse(("http", server_address, config.services_prefix, None, None, None))
)
2022-08-26 12:39:11 +02:00
settings = {}
for s in (
"ip",
"port",
"services_prefix",
"verbose_log",
"arcade",
"paseli",
"maintenance_mode",
):
settings[s] = getattr(config, s)
2022-08-26 12:39:11 +02:00
app = FastAPI()
for router in modules.routers:
app.include_router(router)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
if path.exists("webui"):
webui = True
with open(path.join("webui", "monkey.json"), "w") as f:
2023-03-02 12:47:04 +01:00
json.dump(settings, f, indent=2, escape_forward_slashes=False)
app.mount("/webui", StaticFiles(directory="webui", html=True), name="webui")
else:
webui = False
@app.get("/webui")
async def redirect_to_config():
return RedirectResponse(url="/config")
2022-08-26 12:39:11 +02:00
2023-05-05 18:24:50 +02:00
# Enable ANSI escape sequences
if name == "nt":
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
2022-08-26 12:39:11 +02:00
if __name__ == "__main__":
2023-05-05 18:24:50 +02:00
print(
"""
"""
)
2022-08-26 12:39:11 +02:00
print()
2023-05-05 18:24:50 +02:00
print("\033[1mGame Config\033[0m:")
2023-09-23 11:25:52 +02:00
for server_services_url in server_services_urls:
print(f"<services>\033[92m{server_services_url}\033[0m</services>")
print("<!-- url_slash \033[92m0\033[0m or \033[92m1\033[0m -->")
2022-08-26 12:39:11 +02:00
print()
2023-10-07 14:06:24 +02:00
print("\033[1mWeb Interface\033[0m:")
if webui:
2023-09-23 11:25:52 +02:00
for server_address in server_addresses:
print(f"http://{server_address}/webui/")
2023-10-07 14:06:24 +02:00
else:
print("/webui missing")
print("download it here: https://github.com/drmext/BounceTrippy/releases")
print()
2023-05-05 18:24:50 +02:00
print("\033[1mSource Repository\033[0m:")
print("https://github.com/drmext/MonkeyBusiness")
print()
2023-09-23 11:25:52 +02:00
uvicorn.run("pyeamu:app", host="0.0.0.0", port=config.port, reload=True)
2022-08-26 12:39:11 +02:00
2023-09-23 11:25:52 +02:00
@app.post(urlpathjoin([config.services_prefix]))
2022-08-26 12:39:11 +02:00
@app.post(urlpathjoin([config.services_prefix, "/{gameinfo}/services/get"]))
2023-09-23 11:25:52 +02:00
async def services_get(
request: Request,
model: Optional[str] = None,
f: Optional[str] = None,
module: Optional[str] = None,
method: Optional[str] = None,
):
2022-08-26 12:39:11 +02:00
request_info = await core_process_request(request)
2023-09-23 11:25:52 +02:00
request_address = f"{urlparse(str(request.url)).netloc}:{config.port}"
2022-08-26 12:39:11 +02:00
services = {}
for service in modules.routers:
2022-11-15 15:03:37 +01:00
model_blacklist = services.get("model_blacklist", [])
model_whitelist = services.get("model_whitelist", [])
2022-08-26 12:39:11 +02:00
2022-11-15 15:03:37 +01:00
if request_info["model"] in model_blacklist:
2022-08-26 12:39:11 +02:00
continue
2022-11-15 15:03:37 +01:00
if model_whitelist and request_info["model"] not in model_whitelist:
2022-08-26 12:39:11 +02:00
continue
2023-09-23 11:25:52 +02:00
if (
service.tags
and service.tags[0].startswith("api_")
or service.tags[0] == "slashless_forwarder"
):
continue
2022-11-15 15:03:37 +01:00
k = (service.tags[0] if service.tags else service.prefix).strip("/")
2023-09-23 11:25:52 +02:00
if f == "services.get" or module == "services" and method == "get":
2023-10-07 14:06:24 +02:00
# url_slash 0
pre = "/fwdr"
2023-09-23 11:25:52 +02:00
else:
2023-10-07 14:06:24 +02:00
# url_slash 1
pre = service.prefix
if k not in services:
services[k] = urlunparse(("http", request_address, pre, None, None, None))
2022-08-26 12:39:11 +02:00
keepalive_params = {
2023-09-23 11:25:52 +02:00
"pa": loopback,
"ia": loopback,
"ga": loopback,
"ma": loopback,
2022-08-26 12:39:11 +02:00
"t1": 2,
"t2": 10,
}
2022-11-15 15:03:37 +01:00
services["keepalive"] = urlunparse(
(
"http",
2023-09-23 11:25:52 +02:00
loopback,
2022-11-15 15:03:37 +01:00
"/keepalive",
None,
urlencode(keepalive_params),
None,
)
)
services["ntp"] = urlunparse(("ntp", "pool.ntp.org", "/", None, None, None))
2022-08-26 12:39:11 +02:00
response = E.response(
E.services(
expire=10800,
2022-11-15 15:03:37 +01:00
mode="operation",
2022-08-26 12:39:11 +02:00
product_domain=1,
2022-11-15 15:03:37 +01:00
*[E.item(name=k, url=services[k]) for k in services],
2022-08-26 12:39:11 +02:00
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@app.get("/")
async def redirect_to_webui():
return RedirectResponse(url="/webui")
@app.get("/config")
async def get_config():
return settings
@app.get("/conv/{card}")
async def card_conv(card: str):
card = card.upper()
lookalike = {
"I": "1",
"O": "0",
"Q": "0",
"V": "U",
}
for k, v in lookalike.items():
card = card.replace(k, v)
if card.startswith("E004") or card.startswith("012E"):
card = "".join([c for c in card if c in "0123456789ABCDEF"])
uid = card
kid = conv.to_konami_id(card)
else:
card = "".join([c for c in card if c in conv.valid_characters])
uid = conv.to_uid(card)
kid = card
return {"uid": uid, "konami_id": kid}