Compare commits
	
		
			5 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 33454f68b8 | |||
| 6b2c60d552 | |||
| 46e6a85e84 | |||
| 8c832b44cd | |||
| b4b782c52c | 
							
								
								
									
										83
									
								
								pyhon/__main__.py
									
									
									
									
									
										
										
										Executable file → Normal file
									
								
							
							
						
						
									
										83
									
								
								pyhon/__main__.py
									
									
									
									
									
										
										
										Executable file → Normal file
									
								
							| @ -10,7 +10,7 @@ from pathlib import Path | |||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|     sys.path.insert(0, str(Path(__file__).parent.parent)) |     sys.path.insert(0, str(Path(__file__).parent.parent)) | ||||||
|  |  | ||||||
| from pyhon import Hon, HonAPI | from pyhon import Hon, HonAPI, helper | ||||||
|  |  | ||||||
| _LOGGER = logging.getLogger(__name__) | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
| @ -34,61 +34,6 @@ def get_arguments(): | |||||||
|     return vars(parser.parse_args()) |     return vars(parser.parse_args()) | ||||||
|  |  | ||||||
|  |  | ||||||
| # yaml.dump() would be done the same, but needs an additional dependency... |  | ||||||
| def pretty_print(data, key="", intend=0, is_list=False): |  | ||||||
|     if type(data) is list: |  | ||||||
|         if key: |  | ||||||
|             print(f"{'  ' * intend}{'- ' if is_list else ''}{key}:") |  | ||||||
|             intend += 1 |  | ||||||
|         for i, value in enumerate(data): |  | ||||||
|             pretty_print(value, intend=intend, is_list=True) |  | ||||||
|     elif type(data) is dict: |  | ||||||
|         if key: |  | ||||||
|             print(f"{'  ' * intend}{'- ' if is_list else ''}{key}:") |  | ||||||
|             intend += 1 |  | ||||||
|         for i, (key, value) in enumerate(sorted(data.items())): |  | ||||||
|             if is_list and not i: |  | ||||||
|                 pretty_print(value, key=key, intend=intend, is_list=True) |  | ||||||
|             elif is_list: |  | ||||||
|                 pretty_print(value, key=key, intend=intend + 1) |  | ||||||
|             else: |  | ||||||
|                 pretty_print(value, key=key, intend=intend) |  | ||||||
|     else: |  | ||||||
|         print( |  | ||||||
|             f"{'  ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}" |  | ||||||
|         ) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def key_print(data, key="", start=True): |  | ||||||
|     if type(data) is list: |  | ||||||
|         for i, value in enumerate(data): |  | ||||||
|             key_print(value, key=f"{key}.{i}", start=False) |  | ||||||
|     elif type(data) is dict: |  | ||||||
|         for k, value in sorted(data.items()): |  | ||||||
|             key_print(value, key=k if start else f"{key}.{k}", start=False) |  | ||||||
|     else: |  | ||||||
|         print(f"{key}: {data}") |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def create_command(commands, concat=False): |  | ||||||
|     result = {} |  | ||||||
|     for name, command in commands.items(): |  | ||||||
|         if not concat: |  | ||||||
|             result[name] = {} |  | ||||||
|         for parameter, data in command.parameters.items(): |  | ||||||
|             if data.typology == "enum": |  | ||||||
|                 value = data.values |  | ||||||
|             elif data.typology == "range": |  | ||||||
|                 value = {"min": data.min, "max": data.max, "step": data.step} |  | ||||||
|             else: |  | ||||||
|                 continue |  | ||||||
|             if not concat: |  | ||||||
|                 result[name][parameter] = value |  | ||||||
|             else: |  | ||||||
|                 result[f"{name}.{parameter}"] = value |  | ||||||
|     return result |  | ||||||
|  |  | ||||||
|  |  | ||||||
| async def translate(language, json_output=False): | async def translate(language, json_output=False): | ||||||
|     async with HonAPI(anonymous=True) as hon: |     async with HonAPI(anonymous=True) as hon: | ||||||
|         keys = await hon.translation_keys(language) |         keys = await hon.translation_keys(language) | ||||||
| @ -102,7 +47,7 @@ async def translate(language, json_output=False): | |||||||
|             .replace("\\r", "") |             .replace("\\r", "") | ||||||
|         ) |         ) | ||||||
|         keys = json.loads(clean_keys) |         keys = json.loads(clean_keys) | ||||||
|         pretty_print(keys) |         print(helper.pretty_print(keys)) | ||||||
|  |  | ||||||
|  |  | ||||||
| async def main(): | async def main(): | ||||||
| @ -120,13 +65,25 @@ async def main(): | |||||||
|             if args.get("keys"): |             if args.get("keys"): | ||||||
|                 data = device.data.copy() |                 data = device.data.copy() | ||||||
|                 attr = "get" if args.get("all") else "pop" |                 attr = "get" if args.get("all") else "pop" | ||||||
|                 key_print(data["attributes"].__getattribute__(attr)("parameters")) |                 print( | ||||||
|                 key_print(data.__getattribute__(attr)("appliance")) |                     helper.key_print( | ||||||
|                 key_print(data) |                         data["attributes"].__getattribute__(attr)("parameters") | ||||||
|                 pretty_print(create_command(device.commands, concat=True)) |                     ) | ||||||
|  |                 ) | ||||||
|  |                 print(helper.key_print(data.__getattribute__(attr)("appliance"))) | ||||||
|  |                 print(helper.key_print(data)) | ||||||
|  |                 print( | ||||||
|  |                     helper.pretty_print( | ||||||
|  |                         helper.create_command(device.commands, concat=True) | ||||||
|  |                     ) | ||||||
|  |                 ) | ||||||
|             else: |             else: | ||||||
|                 pretty_print({"data": device.data}) |                 print(helper.pretty_print({"data": device.data})) | ||||||
|                 pretty_print({"settings": create_command(device.commands)}) |                 print( | ||||||
|  |                     helper.pretty_print( | ||||||
|  |                         {"settings": helper.create_command(device.commands)} | ||||||
|  |                     ) | ||||||
|  |                 ) | ||||||
|  |  | ||||||
|  |  | ||||||
| def start(): | def start(): | ||||||
|  | |||||||
| @ -1,6 +1,7 @@ | |||||||
| import importlib | import importlib | ||||||
| from contextlib import suppress | from contextlib import suppress | ||||||
|  |  | ||||||
|  | from pyhon import helper | ||||||
| from pyhon.commands import HonCommand | from pyhon.commands import HonCommand | ||||||
| from pyhon.parameter import HonParameterFixed | from pyhon.parameter import HonParameterFixed | ||||||
|  |  | ||||||
| @ -172,3 +173,15 @@ class HonAppliance: | |||||||
|         if self._extra: |         if self._extra: | ||||||
|             return self._extra.data(result) |             return self._extra.data(result) | ||||||
|         return result |         return result | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def diagnose(self): | ||||||
|  |         data = self.data.copy() | ||||||
|  |         for sensible in ["PK", "SK", "serialNumber", "code"]: | ||||||
|  |             data["appliance"].pop(sensible, None) | ||||||
|  |         result = helper.pretty_print({"data": self.data}, whitespace="\u200B \u200B ") | ||||||
|  |         result += helper.pretty_print( | ||||||
|  |             {"commands": helper.create_command(self.commands)}, | ||||||
|  |             whitespace="\u200B \u200B ", | ||||||
|  |         ) | ||||||
|  |         return result.replace(self.mac_address, "12-34-56-78-90-ab") | ||||||
|  | |||||||
| @ -2,14 +2,15 @@ import json | |||||||
| import logging | import logging | ||||||
| import re | import re | ||||||
| import secrets | import secrets | ||||||
| import sys |  | ||||||
| import urllib | import urllib | ||||||
| from pprint import pformat | from pprint import pformat | ||||||
| from urllib import parse | from urllib import parse | ||||||
|  | from urllib.parse import quote | ||||||
|  |  | ||||||
| from yarl import URL | from yarl import URL | ||||||
|  |  | ||||||
| from pyhon import const | from pyhon import const | ||||||
|  | from pyhon.exceptions import HonAuthenticationError | ||||||
|  |  | ||||||
| _LOGGER = logging.getLogger(__name__) | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
| @ -24,6 +25,7 @@ class HonAuth: | |||||||
|         self._cognito_token = "" |         self._cognito_token = "" | ||||||
|         self._id_token = "" |         self._id_token = "" | ||||||
|         self._device = device |         self._device = device | ||||||
|  |         self._called_urls = [] | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def cognito_token(self): |     def cognito_token(self): | ||||||
| @ -41,6 +43,16 @@ class HonAuth: | |||||||
|     def refresh_token(self): |     def refresh_token(self): | ||||||
|         return self._refresh_token |         return self._refresh_token | ||||||
|  |  | ||||||
|  |     async def _error_logger(self, response, fail=True): | ||||||
|  |         result = "hOn Authentication Error\n" | ||||||
|  |         for i, (status, url) in enumerate(self._called_urls): | ||||||
|  |             result += f" {i + 1: 2d}     {status} - {url}\n" | ||||||
|  |         result += f"ERROR - {response.status} - {response.request_info.url}\n" | ||||||
|  |         result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" | ||||||
|  |         _LOGGER.error(result) | ||||||
|  |         if fail: | ||||||
|  |             raise HonAuthenticationError("Can't login") | ||||||
|  |  | ||||||
|     async def _load_login(self): |     async def _load_login(self): | ||||||
|         nonce = secrets.token_hex(16) |         nonce = secrets.token_hex(16) | ||||||
|         nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" |         nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" | ||||||
| @ -58,22 +70,29 @@ class HonAuth: | |||||||
|         async with self._session.get( |         async with self._session.get( | ||||||
|             f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" |             f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" | ||||||
|         ) as response: |         ) as response: | ||||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) |             self._called_urls.append((response.status, response.request_info.url)) | ||||||
|             if not (login_url := re.findall("url = '(.+?)'", await response.text())): |             if not (login_url := re.findall("url = '(.+?)'", await response.text())): | ||||||
|  |                 await self._error_logger(response) | ||||||
|                 return False |                 return False | ||||||
|         async with self._session.get(login_url[0], allow_redirects=False) as redirect1: |         async with self._session.get(login_url[0], allow_redirects=False) as redirect1: | ||||||
|             _LOGGER.debug("%s - %s", redirect1.status, redirect1.request_info.url) |             self._called_urls.append((redirect1.status, redirect1.request_info.url)) | ||||||
|             if not (url := redirect1.headers.get("Location")): |             if not (url := redirect1.headers.get("Location")): | ||||||
|  |                 await self._error_logger(redirect1) | ||||||
|                 return False |                 return False | ||||||
|         async with self._session.get(url, allow_redirects=False) as redirect2: |         async with self._session.get(url, allow_redirects=False) as redirect2: | ||||||
|             _LOGGER.debug("%s - %s", redirect2.status, redirect2.request_info.url) |             self._called_urls.append((redirect2.status, redirect2.request_info.url)) | ||||||
|             if not ( |             if not ( | ||||||
|                 url := redirect2.headers.get("Location") |                 url := redirect2.headers.get("Location") | ||||||
|                 + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" |                 + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" | ||||||
|             ): |             ): | ||||||
|  |                 await self._error_logger(redirect2) | ||||||
|                 return False |                 return False | ||||||
|         async with self._session.get(URL(url, encoded=True)) as login_screen: |         async with self._session.get( | ||||||
|             _LOGGER.debug("%s - %s", login_screen.status, login_screen.request_info.url) |             URL(url, encoded=True), headers={"user-agent": const.USER_AGENT} | ||||||
|  |         ) as login_screen: | ||||||
|  |             self._called_urls.append( | ||||||
|  |                 (login_screen.status, login_screen.request_info.url) | ||||||
|  |             ) | ||||||
|             if context := re.findall( |             if context := re.findall( | ||||||
|                 '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() |                 '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() | ||||||
|             ): |             ): | ||||||
| @ -83,6 +102,7 @@ class HonAuth: | |||||||
|                     "/".join(const.AUTH_API.split("/")[:-1]), "" |                     "/".join(const.AUTH_API.split("/")[:-1]), "" | ||||||
|                 ) |                 ) | ||||||
|                 return fw_uid, loaded, login_url |                 return fw_uid, loaded, login_url | ||||||
|  |             await self._error_logger(login_screen) | ||||||
|         return False |         return False | ||||||
|  |  | ||||||
|     async def _login(self, fw_uid, loaded, login_url): |     async def _login(self, fw_uid, loaded, login_url): | ||||||
| @ -94,8 +114,8 @@ class HonAuth: | |||||||
|                         "descriptor": "apex://LightningLoginCustomController/ACTION$login", |                         "descriptor": "apex://LightningLoginCustomController/ACTION$login", | ||||||
|                         "callingDescriptor": "markup://c:loginForm", |                         "callingDescriptor": "markup://c:loginForm", | ||||||
|                         "params": { |                         "params": { | ||||||
|                             "username": self._email, |                             "username": quote(self._email), | ||||||
|                             "password": self._password, |                             "password": quote(self._password), | ||||||
|                             "startUrl": parse.unquote( |                             "startUrl": parse.unquote( | ||||||
|                                 login_url.split("startURL=")[-1] |                                 login_url.split("startURL=")[-1] | ||||||
|                             ).split("%3D")[0], |                             ).split("%3D")[0], | ||||||
| @ -122,7 +142,7 @@ class HonAuth: | |||||||
|             data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), |             data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), | ||||||
|             params=params, |             params=params, | ||||||
|         ) as response: |         ) as response: | ||||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) |             self._called_urls.append((response.status, response.request_info.url)) | ||||||
|             if response.status == 200: |             if response.status == 200: | ||||||
|                 try: |                 try: | ||||||
|                     data = await response.json() |                     data = await response.json() | ||||||
| @ -133,35 +153,31 @@ class HonAuth: | |||||||
|                     _LOGGER.error( |                     _LOGGER.error( | ||||||
|                         "Can't get login url - %s", pformat(await response.json()) |                         "Can't get login url - %s", pformat(await response.json()) | ||||||
|                     ) |                     ) | ||||||
|             _LOGGER.error( |             await self._error_logger(response) | ||||||
|                 "Unable to login: %s\n%s", response.status, await response.text() |  | ||||||
|             ) |  | ||||||
|             return "" |             return "" | ||||||
|  |  | ||||||
|     async def _get_token(self, url): |     async def _get_token(self, url): | ||||||
|         async with self._session.get(url) as response: |         async with self._session.get(url) as response: | ||||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) |             self._called_urls.append((response.status, response.request_info.url)) | ||||||
|             if response.status != 200: |             if response.status != 200: | ||||||
|                 _LOGGER.error("Unable to get token: %s", response.status) |                 await self._error_logger(response) | ||||||
|                 return False |                 return False | ||||||
|             url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text()) |             url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text()) | ||||||
|         if not url: |             if not url: | ||||||
|             _LOGGER.error("Can't get login url - \n%s", await response.text()) |                 await self._error_logger(response) | ||||||
|             raise PermissionError |                 return False | ||||||
|         if "ProgressiveLogin" in url[0]: |         if "ProgressiveLogin" in url[0]: | ||||||
|             async with self._session.get(url[0]) as response: |             async with self._session.get(url[0]) as response: | ||||||
|                 _LOGGER.debug("%s - %s", response.status, response.request_info.url) |                 self._called_urls.append((response.status, response.request_info.url)) | ||||||
|                 if response.status != 200: |                 if response.status != 200: | ||||||
|                     _LOGGER.error("Unable to get token: %s", response.status) |                     await self._error_logger(response) | ||||||
|                     return False |                     return False | ||||||
|                 url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text()) |                 url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text()) | ||||||
|         url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] |         url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] | ||||||
|         async with self._session.get(url) as response: |         async with self._session.get(url) as response: | ||||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) |             self._called_urls.append((response.status, response.request_info.url)) | ||||||
|             if response.status != 200: |             if response.status != 200: | ||||||
|                 _LOGGER.error( |                 await self._error_logger(response) | ||||||
|                     "Unable to connect to the login service: %s", response.status |  | ||||||
|                 ) |  | ||||||
|                 return False |                 return False | ||||||
|             text = await response.text() |             text = await response.text() | ||||||
|         if access_token := re.findall("access_token=(.*?)&", text): |         if access_token := re.findall("access_token=(.*?)&", text): | ||||||
| @ -181,17 +197,19 @@ class HonAuth: | |||||||
|             return False |             return False | ||||||
|         if not await self._get_token(url): |         if not await self._get_token(url): | ||||||
|             return False |             return False | ||||||
|  |         return await self._api_auth() | ||||||
|  |  | ||||||
|  |     async def _api_auth(self): | ||||||
|         post_headers = {"id-token": self._id_token} |         post_headers = {"id-token": self._id_token} | ||||||
|         data = self._device.get() |         data = self._device.get() | ||||||
|         async with self._session.post( |         async with self._session.post( | ||||||
|             f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data |             f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data | ||||||
|         ) as response: |         ) as response: | ||||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) |             self._called_urls.append((response.status, response.request_info.url)) | ||||||
|             try: |             try: | ||||||
|                 json_data = await response.json() |                 json_data = await response.json() | ||||||
|             except json.JSONDecodeError: |             except json.JSONDecodeError: | ||||||
|                 _LOGGER.error("No JSON Data after POST: %s", await response.text()) |                 await self._error_logger(response) | ||||||
|                 return False |                 return False | ||||||
|             self._cognito_token = json_data["cognitoUser"]["Token"] |             self._cognito_token = json_data["cognitoUser"]["Token"] | ||||||
|         return True |         return True | ||||||
| @ -205,10 +223,11 @@ class HonAuth: | |||||||
|         async with self._session.post( |         async with self._session.post( | ||||||
|             f"{const.AUTH_API}/services/oauth2/token", params=params |             f"{const.AUTH_API}/services/oauth2/token", params=params | ||||||
|         ) as response: |         ) as response: | ||||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) |             self._called_urls.append((response.status, response.request_info.url)) | ||||||
|             if response.status >= 400: |             if response.status >= 400: | ||||||
|  |                 await self._error_logger(response, fail=False) | ||||||
|                 return False |                 return False | ||||||
|             data = await response.json() |             data = await response.json() | ||||||
|         self._id_token = data["id_token"] |         self._id_token = data["id_token"] | ||||||
|         self._access_token = data["access_token"] |         self._access_token = data["access_token"] | ||||||
|         return True |         return await self._api_auth() | ||||||
|  | |||||||
| @ -6,12 +6,14 @@ import aiohttp | |||||||
| from pyhon import const | from pyhon import const | ||||||
| from pyhon.connection.auth import HonAuth, _LOGGER | from pyhon.connection.auth import HonAuth, _LOGGER | ||||||
| from pyhon.connection.device import HonDevice | from pyhon.connection.device import HonDevice | ||||||
|  | from pyhon.exceptions import HonAuthenticationError | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonBaseConnectionHandler: | class HonBaseConnectionHandler: | ||||||
|     _HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"} |     _HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"} | ||||||
|  |  | ||||||
|     def __init__(self, session=None): |     def __init__(self, session=None): | ||||||
|  |         self._create_session = session is None | ||||||
|         self._session = session |         self._session = session | ||||||
|         self._auth = None |         self._auth = None | ||||||
|  |  | ||||||
| @ -22,7 +24,8 @@ class HonBaseConnectionHandler: | |||||||
|         await self.close() |         await self.close() | ||||||
|  |  | ||||||
|     async def create(self): |     async def create(self): | ||||||
|         self._session = aiohttp.ClientSession(headers=self._HEADERS) |         if self._create_session: | ||||||
|  |             self._session = aiohttp.ClientSession() | ||||||
|         return self |         return self | ||||||
|  |  | ||||||
|     @asynccontextmanager |     @asynccontextmanager | ||||||
| @ -40,7 +43,8 @@ class HonBaseConnectionHandler: | |||||||
|             yield response |             yield response | ||||||
|  |  | ||||||
|     async def close(self): |     async def close(self): | ||||||
|         await self._session.close() |         if self._create_session: | ||||||
|  |             await self._session.close() | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonConnectionHandler(HonBaseConnectionHandler): | class HonConnectionHandler(HonBaseConnectionHandler): | ||||||
| @ -50,9 +54,9 @@ class HonConnectionHandler(HonBaseConnectionHandler): | |||||||
|         self._email = email |         self._email = email | ||||||
|         self._password = password |         self._password = password | ||||||
|         if not self._email: |         if not self._email: | ||||||
|             raise PermissionError("Login-Error - An email address must be specified") |             raise HonAuthenticationError("An email address must be specified") | ||||||
|         if not self._password: |         if not self._password: | ||||||
|             raise PermissionError("Login-Error - A password address must be specified") |             raise HonAuthenticationError("A password address must be specified") | ||||||
|         self._request_headers = {} |         self._request_headers = {} | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
| @ -73,26 +77,34 @@ class HonConnectionHandler(HonBaseConnectionHandler): | |||||||
|                 self._request_headers["cognito-token"] = self._auth.cognito_token |                 self._request_headers["cognito-token"] = self._auth.cognito_token | ||||||
|                 self._request_headers["id-token"] = self._auth.id_token |                 self._request_headers["id-token"] = self._auth.id_token | ||||||
|             else: |             else: | ||||||
|                 raise PermissionError("Can't Login") |                 raise HonAuthenticationError("Can't login") | ||||||
|         return {h: v for h, v in self._request_headers.items() if h not in headers} |         return self._HEADERS | headers | self._request_headers | ||||||
|  |  | ||||||
|     @asynccontextmanager |     @asynccontextmanager | ||||||
|     async def _intercept(self, method, *args, loop=0, **kwargs): |     async def _intercept(self, method, *args, loop=0, **kwargs): | ||||||
|         kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) |         kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) | ||||||
|         async with method(*args, **kwargs) as response: |         async with method(*args, **kwargs) as response: | ||||||
|             if response.status == 403 and not loop: |             if response.status in [401, 403] and loop == 0: | ||||||
|                 _LOGGER.info("Try refreshing token...") |                 _LOGGER.info("Try refreshing token...") | ||||||
|                 await self._auth.refresh() |                 await self._auth.refresh() | ||||||
|                 yield await self._intercept(method, *args, loop=loop + 1, **kwargs) |                 async with self._intercept( | ||||||
|             elif response.status == 403 and loop < 2: |                     method, *args, loop=loop + 1, **kwargs | ||||||
|  |                 ) as result: | ||||||
|  |                     yield result | ||||||
|  |             elif response.status in [401, 403] and loop == 1: | ||||||
|                 _LOGGER.warning( |                 _LOGGER.warning( | ||||||
|                     "%s - Error %s - %s", |                     "%s - Error %s - %s", | ||||||
|                     response.request_info.url, |                     response.request_info.url, | ||||||
|                     response.status, |                     response.status, | ||||||
|                     await response.text(), |                     await response.text(), | ||||||
|                 ) |                 ) | ||||||
|  |                 self._request_headers = {} | ||||||
|  |                 self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2]) | ||||||
|                 await self.create() |                 await self.create() | ||||||
|                 yield await self._intercept(method, *args, loop=loop + 1, **kwargs) |                 async with self._intercept( | ||||||
|  |                     method, *args, loop=loop + 1, **kwargs | ||||||
|  |                 ) as result: | ||||||
|  |                     yield result | ||||||
|             elif loop >= 2: |             elif loop >= 2: | ||||||
|                 _LOGGER.error( |                 _LOGGER.error( | ||||||
|                     "%s - Error %s - %s", |                     "%s - Error %s - %s", | ||||||
| @ -100,7 +112,7 @@ class HonConnectionHandler(HonBaseConnectionHandler): | |||||||
|                     response.status, |                     response.status, | ||||||
|                     await response.text(), |                     await response.text(), | ||||||
|                 ) |                 ) | ||||||
|                 raise PermissionError("Login failure") |                 raise HonAuthenticationError("Login failure") | ||||||
|             else: |             else: | ||||||
|                 try: |                 try: | ||||||
|                     await response.json() |                     await response.json() | ||||||
| @ -123,5 +135,5 @@ class HonAnonymousConnectionHandler(HonBaseConnectionHandler): | |||||||
|         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS |         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS | ||||||
|         async with method(*args, **kwargs) as response: |         async with method(*args, **kwargs) as response: | ||||||
|             if response.status == 403: |             if response.status == 403: | ||||||
|                 print("Can't authorize") |                 _LOGGER.error("Can't authenticate anymore") | ||||||
|             yield response |             yield response | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								pyhon/exceptions.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								pyhon/exceptions.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,2 @@ | |||||||
|  | class HonAuthenticationError(Exception): | ||||||
|  |     pass | ||||||
							
								
								
									
										63
									
								
								pyhon/helper.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								pyhon/helper.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,63 @@ | |||||||
|  | def key_print(data, key="", start=True): | ||||||
|  |     result = "" | ||||||
|  |     if isinstance(data, list): | ||||||
|  |         for i, value in enumerate(data): | ||||||
|  |             result += key_print(value, key=f"{key}.{i}", start=False) | ||||||
|  |     elif isinstance(data, dict): | ||||||
|  |         for k, value in sorted(data.items()): | ||||||
|  |             result += key_print(value, key=k if start else f"{key}.{k}", start=False) | ||||||
|  |     else: | ||||||
|  |         result += f"{key}: {data}\n" | ||||||
|  |     return result | ||||||
|  |  | ||||||
|  |  | ||||||
|  | # yaml.dump() would be done the same, but needs an additional dependency... | ||||||
|  | def pretty_print(data, key="", intend=0, is_list=False, whitespace="  "): | ||||||
|  |     result = "" | ||||||
|  |     if isinstance(data, list): | ||||||
|  |         if key: | ||||||
|  |             result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n" | ||||||
|  |             intend += 1 | ||||||
|  |         for i, value in enumerate(data): | ||||||
|  |             result += pretty_print( | ||||||
|  |                 value, intend=intend, is_list=True, whitespace=whitespace | ||||||
|  |             ) | ||||||
|  |     elif isinstance(data, dict): | ||||||
|  |         if key: | ||||||
|  |             result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n" | ||||||
|  |             intend += 1 | ||||||
|  |         for i, (key, value) in enumerate(sorted(data.items())): | ||||||
|  |             if is_list and not i: | ||||||
|  |                 result += pretty_print( | ||||||
|  |                     value, key=key, intend=intend, is_list=True, whitespace=whitespace | ||||||
|  |                 ) | ||||||
|  |             elif is_list: | ||||||
|  |                 result += pretty_print( | ||||||
|  |                     value, key=key, intend=intend + 1, whitespace=whitespace | ||||||
|  |                 ) | ||||||
|  |             else: | ||||||
|  |                 result += pretty_print( | ||||||
|  |                     value, key=key, intend=intend, whitespace=whitespace | ||||||
|  |                 ) | ||||||
|  |     else: | ||||||
|  |         result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n" | ||||||
|  |     return result | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def create_command(commands, concat=False): | ||||||
|  |     result = {} | ||||||
|  |     for name, command in commands.items(): | ||||||
|  |         if not concat: | ||||||
|  |             result[name] = {} | ||||||
|  |         for parameter, data in command.parameters.items(): | ||||||
|  |             if data.typology == "enum": | ||||||
|  |                 value = data.values | ||||||
|  |             elif data.typology == "range": | ||||||
|  |                 value = {"min": data.min, "max": data.max, "step": data.step} | ||||||
|  |             else: | ||||||
|  |                 continue | ||||||
|  |             if not concat: | ||||||
|  |                 result[name][parameter] = value | ||||||
|  |             else: | ||||||
|  |                 result[f"{name}.{parameter}"] = value | ||||||
|  |     return result | ||||||
| @ -5,7 +5,7 @@ def str_to_float(string): | |||||||
|     try: |     try: | ||||||
|         return int(string) |         return int(string) | ||||||
|     except ValueError: |     except ValueError: | ||||||
|         return float(str(string.replace(",", "."))) |         return float(str(string).replace(",", ".")) | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonParameter: | class HonParameter: | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								setup.py
									
									
									
									
									
								
							| @ -7,7 +7,7 @@ with open("README.md", "r") as f: | |||||||
|  |  | ||||||
| setup( | setup( | ||||||
|     name="pyhOn", |     name="pyhOn", | ||||||
|     version="0.6.3", |     version="0.7.2", | ||||||
|     author="Andre Basche", |     author="Andre Basche", | ||||||
|     description="Control hOn devices with python", |     description="Control hOn devices with python", | ||||||
|     long_description=long_description, |     long_description=long_description, | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	