1
0
mirror of synced 2024-11-23 21:20:56 +01:00

Update codebase to adhere to flake8 and add linting check (#51)

This commit is contained in:
Viv 2023-07-21 22:57:18 -04:00 committed by GitHub
parent e1cd6f385d
commit 356bb7b036
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 601 additions and 425 deletions

View File

@ -1,3 +1,4 @@
#file: noinspection LongLine
name: "Test and publish release"
on:
@ -32,6 +33,9 @@ jobs:
run: |
pip install -e .[dev]
- name: Lint project
run: pflake8
- name: Run tests (Python API)
run: |
pytest testing --entry-point python-api

View File

@ -18,7 +18,8 @@ keywords = ["taiko", "tatsujin", "fumen", "TJA"]
tja2fumen = "tja2fumen:main"
[project.optional-dependencies]
dev = ["pytest", "build", "pyinstaller", "twine", "toml-cli"]
dev = ["pytest", "build", "pyinstaller", "twine", "toml-cli",
"flake8", "pyproject-flake8"]
[tool.setuptools.packages.find]
where = ["src"]
@ -26,3 +27,10 @@ where = ["src"]
[tool.pytest.ini_options]
addopts = "-vv --tb=short"
console_output_style = "count"
[tool.flake8]
exclude = "venv/"
per-file-ignores = """
./src/tja2fumen/types.py: E221
./testing/test_conversion.py: E221, E272
"""

View File

@ -26,9 +26,10 @@ def main(argv=None):
# Parse lines in TJA file
parsed_tja = parse_tja(fname_tja)
# Convert parsed TJA courses to Fumen data, and write each course to `.bin` files
# Convert parsed TJA courses and write each course to `.bin` files
for course in parsed_tja.courses.items():
convert_and_write(course, base_name, single_course=(len(parsed_tja.courses) == 1))
convert_and_write(course, base_name,
single_course=(len(parsed_tja.courses) == 1))
def convert_and_write(parsed_course, base_name, single_course=False):
@ -37,12 +38,12 @@ def convert_and_write(parsed_course, base_name, single_course=False):
# Add course ID (e.g. '_x', '_x_1', '_x_2') to the output file's base name
output_name = base_name
if single_course:
pass # Replicate tja2bin.exe behavior by excluding course ID if there's only one course
pass # Replicate tja2bin.exe behavior by excluding course ID
else:
split_name = course_name.split("P") # e.g. 'OniP2' -> ['Oni', '2'], 'Oni' -> ['Oni']
split_name = course_name.split("P") # e.g. 'OniP2' -> ['Oni', '2']
output_name += f"_{COURSE_IDS[split_name[0]]}"
if len(split_name) == 2:
output_name += f"_{split_name[1]}" # Add "_1" or "_2" if P1/P2 chart
output_name += f"_{split_name[1]}" # Add "_1"/"_2" if P1/P2 chart
write_fumen(f"{output_name}.bin", fumen_data)

View File

@ -5,23 +5,30 @@ from tja2fumen.types import TJAMeasureProcessed, FumenCourse, FumenNote
def process_tja_commands(tja):
"""
Merge TJA 'data' and 'event' fields into a single measure property, and split
measures into sub-measures whenever a mid-measure BPM/SCROLL/GOGO change occurs.
Merge TJA 'data' and 'event' fields into a single measure property, and
split measures into sub-measures whenever a mid-measure BPM/SCROLL/GOGO
change occurs.
The TJA parser produces measure objects with two important properties:
- 'data': Contains the note data (1: don, 2: ka, etc.) along with spacing (s)
- 'events' Contains event commands such as MEASURE, BPMCHANGE, GOGOTIME, etc.
- 'data': Contains the note data (1: don, 2: ka, etc.) along with
spacing (s)
- 'events' Contains event commands such as MEASURE, BPMCHANGE,
GOGOTIME, etc.
However, notes and events can be intertwined within a single measure. So, it's
not possible to process them separately; they must be considered as single sequence.
However, notes and events can be intertwined within a single measure. So,
it's not possible to process them separately; they must be considered as
single sequence.
A particular danger is BPM changes. TJA allows multiple BPMs within a single measure,
but the fumen format permits one BPM per measure. So, a TJA measure must be split up
if it has multiple BPM changes within a measure.
A particular danger is BPM changes. TJA allows multiple BPMs within a
single measure, but the fumen format permits one BPM per measure. So, a
TJA measure must be split up if it has multiple BPM changes within a
measure.
In the future, this logic should probably be moved into the TJA parser itself.
In the future, this logic should probably be moved into the TJA parser
itself.
"""
tja_branches_processed = {branch_name: [] for branch_name in tja.branches.keys()}
tja_branches_processed = {branch_name: []
for branch_name in tja.branches.keys()}
for branch_name, branch_measures_tja in tja.branches.items():
current_bpm = tja.BPM
current_scroll = 1.0
@ -44,7 +51,8 @@ def process_tja_commands(tja):
if data.name == 'note':
measure_tja_processed.data.append(data)
# Handle commands that can only be placed between measures (i.e. no mid-measure variations)
# Handle commands that can only be placed between measures
# (i.e. no mid-measure variations)
elif data.name == 'delay':
measure_tja_processed.delay = data.value * 1000 # ms -> s
elif data.name == 'branch_start':
@ -60,11 +68,14 @@ def process_tja_commands(tja):
continue
current_dividend = int(match_measure.group(1))
current_divisor = int(match_measure.group(2))
measure_tja_processed.time_sig = [current_dividend, current_divisor]
measure_tja_processed.time_sig = [current_dividend,
current_divisor]
# Handle commands that can be placed in the middle of a measure.
# NB: For fumen files, if there is a mid-measure change to BPM/SCROLL/GOGO, then the measure will
# actually be split into two small submeasures. So, we need to start a new measure in those cases.
# Handle commands that can be placed in the middle of a
# measure. (For fumen files, if there is a mid-measure change
# to BPM/SCROLL/GOGO, then the measure will actually be split
# into two small submeasures. So, we need to start a new
# measure in those cases.
elif data.name in ['bpm', 'scroll', 'gogo']:
# Parse the values
if data.name == 'bpm':
@ -74,13 +85,16 @@ def process_tja_commands(tja):
elif data.name == 'gogo':
new_val = current_gogo = bool(int(data.value))
# Check for mid-measure commands
# - Case 1: Command happens at the start of a measure; just change the value directly
# - Case 1: Command happens at the start of a measure;
# just change the value directly
if data.pos == 0:
measure_tja_processed.__setattr__(data.name, new_val)
# - Case 2: Command occurs mid-measure, so start a new sub-measure
# - Case 2: Command happens in the middle of a measure;
# start a new sub-measure
else:
measure_tja_processed.pos_end = data.pos
tja_branches_processed[branch_name].append(measure_tja_processed)
tja_branches_processed[branch_name]\
.append(measure_tja_processed)
measure_tja_processed = TJAMeasureProcessed(
bpm=current_bpm,
scroll=current_scroll,
@ -99,32 +113,47 @@ def process_tja_commands(tja):
has_branches = all(len(b) for b in tja_branches_processed.values())
if has_branches:
branch_lens = [len(b) for b in tja.branches.values()]
if not branch_lens.count(branch_lens[0]) == len(branch_lens):
raise ValueError("Branches do not have the same number of measures.")
else:
branch_corrected_lens = [len(b) for b in tja_branches_processed.values()]
if not branch_corrected_lens.count(branch_corrected_lens[0]) == len(branch_corrected_lens):
raise ValueError("Branches do not have matching GOGO/SCROLL/BPM commands.")
if len(set([len(b) for b in tja.branches.values()])) != 1:
raise ValueError(
"Branches do not have the same number of measures. (This "
"check was performed prior to splitting up the measures due "
"to mid-measure commands. Please check the number of ',' you"
"have in each branch.)"
)
if len(set([len(b) for b in tja_branches_processed.values()])) != 1:
raise ValueError(
"Branches do not have the same number of measures. (This "
"check was performed after splitting up the measures due "
"to mid-measure commands. Please check any GOGO, BPMCHANGE, "
"and SCROLL commands you have in your branches, and make sure"
"that each branch has the same number of commands.)"
)
return tja_branches_processed
def convert_tja_to_fumen(tja):
# Preprocess commands
processed_tja_branches = process_tja_commands(tja)
tja_branches_processed = process_tja_commands(tja)
# Pre-allocate the measures for the converted TJA
n_measures = len(tja_branches_processed['normal'])
fumen = FumenCourse(
measures=len(processed_tja_branches['normal']),
measures=n_measures,
score_init=tja.score_init,
score_diff=tja.score_diff,
)
# Set song metadata using information from the processed measures
fumen.header.b512_b515_number_of_measures = n_measures
fumen.header.b432_b435_has_branches = int(all(
[len(b) for b in tja_branches_processed.values()]
))
# Iterate through the different branches in the TJA
total_notes = {'normal': 0, 'professional': 0, 'master': 0}
for current_branch, branch_measures_tja_processed in processed_tja_branches.items():
if not len(branch_measures_tja_processed):
for current_branch, branch_tja in tja_branches_processed.items():
if not len(branch_tja):
continue
branch_points_total = 0
branch_points_measure = 0
@ -133,90 +162,86 @@ def convert_tja_to_fumen(tja):
course_balloons = tja.balloon.copy()
# Iterate through the measures within the branch
for idx_m, measure_tja_processed in enumerate(branch_measures_tja_processed):
for idx_m, measure_tja in enumerate(branch_tja):
# Fetch a pair of measures
measure_fumen_prev = fumen.measures[idx_m-1] if idx_m != 0 else None
measure_fumen_prev = fumen.measures[idx_m-1] if idx_m else None
measure_fumen = fumen.measures[idx_m]
# Copy over basic measure properties from the TJA (that don't depend on notes or commands)
measure_fumen.branches[current_branch].speed = measure_tja_processed.scroll
measure_fumen.gogo = measure_tja_processed.gogo
measure_fumen.bpm = measure_tja_processed.bpm
# Copy over basic measure properties from the TJA
measure_fumen.branches[current_branch].speed = measure_tja.scroll
measure_fumen.gogo = measure_tja.gogo
measure_fumen.bpm = measure_tja.bpm
# Compute the duration of the measure
# First, we compute the duration for a full 4/4 measure
measure_duration_full_measure = 4 * 60_000 / measure_tja_processed.bpm
# Next, we adjust this duration based on both:
# 1. The *actual* measure size (e.g. #MEASURE 1/8, #MEASURE 5/4, etc.)
measure_size = measure_tja_processed.time_sig[0] / measure_tja_processed.time_sig[1]
# 2. Whether this is a "submeasure" (i.e. it contains mid-measure commands, which split up the measure)
# - If this is a submeasure, then `measure_length` will be less than the total number of subdivisions.
measure_length = measure_tja_processed.pos_end - measure_tja_processed.pos_start
# - In other words, `measure_ratio` will be less than 1.0:
measure_ratio = (1.0 if measure_tja_processed.subdivisions == 0.0 # Avoid division by 0 for empty measures
else (measure_length / measure_tja_processed.subdivisions))
# Apply the 2 adjustments to the measure duration
measure_fumen.duration = measure_duration = measure_duration_full_measure * measure_size * measure_ratio
# 1. The *actual* measure size (e.g. #MEASURE 1/8, 5/4, etc.)
# 2. Whether this is a "submeasure" (i.e. whether it contains
# mid-measure commands, which split up the measure)
# - If this is a submeasure, then `measure_length` will be
# less than the total number of subdivisions.
# - In other words, `measure_ratio` will be less than 1.0.
measure_duration_full_measure = (240000 / measure_fumen.bpm)
measure_size = (measure_tja.time_sig[0] / measure_tja.time_sig[1])
measure_length = (measure_tja.pos_end - measure_tja.pos_start)
measure_ratio = (
1.0 if measure_tja.subdivisions == 0.0 # Avoid "/0"
else (measure_length / measure_tja.subdivisions)
)
measure_fumen.duration = (measure_duration_full_measure
* measure_size * measure_ratio)
# Compute the millisecond offsets for the start and end of each measure
# - Start: When the notes first appear on screen (to the right)
# - End: When the notes arrive at the judgment line, and the note gets hit.
# Compute the millisecond offsets for the start of each measure
# First, start the measure using the end timing of the
# previous measure (plus any #DELAY commands)
# Next, adjust the start timing to account for #BPMCHANGE
# commands (!!! Discovered by tana :3 !!!)
if idx_m == 0:
measure_fumen.fumen_offset_start = (tja.offset * 1000 * -1) - measure_duration_full_measure
measure_fumen.offset_start = (
(tja.offset * 1000 * -1) - measure_duration_full_measure
)
else:
# First, start the measure using the end timing of the previous measure (plus any #DELAY commands)
measure_fumen.fumen_offset_start = measure_fumen_prev.fumen_offset_end + measure_tja_processed.delay
# Next, adjust the start timing to account for #BPMCHANGE commands (!!! Discovered by tana :3 !!!)
# To understand what's going on here, imagine the following simple example:
# * You have a very slow-moving note (i.e. low BPM), like the big DON in Donkama 2000.
# * All the other notes move fast (i.e. high BPM), moving past the big slow note.
# * To get this overlapping to work, you need the big slow note to START EARLY, but also END LATE:
# - An early start means you need to subtract a LOT of time from the starting fumenOffset.
# - Thankfully, the low BPM of the slow note will create a HUGE `measure_offset_adjustment`,
# since we are dividing by the BPMs, and dividing by a small number will result in a big number.
measure_offset_adjustment = (4 * 60_000 / measure_fumen.bpm) - (4 * 60_000 / measure_fumen_prev.bpm)
# - When we subtract this adjustment from the fumen_offset_start, we get the "START EARLY" part:
measure_fumen.fumen_offset_start -= measure_offset_adjustment
# - The low BPM of the slow note will also create a HUGE measure duration.
# - When we add this long duration to the EARLY START, we end up with the "END LATE" part:
measure_fumen.fumen_offset_end = measure_fumen.fumen_offset_start + measure_fumen.duration
measure_fumen.offset_start = measure_fumen_prev.offset_end
measure_fumen.offset_start += measure_tja.delay
measure_fumen.offset_start += (240000 / measure_fumen_prev.bpm)
measure_fumen.offset_start -= (240000 / measure_fumen.bpm)
# Best guess at what 'barline' status means for each measure:
# - 'True' means the measure lands on a barline (i.e. most measures), and thus barline should be shown
# - 'False' means that the measure doesn't land on a barline, and thus barline should be hidden.
# For example:
# Compute the millisecond offset for the end of each measure
measure_fumen.offset_end = (measure_fumen.offset_start +
measure_fumen.duration)
# Handle whether barline should be hidden:
# 1. Measures where #BARLINEOFF has been set
# 2. Sub-measures that don't fall on the barline
if measure_tja_processed.barline is False or (measure_ratio != 1.0 and measure_tja_processed.pos_start != 0):
barline_off = measure_tja.barline is False
is_submeasure = (measure_ratio != 1.0 and
measure_tja.pos_start != 0)
if barline_off or is_submeasure:
measure_fumen.barline = False
# If a #SECTION command occurs in isolation, and it has a valid condition, then treat it like a branch_start
if (measure_tja_processed.section is not None and measure_tja_processed.section != 'not_available'
and not measure_tja_processed.branch_start):
branch_condition = measure_tja_processed.section
# If a #SECTION command occurs in isolation, and it has a valid
# condition, then treat it like a branch_start
if (measure_tja.section is not None
and measure_tja.section != 'not_available'
and not measure_tja.branch_start):
branch_condition = measure_tja.section
else:
branch_condition = measure_tja_processed.branch_start
branch_condition = measure_tja.branch_start
# Check to see if the measure contains a branching condition
if branch_condition:
# Determine which values to assign based on the type of branching condition
# Handle branch conditions for percentage accuracy
# There are three cases for interpreting #BRANCHSTART p:
# 1. Percentage is between 0% and 100%
# 2. Percentage is above 100% (guaranteed level down)
# 3. Percentage is 0% (guaranteed level up)
if branch_condition[0] == 'p':
vals = []
for percent in branch_condition[1:]:
# Ensure percentage is between 0% and 100%
if 0 < percent <= 1:
# If there is a proper branch condition, make sure that drumrolls do not contribute
fumen.header.b480_b483_branch_points_drumroll = 0
fumen.header.b492_b495_branch_points_drumroll_big = 0
val = branch_points_total * percent
# If the result is very close, then round to account for lack of precision in percentage
if abs(val - round(val)) < 0.1:
val = round(val)
vals.append(int(val))
# If it is above 100%, then it means a guaranteed "level down". Fumens use 999 for this.
vals.append(int(branch_points_total * percent))
elif percent > 1:
vals.append(999)
# If it is below 0%, it is a guaranteed "level up". Fumens use 0 for this.
else:
vals.append(0)
if current_branch == 'normal':
@ -226,120 +251,164 @@ def convert_tja_to_fumen(tja):
elif current_branch == 'master':
measure_fumen.branch_info[4:6] = vals
# If it's a drumroll, then the values to use depends on whether there is a #SECTION in the same measure
# - If there is a #SECTION, then accuracy is reset, so repeat the same condition for all 3 branches
# - If there isn't a #SECTION, but it's the first branch condition, repeat for all 3 branches as well
# - If there isn't a #SECTION, and there are previous branch conditions, the outcomes now matter:
# * If the player failed to go from Normal -> Advanced/Master, then they must stay in Normal,
# hence the 999 values (which force them to stay in Normal)
# * If the player made it to Advanced, then both condition values still apply (for either
# staying in Advanced or leveling up to Master)
# * If the player made it to Master, then only use the "master condition" value (2), otherwise
# they fall back to Normal.
# - The "no-#SECTION" behavior can be seen in songs like "Shoutoku Taiko no 「Hi Izuru Made Asuka」"
# Handle branch conditions for drumroll accuracy
# There are three cases for interpreting #BRANCHSTART r:
# 1. It's the first branching condition.
# 2. It's not the first branching condition, but it
# has a #SECTION command to reset the accuracy.
# 3. It's not the first branching condition, and it
# doesn't have a #SECTION command.
# For the first two cases, the branching conditions are the
# same no matter what branch you're currently on, so we just
# use the values as-is: [c1, c2, c1, c2, c1, c2]
# But, for the third case, since there is no #SECTION, the
# accuracy is not reset. This results in the following
# condition: [999, 999, c1, c2, c2, c2]
# - Normal can't advance to professional/master
# - Professional can stay, or advance to master.
# - Master can only stay in master.
elif branch_condition[0] == 'r':
# Ensure that only drumrolls contribute to the branching accuracy check
fumen.header.b468_b471_branch_points_good = 0
fumen.header.b484_b487_branch_points_good_big = 0
fumen.header.b472_b475_branch_points_ok = 0
fumen.header.b488_b491_branch_points_ok_big = 0
fumen.header.b496_b499_branch_points_balloon = 0
fumen.header.b500_b503_branch_points_kusudama = 0
if current_branch == 'normal':
measure_fumen.branch_info[0:2] = (branch_condition[1:] if measure_tja_processed.section or
not measure_tja_processed.section and not branch_conditions
else [999, 999])
elif current_branch == 'professional':
measure_fumen.branch_info[2:4] = branch_condition[1:]
elif current_branch == 'master':
measure_fumen.branch_info[4:6] = (branch_condition[1:] if measure_tja_processed.section or
not measure_tja_processed.section and not branch_conditions
else [branch_condition[2]] * 2)
is_first_branch_condition = not branch_conditions
has_section = bool(measure_tja.section)
if is_first_branch_condition or has_section:
measure_fumen.branch_info = branch_condition[1:] * 3
else:
measure_fumen.branch_info = (
[999, 999] +
[branch_condition[1]] +
[branch_condition[2]] * 3
)
# Reset the points corresponding to this branch (i.e. reset the accuracy)
# Reset the points to prepare for the next #BRANCHSTART p
branch_points_total = 0
# Keep track of branch conditions (to later determine how to set the header bytes for branches)
# Keep track of the branch conditions (to later determine how
# to set the header bytes for branches)
branch_conditions.append(branch_condition)
# NB: We update the branch condition note counter *after* we check the current measure's branch condition.
# NB: We update the branch condition note counter *after*
# we check the current measure's branch condition.
# This is because the TJA spec says:
# "The requirement is calculated one measure before #BRANCHSTART, changing the branch visually when it
# "The requirement is calculated one measure before
# #BRANCHSTART, changing the branch visually when it
# is calculated and changing the notes after #BRANCHSTART."
# So, by delaying the summation by one measure, we perform the calculation with notes "one measure before".
# So, by delaying the summation by one measure, we perform the
# calculation with notes "one measure before".
branch_points_total += branch_points_measure
# Create notes based on TJA measure data
branch_points_measure = 0
note_counter = 0
for idx_d, data in enumerate(measure_tja_processed.data):
if data.name == 'note':
note = FumenNote()
# Note positions must be calculated using the base measure duration (that uses a single BPM value)
# (In other words, note positions do not take into account any mid-measure BPM change adjustments.)
note.pos = measure_duration * (data.pos - measure_tja_processed.pos_start) / measure_length
# Handle the note that represents the end of a drumroll/balloon
if data.value == "EndDRB":
# If a drumroll spans a single measure, then add the difference between start/end position
if not current_drumroll.multimeasure:
current_drumroll.duration += (note.pos - current_drumroll.pos)
# Otherwise, if a drumroll spans multiple measures, then we want to add the duration between
# the start of the measure (i.e. pos=0.0) and the drumroll's end position.
else:
current_drumroll.duration += (note.pos - 0.0)
# 1182, 1385, 1588, 2469, 1568, 752, 1568
current_drumroll.duration = float(int(current_drumroll.duration))
current_drumroll = None
continue
# The TJA spec technically allows you to place double-Kusudama notes:
# "Use another 9 to specify when to lower the points for clearing."
# But this is unsupported in fumens, so just skip the second Kusudama note.
if data.value == "Kusudama" and current_drumroll:
continue
# Handle the remaining non-EndDRB, non-double Kusudama notes
note.type = data.value
note.score_init = tja.score_init
note.score_diff = tja.score_diff
# Handle drumroll/balloon-specific metadata
if note.type in ["Balloon", "Kusudama"]:
note.hits = course_balloons.pop(0)
current_drumroll = note
total_notes[current_branch] -= 1
if note.type in ["Drumroll", "DRUMROLL"]:
current_drumroll = note
total_notes[current_branch] -= 1
measure_fumen.branches[current_branch].notes.append(note)
note_counter += 1
for idx_d, data in enumerate(measure_tja.data):
# Compute the ms position of the note
pos_ratio = ((data.pos - measure_tja.pos_start)
/ measure_length)
note_pos = (measure_fumen.duration * pos_ratio)
# Track branch points for the current measure, to later compute `#BRANCHSTART p` bytes
if note.type in ['Don', 'Ka']:
branch_points_measure += fumen.header.b468_b471_branch_points_good
elif note.type in ['DON', 'KA']:
branch_points_measure += fumen.header.b484_b487_branch_points_good_big
elif note.type == 'Balloon':
branch_points_measure += fumen.header.b496_b499_branch_points_balloon
elif note.type == 'Kusudama':
branch_points_measure += fumen.header.b500_b503_branch_points_kusudama
# Handle '8' notes (end of a drumroll/balloon)
if data.value == "EndDRB":
# If a drumroll spans a single measure, then add the
# difference between start/end position
if not current_drumroll.multimeasure:
current_drumroll.duration += (note_pos -
current_drumroll.pos)
# Otherwise, if a drumroll spans multiple measures,
# then we want to add the duration between the start
# of the measure and the drumroll's end position.
else:
current_drumroll.duration += (note_pos - 0.0)
current_drumroll.duration = float(int(
current_drumroll.duration
))
current_drumroll = None
continue
# If drumroll hasn't ended by the end of this measure, increase duration by measure timing
if current_drumroll:
if current_drumroll.duration == 0.0:
current_drumroll.duration += (measure_duration - current_drumroll.pos)
current_drumroll.multimeasure = True
# The TJA spec technically allows you to place
# double-Kusudama notes. But this is unsupported in
# fumens, so just skip the second Kusudama note.
if data.value == "Kusudama" and current_drumroll:
continue
# Handle note metadata
note = FumenNote()
note.pos = note_pos
note.type = data.value
note.score_init = tja.score_init
note.score_diff = tja.score_diff
# Handle drumroll notes
if note.type in ["Balloon", "Kusudama"]:
note.hits = course_balloons.pop(0)
current_drumroll = note
elif note.type in ["Drumroll", "DRUMROLL"]:
current_drumroll = note
# Track Don/Ka notes (to later compute header values)
elif note.type.lower() in ['don', 'ka']:
total_notes[current_branch] += 1
# Track branch points (to later compute `#BRANCHSTART p` vals)
if note.type in ['Don', 'Ka']:
pts_to_add = fumen.header.b468_b471_branch_points_good
elif note.type in ['DON', 'KA']:
pts_to_add = fumen.header.b484_b487_branch_points_good_big
elif note.type == 'Balloon':
pts_to_add = fumen.header.b496_b499_branch_points_balloon
elif note.type == 'Kusudama':
pts_to_add = fumen.header.b500_b503_branch_points_kusudama
else:
current_drumroll.duration += measure_duration
pts_to_add = 0 # Drumrolls not relevant for `p` conditions
branch_points_measure += pts_to_add
measure_fumen.branches[current_branch].length = note_counter
total_notes[current_branch] += note_counter
# Add the note to the branch for this measure
measure_fumen.branches[current_branch].notes.append(note)
measure_fumen.branches[current_branch].length += 1
# Set song-specific metadata
fumen.header.b512_b515_number_of_measures = len(fumen.measures)
fumen.header.b432_b435_has_branches = int(all([len(b) for b in processed_tja_branches.values()]))
# If drumroll hasn't ended by this measure, increase duration
if current_drumroll:
# If drumroll spans multiple measures, add full duration
if current_drumroll.multimeasure:
current_drumroll.duration += measure_fumen.duration
# Otherwise, add the partial duration spanned by the drumroll
else:
current_drumroll.multimeasure = True
current_drumroll.duration += (measure_fumen.duration -
current_drumroll.pos)
# Compute the header bytes that dictate the soul gauge bar behavior
fumen.header.set_hp_bytes(total_notes['normal'], tja.course, tja.level)
# Compute the ratio between normal and professional/master branches (just in case the note counts differ)
# If song has only drumroll branching conditions (also allowing percentage
# conditions that force a level up/level down), then set the header bytes
# so that only drumrolls contribute to branching.
drumroll_only = branch_conditions != [] and all([
(cond[0] == 'r') or
(cond[0] == 'p' and cond[1] == 0.0 and cond[2] == 0.0) or
(cond[0] == 'p' and cond[1] > 1.00 and cond[2] > 1.00)
for cond in branch_conditions
])
if drumroll_only:
fumen.header.b468_b471_branch_points_good = 0
fumen.header.b484_b487_branch_points_good_big = 0
fumen.header.b472_b475_branch_points_ok = 0
fumen.header.b488_b491_branch_points_ok_big = 0
fumen.header.b496_b499_branch_points_balloon = 0
fumen.header.b500_b503_branch_points_kusudama = 0
# Alternatively, if the song has only percentage-based conditions, then set
# the header bytes so that only notes and balloons contribute to branching.
percentage_only = branch_conditions != [] and all([
(condition[0] != 'r')
for condition in branch_conditions
])
if percentage_only:
fumen.header.b480_b483_branch_points_drumroll = 0
fumen.header.b492_b495_branch_points_drumroll_big = 0
# Compute the ratio between normal and professional/master branches
if total_notes['professional']:
fumen.header.b460_b463_normal_professional_ratio = int(65536 * (total_notes['normal'] / total_notes['professional']))
fumen.header.b460_b463_normal_professional_ratio = \
int(65536 * (total_notes['normal'] / total_notes['professional']))
if total_notes['master']:
fumen.header.b464_b467_normal_master_ratio = int(65536 * (total_notes['normal'] / total_notes['master']))
fumen.header.b464_b467_normal_master_ratio = \
int(65536 * (total_notes['normal'] / total_notes['master']))
return fumen

View File

@ -2,14 +2,15 @@ import os
import re
from copy import deepcopy
from tja2fumen.utils import read_struct, short_hex
from tja2fumen.constants import NORMALIZE_COURSE, TJA_NOTE_TYPES, BRANCH_NAMES, FUMEN_NOTE_TYPES
from tja2fumen.types import (TJASong, TJAMeasure, TJAData,
FumenCourse, FumenMeasure, FumenBranch, FumenNote, FumenHeader)
from tja2fumen.utils import read_struct
from tja2fumen.types import (TJASong, TJAMeasure, TJAData, FumenCourse,
FumenMeasure, FumenBranch, FumenNote, FumenHeader)
from tja2fumen.constants import (NORMALIZE_COURSE, TJA_NOTE_TYPES,
BRANCH_NAMES, FUMEN_NOTE_TYPES)
########################################################################################################################
# TJA-parsing functions ( Original source: https://github.com/WHMHammer/tja-tools/blob/master/src/js/parseTJA.js)
########################################################################################################################
###############################################################################
# TJA-parsing functions #
###############################################################################
def parse_tja(fname_tja):
@ -56,54 +57,69 @@ def get_course_data(lines):
if current_course not in parsed_tja.courses.keys():
raise ValueError()
elif name_upper == 'LEVEL':
parsed_tja.courses[current_course].level = int(value) if value else 0
# NB: If there are multiple SCOREINIT/SCOREDIFF values, use the last one (shinuti)
parsed_tja.courses[current_course].level = \
int(value) if value else 0
elif name_upper == 'SCOREINIT':
parsed_tja.courses[current_course].score_init = int(value.split(",")[-1]) if value else 0
parsed_tja.courses[current_course].score_init = \
int(value.split(",")[-1]) if value else 0
elif name_upper == 'SCOREDIFF':
parsed_tja.courses[current_course].score_diff = int(value.split(",")[-1]) if value else 0
parsed_tja.courses[current_course].score_diff = \
int(value.split(",")[-1]) if value else 0
elif name_upper == 'BALLOON':
if value:
balloons = [int(v) for v in value.split(",") if v]
parsed_tja.courses[current_course].balloon = balloons
elif name_upper == 'STYLE':
# Reset the course name to remove "P1/P2" that may have been added by a previous STYLE:DOUBLE chart
# Reset the course name to remove "P1/P2" that may have been
# added by a previous STYLE:DOUBLE chart
if value == 'Single':
current_course = current_course_cached
else:
pass # Ignore other header fields such as 'TITLE', 'SUBTITLE', 'WAVE', etc.
pass # Ignore 'TITLE', 'SUBTITLE', 'WAVE', etc.
# Case 2: Commands and note data (to be further processed course-by-course later on)
# Case 2: Commands and note data (to be further processed
# course-by-course later on)
elif not re.match(r"//.*", line): # Exclude comment-only lines ('//')
match_command = re.match(r"^#([A-Z]+)(?:\s+(.+))?", line)
match_notes = re.match(r"^(([0-9]|A|B|C|F|G)*,?).*$", line)
if match_command:
name_upper = match_command.group(1).upper()
value = match_command.group(2).strip() if match_command.group(2) else ''
# For STYLE:Double, #START P1/P2 indicates the start of a new chart
# But, we want multiplayer charts to inherit the metadata from the course as a whole, so we deepcopy
value = (match_command.group(2).strip()
if match_command.group(2) else '')
# For STYLE:Double, #START P1/P2 indicates the start of a new
# chart. But, we want multiplayer charts to inherit the
# metadata from the course as a whole, so we deepcopy.
if name_upper == "START":
if value in ["P1", "P2"]:
current_course = current_course_cached + value
parsed_tja.courses[current_course] = deepcopy(parsed_tja.courses[current_course_cached])
parsed_tja.courses[current_course].data = list() # Keep the metadata, but reset the note data
value = '' # Once we've made the new course, we can reset this to a normal #START command
parsed_tja.courses[current_course] = \
deepcopy(parsed_tja.courses[current_course_cached])
parsed_tja.courses[current_course].data = list()
# Once we've made the new course, we can reset
# #START P1/P2 to a normal #START command
value = ''
elif value:
raise ValueError(f"Invalid value '{value}' for #START command.")
raise ValueError(f"Invalid value '{value}' for "
f"#START command.")
elif match_notes:
name_upper = 'NOTES'
value = match_notes.group(1)
parsed_tja.courses[current_course].data.append(TJAData(name_upper, value))
parsed_tja.courses[current_course].data.append(
TJAData(name_upper, value)
)
# If a course has no song data, then this is likely because the course has "STYLE: Double" but no "STYLE: Single".
# To fix this, we copy over the P1 chart from "STYLE: Double" to fill the "STYLE: Single" role.
# If a course has no song data, then this is likely because the course has
# "STYLE: Double" but no "STYLE: Single". To fix this, we copy over the P1
# chart from "STYLE: Double" to fill the "STYLE: Single" role.
for course_name, course in parsed_tja.courses.items():
if not course.data:
if course_name+"P1" in parsed_tja.courses.keys():
parsed_tja.courses[course_name] = deepcopy(parsed_tja.courses[course_name+"P1"])
parsed_tja.courses[course_name] = \
deepcopy(parsed_tja.courses[course_name+"P1"])
# Remove any charts (e.g. P1/P2) not present in the TJA file
for course_name in [k for k, v in parsed_tja.courses.items() if not v.data]:
# Remove any charts (e.g. P1/P2) not present in the TJA file (empty data)
for course_name in [k for k, v in parsed_tja.courses.items()
if not v.data]:
del parsed_tja.courses[course_name]
return parsed_tja
@ -111,7 +127,8 @@ def get_course_data(lines):
def parse_course_measures(course):
# Check if the course has branches or not
has_branches = True if [l for l in course.data if l.name == 'BRANCHSTART'] else False
has_branches = (True if [d for d in course.data if d.name == 'BRANCHSTART']
else False)
current_branch = 'all' if has_branches else 'normal'
branch_condition = None
flag_levelhold = False
@ -123,22 +140,29 @@ def parse_course_measures(course):
# 1. Parse measure notes
if line.name == 'NOTES':
notes = line.value
# If measure has ended, then add notes to the current measure, then start a new one by incrementing idx_m
# If measure has ended, then add notes to the current measure,
# then start a new measure by incrementing idx_m
if notes.endswith(','):
for branch in course.branches.keys() if current_branch == 'all' else [current_branch]:
for branch in (course.branches.keys()
if current_branch == 'all'
else [current_branch]):
course.branches[branch][idx_m].notes += notes[0:-1]
course.branches[branch].append(TJAMeasure())
idx_m += 1
# Otherwise, keep adding notes to the current measure ('idx_m')
else:
for branch in course.branches.keys() if current_branch == 'all' else [current_branch]:
for branch in (course.branches.keys()
if current_branch == 'all'
else [current_branch]):
course.branches[branch][idx_m].notes += notes
# 2. Parse measure commands that produce an "event"
elif line.name in ['GOGOSTART', 'GOGOEND', 'BARLINEON', 'BARLINEOFF', 'DELAY',
'SCROLL', 'BPMCHANGE', 'MEASURE', 'SECTION', 'BRANCHSTART']:
elif line.name in ['GOGOSTART', 'GOGOEND', 'BARLINEON', 'BARLINEOFF',
'DELAY', 'SCROLL', 'BPMCHANGE', 'MEASURE',
'SECTION', 'BRANCHSTART']:
# Get position of the event
for branch in course.branches.keys() if current_branch == 'all' else [current_branch]:
for branch in (course.branches.keys() if current_branch == 'all'
else [current_branch]):
pos = len(course.branches[branch][idx_m].notes)
# Parse event type
@ -163,30 +187,35 @@ def parse_course_measures(course):
current_event = TJAData('section', 'not_available', pos)
else:
current_event = TJAData('section', branch_condition, pos)
# If the command immediately after #SECTION is #BRANCHSTART, then we need to make sure that #SECTION
# is put on every branch. (We can't do this unconditionally because #SECTION commands can also exist
# in isolation in the middle of separate branches.)
# If the command immediately after #SECTION is #BRANCHSTART,
# then we need to make sure that #SECTION is put on every
# branch. (We can't do this unconditionally because #SECTION
# commands can also exist in isolation.)
if course.data[idx_l+1].name == 'BRANCHSTART':
current_branch = 'all'
elif line.name == 'BRANCHSTART':
if flag_levelhold:
continue
current_branch = 'all' # Ensure that the #BRANCHSTART command is present for all branches
# Ensure that the #BRANCHSTART command is added to all branches
current_branch = 'all'
branch_condition = line.value.split(',')
if branch_condition[0] == 'r': # r = drumRoll
branch_condition[1] = int(branch_condition[1]) # # of drumrolls
branch_condition[2] = int(branch_condition[2]) # # of drumrolls
branch_condition[1] = int(branch_condition[1]) # drumrolls
branch_condition[2] = int(branch_condition[2]) # drumrolls
elif branch_condition[0] == 'p': # p = Percentage
branch_condition[1] = float(branch_condition[1]) / 100 # %
branch_condition[2] = float(branch_condition[2]) / 100 # %
current_event = TJAData('branch_start', branch_condition, pos)
idx_m_branchstart = idx_m # Preserve the index of the BRANCHSTART command to re-use for each branch
# Preserve the index of the BRANCHSTART command to re-use
idx_m_branchstart = idx_m
# Append event to the current measure's events
for branch in course.branches.keys() if current_branch == 'all' else [current_branch]:
for branch in (course.branches.keys() if current_branch == 'all'
else [current_branch]):
course.branches[branch][idx_m].events.append(current_event)
# 3. Parse commands that don't create an event (e.g. simply changing the current branch)
# 3. Parse commands that don't create an event
# (e.g. simply changing the current branch)
else:
if line.name == 'START' or line.name == 'END':
current_branch = 'all' if has_branches else 'normal'
@ -208,7 +237,8 @@ def parse_course_measures(course):
else:
print(f"Ignoring unsupported command '{line.name}'")
# Delete the last measure in the branch if no notes or events were added to it (due to preallocating empty measures)
# Delete the last measure in the branch if no notes or events
# were added to it (due to preallocating empty measures)
for branch in course.branches.values():
if not branch[-1].notes and not branch[-1].events:
del branch[-1]
@ -232,27 +262,22 @@ def parse_course_measures(course):
# Ensure all branches have the same number of measures
if has_branches:
branch_lens = [len(b) for b in course.branches.values()]
if not branch_lens.count(branch_lens[0]) == len(branch_lens):
raise ValueError("Branches do not have the same number of measures.")
if len(set([len(b) for b in course.branches.values()])) != 1:
raise ValueError(
"Branches do not have the same number of measures. (This "
"check was performed prior to splitting up the measures due "
"to mid-measure commands. Please check the number of ',' you"
"have in each branch.)"
)
########################################################################################################################
# Fumen-parsing functions
########################################################################################################################
# Fumen format reverse engineering TODOs
# TODO: Figure out what drumroll bytes are (8 bytes after every drumroll)
# NB: fumen2osu.py assumed these were padding bytes, but they're not!! They contain some sort of metadata.
# TODO: Figure out what the unknown Wii1, Wii4, and PS4 notes represent (just in case they're important somehow)
###############################################################################
# Fumen-parsing functions #
###############################################################################
def read_fumen(fumen_file, exclude_empty_measures=False):
"""
Parse bytes of a fumen .bin file into nested measure, branch, and note dictionaries.
For more information on any of the terms used in this function (e.g. score_init, score_diff),
please refer to KatieFrog's excellent guide: https://gist.github.com/KatieFrogs/e000f406bbc70a12f3c34a07303eec8b
Parse bytes of a fumen .bin file into nested measures, branches, and notes.
"""
file = open(fumen_file, "rb")
size = os.fstat(file.fileno()).st_size
@ -264,19 +289,20 @@ def read_fumen(fumen_file, exclude_empty_measures=False):
for measure_number in range(song.header.b512_b515_number_of_measures):
# Parse the measure data using the following `format_string`:
# "ffBBHiiiiiii" (12 format characters, 40 bytes per measure)
# - 'f': BPM (represented by one float (4 bytes))
# - 'f': fumenOffset (represented by one float (4 bytes))
# - 'B': gogo (represented by one unsigned char (1 byte))
# - 'B': barline (represented by one unsigned char (1 byte))
# - 'H': <padding> (represented by one unsigned short (2 bytes))
# - 'iiiiii': branch_info (represented by six integers (24 bytes))
# - 'i': <padding> (represented by one integer (4 bytes)
measure_struct = read_struct(file, song.header.order, format_string="ffBBHiiiiiii")
# - 'f': BPM (one float (4 bytes))
# - 'f': fumenOffset (one float (4 bytes))
# - 'B': gogo (one unsigned char (1 byte))
# - 'B': barline (one unsigned char (1 byte))
# - 'H': <padding> (one unsigned short (2 bytes))
# - 'iiiiii': branch_info (six integers (24 bytes))
# - 'i': <padding> (one integer (4 bytes)
measure_struct = read_struct(file, song.header.order,
format_string="ffBBHiiiiiii")
# Create the measure dictionary using the newly-parsed measure data
measure = FumenMeasure(
bpm=measure_struct[0],
fumen_offset_start=measure_struct[1],
offset_start=measure_struct[1],
gogo=measure_struct[2],
barline=measure_struct[3],
padding1=measure_struct[4],
@ -288,10 +314,11 @@ def read_fumen(fumen_file, exclude_empty_measures=False):
for branch_name in BRANCH_NAMES:
# Parse the measure data using the following `format_string`:
# "HHf" (3 format characters, 8 bytes per branch)
# - 'H': total_notes (represented by one unsigned short (2 bytes))
# - 'H': <padding> (represented by one unsigned short (2 bytes))
# - 'f': speed (represented by one float (4 bytes)
branch_struct = read_struct(file, song.header.order, format_string="HHf")
# - 'H': total_notes ( one unsigned short (2 bytes))
# - 'H': <padding> ( one unsigned short (2 bytes))
# - 'f': speed ( one float (4 bytes)
branch_struct = read_struct(file, song.header.order,
format_string="HHf")
# Create the branch dictionary using the newly-parsed branch data
total_notes = branch_struct[0]
@ -313,17 +340,11 @@ def read_fumen(fumen_file, exclude_empty_measures=False):
# - 'H': score_diff
# - 'f': duration
# NB: 'item' doesn't seem to be used at all in this function.
note_struct = read_struct(file, song.header.order, format_string="ififHHf")
# Validate the note type
note_type = note_struct[0]
if note_type not in FUMEN_NOTE_TYPES:
raise ValueError("Error: Unknown note type '{0}' at offset {1}".format(
short_hex(note_type).upper(),
hex(file.tell() - 0x18))
)
note_struct = read_struct(file, song.header.order,
format_string="ififHHf")
# Create the note dictionary using the newly-parsed note data
note_type = note_struct[0]
note = FumenNote(
note_type=FUMEN_NOTE_TYPES[note_type],
pos=note_struct[1],
@ -342,7 +363,7 @@ def read_fumen(fumen_file, exclude_empty_measures=False):
# Drumroll/balloon duration
note.duration = note_struct[6]
# Seek forward 8 bytes to account for padding bytes at the end of drumrolls
# Account for padding at the end of drumrolls
if note_type == 0x6 or note_type == 0x9 or note_type == 0x62:
note.drumroll_bytes = file.read(8)
@ -359,12 +380,16 @@ def read_fumen(fumen_file, exclude_empty_measures=False):
file.close()
# NB: Official fumens often include empty measures as a way of inserting barlines for visual effect.
# But, TJA authors tend not to add these empty measures, because even without them, the song plays correctly.
# So, in tests, if we want to only compare the timing of the non-empty measures between an official fumen and
# a converted non-official TJA, then it's useful to exclude the empty measures.
# NB: Official fumens often include empty measures as a way of inserting
# barlines for visual effect. But, TJA authors tend not to add these empty
# measures, because even without them, the song plays correctly. So, in
# tests, if we want to only compare the timing of the non-empty measures
# between an official fumen and a converted non-official TJA, then it's
# useful to exclude the empty measures.
if exclude_empty_measures:
song.measures = [m for m in song.measures
if m.branches['normal'].length or m.branches['professional'].length or m.branches['master'].length]
if m.branches['normal'].length
or m.branches['professional'].length
or m.branches['master'].length]
return song

View File

@ -2,21 +2,24 @@ import csv
import os
import struct
from tja2fumen.constants import TJA_COURSE_NAMES
from tja2fumen.constants import TJA_COURSE_NAMES, BRANCH_NAMES
class TJASong:
def __init__(self, BPM=None, offset=None):
self.BPM = float(BPM)
self.offset = float(offset)
self.courses = {course: TJACourse(self.BPM, self.offset, course) for course in TJA_COURSE_NAMES}
self.courses = {course: TJACourse(self.BPM, self.offset, course)
for course in TJA_COURSE_NAMES}
def __repr__(self):
return f"{{'BPM': {self.BPM}, 'offset': {self.offset}, 'courses': {list(self.courses.keys())}}}"
return (f"{{'BPM': {self.BPM}, 'offset': {self.offset}, "
f"'courses': {list(self.courses.keys())}}}")
class TJACourse:
def __init__(self, BPM, offset, course, level=0, balloon=None, score_init=0, score_diff=0):
def __init__(self, BPM, offset, course, level=0, balloon=None,
score_init=0, score_diff=0):
self.level = level
self.balloon = [] if balloon is None else balloon
self.score_init = score_init
@ -47,7 +50,8 @@ class TJAMeasure:
class TJAMeasureProcessed:
def __init__(self, bpm, scroll, gogo, barline, time_sig, subdivisions,
pos_start=0, pos_end=0, delay=0, section=None, branch_start=None, data=None):
pos_start=0, pos_end=0, delay=0, section=None,
branch_start=None, data=None):
self.bpm = bpm
self.scroll = scroll
self.gogo = gogo
@ -90,17 +94,18 @@ class FumenCourse:
class FumenMeasure:
def __init__(self, bpm=0.0, fumen_offset_start=0.0, fumen_offset_end=0.0, duration=0.0,
gogo=False, barline=True, branch_start=None, branch_info=None, padding1=0, padding2=0):
def __init__(self, bpm=0.0, offset_start=0.0, offset_end=0.0,
duration=0.0, gogo=False, barline=True, branch_start=None,
branch_info=None, padding1=0, padding2=0):
self.bpm = bpm
self.fumen_offset_start = fumen_offset_start
self.fumen_offset_end = fumen_offset_end
self.offset_start = offset_start
self.offset_end = offset_end
self.duration = duration
self.gogo = gogo
self.barline = barline
self.branch_start = branch_start
self.branch_info = [-1, -1, -1, -1, -1, -1] if branch_info is None else branch_info
self.branches = {'normal': FumenBranch(), 'professional': FumenBranch(), 'master': FumenBranch()}
self.branch_info = [-1] * 6 if branch_info is None else branch_info
self.branches = {b: FumenBranch() for b in BRANCH_NAMES}
self.padding1 = padding1
self.padding2 = padding2
@ -120,14 +125,17 @@ class FumenBranch:
class FumenNote:
def __init__(self, note_type='', pos=0.0, score_init=0, score_diff=0, padding=0, item=0, duration=0.0,
multimeasure=False, hits=0, hits_padding=0, drumroll_bytes=b'\x00\x00\x00\x00\x00\x00\x00\x00'):
def __init__(self, note_type='', pos=0.0, score_init=0, score_diff=0,
padding=0, item=0, duration=0.0, multimeasure=False,
hits=0, hits_padding=0,
drumroll_bytes=b'\x00\x00\x00\x00\x00\x00\x00\x00'):
self.note_type = note_type
self.pos = pos
self.score_init = score_init
self.score_diff = score_diff
self.padding = padding
# TODO: Determine how to properly set the item byte (https://github.com/vivaria/tja2fumen/issues/17)
# TODO: Determine how to properly set the item byte
# (https://github.com/vivaria/tja2fumen/issues/17)
self.item = item
# These attributes are only used for drumrolls/balloons
self.duration = duration
@ -150,8 +158,9 @@ class FumenHeader:
self._parse_header_values(raw_bytes)
def _assign_default_header_values(self):
self.b000_b431_timing_windows = struct.unpack(self.order + ("fff" * 36),
b'43\xc8Ag&\x96B"\xe2\xd8B' * 36)
# This byte string corresponds to
timing_windows = self.up(b'43\xc8Ag&\x96B"\xe2\xd8B' * 36, "fff" * 36)
self.b000_b431_timing_windows = timing_windows
self.b432_b435_has_branches = 0
self.b436_b439_hp_max = 10000
self.b440_b443_hp_clear = 8000
@ -176,33 +185,42 @@ class FumenHeader:
self.b516_b519_unknown_data = 0
def _parse_header_values(self, raw_bytes):
self.b000_b431_timing_windows = struct.unpack(self.order + ("fff" * 36), raw_bytes[0:432])
self.b432_b435_has_branches = struct.unpack(self.order + "i", raw_bytes[432:436])[0]
self.b436_b439_hp_max = struct.unpack(self.order + "i", raw_bytes[436:440])[0]
self.b440_b443_hp_clear = struct.unpack(self.order + "i", raw_bytes[440:444])[0]
self.b444_b447_hp_gain_good = struct.unpack(self.order + "i", raw_bytes[444:448])[0]
self.b448_b451_hp_gain_ok = struct.unpack(self.order + "i", raw_bytes[448:452])[0]
self.b452_b455_hp_loss_bad = struct.unpack(self.order + "i", raw_bytes[452:456])[0]
self.b456_b459_normal_normal_ratio = struct.unpack(self.order + "i", raw_bytes[456:460])[0]
self.b460_b463_normal_professional_ratio = struct.unpack(self.order + "i", raw_bytes[460:464])[0]
self.b464_b467_normal_master_ratio = struct.unpack(self.order + "i", raw_bytes[464:468])[0]
self.b468_b471_branch_points_good = struct.unpack(self.order + "i", raw_bytes[468:472])[0]
self.b472_b475_branch_points_ok = struct.unpack(self.order + "i", raw_bytes[472:476])[0]
self.b476_b479_branch_points_bad = struct.unpack(self.order + "i", raw_bytes[476:480])[0]
self.b480_b483_branch_points_drumroll = struct.unpack(self.order + "i", raw_bytes[480:484])[0]
self.b484_b487_branch_points_good_big = struct.unpack(self.order + "i", raw_bytes[484:488])[0]
self.b488_b491_branch_points_ok_big = struct.unpack(self.order + "i", raw_bytes[488:492])[0]
self.b492_b495_branch_points_drumroll_big = struct.unpack(self.order + "i", raw_bytes[492:496])[0]
self.b496_b499_branch_points_balloon = struct.unpack(self.order + "i", raw_bytes[496:500])[0]
self.b500_b503_branch_points_kusudama = struct.unpack(self.order + "i", raw_bytes[500:504])[0]
self.b504_b507_branch_points_unknown = struct.unpack(self.order + "i", raw_bytes[504:508])[0]
self.b508_b511_dummy_data = struct.unpack(self.order + "i", raw_bytes[508:512])[0]
self.b512_b515_number_of_measures = struct.unpack(self.order + "i", raw_bytes[512:516])[0]
self.b516_b519_unknown_data = struct.unpack(self.order + "i", raw_bytes[516:520])[0]
rb = raw_bytes
self.b000_b431_timing_windows = self.up(rb, "f" * 108,
0, 431)
self.b432_b435_has_branches = self.up(rb, "i", 432, 435)
self.b436_b439_hp_max = self.up(rb, "i", 436, 439)
self.b440_b443_hp_clear = self.up(rb, "i", 440, 443)
self.b444_b447_hp_gain_good = self.up(rb, "i", 444, 447)
self.b448_b451_hp_gain_ok = self.up(rb, "i", 448, 451)
self.b452_b455_hp_loss_bad = self.up(rb, "i", 452, 455)
self.b456_b459_normal_normal_ratio = self.up(rb, "i", 456, 459)
self.b460_b463_normal_professional_ratio = self.up(rb, "i", 460, 463)
self.b464_b467_normal_master_ratio = self.up(rb, "i", 464, 467)
self.b468_b471_branch_points_good = self.up(rb, "i", 468, 471)
self.b472_b475_branch_points_ok = self.up(rb, "i", 472, 475)
self.b476_b479_branch_points_bad = self.up(rb, "i", 476, 479)
self.b480_b483_branch_points_drumroll = self.up(rb, "i", 480, 483)
self.b484_b487_branch_points_good_big = self.up(rb, "i", 484, 487)
self.b488_b491_branch_points_ok_big = self.up(rb, "i", 488, 491)
self.b492_b495_branch_points_drumroll_big = self.up(rb, "i", 492, 495)
self.b496_b499_branch_points_balloon = self.up(rb, "i", 496, 499)
self.b500_b503_branch_points_kusudama = self.up(rb, "i", 500, 503)
self.b504_b507_branch_points_unknown = self.up(rb, "i", 504, 507)
self.b508_b511_dummy_data = self.up(rb, "i", 508, 511)
self.b512_b515_number_of_measures = self.up(rb, "i", 512, 515)
self.b516_b519_unknown_data = self.up(rb, "i", 516, 519)
@staticmethod
def _parse_order(raw_bytes):
if struct.unpack(">I", raw_bytes[512:516])[0] < struct.unpack("<I", raw_bytes[512:516])[0]:
def up(self, raw_bytes, type_string, s=None, e=None):
if s is not None and e is not None:
raw_bytes = raw_bytes[s:e+1]
vals = struct.unpack(self.order + type_string, raw_bytes)
return vals[0] if len(vals) == 1 else vals
def _parse_order(self, raw_bytes):
self.order = ''
if (self.up(raw_bytes, ">I", 512, 515) <
self.up(raw_bytes, "<I", 512, 515)):
return ">"
else:
return "<"
@ -210,31 +228,42 @@ class FumenHeader:
def set_hp_bytes(self, n_notes, difficulty, stars):
difficulty = 'Oni' if difficulty in ['Ura', 'Edit'] else difficulty
self._get_hp_from_LUTs(n_notes, difficulty, stars)
self.b440_b443_hp_clear = {'Easy': 6000, 'Normal': 7000, 'Hard': 7000, 'Oni': 8000}[difficulty]
self.b440_b443_hp_clear = {'Easy': 6000, 'Normal': 7000,
'Hard': 7000, 'Oni': 8000}[difficulty]
def _get_hp_from_LUTs(self, n_notes, difficulty, stars):
if n_notes > 2500:
return
star_to_key = {
'Oni': {1: '17', 2: '17', 3: '17', 4: '17', 5: '17', 6: '17', 7: '17', 8: '8', 9: '910', 10: '910'},
'Hard': {1: '12', 2: '12', 3: '3', 4: '4', 5: '58', 6: '58', 7: '58', 8: '58', 9: '58', 10: '58'},
'Normal': {1: '12', 2: '12', 3: '3', 4: '4', 5: '57', 6: '57', 7: '57', 8: '57', 9: '57', 10: '57'},
'Easy': {1: '1', 2: '23', 3: '23', 4: '45', 5: '45', 6: '45', 7: '45', 8: '45', 9: '45', 10: '45'},
'Oni': {1: '17', 2: '17', 3: '17', 4: '17', 5: '17',
6: '17', 7: '17', 8: '8', 9: '910', 10: '910'},
'Hard': {1: '12', 2: '12', 3: '3', 4: '4', 5: '58',
6: '58', 7: '58', 8: '58', 9: '58', 10: '58'},
'Normal': {1: '12', 2: '12', 3: '3', 4: '4', 5: '57',
6: '57', 7: '57', 8: '57', 9: '57', 10: '57'},
'Easy': {1: '1', 2: '23', 3: '23', 4: '45', 5: '45',
6: '45', 7: '45', 8: '45', 9: '45', 10: '45'},
}
key = f"{difficulty}-{star_to_key[difficulty][stars]}"
pkg_dir = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(pkg_dir, "hp_values.csv"), newline='') as csvfile:
rows = [row for row in csv.reader(csvfile, delimiter=',')]
self.b444_b447_hp_gain_good = int(rows[n_notes][rows[0].index(f"good_{key}")])
self.b448_b451_hp_gain_ok = int(rows[n_notes][rows[0].index(f"ok_{key}")])
self.b452_b455_hp_loss_bad = int(rows[n_notes][rows[0].index(f"bad_{key}")])
with open(os.path.join(pkg_dir, "hp_values.csv"), newline='') as fp:
# Parse row data
rows = [row for row in csv.reader(fp, delimiter=',')]
# Get column numbers by indexing header row
column_good = rows[0].index(f"good_{key}")
column_ok = rows[0].index(f"ok_{key}")
column_bad = rows[0].index(f"bad_{key}")
# Fetch values from the row corresponding to the number of notes
self.b444_b447_hp_gain_good = int(rows[n_notes][column_good])
self.b448_b451_hp_gain_ok = int(rows[n_notes][column_ok])
self.b452_b455_hp_loss_bad = int(rows[n_notes][column_bad])
@property
def raw_bytes(self):
value_list = []
format_string = self.order
for key, val in self.__dict__.items():
if key == "order":
if key in ["order", "_raw_bytes"]:
pass
elif key == "b000_b431_timing_windows":
value_list.extend(list(val))
@ -247,6 +276,7 @@ class FumenHeader:
return raw_bytes
def __repr__(self):
# Display truncated version of timing windows
return str([v if not isinstance(v, tuple)
else [round(timing, 2) for timing in v[:3]] # Display truncated version of timing windows
else [round(timing, 2) for timing in v[:3]]
for v in self.__dict__.values()])

View File

@ -8,9 +8,9 @@ def read_struct(file, order, format_string, seek=None):
Arguments:
- file: The fumen's file object (presumably in 'rb' mode).
- order: '<' or '>' (little or big endian).
- format_string: String made up of format characters that describes the data layout.
Full list of available format characters:
(https://docs.python.org/3/library/struct.html#format-characters)
- format_string: String made up of format characters that describes
the data layout. Full list of available characters:
(https://docs.python.org/3/library/struct.html#format-characters)
- seek: The position of the read pointer to be used within the file.
Return values:
@ -34,7 +34,3 @@ def write_struct(file, order, format_string, value_list, seek=None):
file.seek(seek)
packed_bytes = struct.pack(order + format_string, *value_list)
file.write(packed_bytes)
def short_hex(number):
return hex(number)[2:]

View File

@ -8,23 +8,33 @@ def write_fumen(path_out, song):
for measure_number in range(len(song.measures)):
measure = song.measures[measure_number]
measure_struct = [measure.bpm, measure.fumen_offset_start, int(measure.gogo), int(measure.barline)]
measure_struct.extend([measure.padding1] + measure.branch_info + [measure.padding2])
write_struct(file, song.header.order, format_string="ffBBHiiiiiii", value_list=measure_struct)
measure_struct = ([measure.bpm, measure.offset_start,
int(measure.gogo), int(measure.barline),
measure.padding1] + measure.branch_info +
[measure.padding2])
write_struct(file, song.header.order,
format_string="ffBBHiiiiiii",
value_list=measure_struct)
for branch_number in range(len(BRANCH_NAMES)):
branch = measure.branches[BRANCH_NAMES[branch_number]]
branch_struct = [branch.length, branch.padding, branch.speed]
write_struct(file, song.header.order, format_string="HHf", value_list=branch_struct)
write_struct(file, song.header.order,
format_string="HHf",
value_list=branch_struct)
for note_number in range(branch.length):
note = branch.notes[note_number]
note_struct = [FUMEN_TYPE_NOTES[note.type], note.pos, note.item, note.padding]
note_struct = [FUMEN_TYPE_NOTES[note.type], note.pos,
note.item, note.padding]
if note.hits:
note_struct.extend([note.hits, note.hits_padding, note.duration])
extra_vals = [note.hits, note.hits_padding]
else:
note_struct.extend([note.score_init, note.score_diff * 4, note.duration])
write_struct(file, song.header.order, format_string="ififHHf", value_list=note_struct)
extra_vals = [note.score_init, note.score_diff * 4]
note_struct.extend(extra_vals + [note.duration])
write_struct(file, song.header.order,
format_string="ififHHf",
value_list=note_struct)
if note.type.lower() == "drumroll":
file.write(note.drumroll_bytes)

View File

@ -8,4 +8,3 @@ def pytest_addoption(parser):
@pytest.fixture
def entry_point(request):
return request.config.getoption("--entry-point")

View File

@ -12,14 +12,15 @@ from tja2fumen.constants import COURSE_IDS, NORMALIZE_COURSE
@pytest.mark.parametrize('id_song', [
pytest.param('shoto9', marks=pytest.mark.skip("TJA structure does not match fumen yet.")),
pytest.param('butou5'),
pytest.param('shoto9',
marks=pytest.mark.skip("TJA measures do not match fumen.")),
pytest.param('genpe'),
pytest.param('gimcho'),
pytest.param('imcanz'),
pytest.param('clsca'),
pytest.param('linda'),
pytest.param('senpac'),
pytest.param('butou5'),
pytest.param('hol6po'),
pytest.param('mikdp'),
pytest.param('ia6cho'),
@ -43,14 +44,16 @@ def test_converted_tja_vs_cached_fumen(id_song, tmp_path, entry_point):
elif entry_point == "python-cli":
os.system(f"tja2fumen {path_tja_tmp}")
elif entry_point == "exe":
exe_path = glob.glob(os.path.join(os.path.split(path_test)[0], "dist", "*.exe"))[0]
exe_path = glob.glob(os.path.join(os.path.split(path_test)[0],
"dist", "*.exe"))[0]
os.system(f"{exe_path} {path_tja_tmp}")
# Fetch output fumen paths
paths_out = glob.glob(os.path.join(path_temp, "*.bin"))
assert paths_out, f"No bin files generated in {path_temp}"
order = "xmhne" # Ura Oni -> Oni -> Hard -> Normal -> Easy
paths_out = sorted(paths_out, key=lambda s: [order.index(c) if c in order else len(order) for c in s])
paths_out = sorted(paths_out, key=lambda s: [order.index(c) if c in order
else len(order) for c in s])
# Extract cached fumen files to working directory
path_binzip = os.path.join(path_test, "data", f"{id_song}.zip")
@ -62,99 +65,131 @@ def test_converted_tja_vs_cached_fumen(id_song, tmp_path, entry_point):
for path_out in paths_out:
# Difficulty introspection to help with debugging
i_difficult_id = os.path.basename(path_out).split(".")[0].split("_")[1]
i_difficulty = NORMALIZE_COURSE[{v: k for k, v in COURSE_IDS.items()}[i_difficult_id]] # noqa
i_difficulty = NORMALIZE_COURSE[{v: k for k, v in # noqa F841
COURSE_IDS.items()}[i_difficult_id]] # noqa
# 0. Read fumen data (converted vs. cached)
path_out_fumen = os.path.join(path_bin, os.path.basename(path_out))
co_song = read_fumen(path_out, exclude_empty_measures=True)
ca_song = read_fumen(os.path.join(path_bin, os.path.basename(path_out)), exclude_empty_measures=True)
ca_song = read_fumen(path_out_fumen, exclude_empty_measures=True)
# 1. Check song headers
checkValidHeader(co_song.header)
checkValidHeader(ca_song.header)
assert_song_property(co_song.header, ca_song.header, 'order')
assert_song_property(co_song.header, ca_song.header, 'b432_b435_has_branches')
assert_song_property(co_song.header, ca_song.header, 'b436_b439_hp_max')
assert_song_property(co_song.header, ca_song.header, 'b440_b443_hp_clear')
assert_song_property(co_song.header, ca_song.header, 'b444_b447_hp_gain_good', abs=1)
assert_song_property(co_song.header, ca_song.header, 'b448_b451_hp_gain_ok', abs=1)
assert_song_property(co_song.header, ca_song.header, 'b452_b455_hp_loss_bad', abs=1)
assert_song_property(co_song.header, ca_song.header, 'b456_b459_normal_normal_ratio')
assert_song_property(co_song.header, ca_song.header, 'b460_b463_normal_professional_ratio')
assert_song_property(co_song.header, ca_song.header, 'b464_b467_normal_master_ratio')
# NB: KAGEKIYO's branching condition is very unique (BIG only), which cannot be expressed in a TJA file
# So, skip checking the `branch_point` header values for KAGEKIYO.
for header_property in ['order',
'b432_b435_has_branches',
'b436_b439_hp_max',
'b440_b443_hp_clear',
'b444_b447_hp_gain_good',
'b448_b451_hp_gain_ok',
'b452_b455_hp_loss_bad',
'b456_b459_normal_normal_ratio',
'b460_b463_normal_professional_ratio',
'b464_b467_normal_master_ratio']:
check(co_song.header, ca_song.header, header_property, abs=1)
# NB: KAGEKIYO's branching condition is very unique (BIG only), which
# cannot be expressed in a TJA file. So, we skip checking the
# `branch_point` header values for KAGEKIYO.
if id_song != 'genpe':
assert_song_property(co_song.header, ca_song.header, 'b468_b471_branch_points_good')
assert_song_property(co_song.header, ca_song.header, 'b472_b475_branch_points_ok')
assert_song_property(co_song.header, ca_song.header, 'b476_b479_branch_points_bad')
assert_song_property(co_song.header, ca_song.header, 'b480_b483_branch_points_drumroll')
assert_song_property(co_song.header, ca_song.header, 'b484_b487_branch_points_good_big')
assert_song_property(co_song.header, ca_song.header, 'b488_b491_branch_points_ok_big')
assert_song_property(co_song.header, ca_song.header, 'b492_b495_branch_points_drumroll_big')
assert_song_property(co_song.header, ca_song.header, 'b496_b499_branch_points_balloon')
assert_song_property(co_song.header, ca_song.header, 'b500_b503_branch_points_kusudama')
for header_property in ['b468_b471_branch_points_good',
'b472_b475_branch_points_ok',
'b476_b479_branch_points_bad',
'b480_b483_branch_points_drumroll',
'b484_b487_branch_points_good_big',
'b488_b491_branch_points_ok_big',
'b492_b495_branch_points_drumroll_big',
'b496_b499_branch_points_balloon',
'b500_b503_branch_points_kusudama']:
check(co_song.header, ca_song.header, header_property)
# 2. Check song metadata
assert_song_property(co_song, ca_song, 'score_init')
assert_song_property(co_song, ca_song, 'score_diff')
check(co_song, ca_song, 'score_init')
check(co_song, ca_song, 'score_diff')
# 3. Check measure data
for i_measure in range(max([len(co_song.measures), len(ca_song.measures)])):
# NB: We could assert that len(measures) is the same for both songs, then iterate through zipped measures.
# But, if there is a mismatched number of measures, we want to know _where_ it occurs. So, we let the
# comparison go on using the max length of both songs until something else fails.
for i_measure in range(max([len(co_song.measures),
len(ca_song.measures)])):
# NB: We could assert that len(measures) is the same for both
# songs, then iterate through zipped measures. But, if there is a
# mismatched number of measures, we want to know _where_ it
# occurs. So, we let the comparison go on using the max length of
# both songs until something else fails.
co_measure = co_song.measures[i_measure]
ca_measure = ca_song.measures[i_measure]
# 3a. Check measure metadata
assert_song_property(co_measure, ca_measure, 'bpm', i_measure, abs=0.01)
assert_song_property(co_measure, ca_measure, 'fumen_offset_start', i_measure, abs=0.15)
assert_song_property(co_measure, ca_measure, 'gogo', i_measure)
assert_song_property(co_measure, ca_measure, 'barline', i_measure)
check(co_measure, ca_measure, 'bpm', i_measure, abs=0.01)
check(co_measure, ca_measure, 'offset_start', i_measure, abs=0.15)
check(co_measure, ca_measure, 'gogo', i_measure)
check(co_measure, ca_measure, 'barline', i_measure)
# NB: KAGEKIYO's fumen has some strange details that can't be replicated using the TJA charting format.
# So, for now, we use a special case to skip checking A) notes for certain measures and B) branchInfo
# NB: KAGEKIYO's fumen has some strange details that can't be
# replicated using the TJA charting format. So, for now, we use a
# special case to skip checking:
# A) notes for certain measures and
# B) branchInfo
if id_song == 'genpe':
# A) The 2/4 measures in the Ura of KAGEKIYO's official Ura fumen don't match the wikiwiki.jp/TJA
# charts. In the official fumen, the note ms offsets of branches 5/12/17/etc. go _past_ the duration of
# the measure. This behavior is impossible to represent using the TJA format, so we skip checking notes
# for these measures, since the rest of the measures have perfect note ms offsets anyway.
if i_difficult_id == "x" and i_measure in [5, 6, 12, 13, 17, 18, 26, 27, 46, 47, 51, 52, 56, 57]:
# A) The 2/4 measures in the Ura of KAGEKIYO's official Ura
# fumen don't match the wikiwiki.jp/TJA charts. In the official
# fumen, the note ms offsets of branches 5/12/17/etc. go _past_
# the duration of the measure. This behavior is impossible to
# represent using the TJA format, so we skip checking notes
# for these measures, since the rest of the measures have
# perfect note ms offsets anyway.
if (i_difficult_id == "x" and
i_measure in [5, 6, 12, 13, 17, 18, 26, 27,
46, 47, 51, 52, 56, 57]):
continue
# B) The branching condition for KAGEKIYO is very strange (accuracy for the 7 big notes in the song)
# So, we only test the branchInfo bytes for non-KAGEKIYO songs:
# B) The branching condition for KAGEKIYO is very strange
# (accuracy for the 7 big notes in the song) So, we only test
# the branchInfo bytes for non-KAGEKIYO songs:
else:
assert_song_property(co_measure, ca_measure, 'branch_info', i_measure)
check(co_measure, ca_measure, 'branch_info', i_measure)
# 3b. Check measure notes
for i_branch in ['normal', 'professional', 'master']:
co_branch = co_measure.branches[i_branch]
ca_branch = ca_measure.branches[i_branch]
# NB: We only check speed for non-empty branches, as fumens store speed changes even for empty branches
# NB: We only check speed for non-empty branches, as fumens
# store speed changes even for empty branches.
if co_branch.length != 0:
assert_song_property(co_branch, ca_branch, 'speed', i_measure, i_branch)
# NB: We could assert that len(notes) is the same for both songs, then iterate through zipped notes.
# But, if there is a mismatched number of notes, we want to know _where_ it occurs. So, we let the
# comparison go on using the max length of both branches until something else fails.
check(co_branch, ca_branch, 'speed', i_measure, i_branch)
# NB: We could assert that len(notes) is the same for both
# songs, then iterate through zipped notes. But, if there is a
# mismatched number of notes, we want to know _where_ it
# occurs. So, we let the comparison go on using the max length
# of both branches until something else fails.
for i_note in range(max([co_branch.length, ca_branch.length])):
co_note = co_branch.notes[i_note]
ca_note = ca_branch.notes[i_note]
assert_song_property(co_note, ca_note, 'note_type', i_measure, i_branch, i_note, func=normalize_type)
assert_song_property(co_note, ca_note, 'pos', i_measure, i_branch, i_note, abs=0.1)
# NB: Drumroll duration doesn't always end exactly on a beat. Plus, TJA charters often eyeball
# drumrolls, leading them to be often off by a 1/4th/8th/16th/32th/etc. These charting errors
# are fixable, but tedious to do when writing tests. So, I've added a try/except so that they
# can be checked locally with a breakpoint when adding new songs, but so that fixing every
# duration-related chart error isn't 100% mandatory.
check(co_note, ca_note, 'note_type', i_measure,
i_branch, i_note, func=normalize_type)
check(co_note, ca_note, 'pos', i_measure,
i_branch, i_note, abs=0.1)
# NB: Drumroll duration doesn't always end exactly on a
# beat. Plus, TJA charters often eyeball drumrolls,
# leading them to be often off by a 1/4th/8th/16th/etc.
# These charting errors are fixable, but tedious to do
# when writing tests. So, I've added a try/except so that
# they can be checked locally with a breakpoint when
# adding new songs, but so that fixing every
# duration-related chart error isn't 100% mandatory.
try:
assert_song_property(co_note, ca_note, 'duration', i_measure, i_branch, i_note, abs=25.0)
check(co_note, ca_note, 'duration', i_measure,
i_branch, i_note, abs=25.0)
except AssertionError:
pass
if ca_note.note_type not in ["Balloon", "Kusudama"]:
assert_song_property(co_note, ca_note, 'score_init', i_measure, i_branch, i_note)
assert_song_property(co_note, ca_note, 'score_diff', i_measure, i_branch, i_note)
# NB: 'item' still needs to be implemented: https://github.com/vivaria/tja2fumen/issues/17
# assert_song_property(co_note, ca_note, 'item', i_measure, i_branch, i_note)
check(co_note, ca_note, 'score_init', i_measure,
i_branch, i_note)
check(co_note, ca_note, 'score_diff', i_measure,
i_branch, i_note)
# NB: 'item' still needs to be implemented:
# https://github.com/vivaria/tja2fumen/issues/17
# check(co_note, ca_note, 'item', i_measure,
# i_branch, i_note)
def assert_song_property(converted_obj, cached_obj, prop, measure=None, branch=None, note=None, func=None, abs=None):
# NB: TJA parser/converter uses 0-based indexing, but TJA files use 1-based indexing.
# So, we increment 1 in the error message to more easily identify problematic lines in TJA files.
def check(converted_obj, cached_obj, prop, measure=None,
branch=None, note=None, func=None, abs=None):
# NB: TJA parser/converter uses 0-based indexing, but TJA files use
# 1-based indexing. So, we increment 1 in the error message to more easily
# identify problematic lines in TJA files.
msg_failure = f"'{prop}' mismatch"
msg_failure += f": measure '{measure+1}'" if measure is not None else ""
msg_failure += f", branch '{branch}'" if branch is not None else ""
@ -193,4 +228,3 @@ def checkValidHeader(header):
assert header.b492_b495_branch_points_drumroll_big in [1, 0]
assert header.b496_b499_branch_points_balloon in [30, 0, 1]
assert header.b500_b503_branch_points_kusudama in [30, 0]