mirror of
synced 2025-02-03 20:55:25 +01:00
158 lines
6.4 KiB
158 lines
6.4 KiB
import json
import logging
import re
import secrets
import urllib
from urllib import parse
import aiohttp as aiohttp
from yarl import URL
from pyhon import const
# Module-scoped logger: records carry this module's name so applications can
# filter or silence them independently of the root logger.
_LOGGER = logging.getLogger(__name__)
class HonAuth:
    """Run the hOn login flow and keep the tokens it yields.

    ``authorize`` drives the whole sequence: it loads the OAuth login page,
    submits the credentials to the Aura endpoint, follows the resulting
    redirect pages to collect the access/refresh/id tokens and finally
    exchanges the id token for a cognito token at ``const.API_URL``.
    All tokens start out as empty strings and are exposed read-only.
    """

    # Extracts the target of the first ``href`` attribute on a redirect page.
    _HREF_PATTERN = re.compile(r"""href\s*=\s*["'](.*?)["']""")
    # Extracts ``fwuid`` and the ``loaded`` JSON object from the login screen.
    _CONTEXT_PATTERN = re.compile(r'"fwuid":"(.*?)","loaded":(\{.*?})')

    def __init__(self) -> None:
        self._access_token = ""
        self._refresh_token = ""
        self._cognito_token = ""
        self._id_token = ""

    @property
    def cognito_token(self) -> str:
        """Token returned by the ``/auth/v1/login`` call ("" until authorized)."""
        return self._cognito_token

    @property
    def id_token(self) -> str:
        """``id_token`` parsed from the token page ("" until authorized)."""
        return self._id_token

    @property
    def access_token(self) -> str:
        """``access_token`` parsed from the token page ("" until authorized)."""
        return self._access_token

    @property
    def refresh_token(self) -> str:
        """``refresh_token`` parsed from the token page ("" until authorized)."""
        return self._refresh_token

    @staticmethod
    def _generate_nonce() -> str:
        """Return 32 random hex digits grouped 8-4-4-4-12 with dashes."""
        raw = secrets.token_hex(16)
        return f"{raw[:8]}-{raw[8:12]}-{raw[12:16]}-{raw[16:20]}-{raw[20:]}"

    def _parse_tokens(self, text: str) -> None:
        """Store every token found in ``text``; tokens not found are left as is.

        Each token is matched as ``<name>=<value>&``, i.e. the value must be
        followed by another parameter.
        """
        if access_token := re.findall("access_token=(.*?)&", text):
            self._access_token = access_token[0]
        if refresh_token := re.findall("refresh_token=(.*?)&", text):
            self._refresh_token = refresh_token[0]
        if id_token := re.findall("id_token=(.*?)&", text):
            self._id_token = id_token[0]

    async def _load_login(self, session: "aiohttp.ClientSession"):
        """Open the login page and collect what the login POST needs.

        Returns:
            ``(fw_uid, loaded, login_url)`` on success, where ``loaded`` is the
            decoded JSON object from the page and ``login_url`` is the login
            URL with the scheme/host prefix of ``const.AUTH_API`` removed;
            ``False`` if any step does not yield the expected data.
        """
        params = {
            "response_type": "token+id_token",
            "client_id": const.CLIENT_ID,
            "redirect_uri": parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done"),
            "display": "touch",
            "scope": "api openid refresh_token web",
            "nonce": self._generate_nonce(),
        }
        query = "&".join(f"{k}={v}" for k, v in params.items())
        async with session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{query}") as resp:
            if not (login_url := re.findall("url = '(.+?)'", await resp.text())):
                return False
        async with session.get(login_url[0], allow_redirects=False) as redirect1:
            if not (url := redirect1.headers.get("Location")):
                return False
        async with session.get(url, allow_redirects=False) as redirect2:
            # Check the header before appending: concatenating onto a missing
            # header would raise TypeError instead of reporting failure.
            if not (location := redirect2.headers.get("Location")):
                return False
            url = location + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
        # The redirect target is already percent-encoded; do not encode twice.
        async with session.get(URL(url, encoded=True)) as login_screen:
            if context := self._CONTEXT_PATTERN.findall(await login_screen.text()):
                fw_uid, loaded_str = context[0]
                try:
                    loaded = json.loads(loaded_str)
                except json.JSONDecodeError:
                    return False
                login_url = login_url[0].replace("/".join(const.AUTH_API.split("/")[:-1]), "")
                return fw_uid, loaded, login_url
        return False

    async def _login(self, session: "aiohttp.ClientSession", email: str, password: str,
                     fw_uid: str, loaded, login_url: str) -> str:
        """Submit the credentials and return the URL the service answers with.

        Returns:
            The ``url`` value from the first event of the JSON response, or
            ``""`` (after logging status and body) when the status is not 200
            or the response does not have the expected shape.
        """
        start_url = parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0]
        data = {
            "message": {
                "actions": [
                    {
                        "id": "79;a",
                        "descriptor": "apex://LightningLoginCustomController/ACTION$login",
                        "callingDescriptor": "markup://c:loginForm",
                        "params": {
                            "username": email,
                            "password": password,
                            "startUrl": start_url,
                        },
                    }
                ]
            },
            "aura.context": {
                "mode": "PROD",
                "fwuid": fw_uid,
                "app": "siteforce:loginApp2",
                "loaded": loaded,
                "dn": [],
                "globals": {},
                "uad": False,
            },
            "aura.pageURI": login_url,
            "aura.token": None,
        }
        params = {"r": 3, "other.LightningLoginCustom.login": 1}
        # Percent-encode each JSON value: the body is declared as
        # form-urlencoded, so a raw "&", "%", "+" or "=" inside the password
        # would otherwise corrupt the request.
        body = "&".join(f"{k}={parse.quote(json.dumps(v))}" for k, v in data.items())
        async with session.post(
            const.AUTH_API + "/s/sfsites/aura",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data=body,
            params=params,
        ) as response:
            if response.status == 200:
                try:
                    return (await response.json())["events"][0]["attributes"]["values"]["url"]
                except (json.JSONDecodeError, aiohttp.ContentTypeError,
                        KeyError, IndexError, TypeError):
                    # Not JSON, or JSON without the expected event (for
                    # example rejected credentials): report it below.
                    pass
            _LOGGER.error("Unable to login: %s\n%s", response.status, await response.text())
            return ""

    async def _get_token(self, session: "aiohttp.ClientSession", url: str) -> bool:
        """Follow the two link pages after login and store the tokens found.

        Returns:
            ``True`` once the final page was fetched (tokens present in it are
            stored on the instance); ``False`` if a request does not return
            200 or a page contains no ``href`` to follow.
        """
        async with session.get(url) as resp:
            if resp.status != 200:
                _LOGGER.error("Unable to get token: %s", resp.status)
                return False
            links = self._HREF_PATTERN.findall(await resp.text())
        if not links:
            _LOGGER.error("Unable to get token: no link found in response")
            return False
        async with session.get(links[0]) as resp:
            if resp.status != 200:
                _LOGGER.error("Unable to get token: %s", resp.status)
                return False
            links = self._HREF_PATTERN.findall(await resp.text())
        if not links:
            _LOGGER.error("Unable to get token: no link found in response")
            return False
        # The second link is host-relative: prefix it with AUTH_API minus its
        # last path segment.
        url = "/".join(const.AUTH_API.split("/")[:-1]) + links[0]
        async with session.get(url) as resp:
            if resp.status != 200:
                _LOGGER.error("Unable to connect to the login service: %s", resp.status)
                return False
            text = await resp.text()
        self._parse_tokens(text)
        return True

    async def authorize(self, email: str, password: str, mobile_id: str) -> bool:
        """Perform the complete login and populate all token properties.

        Args:
            email: Account user name sent to the login form.
            password: Account password sent to the login form.
            mobile_id: Value sent as ``mobileId`` in the final login request.

        Returns:
            ``True`` when the cognito token was obtained, ``False`` as soon as
            any step fails.
        """
        headers = {"user-agent": const.USER_AGENT}
        async with aiohttp.ClientSession(headers=headers) as session:
            if not (login_site := await self._load_login(session)):
                return False
            fw_uid, loaded, login_url = login_site
            if not (url := await self._login(session, email, password, fw_uid, loaded, login_url)):
                return False
            if not await self._get_token(session, url):
                return False

            post_headers = {"Content-Type": "application/json", "id-token": self._id_token}
            data = {
                "appVersion": const.APP_VERSION,
                "mobileId": mobile_id,
                "osVersion": const.OS_VERSION,
                "os": const.OS,
                "deviceModel": const.DEVICE_MODEL,
            }
            async with session.post(f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data) as resp:
                try:
                    json_data = await resp.json()
                except (json.JSONDecodeError, aiohttp.ContentTypeError):
                    _LOGGER.error("No JSON Data after POST: %s", await resp.text())
                    return False
            try:
                self._cognito_token = json_data["cognitoUser"]["Token"]
            except (KeyError, TypeError):
                # Body deliberately not logged: it may contain credentials.
                _LOGGER.error("No cognito token in login response")
                return False
        return True