diff --git a/README.md b/README.md
index 2df72b7498..09096218e8 100644
--- a/README.md
+++ b/README.md
@@ -1768,7 +1768,7 @@ # EXTRACTOR ARGUMENTS
#### youtube
* `lang`: Prefer translated metadata (`title`, `description` etc) of this language code (case-sensitive). By default, the video primary language metadata is preferred, with a fallback to `en` translated. See [youtube.py](https://github.com/yt-dlp/yt-dlp/blob/c26f9b991a0681fd3ea548d535919cec1fbbd430/yt_dlp/extractor/youtube.py#L381-L390) for list of supported content language codes
* `skip`: One or more of `hls`, `dash` or `translated_subs` to skip extraction of the m3u8 manifests, dash manifests and [auto-translated subtitles](https://github.com/yt-dlp/yt-dlp/issues/4090#issuecomment-1158102032) respectively
-* `player_client`: Clients to extract video data from. The main clients are `web`, `ios` and `android`, with variants `_music` and `_creator` (e.g. `ios_creator`); and `mweb`, `mediaconnect`, `android_testsuite`, `android_vr`, `web_safari`, `web_embedded`, `tv` and `tv_embedded` with no variants. By default, `ios,mweb` is used, and `web_creator,mediaconnect` is added as needed for age-gated videos when account age verification is required. Similarly, the `_music` variants are added for `music.youtube.com` URLs. Some clients, such as `web` and `android`, require a `po_token` for their formats to be downloadable. Some clients, such as the `_creator` variants, will only work with authentication. You can use `all` to use all the clients, and `default` for the default clients. You can prefix a client with `-` to exclude it, e.g. `youtube:player_client=all,-web`
+* `player_client`: Clients to extract video data from. The main clients are `web`, `ios` and `android`, with variants `_music` and `_creator` (e.g. `ios_creator`); and `mweb`, `mediaconnect`, `android_vr`, `web_safari`, `web_embedded`, `tv` and `tv_embedded` with no variants. By default, `ios,mweb` is used, and `web_creator` is added as needed for age-gated videos when account age verification is required. Similarly, the `_music` variants are added for `music.youtube.com` URLs. Some clients, such as `web` and `android`, require a `po_token` for their formats to be downloadable. Some clients, such as the `_creator` variants, will only work with authentication. You can use `all` to use all the clients, and `default` for the default clients. You can prefix a client with `-` to exclude it, e.g. `youtube:player_client=all,-web`
* `player_skip`: Skip some network requests that are generally needed for robust extraction. One or more of `configs` (skip client configs), `webpage` (skip initial webpage), `js` (skip js player). While these options can help reduce the number of requests needed or avoid some rate-limiting, they could cause some issues. See [#860](https://github.com/yt-dlp/yt-dlp/pull/860) for more details
* `player_params`: YouTube player parameters to use for player requests. Will overwrite any default ones set by yt-dlp.
* `comment_sort`: `top` or `new` (default) - choose comment sorting mode (on YouTube's side)
diff --git a/devscripts/generate_aes_testdata.py b/devscripts/generate_aes_testdata.py
index 7f3c88bcfb..73cf803b8f 100644
--- a/devscripts/generate_aes_testdata.py
+++ b/devscripts/generate_aes_testdata.py
@@ -11,13 +11,12 @@
import subprocess
from yt_dlp.aes import aes_encrypt, key_expansion
-from yt_dlp.utils import intlist_to_bytes
secret_msg = b'Secret message goes here'
def hex_str(int_list):
- return codecs.encode(intlist_to_bytes(int_list), 'hex')
+ return codecs.encode(bytes(int_list), 'hex')
def openssl_encode(algo, key, iv):
diff --git a/pyproject.toml b/pyproject.toml
index ef921fed58..92d399e319 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -313,6 +313,16 @@ banned-from = [
"yt_dlp.compat.compat_urllib_parse_urlparse".msg = "Use `urllib.parse.urlparse` instead."
"yt_dlp.compat.compat_shlex_quote".msg = "Use `yt_dlp.utils.shell_quote` instead."
"yt_dlp.utils.error_to_compat_str".msg = "Use `str` instead."
+"yt_dlp.utils.bytes_to_intlist".msg = "Use `list` instead."
+"yt_dlp.utils.intlist_to_bytes".msg = "Use `bytes` instead."
+"yt_dlp.utils.decodeArgument".msg = "Do not use"
+"yt_dlp.utils.decodeFilename".msg = "Do not use"
+"yt_dlp.utils.encodeFilename".msg = "Do not use"
+"yt_dlp.compat.compat_os_name".msg = "Use `os.name` instead."
+"yt_dlp.compat.compat_realpath".msg = "Use `os.path.realpath` instead."
+"yt_dlp.compat.functools".msg = "Use `functools` instead."
+"yt_dlp.utils.decodeOption".msg = "Do not use"
+"yt_dlp.utils.compiled_regex_type".msg = "Use `re.Pattern` instead."
[tool.autopep8]
max_line_length = 120
diff --git a/test/helper.py b/test/helper.py
index 3b550d1927..c776e70b73 100644
--- a/test/helper.py
+++ b/test/helper.py
@@ -9,7 +9,6 @@
import yt_dlp.extractor
from yt_dlp import YoutubeDL
-from yt_dlp.compat import compat_os_name
from yt_dlp.utils import preferredencoding, try_call, write_string, find_available_port
if 'pytest' in sys.modules:
@@ -49,7 +48,7 @@ def report_warning(message, *args, **kwargs):
Print the message to stderr, it will be prefixed with 'WARNING:'
If stderr is a tty file the 'WARNING:' will be colored
"""
- if sys.stderr.isatty() and compat_os_name != 'nt':
+ if sys.stderr.isatty() and os.name != 'nt':
_msg_header = '\033[0;33mWARNING:\033[0m'
else:
_msg_header = 'WARNING:'
diff --git a/test/test_YoutubeDL.py b/test/test_YoutubeDL.py
index a99e624080..966d27a498 100644
--- a/test/test_YoutubeDL.py
+++ b/test/test_YoutubeDL.py
@@ -15,7 +15,6 @@
from test.helper import FakeYDL, assertRegexpMatches, try_rm
from yt_dlp import YoutubeDL
-from yt_dlp.compat import compat_os_name
from yt_dlp.extractor import YoutubeIE
from yt_dlp.extractor.common import InfoExtractor
from yt_dlp.postprocessor.common import PostProcessor
@@ -839,8 +838,8 @@ def expect_same_infodict(out):
test('%(filesize)#D', '1Ki')
test('%(height)5.2D', ' 1.08k')
test('%(title4)#S', 'foo_bar_test')
- test('%(title4).10S', ('foo "bar" ', 'foo "bar"' + ('#' if compat_os_name == 'nt' else ' ')))
- if compat_os_name == 'nt':
+ test('%(title4).10S', ('foo "bar" ', 'foo "bar"' + ('#' if os.name == 'nt' else ' ')))
+ if os.name == 'nt':
test('%(title4)q', ('"foo ""bar"" test"', None))
test('%(formats.:.id)#q', ('"id 1" "id 2" "id 3"', None))
test('%(formats.0.id)#q', ('"id 1"', None))
@@ -903,9 +902,9 @@ def gen():
# Environment variable expansion for prepare_filename
os.environ['__yt_dlp_var'] = 'expanded'
- envvar = '%__yt_dlp_var%' if compat_os_name == 'nt' else '$__yt_dlp_var'
+ envvar = '%__yt_dlp_var%' if os.name == 'nt' else '$__yt_dlp_var'
test(envvar, (envvar, 'expanded'))
- if compat_os_name == 'nt':
+ if os.name == 'nt':
test('%s%', ('%s%', '%s%'))
os.environ['s'] = 'expanded'
test('%s%', ('%s%', 'expanded')) # %s% should be expanded before escaping %s
diff --git a/test/test_aes.py b/test/test_aes.py
index 6fe6059a17..9cd9189bcc 100644
--- a/test/test_aes.py
+++ b/test/test_aes.py
@@ -27,7 +27,6 @@
pad_block,
)
from yt_dlp.dependencies import Cryptodome
-from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes
# the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
@@ -40,33 +39,33 @@ def setUp(self):
def test_encrypt(self):
msg = b'message'
key = list(range(16))
- encrypted = aes_encrypt(bytes_to_intlist(msg), key)
- decrypted = intlist_to_bytes(aes_decrypt(encrypted, key))
+ encrypted = aes_encrypt(list(msg), key)
+ decrypted = bytes(aes_decrypt(encrypted, key))
self.assertEqual(decrypted, msg)
def test_cbc_decrypt(self):
data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
- decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
+ decrypted = bytes(aes_cbc_decrypt(list(data), self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
if Cryptodome.AES:
- decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
+ decrypted = aes_cbc_decrypt_bytes(data, bytes(self.key), bytes(self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_cbc_encrypt(self):
- data = bytes_to_intlist(self.secret_msg)
- encrypted = intlist_to_bytes(aes_cbc_encrypt(data, self.key, self.iv))
+ data = list(self.secret_msg)
+ encrypted = bytes(aes_cbc_encrypt(data, self.key, self.iv))
self.assertEqual(
encrypted,
b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd')
def test_ctr_decrypt(self):
- data = bytes_to_intlist(b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
- decrypted = intlist_to_bytes(aes_ctr_decrypt(data, self.key, self.iv))
+ data = list(b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
+ decrypted = bytes(aes_ctr_decrypt(data, self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_ctr_encrypt(self):
- data = bytes_to_intlist(self.secret_msg)
- encrypted = intlist_to_bytes(aes_ctr_encrypt(data, self.key, self.iv))
+ data = list(self.secret_msg)
+ encrypted = bytes(aes_ctr_encrypt(data, self.key, self.iv))
self.assertEqual(
encrypted,
b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
@@ -75,19 +74,19 @@ def test_gcm_decrypt(self):
data = b'\x159Y\xcf5eud\x90\x9c\x85&]\x14\x1d\x0f.\x08\xb4T\xe4/\x17\xbd'
authentication_tag = b'\xe8&I\x80rI\x07\x9d}YWuU@:e'
- decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
- bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
+ decrypted = bytes(aes_gcm_decrypt_and_verify(
+ list(data), self.key, list(authentication_tag), self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
if Cryptodome.AES:
decrypted = aes_gcm_decrypt_and_verify_bytes(
- data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
+ data, bytes(self.key), authentication_tag, bytes(self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_gcm_aligned_decrypt(self):
data = b'\x159Y\xcf5eud\x90\x9c\x85&]\x14\x1d\x0f'
authentication_tag = b'\x08\xb1\x9d!&\x98\xd0\xeaRq\x90\xe6;\xb5]\xd8'
- decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
+ decrypted = bytes(aes_gcm_decrypt_and_verify(
list(data), self.key, list(authentication_tag), self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg[:16])
if Cryptodome.AES:
@@ -96,38 +95,38 @@ def test_gcm_aligned_decrypt(self):
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg[:16])
def test_decrypt_text(self):
- password = intlist_to_bytes(self.key).decode()
+ password = bytes(self.key).decode()
encrypted = base64.b64encode(
- intlist_to_bytes(self.iv[:8])
+ bytes(self.iv[:8])
+ b'\x17\x15\x93\xab\x8d\x80V\xcdV\xe0\t\xcdo\xc2\xa5\xd8ksM\r\xe27N\xae',
).decode()
decrypted = (aes_decrypt_text(encrypted, password, 16))
self.assertEqual(decrypted, self.secret_msg)
- password = intlist_to_bytes(self.key).decode()
+ password = bytes(self.key).decode()
encrypted = base64.b64encode(
- intlist_to_bytes(self.iv[:8])
+ bytes(self.iv[:8])
+ b'\x0b\xe6\xa4\xd9z\x0e\xb8\xb9\xd0\xd4i_\x85\x1d\x99\x98_\xe5\x80\xe7.\xbf\xa5\x83',
).decode()
decrypted = (aes_decrypt_text(encrypted, password, 32))
self.assertEqual(decrypted, self.secret_msg)
def test_ecb_encrypt(self):
- data = bytes_to_intlist(self.secret_msg)
- encrypted = intlist_to_bytes(aes_ecb_encrypt(data, self.key))
+ data = list(self.secret_msg)
+ encrypted = bytes(aes_ecb_encrypt(data, self.key))
self.assertEqual(
encrypted,
b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
def test_ecb_decrypt(self):
- data = bytes_to_intlist(b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
- decrypted = intlist_to_bytes(aes_ecb_decrypt(data, self.key, self.iv))
+ data = list(b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
+ decrypted = bytes(aes_ecb_decrypt(data, self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
def test_key_expansion(self):
key = '4f6bdaa39e2f8cb07f5e722d9edef314'
- self.assertEqual(key_expansion(bytes_to_intlist(bytearray.fromhex(key))), [
+ self.assertEqual(key_expansion(list(bytearray.fromhex(key))), [
0x4F, 0x6B, 0xDA, 0xA3, 0x9E, 0x2F, 0x8C, 0xB0, 0x7F, 0x5E, 0x72, 0x2D, 0x9E, 0xDE, 0xF3, 0x14,
0x53, 0x66, 0x20, 0xA8, 0xCD, 0x49, 0xAC, 0x18, 0xB2, 0x17, 0xDE, 0x35, 0x2C, 0xC9, 0x2D, 0x21,
0x8C, 0xBE, 0xDD, 0xD9, 0x41, 0xF7, 0x71, 0xC1, 0xF3, 0xE0, 0xAF, 0xF4, 0xDF, 0x29, 0x82, 0xD5,
diff --git a/test/test_compat.py b/test/test_compat.py
index e7d97e3e93..b1cc2a8187 100644
--- a/test/test_compat.py
+++ b/test/test_compat.py
@@ -12,12 +12,7 @@
from yt_dlp import compat
from yt_dlp.compat import urllib # isort: split
-from yt_dlp.compat import (
- compat_etree_fromstring,
- compat_expanduser,
- compat_urllib_parse_unquote, # noqa: TID251
- compat_urllib_parse_urlencode, # noqa: TID251
-)
+from yt_dlp.compat import compat_etree_fromstring, compat_expanduser
from yt_dlp.compat.urllib.request import getproxies
@@ -43,39 +38,6 @@ def test_compat_expanduser(self):
finally:
os.environ['HOME'] = old_home or ''
- def test_compat_urllib_parse_unquote(self):
- self.assertEqual(compat_urllib_parse_unquote('abc%20def'), 'abc def')
- self.assertEqual(compat_urllib_parse_unquote('%7e/abc+def'), '~/abc+def')
- self.assertEqual(compat_urllib_parse_unquote(''), '')
- self.assertEqual(compat_urllib_parse_unquote('%'), '%')
- self.assertEqual(compat_urllib_parse_unquote('%%'), '%%')
- self.assertEqual(compat_urllib_parse_unquote('%%%'), '%%%')
- self.assertEqual(compat_urllib_parse_unquote('%2F'), '/')
- self.assertEqual(compat_urllib_parse_unquote('%2f'), '/')
- self.assertEqual(compat_urllib_parse_unquote('%E6%B4%A5%E6%B3%A2'), '津波')
- self.assertEqual(
- compat_urllib_parse_unquote('''
-%%a'''),
- '''
-%%a''')
- self.assertEqual(
- compat_urllib_parse_unquote('''%28%5E%E2%97%A3_%E2%97%A2%5E%29%E3%81%A3%EF%B8%BB%E3%83%87%E2%95%90%E4%B8%80 %E2%87%80 %E2%87%80 %E2%87%80 %E2%87%80 %E2%87%80 %E2%86%B6%I%Break%25Things%'''),
- '''(^◣_◢^)っ︻デ═一 ⇀ ⇀ ⇀ ⇀ ⇀ ↶%I%Break%Things%''')
-
- def test_compat_urllib_parse_unquote_plus(self):
- self.assertEqual(urllib.parse.unquote_plus('abc%20def'), 'abc def')
- self.assertEqual(urllib.parse.unquote_plus('%7e/abc+def'), '~/abc def')
-
- def test_compat_urllib_parse_urlencode(self):
- self.assertEqual(compat_urllib_parse_urlencode({'abc': 'def'}), 'abc=def')
- self.assertEqual(compat_urllib_parse_urlencode({'abc': b'def'}), 'abc=def')
- self.assertEqual(compat_urllib_parse_urlencode({b'abc': 'def'}), 'abc=def')
- self.assertEqual(compat_urllib_parse_urlencode({b'abc': b'def'}), 'abc=def')
- self.assertEqual(compat_urllib_parse_urlencode([('abc', 'def')]), 'abc=def')
- self.assertEqual(compat_urllib_parse_urlencode([('abc', b'def')]), 'abc=def')
- self.assertEqual(compat_urllib_parse_urlencode([(b'abc', 'def')]), 'abc=def')
- self.assertEqual(compat_urllib_parse_urlencode([(b'abc', b'def')]), 'abc=def')
-
def test_compat_etree_fromstring(self):
xml = '''
diff --git a/test/test_downloader_http.py b/test/test_downloader_http.py
index faba0bc9c8..cf2e3fac16 100644
--- a/test/test_downloader_http.py
+++ b/test/test_downloader_http.py
@@ -15,7 +15,6 @@
from test.helper import http_server_port, try_rm
from yt_dlp import YoutubeDL
from yt_dlp.downloader.http import HttpFD
-from yt_dlp.utils import encodeFilename
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -82,12 +81,12 @@ def download(self, params, ep):
ydl = YoutubeDL(params)
downloader = HttpFD(ydl, params)
filename = 'testfile.mp4'
- try_rm(encodeFilename(filename))
+ try_rm(filename)
self.assertTrue(downloader.real_download(filename, {
'url': f'http://127.0.0.1:{self.port}/{ep}',
}), ep)
- self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE, ep)
- try_rm(encodeFilename(filename))
+ self.assertEqual(os.path.getsize(filename), TEST_SIZE, ep)
+ try_rm(filename)
def download_all(self, params):
for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
diff --git a/test/test_utils.py b/test/test_utils.py
index 835774a912..b3de14198e 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -21,7 +21,6 @@
from yt_dlp.compat import (
compat_etree_fromstring,
compat_HTMLParseError,
- compat_os_name,
)
from yt_dlp.utils import (
Config,
@@ -49,7 +48,6 @@
dfxp2srt,
encode_base_n,
encode_compat_str,
- encodeFilename,
expand_path,
extract_attributes,
extract_basic_auth,
@@ -69,7 +67,6 @@
get_elements_html_by_class,
get_elements_text_and_html_by_attribute,
int_or_none,
- intlist_to_bytes,
iri_to_uri,
is_html,
js_to_json,
@@ -566,10 +563,10 @@ def test_smuggle_url(self):
self.assertEqual(res_data, {'a': 'b', 'c': 'd'})
def test_shell_quote(self):
- args = ['ffmpeg', '-i', encodeFilename('ñ€ß\'.mp4')]
+ args = ['ffmpeg', '-i', 'ñ€ß\'.mp4']
self.assertEqual(
shell_quote(args),
- """ffmpeg -i 'ñ€ß'"'"'.mp4'""" if compat_os_name != 'nt' else '''ffmpeg -i "ñ€ß'.mp4"''')
+ """ffmpeg -i 'ñ€ß'"'"'.mp4'""" if os.name != 'nt' else '''ffmpeg -i "ñ€ß'.mp4"''')
def test_float_or_none(self):
self.assertEqual(float_or_none('42.42'), 42.42)
@@ -1309,15 +1306,10 @@ def test_clean_html(self):
self.assertEqual(clean_html('a:\n "b"'), 'a: "b"')
self.assertEqual(clean_html('a
\xa0b'), 'a\nb')
- def test_intlist_to_bytes(self):
- self.assertEqual(
- intlist_to_bytes([0, 1, 127, 128, 255]),
- b'\x00\x01\x7f\x80\xff')
-
def test_args_to_str(self):
self.assertEqual(
args_to_str(['foo', 'ba/r', '-baz', '2 be', '']),
- 'foo ba/r -baz \'2 be\' \'\'' if compat_os_name != 'nt' else 'foo ba/r -baz "2 be" ""',
+ 'foo ba/r -baz \'2 be\' \'\'' if os.name != 'nt' else 'foo ba/r -baz "2 be" ""',
)
def test_parse_filesize(self):
@@ -2117,7 +2109,7 @@ def test_extract_basic_auth(self):
assert extract_basic_auth('http://user:@foo.bar') == ('http://foo.bar', 'Basic dXNlcjo=')
assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz')
- @unittest.skipUnless(compat_os_name == 'nt', 'Only relevant on Windows')
+ @unittest.skipUnless(os.name == 'nt', 'Only relevant on Windows')
def test_windows_escaping(self):
tests = [
'test"&',
diff --git a/yt_dlp/YoutubeDL.py b/yt_dlp/YoutubeDL.py
index 3130deda31..749de5d4e3 100644
--- a/yt_dlp/YoutubeDL.py
+++ b/yt_dlp/YoutubeDL.py
@@ -26,7 +26,7 @@
from .cache import Cache
from .compat import urllib # isort: split
-from .compat import compat_os_name, urllib_req_to_req
+from .compat import urllib_req_to_req
from .cookies import CookieLoadError, LenientSimpleCookie, load_cookies
from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
from .downloader.rtmp import rtmpdump_version
@@ -109,7 +109,6 @@
determine_ext,
determine_protocol,
encode_compat_str,
- encodeFilename,
escapeHTML,
expand_path,
extract_basic_auth,
@@ -167,7 +166,7 @@
)
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
-if compat_os_name == 'nt':
+if os.name == 'nt':
import ctypes
@@ -643,7 +642,7 @@ def __init__(self, params=None, auto_init=True):
out=stdout,
error=sys.stderr,
screen=sys.stderr if self.params.get('quiet') else stdout,
- console=None if compat_os_name == 'nt' else next(
+ console=None if os.name == 'nt' else next(
filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None),
)
@@ -952,7 +951,7 @@ def to_stderr(self, message, only_once=False):
self._write_string(f'{self._bidi_workaround(message)}\n', self._out_files.error, only_once=only_once)
def _send_console_code(self, code):
- if compat_os_name == 'nt' or not self._out_files.console:
+ if os.name == 'nt' or not self._out_files.console:
return
self._write_string(code, self._out_files.console)
@@ -960,7 +959,7 @@ def to_console_title(self, message):
if not self.params.get('consoletitle', False):
return
message = remove_terminal_sequences(message)
- if compat_os_name == 'nt':
+ if os.name == 'nt':
if ctypes.windll.kernel32.GetConsoleWindow():
# c_wchar_p() might not be necessary if `message` is
# already of type unicode()
@@ -3255,9 +3254,9 @@ def check_max_downloads():
if full_filename is None:
return
- if not self._ensure_dir_exists(encodeFilename(full_filename)):
+ if not self._ensure_dir_exists(full_filename):
return
- if not self._ensure_dir_exists(encodeFilename(temp_filename)):
+ if not self._ensure_dir_exists(temp_filename):
return
if self._write_description('video', info_dict,
@@ -3289,16 +3288,16 @@ def check_max_downloads():
if self.params.get('writeannotations', False):
annofn = self.prepare_filename(info_dict, 'annotation')
if annofn:
- if not self._ensure_dir_exists(encodeFilename(annofn)):
+ if not self._ensure_dir_exists(annofn):
return
- if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(annofn)):
+ if not self.params.get('overwrites', True) and os.path.exists(annofn):
self.to_screen('[info] Video annotations are already present')
elif not info_dict.get('annotations'):
self.report_warning('There are no annotations to write.')
else:
try:
self.to_screen('[info] Writing video annotations to: ' + annofn)
- with open(encodeFilename(annofn), 'w', encoding='utf-8') as annofile:
+ with open(annofn, 'w', encoding='utf-8') as annofile:
annofile.write(info_dict['annotations'])
except (KeyError, TypeError):
self.report_warning('There are no annotations to write.')
@@ -3314,14 +3313,14 @@ def _write_link_file(link_type):
f'Cannot write internet shortcut file because the actual URL of "{info_dict["webpage_url"]}" is unknown')
return True
linkfn = replace_extension(self.prepare_filename(info_dict, 'link'), link_type, info_dict.get('ext'))
- if not self._ensure_dir_exists(encodeFilename(linkfn)):
+ if not self._ensure_dir_exists(linkfn):
return False
- if self.params.get('overwrites', True) and os.path.exists(encodeFilename(linkfn)):
+ if self.params.get('overwrites', True) and os.path.exists(linkfn):
self.to_screen(f'[info] Internet shortcut (.{link_type}) is already present')
return True
try:
self.to_screen(f'[info] Writing internet shortcut (.{link_type}) to: {linkfn}')
- with open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8',
+ with open(to_high_limit_path(linkfn), 'w', encoding='utf-8',
newline='\r\n' if link_type == 'url' else '\n') as linkfile:
template_vars = {'url': url}
if link_type == 'desktop':
@@ -3352,7 +3351,7 @@ def _write_link_file(link_type):
if self.params.get('skip_download'):
info_dict['filepath'] = temp_filename
- info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
+ info_dict['__finaldir'] = os.path.dirname(os.path.abspath(full_filename))
info_dict['__files_to_move'] = files_to_move
replace_info_dict(self.run_pp(MoveFilesAfterDownloadPP(self, False), info_dict))
info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
@@ -3482,7 +3481,7 @@ def correct_ext(filename, ext=new_ext):
self.report_file_already_downloaded(dl_filename)
dl_filename = dl_filename or temp_filename
- info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
+ info_dict['__finaldir'] = os.path.dirname(os.path.abspath(full_filename))
except network_exceptions as err:
self.report_error(f'unable to download video data: {err}')
@@ -4297,7 +4296,7 @@ def _write_description(self, label, ie_result, descfn):
else:
try:
self.to_screen(f'[info] Writing {label} description to: {descfn}')
- with open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
+ with open(descfn, 'w', encoding='utf-8') as descfile:
descfile.write(ie_result['description'])
except OSError:
self.report_error(f'Cannot write {label} description file {descfn}')
@@ -4399,7 +4398,7 @@ def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None
try:
uf = self.urlopen(Request(t['url'], headers=t.get('http_headers', {})))
self.to_screen(f'[info] Writing {thumb_display_id} to: {thumb_filename}')
- with open(encodeFilename(thumb_filename), 'wb') as thumbf:
+ with open(thumb_filename, 'wb') as thumbf:
shutil.copyfileobj(uf, thumbf)
ret.append((thumb_filename, thumb_filename_final))
t['filepath'] = thumb_filename
diff --git a/yt_dlp/__init__.py b/yt_dlp/__init__.py
index a7665159bd..a1880bf7dc 100644
--- a/yt_dlp/__init__.py
+++ b/yt_dlp/__init__.py
@@ -14,7 +14,6 @@
import re
import traceback
-from .compat import compat_os_name
from .cookies import SUPPORTED_BROWSERS, SUPPORTED_KEYRINGS, CookieLoadError
from .downloader.external import get_external_downloader
from .extractor import list_extractor_classes
@@ -44,7 +43,6 @@
GeoUtils,
PlaylistEntries,
SameFileError,
- decodeOption,
download_range_func,
expand_path,
float_or_none,
@@ -883,8 +881,8 @@ def parse_options(argv=None):
'listsubtitles': opts.listsubtitles,
'subtitlesformat': opts.subtitlesformat,
'subtitleslangs': opts.subtitleslangs,
- 'matchtitle': decodeOption(opts.matchtitle),
- 'rejecttitle': decodeOption(opts.rejecttitle),
+ 'matchtitle': opts.matchtitle,
+ 'rejecttitle': opts.rejecttitle,
'max_downloads': opts.max_downloads,
'prefer_free_formats': opts.prefer_free_formats,
'trim_file_name': opts.trim_file_name,
@@ -1053,7 +1051,7 @@ def make_row(target, handler):
ydl.warn_if_short_id(args)
# Show a useful error message and wait for keypress if not launched from shell on Windows
- if not args and compat_os_name == 'nt' and getattr(sys, 'frozen', False):
+ if not args and os.name == 'nt' and getattr(sys, 'frozen', False):
import ctypes.wintypes
import msvcrt
diff --git a/yt_dlp/aes.py b/yt_dlp/aes.py
index be67b40fe2..0930d36df9 100644
--- a/yt_dlp/aes.py
+++ b/yt_dlp/aes.py
@@ -3,7 +3,6 @@
from .compat import compat_ord
from .dependencies import Cryptodome
-from .utils import bytes_to_intlist, intlist_to_bytes
if Cryptodome.AES:
def aes_cbc_decrypt_bytes(data, key, iv):
@@ -17,15 +16,15 @@ def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
else:
def aes_cbc_decrypt_bytes(data, key, iv):
""" Decrypt bytes with AES-CBC using native implementation since pycryptodome is unavailable """
- return intlist_to_bytes(aes_cbc_decrypt(*map(bytes_to_intlist, (data, key, iv))))
+ return bytes(aes_cbc_decrypt(*map(list, (data, key, iv))))
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
""" Decrypt bytes with AES-GCM using native implementation since pycryptodome is unavailable """
- return intlist_to_bytes(aes_gcm_decrypt_and_verify(*map(bytes_to_intlist, (data, key, tag, nonce))))
+ return bytes(aes_gcm_decrypt_and_verify(*map(list, (data, key, tag, nonce))))
def aes_cbc_encrypt_bytes(data, key, iv, **kwargs):
- return intlist_to_bytes(aes_cbc_encrypt(*map(bytes_to_intlist, (data, key, iv)), **kwargs))
+ return bytes(aes_cbc_encrypt(*map(list, (data, key, iv)), **kwargs))
BLOCK_SIZE_BYTES = 16
@@ -221,7 +220,7 @@ def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
j0 = [*nonce, 0, 0, 0, 1]
else:
fill = (BLOCK_SIZE_BYTES - (len(nonce) % BLOCK_SIZE_BYTES)) % BLOCK_SIZE_BYTES + 8
- ghash_in = nonce + [0] * fill + bytes_to_intlist((8 * len(nonce)).to_bytes(8, 'big'))
+ ghash_in = nonce + [0] * fill + list((8 * len(nonce)).to_bytes(8, 'big'))
j0 = ghash(hash_subkey, ghash_in)
# TODO: add nonce support to aes_ctr_decrypt
@@ -234,9 +233,9 @@ def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
s_tag = ghash(
hash_subkey,
data
- + [0] * pad_len # pad
- + bytes_to_intlist((0 * 8).to_bytes(8, 'big') # length of associated data
- + ((len(data) * 8).to_bytes(8, 'big'))), # length of data
+ + [0] * pad_len # pad
+ + list((0 * 8).to_bytes(8, 'big') # length of associated data
+ + ((len(data) * 8).to_bytes(8, 'big'))), # length of data
)
if tag != aes_ctr_encrypt(s_tag, key, j0):
@@ -300,8 +299,8 @@ def aes_decrypt_text(data, password, key_size_bytes):
"""
NONCE_LENGTH_BYTES = 8
- data = bytes_to_intlist(base64.b64decode(data))
- password = bytes_to_intlist(password.encode())
+ data = list(base64.b64decode(data))
+ password = list(password.encode())
key = password[:key_size_bytes] + [0] * (key_size_bytes - len(password))
key = aes_encrypt(key[:BLOCK_SIZE_BYTES], key_expansion(key)) * (key_size_bytes // BLOCK_SIZE_BYTES)
@@ -310,7 +309,7 @@ def aes_decrypt_text(data, password, key_size_bytes):
cipher = data[NONCE_LENGTH_BYTES:]
decrypted_data = aes_ctr_decrypt(cipher, key, nonce + [0] * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES))
- return intlist_to_bytes(decrypted_data)
+ return bytes(decrypted_data)
RCON = (0x8d, 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80, 0x1b, 0x36)
diff --git a/yt_dlp/compat/__init__.py b/yt_dlp/compat/__init__.py
index d820adaf1e..d779620688 100644
--- a/yt_dlp/compat/__init__.py
+++ b/yt_dlp/compat/__init__.py
@@ -1,5 +1,4 @@
import os
-import sys
import xml.etree.ElementTree as etree
from .compat_utils import passthrough_module
@@ -24,33 +23,14 @@ def compat_etree_fromstring(text):
return etree.XML(text, parser=etree.XMLParser(target=_TreeBuilder()))
-compat_os_name = os._name if os.name == 'java' else os.name
-
-
-def compat_shlex_quote(s):
- from ..utils import shell_quote
- return shell_quote(s)
-
-
def compat_ord(c):
return c if isinstance(c, int) else ord(c)
-if compat_os_name == 'nt' and sys.version_info < (3, 8):
- # os.path.realpath on Windows does not follow symbolic links
- # prior to Python 3.8 (see https://bugs.python.org/issue9949)
- def compat_realpath(path):
- while os.path.islink(path):
- path = os.path.abspath(os.readlink(path))
- return os.path.realpath(path)
-else:
- compat_realpath = os.path.realpath
-
-
# Python 3.8+ does not honor %HOME% on windows, but this breaks compatibility with youtube-dl
# See https://github.com/yt-dlp/yt-dlp/issues/792
# https://docs.python.org/3/library/os.path.html#os.path.expanduser
-if compat_os_name in ('nt', 'ce'):
+if os.name in ('nt', 'ce'):
def compat_expanduser(path):
HOME = os.environ.get('HOME')
if not HOME:
diff --git a/yt_dlp/compat/_deprecated.py b/yt_dlp/compat/_deprecated.py
index 607bae9999..445acc1a06 100644
--- a/yt_dlp/compat/_deprecated.py
+++ b/yt_dlp/compat/_deprecated.py
@@ -8,16 +8,14 @@
DeprecationWarning(f'{__name__}.{attr} is deprecated'), stacklevel=6))
del passthrough_module
-import base64
-import urllib.error
-import urllib.parse
+import functools # noqa: F401
+import os
-compat_str = str
-compat_b64decode = base64.b64decode
+compat_os_name = os.name
+compat_realpath = os.path.realpath
-compat_urlparse = urllib.parse
-compat_parse_qs = urllib.parse.parse_qs
-compat_urllib_parse_unquote = urllib.parse.unquote
-compat_urllib_parse_urlencode = urllib.parse.urlencode
-compat_urllib_parse_urlparse = urllib.parse.urlparse
+
+def compat_shlex_quote(s):
+ from ..utils import shell_quote
+ return shell_quote(s)
diff --git a/yt_dlp/compat/_legacy.py b/yt_dlp/compat/_legacy.py
index dfc792eae4..dae2c14592 100644
--- a/yt_dlp/compat/_legacy.py
+++ b/yt_dlp/compat/_legacy.py
@@ -30,7 +30,7 @@
from re import Pattern as compat_Pattern # noqa: F401
from re import match as compat_Match # noqa: F401
-from . import compat_expanduser, compat_HTMLParseError, compat_realpath
+from . import compat_expanduser, compat_HTMLParseError
from .compat_utils import passthrough_module
from ..dependencies import brotli as compat_brotli # noqa: F401
from ..dependencies import websockets as compat_websockets # noqa: F401
@@ -78,7 +78,7 @@ def compat_setenv(key, value, env=os.environ):
compat_map = map
compat_numeric_types = (int, float, complex)
compat_os_path_expanduser = compat_expanduser
-compat_os_path_realpath = compat_realpath
+compat_os_path_realpath = os.path.realpath
compat_print = print
compat_shlex_split = shlex.split
compat_socket_create_connection = socket.create_connection
@@ -104,5 +104,12 @@ def compat_setenv(key, value, env=os.environ):
compat_xpath = lambda xpath: xpath
compat_zip = zip
workaround_optparse_bug9161 = lambda: None
+compat_str = str
+compat_b64decode = base64.b64decode
+compat_urlparse = urllib.parse
+compat_parse_qs = urllib.parse.parse_qs
+compat_urllib_parse_unquote = urllib.parse.unquote
+compat_urllib_parse_urlencode = urllib.parse.urlencode
+compat_urllib_parse_urlparse = urllib.parse.urlparse
legacy = []
diff --git a/yt_dlp/compat/functools.py b/yt_dlp/compat/functools.py
deleted file mode 100644
index c2e9e90279..0000000000
--- a/yt_dlp/compat/functools.py
+++ /dev/null
@@ -1,7 +0,0 @@
-# flake8: noqa: F405
-from functools import * # noqa: F403
-
-from .compat_utils import passthrough_module
-
-passthrough_module(__name__, 'functools')
-del passthrough_module
diff --git a/yt_dlp/compat/urllib/request.py b/yt_dlp/compat/urllib/request.py
index ad9fa83c87..dfc7f4a2dc 100644
--- a/yt_dlp/compat/urllib/request.py
+++ b/yt_dlp/compat/urllib/request.py
@@ -7,9 +7,9 @@
del passthrough_module
-from .. import compat_os_name
+import os
-if compat_os_name == 'nt':
+if os.name == 'nt':
# On older Python versions, proxies are extracted from Windows registry erroneously. [1]
# If the https proxy in the registry does not have a scheme, urllib will incorrectly add https:// to it. [2]
# It is unlikely that the user has actually set it to be https, so we should be fine to safely downgrade
@@ -37,4 +37,4 @@ def getproxies_registry_patched():
def getproxies():
return getproxies_environment() or getproxies_registry_patched()
-del compat_os_name
+del os
diff --git a/yt_dlp/cookies.py b/yt_dlp/cookies.py
index e673498244..d5b0d3991b 100644
--- a/yt_dlp/cookies.py
+++ b/yt_dlp/cookies.py
@@ -25,7 +25,6 @@
aes_gcm_decrypt_and_verify_bytes,
unpad_pkcs7,
)
-from .compat import compat_os_name
from .dependencies import (
_SECRETSTORAGE_UNAVAILABLE_REASON,
secretstorage,
@@ -343,7 +342,7 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger):
logger.debug(f'cookie version breakdown: {counts}')
return jar
except PermissionError as error:
- if compat_os_name == 'nt' and error.errno == 13:
+ if os.name == 'nt' and error.errno == 13:
message = 'Could not copy Chrome cookie database. See https://github.com/yt-dlp/yt-dlp/issues/7271 for more info'
logger.error(message)
raise DownloadError(message) # force exit
diff --git a/yt_dlp/downloader/common.py b/yt_dlp/downloader/common.py
index 2e3ea2fc4e..e8dcb37cc3 100644
--- a/yt_dlp/downloader/common.py
+++ b/yt_dlp/downloader/common.py
@@ -20,9 +20,7 @@
Namespace,
RetryManager,
classproperty,
- decodeArgument,
deprecation_warning,
- encodeFilename,
format_bytes,
join_nonempty,
parse_bytes,
@@ -219,7 +217,7 @@ def slow_down(self, start_time, now, byte_counter):
def temp_name(self, filename):
"""Returns a temporary filename for the given filename."""
if self.params.get('nopart', False) or filename == '-' or \
- (os.path.exists(encodeFilename(filename)) and not os.path.isfile(encodeFilename(filename))):
+ (os.path.exists(filename) and not os.path.isfile(filename)):
return filename
return filename + '.part'
@@ -273,7 +271,7 @@ def try_utime(self, filename, last_modified_hdr):
"""Try to set the last-modified time of the given file."""
if last_modified_hdr is None:
return
- if not os.path.isfile(encodeFilename(filename)):
+ if not os.path.isfile(filename):
return
timestr = last_modified_hdr
if timestr is None:
@@ -432,13 +430,13 @@ def download(self, filename, info_dict, subtitle=False):
"""
nooverwrites_and_exists = (
not self.params.get('overwrites', True)
- and os.path.exists(encodeFilename(filename))
+ and os.path.exists(filename)
)
if not hasattr(filename, 'write'):
continuedl_and_exists = (
self.params.get('continuedl', True)
- and os.path.isfile(encodeFilename(filename))
+ and os.path.isfile(filename)
and not self.params.get('nopart', False)
)
@@ -448,7 +446,7 @@ def download(self, filename, info_dict, subtitle=False):
self._hook_progress({
'filename': filename,
'status': 'finished',
- 'total_bytes': os.path.getsize(encodeFilename(filename)),
+ 'total_bytes': os.path.getsize(filename),
}, info_dict)
self._finish_multiline_status()
return True, False
@@ -489,9 +487,7 @@ def _debug_cmd(self, args, exe=None):
if not self.params.get('verbose', False):
return
- str_args = [decodeArgument(a) for a in args]
-
if exe is None:
- exe = os.path.basename(str_args[0])
+ exe = os.path.basename(args[0])
- self.write_debug(f'{exe} command line: {shell_quote(str_args)}')
+ self.write_debug(f'{exe} command line: {shell_quote(args)}')
diff --git a/yt_dlp/downloader/external.py b/yt_dlp/downloader/external.py
index 6c1ec403c8..7f6b5b45cc 100644
--- a/yt_dlp/downloader/external.py
+++ b/yt_dlp/downloader/external.py
@@ -23,7 +23,6 @@
cli_valueless_option,
determine_ext,
encodeArgument,
- encodeFilename,
find_available_port,
remove_end,
traverse_obj,
@@ -67,7 +66,7 @@ def real_download(self, filename, info_dict):
'elapsed': time.time() - started,
}
if filename != '-':
- fsize = os.path.getsize(encodeFilename(tmpfilename))
+ fsize = os.path.getsize(tmpfilename)
self.try_rename(tmpfilename, filename)
status.update({
'downloaded_bytes': fsize,
@@ -184,9 +183,9 @@ def _call_downloader(self, tmpfilename, info_dict):
dest.write(decrypt_fragment(fragment, src.read()))
src.close()
if not self.params.get('keep_fragments', False):
- self.try_remove(encodeFilename(fragment_filename))
+ self.try_remove(fragment_filename)
dest.close()
- self.try_remove(encodeFilename(f'{tmpfilename}.frag.urls'))
+ self.try_remove(f'{tmpfilename}.frag.urls')
return 0
def _call_process(self, cmd, info_dict):
@@ -620,7 +619,7 @@ def _call_downloader(self, tmpfilename, info_dict):
args += self._configuration_args(('_o1', '_o', ''))
args = [encodeArgument(opt) for opt in args]
- args.append(encodeFilename(ffpp._ffmpeg_filename_argument(tmpfilename), True))
+ args.append(ffpp._ffmpeg_filename_argument(tmpfilename))
self._debug_cmd(args)
piped = any(fmt['url'] in ('-', 'pipe:') for fmt in selected_formats)
diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py
index 0d00196e2e..98784e7039 100644
--- a/yt_dlp/downloader/fragment.py
+++ b/yt_dlp/downloader/fragment.py
@@ -9,10 +9,9 @@
from .common import FileDownloader
from .http import HttpFD
from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
-from ..compat import compat_os_name
from ..networking import Request
from ..networking.exceptions import HTTPError, IncompleteRead
-from ..utils import DownloadError, RetryManager, encodeFilename, traverse_obj
+from ..utils import DownloadError, RetryManager, traverse_obj
from ..utils.networking import HTTPHeaderDict
from ..utils.progress import ProgressCalculator
@@ -152,7 +151,7 @@ def _append_fragment(self, ctx, frag_content):
if self.__do_ytdl_file(ctx):
self._write_ytdl_file(ctx)
if not self.params.get('keep_fragments', False):
- self.try_remove(encodeFilename(ctx['fragment_filename_sanitized']))
+ self.try_remove(ctx['fragment_filename_sanitized'])
del ctx['fragment_filename_sanitized']
def _prepare_frag_download(self, ctx):
@@ -188,7 +187,7 @@ def _prepare_frag_download(self, ctx):
})
if self.__do_ytdl_file(ctx):
- ytdl_file_exists = os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename'])))
+ ytdl_file_exists = os.path.isfile(self.ytdl_filename(ctx['filename']))
continuedl = self.params.get('continuedl', True)
if continuedl and ytdl_file_exists:
self._read_ytdl_file(ctx)
@@ -390,7 +389,7 @@ class FTPE(concurrent.futures.ThreadPoolExecutor):
def __exit__(self, exc_type, exc_val, exc_tb):
pass
- if compat_os_name == 'nt':
+ if os.name == 'nt':
def future_result(future):
while True:
try:
diff --git a/yt_dlp/downloader/http.py b/yt_dlp/downloader/http.py
index c0165790d1..9c6dd8b799 100644
--- a/yt_dlp/downloader/http.py
+++ b/yt_dlp/downloader/http.py
@@ -15,7 +15,6 @@
ThrottledDownload,
XAttrMetadataError,
XAttrUnavailableError,
- encodeFilename,
int_or_none,
parse_http_range,
try_call,
@@ -58,9 +57,8 @@ class DownloadContext(dict):
if self.params.get('continuedl', True):
# Establish possible resume length
- if os.path.isfile(encodeFilename(ctx.tmpfilename)):
- ctx.resume_len = os.path.getsize(
- encodeFilename(ctx.tmpfilename))
+ if os.path.isfile(ctx.tmpfilename):
+ ctx.resume_len = os.path.getsize(ctx.tmpfilename)
ctx.is_resume = ctx.resume_len > 0
@@ -241,7 +239,7 @@ def retry(e):
ctx.resume_len = byte_counter
else:
try:
- ctx.resume_len = os.path.getsize(encodeFilename(ctx.tmpfilename))
+ ctx.resume_len = os.path.getsize(ctx.tmpfilename)
except FileNotFoundError:
ctx.resume_len = 0
raise RetryDownload(e)
diff --git a/yt_dlp/downloader/rtmp.py b/yt_dlp/downloader/rtmp.py
index d7ffb3b34d..1b831e5f30 100644
--- a/yt_dlp/downloader/rtmp.py
+++ b/yt_dlp/downloader/rtmp.py
@@ -8,7 +8,6 @@
Popen,
check_executable,
encodeArgument,
- encodeFilename,
get_exe_version,
)
@@ -179,7 +178,7 @@ def run_rtmpdump(args):
return False
while retval in (RD_INCOMPLETE, RD_FAILED) and not test and not live:
- prevsize = os.path.getsize(encodeFilename(tmpfilename))
+ prevsize = os.path.getsize(tmpfilename)
self.to_screen(f'[rtmpdump] Downloaded {prevsize} bytes')
time.sleep(5.0) # This seems to be needed
args = [*basic_args, '--resume']
@@ -187,7 +186,7 @@ def run_rtmpdump(args):
args += ['--skip', '1']
args = [encodeArgument(a) for a in args]
retval = run_rtmpdump(args)
- cursize = os.path.getsize(encodeFilename(tmpfilename))
+ cursize = os.path.getsize(tmpfilename)
if prevsize == cursize and retval == RD_FAILED:
break
# Some rtmp streams seem abort after ~ 99.8%. Don't complain for those
@@ -196,7 +195,7 @@ def run_rtmpdump(args):
retval = RD_SUCCESS
break
if retval == RD_SUCCESS or (test and retval == RD_INCOMPLETE):
- fsize = os.path.getsize(encodeFilename(tmpfilename))
+ fsize = os.path.getsize(tmpfilename)
self.to_screen(f'[rtmpdump] Downloaded {fsize} bytes')
self.try_rename(tmpfilename, filename)
self._hook_progress({
diff --git a/yt_dlp/downloader/rtsp.py b/yt_dlp/downloader/rtsp.py
index e89269fed9..b4b0be7e6e 100644
--- a/yt_dlp/downloader/rtsp.py
+++ b/yt_dlp/downloader/rtsp.py
@@ -2,7 +2,7 @@
import subprocess
from .common import FileDownloader
-from ..utils import check_executable, encodeFilename
+from ..utils import check_executable
class RtspFD(FileDownloader):
@@ -26,7 +26,7 @@ def real_download(self, filename, info_dict):
retval = subprocess.call(args)
if retval == 0:
- fsize = os.path.getsize(encodeFilename(tmpfilename))
+ fsize = os.path.getsize(tmpfilename)
self.to_screen(f'\r[{args[0]}] {fsize} bytes')
self.try_rename(tmpfilename, filename)
self._hook_progress({
diff --git a/yt_dlp/extractor/abematv.py b/yt_dlp/extractor/abematv.py
index 66ab083fe0..b1343eed39 100644
--- a/yt_dlp/extractor/abematv.py
+++ b/yt_dlp/extractor/abematv.py
@@ -6,7 +6,6 @@
import io
import json
import re
-import struct
import time
import urllib.parse
import uuid
@@ -18,10 +17,8 @@
from ..utils import (
ExtractorError,
OnDemandPagedList,
- bytes_to_intlist,
decode_base_n,
int_or_none,
- intlist_to_bytes,
time_seconds,
traverse_obj,
update_url_query,
@@ -72,15 +69,15 @@ def _get_videokey_from_ticket(self, ticket):
})
res = decode_base_n(license_response['k'], table=self._STRTABLE)
- encvideokey = bytes_to_intlist(struct.pack('>QQ', res >> 64, res & 0xffffffffffffffff))
+ encvideokey = list(res.to_bytes(16, 'big'))
h = hmac.new(
binascii.unhexlify(self._HKEY),
(license_response['cid'] + self.ie._DEVICE_ID).encode(),
digestmod=hashlib.sha256)
- enckey = bytes_to_intlist(h.digest())
+ enckey = list(h.digest())
- return intlist_to_bytes(aes_ecb_decrypt(encvideokey, enckey))
+ return bytes(aes_ecb_decrypt(encvideokey, enckey))
class AbemaTVBaseIE(InfoExtractor):
diff --git a/yt_dlp/extractor/adn.py b/yt_dlp/extractor/adn.py
index c8a2613754..919e1d6af5 100644
--- a/yt_dlp/extractor/adn.py
+++ b/yt_dlp/extractor/adn.py
@@ -11,11 +11,9 @@
from ..utils import (
ExtractorError,
ass_subtitles_timecode,
- bytes_to_intlist,
bytes_to_long,
float_or_none,
int_or_none,
- intlist_to_bytes,
join_nonempty,
long_to_bytes,
parse_iso8601,
@@ -198,16 +196,16 @@ def _real_extract(self, url):
links_url = try_get(options, lambda x: x['video']['url']) or (video_base_url + 'link')
self._K = ''.join(random.choices('0123456789abcdef', k=16))
- message = bytes_to_intlist(json.dumps({
+ message = list(json.dumps({
'k': self._K,
't': token,
- }))
+ }).encode())
# Sometimes authentication fails for no good reason, retry with
# a different random padding
links_data = None
for _ in range(3):
- padded_message = intlist_to_bytes(pkcs1pad(message, 128))
+ padded_message = bytes(pkcs1pad(message, 128))
n, e = self._RSA_KEY
encrypted_message = long_to_bytes(pow(bytes_to_long(padded_message), e, n))
authorization = base64.b64encode(encrypted_message).decode()
diff --git a/yt_dlp/extractor/anvato.py b/yt_dlp/extractor/anvato.py
index ba1d7df372..bd3b19b133 100644
--- a/yt_dlp/extractor/anvato.py
+++ b/yt_dlp/extractor/anvato.py
@@ -8,10 +8,8 @@
from .common import InfoExtractor
from ..aes import aes_encrypt
from ..utils import (
- bytes_to_intlist,
determine_ext,
int_or_none,
- intlist_to_bytes,
join_nonempty,
smuggle_url,
strip_jsonp,
@@ -234,8 +232,8 @@ def _get_video_json(self, access_key, video_id, extracted_token):
server_time = self._server_time(access_key, video_id)
input_data = f'{server_time}~{md5_text(video_data_url)}~{md5_text(server_time)}'
- auth_secret = intlist_to_bytes(aes_encrypt(
- bytes_to_intlist(input_data[:64]), bytes_to_intlist(self._AUTH_KEY)))
+ auth_secret = bytes(aes_encrypt(
+ list(input_data[:64].encode()), list(self._AUTH_KEY)))
query = {
'X-Anvato-Adst-Auth': base64.b64encode(auth_secret).decode('ascii'),
'rtyp': 'fp',
diff --git a/yt_dlp/extractor/common.py b/yt_dlp/extractor/common.py
index 23f6fc6c46..2aa40a77a7 100644
--- a/yt_dlp/extractor/common.py
+++ b/yt_dlp/extractor/common.py
@@ -25,7 +25,6 @@
from ..compat import (
compat_etree_fromstring,
compat_expanduser,
- compat_os_name,
urllib_req_to_req,
)
from ..cookies import LenientSimpleCookie
@@ -1029,7 +1028,7 @@ def _request_dump_filename(self, url, video_id, data=None):
filename = sanitize_filename(f'{basen}.dump', restricted=True)
# Working around MAX_PATH limitation on Windows (see
# http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx)
- if compat_os_name == 'nt':
+ if os.name == 'nt':
absfilepath = os.path.abspath(filename)
if len(absfilepath) > 259:
filename = fR'\\?\{absfilepath}'
diff --git a/yt_dlp/extractor/shemaroome.py b/yt_dlp/extractor/shemaroome.py
index 284b2f89c1..3ab322f67d 100644
--- a/yt_dlp/extractor/shemaroome.py
+++ b/yt_dlp/extractor/shemaroome.py
@@ -1,11 +1,9 @@
import base64
from .common import InfoExtractor
-from ..aes import aes_cbc_decrypt, unpad_pkcs7
+from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
from ..utils import (
ExtractorError,
- bytes_to_intlist,
- intlist_to_bytes,
unified_strdate,
)
@@ -68,10 +66,10 @@ def _real_extract(self, url):
data_json = self._download_json('https://www.shemaroome.com/users/user_all_lists', video_id, data=data.encode())
if not data_json.get('status'):
raise ExtractorError('Premium videos cannot be downloaded yet.', expected=True)
- url_data = bytes_to_intlist(base64.b64decode(data_json['new_play_url']))
- key = bytes_to_intlist(base64.b64decode(data_json['key']))
- iv = [0] * 16
- m3u8_url = unpad_pkcs7(intlist_to_bytes(aes_cbc_decrypt(url_data, key, iv))).decode('ascii')
+ url_data = base64.b64decode(data_json['new_play_url'])
+ key = base64.b64decode(data_json['key'])
+ iv = bytes(16)
+ m3u8_url = unpad_pkcs7(aes_cbc_decrypt_bytes(url_data, key, iv)).decode('ascii')
headers = {'stream_key': data_json['stream_key']}
formats, m3u8_subs = self._extract_m3u8_formats_and_subtitles(m3u8_url, video_id, fatal=False, headers=headers)
for fmt in formats:
diff --git a/yt_dlp/extractor/youtube.py b/yt_dlp/extractor/youtube.py
index caa99182ae..a3b237bc8d 100644
--- a/yt_dlp/extractor/youtube.py
+++ b/yt_dlp/extractor/youtube.py
@@ -22,7 +22,7 @@
from .common import InfoExtractor, SearchInfoExtractor
from .openload import PhantomJSwrapper
from ..jsinterp import JSInterpreter
-from ..networking.exceptions import HTTPError, TransportError, network_exceptions
+from ..networking.exceptions import HTTPError, network_exceptions
from ..utils import (
NO_DEFAULT,
ExtractorError,
@@ -50,12 +50,12 @@
parse_iso8601,
parse_qs,
qualities,
+ remove_end,
remove_start,
smuggle_url,
str_or_none,
str_to_int,
strftime_or_none,
- time_seconds,
traverse_obj,
try_call,
try_get,
@@ -124,14 +124,15 @@
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 62,
+ 'REQUIRE_AUTH': True,
},
'android': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID',
- 'clientVersion': '19.29.37',
+ 'clientVersion': '19.44.38',
'androidSdkVersion': 30,
- 'userAgent': 'com.google.android.youtube/19.29.37 (Linux; U; Android 11) gzip',
+ 'userAgent': 'com.google.android.youtube/19.44.38 (Linux; U; Android 11) gzip',
'osName': 'Android',
'osVersion': '11',
},
@@ -140,13 +141,14 @@
'REQUIRE_JS_PLAYER': False,
'REQUIRE_PO_TOKEN': True,
},
+ # This client now requires sign-in for every video
'android_music': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_MUSIC',
- 'clientVersion': '7.11.50',
+ 'clientVersion': '7.27.52',
'androidSdkVersion': 30,
- 'userAgent': 'com.google.android.apps.youtube.music/7.11.50 (Linux; U; Android 11) gzip',
+ 'userAgent': 'com.google.android.apps.youtube.music/7.27.52 (Linux; U; Android 11) gzip',
'osName': 'Android',
'osVersion': '11',
},
@@ -154,15 +156,16 @@
'INNERTUBE_CONTEXT_CLIENT_NAME': 21,
'REQUIRE_JS_PLAYER': False,
'REQUIRE_PO_TOKEN': True,
+ 'REQUIRE_AUTH': True,
},
# This client now requires sign-in for every video
'android_creator': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_CREATOR',
- 'clientVersion': '24.30.100',
+ 'clientVersion': '24.45.100',
'androidSdkVersion': 30,
- 'userAgent': 'com.google.android.apps.youtube.creator/24.30.100 (Linux; U; Android 11) gzip',
+ 'userAgent': 'com.google.android.apps.youtube.creator/24.45.100 (Linux; U; Android 11) gzip',
'osName': 'Android',
'osVersion': '11',
},
@@ -170,17 +173,18 @@
'INNERTUBE_CONTEXT_CLIENT_NAME': 14,
'REQUIRE_JS_PLAYER': False,
'REQUIRE_PO_TOKEN': True,
+ 'REQUIRE_AUTH': True,
},
# YouTube Kids videos aren't returned on this client for some reason
'android_vr': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID_VR',
- 'clientVersion': '1.57.29',
+ 'clientVersion': '1.60.19',
'deviceMake': 'Oculus',
'deviceModel': 'Quest 3',
'androidSdkVersion': 32,
- 'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.57.29 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
+ 'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.60.19 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
'osName': 'Android',
'osVersion': '12L',
},
@@ -188,68 +192,56 @@
'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
'REQUIRE_JS_PLAYER': False,
},
- 'android_testsuite': {
- 'INNERTUBE_CONTEXT': {
- 'client': {
- 'clientName': 'ANDROID_TESTSUITE',
- 'clientVersion': '1.9',
- 'androidSdkVersion': 30,
- 'userAgent': 'com.google.android.youtube/1.9 (Linux; U; Android 11) gzip',
- 'osName': 'Android',
- 'osVersion': '11',
- },
- },
- 'INNERTUBE_CONTEXT_CLIENT_NAME': 30,
- 'REQUIRE_JS_PLAYER': False,
- 'PLAYER_PARAMS': '2AMB',
- },
# iOS clients have HLS live streams. Setting device model to get 60fps formats.
# See: https://github.com/TeamNewPipe/NewPipeExtractor/issues/680#issuecomment-1002724558
'ios': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'IOS',
- 'clientVersion': '19.29.1',
+ 'clientVersion': '19.45.4',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
- 'userAgent': 'com.google.ios.youtube/19.29.1 (iPhone16,2; U; CPU iOS 17_5_1 like Mac OS X;)',
+ 'userAgent': 'com.google.ios.youtube/19.45.4 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)',
'osName': 'iPhone',
- 'osVersion': '17.5.1.21F90',
+ 'osVersion': '18.1.0.22B83',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
'REQUIRE_JS_PLAYER': False,
},
+ # This client now requires sign-in for every video
'ios_music': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'IOS_MUSIC',
- 'clientVersion': '7.08.2',
+ 'clientVersion': '7.27.0',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
- 'userAgent': 'com.google.ios.youtubemusic/7.08.2 (iPhone16,2; U; CPU iOS 17_5_1 like Mac OS X;)',
+ 'userAgent': 'com.google.ios.youtubemusic/7.27.0 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)',
'osName': 'iPhone',
- 'osVersion': '17.5.1.21F90',
+ 'osVersion': '18.1.0.22B83',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 26,
'REQUIRE_JS_PLAYER': False,
+ 'REQUIRE_AUTH': True,
},
# This client now requires sign-in for every video
'ios_creator': {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'IOS_CREATOR',
- 'clientVersion': '24.30.100',
+ 'clientVersion': '24.45.100',
'deviceMake': 'Apple',
'deviceModel': 'iPhone16,2',
- 'userAgent': 'com.google.ios.ytcreator/24.30.100 (iPhone16,2; U; CPU iOS 17_5_1 like Mac OS X;)',
+ 'userAgent': 'com.google.ios.ytcreator/24.45.100 (iPhone16,2; U; CPU iOS 18_1_0 like Mac OS X;)',
'osName': 'iPhone',
- 'osVersion': '17.5.1.21F90',
+ 'osVersion': '18.1.0.22B83',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 15,
'REQUIRE_JS_PLAYER': False,
+ 'REQUIRE_AUTH': True,
},
# mweb has 'ultralow' formats
# See: https://github.com/yt-dlp/yt-dlp/pull/557
@@ -282,8 +274,10 @@
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 85,
+ 'REQUIRE_AUTH': True,
},
- # This client has pre-merged video+audio 720p/1080p streams
+ # This client now requires sign-in for every video
+ # It may be able to receive pre-merged video+audio 720p/1080p streams
'mediaconnect': {
'INNERTUBE_CONTEXT': {
'client': {
@@ -293,6 +287,7 @@
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 95,
'REQUIRE_JS_PLAYER': False,
+ 'REQUIRE_AUTH': True,
},
}
@@ -321,6 +316,7 @@ def build_innertube_clients():
ytcfg.setdefault('INNERTUBE_HOST', 'www.youtube.com')
ytcfg.setdefault('REQUIRE_JS_PLAYER', True)
ytcfg.setdefault('REQUIRE_PO_TOKEN', False)
+ ytcfg.setdefault('REQUIRE_AUTH', False)
ytcfg.setdefault('PLAYER_PARAMS', None)
ytcfg['INNERTUBE_CONTEXT']['client'].setdefault('hl', 'en')
@@ -577,208 +573,18 @@ def _real_initialize(self):
self._check_login_required()
def _perform_login(self, username, password):
- auth_type, _, user = (username or '').partition('+')
-
- if auth_type != 'oauth':
- raise ExtractorError(self._youtube_login_hint, expected=True)
-
- self._initialize_oauth(user, password)
-
- '''
- OAuth 2.0 Device Authorization Grant flow, used by the YouTube TV client (youtube.com/tv).
-
- For more information regarding OAuth 2.0 and the Device Authorization Grant flow in general, see:
- - https://developers.google.com/identity/protocols/oauth2/limited-input-device
- - https://accounts.google.com/.well-known/openid-configuration
- - https://www.rfc-editor.org/rfc/rfc8628
- - https://www.rfc-editor.org/rfc/rfc6749
-
- Note: The official client appears to use a proxied version of the oauth2 endpoints on youtube.com/o/oauth2,
- which applies some modifications to the response (such as returning errors as 200 OK).
- Since the client works with the standard API, we will use that as it is well-documented.
- '''
-
- _OAUTH_PROFILE = None
- _OAUTH_ACCESS_TOKEN_CACHE = {}
- _OAUTH_DISPLAY_ID = 'oauth'
-
- # YouTube TV (TVHTML5) client. You can find these at youtube.com/tv
- _OAUTH_CLIENT_ID = '861556708454-d6dlm3lh05idd8npek18k6be8ba3oc68.apps.googleusercontent.com'
- _OAUTH_CLIENT_SECRET = 'SboVhoG9s0rNafixCSGGKXAT'
- _OAUTH_SCOPE = 'http://gdata.youtube.com https://www.googleapis.com/auth/youtube-paid-content'
-
- # From https://accounts.google.com/.well-known/openid-configuration
- # Technically, these should be fetched dynamically and not hard-coded.
- # However, as these endpoints rarely change, we can risk saving an extra request for every invocation.
- _OAUTH_DEVICE_AUTHORIZATION_ENDPOINT = 'https://oauth2.googleapis.com/device/code'
- _OAUTH_TOKEN_ENDPOINT = 'https://oauth2.googleapis.com/token'
-
- @property
- def _oauth_cache_key(self):
- return f'oauth_refresh_token_{self._OAUTH_PROFILE}'
-
- def _read_oauth_error_response(self, response):
- return traverse_obj(
- self._webpage_read_content(response, self._OAUTH_TOKEN_ENDPOINT, self._OAUTH_DISPLAY_ID, fatal=False),
- ({json.loads}, 'error', {str}))
-
- def _set_oauth_info(self, token_response):
- YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE.setdefault(self._OAUTH_PROFILE, {}).update({
- 'access_token': token_response['access_token'],
- 'token_type': token_response['token_type'],
- 'expiry': time_seconds(
- seconds=traverse_obj(token_response, ('expires_in', {float_or_none}), default=300) - 10),
- })
- refresh_token = traverse_obj(token_response, ('refresh_token', {str}))
- if refresh_token:
- self.cache.store(self._NETRC_MACHINE, self._oauth_cache_key, refresh_token)
- YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE[self._OAUTH_PROFILE]['refresh_token'] = refresh_token
-
- def _initialize_oauth(self, user, refresh_token):
- self._OAUTH_PROFILE = user or 'default'
-
- if self._OAUTH_PROFILE in YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE:
- self.write_debug(f'{self._OAUTH_DISPLAY_ID}: Using cached access token for profile "{self._OAUTH_PROFILE}"')
- return
-
- YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE[self._OAUTH_PROFILE] = {}
-
- if refresh_token:
- msg = f'{self._OAUTH_DISPLAY_ID}: Using password input as refresh token'
- if self.get_param('cachedir') is not False:
- msg += ' and caching token to disk; you should supply an empty password next time'
- self.to_screen(msg)
- self.cache.store(self._NETRC_MACHINE, self._oauth_cache_key, refresh_token)
- else:
- refresh_token = self.cache.load(self._NETRC_MACHINE, self._oauth_cache_key)
-
- if refresh_token:
- YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE[self._OAUTH_PROFILE]['refresh_token'] = refresh_token
- try:
- token_response = self._refresh_token(refresh_token)
- except ExtractorError as e:
- error_msg = str(e.orig_msg).replace('Failed to refresh access token: ', '')
- self.report_warning(f'{self._OAUTH_DISPLAY_ID}: Failed to refresh access token: {error_msg}')
- token_response = self._oauth_authorize
- else:
- token_response = self._oauth_authorize
-
- self._set_oauth_info(token_response)
- self.write_debug(f'{self._OAUTH_DISPLAY_ID}: Logged in using profile "{self._OAUTH_PROFILE}"')
-
- def _refresh_token(self, refresh_token):
- try:
- token_response = self._download_json(
- self._OAUTH_TOKEN_ENDPOINT,
- video_id=self._OAUTH_DISPLAY_ID,
- note='Refreshing access token',
- data=json.dumps({
- 'client_id': self._OAUTH_CLIENT_ID,
- 'client_secret': self._OAUTH_CLIENT_SECRET,
- 'refresh_token': refresh_token,
- 'grant_type': 'refresh_token',
- }).encode(),
- headers={'Content-Type': 'application/json'})
- except ExtractorError as e:
- if isinstance(e.cause, HTTPError):
- error = self._read_oauth_error_response(e.cause.response)
- if error == 'invalid_grant':
- # RFC6749 § 5.2
- raise ExtractorError(
- 'Failed to refresh access token: Refresh token is invalid, revoked, or expired (invalid_grant)',
- expected=True, video_id=self._OAUTH_DISPLAY_ID)
- raise ExtractorError(
- f'Failed to refresh access token: Authorization server returned error {error}',
- video_id=self._OAUTH_DISPLAY_ID)
- raise
- return token_response
-
- @property
- def _oauth_authorize(self):
- code_response = self._download_json(
- self._OAUTH_DEVICE_AUTHORIZATION_ENDPOINT,
- video_id=self._OAUTH_DISPLAY_ID,
- note='Initializing authorization flow',
- data=json.dumps({
- 'client_id': self._OAUTH_CLIENT_ID,
- 'scope': self._OAUTH_SCOPE,
- }).encode(),
- headers={'Content-Type': 'application/json'})
-
- verification_url = traverse_obj(code_response, ('verification_url', {str}))
- user_code = traverse_obj(code_response, ('user_code', {str}))
- if not verification_url or not user_code:
+ if username.startswith('oauth'):
raise ExtractorError(
- 'Authorization server did not provide verification_url or user_code', video_id=self._OAUTH_DISPLAY_ID)
+ f'Login with OAuth is no longer supported. {self._youtube_login_hint}', expected=True)
- # note: The whitespace is intentional
- self.to_screen(
- f'{self._OAUTH_DISPLAY_ID}: To give yt-dlp access to your account, '
- f'go to {verification_url} and enter code {user_code}')
-
- # RFC8628 § 3.5: default poll interval is 5 seconds if not provided
- poll_interval = traverse_obj(code_response, ('interval', {int}), default=5)
-
- for retry in self.RetryManager():
- while True:
- try:
- token_response = self._download_json(
- self._OAUTH_TOKEN_ENDPOINT,
- video_id=self._OAUTH_DISPLAY_ID,
- note=False,
- errnote='Failed to request access token',
- data=json.dumps({
- 'client_id': self._OAUTH_CLIENT_ID,
- 'client_secret': self._OAUTH_CLIENT_SECRET,
- 'device_code': code_response['device_code'],
- 'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
- }).encode(),
- headers={'Content-Type': 'application/json'})
- except ExtractorError as e:
- if isinstance(e.cause, TransportError):
- retry.error = e
- break
- elif isinstance(e.cause, HTTPError):
- error = self._read_oauth_error_response(e.cause.response)
- if not error:
- retry.error = e
- break
-
- if error == 'authorization_pending':
- time.sleep(poll_interval)
- continue
- elif error == 'expired_token':
- raise ExtractorError(
- 'Authorization timed out', expected=True, video_id=self._OAUTH_DISPLAY_ID)
- elif error == 'access_denied':
- raise ExtractorError(
- 'You denied access to an account', expected=True, video_id=self._OAUTH_DISPLAY_ID)
- elif error == 'slow_down':
- # RFC8628 § 3.5: add 5 seconds to the poll interval
- poll_interval += 5
- time.sleep(poll_interval)
- continue
- else:
- raise ExtractorError(
- f'Authorization server returned an error when fetching access token: {error}',
- video_id=self._OAUTH_DISPLAY_ID)
- raise
-
- return token_response
-
- def _update_oauth(self):
- token = YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE.get(self._OAUTH_PROFILE)
- if token is None or token['expiry'] > time.time():
- return
-
- self._set_oauth_info(self._refresh_token(token['refresh_token']))
+ self.report_warning(
+ f'Login with password is not supported for YouTube. {self._youtube_login_hint}')
@property
def _youtube_login_hint(self):
- return ('Use --username=oauth[+PROFILE] --password="" to log in using oauth, '
- f'or else u{self._login_hint(method="cookies")[1:]}. '
- 'See https://github.com/yt-dlp/yt-dlp/wiki/Extractors#logging-in-with-oauth for more on how to use oauth. '
- 'See https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies for help with cookies')
+ return (f'{self._login_hint(method="cookies")}. Also see '
+ 'https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies '
+ 'for tips on effectively exporting YouTube cookies')
def _check_login_required(self):
if self._LOGIN_REQUIRED and not self.is_authenticated:
@@ -928,7 +734,7 @@ def _extract_visitor_data(self, *args):
@functools.cached_property
def is_authenticated(self):
- return self._OAUTH_PROFILE or bool(self._generate_sapisidhash_header())
+ return bool(self._generate_sapisidhash_header())
def extract_ytcfg(self, video_id, webpage):
if not webpage:
@@ -938,16 +744,6 @@ def extract_ytcfg(self, video_id, webpage):
r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;', webpage, 'ytcfg',
default='{}'), video_id, fatal=False) or {}
- def _generate_oauth_headers(self):
- self._update_oauth()
- oauth_token = YoutubeBaseInfoExtractor._OAUTH_ACCESS_TOKEN_CACHE.get(self._OAUTH_PROFILE)
- if not oauth_token:
- return {}
-
- return {
- 'Authorization': f'{oauth_token["token_type"]} {oauth_token["access_token"]}',
- }
-
def _generate_cookie_auth_headers(self, *, ytcfg=None, account_syncid=None, session_index=None, origin=None, **kwargs):
headers = {}
account_syncid = account_syncid or self._extract_account_syncid(ytcfg)
@@ -977,14 +773,10 @@ def generate_api_headers(
'Origin': origin,
'X-Goog-Visitor-Id': visitor_data or self._extract_visitor_data(ytcfg),
'User-Agent': self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT']['client']['userAgent'], default_client=default_client),
- **self._generate_oauth_headers(),
**self._generate_cookie_auth_headers(ytcfg=ytcfg, account_syncid=account_syncid, session_index=session_index, origin=origin),
}
return filter_dict(headers)
- def _generate_webpage_headers(self):
- return self._generate_oauth_headers()
-
def _download_ytcfg(self, client, video_id):
url = {
'web': 'https://www.youtube.com',
@@ -994,8 +786,7 @@ def _download_ytcfg(self, client, video_id):
if not url:
return {}
webpage = self._download_webpage(
- url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config',
- headers=self._generate_webpage_headers())
+ url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config')
return self.extract_ytcfg(video_id, webpage) or {}
@staticmethod
@@ -3260,8 +3051,7 @@ def _load_player(self, video_id, player_url, fatal=True):
code = self._download_webpage(
player_url, video_id, fatal=fatal,
note='Downloading player ' + player_id,
- errnote=f'Download of {player_url} failed',
- headers=self._generate_webpage_headers())
+ errnote=f'Download of {player_url} failed')
if code:
self._code_cache[player_id] = code
return self._code_cache.get(player_id)
@@ -3544,8 +3334,7 @@ def _mark_watched(self, video_id, player_responses):
self._download_webpage(
url, video_id, f'Marking {label}watched',
- 'Unable to mark watched', fatal=False,
- headers=self._generate_webpage_headers())
+ 'Unable to mark watched', fatal=False)
@classmethod
def _extract_from_webpage(cls, url, webpage):
@@ -4059,9 +3848,10 @@ def _get_requested_clients(self, url, smuggled_data):
if smuggled_data.get('is_music_url') or self.is_music_url(url):
for requested_client in requested_clients:
_, base_client, variant = _split_innertube_client(requested_client)
- music_client = f'{base_client}_music'
+ music_client = f'{base_client}_music' if base_client != 'mweb' else 'web_music'
if variant != 'music' and music_client in INNERTUBE_CLIENTS:
- requested_clients.append(music_client)
+ if not INNERTUBE_CLIENTS[music_client]['REQUIRE_AUTH'] or self.is_authenticated:
+ requested_clients.append(music_client)
return orderedSet(requested_clients)
@@ -4174,10 +3964,10 @@ def append_client(*client_names):
self.to_screen(
f'{video_id}: This video is age-restricted and YouTube is requiring '
'account age-verification; some formats may be missing', only_once=True)
- # web_creator and mediaconnect can work around the age-verification requirement
- # _testsuite & _vr variants can also work around age-verification
+ # web_creator can work around the age-verification requirement
+ # android_vr and mediaconnect may also be able to work around age-verification
# tv_embedded may(?) still work around age-verification if the video is embeddable
- append_client('web_creator', 'mediaconnect')
+ append_client('web_creator')
prs.extend(deprioritized_prs)
@@ -4526,7 +4316,7 @@ def _download_player_responses(self, url, smuggled_data, video_id, webpage_url):
if pp:
query['pp'] = pp
webpage = self._download_webpage(
- webpage_url, video_id, fatal=False, query=query, headers=self._generate_webpage_headers())
+ webpage_url, video_id, fatal=False, query=query)
master_ytcfg = self.extract_ytcfg(video_id, webpage) or self._get_default_ytcfg()
@@ -4669,6 +4459,9 @@ def feed_entry(name):
self.raise_geo_restricted(subreason, countries, metadata_available=True)
reason += f'. {subreason}'
if reason:
+ if 'sign in' in reason.lower():
+ reason = remove_end(reason, 'This helps protect our community. Learn more')
+ reason = f'{remove_end(reason.strip(), ".")}. {self._youtube_login_hint}'
self.raise_no_formats(reason, expected=True)
keywords = get_first(video_details, 'keywords', expected_type=list) or []
@@ -5814,7 +5607,7 @@ def _extract_webpage(self, url, item_id, fatal=True):
webpage, data = None, None
for retry in self.RetryManager(fatal=fatal):
try:
- webpage = self._download_webpage(url, item_id, note='Downloading webpage', headers=self._generate_webpage_headers())
+ webpage = self._download_webpage(url, item_id, note='Downloading webpage')
data = self.extract_yt_initial_data(item_id, webpage or '', fatal=fatal) or {}
except ExtractorError as e:
if isinstance(e.cause, network_exceptions):
diff --git a/yt_dlp/postprocessor/common.py b/yt_dlp/postprocessor/common.py
index eeeece82c2..be2bb33f64 100644
--- a/yt_dlp/postprocessor/common.py
+++ b/yt_dlp/postprocessor/common.py
@@ -9,7 +9,6 @@
RetryManager,
_configuration_args,
deprecation_warning,
- encodeFilename,
)
@@ -151,7 +150,7 @@ def run(self, information):
def try_utime(self, path, atime, mtime, errnote='Cannot update utime of file'):
try:
- os.utime(encodeFilename(path), (atime, mtime))
+ os.utime(path, (atime, mtime))
except Exception:
self.report_warning(errnote)
diff --git a/yt_dlp/postprocessor/embedthumbnail.py b/yt_dlp/postprocessor/embedthumbnail.py
index 16c8bcdda7..d8ba220cab 100644
--- a/yt_dlp/postprocessor/embedthumbnail.py
+++ b/yt_dlp/postprocessor/embedthumbnail.py
@@ -12,7 +12,6 @@
PostProcessingError,
check_executable,
encodeArgument,
- encodeFilename,
prepend_extension,
shell_quote,
)
@@ -68,7 +67,7 @@ def run(self, info):
self.to_screen('There are no thumbnails on disk')
return [], info
thumbnail_filename = info['thumbnails'][idx]['filepath']
- if not os.path.exists(encodeFilename(thumbnail_filename)):
+ if not os.path.exists(thumbnail_filename):
self.report_warning('Skipping embedding the thumbnail because the file is missing.')
return [], info
@@ -85,7 +84,7 @@ def run(self, info):
thumbnail_filename = convertor.convert_thumbnail(thumbnail_filename, 'png')
thumbnail_ext = 'png'
- mtime = os.stat(encodeFilename(filename)).st_mtime
+ mtime = os.stat(filename).st_mtime
success = True
if info['ext'] == 'mp3':
@@ -154,12 +153,12 @@ def run(self, info):
else:
if not prefer_atomicparsley:
self.to_screen('mutagen was not found. Falling back to AtomicParsley')
- cmd = [encodeFilename(atomicparsley, True),
- encodeFilename(filename, True),
+ cmd = [atomicparsley,
+ filename,
encodeArgument('--artwork'),
- encodeFilename(thumbnail_filename, True),
+ thumbnail_filename,
encodeArgument('-o'),
- encodeFilename(temp_filename, True)]
+ temp_filename]
cmd += [encodeArgument(o) for o in self._configuration_args('AtomicParsley')]
self._report_run('atomicparsley', filename)
diff --git a/yt_dlp/postprocessor/ffmpeg.py b/yt_dlp/postprocessor/ffmpeg.py
index 164c46d143..d994754fd3 100644
--- a/yt_dlp/postprocessor/ffmpeg.py
+++ b/yt_dlp/postprocessor/ffmpeg.py
@@ -21,7 +21,6 @@
determine_ext,
dfxp2srt,
encodeArgument,
- encodeFilename,
filter_dict,
float_or_none,
is_outdated_version,
@@ -243,13 +242,13 @@ def get_audio_codec(self, path):
try:
if self.probe_available:
cmd = [
- encodeFilename(self.probe_executable, True),
+ self.probe_executable,
encodeArgument('-show_streams')]
else:
cmd = [
- encodeFilename(self.executable, True),
+ self.executable,
encodeArgument('-i')]
- cmd.append(encodeFilename(self._ffmpeg_filename_argument(path), True))
+ cmd.append(self._ffmpeg_filename_argument(path))
self.write_debug(f'{self.basename} command line: {shell_quote(cmd)}')
stdout, stderr, returncode = Popen.run(
cmd, text=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -282,7 +281,7 @@ def get_metadata_object(self, path, opts=[]):
self.check_version()
cmd = [
- encodeFilename(self.probe_executable, True),
+ self.probe_executable,
encodeArgument('-hide_banner'),
encodeArgument('-show_format'),
encodeArgument('-show_streams'),
@@ -335,9 +334,9 @@ def real_run_ffmpeg(self, input_path_opts, output_path_opts, *, expected_retcode
self.check_version()
oldest_mtime = min(
- os.stat(encodeFilename(path)).st_mtime for path, _ in input_path_opts if path)
+ os.stat(path).st_mtime for path, _ in input_path_opts if path)
- cmd = [encodeFilename(self.executable, True), encodeArgument('-y')]
+ cmd = [self.executable, encodeArgument('-y')]
# avconv does not have repeat option
if self.basename == 'ffmpeg':
cmd += [encodeArgument('-loglevel'), encodeArgument('repeat+info')]
@@ -353,7 +352,7 @@ def make_args(file, args, name, number):
args.append('-i')
return (
[encodeArgument(arg) for arg in args]
- + [encodeFilename(self._ffmpeg_filename_argument(file), True)])
+ + [self._ffmpeg_filename_argument(file)])
for arg_type, path_opts in (('i', input_path_opts), ('o', output_path_opts)):
cmd += itertools.chain.from_iterable(
@@ -522,8 +521,8 @@ def run(self, information):
return [], information
orig_path = prepend_extension(path, 'orig')
temp_path = prepend_extension(path, 'temp')
- if (self._nopostoverwrites and os.path.exists(encodeFilename(new_path))
- and os.path.exists(encodeFilename(orig_path))):
+ if (self._nopostoverwrites and os.path.exists(new_path)
+ and os.path.exists(orig_path)):
self.to_screen(f'Post-process file {new_path} exists, skipping')
return [], information
@@ -838,7 +837,7 @@ def run(self, info):
args.extend(['-map', f'{i}:v:0'])
self.to_screen(f'Merging formats into "{filename}"')
self.run_ffmpeg_multiple_files(info['__files_to_merge'], temp_filename, args)
- os.rename(encodeFilename(temp_filename), encodeFilename(filename))
+ os.rename(temp_filename, filename)
return info['__files_to_merge'], info
def can_merge(self):
@@ -1039,7 +1038,7 @@ def _prepare_filename(self, number, chapter, info):
def _ffmpeg_args_for_chapter(self, number, chapter, info):
destination = self._prepare_filename(number, chapter, info)
- if not self._downloader._ensure_dir_exists(encodeFilename(destination)):
+ if not self._downloader._ensure_dir_exists(destination):
return
chapter['filepath'] = destination
diff --git a/yt_dlp/postprocessor/movefilesafterdownload.py b/yt_dlp/postprocessor/movefilesafterdownload.py
index 35e87051b4..964ca1f921 100644
--- a/yt_dlp/postprocessor/movefilesafterdownload.py
+++ b/yt_dlp/postprocessor/movefilesafterdownload.py
@@ -4,8 +4,6 @@
from ..compat import shutil
from ..utils import (
PostProcessingError,
- decodeFilename,
- encodeFilename,
make_dir,
)
@@ -21,25 +19,25 @@ def pp_key(cls):
return 'MoveFiles'
def run(self, info):
- dl_path, dl_name = os.path.split(encodeFilename(info['filepath']))
+ dl_path, dl_name = os.path.split(info['filepath'])
finaldir = info.get('__finaldir', dl_path)
finalpath = os.path.join(finaldir, dl_name)
if self._downloaded:
- info['__files_to_move'][info['filepath']] = decodeFilename(finalpath)
+ info['__files_to_move'][info['filepath']] = finalpath
- make_newfilename = lambda old: decodeFilename(os.path.join(finaldir, os.path.basename(encodeFilename(old))))
+ make_newfilename = lambda old: os.path.join(finaldir, os.path.basename(old))
for oldfile, newfile in info['__files_to_move'].items():
if not newfile:
newfile = make_newfilename(oldfile)
- if os.path.abspath(encodeFilename(oldfile)) == os.path.abspath(encodeFilename(newfile)):
+ if os.path.abspath(oldfile) == os.path.abspath(newfile):
continue
- if not os.path.exists(encodeFilename(oldfile)):
+ if not os.path.exists(oldfile):
self.report_warning(f'File "{oldfile}" cannot be found')
continue
- if os.path.exists(encodeFilename(newfile)):
+ if os.path.exists(newfile):
if self.get_param('overwrites', True):
self.report_warning(f'Replacing existing file "{newfile}"')
- os.remove(encodeFilename(newfile))
+ os.remove(newfile)
else:
self.report_warning(
f'Cannot move file "{oldfile}" out of temporary directory since "{newfile}" already exists. ')
diff --git a/yt_dlp/postprocessor/sponskrub.py b/yt_dlp/postprocessor/sponskrub.py
index 525b6392a4..ac6db1bc7b 100644
--- a/yt_dlp/postprocessor/sponskrub.py
+++ b/yt_dlp/postprocessor/sponskrub.py
@@ -9,7 +9,6 @@
check_executable,
cli_option,
encodeArgument,
- encodeFilename,
prepend_extension,
shell_quote,
str_or_none,
@@ -52,7 +51,7 @@ def run(self, information):
return [], information
filename = information['filepath']
- if not os.path.exists(encodeFilename(filename)): # no download
+ if not os.path.exists(filename): # no download
return [], information
if information['extractor_key'].lower() != 'youtube':
@@ -71,8 +70,8 @@ def run(self, information):
self.report_warning('If sponskrub is run multiple times, unintended parts of the video could be cut out.')
temp_filename = prepend_extension(filename, self._temp_ext)
- if os.path.exists(encodeFilename(temp_filename)):
- os.remove(encodeFilename(temp_filename))
+ if os.path.exists(temp_filename):
+ os.remove(temp_filename)
cmd = [self.path]
if not self.cutout:
diff --git a/yt_dlp/postprocessor/xattrpp.py b/yt_dlp/postprocessor/xattrpp.py
index 166aabaf92..e486b797b7 100644
--- a/yt_dlp/postprocessor/xattrpp.py
+++ b/yt_dlp/postprocessor/xattrpp.py
@@ -1,7 +1,6 @@
import os
from .common import PostProcessor
-from ..compat import compat_os_name
from ..utils import (
PostProcessingError,
XAttrMetadataError,
@@ -57,7 +56,7 @@ def run(self, info):
elif e.reason == 'VALUE_TOO_LONG':
self.report_warning(f'Unable to write extended attribute "{xattrname}" due to too long values.')
else:
- tip = ('You need to use NTFS' if compat_os_name == 'nt'
+ tip = ('You need to use NTFS' if os.name == 'nt'
else 'You may have to enable them in your "/etc/fstab"')
raise PostProcessingError(f'This filesystem doesn\'t support extended attributes. {tip}')
diff --git a/yt_dlp/update.py b/yt_dlp/update.py
index 90df2509f0..ca2ec5f376 100644
--- a/yt_dlp/update.py
+++ b/yt_dlp/update.py
@@ -13,7 +13,6 @@
from dataclasses import dataclass
from zipimport import zipimporter
-from .compat import compat_realpath
from .networking import Request
from .networking.exceptions import HTTPError, network_exceptions
from .utils import (
@@ -201,8 +200,6 @@ class UpdateInfo:
binary_name: str | None = _get_binary_name() # noqa: RUF009: Always returns the same value
checksum: str | None = None
- _has_update = True
-
class Updater:
# XXX: use class variables to simplify testing
@@ -523,7 +520,7 @@ def update(self, update_info=NO_DEFAULT):
@functools.cached_property
def filename(self):
"""Filename of the executable"""
- return compat_realpath(_get_variant_and_executable_path()[1])
+ return os.path.realpath(_get_variant_and_executable_path()[1])
@functools.cached_property
def cmd(self):
@@ -562,62 +559,14 @@ def _report_network_error(self, action, delim=';', tag=None):
f'Unable to {action}{delim} visit '
f'https://github.com/{self.requested_repo}/releases/{path}', True)
- # XXX: Everything below this line in this class is deprecated / for compat only
- @property
- def _target_tag(self):
- """Deprecated; requested tag with 'tags/' prepended when necessary for API calls"""
- return f'tags/{self.requested_tag}' if self.requested_tag != 'latest' else self.requested_tag
-
- def _check_update(self):
- """Deprecated; report whether there is an update available"""
- return bool(self.query_update(_output=True))
-
- def __getattr__(self, attribute: str):
- """Compat getter function for deprecated attributes"""
- deprecated_props_map = {
- 'check_update': '_check_update',
- 'target_tag': '_target_tag',
- 'target_channel': 'requested_channel',
- }
- update_info_props_map = {
- 'has_update': '_has_update',
- 'new_version': 'version',
- 'latest_version': 'requested_version',
- 'release_name': 'binary_name',
- 'release_hash': 'checksum',
- }
-
- if attribute not in deprecated_props_map and attribute not in update_info_props_map:
- raise AttributeError(f'{type(self).__name__!r} object has no attribute {attribute!r}')
-
- msg = f'{type(self).__name__}.{attribute} is deprecated and will be removed in a future version'
- if attribute in deprecated_props_map:
- source_name = deprecated_props_map[attribute]
- if not source_name.startswith('_'):
- msg += f'. Please use {source_name!r} instead'
- source = self
- mapping = deprecated_props_map
-
- else: # attribute in update_info_props_map
- msg += '. Please call query_update() instead'
- source = self.query_update()
- if source is None:
- source = UpdateInfo('', None, None, None)
- source._has_update = False
- mapping = update_info_props_map
-
- deprecation_warning(msg)
- for target_name, source_name in mapping.items():
- value = getattr(source, source_name)
- setattr(self, target_name, value)
-
- return getattr(self, attribute)
-
def run_update(ydl):
"""Update the program file with the latest version from the repository
@returns Whether there was a successful update (No update = False)
"""
+ deprecation_warning(
+ '"yt_dlp.update.run_update(ydl)" is deprecated and may be removed in a future version. '
+ 'Use "yt_dlp.update.Updater(ydl).update()" instead')
return Updater(ydl).update()
diff --git a/yt_dlp/utils/_deprecated.py b/yt_dlp/utils/_deprecated.py
index a8ae8ecb5d..e4762699b7 100644
--- a/yt_dlp/utils/_deprecated.py
+++ b/yt_dlp/utils/_deprecated.py
@@ -9,31 +9,23 @@
del passthrough_module
-from ._utils import preferredencoding
+import re
+import struct
-def encodeFilename(s, for_subprocess=False):
- assert isinstance(s, str)
- return s
+def bytes_to_intlist(bs):
+ if not bs:
+ return []
+ if isinstance(bs[0], int): # Python 3
+ return list(bs)
+ else:
+ return [ord(c) for c in bs]
-def decodeFilename(b, for_subprocess=False):
- return b
+def intlist_to_bytes(xs):
+ if not xs:
+ return b''
+ return struct.pack('%dB' % len(xs), *xs)
-def decodeArgument(b):
- return b
-
-
-def decodeOption(optval):
- if optval is None:
- return optval
- if isinstance(optval, bytes):
- optval = optval.decode(preferredencoding())
-
- assert isinstance(optval, str)
- return optval
-
-
-def error_to_compat_str(err):
- return str(err)
+compiled_regex_type = type(re.compile(''))
diff --git a/yt_dlp/utils/_legacy.py b/yt_dlp/utils/_legacy.py
index 356e580226..d65b135d9d 100644
--- a/yt_dlp/utils/_legacy.py
+++ b/yt_dlp/utils/_legacy.py
@@ -313,3 +313,30 @@ def make_HTTPS_handler(params, **kwargs):
def process_communicate_or_kill(p, *args, **kwargs):
return Popen.communicate_or_kill(p, *args, **kwargs)
+
+
+def encodeFilename(s, for_subprocess=False):
+ assert isinstance(s, str)
+ return s
+
+
+def decodeFilename(b, for_subprocess=False):
+ return b
+
+
+def decodeArgument(b):
+ return b
+
+
+def decodeOption(optval):
+ if optval is None:
+ return optval
+ if isinstance(optval, bytes):
+ optval = optval.decode(preferredencoding())
+
+ assert isinstance(optval, str)
+ return optval
+
+
+def error_to_compat_str(err):
+ return str(err)
diff --git a/yt_dlp/utils/_utils.py b/yt_dlp/utils/_utils.py
index 89c53c39e7..8517b762ef 100644
--- a/yt_dlp/utils/_utils.py
+++ b/yt_dlp/utils/_utils.py
@@ -49,15 +49,11 @@
compat_etree_fromstring,
compat_expanduser,
compat_HTMLParseError,
- compat_os_name,
)
from ..dependencies import xattr
__name__ = __name__.rsplit('.', 1)[0] # noqa: A001: Pretend to be the parent module
-# This is not clearly defined otherwise
-compiled_regex_type = type(re.compile(''))
-
class NO_DEFAULT:
pass
@@ -874,7 +870,7 @@ def __init__(self, args, *remaining, env=None, text=False, shell=False, **kwargs
kwargs.setdefault('encoding', 'utf-8')
kwargs.setdefault('errors', 'replace')
- if shell and compat_os_name == 'nt' and kwargs.get('executable') is None:
+ if shell and os.name == 'nt' and kwargs.get('executable') is None:
if not isinstance(args, str):
args = shell_quote(args, shell=True)
shell = False
@@ -1457,7 +1453,7 @@ def system_identifier():
@functools.cache
def get_windows_version():
""" Get Windows version. returns () if it's not running on Windows """
- if compat_os_name == 'nt':
+ if os.name == 'nt':
return version_tuple(platform.win32_ver()[1])
else:
return ()
@@ -1470,7 +1466,7 @@ def write_string(s, out=None, encoding=None):
if not out:
return
- if compat_os_name == 'nt' and supports_terminal_sequences(out):
+ if os.name == 'nt' and supports_terminal_sequences(out):
s = re.sub(r'([\r\n]+)', r' \1', s)
enc, buffer = None, out
@@ -1503,21 +1499,6 @@ def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
deprecation_warning._cache = set()
-def bytes_to_intlist(bs):
- if not bs:
- return []
- if isinstance(bs[0], int): # Python 3
- return list(bs)
- else:
- return [ord(c) for c in bs]
-
-
-def intlist_to_bytes(xs):
- if not xs:
- return b''
- return struct.pack('%dB' % len(xs), *xs)
-
-
class LockingUnsupportedError(OSError):
msg = 'File locking is not supported'
@@ -1701,7 +1682,7 @@ def get_filesystem_encoding():
def shell_quote(args, *, shell=False):
args = list(variadic(args))
- if compat_os_name != 'nt':
+ if os.name != 'nt':
return shlex.join(args)
trans = _CMD_QUOTE_TRANS if shell else _WINDOWS_QUOTE_TRANS
@@ -4516,7 +4497,7 @@ def urshift(val, n):
def write_xattr(path, key, value):
# Windows: Write xattrs to NTFS Alternate Data Streams:
# http://en.wikipedia.org/wiki/NTFS#Alternate_data_streams_.28ADS.29
- if compat_os_name == 'nt':
+ if os.name == 'nt':
assert ':' not in key
assert os.path.exists(path)
@@ -4778,12 +4759,12 @@ def jwt_decode_hs256(jwt):
return json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
-WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
+WINDOWS_VT_MODE = False if os.name == 'nt' else None
@functools.cache
def supports_terminal_sequences(stream):
- if compat_os_name == 'nt':
+ if os.name == 'nt':
if not WINDOWS_VT_MODE:
return False
elif not os.getenv('TERM'):
@@ -4877,7 +4858,7 @@ def parse_http_range(range):
def read_stdin(what):
if what:
- eof = 'Ctrl+Z' if compat_os_name == 'nt' else 'Ctrl+D'
+ eof = 'Ctrl+Z' if os.name == 'nt' else 'Ctrl+D'
write_string(f'Reading {what} from STDIN - EOF ({eof}) to end:\n')
return sys.stdin