mirror of
https://github.com/vgmstream/vgmstream.git
synced 2024-11-12 01:30:49 +01:00
Add txtp_maker.py companion CLI tool
This commit is contained in:
parent
a4d4e4a5b7
commit
f2d37f28cd
544
cli/txtp_maker.py
Normal file
544
cli/txtp_maker.py
Normal file
@ -0,0 +1,544 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# ########################################################################### #
|
||||
# TXTP MAKER
|
||||
# ########################################################################### #
|
||||
|
||||
from __future__ import division
|
||||
import subprocess
|
||||
import zlib
|
||||
import os.path
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import fnmatch
|
||||
|
||||
def print_usage(appname):
|
||||
print("Usage: {} (filename) [options]".format(appname)+"\n"
|
||||
"\n"
|
||||
"Creates (filename)_(subsong).txtp for every subsong in (filename).\n"
|
||||
" (filename) can be a * or *.ext wildcard too (works with dupe filters).\n"
|
||||
"Works with files with no subsongs (unless filtered) too.\n"
|
||||
"\n"
|
||||
"Use -h to print [options]. Examples:\n"
|
||||
"\n"
|
||||
"{} bgm.fsb -in -fcm 2 -fms 5.0 ".format(appname)+"\n"
|
||||
" make TXTP for subsongs with at least 2 channels and 5 seconds\n"
|
||||
"{} *.scd -r -fd -l 2".format(appname)+"\n"
|
||||
" all .scd in subdirs, ignoring dupes and making per 2ch layers\n"
|
||||
"{} *.sm1 -fne .+STREAM[.]SS[0-9]$ ".format(appname)+"\n"
|
||||
" all .sm1 excluding those subsong names that ends with 'STREAM.SS0..9'\n"
|
||||
"{} samples.bnk -fni ^bgm.? ".format(appname)+"\n"
|
||||
" in .bnk including only subsong names that start with 'bgm'\n"
|
||||
"{} * -r -fss 1".format(appname)+"\n"
|
||||
" all files in subdirs with at least 1 subsong (ignoring formats without them)\n"
|
||||
)
|
||||
|
||||
def print_help(appname):
|
||||
print("Options:\n"
|
||||
" -r: find recursive (writes files to current dir, with dir in TXTP)\n"
|
||||
" -c (name): set path to CLI (default: test.exe)\n"
|
||||
" -n (name): use (name)_(subsong).txtp format\n"
|
||||
" You can put '{filename}' somewhere to get it substituted by the base name\n"
|
||||
" -z N: zero-fill subsong number (default: auto fill up to total subsongs)\n"
|
||||
" -d (dir): add dir in TXTP (if the file will reside in a subdir)\n"
|
||||
" -m: create mini-txtp\n"
|
||||
" -o: overwrite existing .txtp (beware when using with internal names)\n"
|
||||
" -in: name TXTP using the subsong's internal name if found\n"
|
||||
" -ie: remove internal name's extension\n"
|
||||
" -ii: add subsong number when using internal name\n"
|
||||
" -l N: create multiple TXTP per subsong layers, every N channels\n"
|
||||
" -fd: filter duplicates (slower)\n"
|
||||
" -fcm N: filter min channels\n"
|
||||
" -fcM N: filter by max channels\n"
|
||||
" -frm N: filter by min sample rate\n"
|
||||
" -frM N: filter by max sample rate\n"
|
||||
" -fsm N.N: filter by min seconds\n"
|
||||
" -fsM N.N: filter by max seconds\n"
|
||||
" -fss N: filter min subsongs (1 filters formats incapable of subsongs)\n"
|
||||
" -fni (regex): filter by subsong name, include files that match\n"
|
||||
" -fne (regex): filter by subsong name, exclude files that match\n"
|
||||
" -v (name): verbose level (off|trace|debug|info, default: info)\n"
|
||||
" -h N: show this help\n"
|
||||
)
|
||||
|
||||
# ########################################################################### #
|
||||
|
||||
def find_files(dir, pattern, recursive):
|
||||
files = []
|
||||
for root, dirnames, filenames in os.walk(dir):
|
||||
for filename in fnmatch.filter(filenames, pattern):
|
||||
files.append(os.path.join(root, filename))
|
||||
|
||||
if not recursive:
|
||||
break
|
||||
|
||||
return files
|
||||
|
||||
def make_cmd(cfg, fname_in, fname_out, target_subsong):
|
||||
if (cfg.test_dupes):
|
||||
cmd = "{} -s {} -i -o {} {}".format(cfg.cli, target_subsong, fname_out, fname_in)
|
||||
else:
|
||||
cmd = "{} -s {} -m -i -o {} {}".format(cfg.cli, target_subsong, fname_out, fname_in)
|
||||
return cmd
|
||||
|
||||
class LogHelper(object):
|
||||
|
||||
def __init__(self, cfg):
|
||||
self.cfg = cfg
|
||||
|
||||
def trace(self, msg):
|
||||
v = self.cfg.verbose
|
||||
if v == "trace":
|
||||
print(msg)
|
||||
|
||||
def debug(self, msg):
|
||||
v = self.cfg.verbose
|
||||
if v == "trace" or v == "debug":
|
||||
print(msg)
|
||||
|
||||
def info(self, msg):
|
||||
v = self.cfg.verbose
|
||||
if v == "trace" or v == "debug" or v == "info":
|
||||
print(msg)
|
||||
|
||||
class ConfigHelper(object):
|
||||
show_help = False
|
||||
cli = "test.exe"
|
||||
|
||||
recursive = False
|
||||
base_name = ''
|
||||
zero_fill = -1
|
||||
subdir = ''
|
||||
mini_txtp = False
|
||||
overwrite = False
|
||||
layers = 0
|
||||
|
||||
use_internal_name = False
|
||||
use_internal_ext = False
|
||||
use_internal_index = False
|
||||
|
||||
test_dupes = False
|
||||
min_channels = 0
|
||||
max_channels = 0
|
||||
min_sample_rate = 0
|
||||
max_sample_rate = 0
|
||||
min_seconds = 0.0
|
||||
max_seconds = 0.0
|
||||
min_subsongs = 0
|
||||
include_regex = ""
|
||||
exclude_regex = ""
|
||||
|
||||
verbose = "info"
|
||||
|
||||
argv_len = 0
|
||||
index = 0
|
||||
|
||||
|
||||
def read_bool(self, command, default):
|
||||
if self.index > self.argv_len - 1:
|
||||
return default
|
||||
if self.argv[self.index] == command:
|
||||
val = True
|
||||
self.index += 1
|
||||
return val
|
||||
return default
|
||||
|
||||
def read_value(self, command, default):
|
||||
if self.index > self.argv_len - 2:
|
||||
return default
|
||||
if self.argv[self.index] == command:
|
||||
val = self.argv[self.index+1]
|
||||
self.index += 2
|
||||
return val
|
||||
return default
|
||||
|
||||
def read_string(self, command, default):
|
||||
return str(self.read_value(command, default))
|
||||
|
||||
def read_int(self, command, default):
|
||||
return int(self.read_value(command, default))
|
||||
|
||||
def read_float(self, command, default):
|
||||
return float(self.read_value(command, default))
|
||||
|
||||
#todo improve this poop
|
||||
def __init__(self, argv):
|
||||
self.index = 2 #after file
|
||||
self.argv = argv
|
||||
self.argv_len = len(argv)
|
||||
|
||||
if argv[1] == '-h':
|
||||
self.show_help = True
|
||||
|
||||
prev_index = self.index
|
||||
while self.index < len(self.argv):
|
||||
self.show_help = self.read_bool('-h', self.show_help)
|
||||
self.cli = self.read_string('-c', self.cli)
|
||||
self.recursive = self.read_bool('-r', self.recursive)
|
||||
self.base_name = self.read_string('-n', self.base_name)
|
||||
self.zero_fill = self.read_int('-z', self.zero_fill)
|
||||
self.subdir = self.read_string('-d', self.subdir)
|
||||
|
||||
self.test_dupes = self.read_bool('-fd', self.test_dupes)
|
||||
self.min_channels = self.read_int('-fcm', self.min_channels)
|
||||
self.max_channels = self.read_int('-fcM', self.max_channels)
|
||||
self.min_sample_rate = self.read_int('-frm', self.min_sample_rate)
|
||||
self.max_sample_rate = self.read_int('-frM', self.max_sample_rate)
|
||||
self.min_seconds = self.read_float('-fsm', self.min_seconds)
|
||||
self.max_seconds = self.read_float('-fsM', self.max_seconds)
|
||||
self.min_subsongs = self.read_int('-fss', self.min_subsongs)
|
||||
self.include_regex = self.read_string('-fni', self.include_regex)
|
||||
self.exclude_regex = self.read_string('-fne', self.exclude_regex)
|
||||
|
||||
self.mini_txtp = self.read_bool('-m', self.mini_txtp)
|
||||
self.overwrite = self.read_bool('-o', self.overwrite)
|
||||
self.layers = self.read_int('-l', self.layers)
|
||||
|
||||
self.use_internal_name = self.read_bool('-in', self.use_internal_name)
|
||||
self.use_internal_ext = self.read_bool('-ie', self.use_internal_ext)
|
||||
self.use_internal_index = self.read_bool('-ii', self.use_internal_index)
|
||||
|
||||
self.verbose = self.read_string('-v', self.verbose)
|
||||
|
||||
if prev_index == self.index:
|
||||
self.index += 1
|
||||
prev_index = self.index
|
||||
|
||||
if (self.subdir != '') and not (self.subdir.endswith('/') or self.subdir.endswith('\\')):
|
||||
self.subdir += '/'
|
||||
|
||||
def __str__(self):
|
||||
return str(self.__dict__)
|
||||
|
||||
|
||||
class Cr32Helper(object):
|
||||
crc32_map = {}
|
||||
dupe = False
|
||||
cfg = None
|
||||
|
||||
def __init__(self, cfg):
|
||||
self.cfg = cfg
|
||||
|
||||
def get_crc32(self, fname):
|
||||
buf_size = 0x8000
|
||||
with open(fname, 'rb') as file:
|
||||
buf = file.read(buf_size)
|
||||
crc32 = 0
|
||||
while len(buf) > 0:
|
||||
crc32 = zlib.crc32(buf, crc32)
|
||||
buf = file.read(buf_size)
|
||||
return crc32 & 0xFFFFFFFF
|
||||
|
||||
def update(self, fname):
|
||||
cfg = self.cfg
|
||||
|
||||
self.dupe = False
|
||||
if cfg.test_dupes == 0:
|
||||
return
|
||||
if not os.path.exists(fname):
|
||||
return
|
||||
|
||||
crc32_str = format(self.get_crc32(fname),'08x')
|
||||
if (crc32_str in self.crc32_map):
|
||||
self.dupe = True
|
||||
return
|
||||
self.crc32_map[crc32_str] = True
|
||||
|
||||
return
|
||||
|
||||
def is_dupe(self):
|
||||
return self.dupe
|
||||
|
||||
|
||||
class TxtpMaker(object):
|
||||
channels = 0
|
||||
sample_rate = 0
|
||||
num_samples = 0
|
||||
stream_count = 0
|
||||
stream_index = 0
|
||||
stream_name = ''
|
||||
stream_seconds = 0
|
||||
|
||||
def __init__(self, cfg, output_b, log):
|
||||
self.cfg = cfg
|
||||
self.log = log
|
||||
|
||||
self.output = str(output_b).replace("\\r","").replace("\\n","\n")
|
||||
self.channels = self.get_value("channels: ")
|
||||
self.sample_rate = self.get_value("sample rate: ")
|
||||
self.num_samples = self.get_value("stream total samples: ")
|
||||
self.stream_count = self.get_value("stream count: ")
|
||||
self.stream_index = self.get_value("stream index: ")
|
||||
self.stream_name = self.get_string("stream name: ")
|
||||
|
||||
if self.channels == 0:
|
||||
raise ValueError('Incorrect command result')
|
||||
|
||||
self.stream_seconds = self.num_samples / self.sample_rate
|
||||
|
||||
def __str__(self):
|
||||
return str(self.__dict__)
|
||||
|
||||
def get_string(self, str):
|
||||
find_pos = self.output.find(str)
|
||||
if (find_pos == -1):
|
||||
return ''
|
||||
cut_pos = find_pos + len(str)
|
||||
str_cut = self.output[cut_pos:]
|
||||
return str_cut.split()[0]
|
||||
|
||||
def get_value(self, str):
|
||||
res = self.get_string(str)
|
||||
if (res == ''):
|
||||
return 0;
|
||||
return int(res)
|
||||
|
||||
def is_ignorable(self):
|
||||
cfg = self.cfg
|
||||
|
||||
if (self.channels < cfg.min_channels):
|
||||
return True;
|
||||
if (cfg.max_channels > 0 and self.channels > cfg.max_channels):
|
||||
return True;
|
||||
if (self.sample_rate < cfg.min_sample_rate):
|
||||
return True;
|
||||
if (cfg.max_sample_rate > 0 and self.sample_rate > cfg.max_sample_rate):
|
||||
return True;
|
||||
if (self.stream_seconds < cfg.min_seconds):
|
||||
return True;
|
||||
if (cfg.max_seconds > 0 and self.stream_seconds > cfg.max_seconds):
|
||||
return True;
|
||||
if (self.stream_count < cfg.min_subsongs):
|
||||
return True;
|
||||
if (cfg.exclude_regex != "" and self.stream_name != ""):
|
||||
p = re.compile(cfg.exclude_regex)
|
||||
if (p.match(self.stream_name) != None):
|
||||
return True
|
||||
if (cfg.include_regex != "" and self.stream_name != ""):
|
||||
p = re.compile(cfg.include_regex)
|
||||
if (p.match(self.stream_name) == None):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_stream_mask(self, layer):
|
||||
cfg = self.cfg
|
||||
|
||||
mask = '#c'
|
||||
|
||||
loops = cfg.layers
|
||||
if layer + cfg.layers > self.channels:
|
||||
loops = self.channels - cfg.layers
|
||||
for ch in range(0,loops):
|
||||
mask += str(layer+ch) + ','
|
||||
|
||||
mask = mask[:-1]
|
||||
return mask
|
||||
|
||||
def get_stream_name(self):
|
||||
cfg = self.cfg
|
||||
|
||||
if not cfg.use_internal_name:
|
||||
return ''
|
||||
txt = self.stream_name
|
||||
|
||||
# remove paths #todo maybe config/replace?
|
||||
pos = txt.rfind("\\")
|
||||
if (pos != -1):
|
||||
txt = txt[pos+1:]
|
||||
pos = txt.rfind("/")
|
||||
if (pos != -1):
|
||||
txt = txt[pos+1:]
|
||||
# remove bad chars
|
||||
txt = txt.replace("%", "_")
|
||||
txt = txt.replace("*", "_")
|
||||
txt = txt.replace("?", "_")
|
||||
txt = txt.replace(":", "_")
|
||||
txt = txt.replace("\"", "_")
|
||||
txt = txt.replace("|", "_")
|
||||
txt = txt.replace("<", "_")
|
||||
txt = txt.replace(">", "_")
|
||||
|
||||
if not cfg.use_internal_ext:
|
||||
pos = txt.rfind(".")
|
||||
if (pos != -1):
|
||||
txt = txt[:pos]
|
||||
return txt
|
||||
|
||||
def write(self, outname, line):
|
||||
cfg = self.cfg
|
||||
|
||||
outname += '.txtp'
|
||||
if not cfg.overwrite and os.path.exists(outname):
|
||||
raise ValueError('TXTP exists in path: ' + outname)
|
||||
ftxtp = open(outname,"w+")
|
||||
if line != '':
|
||||
ftxtp.write(line)
|
||||
ftxtp.close()
|
||||
|
||||
self.log.debug("created: " + outname)
|
||||
return
|
||||
|
||||
def make(self, fname_path, fname_clean):
|
||||
cfg = self.cfg
|
||||
total_done = 0
|
||||
|
||||
if self.is_ignorable():
|
||||
return total_done
|
||||
|
||||
# write plain (name).txtp when no subsongs
|
||||
if self.stream_count <= 1:
|
||||
index = ""
|
||||
else:
|
||||
index = str(self.stream_index)
|
||||
if cfg.zero_fill < 0:
|
||||
index = index.zfill(len(str(self.stream_count)))
|
||||
else:
|
||||
index = index.zfill(cfg.zero_fill)
|
||||
|
||||
if cfg.mini_txtp:
|
||||
outname = fname_path
|
||||
if index != "":
|
||||
outname += "#" + index
|
||||
|
||||
if cfg.layers > 0 and cfg.layers < self.channels:
|
||||
for layer in range(0, self.channels, cfg.layers):
|
||||
mask = self.get_stream_mask(layer)
|
||||
self.write(outname + mask, '')
|
||||
total_done += 1
|
||||
else:
|
||||
self.write(outname, '')
|
||||
total_done += 1
|
||||
|
||||
else:
|
||||
stream_name = self.get_stream_name()
|
||||
if stream_name != '':
|
||||
outname = stream_name
|
||||
if cfg.use_internal_index:
|
||||
outname += "_{}".format(index)
|
||||
else:
|
||||
if cfg.base_name != '':
|
||||
fname_base = os.path.basename(fname_path)
|
||||
pos = fname_base.rfind(".") #remove ext
|
||||
if (pos != -1 and pos > 1):
|
||||
fname_base = fname_base[:pos]
|
||||
|
||||
txt = cfg.base_name
|
||||
txt = txt.replace("{filename}",fname_base)
|
||||
|
||||
else:
|
||||
txt = fname_path
|
||||
pos = txt.rfind(".") #remove ext
|
||||
if (pos != -1 and pos > 1):
|
||||
txt = txt[:pos]
|
||||
outname = "{}".format(txt)
|
||||
if index != "":
|
||||
outname += "_" + index
|
||||
|
||||
line = ''
|
||||
if cfg.subdir != '':
|
||||
line += cfg.subdir
|
||||
line += fname_clean
|
||||
if index != "":
|
||||
line += "#" + index
|
||||
|
||||
if cfg.layers > 0 and cfg.layers < self.channels:
|
||||
done = 0
|
||||
for layer in range(0, self.channels, cfg.layers):
|
||||
sub = chr(ord('a') + done)
|
||||
done += 1
|
||||
mask = self.get_stream_mask(layer)
|
||||
self.write(outname + sub, line + mask)
|
||||
total_done += 1
|
||||
else:
|
||||
self.write(outname, line)
|
||||
total_done += 1
|
||||
return total_done
|
||||
|
||||
def has_more_subsongs(self, target_subsong):
|
||||
return target_subsong < self.stream_count
|
||||
|
||||
# ########################################################################### #
|
||||
|
||||
def main():
|
||||
appname = os.path.basename(sys.argv[0])
|
||||
if (len(sys.argv) <= 1):
|
||||
print_usage(appname)
|
||||
return
|
||||
|
||||
cfg = ConfigHelper(sys.argv)
|
||||
crc32 = Cr32Helper(cfg)
|
||||
log = LogHelper(cfg)
|
||||
|
||||
if cfg.show_help:
|
||||
print_help(appname)
|
||||
return
|
||||
|
||||
fname = sys.argv[1]
|
||||
fnames_in = find_files('.', fname, cfg.recursive)
|
||||
|
||||
total_created = 0
|
||||
total_dupes = 0
|
||||
total_errors = 0
|
||||
for fname_in in fnames_in:
|
||||
fname_in_clean = fname_in.replace("\\", "/")
|
||||
if fname_in_clean.startswith("./"):
|
||||
fname_in_clean = fname_in_clean[2:]
|
||||
|
||||
fname_in_base = os.path.basename(fname_in)
|
||||
|
||||
if fname_in.startswith(".\\"): #skip starting dot for extensionless files
|
||||
fname_in = fname_in[2:]
|
||||
|
||||
fname_out = ".temp." + fname_in_base + ".wav"
|
||||
created = 0
|
||||
dupes = 0
|
||||
errors = 0
|
||||
target_subsong = 1
|
||||
while 1:
|
||||
|
||||
try:
|
||||
cmd = make_cmd(cfg, fname_in, fname_out, target_subsong)
|
||||
log.trace("calling: " + cmd)
|
||||
output_b = subprocess.check_output(cmd, shell=False) #stderr=subprocess.STDOUT
|
||||
except subprocess.CalledProcessError as e:
|
||||
log.debug("ignoring CLI error in " + fname_in + "#"+str(target_subsong)+": " + e.output)
|
||||
errors += 1
|
||||
break
|
||||
|
||||
if target_subsong == 1:
|
||||
log.debug("processing {}...".format(fname_in_clean))
|
||||
|
||||
maker = TxtpMaker(cfg, output_b, log)
|
||||
|
||||
if not maker.is_ignorable():
|
||||
crc32.update(fname_out)
|
||||
|
||||
if not crc32.is_dupe():
|
||||
created += maker.make(fname_in_base, fname_in_clean)
|
||||
else:
|
||||
dupes += 1
|
||||
log.debug("Dupe subsong {}".format(target_subsong))
|
||||
|
||||
if not maker.has_more_subsongs(target_subsong):
|
||||
break
|
||||
target_subsong += 1
|
||||
|
||||
if target_subsong % 200 == 0:
|
||||
log.info("{}/{} subsongs... ".format(target_subsong, maker.stream_count) +
|
||||
"({} dupes, {} errors)".format(dupes, errors)
|
||||
)
|
||||
|
||||
if os.path.exists(fname_out):
|
||||
os.remove(fname_out)
|
||||
|
||||
total_created += created
|
||||
total_dupes += dupes
|
||||
total_errors += errors
|
||||
|
||||
|
||||
log.info("done! ({} done, {} dupes, {} errors)".format(total_created, total_dupes, total_errors))
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user