diff --git a/yt_dlp/extractor/elemental_tv.py b/yt_dlp/extractor/elemental_tv.py index c85653e71a..b010aa1325 100644 --- a/yt_dlp/extractor/elemental_tv.py +++ b/yt_dlp/extractor/elemental_tv.py @@ -2,6 +2,7 @@ import re import time +from yt_dlp.networking.exceptions import HTTPError from yt_dlp.extractor.common import InfoExtractor from yt_dlp.utils import ExtractorError @@ -21,33 +22,32 @@ class ElementalTVIE(InfoExtractor): }, }] - API_URL_CHANNELS = 'https://play.elemental.tv/v1/channels' - API_URL_LOGIN = 'https://play.elemental.tv/v1/users/login' - API_URL_STREAM_URL = 'https://play.elemental.tv/v1/playlists/%s/playlist.m3u8?begin=%d&access_token=%s' - access_token = '' channel_id = '' - def get_channel_id(self, url): + def _get_channel_id(self, url): url_parts = re.search('(?<=channel/)[0-9a-f]{24}', url) if not url_parts or not url_parts.group(0): return None - return url_parts.group(0) + self.channel_id = url_parts.group(0) + self.write_debug(f'Channel ID: {self.channel_id}') - def get_stream_metadata(self): + if not self.channel_id: + raise ExtractorError('Channel ID not found') + + def _get_stream_metadata(self): try: headers = { 'Authorization': 'Bearer ' + self.access_token, } - res_api = self._download_json( - self.API_URL_CHANNELS, self.channel_id, headers=headers) - + 'https://play.elemental.tv/v1/channels', self.channel_id, headers=headers) data = res_api.get('data').get(self.channel_id) if not data: + self.write_debug('Getting metadata failed') return {} return { @@ -59,15 +59,17 @@ def get_stream_metadata(self): self.write_debug('Getting metadata failed') return {} - def get_stream_url(self): + def _get_stream_url(self): # Stream URL needs current epoch time rounded to 10000s begin = int((time.time() - 60) / 10000) * 10000 + stream_url = 'https://play.elemental.tv/v1/playlists/%s/playlist.m3u8?begin=%d&access_token=%s' % (self.channel_id, begin, self.access_token) - return self.API_URL_STREAM_URL % (self.channel_id, begin, self.access_token) + if not stream_url or '.m3u8' not in stream_url: + raise ExtractorError('Unable to get stream URL') + + return stream_url def _perform_login(self, username, password): - url = self.API_URL_LOGIN - post_data = { 'email': str(username), 'grant_type': 'client_credentials', @@ -75,40 +77,31 @@ def _perform_login(self, username, password): 'rememberme': 'true', } - post_data = json.dumps(post_data).encode() - - res_api = self._download_json(url, self.channel_id, data=post_data).get('data') + try: + res_api = self._download_json( + 'https://play.elemental.tv/v1/users/login', self.channel_id, data=json.dumps(post_data).encode()).get('data') + except ExtractorError as e: + if isinstance(e.cause, HTTPError) and e.cause.status == 400: + error_message = self._parse_json(e.cause.response.read().decode(), self.channel_id).get('error_info').get('description') + raise ExtractorError(error_message, expected=True) if not res_api or not res_api.get('access_token'): raise ExtractorError('Accessing login token failed') self.access_token = res_api.get('access_token') - token_type = res_api.get('token_type') - if token_type != 'Bearer': + if res_api.get('token_type') != 'Bearer': raise ExtractorError('Unknown login token type') def _real_extract(self, url): - if not self.access_token: - raise ExtractorError('Logging in failed') - - self.channel_id = self.get_channel_id(url) - - if not self.channel_id: - raise ExtractorError('Channel ID not found') - - self.write_debug(f'Channel ID: {self.channel_id}') - - stream_url = self.get_stream_url() - - if not stream_url or '.m3u8' not in stream_url: - raise ExtractorError('Unable to get stream URL') - + self._get_channel_id(url) + stream_url = self._get_stream_url() formats, subtitles = self._extract_m3u8_formats_and_subtitles(stream_url, self.channel_id, ext='mp4') return { 'id': self.channel_id, + 'is_live': True, 'formats': formats, 'subtitles': subtitles, - **self.get_stream_metadata(), + **self._get_stream_metadata(), }