#!/usr/bin/env python3 from __future__ import division import argparse, subprocess, zlib, os, re, sys, fnmatch, logging as log #****************************************************************************** # TXTP MAKER # # Creates .txtp from lists of files, mainly one .txtp per subsong #****************************************************************************** class Cli(object): def _parse(self): description = ( "Makes TXTP from files in folders" ) epilog = ( "examples:\n" " %(prog)s *.fsb \n" " - make .txtp for subsongs in current dir\n\n" " %(prog)s bgm/*.fsb \n" " - make .txtp for subsongs in bgm subdir\n\n" " %(prog)s * -r -fss 1\n" " - make .txtp for all files in any subdirs with at least 1 subsong\n" " (ignores formats without subsongs)\n\n" " %(prog)s bgm.fsb -in -fcm 2 -fms 5.0\n" " - make .txtp for subsongs with at least 2 channels and 5 seconds\n\n" " %(prog)s *.scd -r -fd -l 2\n" " - make .txtp for all .scd in subdirs, ignoring dupes, one .txtp per 2ch\n\n" " %(prog)s *.sm1 -fne .+STREAM[.]SS[0-9]$\n" " - make .txtp for all .sm1 excluding subsongs ending with 'STREAM.SS0..9'\n\n" " %(prog)s samples.bnk -fni ^bgm.?\n" " - make .txtp for in .bnk including only subsong names that start with 'bgm'\n\n" " %(prog)s *.fsb -n \"{fn}<__{ss}>< [{in}]>\" -z 4 -o\n" " - make .txtp for all fsb, adding subsongs and stream name if they exist\n\n" ) p = argparse.ArgumentParser(description=description, epilog=epilog, formatter_class=argparse.RawTextHelpFormatter) p.add_argument('files', help="Files to get (wildcards work)", nargs='+') p.add_argument('-r', dest='recursive', help="Create .txtp in base folder from data in subfolders", action='store_true') p.add_argument('-c', dest='cli', help="Set path to CLI (default: auto)") p.add_argument('-d', dest='subdir', help="Set subdir inside .txtp (where file will reside)") p.add_argument('-n', dest='base_name', help=("Define (name).txtp, that can be formatted using:\n" "- {filename}|{fn}=filename without extension\n" "- {subsong}|{ss}=subsong number)\n" "- {internal-name}|{in}=internal stream name\n" "- {if}=internal name or filename if not found\n" "* may be inside <...> for conditional text\n")) p.add_argument('-z', dest='zero_fill', help="Zero-fill subsong number (default: auto per subsongs)", type=int) p.add_argument('-ie', dest='no_internal_ext', help="Remove internal name's extension if any", action='store_true') p.add_argument('-m', dest='mini_txtp', help="Create mini-txtp", action='store_true') p.add_argument('-o', dest='overwrite', help="Overwrite existing .txtp\n(beware when using with internal names alone)", action='store_true') p.add_argument('-oi', dest='overwrite_ignore', help="Ignore repeated rather than overwritting.txtp\n", action='store_true') p.add_argument('-or', dest='overwrite_rename', help="Rename rather than overwriting", action='store_true') p.add_argument('-os', dest='overwrite_suffix', help="Rename with a suffix") p.add_argument('-l', dest='layers', help="Create .txtp per subsong layers, every N channels", type=int) p.add_argument('-fd', dest='test_dupes', help="Skip .txtp that point to duplicate streams (slower)", action='store_true') p.add_argument('-fcm', dest='min_channels', help="Filter by min channels", type=int) p.add_argument('-fcM', dest='max_channels', help="Filter by max channels", type=int) p.add_argument('-frm', dest='min_sample_rate', help="Filter by min sample rate", type=int) p.add_argument('-frM', dest='max_sample_rate', help="Filter by max sample rate", type=int) p.add_argument('-fsm', dest='min_seconds', help="Filter by min seconds (N.N)", type=float) p.add_argument('-fsM', dest='max_seconds', help="Filter by max seconds (N.N)", type=float) p.add_argument('-fss', dest='min_subsongs', help="Filter min subsongs\n(1 filters formats incapable of subsongs)", type=int) p.add_argument('-fni', dest='include_regex', help="Filter by REGEX including matches of subsong name") p.add_argument('-fne', dest='exclude_regex', help="Filter by REGEX excluding matches of subsong name") p.add_argument('-nsc',dest='no_semicolon', help="Remove semicolon names (for songs with multinames)", action='store_true') p.add_argument('-v', dest='log_level', help="Verbose log level (off|debug|info, default: info)", default='info') return p.parse_args() def start(self): args = self._parse() if not args.files: return Logger(args).setup_cli() App(args).start() #****************************************************************************** class _GuiLogHandler(log.Handler): def __init__(self, txt): log.Handler.__init__(self) self._txt = txt def emit(self, message): msg = self.format(message) self._txt.config(state='normal') self._txt.insert('end', msg + '\n') self._txt.config(state='disabled') class Logger(object): def __init__(self, cfg): levels = { 'info': log.INFO, 'debug': log.DEBUG, } self.level = levels.get(cfg.log_level, log.ERROR) def setup_cli(self): log.basicConfig(level=self.level, format='%(message)s') def setup_gui(self, txt): log.basicConfig(level=self.level, format='%(message)s', handlers=[_GuiLogHandler(txt)]) #****************************************************************************** class Cr32Helper(object): def __init__(self, cfg): self.cfg = cfg self.crc32_map = {} self.last_dupe = False def get_crc32(self, filename): buf_size = 0x8000 with open(filename, 'rb') as file: buf = file.read(buf_size) crc32 = 0 while len(buf) > 0: crc32 = zlib.crc32(buf, crc32) buf = file.read(buf_size) return crc32 & 0xFFFFFFFF def update(self, filename): self.last_dupe = False if self.cfg.test_dupes == 0: return if not os.path.exists(filename): return crc32_str = format(self.get_crc32(filename),'08x') if (crc32_str in self.crc32_map): self.last_dupe = True return self.crc32_map[crc32_str] = True return def is_last_dupe(self): return self.last_dupe #****************************************************************************** # Makes .txtp (usually 1 but may do N) from a CLI output + subsong class TxtpMaker(object): def __init__(self, cfg, output_b, rename_map): self.cfg = cfg self.output = str(output_b).replace("\\r","").replace("\\n","\n") self.channels = self._get_value("channels: ") self.sample_rate = self._get_value("sample rate: ") self.num_samples = self._get_value("stream total samples: ") self.stream_count = self._get_value("stream count: ") self.stream_index = self._get_value("stream index: ") self.stream_name = self._get_text("stream name: ") self.encoding = self._get_text("encoding: ") # in case vgmstream returns error, but output code wasn't EXIT_FAILURE if self.channels <= 0 or self.sample_rate <= 0: raise ValueError('Incorrect vgmstream command') self.stream_seconds = self.num_samples / self.sample_rate self.ignorable = self._is_ignorable(cfg) self.rename_map = rename_map def __str__(self): return str(self.__dict__) def _get_string(self, str, full=False): find_pos = self.output.find(str) if (find_pos == -1): return None cut_pos = find_pos + len(str) str_cut = self.output[cut_pos:] if full: return str_cut.split("\n")[0].strip() else: return str_cut.split()[0].strip() def _get_text(self, str): return self._get_string(str, full=True) def _get_value(self, str): res = self._get_string(str) if not res: return 0 return int(res) def is_ignorable(self): return self.ignorable def _is_ignorable(self, cfg): if cfg.min_channels and self.channels < cfg.min_channels: return True if cfg.max_channels and self.channels > cfg.max_channels: return True if cfg.min_sample_rate and self.sample_rate < cfg.min_sample_rate: return True if cfg.max_sample_rate and self.sample_rate > cfg.max_sample_rate: return True if cfg.min_seconds and self.stream_seconds < cfg.min_seconds: return True if cfg.max_seconds and self.stream_seconds > cfg.max_seconds: return True if cfg.min_subsongs and self.stream_count < cfg.min_subsongs: return True if cfg.exclude_regex and self.stream_name: p = re.compile(cfg.exclude_regex) if p.match(self.stream_name) is not None: return True if cfg.include_regex and self.stream_name: p = re.compile(cfg.include_regex) if p.match(self.stream_name) is None: return True if self.encoding.lower() == 'silence': return True return False def _get_stream_mask(self, layer): if layer + self.cfg.layers > self.channels: loops = self.channels - self.cfg.layers else: loops = self.cfg.layers + 1 mask = '#C' for ch in range(1, loops): mask += str(layer + ch) + ',' return mask[:-1] def _clean_stream_name(self): if not self.stream_name: return None txt = self.stream_name # remove paths #todo maybe config/replace? pos = txt.rfind('\\') if pos >= 0: txt = txt[pos+1:] pos = txt.rfind('/') if pos >= 0: txt = txt[pos+1:] # remove bad chars badchars = ['%', '*', '?', ':', '\"', '|', '<', '>'] for badchar in badchars: txt = txt.replace(badchar, '_') if not self.cfg.no_internal_ext: pos = txt.rfind(".") if pos >= 0: txt = txt[:pos] if self.cfg.no_semicolon: pos = txt.find(";") if pos >= 0: txt = txt[:pos].strip() return txt def _write(self, outname, line): outname += '.txtp' cfg = self.cfg exists = os.path.exists(outname) if exists and (cfg.overwrite_rename or cfg.overwrite_suffix): must_rename = True if cfg.overwrite_suffix: outname = outname.replace(".txtp", "%s.txtp" % (cfg.overwrite_suffix)) must_rename = os.path.exists(outname) if must_rename: if outname in self.rename_map: rename_count = self.rename_map[outname] else: rename_count = 0 self.rename_map[outname] = rename_count + 1 outname = outname.replace(".txtp", "_%08i.txtp" % (rename_count)) # decide action after (possible) renames above if os.path.exists(outname): if cfg.overwrite_ignore: log.debug("ignored: " + outname) return if not cfg.overwrite: raise ValueError('TXTP exists in path: ' + outname) ftxtp = open(outname,"w+") if line: ftxtp.write(line) ftxtp.close() log.debug("created: " + outname) return def make(self, filename_path, filename_clean): cfg = self.cfg total_done = 0 if self.is_ignorable(): return total_done # write plain (name).txtp when no subsongs if self.stream_count <= 1: index = None else: index = str(self.stream_index) #str to avoid falsy 0 if cfg.zero_fill is None or cfg.zero_fill < 0: index = index.zfill(len(str(self.stream_count))) else: index = index.zfill(cfg.zero_fill) if cfg.mini_txtp: outname = filename_path if index: outname += "#" + index if cfg.layers and cfg.layers < self.channels: for layer in range(0, self.channels, cfg.layers): mask = self._get_stream_mask(layer) self._write(outname + mask, '') total_done += 1 else: self._write(outname, '') total_done += 1 else: filename_base = os.path.basename(filename_path) pos = filename_base.rfind(".") #remove ext if pos > 1: filename_base = filename_base[:pos] outname = '' if cfg.base_name: stream_name = self._clean_stream_name() internal_filename = stream_name if not internal_filename: internal_filename = filename_base replaces = { 'fn': filename_base, 'filename': filename_base, 'ss': index, 'subsong': index, 'in': stream_name, 'internal-name': stream_name, 'if': internal_filename, } pattern1 = re.compile(r"<(.+?)>") pattern2 = re.compile(r"{(.+?)}") txt = cfg.base_name # print optional info like "" only if value in {cmd} exists optionals = pattern1.findall(txt) for optional in optionals: has_values = False cmds = pattern2.findall(optional) for cmd in cmds: if cmd in replaces and replaces[cmd] is not None: has_values = True break if has_values: #leave text there (cmds will be replaced later) txt = txt.replace('<%s>' % optional, optional, 1) else: txt = txt.replace('<%s>' % optional, '', 1) # replace "{cmd}" if cmd exists with its value (non-existent values use '') cmds = pattern2.findall(txt) for cmd in cmds: if cmd in replaces: value = replaces[cmd] if value is None: value = '' txt = txt.replace('{%s}' % cmd, value, 1) outname = "%s" % (txt) # no name set, or empty results above if not outname: outname = "%s" % (filename_base) if index: outname += "_" + index line = '' if cfg.subdir: line += cfg.subdir line += filename_clean if index: line += "#" + index if cfg.layers and cfg.layers < self.channels: done = 0 for layer in range(0, self.channels, cfg.layers): sub = chr(ord('a') + done) done += 1 mask = self._get_stream_mask(layer) self._write(outname + sub, line + mask) total_done += 1 else: self._write(outname, line) total_done += 1 return total_done def has_more_subsongs(self, target_subsong): return target_subsong < self.stream_count #****************************************************************************** class App(object): def __init__(self, args): self.cfg = args self.crc32 = Cr32Helper(args) # check CLI in path (can be called, not just file exists) def _test_cli(self): clis = [] if self.cfg.cli: clis.append(self.cfg.cli) else: clis.append('vgmstream_cli') clis.append('test.exe') for cli in clis: try: with open(os.devnull, 'wb') as DEVNULL: #subprocess.STDOUT #py3 only cmd = "%s" % (cli) subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL) self.cfg.cli = cli return True #exists and returns ok except subprocess.CalledProcessError as e: self.cfg.cli = cli return True #exists but returns strerr (ran with no args) except Exception as e: continue #doesn't exist #none found return False def _make_cmd(self, filename_in, filename_out, target_subsong): if self.cfg.test_dupes: cmd = "%s -s %s -i -o \"%s\" \"%s\"" % (self.cfg.cli, target_subsong, filename_out, filename_in) else: cmd = "%s -s %s -m -i -O \"%s\"" % (self.cfg.cli, target_subsong, filename_in) return cmd def _find_files(self, dir, pattern): if os.path.isfile(pattern): return [pattern] if os.path.isdir(pattern): dir = pattern pattern = None for dirsep in '/\\': if dirsep in pattern: index = pattern.rfind(dirsep) if index >= 0: dir = pattern[0:index] pattern = pattern[index+1:] files = [] for root, dirnames, filenames in os.walk(dir): for filename in fnmatch.filter(filenames, pattern): files.append(os.path.join(root, filename)) if not self.cfg.recursive: break return files def start(self): if not self._test_cli(): log.error("ERROR: CLI not found") return filenames_in = [] for filename in self.cfg.files: filenames_in += self._find_files('.', filename) rename_map = {} total_created = 0 total_dupes = 0 total_errors = 0 for filename_in in filenames_in: filename_in_clean = filename_in.replace("\\", "/") if filename_in_clean.startswith("./"): filename_in_clean = filename_in_clean[2:] filename_in_base = os.path.basename(filename_in) #skip starting dot for extensionless files if filename_in.startswith(".\\"): filename_in = filename_in[2:] filename_out = ".temp." + filename_in_base + ".wav" created = 0 dupes = 0 errors = 0 target_subsong = 1 while True: try: # main call to vgmstream cmd = self._make_cmd(filename_in, filename_out, target_subsong) log.debug("calling: %s", cmd) output_b = subprocess.check_output(cmd, shell=False) #stderr=subprocess.STDOUT # basic parse of vgmstream info maker = TxtpMaker(self.cfg, output_b, rename_map) except (subprocess.CalledProcessError, ValueError) as e: log.debug("ignoring CLI error in %s #%s: %s", filename_in, target_subsong, str(e)) errors += 1 break if target_subsong == 1: log.debug("processing %s...", filename_in_clean) if not maker.is_ignorable(): self.crc32.update(filename_out) if not self.crc32.is_last_dupe(): created += maker.make(filename_in_base, filename_in_clean) else: dupes += 1 log.debug("dupe subsong %s", target_subsong) if not maker.has_more_subsongs(target_subsong): break target_subsong += 1 if target_subsong % 200 == 0: log.info("%s/%s subsongs... (%s dupes, %s errors)", target_subsong, maker.stream_count, dupes, errors) if os.path.exists(filename_out): os.remove(filename_out) total_created += created total_dupes += dupes total_errors += errors log.info("done! (%s done, %s dupes, %s errors)", total_created, total_dupes, total_errors) if __name__ == "__main__": Cli().start() #if len(sys.argv) > 1: # Cli().start() #else: # Gui().start()