[core] Support emitting ConEmu progress codes (#10649)

Authored by: Grub4K
This commit is contained in:
Simon Sawicki 2025-02-20 20:33:31 +01:00 committed by GitHub
parent 9deed13d7c
commit f7a1f2d813
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 63 additions and 26 deletions

View File

@ -157,7 +157,7 @@ from .utils import (
write_json_file, write_json_file,
write_string, write_string,
) )
from .utils._utils import _UnsafeExtensionError, _YDLLogger from .utils._utils import _UnsafeExtensionError, _YDLLogger, _ProgressState
from .utils.networking import ( from .utils.networking import (
HTTPHeaderDict, HTTPHeaderDict,
clean_headers, clean_headers,
@ -642,20 +642,19 @@ class YoutubeDL:
self.cache = Cache(self) self.cache = Cache(self)
self.__header_cookies = [] self.__header_cookies = []
try:
windows_enable_vt_mode()
except Exception as e:
self.write_debug(f'Failed to enable VT mode: {e}')
stdout = sys.stderr if self.params.get('logtostderr') else sys.stdout stdout = sys.stderr if self.params.get('logtostderr') else sys.stdout
self._out_files = Namespace( self._out_files = Namespace(
out=stdout, out=stdout,
error=sys.stderr, error=sys.stderr,
screen=sys.stderr if self.params.get('quiet') else stdout, screen=sys.stderr if self.params.get('quiet') else stdout,
console=None if os.name == 'nt' else next( console=next(filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None),
filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None),
) )
try:
windows_enable_vt_mode()
except Exception as e:
self.write_debug(f'Failed to enable VT mode: {e}')
if self.params.get('no_color'): if self.params.get('no_color'):
if self.params.get('color') is not None: if self.params.get('color') is not None:
self.params.setdefault('_warnings', []).append( self.params.setdefault('_warnings', []).append(
@ -956,21 +955,22 @@ class YoutubeDL:
self._write_string(f'{self._bidi_workaround(message)}\n', self._out_files.error, only_once=only_once) self._write_string(f'{self._bidi_workaround(message)}\n', self._out_files.error, only_once=only_once)
def _send_console_code(self, code): def _send_console_code(self, code):
if os.name == 'nt' or not self._out_files.console: if not supports_terminal_sequences(self._out_files.console):
return return False
self._write_string(code, self._out_files.console) self._write_string(code, self._out_files.console)
return True
def to_console_title(self, message): def to_console_title(self, message=None, progress_state=None, percent=None):
if not self.params.get('consoletitle', False): if not self.params.get('consoletitle'):
return return
message = remove_terminal_sequences(message)
if os.name == 'nt': if message:
if ctypes.windll.kernel32.GetConsoleWindow(): success = self._send_console_code(f'\033]0;{remove_terminal_sequences(message)}\007')
# c_wchar_p() might not be necessary if `message` is if not success and os.name == 'nt' and ctypes.windll.kernel32.GetConsoleWindow():
# already of type unicode() ctypes.windll.kernel32.SetConsoleTitleW(message)
ctypes.windll.kernel32.SetConsoleTitleW(ctypes.c_wchar_p(message))
else: if isinstance(progress_state, _ProgressState):
self._send_console_code(f'\033]0;{message}\007') self._send_console_code(progress_state.get_ansi_escape(percent))
def save_console_title(self): def save_console_title(self):
if not self.params.get('consoletitle') or self.params.get('simulate'): if not self.params.get('consoletitle') or self.params.get('simulate'):
@ -984,6 +984,7 @@ class YoutubeDL:
def __enter__(self): def __enter__(self):
self.save_console_title() self.save_console_title()
self.to_console_title(progress_state=_ProgressState.INDETERMINATE)
return self return self
def save_cookies(self): def save_cookies(self):
@ -992,6 +993,7 @@ class YoutubeDL:
def __exit__(self, *args): def __exit__(self, *args):
self.restore_console_title() self.restore_console_title()
self.to_console_title(progress_state=_ProgressState.HIDDEN)
self.close() self.close()
def close(self): def close(self):

View File

@ -31,6 +31,7 @@ from ..utils import (
timetuple_from_msec, timetuple_from_msec,
try_call, try_call,
) )
from ..utils._utils import _ProgressState
class FileDownloader: class FileDownloader:
@ -333,7 +334,7 @@ class FileDownloader:
progress_dict), s.get('progress_idx') or 0) progress_dict), s.get('progress_idx') or 0)
self.to_console_title(self.ydl.evaluate_outtmpl( self.to_console_title(self.ydl.evaluate_outtmpl(
progress_template.get('download-title') or 'yt-dlp %(progress._default_template)s', progress_template.get('download-title') or 'yt-dlp %(progress._default_template)s',
progress_dict)) progress_dict), _ProgressState.from_dict(s), s.get('_percent'))
def _format_progress(self, *args, **kwargs): def _format_progress(self, *args, **kwargs):
return self.ydl._format_text( return self.ydl._format_text(
@ -357,6 +358,7 @@ class FileDownloader:
'_speed_str': self.format_speed(speed).strip(), '_speed_str': self.format_speed(speed).strip(),
'_total_bytes_str': _format_bytes('total_bytes'), '_total_bytes_str': _format_bytes('total_bytes'),
'_elapsed_str': self.format_seconds(s.get('elapsed')), '_elapsed_str': self.format_seconds(s.get('elapsed')),
'_percent': 100.0,
'_percent_str': self.format_percent(100), '_percent_str': self.format_percent(100),
}) })
self._report_progress_status(s, join_nonempty( self._report_progress_status(s, join_nonempty(
@ -375,13 +377,15 @@ class FileDownloader:
return return
self._progress_delta_time += update_delta self._progress_delta_time += update_delta
progress = try_call(
lambda: 100 * s['downloaded_bytes'] / s['total_bytes'],
lambda: 100 * s['downloaded_bytes'] / s['total_bytes_estimate'],
lambda: s['downloaded_bytes'] == 0 and 0)
s.update({ s.update({
'_eta_str': self.format_eta(s.get('eta')).strip(), '_eta_str': self.format_eta(s.get('eta')).strip(),
'_speed_str': self.format_speed(s.get('speed')), '_speed_str': self.format_speed(s.get('speed')),
'_percent_str': self.format_percent(try_call( '_percent': progress,
lambda: 100 * s['downloaded_bytes'] / s['total_bytes'], '_percent_str': self.format_percent(progress),
lambda: 100 * s['downloaded_bytes'] / s['total_bytes_estimate'],
lambda: s['downloaded_bytes'] == 0 and 0)),
'_total_bytes_str': _format_bytes('total_bytes'), '_total_bytes_str': _format_bytes('total_bytes'),
'_total_bytes_estimate_str': _format_bytes('total_bytes_estimate'), '_total_bytes_estimate_str': _format_bytes('total_bytes_estimate'),
'_downloaded_bytes_str': _format_bytes('downloaded_bytes'), '_downloaded_bytes_str': _format_bytes('downloaded_bytes'),

View File

@ -10,6 +10,7 @@ from ..utils import (
_configuration_args, _configuration_args,
deprecation_warning, deprecation_warning,
) )
from ..utils._utils import _ProgressState
class PostProcessorMetaClass(type): class PostProcessorMetaClass(type):
@ -189,7 +190,7 @@ class PostProcessor(metaclass=PostProcessorMetaClass):
self._downloader.to_console_title(self._downloader.evaluate_outtmpl( self._downloader.to_console_title(self._downloader.evaluate_outtmpl(
progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s', progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s',
progress_dict)) progress_dict), _ProgressState.from_dict(s), s.get('_percent'))
def _retry_download(self, err, count, retries): def _retry_download(self, err, count, retries):
# While this is not an extractor, it behaves similar to one and # While this is not an extractor, it behaves similar to one and

View File

@ -8,6 +8,7 @@ import contextlib
import datetime as dt import datetime as dt
import email.header import email.header
import email.utils import email.utils
import enum
import errno import errno
import functools import functools
import hashlib import hashlib
@ -5677,3 +5678,32 @@ class _YDLLogger:
def stderr(self, message): def stderr(self, message):
if self._ydl: if self._ydl:
self._ydl.to_stderr(message) self._ydl.to_stderr(message)
class _ProgressState(enum.Enum):
"""
Represents a state for a progress bar.
See: https://conemu.github.io/en/AnsiEscapeCodes.html#ConEmu_specific_OSC
"""
HIDDEN = 0
INDETERMINATE = 3
VISIBLE = 1
WARNING = 4
ERROR = 2
@classmethod
def from_dict(cls, s, /):
if s['status'] == 'finished':
return cls.INDETERMINATE
# Not currently used
if s['status'] == 'error':
return cls.ERROR
return cls.INDETERMINATE if s.get('_percent') is None else cls.VISIBLE
def get_ansi_escape(self, /, percent=None):
percent = 0 if percent is None else int(percent)
return f'\033]9;4;{self.value};{percent}\007'