Compare commits
	
		
			12 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| d52d622785 | |||
| 9643f66549 | |||
| d26e33a055 | |||
| 0301427497 | |||
| 272556586e | |||
| e82c14ec99 | |||
| 970b94bfa7 | |||
| 33454f68b8 | |||
| 6b2c60d552 | |||
| 46e6a85e84 | |||
| 8c832b44cd | |||
| b4b782c52c | 
| @ -100,8 +100,6 @@ This generates a huge output. It is recommended to pipe this into a file | ||||
| $ pyhOn translate fr > hon_fr.yaml | ||||
| $ pyhOn translate en --json > hon_en.json | ||||
| ``` | ||||
| ## Tested devices | ||||
| - Haier Washing Machine HW90 | ||||
|  | ||||
| ## Usage example | ||||
| This library is used for the custom [HomeAssistant Integration "Haier hOn"](https://github.com/Andre0512/hOn). | ||||
|  | ||||
| @ -10,7 +10,7 @@ from pathlib import Path | ||||
| if __name__ == "__main__": | ||||
|     sys.path.insert(0, str(Path(__file__).parent.parent)) | ||||
|  | ||||
| from pyhon import Hon, HonAPI | ||||
| from pyhon import Hon, HonAPI, helper | ||||
|  | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
| @ -34,61 +34,6 @@ def get_arguments(): | ||||
|     return vars(parser.parse_args()) | ||||
|  | ||||
|  | ||||
| # yaml.dump() would be done the same, but needs an additional dependency... | ||||
| def pretty_print(data, key="", intend=0, is_list=False): | ||||
|     if type(data) is list: | ||||
|         if key: | ||||
|             print(f"{'  ' * intend}{'- ' if is_list else ''}{key}:") | ||||
|             intend += 1 | ||||
|         for i, value in enumerate(data): | ||||
|             pretty_print(value, intend=intend, is_list=True) | ||||
|     elif type(data) is dict: | ||||
|         if key: | ||||
|             print(f"{'  ' * intend}{'- ' if is_list else ''}{key}:") | ||||
|             intend += 1 | ||||
|         for i, (key, value) in enumerate(sorted(data.items())): | ||||
|             if is_list and not i: | ||||
|                 pretty_print(value, key=key, intend=intend, is_list=True) | ||||
|             elif is_list: | ||||
|                 pretty_print(value, key=key, intend=intend + 1) | ||||
|             else: | ||||
|                 pretty_print(value, key=key, intend=intend) | ||||
|     else: | ||||
|         print( | ||||
|             f"{'  ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}" | ||||
|         ) | ||||
|  | ||||
|  | ||||
| def key_print(data, key="", start=True): | ||||
|     if type(data) is list: | ||||
|         for i, value in enumerate(data): | ||||
|             key_print(value, key=f"{key}.{i}", start=False) | ||||
|     elif type(data) is dict: | ||||
|         for k, value in sorted(data.items()): | ||||
|             key_print(value, key=k if start else f"{key}.{k}", start=False) | ||||
|     else: | ||||
|         print(f"{key}: {data}") | ||||
|  | ||||
|  | ||||
| def create_command(commands, concat=False): | ||||
|     result = {} | ||||
|     for name, command in commands.items(): | ||||
|         if not concat: | ||||
|             result[name] = {} | ||||
|         for parameter, data in command.parameters.items(): | ||||
|             if data.typology == "enum": | ||||
|                 value = data.values | ||||
|             elif data.typology == "range": | ||||
|                 value = {"min": data.min, "max": data.max, "step": data.step} | ||||
|             else: | ||||
|                 continue | ||||
|             if not concat: | ||||
|                 result[name][parameter] = value | ||||
|             else: | ||||
|                 result[f"{name}.{parameter}"] = value | ||||
|     return result | ||||
|  | ||||
|  | ||||
| async def translate(language, json_output=False): | ||||
|     async with HonAPI(anonymous=True) as hon: | ||||
|         keys = await hon.translation_keys(language) | ||||
| @ -102,7 +47,7 @@ async def translate(language, json_output=False): | ||||
|             .replace("\\r", "") | ||||
|         ) | ||||
|         keys = json.loads(clean_keys) | ||||
|         pretty_print(keys) | ||||
|         print(helper.pretty_print(keys)) | ||||
|  | ||||
|  | ||||
| async def main(): | ||||
| @ -120,13 +65,25 @@ async def main(): | ||||
|             if args.get("keys"): | ||||
|                 data = device.data.copy() | ||||
|                 attr = "get" if args.get("all") else "pop" | ||||
|                 key_print(data["attributes"].__getattribute__(attr)("parameters")) | ||||
|                 key_print(data.__getattribute__(attr)("appliance")) | ||||
|                 key_print(data) | ||||
|                 pretty_print(create_command(device.commands, concat=True)) | ||||
|                 print( | ||||
|                     helper.key_print( | ||||
|                         data["attributes"].__getattribute__(attr)("parameters") | ||||
|                     ) | ||||
|                 ) | ||||
|                 print(helper.key_print(data.__getattribute__(attr)("appliance"))) | ||||
|                 print(helper.key_print(data)) | ||||
|                 print( | ||||
|                     helper.pretty_print( | ||||
|                         helper.create_command(device.commands, concat=True) | ||||
|                     ) | ||||
|                 ) | ||||
|             else: | ||||
|                 pretty_print({"data": device.data}) | ||||
|                 pretty_print({"settings": create_command(device.commands)}) | ||||
|                 print(helper.pretty_print({"data": device.data})) | ||||
|                 print( | ||||
|                     helper.pretty_print( | ||||
|                         {"settings": helper.create_command(device.commands)} | ||||
|                     ) | ||||
|                 ) | ||||
|  | ||||
|  | ||||
| def start(): | ||||
|  | ||||
| @ -1,12 +1,14 @@ | ||||
| import importlib | ||||
| from contextlib import suppress | ||||
| from typing import Optional, Dict | ||||
|  | ||||
| from pyhon import helper | ||||
| from pyhon.commands import HonCommand | ||||
| from pyhon.parameter import HonParameterFixed | ||||
|  | ||||
|  | ||||
| class HonAppliance: | ||||
|     def __init__(self, api, info): | ||||
|     def __init__(self, api, info: Dict, zone: int = 0) -> None: | ||||
|         if attributes := info.get("attributes"): | ||||
|             info["attributes"] = {v["parName"]: v["parValue"] for v in attributes} | ||||
|         self._info = info | ||||
| @ -16,6 +18,7 @@ class HonAppliance: | ||||
|         self._commands = {} | ||||
|         self._statistics = {} | ||||
|         self._attributes = {} | ||||
|         self._zone = zone | ||||
|  | ||||
|         try: | ||||
|             self._extra = importlib.import_module( | ||||
| @ -25,20 +28,21 @@ class HonAppliance: | ||||
|             self._extra = None | ||||
|  | ||||
|     def __getitem__(self, item): | ||||
|         if self._zone: | ||||
|             item += f"Z{self._zone}" | ||||
|         if "." in item: | ||||
|             result = self.data | ||||
|             for key in item.split("."): | ||||
|                 if all([k in "0123456789" for k in key]) and type(result) is list: | ||||
|                 if all(k in "0123456789" for k in key) and isinstance(result, list): | ||||
|                     result = result[int(key)] | ||||
|                 else: | ||||
|                     result = result[key] | ||||
|             return result | ||||
|         else: | ||||
|             if item in self.data: | ||||
|                 return self.data[item] | ||||
|             if item in self.attributes["parameters"]: | ||||
|                 return self.attributes["parameters"].get(item) | ||||
|             return self.info[item] | ||||
|         if item in self.data: | ||||
|             return self.data[item] | ||||
|         if item in self.attributes["parameters"]: | ||||
|             return self.attributes["parameters"].get(item) | ||||
|         return self.info[item] | ||||
|  | ||||
|     def get(self, item, default=None): | ||||
|         try: | ||||
| @ -46,25 +50,31 @@ class HonAppliance: | ||||
|         except (KeyError, IndexError): | ||||
|             return default | ||||
|  | ||||
|     def _check_name_zone(self, name: str, frontend: bool = True) -> str: | ||||
|         middle = " Z" if frontend else "_z" | ||||
|         if (attribute := self._info.get(name, "")) and self._zone: | ||||
|             return f"{attribute}{middle}{self._zone}" | ||||
|         return attribute | ||||
|  | ||||
|     @property | ||||
|     def appliance_model_id(self): | ||||
|     def appliance_model_id(self) -> str: | ||||
|         return self._info.get("applianceModelId") | ||||
|  | ||||
|     @property | ||||
|     def appliance_type(self): | ||||
|     def appliance_type(self) -> str: | ||||
|         return self._info.get("applianceTypeName") | ||||
|  | ||||
|     @property | ||||
|     def mac_address(self): | ||||
|         return self._info.get("macAddress") | ||||
|     def mac_address(self) -> str: | ||||
|         return self._check_name_zone("macAddress", frontend=False) | ||||
|  | ||||
|     @property | ||||
|     def model_name(self): | ||||
|         return self._info.get("modelName") | ||||
|     def model_name(self) -> str: | ||||
|         return self._check_name_zone("modelName") | ||||
|  | ||||
|     @property | ||||
|     def nick_name(self): | ||||
|         return self._info.get("nickName") | ||||
|     def nick_name(self) -> str: | ||||
|         return self._check_name_zone("nickName") | ||||
|  | ||||
|     @property | ||||
|     def commands_options(self): | ||||
| @ -172,3 +182,15 @@ class HonAppliance: | ||||
|         if self._extra: | ||||
|             return self._extra.data(result) | ||||
|         return result | ||||
|  | ||||
|     @property | ||||
|     def diagnose(self): | ||||
|         data = self.data.copy() | ||||
|         for sensible in ["PK", "SK", "serialNumber", "code", "coords"]: | ||||
|             data["appliance"].pop(sensible, None) | ||||
|         result = helper.pretty_print({"data": self.data}, whitespace="\u200B \u200B ") | ||||
|         result += helper.pretty_print( | ||||
|             {"commands": helper.create_command(self.commands)}, | ||||
|             whitespace="\u200B \u200B ", | ||||
|         ) | ||||
|         return result.replace(self.mac_address, "12-34-56-78-90-ab") | ||||
|  | ||||
| @ -25,6 +25,8 @@ class HonCommand: | ||||
|     def _create_parameters(self, parameters): | ||||
|         result = {} | ||||
|         for parameter, attributes in parameters.items(): | ||||
|             if parameter == "zoneMap" and self._device.zone: | ||||
|                 attributes["default"] = self._device.zone | ||||
|             match attributes.get("typology"): | ||||
|                 case "range": | ||||
|                     result[parameter] = HonParameterRange(parameter, attributes) | ||||
|  | ||||
| @ -1,46 +1,75 @@ | ||||
| import json | ||||
| import logging | ||||
| from datetime import datetime | ||||
| from typing import Dict, Optional | ||||
| from typing_extensions import Self | ||||
|  | ||||
| from pyhon import const | ||||
| from aiohttp import ClientSession | ||||
|  | ||||
| from pyhon import const, exceptions | ||||
| from pyhon.appliance import HonAppliance | ||||
| from pyhon.connection.auth import HonAuth | ||||
| from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler | ||||
|  | ||||
| _LOGGER = logging.getLogger() | ||||
|  | ||||
|  | ||||
| class HonAPI: | ||||
|     def __init__(self, email="", password="", anonymous=False, session=None) -> None: | ||||
|     def __init__( | ||||
|         self, | ||||
|         email: str = "", | ||||
|         password: str = "", | ||||
|         anonymous: bool = False, | ||||
|         session: Optional[ClientSession] = None, | ||||
|     ) -> None: | ||||
|         super().__init__() | ||||
|         self._email = email | ||||
|         self._password = password | ||||
|         self._anonymous = anonymous | ||||
|         self._hon = None | ||||
|         self._hon_anonymous = None | ||||
|         self._session = session | ||||
|         self._email: str = email | ||||
|         self._password: str = password | ||||
|         self._anonymous: bool = anonymous | ||||
|         self._hon_handler: Optional[HonConnectionHandler] = None | ||||
|         self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None | ||||
|         self._session: Optional[ClientSession] = session | ||||
|  | ||||
|     async def __aenter__(self): | ||||
|     async def __aenter__(self) -> Self: | ||||
|         return await self.create() | ||||
|  | ||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb): | ||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: | ||||
|         await self.close() | ||||
|  | ||||
|     async def create(self): | ||||
|         self._hon_anonymous = await HonAnonymousConnectionHandler( | ||||
|     @property | ||||
|     def auth(self) -> HonAuth: | ||||
|         if self._hon is None or self._hon.auth is None: | ||||
|             raise exceptions.NoAuthenticationException | ||||
|         return self._hon.auth | ||||
|  | ||||
|     @property | ||||
|     def _hon(self): | ||||
|         if self._hon_handler is None: | ||||
|             raise exceptions.NoAuthenticationException | ||||
|         return self._hon_handler | ||||
|  | ||||
|     @property | ||||
|     def _hon_anonymous(self): | ||||
|         if self._hon_anonymous_handler is None: | ||||
|             raise exceptions.NoAuthenticationException | ||||
|         return self._hon_anonymous_handler | ||||
|  | ||||
|     async def create(self) -> Self: | ||||
|         self._hon_anonymous_handler = await HonAnonymousConnectionHandler( | ||||
|             self._session | ||||
|         ).create() | ||||
|         if not self._anonymous: | ||||
|             self._hon = await HonConnectionHandler( | ||||
|             self._hon_handler = await HonConnectionHandler( | ||||
|                 self._email, self._password, self._session | ||||
|             ).create() | ||||
|         return self | ||||
|  | ||||
|     async def load_appliances(self): | ||||
|     async def load_appliances(self) -> Dict: | ||||
|         async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp: | ||||
|             return await resp.json() | ||||
|  | ||||
|     async def load_commands(self, appliance: HonAppliance): | ||||
|         params = { | ||||
|     async def load_commands(self, appliance: HonAppliance) -> Dict: | ||||
|         params: Dict = { | ||||
|             "applianceType": appliance.appliance_type, | ||||
|             "code": appliance.info["code"], | ||||
|             "applianceModelId": appliance.appliance_model_id, | ||||
| @ -51,58 +80,66 @@ class HonAPI: | ||||
|             "appVersion": const.APP_VERSION, | ||||
|             "series": appliance.info["series"], | ||||
|         } | ||||
|         url = f"{const.API_URL}/commands/v1/retrieve" | ||||
|         url: str = f"{const.API_URL}/commands/v1/retrieve" | ||||
|         async with self._hon.get(url, params=params) as response: | ||||
|             result = (await response.json()).get("payload", {}) | ||||
|             result: Dict = (await response.json()).get("payload", {}) | ||||
|             if not result or result.pop("resultCode") != "0": | ||||
|                 return {} | ||||
|             return result | ||||
|  | ||||
|     async def command_history(self, appliance: HonAppliance): | ||||
|         url = f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history" | ||||
|     async def command_history(self, appliance: HonAppliance) -> Dict: | ||||
|         url: str = ( | ||||
|             f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history" | ||||
|         ) | ||||
|         async with self._hon.get(url) as response: | ||||
|             result = await response.json() | ||||
|             result: Dict = await response.json() | ||||
|             if not result or not result.get("payload"): | ||||
|                 return {} | ||||
|             return result["payload"]["history"] | ||||
|  | ||||
|     async def last_activity(self, appliance: HonAppliance): | ||||
|         url = f"{const.API_URL}/commands/v1/retrieve-last-activity" | ||||
|         params = {"macAddress": appliance.mac_address} | ||||
|     async def last_activity(self, appliance: HonAppliance) -> Dict: | ||||
|         url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity" | ||||
|         params: Dict = {"macAddress": appliance.mac_address} | ||||
|         async with self._hon.get(url, params=params) as response: | ||||
|             result = await response.json() | ||||
|             result: Dict = await response.json() | ||||
|             if result and (activity := result.get("attributes")): | ||||
|                 return activity | ||||
|         return {} | ||||
|  | ||||
|     async def load_attributes(self, appliance: HonAppliance): | ||||
|         params = { | ||||
|     async def load_attributes(self, appliance: HonAppliance) -> Dict: | ||||
|         params: Dict = { | ||||
|             "macAddress": appliance.mac_address, | ||||
|             "applianceType": appliance.appliance_type, | ||||
|             "category": "CYCLE", | ||||
|         } | ||||
|         url = f"{const.API_URL}/commands/v1/context" | ||||
|         url: str = f"{const.API_URL}/commands/v1/context" | ||||
|         async with self._hon.get(url, params=params) as response: | ||||
|             return (await response.json()).get("payload", {}) | ||||
|  | ||||
|     async def load_statistics(self, appliance: HonAppliance): | ||||
|         params = { | ||||
|     async def load_statistics(self, appliance: HonAppliance) -> Dict: | ||||
|         params: Dict = { | ||||
|             "macAddress": appliance.mac_address, | ||||
|             "applianceType": appliance.appliance_type, | ||||
|         } | ||||
|         url = f"{const.API_URL}/commands/v1/statistics" | ||||
|         url: str = f"{const.API_URL}/commands/v1/statistics" | ||||
|         async with self._hon.get(url, params=params) as response: | ||||
|             return (await response.json()).get("payload", {}) | ||||
|  | ||||
|     async def send_command(self, appliance, command, parameters, ancillary_parameters): | ||||
|         now = datetime.utcnow().isoformat() | ||||
|         data = { | ||||
|     async def send_command( | ||||
|         self, | ||||
|         appliance: HonAppliance, | ||||
|         command: str, | ||||
|         parameters: Dict, | ||||
|         ancillary_parameters: Dict, | ||||
|     ) -> bool: | ||||
|         now: str = datetime.utcnow().isoformat() | ||||
|         data: Dict = { | ||||
|             "macAddress": appliance.mac_address, | ||||
|             "timestamp": f"{now[:-3]}Z", | ||||
|             "commandName": command, | ||||
|             "transactionId": f"{appliance.mac_address}_{now[:-3]}Z", | ||||
|             "applianceOptions": appliance.commands_options, | ||||
|             "appliance": self._hon.device.get(), | ||||
|             "device": self._hon.device.get(mobile=True), | ||||
|             "attributes": { | ||||
|                 "channel": "mobileApp", | ||||
|                 "origin": "standardProgram", | ||||
| @ -112,36 +149,37 @@ class HonAPI: | ||||
|             "parameters": parameters, | ||||
|             "applianceType": appliance.appliance_type, | ||||
|         } | ||||
|         url = f"{const.API_URL}/commands/v1/send" | ||||
|         async with self._hon.post(url, json=data) as resp: | ||||
|             json_data = await resp.json() | ||||
|         url: str = f"{const.API_URL}/commands/v1/send" | ||||
|         async with self._hon.post(url, json=data) as response: | ||||
|             json_data: Dict = await response.json() | ||||
|             if json_data.get("payload", {}).get("resultCode") == "0": | ||||
|                 return True | ||||
|             _LOGGER.error(await response.text()) | ||||
|         return False | ||||
|  | ||||
|     async def appliance_configuration(self): | ||||
|         url = f"{const.API_URL}/config/v1/appliance-configuration" | ||||
|     async def appliance_configuration(self) -> Dict: | ||||
|         url: str = f"{const.API_URL}/config/v1/appliance-configuration" | ||||
|         async with self._hon_anonymous.get(url) as response: | ||||
|             result = await response.json() | ||||
|             result: Dict = await response.json() | ||||
|             if result and (data := result.get("payload")): | ||||
|                 return data | ||||
|         return {} | ||||
|  | ||||
|     async def app_config(self, language="en", beta=True): | ||||
|         url = f"{const.API_URL}/app-config" | ||||
|         payload = { | ||||
|     async def app_config(self, language: str = "en", beta: bool = True) -> Dict: | ||||
|         url: str = f"{const.API_URL}/app-config" | ||||
|         payload_data: Dict = { | ||||
|             "languageCode": language, | ||||
|             "beta": beta, | ||||
|             "appVersion": const.APP_VERSION, | ||||
|             "os": const.OS, | ||||
|         } | ||||
|         payload = json.dumps(payload, separators=(",", ":")) | ||||
|         payload: str = json.dumps(payload_data, separators=(",", ":")) | ||||
|         async with self._hon_anonymous.post(url, data=payload) as response: | ||||
|             if (result := await response.json()) and (data := result.get("payload")): | ||||
|                 return data | ||||
|         return {} | ||||
|  | ||||
|     async def translation_keys(self, language="en"): | ||||
|     async def translation_keys(self, language: str = "en") -> Dict: | ||||
|         config = await self.app_config(language=language) | ||||
|         if url := config.get("language", {}).get("jsonPath"): | ||||
|             async with self._hon_anonymous.get(url) as response: | ||||
| @ -149,8 +187,8 @@ class HonAPI: | ||||
|                     return result | ||||
|         return {} | ||||
|  | ||||
|     async def close(self): | ||||
|         if self._hon: | ||||
|             await self._hon.close() | ||||
|         if self._hon_anonymous: | ||||
|             await self._hon_anonymous.close() | ||||
|     async def close(self) -> None: | ||||
|         if self._hon_handler is not None: | ||||
|             await self._hon_handler.close() | ||||
|         if self._hon_anonymous_handler is not None: | ||||
|             await self._hon_anonymous_handler.close() | ||||
|  | ||||
| @ -2,19 +2,24 @@ import json | ||||
| import logging | ||||
| import re | ||||
| import secrets | ||||
| import sys | ||||
| import urllib | ||||
| from datetime import datetime, timedelta | ||||
| from pprint import pformat | ||||
| from typing import List, Tuple | ||||
| from urllib import parse | ||||
| from urllib.parse import quote | ||||
|  | ||||
| from yarl import URL | ||||
|  | ||||
| from pyhon import const | ||||
| from pyhon import const, exceptions | ||||
|  | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| class HonAuth: | ||||
|     _TOKEN_EXPIRES_AFTER_HOURS = 8 | ||||
|     _TOKEN_EXPIRE_WARNING_HOURS = 7 | ||||
|  | ||||
|     def __init__(self, session, email, password, device) -> None: | ||||
|         self._session = session | ||||
|         self._email = email | ||||
| @ -24,6 +29,8 @@ class HonAuth: | ||||
|         self._cognito_token = "" | ||||
|         self._id_token = "" | ||||
|         self._device = device | ||||
|         self._called_urls: List[Tuple[int, str]] = [] | ||||
|         self._expires: datetime = datetime.utcnow() | ||||
|  | ||||
|     @property | ||||
|     def cognito_token(self): | ||||
| @ -41,6 +48,27 @@ class HonAuth: | ||||
|     def refresh_token(self): | ||||
|         return self._refresh_token | ||||
|  | ||||
|     def _check_token_expiration(self, hours): | ||||
|         return datetime.utcnow() >= self._expires + timedelta(hours=hours) | ||||
|  | ||||
|     @property | ||||
|     def token_is_expired(self) -> bool: | ||||
|         return self._check_token_expiration(self._TOKEN_EXPIRES_AFTER_HOURS) | ||||
|  | ||||
|     @property | ||||
|     def token_expires_soon(self) -> bool: | ||||
|         return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS) | ||||
|  | ||||
|     async def _error_logger(self, response, fail=True): | ||||
|         result = "hOn Authentication Error\n" | ||||
|         for i, (status, url) in enumerate(self._called_urls): | ||||
|             result += f" {i + 1: 2d}     {status} - {url}\n" | ||||
|         result += f"ERROR - {response.status} - {response.request_info.url}\n" | ||||
|         result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" | ||||
|         _LOGGER.error(result) | ||||
|         if fail: | ||||
|             raise exceptions.HonAuthenticationError("Can't login") | ||||
|  | ||||
|     async def _load_login(self): | ||||
|         nonce = secrets.token_hex(16) | ||||
|         nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" | ||||
| @ -58,22 +86,34 @@ class HonAuth: | ||||
|         async with self._session.get( | ||||
|             f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" | ||||
|         ) as response: | ||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) | ||||
|             if not (login_url := re.findall("url = '(.+?)'", await response.text())): | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             text = await response.text() | ||||
|             self._expires = datetime.utcnow() | ||||
|             if not (login_url := re.findall("url = '(.+?)'", text)): | ||||
|                 if "oauth/done#access_token=" in text: | ||||
|                     self._parse_token_data(text) | ||||
|                     raise exceptions.HonNoAuthenticationNeeded() | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|         async with self._session.get(login_url[0], allow_redirects=False) as redirect1: | ||||
|             _LOGGER.debug("%s - %s", redirect1.status, redirect1.request_info.url) | ||||
|             self._called_urls.append((redirect1.status, redirect1.request_info.url)) | ||||
|             if not (url := redirect1.headers.get("Location")): | ||||
|                 await self._error_logger(redirect1) | ||||
|                 return False | ||||
|         async with self._session.get(url, allow_redirects=False) as redirect2: | ||||
|             _LOGGER.debug("%s - %s", redirect2.status, redirect2.request_info.url) | ||||
|             self._called_urls.append((redirect2.status, redirect2.request_info.url)) | ||||
|             if not ( | ||||
|                 url := redirect2.headers.get("Location") | ||||
|                 + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" | ||||
|             ): | ||||
|                 await self._error_logger(redirect2) | ||||
|                 return False | ||||
|         async with self._session.get(URL(url, encoded=True)) as login_screen: | ||||
|             _LOGGER.debug("%s - %s", login_screen.status, login_screen.request_info.url) | ||||
|         async with self._session.get( | ||||
|             URL(url, encoded=True), headers={"user-agent": const.USER_AGENT} | ||||
|         ) as login_screen: | ||||
|             self._called_urls.append( | ||||
|                 (login_screen.status, login_screen.request_info.url) | ||||
|             ) | ||||
|             if context := re.findall( | ||||
|                 '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() | ||||
|             ): | ||||
| @ -83,6 +123,7 @@ class HonAuth: | ||||
|                     "/".join(const.AUTH_API.split("/")[:-1]), "" | ||||
|                 ) | ||||
|                 return fw_uid, loaded, login_url | ||||
|             await self._error_logger(login_screen) | ||||
|         return False | ||||
|  | ||||
|     async def _login(self, fw_uid, loaded, login_url): | ||||
| @ -94,8 +135,8 @@ class HonAuth: | ||||
|                         "descriptor": "apex://LightningLoginCustomController/ACTION$login", | ||||
|                         "callingDescriptor": "markup://c:loginForm", | ||||
|                         "params": { | ||||
|                             "username": self._email, | ||||
|                             "password": self._password, | ||||
|                             "username": quote(self._email), | ||||
|                             "password": quote(self._password), | ||||
|                             "startUrl": parse.unquote( | ||||
|                                 login_url.split("startURL=")[-1] | ||||
|                             ).split("%3D")[0], | ||||
| @ -122,7 +163,7 @@ class HonAuth: | ||||
|             data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), | ||||
|             params=params, | ||||
|         ) as response: | ||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status == 200: | ||||
|                 try: | ||||
|                     data = await response.json() | ||||
| @ -133,69 +174,72 @@ class HonAuth: | ||||
|                     _LOGGER.error( | ||||
|                         "Can't get login url - %s", pformat(await response.json()) | ||||
|                     ) | ||||
|             _LOGGER.error( | ||||
|                 "Unable to login: %s\n%s", response.status, await response.text() | ||||
|             ) | ||||
|             await self._error_logger(response) | ||||
|             return "" | ||||
|  | ||||
|     async def _get_token(self, url): | ||||
|         async with self._session.get(url) as response: | ||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) | ||||
|             if response.status != 200: | ||||
|                 _LOGGER.error("Unable to get token: %s", response.status) | ||||
|                 return False | ||||
|             url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text()) | ||||
|         if not url: | ||||
|             _LOGGER.error("Can't get login url - \n%s", await response.text()) | ||||
|             raise PermissionError | ||||
|         if "ProgressiveLogin" in url[0]: | ||||
|             async with self._session.get(url[0]) as response: | ||||
|                 _LOGGER.debug("%s - %s", response.status, response.request_info.url) | ||||
|                 if response.status != 200: | ||||
|                     _LOGGER.error("Unable to get token: %s", response.status) | ||||
|                     return False | ||||
|                 url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text()) | ||||
|         url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] | ||||
|         async with self._session.get(url) as response: | ||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) | ||||
|             if response.status != 200: | ||||
|                 _LOGGER.error( | ||||
|                     "Unable to connect to the login service: %s", response.status | ||||
|                 ) | ||||
|                 return False | ||||
|             text = await response.text() | ||||
|     def _parse_token_data(self, text): | ||||
|         if access_token := re.findall("access_token=(.*?)&", text): | ||||
|             self._access_token = access_token[0] | ||||
|         if refresh_token := re.findall("refresh_token=(.*?)&", text): | ||||
|             self._refresh_token = refresh_token[0] | ||||
|         if id_token := re.findall("id_token=(.*?)&", text): | ||||
|             self._id_token = id_token[0] | ||||
|  | ||||
|     async def _get_token(self, url): | ||||
|         async with self._session.get(url) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status != 200: | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|             url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text()) | ||||
|             if not url: | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|         if "ProgressiveLogin" in url[0]: | ||||
|             async with self._session.get(url[0]) as response: | ||||
|                 self._called_urls.append((response.status, response.request_info.url)) | ||||
|                 if response.status != 200: | ||||
|                     await self._error_logger(response) | ||||
|                     return False | ||||
|                 url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text()) | ||||
|         url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] | ||||
|         async with self._session.get(url) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status != 200: | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|             self._parse_token_data(await response.text()) | ||||
|         return True | ||||
|  | ||||
|     async def authorize(self): | ||||
|         if login_site := await self._load_login(): | ||||
|             fw_uid, loaded, login_url = login_site | ||||
|         else: | ||||
|             return False | ||||
|         if not (url := await self._login(fw_uid, loaded, login_url)): | ||||
|             return False | ||||
|         if not await self._get_token(url): | ||||
|             return False | ||||
|  | ||||
|     async def _api_auth(self): | ||||
|         post_headers = {"id-token": self._id_token} | ||||
|         data = self._device.get() | ||||
|         async with self._session.post( | ||||
|             f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data | ||||
|         ) as response: | ||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             try: | ||||
|                 json_data = await response.json() | ||||
|             except json.JSONDecodeError: | ||||
|                 _LOGGER.error("No JSON Data after POST: %s", await response.text()) | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|             self._cognito_token = json_data["cognitoUser"]["Token"] | ||||
|         return True | ||||
|  | ||||
|     async def authenticate(self): | ||||
|         self.clear() | ||||
|         try: | ||||
|             if not (login_site := await self._load_login()): | ||||
|                 raise exceptions.HonAuthenticationError("Can't open login page") | ||||
|             if not (url := await self._login(*login_site)): | ||||
|                 raise exceptions.HonAuthenticationError("Can't login") | ||||
|             if not await self._get_token(url): | ||||
|                 raise exceptions.HonAuthenticationError("Can't get token") | ||||
|             if not await self._api_auth(): | ||||
|                 raise exceptions.HonAuthenticationError("Can't get api token") | ||||
|         except exceptions.HonNoAuthenticationNeeded: | ||||
|             return | ||||
|  | ||||
|     async def refresh(self): | ||||
|         params = { | ||||
|             "client_id": const.CLIENT_ID, | ||||
| @ -205,10 +249,20 @@ class HonAuth: | ||||
|         async with self._session.post( | ||||
|             f"{const.AUTH_API}/services/oauth2/token", params=params | ||||
|         ) as response: | ||||
|             _LOGGER.debug("%s - %s", response.status, response.request_info.url) | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status >= 400: | ||||
|                 await self._error_logger(response, fail=False) | ||||
|                 return False | ||||
|             data = await response.json() | ||||
|         self._expires = datetime.utcnow() | ||||
|         self._id_token = data["id_token"] | ||||
|         self._access_token = data["access_token"] | ||||
|         return True | ||||
|         return await self._api_auth() | ||||
|  | ||||
|     def clear(self): | ||||
|         self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2]) | ||||
|         self._called_urls = [] | ||||
|         self._cognito_token = "" | ||||
|         self._id_token = "" | ||||
|         self._access_token = "" | ||||
|         self._refresh_token = "" | ||||
|  | ||||
| @ -1,41 +1,43 @@ | ||||
| import secrets | ||||
| from typing import Dict | ||||
|  | ||||
| from pyhon import const | ||||
|  | ||||
|  | ||||
| class HonDevice: | ||||
|     def __init__(self): | ||||
|         self._app_version = const.APP_VERSION | ||||
|         self._os_version = const.OS_VERSION | ||||
|         self._os = const.OS | ||||
|         self._device_model = const.DEVICE_MODEL | ||||
|         self._mobile_id = secrets.token_hex(8) | ||||
|     def __init__(self) -> None: | ||||
|         self._app_version: str = const.APP_VERSION | ||||
|         self._os_version: int = const.OS_VERSION | ||||
|         self._os: str = const.OS | ||||
|         self._device_model: str = const.DEVICE_MODEL | ||||
|         self._mobile_id: str = secrets.token_hex(8) | ||||
|  | ||||
|     @property | ||||
|     def app_version(self): | ||||
|     def app_version(self) -> str: | ||||
|         return self._app_version | ||||
|  | ||||
|     @property | ||||
|     def os_version(self): | ||||
|     def os_version(self) -> int: | ||||
|         return self._os_version | ||||
|  | ||||
|     @property | ||||
|     def os(self): | ||||
|     def os(self) -> str: | ||||
|         return self._os | ||||
|  | ||||
|     @property | ||||
|     def device_model(self): | ||||
|     def device_model(self) -> str: | ||||
|         return self._device_model | ||||
|  | ||||
|     @property | ||||
|     def mobile_id(self): | ||||
|     def mobile_id(self) -> str: | ||||
|         return self._mobile_id | ||||
|  | ||||
|     def get(self): | ||||
|         return { | ||||
|     def get(self, mobile: bool = False) -> Dict: | ||||
|         result = { | ||||
|             "appVersion": self.app_version, | ||||
|             "mobileId": self.mobile_id, | ||||
|             "osVersion": self.os_version, | ||||
|             "os": self.os, | ||||
|             "osVersion": self.os_version, | ||||
|             "deviceModel": self.device_model, | ||||
|         } | ||||
|         return (result | {"mobileOs": result.pop("os")}) if mobile else result | ||||
|  | ||||
| @ -1,90 +1,117 @@ | ||||
| import json | ||||
| from collections.abc import Generator, AsyncIterator, Coroutine | ||||
| from contextlib import asynccontextmanager | ||||
| from typing import Optional, Callable, Dict | ||||
| from typing_extensions import Self | ||||
|  | ||||
| import aiohttp | ||||
|  | ||||
| from pyhon import const | ||||
| from pyhon import const, exceptions | ||||
| from pyhon.connection.auth import HonAuth, _LOGGER | ||||
| from pyhon.connection.device import HonDevice | ||||
| from pyhon.exceptions import HonAuthenticationError | ||||
|  | ||||
|  | ||||
| class HonBaseConnectionHandler: | ||||
|     _HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"} | ||||
|     _HEADERS: Dict = { | ||||
|         "user-agent": const.USER_AGENT, | ||||
|         "Content-Type": "application/json", | ||||
|     } | ||||
|  | ||||
|     def __init__(self, session=None): | ||||
|         self._session = session | ||||
|         self._auth = None | ||||
|     def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: | ||||
|         self._create_session: bool = session is None | ||||
|         self._session: Optional[aiohttp.ClientSession] = session | ||||
|         self._auth: Optional[HonAuth] = None | ||||
|  | ||||
|     async def __aenter__(self): | ||||
|     async def __aenter__(self) -> Self: | ||||
|         return await self.create() | ||||
|  | ||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb): | ||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: | ||||
|         await self.close() | ||||
|  | ||||
|     async def create(self): | ||||
|         self._session = aiohttp.ClientSession(headers=self._HEADERS) | ||||
|     @property | ||||
|     def auth(self) -> Optional[HonAuth]: | ||||
|         return self._auth | ||||
|  | ||||
|     async def create(self) -> Self: | ||||
|         if self._create_session: | ||||
|             self._session = aiohttp.ClientSession() | ||||
|         return self | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def _intercept(self, method, *args, loop=0, **kwargs): | ||||
|     def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs): | ||||
|         raise NotImplementedError | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def get(self, *args, **kwargs): | ||||
|     async def get(self, *args, **kwargs) -> AsyncIterator[Callable]: | ||||
|         if self._session is None: | ||||
|             raise exceptions.NoSessionException() | ||||
|         response: Callable | ||||
|         async with self._intercept(self._session.get, *args, **kwargs) as response: | ||||
|             yield response | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def post(self, *args, **kwargs): | ||||
|     async def post(self, *args, **kwargs) -> AsyncIterator[Callable]: | ||||
|         if self._session is None: | ||||
|             raise exceptions.NoSessionException() | ||||
|         response: Callable | ||||
|         async with self._intercept(self._session.post, *args, **kwargs) as response: | ||||
|             yield response | ||||
|  | ||||
|     async def close(self): | ||||
|         await self._session.close() | ||||
|     async def close(self) -> None: | ||||
|         if self._create_session and self._session is not None: | ||||
|             await self._session.close() | ||||
|  | ||||
|  | ||||
| class HonConnectionHandler(HonBaseConnectionHandler): | ||||
|     def __init__(self, email, password, session=None): | ||||
|     def __init__( | ||||
|         self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None | ||||
|     ) -> None: | ||||
|         super().__init__(session=session) | ||||
|         self._device = HonDevice() | ||||
|         self._email = email | ||||
|         self._password = password | ||||
|         self._device: HonDevice = HonDevice() | ||||
|         self._email: str = email | ||||
|         self._password: str = password | ||||
|         if not self._email: | ||||
|             raise PermissionError("Login-Error - An email address must be specified") | ||||
|             raise HonAuthenticationError("An email address must be specified") | ||||
|         if not self._password: | ||||
|             raise PermissionError("Login-Error - A password address must be specified") | ||||
|         self._request_headers = {} | ||||
|             raise HonAuthenticationError("A password address must be specified") | ||||
|  | ||||
|     @property | ||||
|     def device(self): | ||||
|     def device(self) -> HonDevice: | ||||
|         return self._device | ||||
|  | ||||
|     async def create(self): | ||||
|     async def create(self) -> Self: | ||||
|         await super().create() | ||||
|         self._auth = HonAuth(self._session, self._email, self._password, self._device) | ||||
|         self._auth: HonAuth = HonAuth( | ||||
|             self._session, self._email, self._password, self._device | ||||
|         ) | ||||
|         return self | ||||
|  | ||||
|     async def _check_headers(self, headers): | ||||
|         if ( | ||||
|             "cognito-token" not in self._request_headers | ||||
|             or "id-token" not in self._request_headers | ||||
|         ): | ||||
|             if await self._auth.authorize(): | ||||
|                 self._request_headers["cognito-token"] = self._auth.cognito_token | ||||
|                 self._request_headers["id-token"] = self._auth.id_token | ||||
|             else: | ||||
|                 raise PermissionError("Can't Login") | ||||
|         return {h: v for h, v in self._request_headers.items() if h not in headers} | ||||
|     async def _check_headers(self, headers: Dict) -> Dict: | ||||
|         if not (self._auth.cognito_token and self._auth.id_token): | ||||
|             await self._auth.authenticate() | ||||
|         headers["cognito-token"] = self._auth.cognito_token | ||||
|         headers["id-token"] = self._auth.id_token | ||||
|         return self._HEADERS | headers | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def _intercept(self, method, *args, loop=0, **kwargs): | ||||
|     async def _intercept( | ||||
|         self, method: Callable, *args, loop: int = 0, **kwargs | ||||
|     ) -> AsyncIterator: | ||||
|         kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) | ||||
|         async with method(*args, **kwargs) as response: | ||||
|             if response.status == 403 and not loop: | ||||
|             if ( | ||||
|                 self._auth.token_expires_soon or response.status in [401, 403] | ||||
|             ) and loop == 0: | ||||
|                 _LOGGER.info("Try refreshing token...") | ||||
|                 await self._auth.refresh() | ||||
|                 yield await self._intercept(method, *args, loop=loop + 1, **kwargs) | ||||
|             elif response.status == 403 and loop < 2: | ||||
|                 async with self._intercept( | ||||
|                     method, *args, loop=loop + 1, **kwargs | ||||
|                 ) as result: | ||||
|                     yield result | ||||
|             elif ( | ||||
|                 self._auth.token_is_expired or response.status in [401, 403] | ||||
|             ) and loop == 1: | ||||
|                 _LOGGER.warning( | ||||
|                     "%s - Error %s - %s", | ||||
|                     response.request_info.url, | ||||
| @ -92,7 +119,10 @@ class HonConnectionHandler(HonBaseConnectionHandler): | ||||
|                     await response.text(), | ||||
|                 ) | ||||
|                 await self.create() | ||||
|                 yield await self._intercept(method, *args, loop=loop + 1, **kwargs) | ||||
|                 async with self._intercept( | ||||
|                     method, *args, loop=loop + 1, **kwargs | ||||
|                 ) as result: | ||||
|                     yield result | ||||
|             elif loop >= 2: | ||||
|                 _LOGGER.error( | ||||
|                     "%s - Error %s - %s", | ||||
| @ -100,7 +130,7 @@ class HonConnectionHandler(HonBaseConnectionHandler): | ||||
|                     response.status, | ||||
|                     await response.text(), | ||||
|                 ) | ||||
|                 raise PermissionError("Login failure") | ||||
|                 raise HonAuthenticationError("Login failure") | ||||
|             else: | ||||
|                 try: | ||||
|                     await response.json() | ||||
| @ -112,16 +142,18 @@ class HonConnectionHandler(HonBaseConnectionHandler): | ||||
|                         response.status, | ||||
|                         await response.text(), | ||||
|                     ) | ||||
|                     yield {} | ||||
|                     raise HonAuthenticationError("Decode Error") | ||||
|  | ||||
|  | ||||
| class HonAnonymousConnectionHandler(HonBaseConnectionHandler): | ||||
|     _HEADERS = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} | ||||
|     _HEADERS: Dict = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def _intercept(self, method, *args, loop=0, **kwargs): | ||||
|     async def _intercept( | ||||
|         self, method: Callable, *args, loop: int = 0, **kwargs | ||||
|     ) -> AsyncIterator: | ||||
|         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS | ||||
|         async with method(*args, **kwargs) as response: | ||||
|             if response.status == 403: | ||||
|                 print("Can't authorize") | ||||
|                 _LOGGER.error("Can't authenticate anymore") | ||||
|             yield response | ||||
|  | ||||
							
								
								
									
										14
									
								
								pyhon/exceptions.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										14
									
								
								pyhon/exceptions.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,14 @@ | ||||
| class HonAuthenticationError(Exception): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| class HonNoAuthenticationNeeded(Exception): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| class NoSessionException(Exception): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| class NoAuthenticationException(Exception): | ||||
|     pass | ||||
							
								
								
									
										63
									
								
								pyhon/helper.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								pyhon/helper.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,63 @@ | ||||
| def key_print(data, key="", start=True): | ||||
|     result = "" | ||||
|     if isinstance(data, list): | ||||
|         for i, value in enumerate(data): | ||||
|             result += key_print(value, key=f"{key}.{i}", start=False) | ||||
|     elif isinstance(data, dict): | ||||
|         for k, value in sorted(data.items()): | ||||
|             result += key_print(value, key=k if start else f"{key}.{k}", start=False) | ||||
|     else: | ||||
|         result += f"{key}: {data}\n" | ||||
|     return result | ||||
|  | ||||
|  | ||||
| # yaml.dump() would be done the same, but needs an additional dependency... | ||||
| def pretty_print(data, key="", intend=0, is_list=False, whitespace="  "): | ||||
|     result = "" | ||||
|     if isinstance(data, list): | ||||
|         if key: | ||||
|             result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n" | ||||
|             intend += 1 | ||||
|         for i, value in enumerate(data): | ||||
|             result += pretty_print( | ||||
|                 value, intend=intend, is_list=True, whitespace=whitespace | ||||
|             ) | ||||
|     elif isinstance(data, dict): | ||||
|         if key: | ||||
|             result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n" | ||||
|             intend += 1 | ||||
|         for i, (key, value) in enumerate(sorted(data.items())): | ||||
|             if is_list and not i: | ||||
|                 result += pretty_print( | ||||
|                     value, key=key, intend=intend, is_list=True, whitespace=whitespace | ||||
|                 ) | ||||
|             elif is_list: | ||||
|                 result += pretty_print( | ||||
|                     value, key=key, intend=intend + 1, whitespace=whitespace | ||||
|                 ) | ||||
|             else: | ||||
|                 result += pretty_print( | ||||
|                     value, key=key, intend=intend, whitespace=whitespace | ||||
|                 ) | ||||
|     else: | ||||
|         result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n" | ||||
|     return result | ||||
|  | ||||
|  | ||||
| def create_command(commands, concat=False): | ||||
|     result = {} | ||||
|     for name, command in commands.items(): | ||||
|         if not concat: | ||||
|             result[name] = {} | ||||
|         for parameter, data in command.parameters.items(): | ||||
|             if data.typology == "enum": | ||||
|                 value = data.values | ||||
|             elif data.typology == "range": | ||||
|                 value = {"min": data.min, "max": data.max, "step": data.step} | ||||
|             else: | ||||
|                 continue | ||||
|             if not concat: | ||||
|                 result[name][parameter] = value | ||||
|             else: | ||||
|                 result[f"{name}.{parameter}"] = value | ||||
|     return result | ||||
							
								
								
									
										54
									
								
								pyhon/hon.py
									
									
									
									
									
								
							
							
						
						
									
										54
									
								
								pyhon/hon.py
									
									
									
									
									
								
							| @ -1,17 +1,20 @@ | ||||
| import asyncio | ||||
| from typing import List | ||||
| from typing import List, Optional, Dict | ||||
| from typing_extensions import Self | ||||
|  | ||||
| from pyhon import HonAPI | ||||
| from aiohttp import ClientSession | ||||
|  | ||||
| from pyhon import HonAPI, exceptions | ||||
| from pyhon.appliance import HonAppliance | ||||
|  | ||||
|  | ||||
| class Hon: | ||||
|     def __init__(self, email, password, session=None): | ||||
|         self._email = email | ||||
|         self._password = password | ||||
|         self._session = session | ||||
|         self._appliances = [] | ||||
|         self._api = None | ||||
|     def __init__(self, email: str, password: str, session: ClientSession | None = None): | ||||
|         self._email: str = email | ||||
|         self._password: str = password | ||||
|         self._session: ClientSession | None = session | ||||
|         self._appliances: List[HonAppliance] = [] | ||||
|         self._api: Optional[HonAPI] = None | ||||
|  | ||||
|     async def __aenter__(self): | ||||
|         return await self.create() | ||||
| @ -19,7 +22,13 @@ class Hon: | ||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb): | ||||
|         await self.close() | ||||
|  | ||||
|     async def create(self): | ||||
|     @property | ||||
|     def api(self) -> HonAPI: | ||||
|         if self._api is None: | ||||
|             raise exceptions.NoAuthenticationException | ||||
|         return self._api | ||||
|  | ||||
|     async def create(self) -> Self: | ||||
|         self._api = await HonAPI( | ||||
|             self._email, self._password, session=self._session | ||||
|         ).create() | ||||
| @ -30,19 +39,24 @@ class Hon: | ||||
|     def appliances(self) -> List[HonAppliance]: | ||||
|         return self._appliances | ||||
|  | ||||
|     async def _create_appliance(self, appliance: Dict, zone=0) -> None: | ||||
|         appliance = HonAppliance(self._api, appliance, zone=zone) | ||||
|         if appliance.mac_address is None: | ||||
|             return | ||||
|         await asyncio.gather( | ||||
|             *[ | ||||
|                 appliance.load_attributes(), | ||||
|                 appliance.load_commands(), | ||||
|                 appliance.load_statistics(), | ||||
|             ] | ||||
|         ) | ||||
|         self._appliances.append(appliance) | ||||
|  | ||||
|     async def setup(self): | ||||
|         for appliance in (await self._api.load_appliances())["payload"]["appliances"]: | ||||
|             appliance = HonAppliance(self._api, appliance) | ||||
|             if appliance.mac_address is None: | ||||
|                 continue | ||||
|             await asyncio.gather( | ||||
|                 *[ | ||||
|                     appliance.load_attributes(), | ||||
|                     appliance.load_commands(), | ||||
|                     appliance.load_statistics(), | ||||
|                 ] | ||||
|             ) | ||||
|             self._appliances.append(appliance) | ||||
|             for zone in range(int(appliance.get("zone", "0"))): | ||||
|                 await self._create_appliance(appliance, zone=zone + 1) | ||||
|             await self._create_appliance(appliance) | ||||
|  | ||||
|     async def close(self): | ||||
|         await self._api.close() | ||||
|  | ||||
| @ -5,7 +5,7 @@ def str_to_float(string): | ||||
|     try: | ||||
|         return int(string) | ||||
|     except ValueError: | ||||
|         return float(str(string.replace(",", "."))) | ||||
|         return float(str(string).replace(",", ".")) | ||||
|  | ||||
|  | ||||
| class HonParameter: | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	