vgmstream/cli/txtp_maker.py

545 lines
18 KiB
Python
Raw Normal View History

2019-02-17 21:04:43 +01:00
#!/usr/bin/env python3
# ########################################################################### #
# TXTP MAKER
# ########################################################################### #
from __future__ import division
import subprocess
import zlib
import os.path
import os
import re
import sys
import fnmatch
def print_usage(appname):
    """Print the short usage text with example command lines.

    Args:
        appname: script name shown in the usage line and in every example.
    """
    # NOTE: adjacent string literals are merged before .format() is applied,
    # so each .format(appname) below fills the single '{}' of its merged chunk.
    print("Usage: {} (filename) [options]".format(appname)+"\n"
          "\n"
          "Creates (filename)_(subsong).txtp for every subsong in (filename).\n"
          " (filename) can be a * or *.ext wildcard too (works with dupe filters).\n"
          "Works with files with no subsongs (unless filtered) too.\n"
          "\n"
          "Use -h to print [options]. Examples:\n"
          "\n"
          # the min-seconds filter is '-fsm' (see the option list), not '-fms'
          "{} bgm.fsb -in -fcm 2 -fsm 5.0 ".format(appname)+"\n"
          " make TXTP for subsongs with at least 2 channels and 5 seconds\n"
          "{} *.scd -r -fd -l 2".format(appname)+"\n"
          " all .scd in subdirs, ignoring dupes and making per 2ch layers\n"
          "{} *.sm1 -fne .+STREAM[.]SS[0-9]$ ".format(appname)+"\n"
          " all .sm1 excluding those subsong names that ends with 'STREAM.SS0..9'\n"
          "{} samples.bnk -fni ^bgm.? ".format(appname)+"\n"
          " in .bnk including only subsong names that start with 'bgm'\n"
          "{} * -r -fss 1".format(appname)+"\n"
          " all files in subdirs with at least 1 subsong (ignoring formats without them)\n"
          )
def print_help(appname):
    """Print the list of supported command-line options.

    Args:
        appname: script name; accepted for symmetry with print_usage() but not
            used in the text.
    """
    # This text is printed verbatim (no .format()), so '{filename}' is literal.
    print("Options:\n"
          " -r: find recursive (writes files to current dir, with dir in TXTP)\n"
          " -c (name): set path to CLI (default: test.exe)\n"
          " -n (name): use (name)_(subsong).txtp format\n"
          " You can put '{filename}' somewhere to get it substituted by the base name\n"
          " -z N: zero-fill subsong number (default: auto fill up to total subsongs)\n"
          " -d (dir): add dir in TXTP (if the file will reside in a subdir)\n"
          " -m: create mini-txtp\n"
          " -o: overwrite existing .txtp (beware when using with internal names)\n"
          " -in: name TXTP using the subsong's internal name if found\n"
          # the extension is stripped unless -ie is given, so describe it that way
          " -ie: keep internal name's extension (removed by default)\n"
          " -ii: add subsong number when using internal name\n"
          " -l N: create multiple TXTP per subsong layers, every N channels\n"
          " -fd: filter duplicates (slower)\n"
          " -fcm N: filter min channels\n"
          " -fcM N: filter by max channels\n"
          " -frm N: filter by min sample rate\n"
          " -frM N: filter by max sample rate\n"
          " -fsm N.N: filter by min seconds\n"
          " -fsM N.N: filter by max seconds\n"
          " -fss N: filter min subsongs (1 filters formats incapable of subsongs)\n"
          " -fni (regex): filter by subsong name, include files that match\n"
          " -fne (regex): filter by subsong name, exclude files that match\n"
          " -v (name): verbose level (off|trace|debug|info, default: info)\n"
          # -h is a plain flag and takes no value
          " -h: show this help\n"
          )
# ########################################################################### #
def find_files(dir, pattern, recursive):
    """Return paths of files under `dir` whose name matches `pattern`.

    Args:
        dir: directory where the search starts.
        pattern: fnmatch-style wildcard applied to file names only (not paths).
        recursive: when false only `dir` itself is scanned; otherwise every
            subdirectory is visited too.

    Returns:
        List of matching paths (each joined with the directory it was found
        in), in a stable order: names sorted within a directory, directories
        visited in sorted order.
    """
    files = []
    for root, dirnames, filenames in os.walk(dir):
        # os.walk lists entries in arbitrary OS order; sorting in place makes
        # traversal (and so dupe detection and logs) reproducible between runs
        dirnames.sort()
        matches = sorted(fnmatch.filter(filenames, pattern))
        files.extend(os.path.join(root, filename) for filename in matches)
        if not recursive:
            break  # first iteration is `dir` itself
    return files
def make_cmd(cfg, fname_in, fname_out, target_subsong):
    """Build the CLI command line used to query (or decode) one subsong.

    Args:
        cfg: configuration object; `cfg.cli` is the CLI path and
            `cfg.test_dupes` selects whether '-m' is omitted.
        fname_in: input file passed to the CLI.
        fname_out: output file passed with '-o'.
        target_subsong: subsong number passed with '-s'.

    Returns:
        The command line as a single string. File names are double-quoted so
        names containing spaces stay one argument.
    """
    # '-m' is only added when not testing dupes
    flags = "-i" if cfg.test_dupes else "-m -i"
    return "{} -s {} {} -o \"{}\" \"{}\"".format(cfg.cli, target_subsong, flags, fname_out, fname_in)
class LogHelper(object):
    """Prints messages depending on the verbosity stored in `cfg.verbose`.

    Each method prints only when the configured level is one of the levels
    that include it; any other value (e.g. "off") prints nothing.
    """

    def __init__(self, cfg):
        self.cfg = cfg

    def _emit(self, msg, levels):
        """Print `msg` if the current verbose level is listed in `levels`."""
        if self.cfg.verbose in levels:
            print(msg)

    def trace(self, msg):
        """Print `msg` only at "trace" level."""
        self._emit(msg, ("trace",))

    def debug(self, msg):
        """Print `msg` at "trace" or "debug" level."""
        self._emit(msg, ("trace", "debug"))

    def info(self, msg):
        """Print `msg` at "trace", "debug" or "info" level."""
        self._emit(msg, ("trace", "debug", "info"))
class ConfigHelper(object):
    """Script settings parsed from an argv-style list.

    argv[0] is the script name and argv[1] the file name/pattern (or '-h');
    options are read from argv[2] onwards. Tokens that match no option are
    skipped. The class attributes below are the defaults.

    Raises:
        ValueError: when a numeric option is given a non-numeric value.
    """
    show_help = False
    cli = "test.exe"
    recursive = False
    base_name = ''
    zero_fill = -1
    subdir = ''
    mini_txtp = False
    overwrite = False
    layers = 0
    use_internal_name = False
    use_internal_ext = False
    use_internal_index = False
    test_dupes = False
    min_channels = 0
    max_channels = 0
    min_sample_rate = 0
    max_sample_rate = 0
    min_seconds = 0.0
    max_seconds = 0.0
    min_subsongs = 0
    include_regex = ""
    exclude_regex = ""
    verbose = "info"
    argv_len = 0
    index = 0

    def read_bool(self, command, default):
        """Consume a flag: return True if the current token is `command`, else `default`."""
        if self.index > self.argv_len - 1:
            return default
        if self.argv[self.index] == command:
            self.index += 1
            return True
        return default

    def read_value(self, command, default):
        """Consume `command` plus the token after it and return that token, else `default`.

        Nothing is consumed when no token follows `command`.
        """
        if self.index > self.argv_len - 2:
            return default
        if self.argv[self.index] == command:
            val = self.argv[self.index+1]
            self.index += 2
            return val
        return default

    def _read_number(self, command, default, convert, kind):
        """Read a value option and convert it, naming the option on failure."""
        value = self.read_value(command, default)
        try:
            return convert(value)
        except (TypeError, ValueError):
            raise ValueError("invalid value for {}: '{}' (expected {})".format(command, value, kind))

    def read_string(self, command, default):
        """Read a value option as a string."""
        return str(self.read_value(command, default))

    def read_int(self, command, default):
        """Read a value option as an int (ValueError if it isn't one)."""
        return self._read_number(command, default, int, "an integer")

    def read_float(self, command, default):
        """Read a value option as a float (ValueError if it isn't one)."""
        return self._read_number(command, default, float, "a number")

    # TODO: replace the sequential reads below with a proper option table
    def __init__(self, argv):
        self.index = 2 #after file
        self.argv = argv
        self.argv_len = len(argv)

        # argv may hold only the script name; don't index past its end
        if self.argv_len > 1 and argv[1] == '-h':
            self.show_help = True

        prev_index = self.index
        while self.index < self.argv_len:
            # every reader only acts when the current token is its own option,
            # so one pass may consume several consecutive options
            self.show_help = self.read_bool('-h', self.show_help)
            self.cli = self.read_string('-c', self.cli)
            self.recursive = self.read_bool('-r', self.recursive)
            self.base_name = self.read_string('-n', self.base_name)
            self.zero_fill = self.read_int('-z', self.zero_fill)
            self.subdir = self.read_string('-d', self.subdir)
            self.test_dupes = self.read_bool('-fd', self.test_dupes)
            self.min_channels = self.read_int('-fcm', self.min_channels)
            self.max_channels = self.read_int('-fcM', self.max_channels)
            self.min_sample_rate = self.read_int('-frm', self.min_sample_rate)
            self.max_sample_rate = self.read_int('-frM', self.max_sample_rate)
            self.min_seconds = self.read_float('-fsm', self.min_seconds)
            self.max_seconds = self.read_float('-fsM', self.max_seconds)
            self.min_subsongs = self.read_int('-fss', self.min_subsongs)
            self.include_regex = self.read_string('-fni', self.include_regex)
            self.exclude_regex = self.read_string('-fne', self.exclude_regex)
            self.mini_txtp = self.read_bool('-m', self.mini_txtp)
            self.overwrite = self.read_bool('-o', self.overwrite)
            self.layers = self.read_int('-l', self.layers)
            self.use_internal_name = self.read_bool('-in', self.use_internal_name)
            self.use_internal_ext = self.read_bool('-ie', self.use_internal_ext)
            self.use_internal_index = self.read_bool('-ii', self.use_internal_index)
            self.verbose = self.read_string('-v', self.verbose)

            # nothing matched the current token: skip it so the loop advances
            if prev_index == self.index:
                self.index += 1
            prev_index = self.index

        # the subdir is prepended to the file name inside the TXTP, so make
        # sure it ends with a path separator
        if (self.subdir != '') and not (self.subdir.endswith('/') or self.subdir.endswith('\\')):
            self.subdir += '/'

    def __str__(self):
        return str(self.__dict__)
class Cr32Helper(object):
    """Detects duplicate files by remembering the CRC32 of every file seen.

    Call update() with a file, then is_dupe() to learn whether a file with
    the same CRC32 was already passed to this helper.
    """
    dupe = False
    cfg = None

    def __init__(self, cfg):
        self.cfg = cfg
        self.dupe = False
        # per-instance on purpose: a dict defined on the class would be shared
        # (and polluted) between all helpers
        self.crc32_map = {}

    def get_crc32(self, fname):
        """Return the unsigned 32-bit CRC of `fname`'s contents, read in chunks."""
        buf_size = 0x8000
        crc32 = 0
        with open(fname, 'rb') as file:
            for buf in iter(lambda: file.read(buf_size), b''):
                crc32 = zlib.crc32(buf, crc32)
        # mask so the value is unsigned regardless of Python version
        return crc32 & 0xFFFFFFFF

    def update(self, fname):
        """Record `fname`'s CRC32 and flag whether it was already known.

        Does nothing (and reports "not a dupe") when dupe testing is disabled
        in the config or the file doesn't exist.
        """
        self.dupe = False
        if not self.cfg.test_dupes:
            return
        if not os.path.exists(fname):
            return

        crc32_str = format(self.get_crc32(fname), '08x')
        if crc32_str in self.crc32_map:
            self.dupe = True
            return
        self.crc32_map[crc32_str] = True

    def is_dupe(self):
        """Return whether the file passed to the last update() was a duplicate."""
        return self.dupe
class TxtpMaker(object):
    """Reads one subsong's info from the CLI output and writes its .txtp file(s).

    The constructor extracts the "channels: ", "sample rate: ",
    "stream total samples: ", "stream count: ", "stream index: " and
    "stream name: " fields from the output; make() then applies the
    configured filters and creates the files.
    """
    channels = 0
    sample_rate = 0
    num_samples = 0
    stream_count = 0
    stream_index = 0
    stream_name = ''
    stream_seconds = 0

    def __init__(self, cfg, output_b, log):
        """Parse the CLI output.

        Args:
            cfg: configuration object (filters, naming and layering settings).
            output_b: CLI output, as bytes or as already-decoded text.
            log: logger object with a debug(msg) method.

        Raises:
            ValueError: if no channel count is found in the output, or a
                numeric field holds a non-numeric value.
        """
        self.cfg = cfg
        self.log = log

        # decode instead of str(bytes): the repr would escape non-ASCII bytes
        # and backslashes, mangling internal names
        if isinstance(output_b, str):
            text = output_b
        else:
            text = output_b.decode("utf-8", "replace")
        self.output = text.replace("\r", "")

        self.channels = self.get_value("channels: ")
        self.sample_rate = self.get_value("sample rate: ")
        self.num_samples = self.get_value("stream total samples: ")
        self.stream_count = self.get_value("stream count: ")
        self.stream_index = self.get_value("stream index: ")
        # names may contain spaces, so take the whole line
        self.stream_name = self.get_string("stream name: ", full=True)

        if self.channels == 0:
            raise ValueError('Incorrect command result')

        # a missing sample rate parses as 0; avoid dividing by it
        if self.sample_rate > 0:
            self.stream_seconds = self.num_samples / self.sample_rate
        else:
            self.stream_seconds = 0

    def __str__(self):
        return str(self.__dict__)

    def get_string(self, str, full=False):
        """Return the text following the first occurrence of key `str`.

        Only the rest of the key's own line is considered. By default the
        first whitespace-separated token is returned; with `full` the whole
        (stripped) rest of the line. Returns '' when the key is missing or
        nothing follows it.
        """
        find_pos = self.output.find(str)
        if find_pos == -1:
            return ''
        rest_of_line = self.output[find_pos + len(str):].split("\n", 1)[0]
        if full:
            return rest_of_line.strip()
        tokens = rest_of_line.split()
        return tokens[0] if tokens else ''

    def get_value(self, str):
        """Return the integer following key `str`, or 0 when it is missing."""
        res = self.get_string(str)
        if res == '':
            return 0
        return int(res)

    def is_ignorable(self):
        """Return True when this subsong is rejected by any configured filter.

        Max filters only apply when set above 0; the name regexes only apply
        when set and the subsong has a name, and are matched from the start
        of the name.
        """
        cfg = self.cfg

        if self.channels < cfg.min_channels:
            return True
        if cfg.max_channels > 0 and self.channels > cfg.max_channels:
            return True
        if self.sample_rate < cfg.min_sample_rate:
            return True
        if cfg.max_sample_rate > 0 and self.sample_rate > cfg.max_sample_rate:
            return True
        if self.stream_seconds < cfg.min_seconds:
            return True
        if cfg.max_seconds > 0 and self.stream_seconds > cfg.max_seconds:
            return True
        if self.stream_count < cfg.min_subsongs:
            return True

        if cfg.exclude_regex != "" and self.stream_name != "":
            if re.compile(cfg.exclude_regex).match(self.stream_name) is not None:
                return True
        if cfg.include_regex != "" and self.stream_name != "":
            if re.compile(cfg.include_regex).match(self.stream_name) is None:
                return True
        return False

    def get_stream_mask(self, layer):
        """Return the '#c' channel list for the layer starting at channel `layer`.

        A layer holds cfg.layers channels, except the last one which only
        holds the channels that remain.
        """
        cfg = self.cfg
        # remaining channels are counted from this layer's first channel
        count = min(cfg.layers, self.channels - layer)
        return '#c' + ','.join("{}".format(layer + ch) for ch in range(count))

    def get_stream_name(self):
        """Return the subsong's internal name cleaned up for use as a file name.

        Returns '' unless internal names are enabled. Any path is dropped,
        characters in the "bad chars" list become '_', and the extension is
        removed unless cfg.use_internal_ext is set.
        """
        cfg = self.cfg
        if not cfg.use_internal_name:
            return ''

        txt = self.stream_name

        # remove paths #todo maybe config/replace?
        for separator in ("\\", "/"):
            pos = txt.rfind(separator)
            if pos != -1:
                txt = txt[pos+1:]

        # remove bad chars
        for bad in ('%', '*', '?', ':', '"', '|', '<', '>'):
            txt = txt.replace(bad, "_")

        if not cfg.use_internal_ext:
            pos = txt.rfind(".")
            if pos != -1:
                txt = txt[:pos]
        return txt

    def write(self, outname, line):
        """Create `outname`.txtp containing `line` (an empty file if `line` is '').

        Raises:
            ValueError: if the file exists and overwriting is not enabled.
        """
        cfg = self.cfg
        outname += '.txtp'

        if not cfg.overwrite and os.path.exists(outname):
            raise ValueError('TXTP exists in path: ' + outname)

        # 'with' closes the file even if the write fails
        with open(outname, "w+") as ftxtp:
            if line != '':
                ftxtp.write(line)

        self.log.debug("created: " + outname)

    def make(self, fname_path, fname_clean):
        """Write the .txtp file(s) for this subsong.

        Args:
            fname_path: file name used to build the output name.
            fname_clean: file name written inside the TXTP (after cfg.subdir).

        Returns:
            Number of files created (0 when the subsong is filtered out).
        """
        cfg = self.cfg
        total_done = 0

        if self.is_ignorable():
            return total_done

        # write plain (name).txtp when no subsongs
        if self.stream_count <= 1:
            index = ""
        else:
            index = "{}".format(self.stream_index)
            if cfg.zero_fill < 0:
                index = index.zfill(len("{}".format(self.stream_count)))
            else:
                index = index.zfill(cfg.zero_fill)

        layered = cfg.layers > 0 and cfg.layers < self.channels

        if cfg.mini_txtp:
            # mini-txtp: everything is encoded in the (empty) file's name
            outname = fname_path
            if index != "":
                outname += "#" + index

            if layered:
                for layer in range(0, self.channels, cfg.layers):
                    mask = self.get_stream_mask(layer)
                    self.write(outname + mask, '')
                    total_done += 1
            else:
                self.write(outname, '')
                total_done += 1
            return total_done

        stream_name = self.get_stream_name()
        if stream_name != '':
            outname = stream_name
            # without subsongs there is no index: don't leave a dangling '_'
            if cfg.use_internal_index and index != "":
                outname += "_{}".format(index)
        else:
            if cfg.base_name != '':
                fname_base = os.path.basename(fname_path)
                pos = fname_base.rfind(".") #remove ext
                if (pos != -1 and pos > 1):
                    fname_base = fname_base[:pos]
                txt = cfg.base_name
                txt = txt.replace("{filename}", fname_base)
            else:
                txt = fname_path
                pos = txt.rfind(".") #remove ext
                if (pos != -1 and pos > 1):
                    txt = txt[:pos]

            outname = "{}".format(txt)
            if index != "":
                outname += "_" + index

        line = ''
        if cfg.subdir != '':
            line += cfg.subdir
        line += fname_clean
        if index != "":
            line += "#" + index

        if layered:
            done = 0
            for layer in range(0, self.channels, cfg.layers):
                # one file per layer, suffixed a, b, c...
                sub = chr(ord('a') + done)
                done += 1
                mask = self.get_stream_mask(layer)
                self.write(outname + sub, line + mask)
                total_done += 1
        else:
            self.write(outname, line)
            total_done += 1

        return total_done

    def has_more_subsongs(self, target_subsong):
        """Return True if subsongs remain after number `target_subsong`."""
        return target_subsong < self.stream_count
# ########################################################################### #
def main():
    """Script entry point: make .txtp files for every file matching argv[1].

    For each matching file the CLI is called once per subsong (starting at 1)
    until the reported subsong count is reached or the CLI fails; a TXTP is
    written for every subsong that passes the filters and isn't a dupe.
    """
    import shlex  # only needed here; kept local so the file's imports stay as-is

    appname = os.path.basename(sys.argv[0])
    if (len(sys.argv) <= 1):
        print_usage(appname)
        return

    cfg = ConfigHelper(sys.argv)
    crc32 = Cr32Helper(cfg)
    log = LogHelper(cfg)

    if cfg.show_help:
        print_help(appname)
        return

    fname = sys.argv[1]
    fnames_in = find_files('.', fname, cfg.recursive)

    total_created = 0
    total_dupes = 0
    total_errors = 0
    for fname_in in fnames_in:
        fname_in_clean = fname_in.replace("\\", "/")
        if fname_in_clean.startswith("./"):
            fname_in_clean = fname_in_clean[2:]

        fname_in_base = os.path.basename(fname_in)
        if fname_in.startswith(".\\"): #skip starting dot for extensionless files
            fname_in = fname_in[2:]

        fname_out = ".temp." + fname_in_base + ".wav"
        created = 0
        dupes = 0
        errors = 0
        target_subsong = 1
        try:
            while True:
                cmd = make_cmd(cfg, fname_in, fname_out, target_subsong)
                log.trace("calling: " + cmd)

                # with shell=False a command string is only split into
                # arguments on Windows; elsewhere it must be passed as a list
                cmd_args = cmd if os.name == 'nt' else shlex.split(cmd)
                try:
                    output_b = subprocess.check_output(cmd_args, shell=False) #stderr=subprocess.STDOUT
                except subprocess.CalledProcessError as e:
                    # e.output holds bytes on Python 3: decode before joining
                    cli_output = e.output or b""
                    if not isinstance(cli_output, str):
                        cli_output = cli_output.decode("utf-8", "replace")
                    log.debug("ignoring CLI error in " + fname_in + "#"+str(target_subsong)+": " + cli_output)
                    errors += 1
                    break

                if target_subsong == 1:
                    log.debug("processing {}...".format(fname_in_clean))

                maker = TxtpMaker(cfg, output_b, log)

                if not maker.is_ignorable():
                    crc32.update(fname_out)

                if not crc32.is_dupe():
                    created += maker.make(fname_in_base, fname_in_clean)
                else:
                    dupes += 1
                    log.debug("Dupe subsong {}".format(target_subsong))

                if not maker.has_more_subsongs(target_subsong):
                    break
                target_subsong += 1

                if target_subsong % 200 == 0:
                    log.info("{}/{} subsongs... ".format(target_subsong, maker.stream_count) +
                             "({} dupes, {} errors)".format(dupes, errors)
                             )
        finally:
            # remove the temp output even when an error aborts the run
            if os.path.exists(fname_out):
                os.remove(fname_out)

        total_created += created
        total_dupes += dupes
        total_errors += errors

    log.info("done! ({} done, {} dupes, {} errors)".format(total_created, total_dupes, total_errors))


if __name__ == "__main__":
    main()