1
0
mirror of synced 2025-01-24 07:04:09 +01:00

Refactor TJA parsing code to enable the development of further features (#19)

This commit is contained in:
Viv 2023-06-30 14:51:57 -04:00 committed by GitHub
parent 1510a62bf5
commit 4a5a3399aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 193 additions and 261 deletions

View File

@ -1,11 +1,3 @@
# Various commands and header fields for TJA files
HEADER_GLOBAL = ['TITLE', 'TITLEJA', 'SUBTITLE', 'SUBTITLEJA', 'BPM', 'WAVE', 'OFFSET', 'DEMOSTART', 'GENRE',
'SCOREMODE', 'BGMOVIE', 'SONGVOL', 'SEVOL']
HEADER_COURSE = ['COURSE', 'LEVEL', 'BALLOON', 'SCOREINIT', 'SCOREDIFF', 'STYLE']
BRANCH_COMMANDS = ['START', 'END', 'BRANCHSTART', 'BRANCHEND', 'N', 'E', 'M', 'SECTION']
MEASURE_COMMANDS = ['MEASURE', 'GOGOSTART', 'GOGOEND', 'BARLINEON', 'BARLINEOFF', 'SCROLL', 'BPMCHANGE', 'DELAY', 'LEVELHOLD']
COMMAND = BRANCH_COMMANDS + MEASURE_COMMANDS
# Note types for TJA files
TJA_NOTE_TYPES = {
'1': 'Don',

View File

@ -1,7 +1,8 @@
from copy import deepcopy
import re
from tja2fumen.utils import computeSoulGaugeBytes
from tja2fumen.constants import TJA_NOTE_TYPES, DIFFICULTY_BYTES, sampleHeaderMetadata, simpleHeaders
from tja2fumen.constants import DIFFICULTY_BYTES, sampleHeaderMetadata, simpleHeaders
# Filler metadata that the `writeFumen` function expects
# TODO: Determine how to properly set the item byte (https://github.com/vivaria/tja2fumen/issues/17)
@ -22,7 +23,7 @@ default_measure = {
}
def preprocessTJAMeasures(tja):
def processTJACommands(tja):
"""
Merge TJA 'data' and 'event' fields into a single measure property, and split
measures into sub-measures whenever a mid-measure BPM change occurs.
@ -40,35 +41,20 @@ def preprocessTJAMeasures(tja):
In the future, this logic should probably be moved into the TJA parser itself.
"""
currentBPM = 0
currentBPM = float(tja['metadata']['bpm'])
currentScroll = 1.0
currentGogo = False
currentBarline = True
currentDividend = 4
currentDivisor = 4
measuresCorrected = []
for measure in tja['measures']:
# Step 1: Combine notes and events
notes = [{'pos': i, 'type': 'note', 'value': TJA_NOTE_TYPES[note]}
for i, note in enumerate(measure['data']) if note != '0']
events = [{'pos': e['position'], 'type': e['name'], 'value': e['value']}
for e in measure['events']]
combined = []
while notes or events:
if events and notes:
if notes[0]['pos'] >= events[0]['pos']:
combined.append(events.pop(0))
else:
combined.append(notes.pop(0))
elif events:
combined.append(events.pop(0))
elif notes:
combined.append(notes.pop(0))
# Step 2: Split measure into submeasure
# Split measure into submeasure
measure_cur = {'bpm': currentBPM, 'scroll': currentScroll, 'gogo': currentGogo, 'barline': currentBarline,
'subdivisions': len(measure['data']), 'pos_start': 0, 'pos_end': 0,
'time_sig': measure['length'], 'data': []}
for data in combined:
'time_sig': [currentDividend, currentDivisor], 'data': []}
for data in measure['combined']:
if data['type'] == 'note':
measure_cur['data'].append(data)
# Update the current measure's SCROLL/GOGO/BARLINE status.
@ -96,7 +82,14 @@ def preprocessTJAMeasures(tja):
measuresCorrected.append(measure_cur)
measure_cur = {'bpm': currentBPM, 'scroll': currentScroll, 'gogo': currentGogo, 'barline': currentBarline,
'subdivisions': len(measure['data']), 'pos_start': data['pos'], 'pos_end': 0,
'time_sig': measure['length'], 'data': []}
'time_sig': [currentDividend, currentDivisor], 'data': []}
elif data['type'] == 'measure':
matchMeasure = re.match(r"(\d+)/(\d+)", data['value'])
if not matchMeasure:
continue
currentDividend = int(matchMeasure.group(1))
currentDivisor = int(matchMeasure.group(2))
measure_cur['time_sig'] = [currentDividend, currentDivisor]
elif data['type'] == 'scroll':
currentScroll = data['value']
elif data['type'] == 'gogo':
@ -114,11 +107,12 @@ def preprocessTJAMeasures(tja):
def convertTJAToFumen(tja):
# Hardcode currentBranch due to current lack of support for branching songs
currentBranch = 'normal' # TODO: Program in branch support
tja['measures'] = preprocessTJAMeasures(tja)
measureDurationPrev = 0
currentDrumroll = None
total_notes = 0
tja['measures'] = processTJACommands(tja)
# Parse TJA measures to create converted TJA -> Fumen file
tjaConverted = {'measures': []}
for idx_m, measureTJA in enumerate(tja['measures']):
@ -188,8 +182,8 @@ def convertTJAToFumen(tja):
note = deepcopy(default_note)
note['pos'] = note_pos
note['type'] = data['value']
note['scoreInit'] = tja['scoreInit'] # Probably not fully accurate
note['scoreDiff'] = tja['scoreDiff'] # Probably not fully accurate
note['scoreInit'] = tja['metadata']['scoreInit'] # Probably not fully accurate
note['scoreDiff'] = tja['metadata']['scoreDiff'] # Probably not fully accurate
# Handle drumroll/balloon-specific metadata
if note['type'] in ["Balloon", "Kusudama"]:
note['hits'] = tja['metadata']['balloon'].pop(0)
@ -234,7 +228,7 @@ def convertTJAToFumen(tja):
tjaConverted['order'] = '<'
tjaConverted['unknownMetadata'] = 0
tjaConverted['branches'] = False
tjaConverted['scoreInit'] = tja['scoreInit']
tjaConverted['scoreDiff'] = tja['scoreDiff']
tjaConverted['scoreInit'] = tja['metadata']['scoreInit']
tjaConverted['scoreDiff'] = tja['metadata']['scoreDiff']
return tjaConverted

View File

@ -2,12 +2,7 @@ import os
import re
from tja2fumen.utils import readStruct, getBool, shortHex
from tja2fumen.constants import (
# TJA constants
HEADER_GLOBAL, HEADER_COURSE, BRANCH_COMMANDS, MEASURE_COMMANDS, COMMAND, NORMALIZE_COURSE,
# Fumen constants
branchNames, noteTypes
)
from tja2fumen.constants import NORMALIZE_COURSE, TJA_NOTE_TYPES, branchNames, noteTypes
########################################################################################################################
@ -21,252 +16,203 @@ def parseTJA(fnameTJA):
except UnicodeDecodeError:
tja = open(fnameTJA, "r", encoding="shift-jis")
# Split into lines
lines = tja.read().splitlines()
lines = [line for line in lines if line.strip()] # Discard empty lines
lines = [line for line in tja.read().splitlines() if line.strip() != '']
courses = getCourseData(lines)
for courseData in courses.values():
courseData['measures'] = parseCourseMeasures(courseData['measures'])
# Line by line
headers = {}
return courses
def getCourseData(lines):
courses = {}
currentCourse = ''
songBPM = 0
songOffset = 0
for line in lines:
parsed = parseLine(line)
# Case 1: Comments (ignore)
if parsed['type'] == 'comment':
pass
# Case 2: Global header metadata
elif parsed['type'] == 'header' and parsed['scope'] == 'global':
headers[parsed['name'].lower()] = parsed['value']
# Case 3: Course data (metadata, commands, note data)
else:
# Check to see if we're starting a new course
if parsed['type'] == 'header' and parsed['scope'] == 'course' and parsed['name'] == 'COURSE':
currentCourse = NORMALIZE_COURSE[parsed['value']]
# Case 1: Header metadata
match_header = re.match(r"^([A-Z]+):(.*)", line)
if match_header:
nameUpper = match_header.group(1).upper()
value = match_header.group(2).strip()
# Global header fields
if nameUpper == 'BPM':
songBPM = value
elif nameUpper == 'OFFSET':
songOffset = value
# Course-specific header fields
elif nameUpper == 'COURSE':
currentCourse = NORMALIZE_COURSE[value]
if currentCourse not in courses.keys():
courses[currentCourse] = []
# Append the line to the current course
courses[currentCourse].append(parsed)
# Convert parsed course lines into actual note data
songs = {}
for courseName, courseLines in courses.items():
courseHeader, courseMeasures = getCourse(headers, courseLines)
songs[courseName] = applyFumenStructureToParsedTJA(headers, courseHeader, courseMeasures)
return songs
def parseLine(line):
# Regex matches for various line types
match_comment = re.match(r"//.*", line)
match_header = re.match(r"^([A-Z]+):(.*)", line)
match_command = re.match(r"^#([A-Z]+)(?:\s+(.+))?", line)
match_data = re.match(r"^(([0-9]|A|B|C|F|G)*,?).*$", line)
if match_comment:
return {"type": 'comment', "value": line}
elif match_header:
nameUpper = match_header.group(1).upper()
value = match_header.group(2)
if nameUpper in HEADER_GLOBAL:
return {"type": 'header', "scope": 'global', "name": nameUpper, "value": value.strip()}
elif nameUpper in HEADER_COURSE:
return {"type": 'header', "scope": 'course', "name": nameUpper, "value": value.strip()}
elif match_command:
nameUpper = match_command.group(1).upper()
value = match_command.group(2) if match_command.group(2) else ''
if nameUpper in COMMAND:
return {"type": 'command', "name": nameUpper, "value": value.strip()}
elif match_data:
return {"type": 'data', "data": match_data.group(1)}
return {"type": 'unknown', "value": line}
def getCourse(tjaHeaders, lines):
def parseHeaderMetadata(line):
nonlocal headers
if line["name"] == 'COURSE':
headers['course'] = NORMALIZE_COURSE[line['value']]
elif line["name"] == 'LEVEL':
headers['level'] = int(line['value']) if line['value'] else 0
elif line["name"] == 'SCOREINIT':
headers['scoreInit'] = int(line['value']) if line['value'] else 0
elif line["name"] == 'SCOREDIFF':
headers['scoreDiff'] = int(line['value']) if line['value'] else 0
elif line["name"] == 'BALLOON':
if line['value']:
balloons = [int(v) for v in line['value'].split(",") if v]
courses[currentCourse] = {
'metadata': {'course': currentCourse, 'bpm': songBPM, 'offset': songOffset, 'level': 0,
'balloon': [], 'scoreInit': 0, 'scoreDiff': 0},
'measures': [],
}
elif nameUpper == 'LEVEL':
courses[currentCourse]['metadata']['level'] = int(value) if value else 0
elif nameUpper == 'SCOREINIT':
courses[currentCourse]['metadata']['scoreInit'] = int(value) if value else 0
elif nameUpper == 'SCOREDIFF':
courses[currentCourse]['metadata']['scoreDiff'] = int(value) if value else 0
elif nameUpper == 'BALLOON':
if value:
balloons = [int(v) for v in value.split(",") if v]
courses[currentCourse]['metadata']['balloon'] = balloons
# STYLE is a P1/P2 command, which we don't support yet, so normally this would be a
# NotImplemetedError. However, TakoTako outputs `STYLE:SINGLE` when converting Ura
# charts, so throwing an error here would prevent Ura charts from being converted.
# See: https://github.com/vivaria/tja2fumen/issues/15#issuecomment-1575341088
elif nameUpper == 'STYLE':
pass
else:
balloons = []
headers['balloon'] = balloons
# STYLE is a P1/P2 command, which we don't support yet, so normally this would be a NotImplemetedError.
# However, TakoTako outputs `STYLE:SINGLE` when converting Ura charts, so throwing an error here prevents
# Ura charts from being converted. See: https://github.com/vivaria/tja2fumen/issues/15#issuecomment-1575341088
elif line["name"] == 'STYLE':
pass
else:
raise NotImplementedError
pass # Ignore other header fields such as 'TITLE', 'SUBTITLE', 'WAVE', etc.
def parseBranchCommands(line):
nonlocal flagLevelhold, targetBranch, currentBranch
if line["name"] == 'BRANCHSTART':
if flagLevelhold:
return
values = line['value'].split(',')
if values[0] == 'r':
if len(values) >= 3:
targetBranch = 'M'
elif len(values) == 2:
targetBranch = 'E'
else:
targetBranch = 'N'
elif values[0] == 'p':
if len(values) >= 3 and float(values[2]) <= 100:
targetBranch = 'M'
elif len(values) >= 2 and float(values[1]) <= 100:
targetBranch = 'E'
else:
targetBranch = 'N'
elif line["name"] == 'BRANCHEND':
currentBranch = targetBranch
elif line["name"] == 'N':
currentBranch = 'N'
elif line["name"] == 'E':
currentBranch = 'E'
elif line["name"] == 'M':
currentBranch = 'M'
elif line["name"] == 'START' or line['name'] == 'END':
currentBranch = 'N'
targetBranch = 'N'
flagLevelhold = False
elif line['name'] == 'SECTION':
raise NotImplementedError
else:
raise NotImplementedError
# Case 2: Commands and note data (to be further processed course-by-course later on)
elif not re.match(r"//.*", line): # Exclude comment-only lines ('//')
match_command = re.match(r"^#([A-Z]+)(?:\s+(.+))?", line)
match_notes = re.match(r"^(([0-9]|A|B|C|F|G)*,?).*$", line)
if match_command:
nameUpper = match_command.group(1).upper()
value = match_command.group(2).strip() if match_command.group(2) else ''
elif match_notes:
nameUpper = 'NOTES'
value = match_notes.group(1)
courses[currentCourse]['measures'].append({"name": nameUpper, "value": value})
def parseMeasureCommands(line):
nonlocal measureDivisor, measureDividend, measureEvents, flagLevelhold
if line['name'] == 'MEASURE':
matchMeasure = re.match(r"(\d+)/(\d+)", line['value'])
if not matchMeasure:
return
measureDividend = int(matchMeasure.group(1))
measureDivisor = int(matchMeasure.group(2))
elif line['name'] == 'GOGOSTART':
measureEvents.append({"name": 'gogo', "position": len(measureData), "value": '1'})
elif line['name'] == 'GOGOEND':
measureEvents.append({"name": 'gogo', "position": len(measureData), "value": '0'})
elif line['name'] == 'BARLINEON':
measureEvents.append({"name": 'barline', "position": len(measureData), "value": '1'})
elif line['name'] == 'BARLINEOFF':
measureEvents.append({"name": 'barline', "position": len(measureData), "value": '0'})
elif line['name'] == 'SCROLL':
measureEvents.append({"name": 'scroll', "position": len(measureData), "value": float(line['value'])})
elif line['name'] == 'BPMCHANGE':
measureEvents.append({"name": 'bpm', "position": len(measureData), "value": float(line['value'])})
elif line['name'] == 'LEVELHOLD':
flagLevelhold = True
elif line['name'] == 'DELAY':
raise NotImplementedError
elif line['name'] == 'LYRIC':
pass
elif line['name'] == 'NEXTSONG':
pass
else:
raise NotImplementedError
return courses
def parseMeasureData(line):
nonlocal measures, measureData, measureDividend, measureDivisor, measureEvents
data = line['data']
# If measure has ended, then append the measure and start anew
if data.endswith(','):
measureData += data[0:-1]
measure = {
"length": [measureDividend, measureDivisor],
"data": measureData,
"events": measureEvents,
}
measures.append(measure)
measureData = ''
measureEvents = []
# Otherwise, keep tracking measureData
else:
measureData += data
def parseCourseMeasures(lines):
# Define state variables
headers = {'balloon': []} # Charters sometimes exclude `BALLOON` entirely if there are none
measures = []
measureDividend = 4
measureDivisor = 4
measureData = ''
measureEvents = []
currentBranch = 'N'
targetBranch = 'N'
flagLevelhold = False
# Process course lines
measures = []
measureNotes = ''
measureEvents = []
for line in lines:
if line["type"] == 'header':
parseHeaderMetadata(line)
elif line["type"] == 'command' and line['name'] in BRANCH_COMMANDS:
parseBranchCommands(line)
elif line["type"] == 'command' and line['name'] in MEASURE_COMMANDS and currentBranch == targetBranch:
parseMeasureCommands(line)
elif line['type'] == 'data' and currentBranch == targetBranch:
parseMeasureData(line)
assert currentBranch == targetBranch
# 1. Parse measure notes
if line['name'] == 'NOTES':
notes = line['value']
# If measure has ended, then append the measure and start anew
if notes.endswith(','):
measureNotes += notes[0:-1]
measure = {
"data": measureNotes,
"events": measureEvents,
}
measures.append(measure)
measureNotes = ''
measureEvents = []
# Otherwise, keep tracking measureNotes
else:
measureNotes += notes
# Post-processing: Ensure the first measure has a BPM event
if measures:
firstBPMEventFound = False
# Search for BPM event in the first measure
for i in range(len(measures[0]['events'])):
evt = measures[0]['events'][i]
if evt['name'] == 'bpm' and evt['position'] == 0:
firstBPMEventFound = True
# If not present, insert a BPM event into the first measure using the global header metadata
if not firstBPMEventFound:
# noinspection PyTypeChecker
measures[0]['events'].insert(0, {"name": 'bpm', "position": 0, "value": tjaHeaders['bpm']})
# 2. Parse commands
else:
# Measure commands
if line['name'] == 'GOGOSTART':
measureEvents.append({"name": 'gogo', "position": len(measureNotes), "value": '1'})
elif line['name'] == 'GOGOEND':
measureEvents.append({"name": 'gogo', "position": len(measureNotes), "value": '0'})
elif line['name'] == 'BARLINEON':
measureEvents.append({"name": 'barline', "position": len(measureNotes), "value": '1'})
elif line['name'] == 'BARLINEOFF':
measureEvents.append({"name": 'barline', "position": len(measureNotes), "value": '0'})
elif line['name'] == 'SCROLL':
measureEvents.append({"name": 'scroll', "position": len(measureNotes), "value": float(line['value'])})
elif line['name'] == 'BPMCHANGE':
measureEvents.append({"name": 'bpm', "position": len(measureNotes), "value": float(line['value'])})
elif line['name'] == 'MEASURE':
measureEvents.append({"name": 'measure', "position": len(measureNotes), "value": line['value']})
# Post-processing: In case the file doesn't end on a "measure end" symbol (','), append whatever is left
if measureData:
# Branch commands
elif line["name"] == 'START' or line['name'] == 'END':
currentBranch = 'N'
targetBranch = 'N'
flagLevelhold = False
elif line['name'] == 'LEVELHOLD':
flagLevelhold = True
elif line["name"] == 'N':
currentBranch = 'N'
elif line["name"] == 'E':
currentBranch = 'E'
elif line["name"] == 'M':
currentBranch = 'M'
elif line["name"] == 'BRANCHEND':
currentBranch = targetBranch
elif line["name"] == 'BRANCHSTART':
if flagLevelhold:
continue
values = line['value'].split(',')
if values[0] == 'r':
if len(values) >= 3:
targetBranch = 'M'
elif len(values) == 2:
targetBranch = 'E'
else:
targetBranch = 'N'
elif values[0] == 'p':
if len(values) >= 3 and float(values[2]) <= 100:
targetBranch = 'M'
elif len(values) >= 2 and float(values[1]) <= 100:
targetBranch = 'E'
else:
targetBranch = 'N'
# Ignored commands
elif line['name'] == 'LYRIC':
pass
elif line['name'] == 'NEXTSONG':
pass
# Not implemented commands
elif line['name'] == 'SECTION':
raise NotImplementedError
elif line['name'] == 'DELAY':
raise NotImplementedError
else:
raise NotImplementedError
# If there is measure data (i.e. the file doesn't end on a "measure end" symbol ','), append whatever is left
if measureNotes:
measures.append({
"length": [measureDividend, measureDivisor],
"data": measureData,
"data": measureNotes,
"events": measureEvents,
})
# Post-processing: Otherwise, if the file ends on a measure event (e.g. #GOGOEND), append any remaining events
# Otherwise, if the file ends on a measure event (e.g. #GOGOEND), append any remaining events
elif measureEvents:
for event in measureEvents:
event['position'] = len(measures[len(measures) - 1]['data'])
# noinspection PyTypeChecker
measures[len(measures) - 1]['events'].append(event)
return headers, measures
# Merge measure data and measure events in chronological order
for measure in measures:
notes = [{'pos': i, 'type': 'note', 'value': TJA_NOTE_TYPES[note]}
for i, note in enumerate(measure['data']) if note != '0']
events = [{'pos': e['position'], 'type': e['name'], 'value': e['value']}
for e in measure['events']]
combined = []
while notes or events:
if events and notes:
if notes[0]['pos'] >= events[0]['pos']:
combined.append(events.pop(0))
else:
combined.append(notes.pop(0))
elif events:
combined.append(events.pop(0))
elif notes:
combined.append(notes.pop(0))
measure['combined'] = combined
def applyFumenStructureToParsedTJA(globalHeader, courseHeader, measures):
"""Merge song metadata, course metadata, and course data into a single fumen-like object."""
song = {'measures': [], 'metadata': {}}
for k, v in globalHeader.items():
song['metadata'][k] = v
for k, v in courseHeader.items():
if k in ['scoreInit', 'scoreDiff']:
song[k] = v
else:
song['metadata'][k] = v
for i, measure in enumerate(measures):
song['measures'].append(measure)
return song
return measures
########################################################################################################################