Compare commits
	
		
			3 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| b4b782c52c | |||
| e857fe91de | |||
| 79c9678492 | 
							
								
								
									
										2
									
								
								.github/workflows/python-check.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/python-check.yml
									
									
									
									
										vendored
									
									
								
							| @ -2,7 +2,7 @@ name: Python check | ||||
|  | ||||
| on: | ||||
|   push: | ||||
|     branches: [ "main", "refactor" ] | ||||
|     branches: [ "main" ] | ||||
|   pull_request: | ||||
|     branches: [ "main" ] | ||||
|  | ||||
|  | ||||
| @ -9,8 +9,9 @@ from urllib import parse | ||||
| from yarl import URL | ||||
|  | ||||
| from pyhon import const | ||||
| from pyhon.exceptions import HonAuthenticationError | ||||
|  | ||||
| _LOGGER = logging.getLogger() | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| class HonAuth: | ||||
| @ -23,6 +24,7 @@ class HonAuth: | ||||
|         self._cognito_token = "" | ||||
|         self._id_token = "" | ||||
|         self._device = device | ||||
|         self._called_urls = [] | ||||
|  | ||||
|     @property | ||||
|     def cognito_token(self): | ||||
| @ -40,6 +42,16 @@ class HonAuth: | ||||
|     def refresh_token(self): | ||||
|         return self._refresh_token | ||||
|  | ||||
|     async def _error_logger(self, response, fail=True): | ||||
|         result = "hOn Authentication Error\n" | ||||
|         for i, (status, url) in enumerate(self._called_urls): | ||||
|             result += f" {i + 1: 2d}     {status} - {url}\n" | ||||
|         result += f"ERROR - {response.status} - {response.request_info.url}\n" | ||||
|         result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" | ||||
|         _LOGGER.error(result) | ||||
|         if fail: | ||||
|             raise HonAuthenticationError("Can't login") | ||||
|  | ||||
|     async def _load_login(self): | ||||
|         nonce = secrets.token_hex(16) | ||||
|         nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" | ||||
| @ -56,19 +68,28 @@ class HonAuth: | ||||
|         params = "&".join([f"{k}={v}" for k, v in params.items()]) | ||||
|         async with self._session.get( | ||||
|             f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" | ||||
|         ) as resp: | ||||
|             if not (login_url := re.findall("url = '(.+?)'", await resp.text())): | ||||
|         ) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if not (login_url := re.findall("url = '(.+?)'", await response.text())): | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|         async with self._session.get(login_url[0], allow_redirects=False) as redirect1: | ||||
|             self._called_urls.append((redirect1.status, redirect1.request_info.url)) | ||||
|             if not (url := redirect1.headers.get("Location")): | ||||
|                 await self._error_logger(redirect1) | ||||
|                 return False | ||||
|         async with self._session.get(url, allow_redirects=False) as redirect2: | ||||
|             self._called_urls.append((redirect2.status, redirect2.request_info.url)) | ||||
|             if not ( | ||||
|                 url := redirect2.headers.get("Location") | ||||
|                 + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" | ||||
|             ): | ||||
|                 await self._error_logger(redirect2) | ||||
|                 return False | ||||
|         async with self._session.get(URL(url, encoded=True)) as login_screen: | ||||
|             self._called_urls.append( | ||||
|                 (login_screen.status, login_screen.request_info.url) | ||||
|             ) | ||||
|             if context := re.findall( | ||||
|                 '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() | ||||
|             ): | ||||
| @ -78,6 +99,7 @@ class HonAuth: | ||||
|                     "/".join(const.AUTH_API.split("/")[:-1]), "" | ||||
|                 ) | ||||
|                 return fw_uid, loaded, login_url | ||||
|             await self._error_logger(login_screen) | ||||
|         return False | ||||
|  | ||||
|     async def _login(self, fw_uid, loaded, login_url): | ||||
| @ -117,6 +139,7 @@ class HonAuth: | ||||
|             data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), | ||||
|             params=params, | ||||
|         ) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status == 200: | ||||
|                 try: | ||||
|                     data = await response.json() | ||||
| @ -127,31 +150,33 @@ class HonAuth: | ||||
|                     _LOGGER.error( | ||||
|                         "Can't get login url - %s", pformat(await response.json()) | ||||
|                     ) | ||||
|             _LOGGER.error( | ||||
|                 "Unable to login: %s\n%s", response.status, await response.text() | ||||
|             ) | ||||
|             await self._error_logger(response) | ||||
|             return "" | ||||
|  | ||||
|     async def _get_token(self, url): | ||||
|         async with self._session.get(url) as resp: | ||||
|             if resp.status != 200: | ||||
|                 _LOGGER.error("Unable to get token: %s", resp.status) | ||||
|         async with self._session.get(url) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status != 200: | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|             url = re.findall("href\\s*=\\s*[\"'](http.+?)[\"']", await resp.text()) | ||||
|             url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text()) | ||||
|             if not url: | ||||
|                 _LOGGER.error("Can't get login url - \n%s", await resp.text()) | ||||
|                 raise PermissionError | ||||
|         async with self._session.get(url[0]) as resp: | ||||
|             if resp.status != 200: | ||||
|                 _LOGGER.error("Unable to get token: %s", resp.status) | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|             url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) | ||||
|         if "ProgressiveLogin" in url[0]: | ||||
|             async with self._session.get(url[0]) as response: | ||||
|                 self._called_urls.append((response.status, response.request_info.url)) | ||||
|                 if response.status != 200: | ||||
|                     await self._error_logger(response) | ||||
|                     return False | ||||
|                 url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text()) | ||||
|         url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] | ||||
|         async with self._session.get(url) as resp: | ||||
|             if resp.status != 200: | ||||
|                 _LOGGER.error("Unable to connect to the login service: %s", resp.status) | ||||
|         async with self._session.get(url) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status != 200: | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|             text = await resp.text() | ||||
|             text = await response.text() | ||||
|         if access_token := re.findall("access_token=(.*?)&", text): | ||||
|             self._access_token = access_token[0] | ||||
|         if refresh_token := re.findall("refresh_token=(.*?)&", text): | ||||
| @ -174,11 +199,12 @@ class HonAuth: | ||||
|         data = self._device.get() | ||||
|         async with self._session.post( | ||||
|             f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data | ||||
|         ) as resp: | ||||
|         ) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             try: | ||||
|                 json_data = await resp.json() | ||||
|                 json_data = await response.json() | ||||
|             except json.JSONDecodeError: | ||||
|                 _LOGGER.error("No JSON Data after POST: %s", await resp.text()) | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|             self._cognito_token = json_data["cognitoUser"]["Token"] | ||||
|         return True | ||||
| @ -191,10 +217,12 @@ class HonAuth: | ||||
|         } | ||||
|         async with self._session.post( | ||||
|             f"{const.AUTH_API}/services/oauth2/token", params=params | ||||
|         ) as resp: | ||||
|             if resp.status >= 400: | ||||
|         ) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status >= 400: | ||||
|                 await self._error_logger(response, fail=False) | ||||
|                 return False | ||||
|             data = await resp.json() | ||||
|             data = await response.json() | ||||
|         self._id_token = data["id_token"] | ||||
|         self._access_token = data["access_token"] | ||||
|         return True | ||||
|  | ||||
| @ -6,6 +6,7 @@ import aiohttp | ||||
| from pyhon import const | ||||
| from pyhon.connection.auth import HonAuth, _LOGGER | ||||
| from pyhon.connection.device import HonDevice | ||||
| from pyhon.exceptions import HonAuthenticationError | ||||
|  | ||||
|  | ||||
| class HonBaseConnectionHandler: | ||||
| @ -50,9 +51,9 @@ class HonConnectionHandler(HonBaseConnectionHandler): | ||||
|         self._email = email | ||||
|         self._password = password | ||||
|         if not self._email: | ||||
|             raise PermissionError("Login-Error - An email address must be specified") | ||||
|             raise HonAuthenticationError("An email address must be specified") | ||||
|         if not self._password: | ||||
|             raise PermissionError("Login-Error - A password address must be specified") | ||||
|             raise HonAuthenticationError("A password address must be specified") | ||||
|         self._request_headers = {} | ||||
|  | ||||
|     @property | ||||
| @ -73,7 +74,7 @@ class HonConnectionHandler(HonBaseConnectionHandler): | ||||
|                 self._request_headers["cognito-token"] = self._auth.cognito_token | ||||
|                 self._request_headers["id-token"] = self._auth.id_token | ||||
|             else: | ||||
|                 raise PermissionError("Can't Login") | ||||
|                 raise HonAuthenticationError("Can't login") | ||||
|         return {h: v for h, v in self._request_headers.items() if h not in headers} | ||||
|  | ||||
|     @asynccontextmanager | ||||
| @ -100,7 +101,7 @@ class HonConnectionHandler(HonBaseConnectionHandler): | ||||
|                     response.status, | ||||
|                     await response.text(), | ||||
|                 ) | ||||
|                 raise PermissionError("Login failure") | ||||
|                 raise HonAuthenticationError("Login failure") | ||||
|             else: | ||||
|                 try: | ||||
|                     await response.json() | ||||
| @ -123,5 +124,5 @@ class HonAnonymousConnectionHandler(HonBaseConnectionHandler): | ||||
|         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS | ||||
|         async with method(*args, **kwargs) as response: | ||||
|             if response.status == 403: | ||||
|                 print("Can't authorize") | ||||
|                 _LOGGER.error("Can't authenticate anymore") | ||||
|             yield response | ||||
|  | ||||
							
								
								
									
										2
									
								
								pyhon/exceptions.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								pyhon/exceptions.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,2 @@ | ||||
| class HonAuthenticationError(Exception): | ||||
|     pass | ||||
| @ -2,7 +2,10 @@ import re | ||||
|  | ||||
|  | ||||
| def str_to_float(string): | ||||
|     return float(string.replace(",", ".")) | ||||
|     try: | ||||
|         return int(string) | ||||
|     except ValueError: | ||||
|         return float(str(string.replace(",", "."))) | ||||
|  | ||||
|  | ||||
| class HonParameter: | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	