diff --git a/tools/analyzeGameDumps.py b/tools/analyzeGameDumps.py
new file mode 100755
index 0000000..042f137
--- /dev/null
+++ b/tools/analyzeGameDumps.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# 573in1 - Copyright (C) 2022-2024 spicyjpeg
+# 573in1 is free software: you can redistribute it and/or modify it under the
+# terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+# 573in1 is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along with
+# 573in1. If not, see .
+__version__ = "1.0.3"
+__author__ = "spicyjpeg"
+import json, logging
+from argparse import ArgumentParser, FileType, Namespace
+from pathlib import Path
+from typing import Any
+from common.analysis import MAMENVRAMDump, getBootloaderVersion
+from common.cartparser import parseCartHeader, parseROMHeader
+from common.decompile import AnalysisError
+from common.gamedb import GameInfo
+from common.util import \
+ JSONFormatter, JSONGroupedArray, JSONGroupedObject, setupLogger
+## Game analysis
+def analyzeGame(game: GameInfo, nvramDir: Path, reanalyze: bool = False):
+ dump: MAMENVRAMDump = MAMENVRAMDump(nvramDir)
+ if (reanalyze or game.bootloaderVersion is None) and dump.bootloader:
+ try:
+ game.bootloaderVersion = getBootloaderVersion(dump.bootloader)
+ except AnalysisError:
+ pass
+ if (reanalyze or game.rtcHeader is None) and dump.rtcHeader:
+ game.rtcHeader = parseROMHeader(dump.rtcHeader)
+ if (reanalyze or game.flashHeader is None) and dump.flashHeader:
+ game.flashHeader = parseROMHeader(dump.flashHeader)
+ if (reanalyze or game.installCart is None) and dump.installCart:
+ game.installCart = parseCartHeader(dump.installCart)
+ if (reanalyze or game.gameCart is None) and dump.gameCart:
+ game.gameCart = parseCartHeader(dump.gameCart)
+## Main
+def createParser() -> ArgumentParser:
+ parser = ArgumentParser(
+ description = \
+ "Parses a list of games in JSON format and generates a new JSON "
+ "file with additional information about each game extracted from "
+ "MAME ROM and NVRAM dumps.",
+ add_help = False
+ )
+ group = parser.add_argument_group("Tool options")
+ group.add_argument(
+ "-h", "--help",
+ action = "help",
+ help = "Show this help message and exit"
+ )
+ group.add_argument(
+ "-v", "--verbose",
+ action = "count",
+ help = "Enable additional logging levels"
+ )
+ group = parser.add_argument_group("Analysis options")
+ group.add_argument(
+ "-r", "--reanalyze",
+ action = "store_true",
+ help = \
+ "Discard any existing analysis information from the input file and "
+ "rebuild it by reanalyzing the game whenever possible"
+ )
+ group.add_argument(
+ "-k", "--keep-unanalyzed",
+ action = "store_true",
+ help = \
+ "Do not remove entries for games that have not been analyzed from "
+ "output file"
+ )
+ group = parser.add_argument_group("Output options")
+ group.add_argument(
+ "-m", "--minify",
+ action = "store_true",
+ help = "Do not pretty print output file"
+ )
+ group = parser.add_argument_group("File paths")
+ group.add_argument(
+ "dumpDir",
+ type = Path,
+ help = "Path to MAME NVRAM directory"
+ )
+ group.add_argument(
+ "gameInfo",
+ type = FileType("rt", encoding = "utf-8"),
+ help = "Path to JSON file containing initial game list"
+ )
+ group.add_argument(
+ "output",
+ type = FileType("wt", encoding = "utf-8"),
+ help = "Path to JSON file to generate"
+ )
+ return parser
+def main():
+ parser: ArgumentParser = createParser()
+ args: Namespace = parser.parse_args()
+ setupLogger(args.verbose)
+ with args.gameInfo as file:
+ gameInfo: dict[str, Any] = json.load(file)
+ games: list[JSONGroupedObject] = []
+ for initialInfo in gameInfo["games"]:
+ game: GameInfo = GameInfo.fromJSONObject(initialInfo)
+ code: str = f"{game.code} {"/".join(set(game.regions))}"
+ # Each entry in the initial game list may be associated with one or more
+ # region codes (and thus MAME dumps). This script only analyzes one dump
+ # per entry, assuming all its dumps are functionally identical and only
+ # differ in the region code.
+ analyzed: bool = False
+ for identifier in game.identifiers:
+ nvramDir: Path = args.dumpDir / identifier
+ if not identifier or not nvramDir.exists():
+ continue
+ logging.info(f"analyzing {identifier} ({code})")
+ analyzeGame(game, nvramDir, args.reanalyze)
+ analyzed = True
+ break
+ if analyzed or args.keep_unanalyzed:
+ games.append(game.toJSONObject())
+ if not analyzed:
+ logging.error(f"no dump found for {game.name} ({code})")
+ logging.info(f"saving {len(games)} entries out of {len(gameInfo["games"])}")
+ # Generate the output file, carrying over the schema path (if any) from the
+ # initial game list.
+ root: JSONGroupedObject = JSONGroupedObject()
+ if "$schema" in gameInfo:
+ root.groups.append({ "$schema": gameInfo["$schema"] })
+ root.groups.append({ "games": JSONGroupedArray([ games ]) })
+ with args.output as file:
+ for string in JSONFormatter(args.minify).serialize(root):
+ file.write(string)
+if __name__ == "__main__":
+ main()
diff --git a/tools/buildResourcePackage.py b/tools/buildResourcePackage.py
old mode 100644
new mode 100755
diff --git a/tools/common/__init__.py b/tools/common/__init__.py
index 7b98594..0d8a3f0 100644
--- a/tools/common/__init__.py
+++ b/tools/common/__init__.py
@@ -18,11 +18,12 @@ __version__ = "1.0.3"
__author__ = "spicyjpeg"
__all__ = (
+ "analysis",
- "cartdata",
+ "cartparser",
- "games",
+ "gamedb",
diff --git a/tools/common/analysis.py b/tools/common/analysis.py
new file mode 100644
index 0000000..a418c74
--- /dev/null
+++ b/tools/common/analysis.py
@@ -0,0 +1,229 @@
+# -*- coding: utf-8 -*-
+# 573in1 - Copyright (C) 2022-2024 spicyjpeg
+# 573in1 is free software: you can redistribute it and/or modify it under the
+# terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+# 573in1 is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along with
+# 573in1. If not, see .
+import logging, re
+from collections.abc import Sequence
+from pathlib import Path
+from .cart import *
+from .decompile import AnalysisError, PSEXEAnalyzer
+from .mips import ImmInstruction, Opcode, Register, encodeADDIU, encodeJR
+from .util import InterleavedFile
+## MAME NVRAM directory reader
+class MAMENVRAMDump:
+ def __init__(self, nvramDir: Path):
+ try:
+ with InterleavedFile(
+ open(nvramDir / "29f016a.31m", "rb"),
+ open(nvramDir / "29f016a.27m", "rb")
+ ) as file:
+ self.flashHeader: ROMHeaderDump | None = ROMHeaderDump(
+ b"",
+ )
+ # FIXME: the executable's CRC32 should probably be validated
+ try:
+ self.bootloader: PSEXEAnalyzer | None = PSEXEAnalyzer(file)
+ except AnalysisError:
+ self.bootloader: PSEXEAnalyzer | None = None
+ except FileNotFoundError:
+ self.flashHeader: ROMHeaderDump | None = None
+ self.bootloader: PSEXEAnalyzer | None = None
+ try:
+ with open(nvramDir / "m48t58", "rb") as file:
+ file.seek(RTC_HEADER_OFFSET)
+ self.rtcHeader: ROMHeaderDump | None = ROMHeaderDump(
+ b"",
+ file.read(RTC_HEADER_LENGTH)
+ )
+ except FileNotFoundError:
+ self.rtcHeader: ROMHeaderDump | None = None
+ self.installCart: CartDump | None = \
+ self._loadCartDump(nvramDir / "cassette_install_eeprom")
+ self.gameCart: CartDump | None = \
+ self._loadCartDump(nvramDir / "cassette_game_eeprom")
+ def _loadCartDump(self, path: Path) -> CartDump | None:
+ try:
+ with open(path, "rb") as file:
+ return parseMAMECartDump(file.read())
+ except FileNotFoundError:
+ return None
+## Bootloader executable analysis
+_BOOT_VERSION_REGEX: re.Pattern = \
+ re.compile(rb"\0BOOT VER[. ]*(1\.[0-9A-Z]+)\0")
+def getBootloaderVersion(exe: PSEXEAnalyzer) -> str:
+ for matched in _BOOT_VERSION_REGEX.finditer(exe.body):
+ version: bytes = matched.group(1)
+ argString: bytes = b"\0" + version + b"\0"
+ # A copy of the version string with no "BOOT VER" prefix is always
+ # present in the launcher and passed to the game's command line.
+ if argString not in exe.body:
+ logging.warning("found version string with no prefix-less copy")
+ return version.decode("ascii")
+ raise AnalysisError("could not find version string")
+## Game executable analysis
+# In order to support chips from multiple manufacturers, Konami's flash and
+# security cartridge drivers use vtable arrays to dispatch API calls to the
+# appropriate driver. The following arrays are present in the binary:
+# struct {
+# int (*eraseChip)(const uint8_t *dataKey);
+# int (*setDataKey)(
+# uint8_t type, const uint8_t *oldKey, const uint8_t *newKey
+# );
+# int (*readData)(
+# const uint8_t *dataKey, uint32_t offset, void *output, size_t length
+# );
+# int (*writeData)(
+# const uint8_t *dataKey, uint32_t offset, const void *data, size_t length
+# );
+# int (*readConfig)(const uint8_t *dataKey, void *output);
+# int (*writeConfig)(const uint8_t *dataKey, const void *config);
+# int (*readDS2401)(void *output);
+# int chipType, capacity;
+# struct {
+# int (*eraseSector)(void *ptr);
+# int (*flushErase)(void);
+# int (*flushEraseLower)(void);
+# int (*flushEraseUpper)(void);
+# int (*writeHalfword)(void *ptr, uint16_t value);
+# int (*writeHalfwordAsync)(void *ptr, uint16_t value);
+# int (*flushWrite)(void *ptr, uint16_t value);
+# int (*flushWriteLower)(void *ptr, uint16_t value);
+# int (*flushWriteUpper)(void *ptr, uint16_t value);
+# int (*resetChip)(void *ptr);
+def _findDriverTableCalls(
+ exe: PSEXEAnalyzer,
+ dummyErrorCode: int,
+ functionNames: Sequence[str] = (),
+ valueNames: Sequence[str] = ()
+) -> dict[str, int]:
+ # The first entry of each array is always a dummy driver containing pointers
+ # to a function that returns an error code. The table can thus be found by
+ # locating the dummy function and all contiguous references to it.
+ table: int = 0
+ for dummy in exe.findBytes(
+ encodeJR(Register.RA) +
+ encodeADDIU(Register.V0, Register.ZERO, dummyErrorCode)
+ ):
+ try:
+ table = exe.findSingleMatch(
+ (dummy.to_bytes(4, "little") * len(functionNames)) +
+ bytes(4 * len(valueNames))
+ )
+ break
+ except StopIteration:
+ continue
+ if not table:
+ raise AnalysisError(
+ "could not locate any valid table referenced by a dummy function"
+ )
+ logging.debug(f"table found at {table:#010x}")
+ # Search the binary for functions that are wrappers around the driver table.
+ memberNames: Sequence[str] = functionNames + valueNames
+ functions: dict[str, int] = {}
+ for offset in exe.findFunctionReturns():
+ match (
+ exe.disassembleAt(offset + 4),
+ exe.disassembleAt(offset + 16),
+ exe.disassembleAt(offset + 40)
+ ):
+ case (
+ ImmInstruction(
+ opcode = Opcode.LUI, rt = Register.V1, value = msb
+ ), ImmInstruction(
+ opcode = Opcode.ADDIU, rt = Register.V1, value = lsb
+ ), ImmInstruction(
+ opcode = Opcode.LW, rt = Register.V0, value = index
+ )
+ ) if ((msb << 16) + lsb) == table:
+ index //= 4
+ if (index < 0) or (index >= len(memberNames)):
+ logging.debug(
+ f"ignoring candidate at {offset:#010x} due to "
+ f"out-of-bounds index {index}"
+ )
+ continue
+ name: str = memberNames[index]
+ functions[name] = offset
+ logging.debug(f"found {name} at {offset:#010x}")
+ return functions
+def findCartFunctions(exe: PSEXEAnalyzer) -> dict[str, int]:
+ return _findDriverTableCalls(
+ exe, -2, (
+ "eraseChip",
+ "setDataKey",
+ "readSector",
+ "writeSector",
+ "readConfig",
+ "writeConfig",
+ "readCartID",
+ ), (
+ "chipType",
+ "capacity"
+ )
+ )
+def findFlashFunctions(exe: PSEXEAnalyzer) -> dict[str, int]:
+ return _findDriverTableCalls(
+ exe, -1, (
+ "eraseSector",
+ "flushErase",
+ "flushEraseLower",
+ "flushEraseUpper",
+ "writeHalfword",
+ "writeHalfwordAsync",
+ "flushWrite",
+ "flushWriteLower",
+ "flushWriteUpper",
+ "resetChip"
+ )
+ )
diff --git a/tools/common/cart.py b/tools/common/cart.py
index 3ea6067..45fcfa7 100644
--- a/tools/common/cart.py
+++ b/tools/common/cart.py
@@ -14,10 +14,12 @@
# You should have received a copy of the GNU General Public License along with
# 573in1. If not, see .
-from collections.abc import Mapping
+import re
+from collections.abc import ByteString
from dataclasses import dataclass
from enum import IntEnum, IntFlag
from struct import Struct
+from typing import Self
from zlib import decompress
from .util import decodeBase41
@@ -40,19 +42,37 @@ class DumpFlag(IntFlag):
+class ChipSize:
+ privateDataOffset: int
+ privateDataLength: int
+ publicDataOffset: int
+ publicDataLength: int
+ def getLength(self) -> int:
+ return self.privateDataLength + self.publicDataLength
+RTC_HEADER_OFFSET: int = 0x00
+RTC_HEADER_LENGTH: int = 0x20
+FLASH_CRC_OFFSET: int = 0x20
## Cartridge dump structure
_CART_DUMP_HEADER_STRUCT: Struct = Struct("< H 2B 8s 8s 8s 8s 8s")
-_CHIP_SIZES: Mapping[ChipType, tuple[int, int, int]] = {
- ChipType.X76F041: ( 512, 384, 128 ),
- ChipType.X76F100: ( 112, 0, 0 ),
- ChipType.ZS01: ( 112, 0, 32 )
+_CHIP_SIZES: dict[ChipType, ChipSize] = {
+ ChipType.X76F041: ChipSize( 0, 384, 384, 128),
+ ChipType.X76F100: ChipSize( 0, 112, 0, 0),
+ ChipType.ZS01: ChipSize(32, 80, 0, 32)
-_QR_STRING_START: str = "573::"
-_QR_STRING_END: str = "::"
+_QR_STRING_REGEX: re.Pattern = \
+ re.compile(r"573::([0-9A-Z+-./:]+)::", re.IGNORECASE)
class CartDump:
@@ -66,10 +86,52 @@ class CartDump:
config: bytes
data: bytes
- def getChipSize(self) -> tuple[int, int, int]:
+ def getChipSize(self) -> ChipSize:
return _CHIP_SIZES[self.chipType]
- def serialize(self) -> bytes:
+ @staticmethod
+ def fromQRString(data: str) -> Self:
+ qrString: re.Match | None = _QR_STRING_REGEX.search(data)
+ if qrString is None:
+ raise ValueError("not a valid 573in1 QR code string")
+ dump: bytearray = decodeBase41(qrString.group(1).upper())
+ return CartDump.fromBinary(decompress(dump))
+ @staticmethod
+ def fromBinary(data: ByteString) -> Self:
+ (
+ magic,
+ chipType,
+ flags,
+ systemID,
+ cartID,
+ zsID,
+ dataKey,
+ config
+ ) = \
+ _CART_DUMP_HEADER_STRUCT.unpack_from(data, 0)
+ if magic != _CART_DUMP_HEADER_MAGIC:
+ raise ValueError(f"invalid or unsupported dump format: {magic:#06x}")
+ offset: int = _CART_DUMP_HEADER_STRUCT.size
+ length: int = _CHIP_SIZES[chipType].getLength()
+ return CartDump(
+ chipType,
+ flags,
+ systemID,
+ cartID,
+ zsID,
+ dataKey,
+ config,
+ data[offset:offset + length]
+ )
+ def toBinary(self) -> bytes:
@@ -81,31 +143,80 @@ class CartDump:
) + self.data
-def parseCartDump(data: bytes) -> CartDump:
- magic, chipType, flags, systemID, cartID, zsID, dataKey, config = \
+## MAME NVRAM cartridge dump parser
- if magic != _CART_DUMP_HEADER_MAGIC:
- raise ValueError(f"invalid or unsupported dump format: {magic:#04x}")
+_MAME_X76F041_DUMP_STRUCT: Struct = Struct("< 4x 8s 8s 8s 8s 512s")
+_MAME_X76F100_DUMP_STRUCT: Struct = Struct("< 4x 8s 8s 112s")
+_MAME_ZS01_DUMP_STRUCT: Struct = Struct("< 4x 8s 8s 8s 112s")
+_MAME_ZS01_OLD_DUMP_STRUCT1: Struct = Struct("< 4x 8s 8s 8s 112s 3984x")
+_MAME_ZS01_OLD_DUMP_STRUCT2: Struct = Struct("< 4x 8s 8s 112s 3984x")
- length, _, _ = _CHIP_SIZES[chipType]
+def parseMAMECartDump(dump: ByteString) -> CartDump:
+ match int.from_bytes(dump[0:4], "big"), len(dump):
+ case 0x1955aa55, _MAME_X76F041_DUMP_STRUCT.size:
+ writeKey, readKey, configKey, config, data = \
+ _MAME_X76F041_DUMP_STRUCT.unpack(dump)
+ chipType: ChipType = ChipType.X76F041
+ dataKey: bytes = configKey
+ case 0x1900aa55, _MAME_X76F100_DUMP_STRUCT.size:
+ writeKey, readKey, data = \
+ _MAME_X76F100_DUMP_STRUCT.unpack(dump)
+ if writeKey != readKey:
+ raise RuntimeError(
+ "X76F100 dumps with different read and write keys are not "
+ "supported"
+ )
+ chipType: ChipType = ChipType.X76F100
+ dataKey: bytes = writeKey
+ config: bytes | None = None
+ # Even though older versions of MAME emulate X76F100 cartridges for
+ # games that support them, no actual X76F100 cartridges seem to
+ # exist.
+ raise RuntimeError("X76F100 cartridge dumps are not supported")
+ case 0x5a530001, _MAME_ZS01_DUMP_STRUCT.size:
+ commandKey, dataKey, config, data = \
+ _MAME_ZS01_DUMP_STRUCT.unpack(dump)
+ chipType: ChipType = ChipType.ZS01
+ case 0x5a530001, _MAME_ZS01_OLD_DUMP_STRUCT1.size:
+ commandKey, dataKey, config, data = \
+ _MAME_ZS01_OLD_DUMP_STRUCT1.unpack(dump)
+ chipType: ChipType = ChipType.ZS01
+ case 0x5a530001, _MAME_ZS01_OLD_DUMP_STRUCT2.size:
+ commandKey, dataKey, data = \
+ _MAME_ZS01_OLD_DUMP_STRUCT2.unpack(dump)
+ chipType: ChipType = ChipType.ZS01
+ config: bytes | None = None
+ case magic, length:
+ raise RuntimeError(
+ f"unknown chip type {magic:#010x}, dump length {length:#x}"
+ )
return CartDump(
- chipType, flags, systemID, cartID, zsID, dataKey, config,
+ chipType,
+ 0
+ | (DumpFlag.DUMP_CONFIG_OK if config else 0)
+ b"",
+ b"",
+ b"",
+ dataKey,
+ config or b"",
+ data
-def parseCartQRString(data: str) -> CartDump:
- _data: str = data.strip().upper()
- if not _data.startswith(_QR_STRING_START):
- raise ValueError(f"dump string does not begin with '{_QR_STRING_START}'")
- if not _data.endswith(_QR_STRING_END):
- raise ValueError(f"dump string does not end with '{_QR_STRING_END}'")
- _data = _data[len(_QR_STRING_START):-len(_QR_STRING_END)]
- return parseCartDump(decompress(decodeBase41(_data)))
## Flash and RTC header dump structure
diff --git a/tools/common/cartparser.py b/tools/common/cartparser.py
new file mode 100644
index 0000000..abdbbc7
--- /dev/null
+++ b/tools/common/cartparser.py
@@ -0,0 +1,415 @@
+# -*- coding: utf-8 -*-
+# 573in1 - Copyright (C) 2022-2024 spicyjpeg
+# 573in1 is free software: you can redistribute it and/or modify it under the
+# terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+# 573in1 is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along with
+# 573in1. If not, see .
+import logging, re
+from collections.abc import Sequence
+from dataclasses import dataclass
+from itertools import product
+from struct import Struct
+from typing import ByteString
+from .cart import *
+from .gamedb import *
+from .util import checksum8, checksum16, dsCRC8, sidCRC16
+## Utilities
+class ParserError(Exception):
+ pass
+def _unscrambleRTCRAM(data: ByteString) -> bytearray:
+ # Some early games "scramble" RTC RAM by (possibly accidentally?)
+ # interpreting the data to be written as an array of 16-bit big endian
+ # values, then expanding them to 32-bit little endian.
+ output: bytearray = bytearray(len(data) // 2)
+ for i in range(0, len(output), 2):
+ #if data[(i * 2) + 2] or data[(i * 2) + 3]:
+ #raise ParserError("data does not seem to be scrambled")
+ output[i + 0] = data[(i * 2) + 1]
+ output[i + 1] = data[(i * 2) + 0]
+ return output
+def _validateCustomID(data: ByteString) -> bool:
+ if not sum(data):
+ return False
+ checksum: int = checksum8(data[0:7], True)
+ if checksum == data[7]:
+ return True
+ raise ParserError(
+ f"checksum mismatch: expected {checksum:#04x}, got {data[7]:#04x}"
+ )
+def _validateDS2401ID(data: ByteString) -> bool:
+ if not sum(data):
+ return False
+ if not data[0] or (data[0] == 0xff):
+ raise ParserError(f"invalid 1-wire prefix {data[0]:#04x}")
+ crc: int = dsCRC8(data[0:7])
+ if crc == data[7]:
+ return True
+ raise ParserError(f"CRC8 mismatch: expected {crc:#04x}, got {data[7]:#04x}")
+## Header checksum detection
+def detectChecksum(data: ByteString, checksum: int) -> ChecksumFlag:
+ buffer: bytearray = bytearray(data)
+ bigEndianSum: int = (0
+ | ((checksum << 8) & 0xff00)
+ | ((checksum >> 8) & 0x00ff)
+ )
+ for unit, bigEndian, inverted, forceGXSpec in product(
+ (
+ ),
+ ( 0, ChecksumFlag.CHECKSUM_BIG_ENDIAN ),
+ ( 0, ChecksumFlag.CHECKSUM_INVERTED ),
+ ( 0, ChecksumFlag.CHECKSUM_FORCE_GX_SPEC )
+ ):
+ checksumFlags: ChecksumFlag = \
+ ChecksumFlag(unit | bigEndian | inverted | forceGXSpec)
+ flagList: str = \
+ "|".join(flag.name for flag in checksumFlags) or "0"
+ # Dark Horse Legend sets the game code to GE706, but mistakenly computes
+ # the checksum as if the specification were GX.
+ actual: int = bigEndianSum if bigEndian else checksum
+ buffer[0:2] = b"GX" if forceGXSpec else data[0:2]
+ match unit:
+ case ChecksumFlag.CHECKSUM_UNIT_BYTE:
+ expected: int = checksum8(buffer, bool(inverted))
+ expected: int = checksum16(buffer, "little", bool(inverted))
+ case ChecksumFlag.CHECKSUM_UNIT_WORD_BIG:
+ expected: int = checksum16(buffer, "big", bool(inverted))
+ if expected == actual:
+ return checksumFlags
+ else:
+ logging.debug(
+ f" <{flagList}>: expected {expected:#06x}, got {actual:#06x}"
+ )
+ raise ParserError("could not find any valid header checksum format")
+## Header format detection
+# spec[0]: always G
+# spec[1]: product type (B, C, E, K, L, N, Q, U, X, *=wildcard)
+# code[0:2]: game code (700-999 or A00-D99)
+# region[0]: region code
+# (A=Asia, E=Europe, J=Japan, K=Korea, S=Singapore?, U=US)
+# region[1]: major version code (A-F=regular, R-W=e-Amusement, X-Z=?)
+# region[2:4]: minor version code (A-D or Z00-Z99, optional)
+_SPECIFICATION_REGEX: re.Pattern = re.compile(rb"G[A-Z*]")
+_CODE_REGEX: re.Pattern = re.compile(rb"[0-9A-D][0-9][0-9]")
+_REGION_REGEX: re.Pattern = \
+ re.compile(rb"[AEJKSU][A-FR-WX-Z]([A-D]|Z[0-9][0-9])?", re.IGNORECASE)
+_BASIC_HEADER_STRUCT: Struct = Struct("< 2s 2s B 3x")
+_EXTENDED_HEADER_STRUCT: Struct = Struct("< 2s 6s 2s 4s H")
+_PRIVATE_ID_STRUCT: Struct = Struct("< 8s 8s 8s 8s")
+_PUBLIC_ID_STRUCT: Struct = Struct("< 8s 8s")
+class DetectedHeader:
+ yearField: int = 0
+ headerFlags: HeaderFlag = HeaderFlag(0)
+ checksumFlags: ChecksumFlag = ChecksumFlag(0)
+ privateIDOffset: int | None = None
+ publicIDOffset: int | None = None
+def detectHeader(
+ data: ByteString,
+ privateOffset: int,
+ publicOffset: int
+) -> DetectedHeader:
+ unscrambledData: bytearray = _unscrambleRTCRAM(data)
+ for formatType, scrambled, usesPublicArea in product(
+ (
+ HeaderFlag.FORMAT_BASIC,
+ ),
+ ( HeaderFlag(0), HeaderFlag.HEADER_SCRAMBLED ),
+ ( HeaderFlag(0), HeaderFlag.HEADER_IN_PUBLIC_AREA )
+ ):
+ header: DetectedHeader = DetectedHeader()
+ header.headerFlags = formatType | scrambled | usesPublicArea
+ flagList: str = \
+ "|".join(flag.name for flag in header.headerFlags) or "0"
+ buffer: ByteString = unscrambledData if scrambled else data
+ offset: int | None = publicOffset if usesPublicArea else privateOffset
+ if (offset < 0) or (offset >= len(buffer)):
+ logging.debug(f" <{flagList}>: header offset out of bounds")
+ continue
+ match formatType:
+ case HeaderFlag.FORMAT_SIMPLE:
+ region: bytes = buffer[offset:offset + 4]
+ specification: bytes = b""
+ case HeaderFlag.FORMAT_BASIC:
+ region, specification, checksum = \
+ _BASIC_HEADER_STRUCT.unpack_from(buffer, offset)
+ header.privateIDOffset = offset + _BASIC_HEADER_STRUCT.size
+ try:
+ header.checksumFlags = \
+ detectChecksum(buffer[offset:offset + 4], checksum)
+ except ParserError as err:
+ logging.debug(f" <{flagList}>: {err}")
+ continue
+ case HeaderFlag.FORMAT_EXTENDED:
+ (
+ specification,
+ code,
+ header.yearField,
+ region,
+ checksum
+ ) = \
+ _EXTENDED_HEADER_STRUCT.unpack_from(buffer, offset)
+ header.publicIDOffset = offset + _EXTENDED_HEADER_STRUCT.size
+ header.privateIDOffset = \
+ header.publicIDOffset + _PUBLIC_ID_STRUCT.size
+ if (
+ not _SPECIFICATION_REGEX.match(specification) or
+ not _CODE_REGEX.match(code)
+ ):
+ logging.debug(f" <{flagList}>: invalid game code")
+ continue
+ try:
+ header.checksumFlags = \
+ detectChecksum(buffer[offset:offset + 14], checksum)
+ except ParserError as err:
+ logging.debug(f" <{flagList}>: {err}")
+ continue
+ if not _REGION_REGEX.match(region):
+ logging.debug(f" <{flagList}>: invalid game region")
+ continue
+ if region == region.lower():
+ header.headerFlags |= HeaderFlag.REGION_LOWERCASE
+ if _SPECIFICATION_REGEX.match(specification):
+ if specification[1] == "*":
+ header.headerFlags |= HeaderFlag.SPEC_TYPE_WILDCARD
+ else:
+ header.headerFlags |= HeaderFlag.SPEC_TYPE_ACTUAL
+ return header
+ raise ParserError("could not find any valid header data format")
+## Identifier detection
+_TID_WIDTHS: Sequence[int] = 16, 14
+class DetectedIdentifiers:
+ tidWidth: int = 0
+ midValue: int = 0
+ idFlags: IdentifierFlag = IdentifierFlag(0)
+def detectPrivateIDs(
+ data: ByteString,
+ privateOffset: int,
+ dummyAreaOffset: int
+) -> DetectedIdentifiers:
+ ids: DetectedIdentifiers = DetectedIdentifiers()
+ # Dancing Stage EuroMIX uses an X76F041 cartridge but adopts the same data
+ # layout as ZS01 games (32-byte public header/IDs + 32-byte private IDs).
+ # However, as the X76F041 does not support leaving only the first 32 bytes
+ # unprotected, the public area is instead relocated to the chip's last
+ # 128-byte sector (which is then configured to be unprotected). This has to
+ # be taken into account here as the private IDs are *not* moved to the
+ # beginning of the first sector; the space that would otherwise .
+ offset: int = privateOffset
+ if (dummyAreaOffset >= 0) and (dummyAreaOffset < len(data)):
+ dummyArea: ByteString = \
+ data[dummyAreaOffset:dummyAreaOffset + _PRIVATE_ID_STRUCT.size]
+ if sum(dummyArea):
+ offset = dummyAreaOffset
+ ids.idFlags = IdentifierFlag.ALLOCATE_DUMMY_PUBLIC_AREA
+ tid, sid, mid, xid = _PRIVATE_ID_STRUCT.unpack_from(data, offset)
+ if _validateCustomID(tid):
+ match tid[0]:
+ case 0x81:
+ ids.idFlags |= IdentifierFlag.PRIVATE_TID_TYPE_STATIC
+ case 0x82:
+ littleEndianCRC: int = int.from_bytes(tid[1:3], "little")
+ bigEndianCRC: int = int.from_bytes(tid[1:3], "big")
+ for width in _TID_WIDTHS:
+ crc: int = sidCRC16(sid[1:7], width)
+ if crc == littleEndianCRC:
+ ids.tidWidth = width
+ ids.idFlags |= \
+ break
+ elif crc == bigEndianCRC:
+ ids.tidWidth = width
+ ids.idFlags |= \
+ break
+ raise ParserError("could not determine trace ID bit width")
+ case _:
+ raise ParserError(f"unknown trace ID prefix: {tid[0]:#04x}")
+ if _validateDS2401ID(sid):
+ ids.idFlags |= IdentifierFlag.PRIVATE_SID_PRESENT
+ if _validateCustomID(mid):
+ ids.midValue = mid[0]
+ ids.idFlags |= IdentifierFlag.PRIVATE_MID_PRESENT
+ if _validateDS2401ID(xid):
+ ids.idFlags |= IdentifierFlag.PRIVATE_XID_PRESENT
+ return ids
+def detectPublicIDs(data: ByteString, publicOffset: int) -> DetectedIdentifiers:
+ ids: DetectedIdentifiers = DetectedIdentifiers()
+ mid, xid = _PUBLIC_ID_STRUCT.unpack_from(data, publicOffset)
+ if _validateCustomID(mid):
+ ids.midValue = mid[0]
+ ids.idFlags |= IdentifierFlag.PUBLIC_MID_PRESENT
+ if _validateDS2401ID(xid):
+ ids.idFlags |= IdentifierFlag.PUBLIC_XID_PRESENT
+ return ids
+## Installation signature detection
+_SIGNATURE_STRUCT: Struct = Struct("< 8s 8s")
+def detectSignature(data: ByteString, publicOffset: int) -> SignatureFlag:
+ signatureFlags: SignatureFlag = SignatureFlag(0)
+ installSig, dummy = _SIGNATURE_STRUCT.unpack_from(data, publicOffset)
+ # TODO: implement
+ return signatureFlags
+## Parsing API
+def parseCartHeader(dump: CartDump, pcb: CartPCBType | None = None) -> CartInfo:
+ if pcb is None:
+ match dump.chipType, bool(dump.flags & DumpFlag.DUMP_HAS_CART_ID):
+ case ChipType.X76F041, False:
+ pcb = CartPCBType.CART_UNKNOWN_X76F041
+ case ChipType.X76F041, True:
+ pcb = CartPCBType.CART_UNKNOWN_X76F041_DS2401
+ case ChipType.ZS01, True:
+ pcb = CartPCBType.CART_UNKNOWN_ZS01
+ case _, _:
+ raise ParserError("unsupported cartridge type")
+ chipSize: ChipSize = dump.getChipSize()
+ header: DetectedHeader = detectHeader(
+ dump.data,
+ chipSize.privateDataOffset,
+ chipSize.publicDataOffset
+ )
+ if header.privateIDOffset is None:
+ privateIDs: DetectedIdentifiers = DetectedIdentifiers()
+ else:
+ privateIDs: DetectedIdentifiers = detectPrivateIDs(
+ dump.data,
+ header.privateIDOffset,
+ header.privateIDOffset
+ - chipSize.publicDataOffset
+ + chipSize.privateDataOffset
+ )
+ if header.publicIDOffset is None:
+ publicIDs: DetectedIdentifiers = DetectedIdentifiers()
+ else:
+ publicIDs: DetectedIdentifiers = \
+ detectPublicIDs(dump.data, header.publicIDOffset)
+ if (
+ (IdentifierFlag.PRIVATE_MID_PRESENT in privateIDs.idFlags) and
+ (IdentifierFlag.PUBLIC_MID_PRESENT in publicIDs.idFlags)
+ ):
+ if privateIDs.midValue != publicIDs.midValue:
+ raise ParserError("private and public MID values do not match")
+ return CartInfo(
+ pcb,
+ dump.dataKey,
+ header.yearField,
+ privateIDs.tidWidth,
+ privateIDs.midValue,
+ header.headerFlags,
+ header.checksumFlags,
+ privateIDs.idFlags | publicIDs.idFlags
+ )
+def parseROMHeader(dump: ROMHeaderDump) -> ROMHeaderInfo:
+ header: DetectedHeader = detectHeader(dump.data, -1, FLASH_HEADER_OFFSET)
+ if header.publicIDOffset is None:
+ signatureFlags: SignatureFlag = SignatureFlag(0)
+ else:
+ signatureFlags: SignatureFlag = \
+ detectSignature(dump.data, header.publicIDOffset)
+ return ROMHeaderInfo(
+ header.yearField,
+ header.headerFlags,
+ header.checksumFlags,
+ signatureFlags
+ )
diff --git a/tools/common/decompile.py b/tools/common/decompile.py
index a89fed3..0d6dfc0 100644
--- a/tools/common/decompile.py
+++ b/tools/common/decompile.py
@@ -24,12 +24,12 @@ from .mips import \
## Executable analyzer
-def parseStructFromFile(file: BinaryIO, _struct: Struct) -> tuple:
- return _struct.unpack(file.read(_struct.size))
_EXE_HEADER_STRUCT: Struct = Struct("< 8s 8x 4I 16x 2I 20x 1972s")
+class AnalysisError(Exception):
+ pass
class PSEXEAnalyzer:
def __init__(self, file: BinaryIO):
@@ -40,12 +40,12 @@ class PSEXEAnalyzer:
- _
+ region
) = \
- parseStructFromFile(file, _EXE_HEADER_STRUCT)
+ _EXE_HEADER_STRUCT.unpack(file.read(_EXE_HEADER_STRUCT.size))
if magic != _EXE_HEADER_MAGIC:
- raise RuntimeError("file is not a valid PS1 executable")
+ raise AnalysisError("file is not a valid PS1 executable")
self.entryPoint: int = entryPoint
self.startAddress: int = startAddress
@@ -61,7 +61,10 @@ class PSEXEAnalyzer:
return self.body[key - self.startAddress]
def _makeSlice(
- self, start: int | None = None, stop: int | None = None, step: int = 1
+ self,
+ start: int | None = None,
+ stop: int | None = None,
+ step: int = 1
) -> slice:
_start: int = \
0 if (start is None) else (start - self.startAddress)
@@ -86,7 +89,9 @@ class PSEXEAnalyzer:
return None
def disassemble(
- self, start: int | None = None, stop: int | None = None
+ self,
+ start: int | None = None,
+ stop: int | None = None
) -> Generator[Instruction | None, None, None]:
area: slice = self._makeSlice(start, stop, 4)
offset: int = area.start
@@ -105,15 +110,21 @@ class PSEXEAnalyzer:
offset += area.step
def dumpDisassembly(
- self, output: TextIO, start: int | None = None, stop: int | None = None
+ self,
+ output: TextIO,
+ start: int | None = None,
+ stop: int | None = None
for inst in self.disassemble(start, stop):
if inst is not None:
output.write(f"{inst.address:08x}: {inst.toString()}\n")
def findBytes(
- self, data: ByteString, start: int | None = None,
- stop: int | None = None, alignment: int = 4
+ self,
+ data: ByteString,
+ start: int | None = None,
+ stop: int | None = None,
+ alignment: int = 4
) -> Generator[int, None, None]:
area: slice = self._makeSlice(start, stop)
offset: int = area.start
@@ -138,8 +149,31 @@ class PSEXEAnalyzer:
offset += step
+ def findSingleMatch(
+ self,
+ data: ByteString,
+ start: int | None = None,
+ stop: int | None = None,
+ alignment: int = 4
+ ) -> int:
+ matches: Generator[int, None, None] = \
+ self.findBytes(data, start, stop, alignment)
+ try:
+ firstMatch: int = next(matches)
+ except StopIteration:
+ raise AnalysisError("no match found")
+ try:
+ next(matches)
+ raise AnalysisError("more than one match found")
+ except StopIteration:
+ return firstMatch
def findFunctionReturns(
- self, start: int | None = None, stop: int | None = None
+ self,
+ start: int | None = None,
+ stop: int | None = None
) -> Generator[int, None, None]:
inst: bytes = encodeJR(Register.RA)
@@ -151,15 +185,21 @@ class PSEXEAnalyzer:
yield offset + 8
def findCalls(
- self, target: int, start: int | None = None, stop: int | None = None
+ self,
+ target: int,
+ start: int | None = None,
+ stop: int | None = None
) -> Generator[int, None, None]:
inst: bytes = encodeJAL(target)
yield from self.findBytes(inst, start, stop, 4)
def findValueLoads(
- self, value: int, start: int | None = None, stop: int | None = None,
- maxDisplacement: int = 1
+ self,
+ value: int,
+ start: int | None = None,
+ stop: int | None = None,
+ maxDistance: int = 1
) -> Generator[ImmInstruction, None, None]:
# 32-bit loads are typically encoded as a LUI followed by either ORI or
# ADDIU. Due to ADDIU only supporting signed immediates, the LUI's
@@ -169,7 +209,7 @@ class PSEXEAnalyzer:
if inst is None:
- for offset in range(4, (maxDisplacement + 1) * 4, 4):
+ for offset in range(4, (maxDistance + 1) * 4, 4):
nextInst: Instruction | None = \
self.disassembleAt(inst.address + offset)
diff --git a/tools/common/gamedb.py b/tools/common/gamedb.py
new file mode 100644
index 0000000..2ef27ab
--- /dev/null
+++ b/tools/common/gamedb.py
@@ -0,0 +1,583 @@
+# -*- coding: utf-8 -*-
+# 573in1 - Copyright (C) 2022-2024 spicyjpeg
+# 573in1 is free software: you can redistribute it and/or modify it under the
+# terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+# 573in1 is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along with
+# 573in1. If not, see .
+from collections.abc import ByteString, Iterable, Mapping
+from dataclasses import dataclass
+from enum import IntEnum, IntFlag
+from struct import Struct
+from typing import Any, Self
+from .util import JSONGroupedObject
+## Utilities
+def _toJSONObject(value: Any) -> Any:
+ if hasattr(value, "toJSONObject"):
+ value = value.toJSONObject()
+ elif isinstance(value, ByteString):
+ value = value.hex("-")
+ elif isinstance(value, IntFlag):
+ value = [ flag.name for flag in value ]
+ if (value == 0) or (value == False) or (value == ""):
+ return None
+ else:
+ return value
+def _makeJSONObject(*groups: Mapping[str, Any]) -> JSONGroupedObject:
+ jsonObj: JSONGroupedObject = JSONGroupedObject()
+ for group in groups:
+ dest: dict[str, Any] = {}
+ for key, value in group.items():
+ jsonValue: Any = _toJSONObject(value)
+ if jsonValue is not None:
+ dest[key] = jsonValue
+ if dest:
+ jsonObj.groups.append(dest)
+ return jsonObj
+def _groupJSONObject(obj: Any, *groups: Iterable[str]) -> JSONGroupedObject:
+ jsonObj: JSONGroupedObject = JSONGroupedObject()
+ for group in groups:
+ dest: dict[str, Any] = {}
+ for key in group:
+ jsonValue: Any = _toJSONObject(getattr(obj, key, None))
+ if jsonValue is not None:
+ dest[key] = jsonValue
+ if dest:
+ jsonObj.groups.append(dest)
+ return jsonObj
+## Flags
+class CartPCBType(IntEnum):
+ CART_UNKNOWN_X76F041 = 1
+ CART_UNKNOWN_X76F041_DS2401 = 2
+ CART_GX700_PWB_D = 4
+ CART_GX700_PWB_E = 5
+ CART_GX700_PWB_J = 6
+ CART_GX883_PWB_D = 7
+ CART_GX894_PWB_D = 8
+ CART_GX896_PWB_A_A = 9
+ CART_GE949_PWB_D_A = 10
+ CART_GE949_PWB_D_B = 12
+ CART_PWB0000068819 = 12
+ CART_PWB0000088954 = 13
+ @staticmethod
+ def fromJSONObject(obj: str) -> Self:
+ return {
+ "unknown-x76f041": CartPCBType.CART_UNKNOWN_X76F041,
+ "unknown-x76f041-ds2401": CartPCBType.CART_UNKNOWN_X76F041_DS2401,
+ "unknown-zs01": CartPCBType.CART_UNKNOWN_ZS01,
+ "GX700-PWB(D)": CartPCBType.CART_GX700_PWB_D,
+ "GX700-PWB(E)": CartPCBType.CART_GX700_PWB_E,
+ "GX700-PWB(J)": CartPCBType.CART_GX700_PWB_J,
+ "GX883-PWB(D)": CartPCBType.CART_GX883_PWB_D,
+ "GX894-PWB(D)": CartPCBType.CART_GX894_PWB_D,
+ "GX896-PWB(A)A": CartPCBType.CART_GX896_PWB_A_A,
+ "GE949-PWB(D)A": CartPCBType.CART_GE949_PWB_D_A,
+ "GE949-PWB(D)B": CartPCBType.CART_GE949_PWB_D_B,
+ "PWB0000068819": CartPCBType.CART_PWB0000068819,
+ "PWB0000088954": CartPCBType.CART_PWB0000088954
+ }[obj]
+ def toJSONObject(self) -> str:
+ return {
+ CartPCBType.CART_UNKNOWN_X76F041: "unknown-x76f041",
+ CartPCBType.CART_UNKNOWN_X76F041_DS2401: "unknown-x76f041-ds2401",
+ CartPCBType.CART_UNKNOWN_ZS01: "unknown-zs01",
+ CartPCBType.CART_GX700_PWB_D: "GX700-PWB(D)",
+ CartPCBType.CART_GX700_PWB_E: "GX700-PWB(E)",
+ CartPCBType.CART_GX700_PWB_J: "GX700-PWB(J)",
+ CartPCBType.CART_GX883_PWB_D: "GX883-PWB(D)",
+ CartPCBType.CART_GX894_PWB_D: "GX894-PWB(D)",
+ CartPCBType.CART_GX896_PWB_A_A: "GX896-PWB(A)A",
+ CartPCBType.CART_GE949_PWB_D_A: "GE949-PWB(D)A",
+ CartPCBType.CART_GE949_PWB_D_B: "GE949-PWB(D)B",
+ CartPCBType.CART_PWB0000068819: "PWB0000068819",
+ CartPCBType.CART_PWB0000088954: "PWB0000088954"
+ }[self]
+class HeaderFlag(IntFlag):
+ FORMAT_SIMPLE = 0 << 0
+ FORMAT_BASIC = 1 << 0
+ SPEC_TYPE_NONE = 0 << 2
+ @staticmethod
+ def fromJSONObject(obj: Mapping[str, Any]) -> Self:
+ flags: HeaderFlag = 0
+ flags |= {
+ "simple": HeaderFlag.FORMAT_SIMPLE,
+ "basic": HeaderFlag.FORMAT_BASIC,
+ "extended": HeaderFlag.FORMAT_EXTENDED
+ }[obj.get("format", None)]
+ flags |= {
+ None: HeaderFlag.SPEC_TYPE_NONE,
+ "actual": HeaderFlag.SPEC_TYPE_ACTUAL,
+ "wildcard": HeaderFlag.SPEC_TYPE_WILDCARD
+ }[obj.get("specType", None)]
+ for key, flag in {
+ "scrambled": HeaderFlag.HEADER_SCRAMBLED,
+ "usesPublicArea": HeaderFlag.HEADER_IN_PUBLIC_AREA,
+ "lowercaseRegion": HeaderFlag.REGION_LOWERCASE
+ }.items():
+ if obj.get(key, False):
+ flags |= flag
+ return flags
+ def toJSONObject(self) -> JSONGroupedObject:
+ return _makeJSONObject(
+ {
+ "format": {
+ HeaderFlag.FORMAT_SIMPLE: "simple",
+ HeaderFlag.FORMAT_BASIC: "basic",
+ HeaderFlag.FORMAT_EXTENDED: "extended"
+ }[self & HeaderFlag.FORMAT_BITMASK],
+ "specType": {
+ HeaderFlag.SPEC_TYPE_NONE: None,
+ HeaderFlag.SPEC_TYPE_ACTUAL: "actual",
+ HeaderFlag.SPEC_TYPE_WILDCARD: "wildcard"
+ }[self & HeaderFlag.SPEC_TYPE_BITMASK],
+ "scrambled": (HeaderFlag.HEADER_SCRAMBLED in self),
+ "usesPublicArea": (HeaderFlag.HEADER_IN_PUBLIC_AREA in self),
+ "lowercaseRegion": (HeaderFlag.REGION_LOWERCASE in self)
+ }
+ )
+class ChecksumFlag(IntFlag):
+ @staticmethod
+ def fromJSONObject(obj: Mapping[str, Any]) -> Self:
+ flags: ChecksumFlag = 0
+ flags |= {
+ "byte": ChecksumFlag.CHECKSUM_UNIT_BYTE,
+ "littleEndianWord": ChecksumFlag.CHECKSUM_UNIT_WORD_LITTLE,
+ "bigEndianWord": ChecksumFlag.CHECKSUM_UNIT_WORD_BIG
+ }[obj.get("unit", None)]
+ for key, flag in {
+ "bigEndian": ChecksumFlag.CHECKSUM_BIG_ENDIAN,
+ "inverted": ChecksumFlag.CHECKSUM_INVERTED,
+ "forceGXSpec": ChecksumFlag.CHECKSUM_FORCE_GX_SPEC
+ }.items():
+ if obj.get(key, False):
+ flags |= flag
+ return flags
+ def toJSONObject(self) -> JSONGroupedObject:
+ return _makeJSONObject(
+ {
+ "unit": {
+ ChecksumFlag.CHECKSUM_UNIT_BYTE: "byte",
+ ChecksumFlag.CHECKSUM_UNIT_WORD_LITTLE: "littleEndianWord",
+ ChecksumFlag.CHECKSUM_UNIT_WORD_BIG: "bigEndianWord"
+ }[self & ChecksumFlag.CHECKSUM_UNIT_BITMASK],
+ "bigEndian": (ChecksumFlag.CHECKSUM_BIG_ENDIAN in self),
+ "inverted": (ChecksumFlag.CHECKSUM_INVERTED in self),
+ "forceGXSpec": (ChecksumFlag.CHECKSUM_FORCE_GX_SPEC in self)
+ }
+ )
+class IdentifierFlag(IntFlag):
+ @staticmethod
+ def fromJSONObject(obj: Mapping[str, Any]) -> Self:
+ flags: IdentifierFlag = 0
+ flags |= {
+ None: IdentifierFlag.PRIVATE_TID_TYPE_NONE,
+ "static": IdentifierFlag.PRIVATE_TID_TYPE_STATIC,
+ "littleEndianSIDHash": IdentifierFlag.PRIVATE_TID_TYPE_SID_HASH_LITTLE,
+ "bigEndianSIDHash": IdentifierFlag.PRIVATE_TID_TYPE_SID_HASH_BIG
+ }[obj.get("privateTID", None)]
+ for key, flag in {
+ "privateSID": IdentifierFlag.PRIVATE_SID_PRESENT,
+ "privateMID": IdentifierFlag.PRIVATE_MID_PRESENT,
+ "privateXID": IdentifierFlag.PRIVATE_XID_PRESENT,
+ "dummyPublicArea": IdentifierFlag.ALLOCATE_DUMMY_PUBLIC_AREA,
+ "publicMID": IdentifierFlag.PUBLIC_MID_PRESENT,
+ "publicXID": IdentifierFlag.PUBLIC_XID_PRESENT
+ }.items():
+ if obj.get(key, False):
+ flags |= flag
+ return flags
+ def toJSONObject(self) -> JSONGroupedObject:
+ return _makeJSONObject(
+ {
+ "privateTID": {
+ IdentifierFlag.PRIVATE_TID_TYPE_NONE: None,
+ IdentifierFlag.PRIVATE_TID_TYPE_STATIC: "static",
+ IdentifierFlag.PRIVATE_TID_TYPE_SID_HASH_LITTLE: "littleEndianSIDHash",
+ IdentifierFlag.PRIVATE_TID_TYPE_SID_HASH_BIG: "bigEndianSIDHash"
+ }[self & IdentifierFlag.PRIVATE_TID_TYPE_BITMASK],
+ "privateSID": (IdentifierFlag.PRIVATE_SID_PRESENT in self),
+ "privateMID": (IdentifierFlag.PRIVATE_MID_PRESENT in self),
+ "privateXID": (IdentifierFlag.PRIVATE_XID_PRESENT in self)
+ }, {
+ "dummyPublicArea":
+ (IdentifierFlag.ALLOCATE_DUMMY_PUBLIC_AREA in self),
+ "publicMID": (IdentifierFlag.PUBLIC_MID_PRESENT in self),
+ "publicXID": (IdentifierFlag.PUBLIC_XID_PRESENT in self)
+ }
+ )
+class SignatureFlag(IntFlag):
+ SIGNATURE_DUMMY_01 = 1 << 2
+ @staticmethod
+ def fromJSONObject(obj: Mapping[str, Any]) -> Self:
+ flags: SignatureFlag = 0
+ flags |= {
+ None: SignatureFlag.SIGNATURE_TYPE_NONE,
+ "checksum": SignatureFlag.SIGNATURE_TYPE_CHECKSUM,
+ "md5": SignatureFlag.SIGNATURE_TYPE_MD5
+ }[obj.get("type", None)]
+ for key, flag in {
+ "dummy01": SignatureFlag.SIGNATURE_DUMMY_01,
+ "padWithFF": SignatureFlag.SIGNATURE_PAD_WITH_FF
+ }.items():
+ if obj.get(key, False):
+ flags |= flag
+ return flags
+ def toJSONObject(self) -> JSONGroupedObject:
+ return _makeJSONObject(
+ {
+ "type": {
+ SignatureFlag.SIGNATURE_TYPE_NONE: None,
+ SignatureFlag.SIGNATURE_TYPE_CHECKSUM: "checksum",
+ SignatureFlag.SIGNATURE_TYPE_MD5: "md5"
+ }[self & SignatureFlag.SIGNATURE_TYPE_BITMASK],
+ "dummy01": (SignatureFlag.SIGNATURE_DUMMY_01 in self),
+ "padWithFF": (SignatureFlag.SIGNATURE_PAD_WITH_FF in self)
+ }
+ )
+class GameFlag(IntFlag):
+ @staticmethod
+ def fromJSONObject(obj: Mapping[str, Any]) -> Self:
+ flags: GameFlag = 0
+ flags |= {
+ None: GameFlag.GAME_IO_BOARD_NONE,
+ "GX700-PWB(K)": GameFlag.GAME_IO_BOARD_KICK,
+ "PWB0000073070": GameFlag.GAME_IO_BOARD_GUNMANIA
+ }[obj.get("ioBoard", None)]
+ for key, flag in {
+ "installRequiresRTCHeader": GameFlag.GAME_INSTALL_RTC_HEADER_REQUIRED,
+ "requiresRTCHeader": GameFlag.GAME_RTC_HEADER_REQUIRED
+ }.items():
+ if obj.get(key, False):
+ flags |= flag
+ return flags
+ def toJSONObject(self) -> JSONGroupedObject:
+ return _makeJSONObject(
+ {
+ "ioBoard": {
+ GameFlag.GAME_IO_BOARD_NONE: None,
+ GameFlag.GAME_IO_BOARD_KICK: "GX700-PWB(K)",
+ GameFlag.GAME_IO_BOARD_GUNMANIA: "PWB0000073070"
+ }[self & GameFlag.GAME_IO_BOARD_BITMASK]
+ }, {
+ "installRequiresRTCHeader":
+ "requiresRTCHeader": (GameFlag.GAME_RTC_HEADER_REQUIRED in self)
+ }
+ )
+## Data structures
+_ROM_HEADER_INFO_STRUCT: Struct = Struct("< 2s 3B x")
+_CART_INFO_STRUCT: Struct = Struct("< 8s 2s 6B")
+_GAME_INFO_STRUCT: Struct = Struct("< 8s 36s 3s B 2H 6s 6s 16s 16s")
+_MAX_REGIONS: int = 12
+class ROMHeaderInfo:
+ yearField: bytes
+ headerFlags: HeaderFlag
+ checksumFlags: ChecksumFlag
+ signatureFlags: SignatureFlag
+ @staticmethod
+ def fromJSONObject(obj: Mapping[str, Any]) -> Self:
+ return ROMHeaderInfo(
+ bytes.fromhex(obj["yearField"].replace("-", " ")),
+ HeaderFlag .fromJSONObject(obj.get("headerFlags", {})),
+ ChecksumFlag .fromJSONObject(obj.get("checksumFlags", {})),
+ SignatureFlag.fromJSONObject(obj.get("signatureFlags", {}))
+ )
+ def toJSONObject(self) -> JSONGroupedObject:
+ return _groupJSONObject(
+ self, (
+ "yearField"
+ ), (
+ "headerFlags",
+ "checksumFlags",
+ "signatureFlags"
+ )
+ )
+ def toBinary(self) -> bytes:
+ return _ROM_HEADER_INFO_STRUCT.pack(
+ self.yearField,
+ self.headerFlags,
+ self.checksumFlags,
+ self.signatureFlags
+ )
+class CartInfo:
+ pcb: CartPCBType
+ dataKey: bytes
+ yearField: bytes
+ tidWidth: int
+ midValue: int
+ headerFlags: HeaderFlag
+ checksumFlags: ChecksumFlag
+ idFlags: IdentifierFlag
+ @staticmethod
+ def fromJSONObject(obj: Mapping[str, Any]) -> Self:
+ return CartInfo(
+ CartPCBType.fromJSONObject(obj["pcb"]),
+ bytes.fromhex(obj["dataKey"] .replace("-", " ")),
+ bytes.fromhex(obj["yearField"].replace("-", " ")),
+ int(obj.get("tidWidth", 0)),
+ int(obj.get("midValue", 0)),
+ HeaderFlag .fromJSONObject(obj.get("headerFlags", {})),
+ ChecksumFlag .fromJSONObject(obj.get("checksumFlags", {})),
+ IdentifierFlag.fromJSONObject(obj.get("idFlags", {}))
+ )
+ def toJSONObject(self) -> JSONGroupedObject:
+ return _groupJSONObject(
+ self, (
+ "pcb",
+ ), (
+ "dataKey",
+ "yearField",
+ "tidWidth",
+ "midValue"
+ ), (
+ "headerFlags",
+ "checksumFlags",
+ "idFlags",
+ "tidWidth",
+ "midValue"
+ )
+ )
+ def toBinary(self) -> bytes:
+ return _CART_INFO_STRUCT.pack(
+ self.dataKey,
+ self.yearField,
+ self.pcb,
+ self.tidWidth,
+ self.midValue,
+ self.headerFlags,
+ self.checksumFlags,
+ self.idFlags
+ )
+class GameInfo:
+ specifications: list[str]
+ code: str
+ regions: list[str]
+ identifiers: list[str | None]
+ name: str
+ series: str | None
+ year: int
+ flags: GameFlag
+ bootloaderVersion: str | None = None
+ rtcHeader: ROMHeaderInfo | None = None
+ flashHeader: ROMHeaderInfo | None = None
+ installCart: CartInfo | None = None
+ gameCart: CartInfo | None = None
+ @staticmethod
+ def fromJSONObject(obj: Mapping[str, Any]) -> Self:
+ rtcHeader: Mapping[str, Any] | None = obj.get("rtcHeader", None)
+ flashHeader: Mapping[str, Any] | None = obj.get("flashHeader", None)
+ installCart: Mapping[str, Any] | None = obj.get("installCart", None)
+ gameCart: Mapping[str, Any] | None = obj.get("gameCart", None)
+ return GameInfo(
+ obj["specifications"],
+ obj["code"],
+ obj["regions"],
+ obj["identifiers"],
+ obj["name"],
+ obj.get("series", None),
+ obj["year"],
+ GameFlag.fromJSONObject(obj.get("flags", {})),
+ obj.get("bootloaderVersion", None),
+ ROMHeaderInfo.fromJSONObject(rtcHeader) if rtcHeader else None,
+ ROMHeaderInfo.fromJSONObject(flashHeader) if flashHeader else None,
+ CartInfo .fromJSONObject(installCart) if installCart else None,
+ CartInfo .fromJSONObject(gameCart) if gameCart else None
+ )
+ def toJSONObject(self) -> JSONGroupedObject:
+ return _groupJSONObject(
+ self, (
+ "specifications",
+ "code",
+ "regions",
+ "identifiers"
+ ), (
+ "name",
+ "series",
+ "year"
+ ), (
+ "flags",
+ ), (
+ "bootloaderVersion",
+ ), (
+ "rtcHeader",
+ "flashHeader",
+ "installCart",
+ "gameCart"
+ )
+ )
+ def toBinary(self, nameOffset: int) -> bytes:
+ if len(self.specifications) > _MAX_SPECIFICATIONS:
+ raise ValueError(
+ f"entry can only have up to {_MAX_SPECIFICATIONS} "
+ f"specification codes"
+ )
+ if len(self.regions) > _MAX_REGIONS:
+ raise ValueError(
+ f"entry can only have up to {_MAX_REGIONS} region codes"
+ )
+ # FIXME: identifiers, series and bootloaderVersion are not currently
+ # included in the binary format
+ return _GAME_INFO_STRUCT.pack(
+ b"".join(sorted(
+ spec.encode("ascii").ljust(2, b"\0")
+ for spec in self.specifications
+ )),
+ b"".join(sorted(
+ region.encode("ascii").ljust(3, b"\0")
+ for region in self.regions
+ )),
+ self.code.encode("ascii"),
+ self.flags,
+ nameOffset,
+ self.year,
+ self.rtcHeader .toBinary(),
+ self.flashHeader.toBinary(),
+ self.installCart.toBinary(),
+ self.gameCart .toBinary()
+ )
diff --git a/tools/common/util.py b/tools/common/util.py
index 5be263b..d1f5952 100644
--- a/tools/common/util.py
+++ b/tools/common/util.py
@@ -125,9 +125,11 @@ def hashData(data: Iterable[int]) -> int:
def checksum8(data: Iterable[int], invert: bool = False) -> int:
return (sum(data) & 0xff) ^ (0xff if invert else 0)
-def checksum16(data: Iterable[int], invert: bool = False) -> int:
+def checksum16(
+ data: Iterable[int], endianness: str = "little", invert: bool = False
+) -> int:
it: Iterator = iter(data)
- values: map[int] = map(lambda x: x[0] | (x[1] << 8), zip(it, it))
+ values: map[int] = map(lambda x: int.from_bytes(x, endianness), zip(it, it))
return (sum(values) & 0xffff) ^ (0xffff if invert else 0)
@@ -140,6 +142,37 @@ def shortenedMD5(data: ByteString) -> bytearray:
return output
+## CRC calculation
+_CRC8_POLY: int = 0x8c
+def dsCRC8(data: ByteString) -> int:
+ crc: int = 0
+ for byte in data:
+ for _ in range(8):
+ temp: int = crc ^ byte
+ byte >>= 1
+ crc >>= 1
+ if temp & 1:
+ crc ^= _CRC8_POLY
+ return crc & 0xff
+def sidCRC16(data: ByteString, width: int = 16) -> int:
+ crc: int = 0
+ for i, byte in enumerate(data):
+ for j in range(i * 8, (i + 1) * 8):
+ if byte & 1:
+ crc ^= 1 << (j % width)
+ byte >>= 1
+ return crc & 0xffff
## Logging
def setupLogger(level: int | None):
@@ -228,7 +261,7 @@ class JSONFormatter:
lastIndex: int = len(obj) - 1
- for index, ( key, value ) in obj.items():
+ for index, ( key, value ) in enumerate(obj.items()):
yield from self.serialize(key)
yield self._inlineSep(":")
yield from self.serialize(value)
@@ -252,6 +285,9 @@ class JSONFormatter:
lastGroupIndex: int = len(groups) - 1
for groupIndex, obj in enumerate(groups):
+ if not obj:
+ raise ValueError("empty groups are not allowed")
lastIndex: int = len(obj) - 1
for index, item in enumerate(obj):
@@ -279,6 +315,9 @@ class JSONFormatter:
lastGroupIndex: int = len(groups) - 1
for groupIndex, obj in enumerate(groups):
+ if not obj:
+ raise ValueError("empty groups are not allowed")
keys: list[str] = [
("".join(self.serialize(key)) + self._inlineSep(":"))
for key in obj.keys()
@@ -317,9 +356,9 @@ class JSONFormatter:
case JSONGroupedObject() if not groupedOnSingleLine:
yield from self._groupedObject(obj.groups)
- case list() | tuple() if ungroupedOnSingleLine:
+ case (list() | tuple()) if ungroupedOnSingleLine:
yield from self._singleLineArray(obj)
- case list() | tuple() if not ungroupedOnSingleLine:
+ case (list() | tuple()) if not ungroupedOnSingleLine:
yield from self._groupedArray(( obj, ))
case Mapping() if ungroupedOnSingleLine:
@@ -355,7 +394,7 @@ class HashTableBuilder:
self.entries[index] = entry
return index
if bucket.fullHash == fullHash:
- raise KeyError(f"collision detected, hash={fullHash:#010x}")
+ raise KeyError(f"hash collision detected ({fullHash:#010x})")
# Otherwise, follow the buckets's chain, find the last chained item and
# link the new entry to it.
@@ -363,7 +402,7 @@ class HashTableBuilder:
bucket = self.entries[bucket.chainIndex]
if bucket.fullHash == fullHash:
- raise KeyError(f"collision detected, hash={fullHash:#010x}")
+ raise KeyError(f"hash collision detected, ({fullHash:#010x})")
bucket.chainIndex = len(self.entries)
diff --git a/tools/decodeDump.py b/tools/decodeDump.py
index 7f70227..14fdb99 100755
--- a/tools/decodeDump.py
+++ b/tools/decodeDump.py
@@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License along with
# 573in1. If not, see .
-__version__ = "0.4.1"
+__version__ = "1.0.3"
__author__ = "spicyjpeg"
import sys
@@ -63,7 +63,7 @@ def createParser() -> ArgumentParser:
parser = ArgumentParser(
description = \
"Decodes and displays or saves the contents of a QR code cartridge "
- "dump generated by the tool.",
+ "dump generated by 573in1.",
add_help = False
@@ -78,7 +78,7 @@ def createParser() -> ArgumentParser:
"-i", "--input",
type = FileType("rb"),
- help = "Read dump (.dmp file) or QR string from specified path",
+ help = "Read dump (.dmp file) or QR code string from specified path",
metavar = "file"
@@ -100,7 +100,7 @@ def createParser() -> ArgumentParser:
type = str,
nargs = "?",
- help = "QR string to decode (if -i was not passed)"
+ help = "QR code string to decode (if -i was not passed)"
return parser
@@ -114,19 +114,20 @@ def main():
data: bytes = file.read()
- dump: CartDump = parseCartDump(data)
+ dump: CartDump = CartDump.fromBinary(data)
- dump: CartDump = parseCartQRString(data.decode("ascii"))
+ dump: CartDump = CartDump.fromQRString(data.decode("utf-8"))
elif args.data:
- dump: CartDump = parseCartQRString(args.data)
+ dump: CartDump = CartDump.fromQRString(args.data)
parser.error("a dump must be passed on the command line or using -i")
if args.log:
printDumpInfo(dump, args.log)
if args.export:
with args.export as file:
- file.write(dump.serialize())
+ file.write(dump.toBinary())
if __name__ == "__main__":