1
0
mirror of synced 2025-01-23 22:54:08 +01:00

Replace all {song, branch, measure, note} dictionaries with classes (#32)

This refactor will hopefully pave the way for further refactors that
will make this code easier to understand and work with.

Fixes #9.
This commit is contained in:
Viv 2023-07-11 21:30:55 -04:00 committed by GitHub
parent bd27e9dade
commit cd755bd646
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 453 additions and 329 deletions

View File

@ -24,11 +24,11 @@ def main(argv=None):
baseName = os.path.splitext(fnameTJA)[0]
# Parse lines in TJA file
parsedTJACourses = parseTJA(fnameTJA)
parsedTJA = parseTJA(fnameTJA)
# Convert parsed TJA courses to Fumen data, and write each course to `.bin` files
for parsedCourse in parsedTJACourses.items():
convert_and_write(parsedCourse, baseName, singleCourse=(len(parsedTJACourses) == 1))
for course in parsedTJA.courses.items():
convert_and_write(course, baseName, singleCourse=(len(parsedTJA.courses) == 1))
def convert_and_write(parsedCourse, baseName, singleCourse=False):

View File

@ -92,6 +92,11 @@ sampleHeaderMetadata[77] = 97
sampleHeaderMetadata[78] = 188
# Certain other bytes (8+9, 20) will need to be filled in on a song-by-song basis
TJA_COURSE_NAMES = []
for difficulty in ['Ura', 'Oni', 'Hard', 'Normal', 'Easy']:
for player in ['', 'P1', 'P2']:
TJA_COURSE_NAMES.append(difficulty+player)
NORMALIZE_COURSE = {
'0': 'Easy',
'Easy': 'Easy',

View File

@ -1,29 +1,8 @@
from copy import deepcopy
import re
from tja2fumen.utils import computeSoulGaugeBytes
from tja2fumen.constants import DIFFICULTY_BYTES, sampleHeaderMetadata, simpleHeaders
# Filler metadata that the `writeFumen` function expects
# TODO: Determine how to properly set the item byte (https://github.com/vivaria/tja2fumen/issues/17)
default_note = {'type': '', 'pos': 0.0, 'item': 0, 'padding': 0.0,
'scoreInit': 0, 'scoreDiff': 0, 'duration': 0.0}
default_branch = {'length': 0, 'padding': 0, 'speed': 1.0}
default_measure = {
'bpm': 0.0,
'fumenOffsetStart': 0.0,
'fumenOffsetEnd': 0.0,
'duration': 0.0,
'gogo': False,
'barline': True,
'padding1': 0,
'branchStart': None,
'branchInfo': [-1, -1, -1, -1, -1, -1],
'padding2': 0,
'normal': deepcopy(default_branch),
'advanced': deepcopy(default_branch),
'master': deepcopy(default_branch)
}
from tja2fumen.constants import DIFFICULTY_BYTES
from tja2fumen.types import TJAMeasureProcessed, FumenCourse, FumenNote
def processTJACommands(tja):
@ -44,136 +23,149 @@ def processTJACommands(tja):
In the future, this logic should probably be moved into the TJA parser itself.
"""
branches = tja['branches']
branchesCorrected = {branchName: [] for branchName in branches.keys()}
for branchName, branch in branches.items():
currentBPM = float(tja['metadata']['bpm'])
tjaBranchesProcessed = {branchName: [] for branchName in tja.branches.keys()}
for branchName, branchMeasuresTJA in tja.branches.items():
currentBPM = tja.BPM
currentScroll = 1.0
currentGogo = False
currentBarline = True
currentDividend = 4
currentDivisor = 4
for measure in branch:
for measureTJA in branchMeasuresTJA:
# Split measure into submeasure
measure_cur = {'bpm': currentBPM, 'scroll': currentScroll, 'gogo': currentGogo, 'barline': currentBarline,
'subdivisions': len(measure['data']), 'pos_start': 0, 'pos_end': 0, 'delay': 0,
'branchStart': None, 'time_sig': [currentDividend, currentDivisor], 'data': []}
for data in measure['combined']:
measureTJAProcessed = TJAMeasureProcessed(
bpm=currentBPM,
scroll=currentScroll,
gogo=currentGogo,
barline=currentBarline,
time_sig=[currentDividend, currentDivisor],
subdivisions=len(measureTJA.notes),
)
for data in measureTJA.combined:
# Handle note data
if data['type'] == 'note':
measure_cur['data'].append(data)
if data.name == 'note':
measureTJAProcessed.data.append(data)
# Handle commands that can only be placed between measures (i.e. no mid-measure variations)
elif data['type'] == 'delay':
measure_cur['delay'] = data['value'] * 1000 # ms -> s
elif data['type'] == 'branchStart':
measure_cur['branchStart'] = data['value']
elif data['type'] == 'barline':
currentBarline = bool(int(data['value']))
measure_cur['barline'] = currentBarline
elif data['type'] == 'measure':
matchMeasure = re.match(r"(\d+)/(\d+)", data['value'])
elif data.name == 'delay':
measureTJAProcessed.delay = data.value * 1000 # ms -> s
elif data.name == 'branchStart':
measureTJAProcessed.branchStart = data.value
elif data.name == 'barline':
currentBarline = bool(int(data.value))
measureTJAProcessed.barline = currentBarline
elif data.name == 'measure':
matchMeasure = re.match(r"(\d+)/(\d+)", data.value)
if not matchMeasure:
continue
currentDividend = int(matchMeasure.group(1))
currentDivisor = int(matchMeasure.group(2))
measure_cur['time_sig'] = [currentDividend, currentDivisor]
measureTJAProcessed.time_sig = [currentDividend, currentDivisor]
# Handle commands that can be placed in the middle of a measure.
# NB: For fumen files, if there is a mid-measure change to BPM/SCROLL/GOGO, then the measure will
# actually be split into two small submeasures. So, we need to start a new measure in those cases.
elif data['type'] in ['bpm', 'scroll', 'gogo']:
elif data.name in ['bpm', 'scroll', 'gogo']:
# Parse the values
if data['type'] == 'bpm':
new_val = currentBPM = float(data['value'])
elif data['type'] == 'scroll':
new_val = currentScroll = data['value']
elif data['type'] == 'gogo':
new_val = currentGogo = bool(int(data['value']))
if data.name == 'bpm':
new_val = currentBPM = float(data.value)
elif data.name == 'scroll':
new_val = currentScroll = data.value
elif data.name == 'gogo':
new_val = currentGogo = bool(int(data.value))
# Check for mid-measure commands
# - Case 1: Command happens at the start of a measure; just change the value directly
if data['pos'] == 0:
measure_cur[data['type']] = new_val
if data.pos == 0:
measureTJAProcessed.__setattr__(data.name, new_val)
# - Case 2: Command occurs mid-measure, so start a new sub-measure
else:
measure_cur['pos_end'] = data['pos']
branchesCorrected[branchName].append(measure_cur)
measure_cur = {'bpm': currentBPM, 'scroll': currentScroll, 'gogo': currentGogo,
'barline': currentBarline, 'subdivisions': len(measure['data']),
'pos_start': data['pos'], 'pos_end': 0, 'delay': 0,
'branchStart': None, 'time_sig': [currentDividend, currentDivisor], 'data': []}
measureTJAProcessed.pos_end = data.pos
tjaBranchesProcessed[branchName].append(measureTJAProcessed)
measureTJAProcessed = TJAMeasureProcessed(
bpm=currentBPM,
scroll=currentScroll,
gogo=currentGogo,
barline=currentBarline,
time_sig=[currentDividend, currentDivisor],
subdivisions=len(measureTJA.notes),
pos_start=data.pos
)
else:
print(f"Unexpected event type: {data['type']}")
print(f"Unexpected event type: {data.name}")
measure_cur['pos_end'] = len(measure['data'])
branchesCorrected[branchName].append(measure_cur)
measureTJAProcessed.pos_end = len(measureTJA.notes)
tjaBranchesProcessed[branchName].append(measureTJAProcessed)
hasBranches = all(len(b) for b in branchesCorrected.values())
hasBranches = all(len(b) for b in tjaBranchesProcessed.values())
if hasBranches:
branch_lens = [len(b) for b in branches.values()]
branch_lens = [len(b) for b in tja.branches.values()]
if not branch_lens.count(branch_lens[0]) == len(branch_lens):
raise ValueError("Branches do not have the same number of measures.")
else:
branchCorrected_lens = [len(b) for b in branchesCorrected.values()]
branchCorrected_lens = [len(b) for b in tjaBranchesProcessed.values()]
if not branchCorrected_lens.count(branchCorrected_lens[0]) == len(branchCorrected_lens):
raise ValueError("Branches do not have matching GOGO/SCROLL/BPM commands.")
return branchesCorrected
return tjaBranchesProcessed
def convertTJAToFumen(tja):
# Preprocess commands
tja['branches'] = processTJACommands(tja)
processedTJABranches = processTJACommands(tja)
# Pre-allocate the measures for the converted TJA
tjaConverted = {'measures': [deepcopy(default_measure) for _ in range(len(tja['branches']['normal']))]}
fumen = FumenCourse(
measures=len(processedTJABranches['normal']),
hasBranches=all([len(b) for b in processedTJABranches.values()]),
scoreInit=tja.scoreInit,
scoreDiff=tja.scoreDiff,
)
# Iterate through the different branches in the TJA
for currentBranch, branch in tja['branches'].items():
if not len(branch):
for currentBranch, branchMeasuresTJAProcessed in processedTJABranches.items():
if not len(branchMeasuresTJAProcessed):
continue
total_notes = 0
total_notes_branch = 0
note_counter_branch = 0
currentDrumroll = None
courseBalloons = tja['metadata']['balloon'].copy()
courseBalloons = tja.balloon.copy()
# Iterate through the measures within the branch
for idx_m, measureTJA in enumerate(branch):
for idx_m, measureTJAProcessed in enumerate(branchMeasuresTJAProcessed):
# Fetch a pair of measures
measureFumenPrev = tjaConverted['measures'][idx_m-1] if idx_m != 0 else None
measureFumen = tjaConverted['measures'][idx_m]
measureFumenPrev = fumen.measures[idx_m-1] if idx_m != 0 else None
measureFumen = fumen.measures[idx_m]
# Copy over basic measure properties from the TJA (that don't depend on notes or commands)
measureFumen[currentBranch]['speed'] = measureTJA['scroll']
measureFumen['gogo'] = measureTJA['gogo']
measureFumen['bpm'] = measureTJA['bpm']
measureFumen.branches[currentBranch].speed = measureTJAProcessed.scroll
measureFumen.gogo = measureTJAProcessed.gogo
measureFumen.bpm = measureTJAProcessed.bpm
# Compute the duration of the measure
# First, we compute the duration for a full 4/4 measure
measureDurationFullMeasure = 4 * 60_000 / measureTJA['bpm']
measureDurationFullMeasure = 4 * 60_000 / measureTJAProcessed.bpm
# Next, we adjust this duration based on both:
# 1. The *actual* measure size (e.g. #MEASURE 1/8, #MEASURE 5/4, etc.)
measureSize = measureTJA['time_sig'][0] / measureTJA['time_sig'][1]
measureSize = measureTJAProcessed.time_sig[0] / measureTJAProcessed.time_sig[1]
# 2. Whether this is a "submeasure" (i.e. it contains mid-measure commands, which split up the measure)
# - If this is a submeasure, then `measureLength` will be less than the total number of subdivisions.
measureLength = measureTJA['pos_end'] - measureTJA['pos_start']
measureLength = measureTJAProcessed.pos_end - measureTJAProcessed.pos_start
# - In other words, `measureRatio` will be less than 1.0:
measureRatio = (1.0 if measureTJA['subdivisions'] == 0.0 # Avoid division by 0 for empty measures
else (measureLength / measureTJA['subdivisions']))
measureRatio = (1.0 if measureTJAProcessed.subdivisions == 0.0 # Avoid division by 0 for empty measures
else (measureLength / measureTJAProcessed.subdivisions))
# Apply the 2 adjustments to the measure duration
measureFumen['duration'] = measureDuration = measureDurationFullMeasure * measureSize * measureRatio
measureFumen.duration = measureDuration = measureDurationFullMeasure * measureSize * measureRatio
# Compute the millisecond offsets for the start and end of each measure
# - Start: When the notes first appear on screen (to the right)
# - End: When the notes arrive at the judgment line, and the note gets hit.
if idx_m == 0:
tjaOffset = float(tja['metadata']['offset']) * 1000 * -1
measureFumen['fumenOffsetStart'] = tjaOffset - measureDurationFullMeasure
measureFumen.fumenOffsetStart = (tja.offset * 1000 * -1) - measureDurationFullMeasure
else:
# First, start the measure using the end timing of the previous measure (plus any #DELAY commands)
measureFumen['fumenOffsetStart'] = measureFumenPrev['fumenOffsetEnd'] + measureTJA['delay']
measureFumen.fumenOffsetStart = measureFumenPrev.fumenOffsetEnd + measureTJAProcessed.delay
# Next, adjust the start timing to account for #BPMCHANGE commands (!!! Discovered by tana :3 !!!)
# To understand what's going on here, imagine the following simple example:
# * You have a very slow-moving note (i.e. low BPM), like the big DON in Donkama 2000.
@ -182,12 +174,12 @@ def convertTJAToFumen(tja):
# - An early start means you need to subtract a LOT of time from the starting fumenOffset.
# - Thankfully, the low BPM of the slow note will create a HUGE `measureOffsetAdjustment`,
# since we are dividing by the BPMs, and dividing by a small number will result in a big number.
measureOffsetAdjustment = (4 * 60_000 / measureTJA['bpm']) - (4 * 60_000 / measureFumenPrev['bpm'])
measureOffsetAdjustment = (4 * 60_000 / measureFumen.bpm) - (4 * 60_000 / measureFumenPrev.bpm)
# - When we subtract this adjustment from the fumenOffsetStart, we get the "START EARLY" part:
measureFumen['fumenOffsetStart'] -= measureOffsetAdjustment
measureFumen.fumenOffsetStart -= measureOffsetAdjustment
# - The low BPM of the slow note will also create a HUGE measure duration.
# - When we add this long duration to the EARLY START, we end up with the "END LATE" part:
measureFumen['fumenOffsetEnd'] = measureFumen['fumenOffsetStart'] + measureFumen['duration']
measureFumen.fumenOffsetEnd = measureFumen.fumenOffsetStart + measureFumen.duration
# Best guess at what 'barline' status means for each measure:
# - 'True' means the measure lands on a barline (i.e. most measures), and thus barline should be shown
@ -195,18 +187,18 @@ def convertTJAToFumen(tja):
# For example:
# 1. Measures where #BARLINEOFF has been set
# 2. Sub-measures that don't fall on the barline
if measureTJA['barline'] is False or (measureRatio != 1.0 and measureTJA['pos_start'] != 0):
measureFumen['barline'] = False
if measureTJAProcessed.barline is False or (measureRatio != 1.0 and measureTJAProcessed.pos_start != 0):
measureFumen.barline = False
# Check to see if the measure contains a branching condition
if measureTJA['branchStart']:
if measureTJAProcessed.branchStart:
# Determine which values to assign based on the type of branching condition
if measureTJA['branchStart'][0] == 'p':
if measureTJAProcessed.branchStart[0] == 'p':
vals = [int(total_notes_branch * v * 20) if 0 <= v <= 1 # Ensure value is actually a percentage
else int(v * 100) # If it's not, pass the value as-is
for v in measureTJA['branchStart'][1:]]
elif measureTJA['branchStart'][0] == 'r':
vals = measureTJA['branchStart'][1:]
for v in measureTJAProcessed.branchStart[1:]]
elif measureTJAProcessed.branchStart[0] == 'r':
vals = measureTJAProcessed.branchStart[1:]
# Determine which bytes to assign the values to
if currentBranch == 'normal':
idx_b1, idx_b2 = 0, 1
@ -215,94 +207,84 @@ def convertTJAToFumen(tja):
elif currentBranch == 'master':
idx_b1, idx_b2 = 4, 5
# Assign the values to their intended bytes
measureFumen['branchInfo'][idx_b1] = vals[0]
measureFumen['branchInfo'][idx_b2] = vals[1]
measureFumen.branchInfo[idx_b1] = vals[0]
measureFumen.branchInfo[idx_b2] = vals[1]
# Reset the note counter corresponding to this branch
total_notes_branch = 0
total_notes_branch += note_counter_branch
# Create note dictionaries based on TJA measure data (containing 0's plus 1/2/3/4/etc. for notes)
# Create notes based on TJA measure data
note_counter_branch = 0
note_counter = 0
for idx_d, data in enumerate(measureTJA['data']):
if data['type'] == 'note':
for idx_d, data in enumerate(measureTJAProcessed.data):
if data.name == 'note':
note = FumenNote()
# Note positions must be calculated using the base measure duration (that uses a single BPM value)
# (In other words, note positions do not take into account any mid-measure BPM change adjustments.)
note_pos = measureDuration * (data['pos'] - measureTJA['pos_start']) / measureLength
note.pos = measureDuration * (data.pos - measureTJAProcessed.pos_start) / measureLength
# Handle the note that represents the end of a drumroll/balloon
if data['value'] == "EndDRB":
if data.value == "EndDRB":
# If a drumroll spans a single measure, then add the difference between start/end position
if 'multimeasure' not in currentDrumroll.keys():
currentDrumroll['duration'] += (note_pos - currentDrumroll['pos'])
if not currentDrumroll.multimeasure:
currentDrumroll.duration += (note.pos - currentDrumroll.pos)
# Otherwise, if a drumroll spans multiple measures, then we want to add the duration between
# the start of the measure (i.e. pos=0.0) and the drumroll's end position.
else:
currentDrumroll['duration'] += (note_pos - 0.0)
currentDrumroll.duration += (note.pos - 0.0)
# 1182, 1385, 1588, 2469, 1568, 752, 1568
currentDrumroll['duration'] = float(int(currentDrumroll['duration']))
currentDrumroll.duration = float(int(currentDrumroll.duration))
currentDrumroll = None
continue
# The TJA spec technically allows you to place double-Kusudama notes:
# "Use another 9 to specify when to lower the points for clearing."
# But this is unsupported in fumens, so just skip the second Kusudama note.
if data['value'] == "Kusudama" and currentDrumroll:
if data.value == "Kusudama" and currentDrumroll:
continue
# Handle the remaining non-EndDRB, non-double Kusudama notes
note = deepcopy(default_note)
note['pos'] = note_pos
note['type'] = data['value']
note['scoreInit'] = tja['metadata']['scoreInit'] # Probably not fully accurate
note['scoreDiff'] = tja['metadata']['scoreDiff'] # Probably not fully accurate
note.type = data.value
note.scoreInit = tja.scoreInit
note.scoreDiff = tja.scoreDiff
# Handle drumroll/balloon-specific metadata
if note['type'] in ["Balloon", "Kusudama"]:
note['hits'] = courseBalloons.pop(0)
note['hitsPadding'] = 0
if note.type in ["Balloon", "Kusudama"]:
note.hits = courseBalloons.pop(0)
currentDrumroll = note
total_notes -= 1
if note['type'] in ["Drumroll", "DRUMROLL"]:
note['drumrollBytes'] = b'\x00\x00\x00\x00\x00\x00\x00\x00'
if note.type in ["Drumroll", "DRUMROLL"]:
currentDrumroll = note
total_notes -= 1
# Count dons, kas, and balloons for the purpose of tracking branching accuracy
if note['type'].lower() in ['don', 'ka']:
if note.type.lower() in ['don', 'ka']:
note_counter_branch += 1
elif note['type'].lower() in ['balloon', 'kusudama']:
elif note.type.lower() in ['balloon', 'kusudama']:
note_counter_branch += 1.5
measureFumen[currentBranch][note_counter] = note
measureFumen.branches[currentBranch].notes.append(note)
note_counter += 1
# If drumroll hasn't ended by the end of this measure, increase duration by measure timing
if currentDrumroll:
if currentDrumroll['duration'] == 0.0:
currentDrumroll['duration'] += (measureDuration - currentDrumroll['pos'])
currentDrumroll['multimeasure'] = True
if currentDrumroll.duration == 0.0:
currentDrumroll.duration += (measureDuration - currentDrumroll.pos)
currentDrumroll.multimeasure = True
else:
currentDrumroll['duration'] += measureDuration
currentDrumroll.duration += measureDuration
measureFumen[currentBranch]['length'] = note_counter
measureFumen.branches[currentBranch].length = note_counter
total_notes += note_counter
# Take a stock header metadata sample and add song-specific metadata
headerMetadata = sampleHeaderMetadata.copy()
headerMetadata[8] = DIFFICULTY_BYTES[tja['metadata']['course']][0]
headerMetadata[9] = DIFFICULTY_BYTES[tja['metadata']['course']][1]
fumen.headerMetadata[8] = DIFFICULTY_BYTES[tja.course][0]
fumen.headerMetadata[9] = DIFFICULTY_BYTES[tja.course][1]
soulGaugeBytes = computeSoulGaugeBytes(
n_notes=total_notes,
difficulty=tja['metadata']['course'],
stars=tja['metadata']['level']
difficulty=tja.course,
stars=tja.level
)
headerMetadata[12] = soulGaugeBytes[0]
headerMetadata[13] = soulGaugeBytes[1]
headerMetadata[16] = soulGaugeBytes[2]
headerMetadata[17] = soulGaugeBytes[3]
headerMetadata[20] = soulGaugeBytes[4]
headerMetadata[21] = soulGaugeBytes[5]
tjaConverted['headerMetadata'] = b"".join(i.to_bytes(1, 'little') for i in headerMetadata)
tjaConverted['headerPadding'] = simpleHeaders[0] # Use a basic, known set of header bytes
tjaConverted['order'] = '<'
tjaConverted['unknownMetadata'] = 0
tjaConverted['branches'] = all([len(b) for b in tja['branches'].values()])
tjaConverted['scoreInit'] = tja['metadata']['scoreInit']
tjaConverted['scoreDiff'] = tja['metadata']['scoreDiff']
fumen.headerMetadata[12] = soulGaugeBytes[0]
fumen.headerMetadata[13] = soulGaugeBytes[1]
fumen.headerMetadata[16] = soulGaugeBytes[2]
fumen.headerMetadata[17] = soulGaugeBytes[3]
fumen.headerMetadata[20] = soulGaugeBytes[4]
fumen.headerMetadata[21] = soulGaugeBytes[5]
fumen.headerMetadata = b"".join(i.to_bytes(1, 'little') for i in fumen.headerMetadata)
return tjaConverted
return fumen

View File

@ -4,7 +4,7 @@ from copy import deepcopy
from tja2fumen.utils import readStruct, getBool, shortHex
from tja2fumen.constants import NORMALIZE_COURSE, TJA_NOTE_TYPES, branchNames, noteTypes
from tja2fumen.types import TJASong, TJAMeasure, TJAData, FumenCourse, FumenMeasure, FumenBranch, FumenNote
########################################################################################################################
# TJA-parsing functions ( Original source: https://github.com/WHMHammer/tja-tools/blob/master/src/js/parseTJA.js)
@ -18,15 +18,15 @@ def parseTJA(fnameTJA):
tja_text = open(fnameTJA, "r", encoding="shift-jis").read()
lines = [line for line in tja_text.splitlines() if line.strip() != '']
courses = getCourseData(lines)
for courseData in courses.values():
courseData['branches'] = parseCourseMeasures(courseData['data'])
parsedTJA = getCourseData(lines)
for course in parsedTJA.courses.values():
parseCourseMeasures(course)
return courses
return parsedTJA
def getCourseData(lines):
courses = {}
parsedTJA = None
currentCourse = ''
currentCourseCached = ''
songBPM = 0
@ -40,31 +40,30 @@ def getCourseData(lines):
value = match_header.group(2).strip()
# Global header fields
if nameUpper in ['BPM', 'OFFSET']:
if nameUpper == 'BPM':
songBPM = value
elif nameUpper == 'OFFSET':
songOffset = value
if songBPM and songOffset:
parsedTJA = TJASong(songBPM, songOffset)
# Course-specific header fields
elif nameUpper == 'COURSE':
currentCourse = NORMALIZE_COURSE[value]
currentCourseCached = currentCourse
if currentCourse not in courses.keys():
courses[currentCourse] = {
'metadata': {'course': currentCourse, 'bpm': songBPM, 'offset': songOffset, 'level': 0,
'balloon': [], 'scoreInit': 0, 'scoreDiff': 0},
'data': [],
}
if currentCourse not in parsedTJA.courses.keys():
raise ValueError()
elif nameUpper == 'LEVEL':
courses[currentCourse]['metadata']['level'] = int(value) if value else 0
parsedTJA.courses[currentCourse].level = int(value) if value else 0
elif nameUpper == 'SCOREINIT':
courses[currentCourse]['metadata']['scoreInit'] = int(value) if value else 0
parsedTJA.courses[currentCourse].scoreInit = int(value) if value else 0
elif nameUpper == 'SCOREDIFF':
courses[currentCourse]['metadata']['scoreDiff'] = int(value) if value else 0
parsedTJA.courses[currentCourse].scoreDiff = int(value) if value else 0
elif nameUpper == 'BALLOON':
if value:
balloons = [int(v) for v in value.split(",") if v]
courses[currentCourse]['metadata']['balloon'] = balloons
parsedTJA.courses[currentCourse].balloon = balloons
elif nameUpper == 'STYLE':
# Reset the course name to remove "P1/P2" that may have been added by a previous STYLE:DOUBLE chart
if value == 'Single':
@ -84,121 +83,123 @@ def getCourseData(lines):
if nameUpper == "START":
if value in ["P1", "P2"]:
currentCourse = currentCourseCached + value
courses[currentCourse] = deepcopy(courses[currentCourseCached])
courses[currentCourse]['data'] = list() # Keep the metadata, but reset the note data
parsedTJA.courses[currentCourse] = deepcopy(parsedTJA.courses[currentCourseCached])
parsedTJA.courses[currentCourse].data = list() # Keep the metadata, but reset the note data
value = '' # Once we've made the new course, we can reset this to a normal #START command
elif value:
raise ValueError(f"Invalid value '{value}' for #START command.")
elif match_notes:
nameUpper = 'NOTES'
value = match_notes.group(1)
courses[currentCourse]['data'].append({"name": nameUpper, "value": value})
parsedTJA.courses[currentCourse].data.append(TJAData(nameUpper, value))
# If a course has no song data, then this is likely because the course has "STYLE: Double" but no "STYLE: Single".
# To fix this, we copy over the P1 chart from "STYLE: Double" to fill the "STYLE: Single" role.
for courseName, course in courses.items():
if not course['data']:
if courseName+"P1" in courses.keys():
courses[courseName] = deepcopy(courses[courseName+"P1"])
for courseName, course in parsedTJA.courses.items():
if not course.data:
if courseName+"P1" in parsedTJA.courses.keys():
parsedTJA.courses[courseName] = deepcopy(parsedTJA.courses[courseName+"P1"])
return courses
# Remove any charts (e.g. P1/P2) not present in the TJA file
for course_name in [k for k, v in parsedTJA.courses.items() if not v.data]:
del parsedTJA.courses[course_name]
return parsedTJA
def parseCourseMeasures(lines):
def parseCourseMeasures(course):
# Check if the course has branches or not
hasBranches = True if [l for l in lines if l['name'] == 'BRANCHSTART'] else False
hasBranches = True if [l for l in course.data if l.name == 'BRANCHSTART'] else False
currentBranch = 'all' if hasBranches else 'normal'
flagLevelhold = False
# Process course lines
idx_m = 0
idx_m_branchstart = 0
emptyMeasure = {'data': '', 'events': []}
branches = {'normal': [deepcopy(emptyMeasure)], 'advanced': [deepcopy(emptyMeasure)], 'master': [deepcopy(emptyMeasure)]}
for line in lines:
for line in course.data:
# 1. Parse measure notes
if line['name'] == 'NOTES':
notes = line['value']
if line.name == 'NOTES':
notes = line.value
# If measure has ended, then add notes to the current measure, then start a new one by incrementing idx_m
if notes.endswith(','):
for branch in branches.keys() if currentBranch == 'all' else [currentBranch]:
branches[branch][idx_m]['data'] += notes[0:-1]
branches[branch].append(deepcopy(emptyMeasure))
for branch in course.branches.keys() if currentBranch == 'all' else [currentBranch]:
course.branches[branch][idx_m].notes += notes[0:-1]
course.branches[branch].append(TJAMeasure())
idx_m += 1
# Otherwise, keep adding notes to the current measure ('idx_m')
else:
for branch in branches.keys() if currentBranch == 'all' else [currentBranch]:
branches[branch][idx_m]['data'] += notes
for branch in course.branches.keys() if currentBranch == 'all' else [currentBranch]:
course.branches[branch][idx_m].notes += notes
# 2. Parse measure commands that produce an "event"
elif line['name'] in ['GOGOSTART', 'GOGOEND', 'BARLINEON', 'BARLINEOFF', 'DELAY',
elif line.name in ['GOGOSTART', 'GOGOEND', 'BARLINEON', 'BARLINEOFF', 'DELAY',
'SCROLL', 'BPMCHANGE', 'MEASURE', 'BRANCHSTART']:
# Get position of the event
for branch in branches.keys() if currentBranch == 'all' else [currentBranch]:
pos = len(branches[branch][idx_m]['data'])
for branch in course.branches.keys() if currentBranch == 'all' else [currentBranch]:
pos = len(course.branches[branch][idx_m].notes)
# Parse event type
if line['name'] == 'GOGOSTART':
currentEvent = {"name": 'gogo', "position": pos, "value": '1'}
elif line['name'] == 'GOGOEND':
currentEvent = {"name": 'gogo', "position": pos, "value": '0'}
elif line['name'] == 'BARLINEON':
currentEvent = {"name": 'barline', "position": pos, "value": '1'}
elif line['name'] == 'BARLINEOFF':
currentEvent = {"name": 'barline', "position": pos, "value": '0'}
elif line['name'] == 'DELAY':
currentEvent = {"name": 'delay', "position": pos, "value": float(line['value'])}
elif line['name'] == 'SCROLL':
currentEvent = {"name": 'scroll', "position": pos, "value": float(line['value'])}
elif line['name'] == 'BPMCHANGE':
currentEvent = {"name": 'bpm', "position": pos, "value": float(line['value'])}
elif line['name'] == 'MEASURE':
currentEvent = {"name": 'measure', "position": pos, "value": line['value']}
elif line["name"] == 'BRANCHSTART':
if line.name == 'GOGOSTART':
currentEvent = TJAData('gogo', '1', pos)
elif line.name == 'GOGOEND':
currentEvent = TJAData('gogo', '0', pos)
elif line.name == 'BARLINEON':
currentEvent = TJAData('barline', '1', pos)
elif line.name == 'BARLINEOFF':
currentEvent = TJAData('barline', '0', pos)
elif line.name == 'DELAY':
currentEvent = TJAData('delay', float(line.value), pos)
elif line.name == 'SCROLL':
currentEvent = TJAData('scroll', float(line.value), pos)
elif line.name == 'BPMCHANGE':
currentEvent = TJAData('bpm', float(line.value), pos)
elif line.name == 'MEASURE':
currentEvent = TJAData('measure', line.value, pos)
elif line.name == 'BRANCHSTART':
if flagLevelhold:
continue
currentBranch = 'all' # Ensure that the #BRANCHSTART command is present for all branches
values = line['value'].split(',')
values = line.value.split(',')
if values[0] == 'r': # r = drumRoll
values[1] = int(values[1]) # # of drumrolls
values[2] = int(values[2]) # # of drumrolls
elif values[0] == 'p': # p = Percentage
values[1] = float(values[1]) / 100 # %
values[2] = float(values[2]) / 100 # %
currentEvent = {"name": 'branchStart', "position": pos, "value": values}
currentEvent = TJAData('branchStart', values, pos)
idx_m_branchstart = idx_m # Preserve the index of the BRANCHSTART command to re-use for each branch
# Append event to the current measure's events
for branch in branches.keys() if currentBranch == 'all' else [currentBranch]:
branches[branch][idx_m]['events'].append(currentEvent)
elif line['name'] == 'SECTION':
for branch in course.branches.keys() if currentBranch == 'all' else [currentBranch]:
course.branches[branch][idx_m].events.append(currentEvent)
elif line.name == 'SECTION':
# Simply repeat the same #BRANCHSTART condition that happened previously
# The purpose of #SECTION is to "Reset accuracy values for notes and drumrolls on the next measure."
branches[branch][idx_m]['events'].append({"name": 'branchStart', "position": pos, "value": values})
course.branches[branch][idx_m].events.append(TJAData('branchStart', values, pos))
# 3. Parse commands that don't create an event (e.g. simply changing the current branch)
else:
if line["name"] == 'START' or line['name'] == 'END':
if line.name == 'START' or line.name == 'END':
currentBranch = 'all' if hasBranches else 'normal'
flagLevelhold = False
elif line['name'] == 'LEVELHOLD':
elif line.name == 'LEVELHOLD':
flagLevelhold = True
elif line["name"] == 'N':
elif line.name == 'N':
currentBranch = 'normal'
idx_m = idx_m_branchstart
elif line["name"] == 'E':
elif line.name == 'E':
currentBranch = 'advanced'
idx_m = idx_m_branchstart
elif line["name"] == 'M':
elif line.name == 'M':
currentBranch = 'master'
idx_m = idx_m_branchstart
elif line["name"] == 'BRANCHEND':
elif line.name == 'BRANCHEND':
currentBranch = 'all'
# Ignored commands
elif line['name'] == 'LYRIC':
elif line.name == 'LYRIC':
pass
elif line['name'] == 'NEXTSONG':
elif line.name == 'NEXTSONG':
pass
# Not implemented commands
@ -206,38 +207,33 @@ def parseCourseMeasures(lines):
raise NotImplementedError
# Delete the last measure in the branch if no notes or events were added to it (due to preallocating empty measures)
for branch in branches.values():
if not branch[-1]['data'] and not branch[-1]['events']:
for branch in course.branches.values():
if not branch[-1].notes and not branch[-1].events:
del branch[-1]
# Merge measure data and measure events in chronological order
for branchName, branch in branches.items():
for branchName, branch in course.branches.items():
for measure in branch:
notes = [{'pos': i, 'type': 'note', 'value': TJA_NOTE_TYPES[note]}
for i, note in enumerate(measure['data']) if note != '0']
events = [{'pos': e['position'], 'type': e['name'], 'value': e['value']}
for e in measure['events']]
combined = []
notes = [TJAData('note', TJA_NOTE_TYPES[note], i)
for i, note in enumerate(measure.notes) if note != '0']
events = measure.events
while notes or events:
if events and notes:
if notes[0]['pos'] >= events[0]['pos']:
combined.append(events.pop(0))
if notes[0].pos >= events[0].pos:
measure.combined.append(events.pop(0))
else:
combined.append(notes.pop(0))
measure.combined.append(notes.pop(0))
elif events:
combined.append(events.pop(0))
measure.combined.append(events.pop(0))
elif notes:
combined.append(notes.pop(0))
measure['combined'] = combined
measure.combined.append(notes.pop(0))
# Ensure all branches have the same number of measures
if hasBranches:
branch_lens = [len(b) for b in branches.values()]
branch_lens = [len(b) for b in course.branches.values()]
if not branch_lens.count(branch_lens[0]) == len(branch_lens):
raise ValueError("Branches do not have the same number of measures.")
return branches
########################################################################################################################
# Fumen-parsing functions
@ -275,18 +271,13 @@ def readFumen(fumenFile, exclude_empty_measures=False):
totalMeasures = measuresLittle
# Initialize the dict that will contain the chart information
song = {'measures': []}
song['headerPadding'] = fumenHeader[:432]
song['headerMetadata'] = fumenHeader[-80:]
song['order'] = order
# I am unsure what byte this represents
unknownMetadata = readStruct(file, order, format_string="I", seek=0x204)[0]
song["unknownMetadata"] = unknownMetadata
# Determine whether the song has branches from byte 0x1b0 (decimal 432)
hasBranches = getBool(readStruct(file, order, format_string="B", seek=0x1b0)[0])
song["branches"] = hasBranches
song = FumenCourse(
headerPadding=fumenHeader[:432],
headerMetadata=fumenHeader[-80:],
order=order,
unknownMetadata=readStruct(file, order, format_string="I", seek=0x204)[0],
hasBranches=getBool(readStruct(file, order, format_string="B", seek=0x1b0)[0])
)
# Start reading measure data from position 0x208 (decimal 520)
file.seek(0x208)
@ -303,17 +294,18 @@ def readFumen(fumenFile, exclude_empty_measures=False):
measureStruct = readStruct(file, order, format_string="ffBBHiiiiiii")
# Create the measure dictionary using the newly-parsed measure data
measure = {}
measure["bpm"] = measureStruct[0]
measure["fumenOffsetStart"] = measureStruct[1]
measure["gogo"] = getBool(measureStruct[2])
measure["barline"] = getBool(measureStruct[3])
measure["padding1"] = measureStruct[4]
measure["branchInfo"] = list(measureStruct[5:11])
measure["padding2"] = measureStruct[11]
measure = FumenMeasure(
bpm=measureStruct[0],
fumenOffsetStart=measureStruct[1],
gogo=getBool(measureStruct[2]),
barline=getBool(measureStruct[3]),
padding1=measureStruct[4],
branchInfo=list(measureStruct[5:11]),
padding2=measureStruct[11]
)
# Iterate through the three branch types
for branchNumber in range(len(branchNames)):
for branchName in branchNames:
# Parse the measure data using the following `format_string`:
# "HHf" (3 format characters, 8 bytes per branch)
# - 'H': totalNotes (represented by one unsigned short (2 bytes))
@ -322,11 +314,12 @@ def readFumen(fumenFile, exclude_empty_measures=False):
branchStruct = readStruct(file, order, format_string="HHf")
# Create the branch dictionary using the newly-parsed branch data
branch = {}
totalNotes = branchStruct[0]
branch["length"] = totalNotes
branch["padding"] = branchStruct[1]
branch["speed"] = branchStruct[2]
branch = FumenBranch(
length=totalNotes,
padding=branchStruct[1],
speed=branchStruct[2],
)
# Iterate through each note in the measure (per branch)
for noteNumber in range(totalNotes):
@ -351,39 +344,41 @@ def readFumen(fumenFile, exclude_empty_measures=False):
)
# Create the note dictionary using the newly-parsed note data
note = {}
note["type"] = noteTypes[noteType]
note["pos"] = noteStruct[1]
note["item"] = noteStruct[2]
note["padding"] = noteStruct[3]
note = FumenNote(
note_type=noteTypes[noteType],
pos=noteStruct[1],
item=noteStruct[2],
padding=noteStruct[3],
)
if noteType == 0xa or noteType == 0xc:
# Balloon hits
note["hits"] = noteStruct[4]
note["hitsPadding"] = noteStruct[5]
note.hits = noteStruct[4]
note.hitsPadding = noteStruct[5]
else:
note['scoreInit'] = noteStruct[4]
note['scoreDiff'] = noteStruct[5] // 4
if "scoreInit" not in song:
song["scoreInit"] = note['scoreInit']
song["scoreDiff"] = note['scoreDiff']
note.scoreInit = noteStruct[4]
note.scoreDiff = noteStruct[5] // 4
if not song.scoreInit:
song.scoreInit = note.scoreInit
song.scoreDiff = note.scoreDiff
if noteType == 0x6 or noteType == 0x9 or noteType == 0xa or noteType == 0xc:
# Drumroll and balloon duration in ms
note["duration"] = noteStruct[6]
note.duration = noteStruct[6]
else:
note['duration'] = noteStruct[6]
note.duration = noteStruct[6]
# Seek forward 8 bytes to account for padding bytes at the end of drumrolls
if noteType == 0x6 or noteType == 0x9 or noteType == 0x62:
note["drumrollBytes"] = file.read(8)
note.drumrollBytes = file.read(8)
# Assign the note to the branch
branch[noteNumber] = note
branch.notes.append(note)
# Assign the branch to the measure
measure[branchNames[branchNumber]] = branch
measure.branches[branchName] = branch
# Assign the measure to the song
song['measures'].append(measure)
song.measures.append(measure)
if file.tell() >= size:
break
@ -394,7 +389,7 @@ def readFumen(fumenFile, exclude_empty_measures=False):
# So, in tests, if we want to only compare the timing of the non-empty measures between an official fumen and
# a converted non-official TJA, then it's useful to exclude the empty measures.
if exclude_empty_measures:
song['measures'] = [m for m in song['measures']
if m['normal']['length'] or m['advanced']['length'] or m['master']['length']]
song.measures = [m for m in song.measures
if m.branches['normal'].length or m.branches['advanced'].length or m.branches['master'].length]
return song

140
src/tja2fumen/types.py Normal file
View File

@ -0,0 +1,140 @@
from tja2fumen.constants import sampleHeaderMetadata, simpleHeaders, TJA_COURSE_NAMES
class TJASong:
def __init__(self, BPM=None, offset=None):
self.BPM = float(BPM)
self.offset = float(offset)
self.courses = {course: TJACourse(self.BPM, self.offset, course) for course in TJA_COURSE_NAMES}
def __repr__(self):
return f"{{'BPM': {self.BPM}, 'offset': {self.offset}, 'courses': {list(self.courses.keys())}}}"
class TJACourse:
def __init__(self, BPM, offset, course, level=0, balloon=None, scoreInit=0, scoreDiff=0):
self.level = level
self.balloon = [] if balloon is None else balloon
self.scoreInit = scoreInit
self.scoreDiff = scoreDiff
self.BPM = BPM
self.offset = offset
self.course = course
self.data = []
self.branches = {
'normal': [TJAMeasure()],
'advanced': [TJAMeasure()],
'master': [TJAMeasure()]
}
def __repr__(self):
return str(self.__dict__) if self.data else "{'data': []}"
class TJAMeasure:
def __init__(self, notes=None, events=None):
self.notes = [] if notes is None else notes
self.events = [] if events is None else events
self.combined = []
def __repr__(self):
return str(self.__dict__)
class TJAMeasureProcessed:
def __init__(self, bpm, scroll, gogo, barline, time_sig, subdivisions,
pos_start=0, pos_end=0, delay=0, branchStart=None, data=None):
self.bpm = bpm
self.scroll = scroll
self.gogo = gogo
self.barline = barline
self.time_sig = time_sig
self.subdivisions = subdivisions
self.pos_start = pos_start
self.pos_end = pos_end
self.delay = delay
self.branchStart = branchStart
self.data = [] if data is None else data
def __repr__(self):
return str(self.__dict__)
class TJAData:
def __init__(self, name, value, pos=None):
self.pos = pos
self.name = name
self.value = value
def __repr__(self):
return str(self.__dict__)
class FumenCourse:
def __init__(self, measures=None, hasBranches=False, scoreInit=0, scoreDiff=0,
order='<', headerPadding=None, headerMetadata=None, unknownMetadata=0):
if isinstance(measures, int):
self.measures = [FumenMeasure() for _ in range(measures)]
else:
self.measures = [] if measures is None else measures
self.hasBranches = hasBranches
self.scoreInit = scoreInit
self.scoreDiff = scoreDiff
self.order = order
self.headerPadding = simpleHeaders.copy()[0] if headerPadding is None else headerPadding
self.headerMetadata = sampleHeaderMetadata.copy() if headerMetadata is None else headerMetadata
self.unknownMetadata = unknownMetadata
def __repr__(self):
return str(self.__dict__)
class FumenMeasure:
def __init__(self, bpm=0.0, fumenOffsetStart=0.0, fumenOffsetEnd=0.0, duration=0.0,
gogo=False, barline=True, branchStart=None, branchInfo=None, padding1=0, padding2=0):
self.bpm = bpm
self.fumenOffsetStart = fumenOffsetStart
self.fumenOffsetEnd = fumenOffsetEnd
self.duration = duration
self.gogo = gogo
self.barline = barline
self.branchStart = branchStart
self.branchInfo = [-1, -1, -1, -1, -1, -1] if branchInfo is None else branchInfo
self.branches = {'normal': FumenBranch(), 'advanced': FumenBranch(), 'master': FumenBranch()}
self.padding1 = padding1
self.padding2 = padding2
def __repr__(self):
return str(self.__dict__)
class FumenBranch:
def __init__(self, length=0, speed=0.0, padding=0):
self.length = length
self.speed = speed
self.padding = padding
self.notes = []
def __repr__(self):
return str(self.__dict__)
class FumenNote:
def __init__(self, note_type='', pos=0.0, scoreInit=0, scoreDiff=0, padding=0, item=0, duration=0.0,
multimeasure=False, hits=0, hitsPadding=0, drumrollBytes=b'\x00\x00\x00\x00\x00\x00\x00\x00'):
self.note_type = note_type
self.pos = pos
self.scoreInit = scoreInit
self.scoreDiff = scoreDiff
self.padding = padding
# TODO: Determine how to properly set the item byte (https://github.com/vivaria/tja2fumen/issues/17)
self.item = item
# These attributes are only used for drumrolls/balloons
self.duration = duration
self.multimeasure = multimeasure
self.hits = hits
self.hitsPadding = hitsPadding
self.drumrollBytes = drumrollBytes
def __repr__(self):
return str(self.__dict__)

View File

@ -4,58 +4,58 @@ from tja2fumen.constants import branchNames, typeNotes
def writeFumen(path_out, song):
# Fetch the byte order (little/big endian)
order = song['order']
order = song.order
# Write the header
file = open(path_out, "wb")
file.write(song['headerPadding']) # Write header padding bytes
file.write(song['headerMetadata']) # Write header metadata bytes
file.write(song.headerPadding) # Write header padding bytes
file.write(song.headerMetadata) # Write header metadata bytes
# Preallocate space in the file
len_metadata = 8
len_measures = 0
for measureNumber in range(len(song['measures'])):
for measureNumber in range(len(song.measures)):
len_measures += 40
measure = song['measures'][measureNumber]
measure = song.measures[measureNumber]
for branchNumber in range(len(branchNames)):
len_measures += 8
branch = measure[branchNames[branchNumber]]
for noteNumber in range(branch['length']):
branch = measure.branches[branchNames[branchNumber]]
for noteNumber in range(branch.length):
len_measures += 24
note = branch[noteNumber]
if note['type'].lower() == "drumroll":
note = branch.notes[noteNumber]
if note.type.lower() == "drumroll":
len_measures += 8
file.write(b'\x00' * (len_metadata + len_measures))
# Write metadata
writeStruct(file, order, format_string="B", value_list=[putBool(song['branches'])], seek=0x1b0)
writeStruct(file, order, format_string="I", value_list=[len(song['measures'])], seek=0x200)
writeStruct(file, order, format_string="I", value_list=[song['unknownMetadata']], seek=0x204)
writeStruct(file, order, format_string="B", value_list=[putBool(song.hasBranches)], seek=0x1b0)
writeStruct(file, order, format_string="I", value_list=[len(song.measures)], seek=0x200)
writeStruct(file, order, format_string="I", value_list=[song.unknownMetadata], seek=0x204)
# Write measure data
file.seek(0x208)
for measureNumber in range(len(song['measures'])):
measure = song['measures'][measureNumber]
measureStruct = [measure['bpm'], measure['fumenOffsetStart'], int(measure['gogo']), int(measure['barline'])]
measureStruct.extend([measure['padding1']] + measure['branchInfo'] + [measure['padding2']])
for measureNumber in range(len(song.measures)):
measure = song.measures[measureNumber]
measureStruct = [measure.bpm, measure.fumenOffsetStart, int(measure.gogo), int(measure.barline)]
measureStruct.extend([measure.padding1] + measure.branchInfo + [measure.padding2])
writeStruct(file, order, format_string="ffBBHiiiiiii", value_list=measureStruct)
for branchNumber in range(len(branchNames)):
branch = measure[branchNames[branchNumber]]
branchStruct = [branch['length'], branch['padding'], branch['speed']]
branch = measure.branches[branchNames[branchNumber]]
branchStruct = [branch.length, branch.padding, branch.speed]
writeStruct(file, order, format_string="HHf", value_list=branchStruct)
for noteNumber in range(branch['length']):
note = branch[noteNumber]
noteStruct = [typeNotes[note['type']], note['pos'], note['item'], note['padding']]
for noteNumber in range(branch.length):
note = branch.notes[noteNumber]
noteStruct = [typeNotes[note.type], note.pos, note.item, note.padding]
# Balloon hits
if 'hits' in note.keys():
noteStruct.extend([note["hits"], note['hitsPadding']])
if note.hits:
noteStruct.extend([note.hits, note.hitsPadding])
else:
noteStruct.extend([note['scoreInit'], note['scoreDiff'] * 4])
noteStruct.extend([note.scoreInit, note.scoreDiff * 4])
# Drumroll or balloon duration
noteStruct.append(note['duration'])
noteStruct.append(note.duration)
writeStruct(file, order, format_string="ififHHf", value_list=noteStruct)
if note['type'].lower() == "drumroll":
file.write(note['drumrollBytes'])
if note.type.lower() == "drumroll":
file.write(note.drumrollBytes)
file.close()

View File

@ -65,20 +65,20 @@ def test_converted_tja_vs_cached_fumen(id_song, tmp_path, entry_point):
co_song = readFumen(path_out, exclude_empty_measures=True)
ca_song = readFumen(os.path.join(path_bin, os.path.basename(path_out)), exclude_empty_measures=True)
# 1. Check song headers
checkValidHeader(co_song['headerPadding']+co_song['headerMetadata'], strict=True)
checkValidHeader(ca_song['headerPadding']+ca_song['headerMetadata'])
checkValidHeader(co_song.headerPadding+co_song.headerMetadata, strict=True)
checkValidHeader(ca_song.headerPadding+ca_song.headerMetadata)
# 2. Check song metadata
assert_song_property(co_song, ca_song, 'order')
assert_song_property(co_song, ca_song, 'branches')
assert_song_property(co_song, ca_song, 'hasBranches')
assert_song_property(co_song, ca_song, 'scoreInit')
assert_song_property(co_song, ca_song, 'scoreDiff')
# 3. Check measure data
for i_measure in range(max([len(co_song['measures']), len(ca_song['measures'])])):
for i_measure in range(max([len(co_song.measures), len(ca_song.measures)])):
# NB: We could assert that len(measures) is the same for both songs, then iterate through zipped measures.
# But, if there is a mismatched number of measures, we want to know _where_ it occurs. So, we let the
# comparison go on using the max length of both songs until something else fails.
co_measure = co_song['measures'][i_measure]
ca_measure = ca_song['measures'][i_measure]
co_measure = co_song.measures[i_measure]
ca_measure = ca_song.measures[i_measure]
# 3a. Check measure metadata
assert_song_property(co_measure, ca_measure, 'bpm', i_measure, abs=0.01)
assert_song_property(co_measure, ca_measure, 'fumenOffsetStart', i_measure, abs=0.15)
@ -87,19 +87,19 @@ def test_converted_tja_vs_cached_fumen(id_song, tmp_path, entry_point):
assert_song_property(co_measure, ca_measure, 'branchInfo', i_measure)
# 3b. Check measure notes
for i_branch in ['normal', 'advanced', 'master']:
co_branch = co_measure[i_branch]
ca_branch = ca_measure[i_branch]
co_branch = co_measure.branches[i_branch]
ca_branch = ca_measure.branches[i_branch]
# NB: We check for branching before checking speed as fumens store speed changes even for empty branches
if co_branch['length'] == 0:
if co_branch.length == 0:
continue
assert_song_property(co_branch, ca_branch, 'speed', i_measure, i_branch)
# NB: We could assert that len(notes) is the same for both songs, then iterate through zipped notes.
# But, if there is a mismatched number of notes, we want to know _where_ it occurs. So, we let the
# comparison go on using the max length of both branches until something else fails.
for i_note in range(max([co_branch['length'], ca_branch['length']])):
co_note = co_branch[i_note]
ca_note = ca_branch[i_note]
assert_song_property(co_note, ca_note, 'type', i_measure, i_branch, i_note, func=normalize_type)
for i_note in range(max([co_branch.length, ca_branch.length])):
co_note = co_branch.notes[i_note]
ca_note = ca_branch.notes[i_note]
assert_song_property(co_note, ca_note, 'note_type', i_measure, i_branch, i_note, func=normalize_type)
assert_song_property(co_note, ca_note, 'pos', i_measure, i_branch, i_note, abs=0.1)
# NB: Drumroll duration doesn't always end exactly on a beat. Plus, TJA charters often eyeball
# drumrolls, leading them to be often off by a 1/4th/8th/16th/32th/etc. These charting errors
@ -110,7 +110,7 @@ def test_converted_tja_vs_cached_fumen(id_song, tmp_path, entry_point):
assert_song_property(co_note, ca_note, 'duration', i_measure, i_branch, i_note, abs=25.0)
except AssertionError:
pass
if ca_note['type'] not in ["Balloon", "Kusudama"]:
if ca_note.note_type not in ["Balloon", "Kusudama"]:
assert_song_property(co_note, ca_note, 'scoreInit', i_measure, i_branch, i_note)
assert_song_property(co_note, ca_note, 'scoreDiff', i_measure, i_branch, i_note)
# NB: 'item' still needs to be implemented: https://github.com/vivaria/tja2fumen/issues/17
@ -124,12 +124,14 @@ def assert_song_property(converted_obj, cached_obj, prop, measure=None, branch=N
msg_failure += f": measure '{measure+1}'" if measure is not None else ""
msg_failure += f", branch '{branch}'" if branch is not None else ""
msg_failure += f", note '{note+1}'" if note is not None else ""
converted_val = converted_obj.__getattribute__(prop)
cached_val = cached_obj.__getattribute__(prop)
if func:
assert func(converted_obj[prop]) == func(cached_obj[prop]), msg_failure
assert func(converted_val) == func(cached_val), msg_failure
elif abs:
assert converted_obj[prop] == pytest.approx(cached_obj[prop], abs=abs), msg_failure
assert converted_val == pytest.approx(cached_val, abs=abs), msg_failure
else:
assert converted_obj[prop] == cached_obj[prop], msg_failure
assert converted_val == cached_val, msg_failure
def normalize_type(note_type):