2021-06-03 11:43:42 +02:00
|
|
|
#!/usr/bin/env python3
|
2022-06-24 16:36:16 +05:30
|
|
|
|
2016-04-23 21:30:44 +08:00
|
|
|
# Allow direct execution
|
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
import unittest
|
2022-04-12 04:02:57 +05:30
|
|
|
|
2016-04-23 21:30:44 +08:00
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
|
2022-06-24 16:36:16 +05:30
|
|
|
|
2016-05-05 17:09:13 +08:00
|
|
|
import random
|
|
|
|
import subprocess
|
2022-06-24 13:40:17 +05:30
|
|
|
import urllib.request
|
2016-04-23 21:30:44 +08:00
|
|
|
|
2022-06-24 13:40:17 +05:30
|
|
|
from test.helper import FakeYDL, get_params, is_download_test
|
2016-04-23 21:30:44 +08:00
|
|
|
|
2016-05-05 17:09:13 +08:00
|
|
|
|
2021-07-23 20:18:15 +05:30
|
|
|
@is_download_test
|
2016-05-05 17:09:13 +08:00
|
|
|
class TestMultipleSocks(unittest.TestCase):
|
2016-04-23 21:30:44 +08:00
|
|
|
@staticmethod
|
|
|
|
def _check_params(attrs):
|
|
|
|
params = get_params()
|
|
|
|
for attr in attrs:
|
|
|
|
if attr not in params:
|
|
|
|
print('Missing %s. Skipping.' % attr)
|
|
|
|
return
|
|
|
|
return params
|
|
|
|
|
|
|
|
def test_proxy_http(self):
|
|
|
|
params = self._check_params(['primary_proxy', 'primary_server_ip'])
|
|
|
|
if params is None:
|
|
|
|
return
|
|
|
|
ydl = FakeYDL({
|
|
|
|
'proxy': params['primary_proxy']
|
|
|
|
})
|
|
|
|
self.assertEqual(
|
2022-05-09 17:24:28 +05:30
|
|
|
ydl.urlopen('http://yt-dl.org/ip').read().decode(),
|
2016-04-23 21:30:44 +08:00
|
|
|
params['primary_server_ip'])
|
|
|
|
|
|
|
|
def test_proxy_https(self):
|
|
|
|
params = self._check_params(['primary_proxy', 'primary_server_ip'])
|
|
|
|
if params is None:
|
|
|
|
return
|
|
|
|
ydl = FakeYDL({
|
|
|
|
'proxy': params['primary_proxy']
|
|
|
|
})
|
|
|
|
self.assertEqual(
|
2022-05-09 17:24:28 +05:30
|
|
|
ydl.urlopen('https://yt-dl.org/ip').read().decode(),
|
2016-04-23 21:30:44 +08:00
|
|
|
params['primary_server_ip'])
|
|
|
|
|
|
|
|
def test_secondary_proxy_http(self):
|
|
|
|
params = self._check_params(['secondary_proxy', 'secondary_server_ip'])
|
|
|
|
if params is None:
|
|
|
|
return
|
|
|
|
ydl = FakeYDL()
|
2022-06-24 13:40:17 +05:30
|
|
|
req = urllib.request.Request('http://yt-dl.org/ip')
|
2016-04-23 21:30:44 +08:00
|
|
|
req.add_header('Ytdl-request-proxy', params['secondary_proxy'])
|
|
|
|
self.assertEqual(
|
2022-05-09 17:24:28 +05:30
|
|
|
ydl.urlopen(req).read().decode(),
|
2016-04-23 21:30:44 +08:00
|
|
|
params['secondary_server_ip'])
|
|
|
|
|
|
|
|
def test_secondary_proxy_https(self):
|
|
|
|
params = self._check_params(['secondary_proxy', 'secondary_server_ip'])
|
|
|
|
if params is None:
|
|
|
|
return
|
|
|
|
ydl = FakeYDL()
|
2022-06-24 13:40:17 +05:30
|
|
|
req = urllib.request.Request('https://yt-dl.org/ip')
|
2016-04-23 21:30:44 +08:00
|
|
|
req.add_header('Ytdl-request-proxy', params['secondary_proxy'])
|
|
|
|
self.assertEqual(
|
2022-05-09 17:24:28 +05:30
|
|
|
ydl.urlopen(req).read().decode(),
|
2016-04-23 21:30:44 +08:00
|
|
|
params['secondary_server_ip'])
|
|
|
|
|
|
|
|
|
2021-07-23 20:18:15 +05:30
|
|
|
@is_download_test
|
2016-05-05 17:09:13 +08:00
|
|
|
class TestSocks(unittest.TestCase):
|
2016-05-14 18:48:36 +08:00
|
|
|
_SKIP_SOCKS_TEST = True
|
|
|
|
|
2016-05-05 17:09:13 +08:00
|
|
|
def setUp(self):
|
2016-05-14 18:48:36 +08:00
|
|
|
if self._SKIP_SOCKS_TEST:
|
|
|
|
return
|
|
|
|
|
2016-05-08 15:16:32 +08:00
|
|
|
self.port = random.randint(20000, 30000)
|
2016-05-05 17:09:13 +08:00
|
|
|
self.server_process = subprocess.Popen([
|
|
|
|
'srelay', '-f', '-i', '127.0.0.1:%d' % self.port],
|
|
|
|
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
|
|
|
|
|
def tearDown(self):
|
2016-05-14 18:48:36 +08:00
|
|
|
if self._SKIP_SOCKS_TEST:
|
|
|
|
return
|
|
|
|
|
2016-05-05 17:09:13 +08:00
|
|
|
self.server_process.terminate()
|
|
|
|
self.server_process.communicate()
|
|
|
|
|
|
|
|
def _get_ip(self, protocol):
|
2016-05-14 18:48:36 +08:00
|
|
|
if self._SKIP_SOCKS_TEST:
|
|
|
|
return '127.0.0.1'
|
|
|
|
|
2016-05-05 17:09:13 +08:00
|
|
|
ydl = FakeYDL({
|
|
|
|
'proxy': '%s://127.0.0.1:%d' % (protocol, self.port),
|
|
|
|
})
|
2022-05-09 17:24:28 +05:30
|
|
|
return ydl.urlopen('http://yt-dl.org/ip').read().decode()
|
2016-05-05 17:09:13 +08:00
|
|
|
|
|
|
|
def test_socks4(self):
|
2022-06-24 16:24:43 +05:30
|
|
|
self.assertTrue(isinstance(self._get_ip('socks4'), str))
|
2016-05-05 17:09:13 +08:00
|
|
|
|
|
|
|
def test_socks4a(self):
|
2022-06-24 16:24:43 +05:30
|
|
|
self.assertTrue(isinstance(self._get_ip('socks4a'), str))
|
2016-05-05 17:09:13 +08:00
|
|
|
|
|
|
|
def test_socks5(self):
|
2022-06-24 16:24:43 +05:30
|
|
|
self.assertTrue(isinstance(self._get_ip('socks5'), str))
|
2016-05-05 17:09:13 +08:00
|
|
|
|
|
|
|
|
2016-04-23 21:30:44 +08:00
|
|
|
if __name__ == '__main__':
|
|
|
|
unittest.main()
|