MonkeyBusiness/core_common.py

212 lines
5.3 KiB
Python
Raw Permalink Normal View History

2022-08-26 12:39:11 +02:00
import config
import random
import time
from lxml.builder import ElementMaker
from kbinxml import KBinXML
from utils.arc4 import EamuseARC4
from utils.lz77 import EamuseLZ77
def _add_val_as_str(elm, val):
    """Store ``str(val)`` as the text of *elm*.

    When *elm* is ``None`` there is no element to write to, so the string
    form of *val* is returned instead. Otherwise ``elm.text`` is assigned
    in place and the function returns ``None``.
    """
    text = str(val)
    if elm is None:
        return text
    elm.text = text
    return None
def _add_bool_as_str(elm, val):
    """Serialise the truthiness of *val* as ``"1"`` or ``"0"``.

    The actual write is delegated to ``_add_val_as_str``: the text is set
    on *elm* when an element is given, and returned when *elm* is ``None``.
    """
    flag = int(bool(val))
    return _add_val_as_str(elm, flag)
def _add_list_as_str(elm, vals):
    """Join *vals* with single spaces and store the result as element text.

    With an element, the joined text is assigned to ``elm.text`` and the
    number of items is recorded as a string in the ``__count`` attribute;
    the function then returns ``None``. With *elm* set to ``None`` the
    joined string is returned and nothing else happens.
    """
    joined = " ".join(map(str, vals))
    if elm is None:
        return joined
    elm.text = joined
    elm.attrib["__count"] = str(len(vals))
    return None
# Element factory used to build response trees. The typemap routes plain
# Python values through the string helpers defined in this module; ``bool``
# has its own entry so that booleans are written with the 1/0 helper.
E = ElementMaker(
    typemap={
        bool: _add_bool_as_str,
        int: _add_val_as_str,
        float: _add_val_as_str,
        list: _add_list_as_str,
    },
)
# Per-model release thresholds, newest first. The first entry whose datecode
# is less than or equal to the client's ``ext`` decides the game version.
_GAME_VERSION_THRESHOLDS = {
    "LDJ": (
        (2024100900, 32),
        (2023101800, 31),
        (2022101700, 30),
        (2021101300, 29),
        # TODO: Consolidate IIDX modules to easily support versions 21-28 (probably never)
        (2020102800, 28),
        (2019101600, 27),
        (2018110700, 26),
        (2017122100, 25),
        (2016102400, 24),
        (2015111100, 23),
        (2014091700, 22),
        (2013100200, 21),
        (2012010100, 20),
    ),
    "M32": (
        (2024031300, 10),
        (2022121400, 9),
        (2021042100, 8),
        (2019100200, 7),
        (2018072700, 6),
        # TODO: Support versions 1-5 (never)
        (2017090600, 5),
        (2017011800, 4),
        (2015042100, 3),
        (2014021400, 2),
        (2013012400, 1),
    ),
    "MDX": (
        (2019022600, 19),  # ???
    ),
    "KFC": (
        # TODO: Fix newer than 2022 versions (never, I don't play this game)
        (2020090402, 6),  # ???
    ),
}

# Models that map to one game version no matter what ``ext`` says.
# TODO: ??? ("PAN" is not mapped and therefore resolves to 0)
_FIXED_GAME_VERSIONS = {
    "KDZ": 19,
    "JDZ": 18,
    "REC": 1,
}


async def core_get_game_version_from_software_version(software_version):
    """Map a split software version onto an internal game version number.

    Args:
        software_version: A six-item sequence ``(full, model, dest, spec,
            rev, ext)``. Only ``model`` and ``ext`` are inspected; ``ext``
            must be convertible with ``int()``.

    Returns:
        The game version for the model/datecode pair, or ``0`` when the
        model is unknown or ``ext`` is older than every known threshold
        for that model.

    Raises:
        ValueError: If *software_version* does not have exactly six items
            or ``ext`` is not numeric.
    """
    _, model, _dest, _spec, _rev, ext = software_version
    # Convert up front so a malformed ext is rejected for every model.
    ext = int(ext)

    if model in _FIXED_GAME_VERSIONS:
        return _FIXED_GAME_VERSIONS[model]

    for minimum_ext, game_version in _GAME_VERSION_THRESHOLDS.get(model, ()):
        if ext >= minimum_ext:
            return game_version

    # Unknown model, or a known model with a datecode older than anything
    # listed: report "unsupported" uniformly instead of falling through
    # with an implicit None.
    return 0
async def core_process_request(request):
    """Decode an incoming request body and summarise it for dispatch.

    The body is truncated to the ``Content-Length`` header value, decrypted
    with ``EamuseARC4`` when an ``X-Eamuse-Info`` header is present (the key
    is the header value minus its first two characters and all dashes,
    hex-decoded), and run through ``EamuseLZ77.decode`` when ``X-Compress``
    is ``"lz77"``. The result is parsed with ``KBinXML``.

    Side effects: sets ``request.compress``, ``request.is_encrypted`` and
    ``request.is_binxml``; prints the request XML when
    ``config.verbose_log`` is truthy.

    Returns:
        An empty dict when the ``Content-Length`` header or the body is
        missing/empty. Otherwise a dict with the parsed ``root``, its
        ``text`` form, the first child's tag (``module``) and its
        ``method``/``command`` attributes (``None`` when absent), the
        colon-separated parts of the root ``model`` attribute, and the
        resolved ``game_version``.
    """
    content_length = request.headers.get("Content-Length")
    body = await request.body()
    if not (content_length and body):
        return {}

    # A missing header yields None, which is what the compress flag expects.
    request.compress = request.headers.get("X-Compress")

    eamuse_info = request.headers.get("X-Eamuse-Info")
    if eamuse_info is None:
        xml_dec = body[: int(content_length)]
        request.is_encrypted = False
    else:
        key = bytes.fromhex(eamuse_info[2:].replace("-", ""))
        xml_dec = EamuseARC4(key).decrypt(body[: int(content_length)])
        request.is_encrypted = True

    if request.compress == "lz77":
        xml_dec = EamuseLZ77.decode(xml_dec)

    xml = KBinXML(xml_dec, convert_illegal_things=True)
    root = xml.xml_doc
    xml_text = xml.to_text()
    request.is_binxml = KBinXML.is_binary_xml(xml_dec)

    if config.verbose_log:
        print()
        print("\033[94mREQUEST\033[0m:")
        print(xml_text)

    # Keep the full model string in slot 0, followed by its split parts.
    model_string = root.attrib["model"]
    model_parts = (model_string, *model_string.split(":"))

    call = root[0]
    module = call.tag
    method = call.attrib.get("method")
    command = call.attrib.get("command")

    game_version = await core_get_game_version_from_software_version(model_parts)
    _, model, dest, spec, rev, ext = model_parts

    return {
        "root": root,
        "text": xml_text,
        "module": module,
        "method": method,
        "command": command,
        "model": model,
        "dest": dest,
        "spec": spec,
        "rev": rev,
        "ext": ext,
        "game_version": game_version,
    }
async def core_prepare_response(request, xml):
    """Serialise *xml* into a response body plus headers for *request*.

    The tree is wrapped in ``KBinXML`` and emitted as binary XML when
    ``request.is_binxml`` is truthy, otherwise as UTF-8 encoded text. When
    ``request.is_encrypted`` is truthy, a fresh ``X-Eamuse-Info`` value is
    generated from the current time and a random 16-bit number, and the body
    is encrypted with ``EamuseARC4`` using the key derived from that value.

    Side effects: prints the response XML when ``config.verbose_log`` is
    truthy and resets ``request.compress`` to ``None``.

    Returns:
        A ``(response, response_headers)`` tuple.
    """
    binxml = KBinXML(xml)
    xml_binary = (
        binxml.to_binary()
        if request.is_binxml
        else binxml.to_text().encode("utf-8")  # TODO: Proper encoding
    )

    if config.verbose_log:
        print("\033[91mRESPONSE\033[0m:")
        print(binxml.to_text())

    response_headers = {"User-Agent": "EAMUSE.Httpac/1.0"}

    if not request.is_encrypted:
        response = bytes(xml_binary)
    else:
        timestamp = int(time.time())
        nonce = random.randint(0x0000, 0xFFFF)
        xeamuseinfo = "1-%08x-%04x" % (timestamp, nonce)
        response_headers["X-Eamuse-Info"] = xeamuseinfo
        # Same derivation as on the request side: drop the "1-" prefix and
        # the dash, then hex-decode what is left.
        key = bytes.fromhex(xeamuseinfo[2:].replace("-", ""))
        response = EamuseARC4(key).encrypt(xml_binary)

    # Responses are always sent uncompressed; the lz77 path is kept below,
    # switched off, for reference.
    request.compress = None
    # if request.compress == "lz77":
    #     response_headers["X-Compress"] = request.compress
    #     response = EamuseLZ77.encode(response)

    return response, response_headers