Improve geo bypass mechanism

* Introduce geo bypass context
* Add ability to bypass based on IP blocks in CIDR notation
* Introduce --geo-bypass-ip-block
This commit is contained in:
Sergey M․ 2018-05-02 07:18:01 +07:00
parent a93ce61bd5
commit 5f95927a62
No known key found for this signature in database
GPG Key ID: 2C393E0F18A9236D
11 changed files with 113 additions and 28 deletions

View File

@ -286,6 +286,9 @@ class YoutubeDL(object):
Two-letter ISO 3166-1 alpha-2 country code that will be used for Two-letter ISO 3166-1 alpha-2 country code that will be used for
explicit geographic restriction bypassing via faking explicit geographic restriction bypassing via faking
X-Forwarded-For HTTP header (experimental) X-Forwarded-For HTTP header (experimental)
geo_bypass_ip_block:
IP range in CIDR notation that will be used similarly to
geo_bypass_country (experimental)
The following options determine which downloader is picked: The following options determine which downloader is picked:
external_downloader: Executable of the external downloader to call. external_downloader: Executable of the external downloader to call.

View File

@ -430,6 +430,7 @@ def parse_retries(retries):
'config_location': opts.config_location, 'config_location': opts.config_location,
'geo_bypass': opts.geo_bypass, 'geo_bypass': opts.geo_bypass,
'geo_bypass_country': opts.geo_bypass_country, 'geo_bypass_country': opts.geo_bypass_country,
'geo_bypass_ip_block': opts.geo_bypass_ip_block,
# just for deprecation check # just for deprecation check
'autonumber': opts.autonumber if opts.autonumber is True else None, 'autonumber': opts.autonumber if opts.autonumber is True else None,
'usetitle': opts.usetitle if opts.usetitle is True else None, 'usetitle': opts.usetitle if opts.usetitle is True else None,

View File

@ -277,7 +277,9 @@ def _extract_anvato_videos(self, webpage, video_id):
def _real_extract(self, url): def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, {}) url, smuggled_data = unsmuggle_url(url, {})
self._initialize_geo_bypass(smuggled_data.get('geo_countries')) self._initialize_geo_bypass({
'countries': smuggled_data.get('geo_countries'),
})
mobj = re.match(self._VALID_URL, url) mobj = re.match(self._VALID_URL, url)
access_key, video_id = mobj.group('access_key_or_mcp', 'id') access_key, video_id = mobj.group('access_key_or_mcp', 'id')

View File

@ -669,7 +669,10 @@ def build_format_id(kind):
def _real_extract(self, url): def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, {}) url, smuggled_data = unsmuggle_url(url, {})
self._initialize_geo_bypass(smuggled_data.get('geo_countries')) self._initialize_geo_bypass({
'countries': smuggled_data.get('geo_countries'),
'ip_blocks': smuggled_data.get('geo_ip_blocks'),
})
account_id, player_id, embed, video_id = re.match(self._VALID_URL, url).groups() account_id, player_id, embed, video_id = re.match(self._VALID_URL, url).groups()

View File

@ -346,6 +346,11 @@ class InfoExtractor(object):
geo restriction bypass mechanism right away in order to bypass geo restriction bypass mechanism right away in order to bypass
geo restriction, of course, if the mechanism is not disabled. (experimental) geo restriction, of course, if the mechanism is not disabled. (experimental)
_GEO_IP_BLOCKS attribute may contain a list of presumably geo unrestricted
IP blocks in CIDR notation for this extractor. One of these IP blocks
will be used by geo restriction bypass mechanism similarly
to _GEO_COUNTRIES. (experimental)
NB: both these geo attributes are experimental and may change in future NB: both these geo attributes are experimental and may change in future
or be completely removed. or be completely removed.
@ -358,6 +363,7 @@ class InfoExtractor(object):
_x_forwarded_for_ip = None _x_forwarded_for_ip = None
_GEO_BYPASS = True _GEO_BYPASS = True
_GEO_COUNTRIES = None _GEO_COUNTRIES = None
_GEO_IP_BLOCKS = None
_WORKING = True _WORKING = True
def __init__(self, downloader=None): def __init__(self, downloader=None):
@ -392,12 +398,15 @@ def working(cls):
def initialize(self): def initialize(self):
"""Initializes an instance (authentication, etc).""" """Initializes an instance (authentication, etc)."""
self._initialize_geo_bypass(self._GEO_COUNTRIES) self._initialize_geo_bypass({
'countries': self._GEO_COUNTRIES,
'ip_blocks': self._GEO_IP_BLOCKS,
})
if not self._ready: if not self._ready:
self._real_initialize() self._real_initialize()
self._ready = True self._ready = True
def _initialize_geo_bypass(self, countries): def _initialize_geo_bypass(self, geo_bypass_context):
""" """
Initialize geo restriction bypass mechanism. Initialize geo restriction bypass mechanism.
@ -408,28 +417,82 @@ def _initialize_geo_bypass(self, countries):
HTTP requests. HTTP requests.
This method will be used for initial geo bypass mechanism initialization This method will be used for initial geo bypass mechanism initialization
during the instance initialization with _GEO_COUNTRIES. during the instance initialization with _GEO_COUNTRIES and
_GEO_IP_BLOCKS.
You may also manually call it from extractor's code if geo countries You may also manually call it from extractor's code if geo bypass
information is not available beforehand (e.g. obtained during information is not available beforehand (e.g. obtained during
extraction) or due to some another reason. extraction) or due to some other reason. In this case you should pass
this information in geo bypass context passed as first argument. It may
contain the following fields:
countries: List of geo unrestricted countries (similar
to _GEO_COUNTRIES)
ip_blocks: List of geo unrestricted IP blocks in CIDR notation
(similar to _GEO_IP_BLOCKS)
""" """
if not self._x_forwarded_for_ip: if not self._x_forwarded_for_ip:
country_code = self._downloader.params.get('geo_bypass_country', None)
# If there is no explicit country for geo bypass specified and # Geo bypass mechanism is explicitly disabled by user
# the extractor is known to be geo restricted let's fake IP if not self._downloader.params.get('geo_bypass', True):
# as X-Forwarded-For right away. return
if (not country_code and
self._GEO_BYPASS and if not geo_bypass_context:
self._downloader.params.get('geo_bypass', True) and geo_bypass_context = {}
countries):
country_code = random.choice(countries) # Backward compatibility: previously _initialize_geo_bypass
if country_code: # expected a list of countries, some 3rd party code may still use
self._x_forwarded_for_ip = GeoUtils.random_ipv4(country_code) # it this way
if isinstance(geo_bypass_context, (list, tuple)):
geo_bypass_context = {
'countries': geo_bypass_context,
}
# The whole point of geo bypass mechanism is to fake IP
# as X-Forwarded-For HTTP header based on some IP block or
# country code.
# Path 1: bypassing based on IP block in CIDR notation
# Explicit IP block specified by user, use it right away
# regardless of whether extractor is geo bypassable or not
ip_block = self._downloader.params.get('geo_bypass_ip_block', None)
# Otherwise use random IP block from geo bypass context but only
# if extractor is known as geo bypassable
if not ip_block:
ip_blocks = geo_bypass_context.get('ip_blocks')
if self._GEO_BYPASS and ip_blocks:
ip_block = random.choice(ip_blocks)
if ip_block:
self._x_forwarded_for_ip = GeoUtils.random_ipv4(ip_block)
if self._downloader.params.get('verbose', False):
self._downloader.to_screen(
'[debug] Using fake IP %s as X-Forwarded-For.'
% self._x_forwarded_for_ip)
return
# Path 2: bypassing based on country code
# Explicit country code specified by user, use it right away
# regardless of whether extractor is geo bypassable or not
country = self._downloader.params.get('geo_bypass_country', None)
# Otherwise use random country code from geo bypass context but
# only if extractor is known as geo bypassable
if not country:
countries = geo_bypass_context.get('countries')
if self._GEO_BYPASS and countries:
country = random.choice(countries)
if country:
self._x_forwarded_for_ip = GeoUtils.random_ipv4(country)
if self._downloader.params.get('verbose', False): if self._downloader.params.get('verbose', False):
self._downloader.to_screen( self._downloader.to_screen(
'[debug] Using fake IP %s (%s) as X-Forwarded-For.' '[debug] Using fake IP %s (%s) as X-Forwarded-For.'
% (self._x_forwarded_for_ip, country_code.upper())) % (self._x_forwarded_for_ip, country.upper()))
def extract(self, url): def extract(self, url):
"""Extracts URL information and returns it in list of dicts.""" """Extracts URL information and returns it in list of dicts."""

View File

@ -102,7 +102,9 @@ def _real_extract(self, url):
display_id = mobj.group('id') display_id = mobj.group('id')
domain = mobj.group('domain') domain = mobj.group('domain')
self._initialize_geo_bypass([mobj.group('country').upper()]) self._initialize_geo_bypass({
'countries': [mobj.group('country').upper()],
})
webpage = self._download_webpage(url, display_id) webpage = self._download_webpage(url, display_id)

View File

@ -123,7 +123,7 @@ def _real_extract(self, url):
'adobe_requestor_id': requestor_id, 'adobe_requestor_id': requestor_id,
}) })
else: else:
self._initialize_geo_bypass(['US']) self._initialize_geo_bypass({'countries': ['US']})
entitlement = self._download_json( entitlement = self._download_json(
'https://api.entitlement.watchabc.go.com/vp2/ws-secure/entitlement/2020/authorize.json', 'https://api.entitlement.watchabc.go.com/vp2/ws-secure/entitlement/2020/authorize.json',
video_id, data=urlencode_postdata(data)) video_id, data=urlencode_postdata(data))

View File

@ -282,7 +282,9 @@ class LimelightMediaIE(LimelightBaseIE):
def _real_extract(self, url): def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, {}) url, smuggled_data = unsmuggle_url(url, {})
video_id = self._match_id(url) video_id = self._match_id(url)
self._initialize_geo_bypass(smuggled_data.get('geo_countries')) self._initialize_geo_bypass({
'countries': smuggled_data.get('geo_countries'),
})
pc, mobile, metadata = self._extract( pc, mobile, metadata = self._extract(
video_id, 'getPlaylistByMediaId', video_id, 'getPlaylistByMediaId',

View File

@ -227,14 +227,16 @@ class TVPlayIE(InfoExtractor):
def _real_extract(self, url): def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, {}) url, smuggled_data = unsmuggle_url(url, {})
self._initialize_geo_bypass(smuggled_data.get('geo_countries')) self._initialize_geo_bypass({
'countries': smuggled_data.get('geo_countries'),
})
video_id = self._match_id(url) video_id = self._match_id(url)
geo_country = self._search_regex( geo_country = self._search_regex(
r'https?://[^/]+\.([a-z]{2})', url, r'https?://[^/]+\.([a-z]{2})', url,
'geo country', default=None) 'geo country', default=None)
if geo_country: if geo_country:
self._initialize_geo_bypass([geo_country.upper()]) self._initialize_geo_bypass({'countries': [geo_country.upper()]})
video = self._download_json( video = self._download_json(
'http://playapi.mtgx.tv/v3/videos/%s' % video_id, video_id, 'Downloading video JSON') 'http://playapi.mtgx.tv/v3/videos/%s' % video_id, video_id, 'Downloading video JSON')

View File

@ -249,6 +249,10 @@ def _comma_separated_values_options_callback(option, opt_str, value, parser):
'--geo-bypass-country', metavar='CODE', '--geo-bypass-country', metavar='CODE',
dest='geo_bypass_country', default=None, dest='geo_bypass_country', default=None,
help='Force bypass geographic restriction with explicitly provided two-letter ISO 3166-2 country code (experimental)') help='Force bypass geographic restriction with explicitly provided two-letter ISO 3166-2 country code (experimental)')
geo.add_option(
'--geo-bypass-ip-block', metavar='IP_BLOCK',
dest='geo_bypass_ip_block', default=None,
help='Force bypass geographic restriction with explicitly provided IP block in CIDR notation (experimental)')
selection = optparse.OptionGroup(parser, 'Video Selection') selection = optparse.OptionGroup(parser, 'Video Selection')
selection.add_option( selection.add_option(

View File

@ -3534,10 +3534,13 @@ class GeoUtils(object):
} }
@classmethod @classmethod
def random_ipv4(cls, code): def random_ipv4(cls, code_or_block):
block = cls._country_ip_map.get(code.upper()) if len(code_or_block) == 2:
if not block: block = cls._country_ip_map.get(code_or_block.upper())
return None if not block:
return None
else:
block = code_or_block
addr, preflen = block.split('/') addr, preflen = block.split('/')
addr_min = compat_struct_unpack('!L', socket.inet_aton(addr))[0] addr_min = compat_struct_unpack('!L', socket.inet_aton(addr))[0]
addr_max = addr_min | (0xffffffff >> int(preflen)) addr_max = addr_min | (0xffffffff >> int(preflen))