Clean up authentication
This commit is contained in:
		
							
								
								
									
										166
									
								
								pyhon/auth.py
									
									
									
									
									
								
							
							
						
						
									
										166
									
								
								pyhon/auth.py
									
									
									
									
									
								
							| @ -6,6 +6,7 @@ import urllib | ||||
| from urllib import parse | ||||
|  | ||||
| import aiohttp as aiohttp | ||||
| from yarl import URL | ||||
|  | ||||
| from pyhon import const | ||||
|  | ||||
| @ -14,7 +15,8 @@ _LOGGER = logging.getLogger() | ||||
|  | ||||
| class HonAuth: | ||||
|     def __init__(self) -> None: | ||||
|         self._framework = "" | ||||
|         self._access_token = "" | ||||
|         self._refresh_token = "" | ||||
|         self._cognito_token = "" | ||||
|         self._id_token = "" | ||||
|  | ||||
| @ -26,71 +28,15 @@ class HonAuth: | ||||
|     def id_token(self): | ||||
|         return self._id_token | ||||
|  | ||||
|     async def _get_frontdoor_url(self, session, email, password): | ||||
|         data = { | ||||
|             "message": { | ||||
|                 "actions": [ | ||||
|                     { | ||||
|                         "id": "79;a", | ||||
|                         "descriptor": "apex://LightningLoginCustomController/ACTION$login", | ||||
|                         "callingDescriptor": "markup://c:loginForm", | ||||
|                         "params": { | ||||
|                             "username": email, | ||||
|                             "password": password, | ||||
|                             "startUrl": "" | ||||
|                         } | ||||
|                     } | ||||
|                 ] | ||||
|             }, | ||||
|             "aura.context": { | ||||
|                 "mode": "PROD", | ||||
|                 "fwuid": self._framework, | ||||
|                 "app": "siteforce:loginApp2", | ||||
|                 "loaded": {"APPLICATION@markup://siteforce:loginApp2": "YtNc5oyHTOvavSB9Q4rtag"}, | ||||
|                 "dn": [], | ||||
|                 "globals": {}, | ||||
|                 "uad": False}, | ||||
|             "aura.pageURI": f"SmartHome/s/login/?language={const.LANGUAGE}", | ||||
|             "aura.token": None} | ||||
|     @property | ||||
|     def access_token(self): | ||||
|         return self._access_token | ||||
|  | ||||
|         params = {"r": 3, "other.LightningLoginCustom.login": 1} | ||||
|         async with session.post( | ||||
|                 const.AUTH_API + "/s/sfsites/aura", | ||||
|                 headers={"Content-Type": "application/x-www-form-urlencoded"}, | ||||
|                 data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), | ||||
|                 params=params | ||||
|         ) as response: | ||||
|             if response.status != 200: | ||||
|                 _LOGGER.error("Unable to connect to the login service: %s\n%s", response.status, await response.text()) | ||||
|                 return "" | ||||
|             try: | ||||
|                 text = await response.text() | ||||
|                 return (await response.json())["events"][0]["attributes"]["values"]["url"] | ||||
|             except json.JSONDecodeError: | ||||
|                 if framework := re.findall('clientOutOfSync.*?Expected: ([\\w-]+?) Actual: (.*?)"', text): | ||||
|                     self._framework, actual = framework[0] | ||||
|                     _LOGGER.debug('Framework update from "%s" to "%s"', self._framework, actual) | ||||
|                     return await self._get_frontdoor_url(session, email, password) | ||||
|                 _LOGGER.error("Unable to retrieve the frontdoor URL. Message: " + text) | ||||
|                 return "" | ||||
|     @property | ||||
|     def refresh_token(self): | ||||
|         return self._refresh_token | ||||
|  | ||||
|     async def _prepare_login(self, session, email, password): | ||||
|         if not (frontdoor_url := await self._get_frontdoor_url(session, email, password)): | ||||
|             return False | ||||
|  | ||||
|         async with session.get(frontdoor_url) as resp: | ||||
|             if resp.status != 200: | ||||
|                 _LOGGER.error("Unable to connect to the login service: %s", resp.status) | ||||
|                 return False | ||||
|  | ||||
|         params = {"retURL": "/SmartHome/apex/CustomCommunitiesLanding"} | ||||
|         async with session.get(f"{const.AUTH_API}/apex/ProgressiveLogin", params=params) as resp: | ||||
|             if resp.status != 200: | ||||
|                 _LOGGER.error("Unable to connect to the login service: %s", resp.status) | ||||
|                 return False | ||||
|         return True | ||||
|  | ||||
|     async def _login(self, session): | ||||
|     async def _load_login(self, session): | ||||
|         nonce = secrets.token_hex(16) | ||||
|         nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" | ||||
|         params = { | ||||
| @ -101,18 +47,98 @@ class HonAuth: | ||||
|             "scope": "api openid refresh_token web", | ||||
|             "nonce": nonce | ||||
|         } | ||||
|         headers = {"user-agent": "Chrome/110.0.5481.153"} | ||||
|         params = "&".join([f"{k}={v}" for k, v in params.items()]) | ||||
|         async with session.get(f"{const.AUTH_API}/services/oauth2/authorize?{params}") as resp: | ||||
|             if id_token := re.findall("id_token=(.*?)&", await resp.text()): | ||||
|                 self._id_token = id_token[0] | ||||
|                 return True | ||||
|         async with session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}") as resp: | ||||
|             if not (login_url := re.findall("url = '(.+?)'", await resp.text())): | ||||
|                 return False | ||||
|         async with session.get(login_url[0], allow_redirects=False) as redirect1: | ||||
|             if not (url := redirect1.headers.get("Location")): | ||||
|                 return False | ||||
|         async with session.get(url, allow_redirects=False) as redirect2: | ||||
|             if not (url := redirect2.headers.get("Location") + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"): | ||||
|                 return False | ||||
|         async with session.get(URL(url, encoded=True), headers=headers) as login_screen: | ||||
|             if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()): | ||||
|                 fw_uid, loaded_str = context[0] | ||||
|                 loaded = json.loads(loaded_str) | ||||
|                 login_url = login_url[0].replace("/".join(const.AUTH_API.split("/")[:-1]), "") | ||||
|                 return fw_uid, loaded, login_url | ||||
|         return False | ||||
|  | ||||
|     async def _login(self, session, email, password, fw_uid, loaded, login_url): | ||||
|         data = { | ||||
|             "message": { | ||||
|                 "actions": [ | ||||
|                     { | ||||
|                         "id": "79;a", | ||||
|                         "descriptor": "apex://LightningLoginCustomController/ACTION$login", | ||||
|                         "callingDescriptor": "markup://c:loginForm", | ||||
|                         "params": { | ||||
|                             "username": email, | ||||
|                             "password": password, | ||||
|                             "startUrl": parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0] | ||||
|                         } | ||||
|                     } | ||||
|                 ] | ||||
|             }, | ||||
|             "aura.context": { | ||||
|                 "mode": "PROD", | ||||
|                 "fwuid": fw_uid, | ||||
|                 "app": "siteforce:loginApp2", | ||||
|                 "loaded": loaded, | ||||
|                 "dn": [], | ||||
|                 "globals": {}, | ||||
|                 "uad": False}, | ||||
|             "aura.pageURI": login_url, | ||||
|             "aura.token": None} | ||||
|  | ||||
|         params = {"r": 3, "other.LightningLoginCustom.login": 1} | ||||
|         async with session.post( | ||||
|                 const.AUTH_API + "/s/sfsites/aura", | ||||
|                 headers={"Content-Type": "application/x-www-form-urlencoded"}, | ||||
|                 data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), | ||||
|                 params=params | ||||
|         ) as response: | ||||
|             if response.status == 200: | ||||
|                 try: | ||||
|                     return (await response.json())["events"][0]["attributes"]["values"]["url"] | ||||
|                 except json.JSONDecodeError: | ||||
|                     pass | ||||
|             _LOGGER.error("Unable to login: %s\n%s", response.status, await response.text()) | ||||
|             return "" | ||||
|  | ||||
|     async def _get_token(self, session, url): | ||||
|         async with session.get(url) as resp: | ||||
|             if resp.status != 200: | ||||
|                 _LOGGER.error("Unable to get token: %s", resp.status) | ||||
|                 return False | ||||
|             url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) | ||||
|         async with session.get(url[0]) as resp: | ||||
|             if resp.status != 200: | ||||
|                 _LOGGER.error("Unable to get token: %s", resp.status) | ||||
|                 return False | ||||
|             url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) | ||||
|             url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] | ||||
|         async with session.get(url) as resp: | ||||
|             if resp.status != 200: | ||||
|                 _LOGGER.error("Unable to connect to the login service: %s", resp.status) | ||||
|                 return False | ||||
|             text = await resp.text() | ||||
|         if access_token := re.findall("access_token=(.*?)&", text): | ||||
|             self._access_token = access_token[0] | ||||
|         if refresh_token := re.findall("refresh_token=(.*?)&", text): | ||||
|             self._refresh_token = refresh_token[0] | ||||
|         if id_token := re.findall("id_token=(.*?)&", text): | ||||
|             self._id_token = id_token[0] | ||||
|         return True | ||||
|  | ||||
|     async def authorize(self, email, password, mobile_id): | ||||
|         async with aiohttp.ClientSession() as session: | ||||
|             if not await self._prepare_login(session, email, password): | ||||
|             fw_uid, loaded, login_url = await self._load_login(session) | ||||
|             if not (url := await self._login(session, email, password, fw_uid, loaded, login_url)): | ||||
|                 return False | ||||
|             if not await self._login(session): | ||||
|             if not await self._get_token(session, url): | ||||
|                 return False | ||||
|  | ||||
|             post_headers = {"Content-Type": "application/json", "id-token": self._id_token} | ||||
|  | ||||
		Reference in New Issue
	
	Block a user