From cc885a0270d18e7071757c3c0bb6390256a41c81 Mon Sep 17 00:00:00 2001 From: Viv Date: Fri, 2 Jun 2023 16:33:44 -0400 Subject: [PATCH] Refactor functions into individual modules --- tja2fumen/__init__.py | 0 tja2fumen/constants.py | 71 +++++ tja2fumen/converters.py | 91 ++++++ tja2fumen/parsers.py | 455 +++++++++++++++++++++++++++++ tja2fumen/parsetja.py | 245 ---------------- tja2fumen/tja2fumen.py | 633 ++-------------------------------------- tja2fumen/utils.py | 102 +++++++ tja2fumen/writers.py | 63 ++++ 8 files changed, 808 insertions(+), 852 deletions(-) create mode 100644 tja2fumen/__init__.py create mode 100644 tja2fumen/constants.py create mode 100644 tja2fumen/converters.py create mode 100644 tja2fumen/parsers.py delete mode 100644 tja2fumen/parsetja.py create mode 100644 tja2fumen/utils.py create mode 100644 tja2fumen/writers.py diff --git a/tja2fumen/__init__.py b/tja2fumen/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tja2fumen/constants.py b/tja2fumen/constants.py new file mode 100644 index 0000000..e0b6318 --- /dev/null +++ b/tja2fumen/constants.py @@ -0,0 +1,71 @@ +# Various commands and header fields for TJA files +HEADER_GLOBAL = ['TITLE', 'TITLEJA', 'SUBTITLE', 'SUBTITLEJA', 'BPM', 'WAVE', 'OFFSET', 'DEMOSTART', 'GENRE'] +HEADER_COURSE = ['COURSE', 'LEVEL', 'BALLOON', 'SCOREINIT', 'SCOREDIFF', 'TTRO' 'WBEAT'] +BRANCH_COMMANDS = ['START', 'END', 'BRANCHSTART', 'BRANCHEND', 'N', 'E', 'M'] +MEASURE_COMMANDS = ['MEASURE', 'GOGOSTART', 'GOGOEND', 'SCROLL', 'BPMCHANGE', 'TTBREAK' 'LEVELHOLD'] +UNUSED_COMMANDS = ['DELAY', 'SECTION', 'BMSCROLL', 'HBSCROLL', 'BARLINEON', 'BARLINEOFF'] +COMMAND = BRANCH_COMMANDS + MEASURE_COMMANDS + UNUSED_COMMANDS + +# Note types for TJA files +TJA_NOTE_TYPES = { + '1': 'Don', + '2': 'Ka', + '3': 'DON', + '4': 'KA', + '5': 'Drumroll', + '6': 'DRUMROLL', + '7': 'Balloon', + '8': 'EndDRB', + '9': 'Kusudama', + 'A': 'DON', # hands + 'B': 'KA', # hands +} + +# Note types for fumen files +noteTypes = { + 0x1: "Don", # ドン + 0x2: "Don2", # ド + 0x3: "Don3", # コ + 0x4: "Ka", # カッ + 0x5: "Ka2", # カ + 0x6: "Drumroll", + 0x7: "DON", + 0x8: "KA", + 0x9: "DRUMROLL", + 0xa: "Balloon", + 0xb: "DON2", # hands + 0xc: "Kusudama", + 0xd: "KA2", # hands + 0xe: "Unknown1", # ? (Present in some Wii1 songs) + 0xf: "Unknown2", # ? (Present in some PS4 songs) + 0x10: "Unknown3", # ? (Present in some Wii1 songs) + 0x11: "Unknown4", # ? (Present in some Wii1 songs) + 0x12: "Unknown5", # ? (Present in some Wii4 songs) + 0x13: "Unknown6", # ? (Present in some Wii1 songs) + 0x14: "Unknown7", # ? (Present in some PS4 songs) + 0x15: "Unknown8", # ? (Present in some Wii1 songs) + 0x16: "Unknown9", # ? (Present in some Wii1 songs) + 0x17: "Unknown10", # ? (Present in some Wii4 songs) + 0x18: "Unknown11", # ? (Present in some PS4 songs) + 0x19: "Unknown12", # ? (Present in some PS4 songs) + 0x22: "Unknown13", # ? (Present in some Wii1 songs) + 0x62: "Drumroll2" # ? +} +typeNotes = {v: k for k, v in noteTypes.items()} + +branchNames = ("normal", "advanced", "master") + +# Fumen headers are made up of smaller substrings of bytes +byte_strings = { + 'x00': b'\x00\x00\x00\x00\x00\x00', + '431': b'43\xc8Ag&\x96B"\xe2\xd8B', + '432': b'43\xc8Ag&\x96BD\x84\xb7B', + '433': b'43\xc8A"\xe2\xd8B\x00@\xfaB', + '434': b'43\xc8AD\x84\xb7B"\xe2\xd8B', + 'g1': b'g&\x96B4\xa3\x89Cxw\x05A', + 'V1': b'V\xd5&B\x00@\xfaB\x00@\xfaB', + 'V2': b'V\xd5&B"\xe2\xd8B\x00@\xfaB', + 'V3': b'V\xd5&B\x00@\xfaB\xf0\xce\rC', +} + +simpleHeaders = [b * 36 for b in [byte_strings['431'], byte_strings['V1'], byte_strings['V2']]] diff --git a/tja2fumen/converters.py b/tja2fumen/converters.py new file mode 100644 index 0000000..97faca3 --- /dev/null +++ b/tja2fumen/converters.py @@ -0,0 +1,91 @@ +from copy import deepcopy + +from constants import TJA_NOTE_TYPES + +# Filler metadata that the `writeFumen` function expects +default_note = {'type': '', 'pos': 0.0, 'item': 0, 'padding': 0.0, + 'scoreInit': 0, 'scoreDiff': 0, 'durationPadding': 0.0} +default_branch = {'length': 0, 'padding': 0, 'speed': 1.0} +default_measure = { + 'bpm': 0.0, + 'fumenOffset': 0.0, + 'gogo': False, + 'hidden': False, + 'padding1': 0, + 'branchInfo': [-1, -1, -1, -1, -1, -1], + 'padding2': 0, + 'normal': deepcopy(default_branch), + 'advanced': deepcopy(default_branch), + 'master': deepcopy(default_branch) +} + + +def convertTJAToFumen(fumen, tja): + # Fumen offset for the first measure that has a barline + fumenOffset1 = float(tja['metadata']['offset']) * -1000 + + # Variables that will change over time due to events + currentBPM = 0.0 + currentGogo = False + currentHidden = False + currentBranch = 'normal' # TODO: Program in branch support + + # Parse TJA measures to create converted TJA -> Fumen file + tjaConverted = {'measures': []} + for i, measureTJA in enumerate(tja['measures']): + measureFumenExample = fumen['measures'][i+9] + measureFumen = deepcopy(default_measure) + + # TODO Event: GOGOTIME + + # TODO Event: HIDDEN + + # TODO Event: BARLINE + + # TODO Event: MEASURE + + # Event: BPMCHANGE + # TODO: Handle TJA measure being broken up into multiple Fumen measures due to mid-measure BPM changes + midMeasureBPM = [(0, currentBPM)] + for event in measureTJA['events']: + if event['name'] == 'bpm': + currentBPM = float(event['value']) + if event['position'] == 0: + midMeasureBPM[0] = (0, currentBPM,) + else: + midMeasureBPM.append((event['position'], currentBPM)) + if len(midMeasureBPM) > 1: + test = None + measureFumen['bpm'] = currentBPM + + # TODO: `measureFumen['fumenOffset'] + # Will need to account for BARLINEON and BARLINEOFF. + # Some examples that line up with actual fumen data: + # fumenOffset0 = (fumenOffset1 - measureLength) + # fumenOffset2 = (fumenOffset1 + measureLength) + measureLength = 240_000 / currentBPM + # measureFumen['fumenOffset'] = prev['fumenOffset'] + measureLength + + # Create note dictionaries based on TJA measure data (containing 0's plus 1/2/3/4/etc. for notes) + note_counter = 0 + for i, note_value in enumerate(measureTJA['data']): + if note_value != '0': + note = deepcopy(default_note) + note['pos'] = measureLength * (i / len(measureTJA['data'])) + note['type'] = TJA_NOTE_TYPES[note_value] # TODO: Handle BALLOON/DRUMROLL + note['scoreInit'] = tja['scoreInit'] # Probably not fully accurate + note['scoreDiff'] = tja['scoreDiff'] # Probably not fully accurate + measureFumen[currentBranch][note_counter] = note + note_counter += 1 + measureFumen[currentBranch]['length'] = note_counter + + # Append the measure to the tja's list of measures + tjaConverted['measures'].append(measureFumen) + + tjaConverted['headerUnknown'] = b'x\00' * 80 + tjaConverted['order'] = '<' + tjaConverted['length'] = len(tjaConverted['measures']) + tjaConverted['unknownMetadata'] = 0 + tjaConverted['branches'] = False + + return tjaConverted diff --git a/tja2fumen/parsers.py b/tja2fumen/parsers.py new file mode 100644 index 0000000..8642da7 --- /dev/null +++ b/tja2fumen/parsers.py @@ -0,0 +1,455 @@ +import os +import re + +from utils import readStruct, getBool, shortHex, nameValue, debugPrint, checkValidHeader +from constants import ( + # TJA constants + HEADER_GLOBAL, HEADER_COURSE, BRANCH_COMMANDS, MEASURE_COMMANDS, COMMAND, + # Fumen constants + branchNames, noteTypes +) + + +######################################################################################################################## +# TJA-parsing functions ( Original source: https://github.com/WHMHammer/tja-tools/blob/master/src/js/parseTJA.js) +######################################################################################################################## + + +def parseTJA(tja): + # Split into lines + lines = tja.read().splitlines() + lines = [line for line in lines if line] # Discard empty lines + + # Line by line + headers = {} + courses = {} + currentCourse = '' + for line in lines: + parsed = parseLine(line) + # Case 1: Comments (ignore) + if parsed['type'] == 'comment': + pass + # Case 2: Global header metadata + elif parsed['type'] == 'header' and parsed['scope'] == 'global': + headers[parsed['name'].lower()] = parsed['value'] + # Case 3: Course data (metadata, commands, note data) + else: + # Check to see if we're starting a new course + if parsed['type'] == 'header' and parsed['scope'] == 'course' and parsed['name'] == 'COURSE': + currentCourse = parsed['value'] + if currentCourse not in courses.keys(): + courses[currentCourse] = [] + # Append the line to the current course + courses[currentCourse].append(parsed) + + # Convert parsed course lines into actual note data + songs = {} + for courseName, courseLines in courses.items(): + courseHeader, courseMeasures = getCourse(headers, courseLines) + songs[courseName] = applyFumenStructureToParsedTJA(headers, courseHeader, courseMeasures) + + return songs + + +def parseLine(line): + # Regex matches for various line types + match_comment = re.match(r"//.*", line) + match_header = re.match(r"^([A-Z]+):(.+)", line) + match_command = re.match(r"^#([A-Z]+)(?:\s+(.+))?", line) + match_data = re.match(r"^(([0-9]|A|B|C|F|G)*,?)$", line) + + if match_comment: + return {"type": 'comment', "value": line} + + elif match_header: + nameUpper = match_header.group(1).upper() + value = match_header.group(2) + if nameUpper in HEADER_GLOBAL: + return {"type": 'header', "scope": 'global', "name": nameUpper, "value": value.strip()} + elif nameUpper in HEADER_COURSE: + return {"type": 'header', "scope": 'course', "name": nameUpper, "value": value.strip()} + + elif match_command: + nameUpper = match_command.group(1).upper() + value = match_command.group(2) if match_command.group(2) else '' + if nameUpper in COMMAND: + return {"type": 'command', "name": nameUpper, "value": value.strip()} + + elif match_data: + return {"type": 'data', "data": match_data.group(1)} + + return {"type": 'unknown', "value": line} + + +def getCourse(tjaHeaders, lines): + def parseHeaderMetadata(line): + nonlocal headers + if line["name"] == 'COURSE': + headers['course'] = line['value'] + elif line["name"] == 'LEVEL': + headers['level'] = int(line['value']) + elif line["name"] == 'SCOREINIT': + headers['scoreInit'] = int(line['value']) + elif line["name"] == 'SCOREDIFF': + headers['scoreDiff'] = int(line['value']) + elif line["name"] == 'TTROWBEAT': + headers['ttRowBeat'] = int(line['value']) + elif line["name"] == 'BALLOON': + if line['value']: + balloons = [int(v) for v in line['value'].split(",")] + else: + balloons = [] + headers['balloon'] = balloons + + def parseBranchCommands(line): + nonlocal flagLevelhold, targetBranch, currentBranch + if line["name"] == 'BRANCHSTART': + if flagLevelhold: + return + values = line['value'].split(',') + if values[0] == 'r': + if len(values) >= 3: + targetBranch = 'M' + elif len(values) == 2: + targetBranch = 'E' + else: + targetBranch = 'N' + elif values[0] == 'p': + if len(values) >= 3 and float(values[2]) <= 100: + targetBranch = 'M' + elif len(values) >= 2 and float(values[1]) <= 100: + targetBranch = 'E' + else: + targetBranch = 'N' + elif line["name"] == 'BRANCHEND': + currentBranch = targetBranch + elif line["name"] == 'N': + currentBranch = 'N' + elif line["name"] == 'E': + currentBranch = 'E' + elif line["name"] == 'M': + currentBranch = 'M' + elif line["name"] == 'START' or line['name'] == 'END': + currentBranch = 'N' + targetBranch = 'N' + flagLevelhold = False + + def parseMeasureCommands(line): + nonlocal measureDivisor, measureDividend, measureEvents, measureProperties, flagLevelhold + if line['name'] == 'MEASURE': + matchMeasure = re.match(r"(\d+)/(\d+)", line['value']) + if not matchMeasure: + return + measureDividend = int(matchMeasure.group(1)) + measureDivisor = int(matchMeasure.group(2)) + elif line['name'] == 'GOGOSTART': + measureEvents.append({"name": 'gogoStart', "position": len(measureData)}) + elif line['name'] == 'GOGOEND': + measureEvents.append({"name": 'gogoEnd', "position": len(measureData)}) + elif line['name'] == 'SCROLL': + measureEvents.append({"name": 'scroll', "position": len(measureData), "value": float(line['value'])}) + elif line['name'] == 'BPMCHANGE': + measureEvents.append({"name": 'bpm', "position": len(measureData), "value": float(line['value'])}) + elif line['name'] == 'TTBREAK': + measureProperties['ttBreak'] = True + elif line['name'] == 'LEVELHOLD': + flagLevelhold = True + + def parseMeasureData(line): + nonlocal measures, measureData, measureDividend, measureDivisor, measureEvents, measureProperties + data = line['data'] + # If measure has ended, then append the measure and start anew + if data.endswith(','): + measureData += data[0:-1] + measure = { + "length": [measureDividend, measureDivisor], + "properties": measureProperties, + "data": measureData, + "events": measureEvents, + } + measures.append(measure) + measureData = '' + measureEvents = [] + measureProperties = {} + # Otherwise, keep tracking measureData + else: + measureData += data + + # Define state variables + headers = {'balloon': []} # Charters sometimes exclude `BALLOON` entirely if there are none + measures = [] + measureDividend = 4 + measureDivisor = 4 + measureProperties = {} + measureData = '' + measureEvents = [] + currentBranch = 'N' + targetBranch = 'N' + flagLevelhold = False + + # Process course lines + for line in lines: + if line["type"] == 'header': + parseHeaderMetadata(line) + elif line["type"] == 'command' and line['name'] in BRANCH_COMMANDS: + parseBranchCommands(line) + elif line["type"] == 'command' and line['name'] in MEASURE_COMMANDS and currentBranch == targetBranch: + parseMeasureCommands(line) + elif line['type'] == 'data' and currentBranch == targetBranch: + parseMeasureData(line) + + # Post-processing: Ensure the first measure has a BPM event + if measures: + firstBPMEventFound = False + # Search for BPM event in the first measure + for i in range(len(measures[0]['events'])): + evt = measures[0]['events'][i] + if evt.name == 'bpm' and evt.position == 0: + firstBPMEventFound = True + # If not present, insert a BPM event into the first measure using the global header metadata + if not firstBPMEventFound: + # noinspection PyTypeChecker + measures[0]['events'].insert(0, {"name": 'bpm', "position": 0, "value": tjaHeaders['bpm']}) + + # Post-processing: In case the file doesn't end on a "measure end" symbol (','), append whatever is left + if measureData: + measures.append({ + "length": [measureDividend, measureDivisor], + "properties": measureProperties, + "data": measureData, + "events": measureEvents, + }) + + # Post-processing: Otherwise, if the file ends on a measure event (e.g. #GOGOEND), append any remaining events + elif measureEvents: + for event in measureEvents: + event['position'] = len(measures[len(measures) - 1]['data']) + # noinspection PyTypeChecker + measures[len(measures) - 1]['events'].append(event) + + return headers, measures + + +def applyFumenStructureToParsedTJA(globalHeader, courseHeader, measures): + """Merge song metadata, course metadata, and course data into a single fumen-like object.""" + song = {'measures': [], 'metadata': {}} + + for k, v in globalHeader.items(): + song['metadata'][k] = v + + for k, v in courseHeader.items(): + if k in ['scoreInit', 'scoreDiff']: + song[k] = v + else: + song['metadata'][k] = v + + for i, measure in enumerate(measures): + song['measures'].append(measure) + + return song + + +######################################################################################################################## +# Fumen-parsing functions +######################################################################################################################## + +# Fumen format reverse engineering TODOs +# TODO: Figure out the remaining header bytes represent (0x1b0 to 0x207) +# TODO: Figure out what drumroll bytes are (8 bytes after every drumroll) +# NB: fumen2osu.py assumed these were padding bytes, but they're not!! They contain some sort of metadata. +# TODO: Figure out what the unknown Wii1, Wii4, and PS4 notes represent (just in case they're important somehow) + + +def readFumen(fumenFile, byteOrder=None, debug=False): + """ + Parse bytes of a fumen .bin file into nested measure, branch, and note dictionaries. + + For more information on any of the terms used in this function (e.g. scoreInit, scoreDiff), + please refer to KatieFrog's excellent guide: https://gist.github.com/KatieFrogs/e000f406bbc70a12f3c34a07303eec8b + """ + if type(fumenFile) is str: + file = open(fumenFile, "rb") + else: + file = fumenFile + size = os.fstat(file.fileno()).st_size + + # Check for valid fumen header (first 432 bytes) using valid byte substrings + fumenHeader = file.read(432) + if not checkValidHeader(fumenHeader): + debugPrint(f"Invalid header!") + # Read the next 80 bytes, which contains unknown information + fumenHeaderUnknown = file.read(80) + + # Determine: + # - The byte order (big or little endian) + # - The total number of measures from byte 0x200 (decimal 512) + if byteOrder: + order = ">" if byteOrder == "big" else "<" + totalMeasures = readStruct(file, order, format_string="I", seek=0x200)[0] + else: + # Use the number of measures to determine the byte order + measuresBig = readStruct(file, order="", format_string=">I", seek=0x200)[0] + measuresLittle = readStruct(file, order="", format_string="" else "Little" + )) + + # Start reading measure data from position 0x208 (decimal 520) + file.seek(0x208) + for measureNumber in range(totalMeasures): + # Parse the measure data using the following `format_string`: + # "ffBBHiiiiiii" (12 format characters, 40 bytes per measure) + # - 'f': BPM (represented by one float (4 bytes)) + # - 'f': fumenOffset (represented by one float (4 bytes)) + # - 'B': gogo (represented by one unsigned char (1 byte)) + # - 'B': hidden (represented by one unsigned char (1 byte)) + # - 'H': (represented by one unsigned short (2 bytes)) + # - 'iiiiii': branchInfo (represented by six integers (24 bytes)) + # - 'i': (represented by one integer (4 bytes) + measureStruct = readStruct(file, order, format_string="ffBBHiiiiiii") + + # Create the measure dictionary using the newly-parsed measure data + measure = {} + measure["bpm"] = measureStruct[0] + measure["fumenOffset"] = measureStruct[1] + if measureNumber == 0: + measure["offset"] = measure["fumenOffset"] + 240000 / measure["bpm"] + else: + prev = song['measures'][measureNumber - 1] + measure["offset"] = ((prev["offset"] + measure["fumenOffset"] + 240000) / + (measure["bpm"] - prev["fumenOffset"] - 240000 / prev["bpm"])) + measure["gogo"] = getBool(measureStruct[2]) + measure["hidden"] = getBool(measureStruct[3]) + measure["padding1"] = measureStruct[4] + measure["branchInfo"] = list(measureStruct[5:11]) + measure["padding2"] = measureStruct[11] + + # Iterate through the three branch types + for branchNumber in range(len(branchNames)): + # Parse the measure data using the following `format_string`: + # "HHf" (3 format characters, 8 bytes per branch) + # - 'H': totalNotes (represented by one unsigned short (2 bytes)) + # - 'H': (represented by one unsigned short (2 bytes)) + # - 'f': speed (represented by one float (4 bytes) + branchStruct = readStruct(file, order, format_string="HHf") + + # Create the branch dictionary using the newly-parsed branch data + branch = {} + totalNotes = branchStruct[0] + branch["length"] = totalNotes + branch["padding"] = branchStruct[1] + branch["speed"] = branchStruct[2] + + # Print debug metadata about the branches + if debug and (hasBranches or branchNumber == 0 or totalNotes != 0): + branchName = " ({0})".format( + branchNames[branchNumber] + ) if hasBranches or branchNumber != 0 else "" + fileOffset = file.tell() + debugPrint("") + debugPrint("Measure #{0}{1} at {2}-{3} ({4})".format( + measureNumber + 1, + branchName, + shortHex(fileOffset - 0x8), + shortHex(fileOffset + 0x18 * totalNotes), + nameValue(measure, branch) + )) + debugPrint("Total notes: {0}".format(totalNotes)) + + # Iterate through each note in the measure (per branch) + for noteNumber in range(totalNotes): + if debug: + fileOffset = file.tell() + debugPrint("Note #{0} at {1}-{2}".format( + noteNumber + 1, + shortHex(fileOffset), + shortHex(fileOffset + 0x17) + ), end="") + + # Parse the note data using the following `format_string`: + # "ififHHf" (7 format characters, 24 bytes per note cluster) + # - 'i': note type + # - 'f': note position + # - 'i': item + # - 'f': + # - 'H': scoreInit + # - 'H': scoreDiff + # - 'f': duration + # NB: 'item' doesn't seem to be used at all in this function. + noteStruct = readStruct(file, order, format_string="ififHHf") + + # Validate the note type + noteType = noteStruct[0] + if noteType not in noteTypes: + raise ValueError("Error: Unknown note type '{0}' at offset {1}".format( + shortHex(noteType).upper(), + hex(file.tell() - 0x18)) + ) + + # Create the note dictionary using the newly-parsed note data + note = {} + note["type"] = noteTypes[noteType] + note["pos"] = noteStruct[1] + note["item"] = noteStruct[2] + note["padding"] = noteStruct[3] + if noteType == 0xa or noteType == 0xc: + # Balloon hits + note["hits"] = noteStruct[4] + note["hitsPadding"] = noteStruct[5] + else: + note['scoreInit'] = noteStruct[4] + note['scoreDiff'] = noteStruct[5] / 4.0 + if "scoreInit" not in song: + song["scoreInit"] = note['scoreInit'] + song["scoreDiff"] = note['scoreDiff'] + if noteType == 0x6 or noteType == 0x9 or noteType == 0xa or noteType == 0xc: + # Drumroll and balloon duration in ms + note["duration"] = noteStruct[6] + else: + note['durationPadding'] = noteStruct[6] + + # Print debug information about the note + if debug: + debugPrint(" ({0})".format(nameValue(note))) + + # Seek forward 8 bytes to account for padding bytes at the end of drumrolls + if noteType == 0x6 or noteType == 0x9 or noteType == 0x62: + note["drumrollBytes"] = file.read(8) + + # Assign the note to the branch + branch[noteNumber] = note + + # Assign the branch to the measure + measure[branchNames[branchNumber]] = branch + + # Assign the measure to the song + song['measures'].append(measure) + if file.tell() >= size: + break + + file.close() + return song diff --git a/tja2fumen/parsetja.py b/tja2fumen/parsetja.py deleted file mode 100644 index 9c68c06..0000000 --- a/tja2fumen/parsetja.py +++ /dev/null @@ -1,245 +0,0 @@ -# Original source: https://github.com/WHMHammer/tja-tools/blob/master/src/js/parseTJA.js - -import re - -# Valid strings for headers and chart commands -HEADER_GLOBAL = ['TITLE', 'TITLEJA', 'SUBTITLE', 'SUBTITLEJA', 'BPM', 'WAVE', 'OFFSET', 'DEMOSTART', 'GENRE'] -HEADER_COURSE = ['COURSE', 'LEVEL', 'BALLOON', 'SCOREINIT', 'SCOREDIFF', 'TTRO' 'WBEAT'] -BRANCH_COMMANDS = ['START', 'END', 'BRANCHSTART', 'BRANCHEND', 'N', 'E', 'M'] -MEASURE_COMMANDS = ['MEASURE', 'GOGOSTART', 'GOGOEND', 'SCROLL', 'BPMCHANGE', 'TTBREAK' 'LEVELHOLD'] -UNUSED_COMMANDS = ['DELAY', 'SECTION', 'BMSCROLL', 'HBSCROLL', 'BARLINEON', 'BARLINEOFF'] -COMMAND = BRANCH_COMMANDS + MEASURE_COMMANDS + UNUSED_COMMANDS - - -def getCourse(tjaHeaders, lines): - def parseHeaderMetadata(line): - nonlocal headers - if line["name"] == 'COURSE': - headers['course'] = line['value'] - elif line["name"] == 'LEVEL': - headers['level'] = int(line['value']) - elif line["name"] == 'SCOREINIT': - headers['scoreInit'] = int(line['value']) - elif line["name"] == 'SCOREDIFF': - headers['scoreDiff'] = int(line['value']) - elif line["name"] == 'TTROWBEAT': - headers['ttRowBeat'] = int(line['value']) - elif line["name"] == 'BALLOON': - if line['value']: - balloons = [int(v) for v in line['value'].split(",")] - else: - balloons = [] - headers['balloon'] = balloons - - def parseBranchCommands(line): - nonlocal flagLevelhold, targetBranch, currentBranch - if line["name"] == 'BRANCHSTART': - if flagLevelhold: - return - values = line['value'].split(',') - if values[0] == 'r': - if len(values) >= 3: - targetBranch = 'M' - elif len(values) == 2: - targetBranch = 'E' - else: - targetBranch = 'N' - elif values[0] == 'p': - if len(values) >= 3 and float(values[2]) <= 100: - targetBranch = 'M' - elif len(values) >= 2 and float(values[1]) <= 100: - targetBranch = 'E' - else: - targetBranch = 'N' - elif line["name"] == 'BRANCHEND': - currentBranch = targetBranch - elif line["name"] == 'N': - currentBranch = 'N' - elif line["name"] == 'E': - currentBranch = 'E' - elif line["name"] == 'M': - currentBranch = 'M' - elif line["name"] == 'START' or line['name'] == 'END': - currentBranch = 'N' - targetBranch = 'N' - flagLevelhold = False - - def parseMeasureCommands(line): - nonlocal measureDivisor, measureDividend, measureEvents, measureProperties, flagLevelhold - if line['name'] == 'MEASURE': - matchMeasure = re.match(r"(\d+)/(\d+)", line['value']) - if not matchMeasure: - return - measureDividend = int(matchMeasure.group(1)) - measureDivisor = int(matchMeasure.group(2)) - elif line['name'] == 'GOGOSTART': - measureEvents.append({"name": 'gogoStart', "position": len(measureData)}) - elif line['name'] == 'GOGOEND': - measureEvents.append({"name": 'gogoEnd', "position": len(measureData)}) - elif line['name'] == 'SCROLL': - measureEvents.append({"name": 'scroll', "position": len(measureData), "value": float(line['value'])}) - elif line['name'] == 'BPMCHANGE': - measureEvents.append({"name": 'bpm', "position": len(measureData), "value": float(line['value'])}) - elif line['name'] == 'TTBREAK': - measureProperties['ttBreak'] = True - elif line['name'] == 'LEVELHOLD': - flagLevelhold = True - - def parseMeasureData(line): - nonlocal measures, measureData, measureDividend, measureDivisor, measureEvents, measureProperties - data = line['data'] - # If measure has ended, then append the measure and start anew - if data.endswith(','): - measureData += data[0:-1] - measure = { - "length": [measureDividend, measureDivisor], - "properties": measureProperties, - "data": measureData, - "events": measureEvents, - } - measures.append(measure) - measureData = '' - measureEvents = [] - measureProperties = {} - # Otherwise, keep tracking measureData - else: - measureData += data - - # Define state variables - headers = {'balloon': []} # Charters sometimes exclude `BALLOON` entirely if there are none - measures = [] - measureDividend = 4 - measureDivisor = 4 - measureProperties = {} - measureData = '' - measureEvents = [] - currentBranch = 'N' - targetBranch = 'N' - flagLevelhold = False - - # Process course lines - for line in lines: - if line["type"] == 'header': - parseHeaderMetadata(line) - elif line["type"] == 'command' and line['name'] in BRANCH_COMMANDS: - parseBranchCommands(line) - elif line["type"] == 'command' and line['name'] in MEASURE_COMMANDS and currentBranch == targetBranch: - parseMeasureCommands(line) - elif line['type'] == 'data' and currentBranch == targetBranch: - parseMeasureData(line) - - # Post-processing: Ensure the first measure has a BPM event - if measures: - firstBPMEventFound = False - # Search for BPM event in the first measure - for i in range(len(measures[0]['events'])): - evt = measures[0]['events'][i] - if evt.name == 'bpm' and evt.position == 0: - firstBPMEventFound = True - # If not present, insert a BPM event into the first measure using the global header metadata - if not firstBPMEventFound: - # noinspection PyTypeChecker - measures[0]['events'].insert(0, {"name": 'bpm', "position": 0, "value": tjaHeaders['bpm']}) - - # Post-processing: In case the file doesn't end on a "measure end" symbol (','), append whatever is left - if measureData: - measures.append({ - "length": [measureDividend, measureDivisor], - "properties": measureProperties, - "data": measureData, - "events": measureEvents, - }) - - # Post-processing: Otherwise, if the file ends on a measure event (e.g. #GOGOEND), append any remaining events - elif measureEvents: - for event in measureEvents: - event['position'] = len(measures[len(measures) - 1]['data']) - # noinspection PyTypeChecker - measures[len(measures) - 1]['events'].append(event) - - return headers, measures - - -def parseLine(line): - # Regex matches for various line types - match_comment = re.match(r"//.*", line) - match_header = re.match(r"^([A-Z]+):(.+)", line) - match_command = re.match(r"^#([A-Z]+)(?:\s+(.+))?", line) - match_data = re.match(r"^(([0-9]|A|B|C|F|G)*,?)$", line) - - if match_comment: - return {"type": 'comment', "value": line} - - elif match_header: - nameUpper = match_header.group(1).upper() - value = match_header.group(2) - if nameUpper in HEADER_GLOBAL: - return {"type": 'header', "scope": 'global', "name": nameUpper, "value": value.strip()} - elif nameUpper in HEADER_COURSE: - return {"type": 'header', "scope": 'course', "name": nameUpper, "value": value.strip()} - - elif match_command: - nameUpper = match_command.group(1).upper() - value = match_command.group(2) if match_command.group(2) else '' - if nameUpper in COMMAND: - return {"type": 'command', "name": nameUpper, "value": value.strip()} - - elif match_data: - return {"type": 'data', "data": match_data.group(1)} - - return {"type": 'unknown', "value": line} - - -def parseTJA(tja): - # Split into lines - lines = tja.read().splitlines() - lines = [line for line in lines if line] # Discard empty lines - - # Line by line - headers = {} - courses = {} - currentCourse = '' - for line in lines: - parsed = parseLine(line) - # Case 1: Comments (ignore) - if parsed['type'] == 'comment': - pass - # Case 2: Global header metadata - elif parsed['type'] == 'header' and parsed['scope'] == 'global': - headers[parsed['name'].lower()] = parsed['value'] - # Case 3: Course data (metadata, commands, note data) - else: - # Check to see if we're starting a new course - if parsed['type'] == 'header' and parsed['scope'] == 'course' and parsed['name'] == 'COURSE': - currentCourse = parsed['value'] - if currentCourse not in courses.keys(): - courses[currentCourse] = [] - # Append the line to the current course - courses[currentCourse].append(parsed) - - # Convert parsed course lines into actual note data - songs = {} - for courseName, courseLines in courses.items(): - courseHeader, courseMeasures = getCourse(headers, courseLines) - songs[courseName] = applyFumenStructureToParsedTJA(headers, courseHeader, courseMeasures) - - return songs - - -def applyFumenStructureToParsedTJA(globalHeader, courseHeader, measures): - """Merge song metadata, course metadata, and course data into a single fumen-like object.""" - song = { 'measures': [], 'metadata': {} } - - for k, v in globalHeader.items(): - song['metadata'][k] = v - - for k, v in courseHeader.items(): - if k in ['scoreInit', 'scoreDiff']: - song[k] = v - else: - song['metadata'][k] = v - - for i, measure in enumerate(measures): - song['measures'].append(measure) - - return song diff --git a/tja2fumen/tja2fumen.py b/tja2fumen/tja2fumen.py index 05f6adb..9ac646b 100644 --- a/tja2fumen/tja2fumen.py +++ b/tja2fumen/tja2fumen.py @@ -1,615 +1,34 @@ -import os -import sys -import struct -import argparse -from copy import deepcopy -from parsetja import parseTJA +from parsers import readFumen, parseTJA +from writers import writeFumen +from converters import convertTJAToFumen +from utils import checkMismatchedBytes tja2fumen_version = "v0.1" -# Fumen format reverse engineering TODOs -# TODO: Figure out the remaining header bytes represent (0x1b0 to 0x207) -# TODO: Figure out what drumroll bytes are (8 bytes after every drumroll) -# NB: fumen2osu.py assumed these were padding bytes, but they're not!! They contain some sort of metadata. -# TODO: Figure out what the unknown Wii1, Wii4, and PS4 notes represent (just in case they're important somehow) - -branchNames = ("normal", "advanced", "master") - -noteTypes = { - 0x1: "Don", # ドン - 0x2: "Don2", # ド - 0x3: "Don3", # コ - 0x4: "Ka", # カッ - 0x5: "Ka2", # カ - 0x6: "Drumroll", - 0x7: "DON", - 0x8: "KA", - 0x9: "DRUMROLL", - 0xa: "Balloon", - 0xb: "DON2", # hands - 0xc: "Kusudama", - 0xd: "KA2", # hands - 0xe: "Unknown1", # ? (Present in some Wii1 songs) - 0xf: "Unknown2", # ? (Present in some PS4 songs) - 0x10: "Unknown3", # ? (Present in some Wii1 songs) - 0x11: "Unknown4", # ? (Present in some Wii1 songs) - 0x12: "Unknown5", # ? (Present in some Wii4 songs) - 0x13: "Unknown6", # ? (Present in some Wii1 songs) - 0x14: "Unknown7", # ? (Present in some PS4 songs) - 0x15: "Unknown8", # ? (Present in some Wii1 songs) - 0x16: "Unknown9", # ? (Present in some Wii1 songs) - 0x17: "Unknown10", # ? (Present in some Wii4 songs) - 0x18: "Unknown11", # ? (Present in some PS4 songs) - 0x19: "Unknown12", # ? (Present in some PS4 songs) - 0x22: "Unknown13", # ? (Present in some Wii1 songs) - 0x62: "Drumroll2" # ? -} -typeNotes = {v: k for k, v in noteTypes.items()} - -# Fumen headers are made up of smaller substrings of bytes -b_x00 = b'\x00\x00\x00\x00\x00\x00' -b_431 = b'43\xc8Ag&\x96B"\xe2\xd8B' -b_432 = b'43\xc8Ag&\x96BD\x84\xb7B' -b_433 = b'43\xc8A"\xe2\xd8B\x00@\xfaB' -b_434 = b'43\xc8AD\x84\xb7B"\xe2\xd8B' -b_g1 = b'g&\x96B4\xa3\x89Cxw\x05A' -b_V1 = b'V\xd5&B\x00@\xfaB\x00@\xfaB' -b_V2 = b'V\xd5&B"\xe2\xd8B\x00@\xfaB' -b_V3 = b'V\xd5&B\x00@\xfaB\xf0\xce\rC' - -simpleHeaders = [b * 36 for b in [b_431, b_V1, b_V2]] - - -def checkValidHeader(header): - # These simple headers (substrings repeated 36 times) are used for many Gen2 systems (AC, Wii, etc.) - if header in simpleHeaders: - return True - # Starting with Gen3, they began using unique headers for every song. (3DS and PSPDX are the big offenders.) - # They seem to be some random combination of b_x00 + one of the non-null byte substrings. - # To avoid enumerating every combination of 432 bytes, we do a lazy check instead. - elif b_x00 in header and any(b in header for b in [b_431, b_432, b_433, b_434, b_V1, b_V2, b_V3]): - return True - # The PS4 song 'wii5op' is a special case: It throws in this odd b_g1 string in combo with other substrings. - elif b_g1 in header and any(b in header for b in [b_431, b_V2]): - return True - # Otherwise, this is some unknown header we haven't seen before. - # Typically, these will be tja2bin.exe converted files with a completely invalid header. - else: - return False - - -def readFumen(fumenFile, byteOrder=None, debug=False): - """ - Parse bytes of a fumen .bin file into nested measure, branch, and note dictionaries. - - For more information on any of the terms used in this function (e.g. scoreInit, scoreDiff), - please refer to KatieFrog's excellent guide: https://gist.github.com/KatieFrogs/e000f406bbc70a12f3c34a07303eec8b - """ - if type(fumenFile) is str: - file = open(fumenFile, "rb") - else: - file = fumenFile - size = os.fstat(file.fileno()).st_size - - # Check for valid fumen header (first 432 bytes) using valid byte substrings - fumenHeader = file.read(432) - if not checkValidHeader(fumenHeader): - debugPrint(f"Invalid header!") - # Read the next 80 bytes, which contains unknown information - fumenHeaderUnknown = file.read(80) - - # Determine: - # - The byte order (big or little endian) - # - The total number of measures from byte 0x200 (decimal 512) - if byteOrder: - order = ">" if byteOrder == "big" else "<" - totalMeasures = readStruct(file, order, format_string="I", seek=0x200)[0] - else: - # Use the number of measures to determine the byte order - measuresBig = readStruct(file, order="", format_string=">I", seek=0x200)[0] - measuresLittle = readStruct(file, order="", format_string="" else "Little" - )) - - # Start reading measure data from position 0x208 (decimal 520) - file.seek(0x208) - for measureNumber in range(totalMeasures): - # Parse the measure data using the following `format_string`: - # "ffBBHiiiiiii" (12 format characters, 40 bytes per measure) - # - 'f': BPM (represented by one float (4 bytes)) - # - 'f': fumenOffset (represented by one float (4 bytes)) - # - 'B': gogo (represented by one unsigned char (1 byte)) - # - 'B': hidden (represented by one unsigned char (1 byte)) - # - 'H': (represented by one unsigned short (2 bytes)) - # - 'iiiiii': branchInfo (represented by six integers (24 bytes)) - # - 'i': (represented by one integer (4 bytes) - measureStruct = readStruct(file, order, format_string="ffBBHiiiiiii") - - # Create the measure dictionary using the newly-parsed measure data - measure = {} - measure["bpm"] = measureStruct[0] - measure["fumenOffset"] = measureStruct[1] - if measureNumber == 0: - measure["offset"] = measure["fumenOffset"] + 240000 / measure["bpm"] - else: - prev = song['measures'][measureNumber - 1] - measure["offset"] = ((prev["offset"] + measure["fumenOffset"] + 240000) / - (measure["bpm"] - prev["fumenOffset"] - 240000 / prev["bpm"])) - measure["gogo"] = getBool(measureStruct[2]) - measure["hidden"] = getBool(measureStruct[3]) - measure["padding1"] = measureStruct[4] - measure["branchInfo"] = list(measureStruct[5:11]) - measure["padding2"] = measureStruct[11] - - # Iterate through the three branch types - for branchNumber in range(len(branchNames)): - # Parse the measure data using the following `format_string`: - # "HHf" (3 format characters, 8 bytes per branch) - # - 'H': totalNotes (represented by one unsigned short (2 bytes)) - # - 'H': (represented by one unsigned short (2 bytes)) - # - 'f': speed (represented by one float (4 bytes) - branchStruct = readStruct(file, order, format_string="HHf") - - # Create the branch dictionary using the newly-parsed branch data - branch = {} - totalNotes = branchStruct[0] - branch["length"] = totalNotes - branch["padding"] = branchStruct[1] - branch["speed"] = branchStruct[2] - - # Print debug metadata about the branches - if debug and (hasBranches or branchNumber == 0 or totalNotes != 0): - branchName = " ({0})".format( - branchNames[branchNumber] - ) if hasBranches or branchNumber != 0 else "" - fileOffset = file.tell() - debugPrint("") - debugPrint("Measure #{0}{1} at {2}-{3} ({4})".format( - measureNumber + 1, - branchName, - shortHex(fileOffset - 0x8), - shortHex(fileOffset + 0x18 * totalNotes), - nameValue(measure, branch) - )) - debugPrint("Total notes: {0}".format(totalNotes)) - - # Iterate through each note in the measure (per branch) - for noteNumber in range(totalNotes): - if debug: - fileOffset = file.tell() - debugPrint("Note #{0} at {1}-{2}".format( - noteNumber + 1, - shortHex(fileOffset), - shortHex(fileOffset + 0x17) - ), end="") - - # Parse the note data using the following `format_string`: - # "ififHHf" (7 format characters, 24 bytes per note cluster) - # - 'i': note type - # - 'f': note position - # - 'i': item - # - 'f': - # - 'H': scoreInit - # - 'H': scoreDiff - # - 'f': duration - # NB: 'item' doesn't seem to be used at all in this function. - noteStruct = readStruct(file, order, format_string="ififHHf") - - # Validate the note type - noteType = noteStruct[0] - if noteType not in noteTypes: - raise ValueError("Error: Unknown note type '{0}' at offset {1}".format( - shortHex(noteType).upper(), - hex(file.tell() - 0x18)) - ) - - # Create the note dictionary using the newly-parsed note data - note = {} - note["type"] = noteTypes[noteType] - note["pos"] = noteStruct[1] - note["item"] = noteStruct[2] - note["padding"] = noteStruct[3] - if noteType == 0xa or noteType == 0xc: - # Balloon hits - note["hits"] = noteStruct[4] - note["hitsPadding"] = noteStruct[5] - else: - note['scoreInit'] = noteStruct[4] - note['scoreDiff'] = noteStruct[5] / 4.0 - if "scoreInit" not in song: - song["scoreInit"] = note['scoreInit'] - song["scoreDiff"] = note['scoreDiff'] - if noteType == 0x6 or noteType == 0x9 or noteType == 0xa or noteType == 0xc: - # Drumroll and balloon duration in ms - note["duration"] = noteStruct[6] - else: - note['durationPadding'] = noteStruct[6] - - # Print debug information about the note - if debug: - debugPrint(" ({0})".format(nameValue(note))) - - # Seek forward 8 bytes to account for padding bytes at the end of drumrolls - if noteType == 0x6 or noteType == 0x9 or noteType == 0x62: - note["drumrollBytes"] = file.read(8) - - # Assign the note to the branch - branch[noteNumber] = note - - # Assign the branch to the measure - measure[branchNames[branchNumber]] = branch - - # Assign the measure to the song - song['measures'].append(measure) - if file.tell() >= size: - break - - file.close() - return song - - -def readStruct(file, order, format_string, seek=None): - """ - Interpret bytes as packed binary data. - - Arguments: - - file: The fumen's file object (presumably in 'rb' mode). - - order: '<' or '>' (little or big endian). - - format_string: String made up of format characters that describes the data layout. - Full list of available format characters: - (https://docs.python.org/3/library/struct.html#format-characters) - However, this script uses only the following format characters: - - B: unsigned char (1 byte) - - H: unsigned short (2 bytes) - - I: unsigned int (4 bytes) - - i: int (4 bytes) - - f: float (4 bytes) - - seek: The position of the read pointer to be used within the file. - - Return values: - - interpreted_string: A string containing interpreted byte values, - based on the specified 'fmt' format characters. - """ - if seek: - file.seek(seek) - expected_size = struct.calcsize(order + format_string) - byte_string = file.read(expected_size) - # One "official" fumen (AC11\deo\deo_n.bin) runs out of data early - # This workaround fixes the issue by appending 0's to get the size to match - if len(byte_string) != expected_size: - byte_string += (b'\x00' * (expected_size - len(byte_string))) - interpreted_string = struct.unpack(order + format_string, byte_string) - return interpreted_string - - -def writeFumen(file, song): - # Fetch the byte order (little/big endian) - order = song['order'] - - # Write the header - file.write(simpleHeaders[0]) # Write known, valid header - file.write(song['headerUnknown']) # Write unknown header - - # Preallocate space in the file - len_metadata = 8 - len_measures = 0 - for measureNumber in range(song['length']): - len_measures += 40 - measure = song['measures'][measureNumber] - for branchNumber in range(len(branchNames)): - len_measures += 8 - branch = measure[branchNames[branchNumber]] - for noteNumber in range(branch['length']): - len_measures += 24 - note = branch[noteNumber] - if note['type'].lower() == "drumroll": - len_measures += 8 - file.write(b'\x00' * (len_metadata + len_measures)) - - # Write metadata - writeStruct(file, order, format_string="B", value_list=[putBool(song['branches'])], seek=0x1b0) - writeStruct(file, order, format_string="I", value_list=[song['length']], seek=0x200) - writeStruct(file, order, format_string="I", value_list=[song['unknownMetadata']], seek=0x204) - - # Write measure data - file.seek(0x208) - for measureNumber in range(song['length']): - measure = song['measures'][measureNumber] - measureStruct = [measure['bpm'], measure['fumenOffset'], int(measure['gogo']), int(measure['hidden'])] - measureStruct.extend([measure['padding1']] + measure['branchInfo'] + [measure['padding2']]) - writeStruct(file, order, format_string="ffBBHiiiiiii", value_list=measureStruct) - - for branchNumber in range(len(branchNames)): - branch = measure[branchNames[branchNumber]] - branchStruct = [branch['length'], branch['padding'], branch['speed']] - writeStruct(file, order, format_string="HHf", value_list=branchStruct) - - for noteNumber in range(branch['length']): - note = branch[noteNumber] - noteStruct = [typeNotes[note['type']], note['pos'], note['item'], note['padding']] - # Balloon hits - if 'hits' in note.keys(): - noteStruct.extend([note["hits"], note['hitsPadding']]) - else: - noteStruct.extend([note['scoreInit'], int(note['scoreDiff'] * 4)]) - # Drumroll or balloon duration - if 'duration' in note.keys(): - noteStruct.append(note['duration']) - else: - noteStruct.append(note['durationPadding']) - writeStruct(file, order, format_string="ififHHf", value_list=noteStruct) - if note['type'].lower() == "drumroll": - file.write(note['drumrollBytes']) - file.close() - - -def writeStruct(file, order, format_string, value_list, seek=None): - if seek: - file.seek(seek) - packed_bytes = struct.pack(order + format_string, *value_list) - file.write(packed_bytes) - - -def shortHex(number): - return hex(number)[2:] - - -def getBool(number): - return True if number == 0x1 else False if number == 0x0 else number - - -def putBool(boolean): - return 0x1 if boolean is True else 0x0 if boolean is False else boolean - - -def nameValue(*lists): - string = [] - for lst in lists: - for name in lst: - if name == "type": - string.append(lst[name]) - elif name != "length" and type(name) is not int: - value = lst[name] - if type(value) == float and value % 1 == 0.0: - value = int(value) - string.append("{0}: {1}".format(name, value)) - return ", ".join(string) - - -def debugPrint(*args, **kwargs): - print(*args, file=sys.stderr, **kwargs) - - -def checkMismatchedBytes(file1, file2): - with open(file1, 'rb') as file1, open(file2, 'rb') as file2: - data1, data2 = file1.read(), file2.read() - incorrect_bytes = {} - # Ignore header (first 432 + 80 = 512 bytes) - for i, (byte1, byte2) in enumerate(zip(data1[512:], data2[512:])): - if byte1 == byte2: - pass - else: - incorrect_bytes[hex(i+512)] = [byte1, byte2] - return incorrect_bytes - - -TJA_NOTE_TYPES = { - '1': 'Don', - '2': 'Ka', - '3': 'DON', - '4': 'KA', - '5': 'Drumroll', - '6': 'DRUMROLL', - '7': 'Balloon', - '8': 'EndDRB', - '9': 'Kusudama', - 'A': 'DON', # hands - 'B': 'KA', # hands -} - -# Filler metadata that the `writeFumen` function expects -default_note = {'type': '', 'pos': 0.0, 'item': 0, 'padding': 0.0, - 'scoreInit': 0, 'scoreDiff': 0, 'durationPadding': 0.0} -default_branch = {'length': 0, 'padding': 0, 'speed': 1.0} -default_measure = { - 'bpm': 0.0, - 'fumenOffset': 0.0, - 'gogo': False, - 'hidden': False, - 'padding1': 0, - 'branchInfo': [-1, -1, -1, -1, -1, -1], - 'padding2': 0, - 'normal': deepcopy(default_branch), - 'advanced': deepcopy(default_branch), - 'master': deepcopy(default_branch) -} - - -def convertTJAToFumen(fumen, tja): - # Fumen offset for the first measure that has a barline - fumenOffset1 = float(tja['metadata']['offset']) * -1000 - - # Variables that will change over time due to events - currentBPM = 0.0 - currentGogo = False - currentHidden = False - currentBranch = 'normal' # TODO: Program in branch support - - # Parse TJA measures to create converted TJA -> Fumen file - tjaConverted = { 'measures': [] } - for i, measureTJA in enumerate(tja['measures']): - measureFumenExample = fumen['measures'][i+9] - measureFumen = deepcopy(default_measure) - - # TODO Event: GOGOTIME - - # TODO Event: HIDDEN - - # TODO Event: BARLINE - - # TODO Event: MEASURE - - # Event: BPMCHANGE - # TODO: Handle TJA measure being broken up into multiple Fumen measures due to mid-measure BPM changes - midMeasureBPM = [(0, currentBPM)] - for event in measureTJA['events']: - if event['name'] == 'bpm': - currentBPM = float(event['value']) - if event['position'] == 0: - midMeasureBPM[0] = (0, currentBPM,) - else: - midMeasureBPM.append((event['position'], currentBPM)) - if len(midMeasureBPM) > 1: - test = None - measureFumen['bpm'] = currentBPM - - # TODO: `measureFumen['fumenOffset'] - # Will need to account for BARLINEON and BARLINEOFF. - # Some examples that line up with actual fumen data: - # fumenOffset0 = (fumenOffset1 - measureLength) - # fumenOffset2 = (fumenOffset1 + measureLength) - measureLength = 240_000 / currentBPM - # measureFumen['fumenOffset'] = prev['fumenOffset'] + measureLength - - # Create note dictionaries based on TJA measure data (containing 0's plus 1/2/3/4/etc. for notes) - note_counter = 0 - for i, note_value in enumerate(measureTJA['data']): - if note_value != '0': - note = deepcopy(default_note) - note['pos'] = measureLength * (i / len(measureTJA['data'])) - note['type'] = TJA_NOTE_TYPES[note_value] # TODO: Handle BALLOON/DRUMROLL - note['scoreInit'] = tja['scoreInit'] # Probably not fully accurate - note['scoreDiff'] = tja['scoreDiff'] # Probably not fully accurate - measureFumen[currentBranch][note_counter] = note - note_counter += 1 - measureFumen[currentBranch]['length'] = note_counter - - # Append the measure to the tja's list of measures - tjaConverted['measures'].append(measureFumen) - - tjaConverted['headerUnknown'] = b'x\00' * 80 - tjaConverted['order'] = '<' - tjaConverted['length'] = len(tjaConverted['measures']) - tjaConverted['unknownMetadata'] = 0 - tjaConverted['branches'] = False - - return tjaConverted - if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="tja2fumen {0}".format(tja2fumen_version) - ) - parser.add_argument( - "file_m.bin", - help="Path to a Taiko no Tatsujin fumen file.", - type=argparse.FileType("rb") - ) - parser.add_argument( - "offset", - help="Note offset in seconds, negative values will make the notes appear later. Example: -1.9", - nargs="?", - type=float, - default=0 - ) - group = parser.add_mutually_exclusive_group() - group.add_argument( - "--big", - help="Force big endian byte order for parsing.", - action="store_const", - dest="order", - const="big" - ) - group.add_argument( - "--little", - help="Force little endian byte order for parsing.", - action="store_const", - dest="order", - const="little" - ) - parser.add_argument( - "-o", - metavar="file.osu", - help="Set the filename of the output file.", - type=argparse.FileType("bw+") - ) - parser.add_argument( - "--title", - metavar="\"Title\"", - help="Set the title in the output file." - ) - parser.add_argument( - "--subtitle", - metavar="\"Subtitle\"", - help="Set the subtitle (artist field) in the output file.", - default="" - ) - parser.add_argument( - "--wave", - metavar="file.wav", - help="Set the audio filename in the output file." - ) - parser.add_argument( - "--branch", - metavar="master", - help="Select a branch from a branched song ({0}).".format(", ".join(branchNames)), - choices=branchNames - ) - parser.add_argument( - "-v", "--debug", - help="Print verbose debug information.", - action="store_true" - ) - if len(sys.argv) == 1: - parser.print_help() - else: - # arguments = parser.parse_args() - arguments = { - "input_fumen": "./roku/ia6cho_m.bin", # NB: Contains only oni chart - "input_tja": "./roku/Rokuchounen to Ichiya Monogatari.tja", # NB: Contains 5 charts - } - # Parse fumen - inputFile = open(arguments["input_fumen"], "rb") - parsedSongFumen = readFumen(inputFile) + # arguments = parser.parse_args() + arguments = { + "input_fumen": "./test-data/ia6cho_m.bin", # NB: Contains only oni chart + "input_tja": "./test-data/Rokuchounen to Ichiya Monogatari.tja", # NB: Contains 5 charts + } + # Parse fumen + inputFile = open(arguments["input_fumen"], "rb") + parsedSongFumen = readFumen(inputFile) - # Steps to validate the `writeFumen` function to make sure it reproduces correct output - validate = False - if validate: - outputName = inputFile.name.split('.')[0] + "_rebuilt.bin" - outputFile = open(outputName, "wb") - writeFumen(outputFile, parsedSongFumen) - # Read output file back in to validate that the rewritten song is a perfect match - print(False if checkMismatchedBytes(inputFile.name, outputFile.name) else True) + # Steps to validate the `writeFumen` function to make sure it reproduces correct output + validate = False + if validate: + outputName = inputFile.name.split('.')[0] + "_rebuilt.bin" + outputFile = open(outputName, "wb") + writeFumen(outputFile, parsedSongFumen) + # Read output file back in to validate that the rewritten song is a perfect match + print(False if checkMismatchedBytes(inputFile.name, outputFile.name) else True) - # Parse tja - inputFile = open(arguments["input_tja"], "r", encoding="utf-8-sig") - parsedSongsTJA = parseTJA(inputFile) + # Parse tja + inputFile = open(arguments["input_tja"], "r", encoding="utf-8-sig") + parsedSongsTJA = parseTJA(inputFile) - # Try converting the Oni TJA chart to match the Oni fumen - convertedTJA = convertTJAToFumen(parsedSongFumen, parsedSongsTJA['Oni']) - # writeFumen(outputFile, convertedTJA) + # Try converting the Oni TJA chart to match the Oni fumen + convertedTJA = convertTJAToFumen(parsedSongFumen, parsedSongsTJA['Oni']) + # writeFumen(outputFile, convertedTJA) diff --git a/tja2fumen/utils.py b/tja2fumen/utils.py new file mode 100644 index 0000000..5a2826d --- /dev/null +++ b/tja2fumen/utils.py @@ -0,0 +1,102 @@ +import sys +import struct + +from constants import simpleHeaders, byte_strings + + +def checkValidHeader(header): + # These simple headers (substrings repeated 36 times) are used for many Gen2 systems (AC, Wii, etc.) + if header in simpleHeaders: + return True + # Starting with Gen3, they began using unique headers for every song. (3DS and PSPDX are the big offenders.) + # They seem to be some random combination of b_x00 + one of the non-null byte substrings. + # To avoid enumerating every combination of 432 bytes, we do a lazy check instead. + elif (byte_strings['x00'] in header and + any(b in header for b in [byte_strings[key] for key in ['431', '432', '433', '434', 'V1', 'V2', 'V3']])): + return True + # The PS4 song 'wii5op' is a special case: It throws in this odd 'g1' string in combo with other substrings. + elif (byte_strings['g1'] in header and + any(b in header for b in [byte_strings[key] for key in ['431', 'V2']])): + return True + # Otherwise, this is some unknown header we haven't seen before. + # Typically, these will be tja2bin.exe converted files with a completely invalid header. + else: + return False + + +def readStruct(file, order, format_string, seek=None): + """ + Interpret bytes as packed binary data. + + Arguments: + - file: The fumen's file object (presumably in 'rb' mode). + - order: '<' or '>' (little or big endian). + - format_string: String made up of format characters that describes the data layout. + Full list of available format characters: + (https://docs.python.org/3/library/struct.html#format-characters) + - seek: The position of the read pointer to be used within the file. + + Return values: + - interpreted_string: A string containing interpreted byte values, + based on the specified 'fmt' format characters. + """ + if seek: + file.seek(seek) + expected_size = struct.calcsize(order + format_string) + byte_string = file.read(expected_size) + # One "official" fumen (AC11\deo\deo_n.bin) runs out of data early + # This workaround fixes the issue by appending 0's to get the size to match + if len(byte_string) != expected_size: + byte_string += (b'\x00' * (expected_size - len(byte_string))) + interpreted_string = struct.unpack(order + format_string, byte_string) + return interpreted_string + + +def writeStruct(file, order, format_string, value_list, seek=None): + if seek: + file.seek(seek) + packed_bytes = struct.pack(order + format_string, *value_list) + file.write(packed_bytes) + + +def shortHex(number): + return hex(number)[2:] + + +def getBool(number): + return True if number == 0x1 else False if number == 0x0 else number + + +def putBool(boolean): + return 0x1 if boolean is True else 0x0 if boolean is False else boolean + + +def nameValue(*lists): + string = [] + for lst in lists: + for name in lst: + if name == "type": + string.append(lst[name]) + elif name != "length" and type(name) is not int: + value = lst[name] + if type(value) == float and value % 1 == 0.0: + value = int(value) + string.append("{0}: {1}".format(name, value)) + return ", ".join(string) + + +def debugPrint(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + + +def checkMismatchedBytes(file1, file2): + with open(file1, 'rb') as file1, open(file2, 'rb') as file2: + data1, data2 = file1.read(), file2.read() + incorrect_bytes = {} + # Ignore header (first 432 + 80 = 512 bytes) + for i, (byte1, byte2) in enumerate(zip(data1[512:], data2[512:])): + if byte1 == byte2: + pass + else: + incorrect_bytes[hex(i+512)] = [byte1, byte2] + return incorrect_bytes diff --git a/tja2fumen/writers.py b/tja2fumen/writers.py new file mode 100644 index 0000000..e8c7444 --- /dev/null +++ b/tja2fumen/writers.py @@ -0,0 +1,63 @@ +from utils import writeStruct, putBool +from constants import simpleHeaders, branchNames, typeNotes + + +def writeFumen(file, song): + # Fetch the byte order (little/big endian) + order = song['order'] + + # Write the header + file.write(simpleHeaders[0]) # Write known, valid header + file.write(song['headerUnknown']) # Write unknown header + + # Preallocate space in the file + len_metadata = 8 + len_measures = 0 + for measureNumber in range(song['length']): + len_measures += 40 + measure = song['measures'][measureNumber] + for branchNumber in range(len(branchNames)): + len_measures += 8 + branch = measure[branchNames[branchNumber]] + for noteNumber in range(branch['length']): + len_measures += 24 + note = branch[noteNumber] + if note['type'].lower() == "drumroll": + len_measures += 8 + file.write(b'\x00' * (len_metadata + len_measures)) + + # Write metadata + writeStruct(file, order, format_string="B", value_list=[putBool(song['branches'])], seek=0x1b0) + writeStruct(file, order, format_string="I", value_list=[song['length']], seek=0x200) + writeStruct(file, order, format_string="I", value_list=[song['unknownMetadata']], seek=0x204) + + # Write measure data + file.seek(0x208) + for measureNumber in range(song['length']): + measure = song['measures'][measureNumber] + measureStruct = [measure['bpm'], measure['fumenOffset'], int(measure['gogo']), int(measure['hidden'])] + measureStruct.extend([measure['padding1']] + measure['branchInfo'] + [measure['padding2']]) + writeStruct(file, order, format_string="ffBBHiiiiiii", value_list=measureStruct) + + for branchNumber in range(len(branchNames)): + branch = measure[branchNames[branchNumber]] + branchStruct = [branch['length'], branch['padding'], branch['speed']] + writeStruct(file, order, format_string="HHf", value_list=branchStruct) + + for noteNumber in range(branch['length']): + note = branch[noteNumber] + noteStruct = [typeNotes[note['type']], note['pos'], note['item'], note['padding']] + # Balloon hits + if 'hits' in note.keys(): + noteStruct.extend([note["hits"], note['hitsPadding']]) + else: + noteStruct.extend([note['scoreInit'], int(note['scoreDiff'] * 4)]) + # Drumroll or balloon duration + if 'duration' in note.keys(): + noteStruct.append(note['duration']) + else: + noteStruct.append(note['durationPadding']) + writeStruct(file, order, format_string="ififHHf", value_list=noteStruct) + if note['type'].lower() == "drumroll": + file.write(note['drumrollBytes']) + file.close()