Compare commits
	
		
			52 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| ccff32e6c1 | |||
| 22cbd7474a | |||
| dd61b24eed | |||
| ea8f481b01 | |||
| 7dcb34559b | |||
| 5db13a90e7 | |||
| 9ee5dbc956 | |||
| d4c6ccdce3 | |||
| 9594b9ebd8 | |||
| b011d98e07 | |||
| ad864286fc | |||
| 13cff8caa0 | |||
| 5fc6245806 | |||
| 1dad0e14b8 | |||
| b04c601ad6 | |||
| 3ec0f5a006 | |||
| 78bc85132f | |||
| 191928287f | |||
| c0aab8b99d | |||
| b37715d0ca | |||
| 411effd814 | |||
| 04f19c4609 | |||
| a68dcac379 | |||
| 69be63df2a | |||
| 6c44aa895d | |||
| 8372c75e30 | |||
| 40cc0013a5 | |||
| f2aa3dc8fd | |||
| e887371bec | |||
| 5b91747ec1 | |||
| 8da2018302 | |||
| 03187745bf | |||
| 461a247ad3 | |||
| 834f25a639 | |||
| 46ff9be4a2 | |||
| a1618bb18a | |||
| a957d7ac0f | |||
| f54b7b2dbf | |||
| b6ca12ebff | |||
| 4a0ee8569b | |||
| d52d622785 | |||
| 9643f66549 | |||
| d26e33a055 | |||
| 0301427497 | |||
| 272556586e | |||
| e82c14ec99 | |||
| 970b94bfa7 | |||
| 33454f68b8 | |||
| 6b2c60d552 | |||
| 46e6a85e84 | |||
| 8c832b44cd | |||
| 18b0ecdd68 | 
							
								
								
									
										5
									
								
								.github/workflows/python-check.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										5
									
								
								.github/workflows/python-check.yml
									
									
									
									
										vendored
									
									
								
							| @ -25,12 +25,15 @@ jobs: | |||||||
|       run: | |       run: | | ||||||
|         python -m pip install --upgrade pip |         python -m pip install --upgrade pip | ||||||
|         python -m pip install -r requirements.txt |         python -m pip install -r requirements.txt | ||||||
|         python -m pip install flake8 pylint black |         python -m pip install -r requirements_dev.txt | ||||||
|     - name: Lint with flake8 |     - name: Lint with flake8 | ||||||
|       run: | |       run: | | ||||||
|         # stop the build if there are Python syntax errors or undefined names |         # stop the build if there are Python syntax errors or undefined names | ||||||
|         flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics |         flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics | ||||||
|         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics |         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics | ||||||
|  |     - name: Type check with mypy | ||||||
|  |       run: | | ||||||
|  |         mypy pyhon/ | ||||||
|     # - name: Analysing the code with pylint |     # - name: Analysing the code with pylint | ||||||
|     #   run: | |     #   run: | | ||||||
|     #     pylint --max-line-length 88 $(git ls-files '*.py') |     #     pylint --max-line-length 88 $(git ls-files '*.py') | ||||||
|  | |||||||
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @ -4,3 +4,4 @@ __pycache__/ | |||||||
| dist/ | dist/ | ||||||
| **/*.egg-info/ | **/*.egg-info/ | ||||||
| test* | test* | ||||||
|  | build/ | ||||||
|  | |||||||
| @ -6,7 +6,7 @@ | |||||||
| [](https://www.python.org/) | [](https://www.python.org/) | ||||||
| [](https://github.com/Andre0512/pyhOn/blob/main/LICENSE) | [](https://github.com/Andre0512/pyhOn/blob/main/LICENSE) | ||||||
| [](https://pypistats.org/packages/pyhon)   | [](https://pypistats.org/packages/pyhon)   | ||||||
| Control your Haier appliances with python! | Control your Haier, Candy and Hoover appliances with python! | ||||||
| The idea behind this library is, to make the use of all available commands as simple as possible. | The idea behind this library is, to make the use of all available commands as simple as possible. | ||||||
|  |  | ||||||
| ## Installation | ## Installation | ||||||
| @ -100,8 +100,6 @@ This generates a huge output. It is recommended to pipe this into a file | |||||||
| $ pyhOn translate fr > hon_fr.yaml | $ pyhOn translate fr > hon_fr.yaml | ||||||
| $ pyhOn translate en --json > hon_en.json | $ pyhOn translate en --json > hon_en.json | ||||||
| ``` | ``` | ||||||
| ## Tested devices |  | ||||||
| - Haier Washing Machine HW90 |  | ||||||
|  |  | ||||||
| ## Usage example | ## Usage example | ||||||
| This library is used for the custom [HomeAssistant Integration "Haier hOn"](https://github.com/Andre0512/hOn). | This library is used for the custom [HomeAssistant Integration "Haier hOn"](https://github.com/Andre0512/hOn). | ||||||
|  | |||||||
| @ -10,7 +10,7 @@ from pathlib import Path | |||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|     sys.path.insert(0, str(Path(__file__).parent.parent)) |     sys.path.insert(0, str(Path(__file__).parent.parent)) | ||||||
|  |  | ||||||
| from pyhon import Hon, HonAPI | from pyhon import Hon, HonAPI, helper | ||||||
|  |  | ||||||
| _LOGGER = logging.getLogger(__name__) | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
| @ -34,61 +34,6 @@ def get_arguments(): | |||||||
|     return vars(parser.parse_args()) |     return vars(parser.parse_args()) | ||||||
|  |  | ||||||
|  |  | ||||||
| # yaml.dump() would be done the same, but needs an additional dependency... |  | ||||||
| def pretty_print(data, key="", intend=0, is_list=False): |  | ||||||
|     if type(data) is list: |  | ||||||
|         if key: |  | ||||||
|             print(f"{'  ' * intend}{'- ' if is_list else ''}{key}:") |  | ||||||
|             intend += 1 |  | ||||||
|         for i, value in enumerate(data): |  | ||||||
|             pretty_print(value, intend=intend, is_list=True) |  | ||||||
|     elif type(data) is dict: |  | ||||||
|         if key: |  | ||||||
|             print(f"{'  ' * intend}{'- ' if is_list else ''}{key}:") |  | ||||||
|             intend += 1 |  | ||||||
|         for i, (key, value) in enumerate(sorted(data.items())): |  | ||||||
|             if is_list and not i: |  | ||||||
|                 pretty_print(value, key=key, intend=intend, is_list=True) |  | ||||||
|             elif is_list: |  | ||||||
|                 pretty_print(value, key=key, intend=intend + 1) |  | ||||||
|             else: |  | ||||||
|                 pretty_print(value, key=key, intend=intend) |  | ||||||
|     else: |  | ||||||
|         print( |  | ||||||
|             f"{'  ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}" |  | ||||||
|         ) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def key_print(data, key="", start=True): |  | ||||||
|     if type(data) is list: |  | ||||||
|         for i, value in enumerate(data): |  | ||||||
|             key_print(value, key=f"{key}.{i}", start=False) |  | ||||||
|     elif type(data) is dict: |  | ||||||
|         for k, value in sorted(data.items()): |  | ||||||
|             key_print(value, key=k if start else f"{key}.{k}", start=False) |  | ||||||
|     else: |  | ||||||
|         print(f"{key}: {data}") |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def create_command(commands, concat=False): |  | ||||||
|     result = {} |  | ||||||
|     for name, command in commands.items(): |  | ||||||
|         if not concat: |  | ||||||
|             result[name] = {} |  | ||||||
|         for parameter, data in command.parameters.items(): |  | ||||||
|             if data.typology == "enum": |  | ||||||
|                 value = data.values |  | ||||||
|             elif data.typology == "range": |  | ||||||
|                 value = {"min": data.min, "max": data.max, "step": data.step} |  | ||||||
|             else: |  | ||||||
|                 continue |  | ||||||
|             if not concat: |  | ||||||
|                 result[name][parameter] = value |  | ||||||
|             else: |  | ||||||
|                 result[f"{name}.{parameter}"] = value |  | ||||||
|     return result |  | ||||||
|  |  | ||||||
|  |  | ||||||
| async def translate(language, json_output=False): | async def translate(language, json_output=False): | ||||||
|     async with HonAPI(anonymous=True) as hon: |     async with HonAPI(anonymous=True) as hon: | ||||||
|         keys = await hon.translation_keys(language) |         keys = await hon.translation_keys(language) | ||||||
| @ -102,7 +47,7 @@ async def translate(language, json_output=False): | |||||||
|             .replace("\\r", "") |             .replace("\\r", "") | ||||||
|         ) |         ) | ||||||
|         keys = json.loads(clean_keys) |         keys = json.loads(clean_keys) | ||||||
|         pretty_print(keys) |         print(helper.pretty_print(keys)) | ||||||
|  |  | ||||||
|  |  | ||||||
| async def main(): | async def main(): | ||||||
| @ -120,13 +65,20 @@ async def main(): | |||||||
|             if args.get("keys"): |             if args.get("keys"): | ||||||
|                 data = device.data.copy() |                 data = device.data.copy() | ||||||
|                 attr = "get" if args.get("all") else "pop" |                 attr = "get" if args.get("all") else "pop" | ||||||
|                 key_print(data["attributes"].__getattribute__(attr)("parameters")) |                 print( | ||||||
|                 key_print(data.__getattribute__(attr)("appliance")) |                     helper.key_print( | ||||||
|                 key_print(data) |                         data["attributes"].__getattribute__(attr)("parameters") | ||||||
|                 pretty_print(create_command(device.commands, concat=True)) |                     ) | ||||||
|  |                 ) | ||||||
|  |                 print(helper.key_print(data.__getattribute__(attr)("appliance"))) | ||||||
|  |                 print(helper.key_print(data)) | ||||||
|  |                 print( | ||||||
|  |                     helper.pretty_print( | ||||||
|  |                         helper.create_command(device.commands, concat=True) | ||||||
|  |                     ) | ||||||
|  |                 ) | ||||||
|             else: |             else: | ||||||
|                 pretty_print({"data": device.data}) |                 print(device.diagnose("  ")) | ||||||
|                 pretty_print({"settings": create_command(device.commands)}) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def start(): | def start(): | ||||||
|  | |||||||
| @ -1,39 +1,57 @@ | |||||||
| import importlib | import importlib | ||||||
|  | import logging | ||||||
| from contextlib import suppress | from contextlib import suppress | ||||||
|  | from datetime import datetime, timedelta | ||||||
|  | from typing import Optional, Dict, Any | ||||||
|  | from typing import TYPE_CHECKING | ||||||
|  |  | ||||||
|  | from pyhon import helper, exceptions | ||||||
| from pyhon.commands import HonCommand | from pyhon.commands import HonCommand | ||||||
| from pyhon.parameter import HonParameterFixed | from pyhon.parameter.fixed import HonParameterFixed | ||||||
|  |  | ||||||
|  | if TYPE_CHECKING: | ||||||
|  |     from pyhon import HonAPI | ||||||
|  |  | ||||||
|  | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonAppliance: | class HonAppliance: | ||||||
|     def __init__(self, api, info): |     _MINIMAL_UPDATE_INTERVAL = 5  # seconds | ||||||
|  |  | ||||||
|  |     def __init__( | ||||||
|  |         self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0 | ||||||
|  |     ) -> None: | ||||||
|         if attributes := info.get("attributes"): |         if attributes := info.get("attributes"): | ||||||
|             info["attributes"] = {v["parName"]: v["parValue"] for v in attributes} |             info["attributes"] = {v["parName"]: v["parValue"] for v in attributes} | ||||||
|         self._info = info |         self._info: Dict = info | ||||||
|         self._api = api |         self._api: Optional[HonAPI] = api | ||||||
|         self._appliance_model = {} |         self._appliance_model: Dict = {} | ||||||
|  |  | ||||||
|         self._commands = {} |         self._commands: Dict = {} | ||||||
|         self._statistics = {} |         self._statistics: Dict = {} | ||||||
|         self._attributes = {} |         self._attributes: Dict = {} | ||||||
|  |         self._zone: int = zone | ||||||
|  |         self._additional_data: Dict[str, Any] = {} | ||||||
|  |         self._last_update = None | ||||||
|  |  | ||||||
|         try: |         try: | ||||||
|             self._extra = importlib.import_module( |             self._extra = importlib.import_module( | ||||||
|                 f"pyhon.appliances.{self.appliance_type.lower()}" |                 f"pyhon.appliances.{self.appliance_type.lower()}" | ||||||
|             ).Appliance() |             ).Appliance(self) | ||||||
|         except ModuleNotFoundError: |         except ModuleNotFoundError: | ||||||
|             self._extra = None |             self._extra = None | ||||||
|  |  | ||||||
|     def __getitem__(self, item): |     def __getitem__(self, item): | ||||||
|  |         if self._zone: | ||||||
|  |             item += f"Z{self._zone}" | ||||||
|         if "." in item: |         if "." in item: | ||||||
|             result = self.data |             result = self.data | ||||||
|             for key in item.split("."): |             for key in item.split("."): | ||||||
|                 if all([k in "0123456789" for k in key]) and type(result) is list: |                 if all(k in "0123456789" for k in key) and isinstance(result, list): | ||||||
|                     result = result[int(key)] |                     result = result[int(key)] | ||||||
|                 else: |                 else: | ||||||
|                     result = result[key] |                     result = result[key] | ||||||
|             return result |             return result | ||||||
|         else: |  | ||||||
|         if item in self.data: |         if item in self.data: | ||||||
|             return self.data[item] |             return self.data[item] | ||||||
|         if item in self.attributes["parameters"]: |         if item in self.attributes["parameters"]: | ||||||
| @ -46,25 +64,35 @@ class HonAppliance: | |||||||
|         except (KeyError, IndexError): |         except (KeyError, IndexError): | ||||||
|             return default |             return default | ||||||
|  |  | ||||||
|     @property |     def _check_name_zone(self, name: str, frontend: bool = True) -> str: | ||||||
|     def appliance_model_id(self): |         middle = " Z" if frontend else "_z" | ||||||
|         return self._info.get("applianceModelId") |         if (attribute := self._info.get(name, "")) and self._zone: | ||||||
|  |             return f"{attribute}{middle}{self._zone}" | ||||||
|  |         return attribute | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def appliance_type(self): |     def appliance_model_id(self) -> str: | ||||||
|         return self._info.get("applianceTypeName") |         return self._info.get("applianceModelId", "") | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def mac_address(self): |     def appliance_type(self) -> str: | ||||||
|         return self._info.get("macAddress") |         return self._info.get("applianceTypeName", "") | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def model_name(self): |     def mac_address(self) -> str: | ||||||
|         return self._info.get("modelName") |         return self.info.get("macAddress", "") | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def nick_name(self): |     def unique_id(self) -> str: | ||||||
|         return self._info.get("nickName") |         return self._check_name_zone("macAddress", frontend=False) | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def model_name(self) -> str: | ||||||
|  |         return self._check_name_zone("modelName") | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def nick_name(self) -> str: | ||||||
|  |         return self._check_name_zone("nickName") | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def commands_options(self): |     def commands_options(self): | ||||||
| @ -86,9 +114,23 @@ class HonAppliance: | |||||||
|     def info(self): |     def info(self): | ||||||
|         return self._info |         return self._info | ||||||
|  |  | ||||||
|     async def _recover_last_command_states(self, commands): |     @property | ||||||
|         command_history = await self._api.command_history(self) |     def additional_data(self): | ||||||
|         for name, command in commands.items(): |         return self._additional_data | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def zone(self) -> int: | ||||||
|  |         return self._zone | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def api(self) -> "HonAPI": | ||||||
|  |         if self._api is None: | ||||||
|  |             raise exceptions.NoAuthenticationException | ||||||
|  |         return self._api | ||||||
|  |  | ||||||
|  |     async def _recover_last_command_states(self): | ||||||
|  |         command_history = await self.api.command_history(self) | ||||||
|  |         for name, command in self._commands.items(): | ||||||
|             last = next( |             last = next( | ||||||
|                 ( |                 ( | ||||||
|                     index |                     index | ||||||
| @ -100,8 +142,11 @@ class HonAppliance: | |||||||
|             if last is None: |             if last is None: | ||||||
|                 continue |                 continue | ||||||
|             parameters = command_history[last].get("command", {}).get("parameters", {}) |             parameters = command_history[last].get("command", {}).get("parameters", {}) | ||||||
|             if command._multi and parameters.get("program"): |             if command.categories: | ||||||
|                 command.set_program(parameters.pop("program").split(".")[-1].lower()) |                 if parameters.get("program"): | ||||||
|  |                     command.category = parameters.pop("program").split(".")[-1].lower() | ||||||
|  |                 else: | ||||||
|  |                     command.category = parameters.pop("category") | ||||||
|                 command = self.commands[name] |                 command = self.commands[name] | ||||||
|             for key, data in command.settings.items(): |             for key, data in command.settings.items(): | ||||||
|                 if ( |                 if ( | ||||||
| @ -111,64 +156,119 @@ class HonAppliance: | |||||||
|                     with suppress(ValueError): |                     with suppress(ValueError): | ||||||
|                         data.value = parameters.get(key) |                         data.value = parameters.get(key) | ||||||
|  |  | ||||||
|     async def load_commands(self): |     def _get_categories(self, command, data): | ||||||
|         raw = await self._api.load_commands(self) |         categories = {} | ||||||
|         self._appliance_model = raw.pop("applianceModel") |         for category, value in data.items(): | ||||||
|         for item in ["settings", "options", "dictionaryId"]: |             result = self._get_command(value, command, category, categories) | ||||||
|             raw.pop(item) |             if result: | ||||||
|         commands = {} |                 if "PROGRAM" in category: | ||||||
|         for command, attr in raw.items(): |                     category = category.split(".")[-1].lower() | ||||||
|             if "parameters" in attr: |                 categories[category] = result[0] | ||||||
|                 commands[command] = HonCommand(command, attr, self._api, self) |         if categories: | ||||||
|             elif "parameters" in attr[list(attr)[0]]: |             return [list(categories.values())[0]] | ||||||
|                 multi = {} |         return [] | ||||||
|                 for program, attr2 in attr.items(): |  | ||||||
|                     program = program.split(".")[-1].lower() |     def _get_commands(self, data): | ||||||
|                     cmd = HonCommand( |         commands = [] | ||||||
|                         command, attr2, self._api, self, multi=multi, program=program |         for command, value in data.items(): | ||||||
|  |             commands += self._get_command(value, command, "") | ||||||
|  |         return {c.name: c for c in commands} | ||||||
|  |  | ||||||
|  |     def _get_command(self, data, command="", category="", categories=None): | ||||||
|  |         commands = [] | ||||||
|  |         if isinstance(data, dict): | ||||||
|  |             if data.get("description") and data.get("protocolType", None): | ||||||
|  |                 commands += [ | ||||||
|  |                     HonCommand( | ||||||
|  |                         command, | ||||||
|  |                         data, | ||||||
|  |                         self, | ||||||
|  |                         category_name=category, | ||||||
|  |                         categories=categories, | ||||||
|                     ) |                     ) | ||||||
|                     multi[program] = cmd |                 ] | ||||||
|                     commands[command] = cmd |             else: | ||||||
|         self._commands = commands |                 commands += self._get_categories(command, data) | ||||||
|         await self._recover_last_command_states(commands) |         elif category: | ||||||
|  |             self._additional_data.setdefault(command, {})[category] = data | ||||||
|  |         else: | ||||||
|  |             self._additional_data[command] = data | ||||||
|  |         return commands | ||||||
|  |  | ||||||
|  |     async def load_commands(self): | ||||||
|  |         raw = await self.api.load_commands(self) | ||||||
|  |         self._appliance_model = raw.pop("applianceModel") | ||||||
|  |         raw.pop("dictionaryId") | ||||||
|  |         self._commands = self._get_commands(raw) | ||||||
|  |         await self._recover_last_command_states() | ||||||
|  |  | ||||||
|  |     async def load_attributes(self): | ||||||
|  |         self._attributes = await self.api.load_attributes(self) | ||||||
|  |         for name, values in self._attributes.pop("shadow").get("parameters").items(): | ||||||
|  |             self._attributes.setdefault("parameters", {})[name] = values["parNewVal"] | ||||||
|  |  | ||||||
|  |     async def load_statistics(self): | ||||||
|  |         self._statistics = await self.api.load_statistics(self) | ||||||
|  |  | ||||||
|  |     async def update(self): | ||||||
|  |         now = datetime.now() | ||||||
|  |         if not self._last_update or self._last_update < now - timedelta( | ||||||
|  |             seconds=self._MINIMAL_UPDATE_INTERVAL | ||||||
|  |         ): | ||||||
|  |             self._last_update = now | ||||||
|  |             await self.load_attributes() | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def command_parameters(self): | ||||||
|  |         return {n: c.parameter_value for n, c in self._commands.items()} | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def settings(self): |     def settings(self): | ||||||
|         result = {} |         result = {} | ||||||
|         for name, command in self._commands.items(): |         for name, command in self._commands.items(): | ||||||
|             for key, setting in command.settings.items(): |             for key in command.setting_keys: | ||||||
|  |                 setting = command.settings.get(key) | ||||||
|                 result[f"{name}.{key}"] = setting |                 result[f"{name}.{key}"] = setting | ||||||
|         if self._extra: |         if self._extra: | ||||||
|             return self._extra.settings(result) |             return self._extra.settings(result) | ||||||
|         return result |         return result | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def parameters(self): |     def available_settings(self): | ||||||
|         result = {} |         result = [] | ||||||
|         for name, command in self._commands.items(): |         for name, command in self._commands.items(): | ||||||
|             for key, parameter in command.parameters.items(): |             for key in command.setting_keys: | ||||||
|                 result.setdefault(name, {})[key] = parameter.value |                 result.append(f"{name}.{key}") | ||||||
|         return result |         return result | ||||||
|  |  | ||||||
|     async def load_attributes(self): |  | ||||||
|         self._attributes = await self._api.load_attributes(self) |  | ||||||
|         for name, values in self._attributes.pop("shadow").get("parameters").items(): |  | ||||||
|             self._attributes.setdefault("parameters", {})[name] = values["parNewVal"] |  | ||||||
|  |  | ||||||
|     async def load_statistics(self): |  | ||||||
|         self._statistics = await self._api.load_statistics(self) |  | ||||||
|  |  | ||||||
|     async def update(self): |  | ||||||
|         await self.load_attributes() |  | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def data(self): |     def data(self): | ||||||
|         result = { |         result = { | ||||||
|             "attributes": self.attributes, |             "attributes": self.attributes, | ||||||
|             "appliance": self.info, |             "appliance": self.info, | ||||||
|             "statistics": self.statistics, |             "statistics": self.statistics, | ||||||
|             **self.parameters, |             "additional_data": self._additional_data, | ||||||
|  |             **self.command_parameters, | ||||||
|         } |         } | ||||||
|         if self._extra: |         if self._extra: | ||||||
|             return self._extra.data(result) |             return self._extra.data(result) | ||||||
|         return result |         return result | ||||||
|  |  | ||||||
|  |     def diagnose(self, whitespace="\u200B \u200B "): | ||||||
|  |         data = { | ||||||
|  |             "attributes": self.attributes.copy(), | ||||||
|  |             "appliance": self.info, | ||||||
|  |             "additional_data": self._additional_data, | ||||||
|  |         } | ||||||
|  |         data |= {n: c.parameter_groups for n, c in self._commands.items()} | ||||||
|  |         extra = {n: c.data for n, c in self._commands.items() if c.data} | ||||||
|  |         if extra: | ||||||
|  |             data |= {"extra_command_data": extra} | ||||||
|  |         for sensible in ["PK", "SK", "serialNumber", "code", "coords"]: | ||||||
|  |             data["appliance"].pop(sensible, None) | ||||||
|  |         result = helper.pretty_print({"data": data}, whitespace=whitespace) | ||||||
|  |         result += helper.pretty_print( | ||||||
|  |             {"commands": helper.create_command(self.commands)}, | ||||||
|  |             whitespace=whitespace, | ||||||
|  |         ) | ||||||
|  |         return result.replace(self.mac_address, "xx-xx-xx-xx-xx-xx") | ||||||
|  | |||||||
							
								
								
									
										12
									
								
								pyhon/appliances/dw.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								pyhon/appliances/dw.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,12 @@ | |||||||
|  | class Appliance: | ||||||
|  |     def __init__(self, appliance): | ||||||
|  |         self.parent = appliance | ||||||
|  |  | ||||||
|  |     def data(self, data): | ||||||
|  |         if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": | ||||||
|  |             data["attributes"]["parameters"]["machMode"] = "0" | ||||||
|  |         data["active"] = bool(data.get("attributes", {}).get("activity")) | ||||||
|  |         return data | ||||||
|  |  | ||||||
|  |     def settings(self, settings): | ||||||
|  |         return settings | ||||||
| @ -1,23 +1,17 @@ | |||||||
| from pyhon.parameter import HonParameterEnum |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class Appliance: | class Appliance: | ||||||
|     _FILTERS = { |     def __init__(self, appliance): | ||||||
|         "default": "^(?!iot_(?:recipe|guided))\\S+$", |         self.parent = appliance | ||||||
|         "recipe": "iot_recipe_", |  | ||||||
|         "guided": "iot_guided_", |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     def __init__(self): |  | ||||||
|         filters = list(self._FILTERS.values()) |  | ||||||
|         data = {"defaultValue": filters[0], "enumValues": filters} |  | ||||||
|         self._program_filter = HonParameterEnum("program_filter", data) |  | ||||||
|  |  | ||||||
|     def data(self, data): |     def data(self, data): | ||||||
|  |         if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": | ||||||
|  |             data["attributes"]["parameters"]["temp"] = "0" | ||||||
|  |             data["attributes"]["parameters"]["onOffStatus"] = "0" | ||||||
|  |             data["attributes"]["parameters"]["remoteCtrValid"] = "0" | ||||||
|  |             data["attributes"]["parameters"]["remainingTimeMM"] = "0" | ||||||
|  |  | ||||||
|  |         data["active"] = data["attributes"]["parameters"]["onOffStatus"] == "1" | ||||||
|  |  | ||||||
|         return data |         return data | ||||||
|  |  | ||||||
|     def settings(self, settings): |     def settings(self, settings): | ||||||
|         settings["program_filter"] = self._program_filter |  | ||||||
|         value = self._FILTERS[self._program_filter.value] |  | ||||||
|         settings["startProgram.program"].filter = value |  | ||||||
|         return settings |         return settings | ||||||
|  | |||||||
| @ -1,10 +1,22 @@ | |||||||
|  | from pyhon.parameter.fixed import HonParameterFixed | ||||||
|  |  | ||||||
|  |  | ||||||
| class Appliance: | class Appliance: | ||||||
|  |     def __init__(self, appliance): | ||||||
|  |         self.parent = appliance | ||||||
|  |  | ||||||
|     def data(self, data): |     def data(self, data): | ||||||
|         if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": |         if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": | ||||||
|             data["attributes"]["parameters"]["machMode"] = "0" |             data["attributes"]["parameters"]["machMode"] = "0" | ||||||
|         data["active"] = bool(data.get("attributes", {}).get("activity")) |         data["active"] = bool(data.get("attributes", {}).get("activity")) | ||||||
|         data["pause"] = data["attributes"]["parameters"]["machMode"] == "3" |         data["pause"] = data["attributes"]["parameters"]["machMode"] == "3" | ||||||
|  |         if program := int(data["attributes"]["parameters"]["prCode"]): | ||||||
|  |             ids = self.parent.settings["startProgram.program"].ids | ||||||
|  |             data["programName"] = ids.get(program, "") | ||||||
|         return data |         return data | ||||||
|  |  | ||||||
|     def settings(self, settings): |     def settings(self, settings): | ||||||
|  |         dry_level = settings.get("startProgram.dryLevel") | ||||||
|  |         if isinstance(dry_level, HonParameterFixed) and dry_level.value == "11": | ||||||
|  |             settings.pop("startProgram.dryLevel", None) | ||||||
|         return settings |         return settings | ||||||
|  | |||||||
| @ -1,4 +1,7 @@ | |||||||
| class Appliance: | class Appliance: | ||||||
|  |     def __init__(self, appliance): | ||||||
|  |         self.parent = appliance | ||||||
|  |  | ||||||
|     def data(self, data): |     def data(self, data): | ||||||
|         if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": |         if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": | ||||||
|             data["attributes"]["parameters"]["machMode"] = "0" |             data["attributes"]["parameters"]["machMode"] = "0" | ||||||
|  | |||||||
| @ -1,4 +1,7 @@ | |||||||
| class Appliance: | class Appliance: | ||||||
|  |     def __init__(self, appliance): | ||||||
|  |         self.parent = appliance | ||||||
|  |  | ||||||
|     def data(self, data): |     def data(self, data): | ||||||
|         if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": |         if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": | ||||||
|             data["attributes"]["parameters"]["machMode"] = "0" |             data["attributes"]["parameters"]["machMode"] = "0" | ||||||
|  | |||||||
| @ -1,90 +1,132 @@ | |||||||
| from pyhon.parameter import ( | from typing import Optional, Dict, Any, List, TYPE_CHECKING, Union | ||||||
|     HonParameterFixed, |  | ||||||
|     HonParameterEnum, | from pyhon.parameter.base import HonParameter | ||||||
|     HonParameterRange, | from pyhon.parameter.enum import HonParameterEnum | ||||||
|     HonParameterProgram, | from pyhon.parameter.fixed import HonParameterFixed | ||||||
| ) | from pyhon.parameter.program import HonParameterProgram | ||||||
|  | from pyhon.parameter.range import HonParameterRange | ||||||
|  |  | ||||||
|  | if TYPE_CHECKING: | ||||||
|  |     from pyhon import HonAPI | ||||||
|  |     from pyhon.appliance import HonAppliance | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonCommand: | class HonCommand: | ||||||
|     def __init__(self, name, attributes, connector, device, multi=None, program=""): |     def __init__( | ||||||
|         self._connector = connector |         self, | ||||||
|         self._device = device |         name: str, | ||||||
|         self._name = name |         attributes: Dict[str, Any], | ||||||
|         self._multi = multi or {} |         appliance: "HonAppliance", | ||||||
|         self._program = program |         categories: Optional[Dict[str, "HonCommand"]] = None, | ||||||
|         self._description = attributes.get("description", "") |         category_name: str = "", | ||||||
|         self._parameters = self._create_parameters(attributes.get("parameters", {})) |     ): | ||||||
|         self._ancillary_parameters = self._create_parameters( |         self._api: HonAPI = appliance.api | ||||||
|             attributes.get("ancillaryParameters", {}) |         self._appliance: "HonAppliance" = appliance | ||||||
|         ) |         self._name: str = name | ||||||
|  |         self._categories: Optional[Dict[str, "HonCommand"]] = categories | ||||||
|  |         self._category_name: str = category_name | ||||||
|  |         self._description: str = attributes.pop("description", "") | ||||||
|  |         self._protocol_type: str = attributes.pop("protocolType", "") | ||||||
|  |         self._parameters: Dict[str, HonParameter] = {} | ||||||
|  |         self._data: Dict[str, Any] = {} | ||||||
|  |         self._available_settings: Dict[str, HonParameter] = {} | ||||||
|  |         self._load_parameters(attributes) | ||||||
|  |  | ||||||
|     def __repr__(self): |     def __repr__(self) -> str: | ||||||
|         return f"{self._name} command" |         return f"{self._name} command" | ||||||
|  |  | ||||||
|     def _create_parameters(self, parameters): |     @property | ||||||
|         result = {} |     def name(self): | ||||||
|         for parameter, attributes in parameters.items(): |         return self._name | ||||||
|             match attributes.get("typology"): |  | ||||||
|                 case "range": |  | ||||||
|                     result[parameter] = HonParameterRange(parameter, attributes) |  | ||||||
|                 case "enum": |  | ||||||
|                     result[parameter] = HonParameterEnum(parameter, attributes) |  | ||||||
|                 case "fixed": |  | ||||||
|                     result[parameter] = HonParameterFixed(parameter, attributes) |  | ||||||
|         if self._multi: |  | ||||||
|             result["program"] = HonParameterProgram("program", self) |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def parameters(self): |     def data(self): | ||||||
|  |         return self._data | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def parameters(self) -> Dict[str, HonParameter]: | ||||||
|         return self._parameters |         return self._parameters | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def ancillary_parameters(self): |     def settings(self) -> Dict[str, HonParameter]: | ||||||
|         return { |         return self._parameters | ||||||
|             key: parameter.value |  | ||||||
|             for key, parameter in self._ancillary_parameters.items() |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|     async def send(self): |     @property | ||||||
|         parameters = { |     def parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]: | ||||||
|             name: parameter.value for name, parameter in self._parameters.items() |         result: Dict[str, Dict[str, Union[str, float]]] = {} | ||||||
|         } |         for name, parameter in self._parameters.items(): | ||||||
|         return await self._connector.send_command( |             result.setdefault(parameter.group, {})[name] = parameter.value | ||||||
|             self._device, self._name, parameters, self.ancillary_parameters |         return result | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def parameter_value(self) -> Dict[str, Union[str, float]]: | ||||||
|  |         return {n: p.value for n, p in self._parameters.items()} | ||||||
|  |  | ||||||
|  |     def _load_parameters(self, attributes): | ||||||
|  |         for key, items in attributes.items(): | ||||||
|  |             for name, data in items.items(): | ||||||
|  |                 self._create_parameters(data, name, key) | ||||||
|  |  | ||||||
|  |     def _create_parameters(self, data: Dict, name: str, parameter: str) -> None: | ||||||
|  |         if name == "zoneMap" and self._appliance.zone: | ||||||
|  |             data["default"] = self._appliance.zone | ||||||
|  |         match data.get("typology"): | ||||||
|  |             case "range": | ||||||
|  |                 self._parameters[name] = HonParameterRange(name, data, parameter) | ||||||
|  |             case "enum": | ||||||
|  |                 self._parameters[name] = HonParameterEnum(name, data, parameter) | ||||||
|  |             case "fixed": | ||||||
|  |                 self._parameters[name] = HonParameterFixed(name, data, parameter) | ||||||
|  |             case _: | ||||||
|  |                 self._data[name] = data | ||||||
|  |                 return | ||||||
|  |         if self._category_name: | ||||||
|  |             name = "program" if "PROGRAM" in self._category_name else "category" | ||||||
|  |             self._parameters[name] = HonParameterProgram(name, self, "custom") | ||||||
|  |  | ||||||
|  |     async def send(self) -> bool: | ||||||
|  |         params = self.parameter_groups["parameters"] | ||||||
|  |         ancillary_params = self.parameter_groups["ancillary_parameters"] | ||||||
|  |         return await self._api.send_command( | ||||||
|  |             self._appliance, self._name, params, ancillary_params | ||||||
|         ) |         ) | ||||||
|  |  | ||||||
|     def get_programs(self): |     @property | ||||||
|         return self._multi |     def categories(self) -> Dict[str, "HonCommand"]: | ||||||
|  |         if self._categories is None: | ||||||
|     def set_program(self, program): |             return {"_": self} | ||||||
|         self._device.commands[self._name] = self._multi[program] |         return self._categories | ||||||
|  |  | ||||||
|     def _get_settings_keys(self, command=None): |  | ||||||
|         command = command or self |  | ||||||
|         keys = [] |  | ||||||
|         for key, parameter in command._parameters.items(): |  | ||||||
|             if isinstance(parameter, HonParameterFixed): |  | ||||||
|                 continue |  | ||||||
|             if key not in keys: |  | ||||||
|                 keys.append(key) |  | ||||||
|         return keys |  | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def setting_keys(self): |     def category(self) -> str: | ||||||
|         if not self._multi: |         return self._category_name | ||||||
|             return self._get_settings_keys() |  | ||||||
|         result = [ |     @category.setter | ||||||
|             key for cmd in self._multi.values() for key in self._get_settings_keys(cmd) |     def category(self, category: str) -> None: | ||||||
|         ] |         self._appliance.commands[self._name] = self.categories[category] | ||||||
|         return list(set(result + ["program"])) |  | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def settings(self): |     def setting_keys(self) -> List[str]: | ||||||
|         """Parameters with typology enum and range""" |         return list( | ||||||
|         return { |             {param for cmd in self.categories.values() for param in cmd.parameters} | ||||||
|             s: self._parameters.get(s) |         ) | ||||||
|             for s in self.setting_keys |  | ||||||
|             if self._parameters.get(s) is not None |     @staticmethod | ||||||
|         } |     def _more_options(first: HonParameter, second: HonParameter): | ||||||
|  |         if isinstance(first, HonParameterFixed) and not isinstance( | ||||||
|  |             second, HonParameterFixed | ||||||
|  |         ): | ||||||
|  |             return second | ||||||
|  |         if len(second.values) > len(first.values): | ||||||
|  |             return second | ||||||
|  |         return first | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def available_settings(self) -> Dict[str, HonParameter]: | ||||||
|  |         result: Dict[str, HonParameter] = {} | ||||||
|  |         for command in self.categories.values(): | ||||||
|  |             for name, parameter in command.parameters.items(): | ||||||
|  |                 if name in result: | ||||||
|  |                     result[name] = self._more_options(result[name], parameter) | ||||||
|  |                 result[name] = parameter | ||||||
|  |         return result | ||||||
|  | |||||||
| @ -1,108 +1,160 @@ | |||||||
| import json | import json | ||||||
| import logging | import logging | ||||||
| from datetime import datetime | from datetime import datetime | ||||||
|  | from typing import Dict, Optional | ||||||
|  |  | ||||||
| from pyhon import const | from aiohttp import ClientSession | ||||||
|  | from typing_extensions import Self | ||||||
|  |  | ||||||
|  | from pyhon import const, exceptions | ||||||
| from pyhon.appliance import HonAppliance | from pyhon.appliance import HonAppliance | ||||||
| from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler | from pyhon.connection.auth import HonAuth | ||||||
|  | from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler | ||||||
|  | from pyhon.connection.handler.hon import HonConnectionHandler | ||||||
|  |  | ||||||
| _LOGGER = logging.getLogger() | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonAPI: | class HonAPI: | ||||||
|     def __init__(self, email="", password="", anonymous=False, session=None) -> None: |     def __init__( | ||||||
|  |         self, | ||||||
|  |         email: str = "", | ||||||
|  |         password: str = "", | ||||||
|  |         anonymous: bool = False, | ||||||
|  |         session: Optional[ClientSession] = None, | ||||||
|  |     ) -> None: | ||||||
|         super().__init__() |         super().__init__() | ||||||
|         self._email = email |         self._email: str = email | ||||||
|         self._password = password |         self._password: str = password | ||||||
|         self._anonymous = anonymous |         self._anonymous: bool = anonymous | ||||||
|         self._hon = None |         self._hon_handler: Optional[HonConnectionHandler] = None | ||||||
|         self._hon_anonymous = None |         self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None | ||||||
|         self._session = session |         self._session: Optional[ClientSession] = session | ||||||
|  |  | ||||||
|     async def __aenter__(self): |     async def __aenter__(self) -> Self: | ||||||
|         return await self.create() |         return await self.create() | ||||||
|  |  | ||||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb): |     async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: | ||||||
|         await self.close() |         await self.close() | ||||||
|  |  | ||||||
|     async def create(self): |     @property | ||||||
|         self._hon_anonymous = await HonAnonymousConnectionHandler( |     def auth(self) -> HonAuth: | ||||||
|  |         if self._hon is None or self._hon.auth is None: | ||||||
|  |             raise exceptions.NoAuthenticationException | ||||||
|  |         return self._hon.auth | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def _hon(self): | ||||||
|  |         if self._hon_handler is None: | ||||||
|  |             raise exceptions.NoAuthenticationException | ||||||
|  |         return self._hon_handler | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def _hon_anonymous(self): | ||||||
|  |         if self._hon_anonymous_handler is None: | ||||||
|  |             raise exceptions.NoAuthenticationException | ||||||
|  |         return self._hon_anonymous_handler | ||||||
|  |  | ||||||
|  |     async def create(self) -> Self: | ||||||
|  |         self._hon_anonymous_handler = await HonAnonymousConnectionHandler( | ||||||
|             self._session |             self._session | ||||||
|         ).create() |         ).create() | ||||||
|         if not self._anonymous: |         if not self._anonymous: | ||||||
|             self._hon = await HonConnectionHandler( |             self._hon_handler = await HonConnectionHandler( | ||||||
|                 self._email, self._password, self._session |                 self._email, self._password, self._session | ||||||
|             ).create() |             ).create() | ||||||
|         return self |         return self | ||||||
|  |  | ||||||
|     async def load_appliances(self): |     async def load_appliances(self) -> Dict: | ||||||
|         async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp: |         async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp: | ||||||
|             return await resp.json() |             return await resp.json() | ||||||
|  |  | ||||||
|     async def load_commands(self, appliance: HonAppliance): |     async def load_commands(self, appliance: HonAppliance) -> Dict: | ||||||
|         params = { |         params: Dict = { | ||||||
|             "applianceType": appliance.appliance_type, |             "applianceType": appliance.appliance_type, | ||||||
|             "code": appliance.info["code"], |             "code": appliance.info["code"], | ||||||
|             "applianceModelId": appliance.appliance_model_id, |             "applianceModelId": appliance.appliance_model_id, | ||||||
|             "firmwareId": appliance.info["eepromId"], |  | ||||||
|             "macAddress": appliance.mac_address, |             "macAddress": appliance.mac_address, | ||||||
|             "fwVersion": appliance.info["fwVersion"], |  | ||||||
|             "os": const.OS, |             "os": const.OS, | ||||||
|             "appVersion": const.APP_VERSION, |             "appVersion": const.APP_VERSION, | ||||||
|             "series": appliance.info["series"], |             "series": appliance.info["series"], | ||||||
|         } |         } | ||||||
|         url = f"{const.API_URL}/commands/v1/retrieve" |         if firmware_id := appliance.info.get("eepromId"): | ||||||
|  |             params["firmwareId"] = firmware_id | ||||||
|  |         if firmware_version := appliance.info.get("fwVersion"): | ||||||
|  |             params["fwVersion"] = firmware_version | ||||||
|  |         url: str = f"{const.API_URL}/commands/v1/retrieve" | ||||||
|         async with self._hon.get(url, params=params) as response: |         async with self._hon.get(url, params=params) as response: | ||||||
|             result = (await response.json()).get("payload", {}) |             result: Dict = (await response.json()).get("payload", {}) | ||||||
|             if not result or result.pop("resultCode") != "0": |             if not result or result.pop("resultCode") != "0": | ||||||
|                 return {} |                 return {} | ||||||
|             return result |             return result | ||||||
|  |  | ||||||
|     async def command_history(self, appliance: HonAppliance): |     async def command_history(self, appliance: HonAppliance) -> Dict: | ||||||
|         url = f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history" |         url: str = ( | ||||||
|  |             f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history" | ||||||
|  |         ) | ||||||
|         async with self._hon.get(url) as response: |         async with self._hon.get(url) as response: | ||||||
|             result = await response.json() |             result: Dict = await response.json() | ||||||
|             if not result or not result.get("payload"): |             if not result or not result.get("payload"): | ||||||
|                 return {} |                 return {} | ||||||
|             return result["payload"]["history"] |             return result["payload"]["history"] | ||||||
|  |  | ||||||
|     async def last_activity(self, appliance: HonAppliance): |     async def last_activity(self, appliance: HonAppliance) -> Dict: | ||||||
|         url = f"{const.API_URL}/commands/v1/retrieve-last-activity" |         url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity" | ||||||
|         params = {"macAddress": appliance.mac_address} |         params: Dict = {"macAddress": appliance.mac_address} | ||||||
|         async with self._hon.get(url, params=params) as response: |         async with self._hon.get(url, params=params) as response: | ||||||
|             result = await response.json() |             result: Dict = await response.json() | ||||||
|             if result and (activity := result.get("attributes")): |             if result and (activity := result.get("attributes")): | ||||||
|                 return activity |                 return activity | ||||||
|         return {} |         return {} | ||||||
|  |  | ||||||
|     async def load_attributes(self, appliance: HonAppliance): |     async def appliance_model(self, appliance: HonAppliance) -> Dict: | ||||||
|         params = { |         url: str = f"{const.API_URL}/commands/v1/appliance-model" | ||||||
|  |         params: Dict = { | ||||||
|  |             "code": appliance.info["code"], | ||||||
|  |             "macAddress": appliance.mac_address, | ||||||
|  |         } | ||||||
|  |         async with self._hon.get(url, params=params) as response: | ||||||
|  |             result: Dict = await response.json() | ||||||
|  |             if result and (activity := result.get("attributes")): | ||||||
|  |                 return activity | ||||||
|  |         return {} | ||||||
|  |  | ||||||
|  |     async def load_attributes(self, appliance: HonAppliance) -> Dict: | ||||||
|  |         params: Dict = { | ||||||
|             "macAddress": appliance.mac_address, |             "macAddress": appliance.mac_address, | ||||||
|             "applianceType": appliance.appliance_type, |             "applianceType": appliance.appliance_type, | ||||||
|             "category": "CYCLE", |             "category": "CYCLE", | ||||||
|         } |         } | ||||||
|         url = f"{const.API_URL}/commands/v1/context" |         url: str = f"{const.API_URL}/commands/v1/context" | ||||||
|         async with self._hon.get(url, params=params) as response: |         async with self._hon.get(url, params=params) as response: | ||||||
|             return (await response.json()).get("payload", {}) |             return (await response.json()).get("payload", {}) | ||||||
|  |  | ||||||
|     async def load_statistics(self, appliance: HonAppliance): |     async def load_statistics(self, appliance: HonAppliance) -> Dict: | ||||||
|         params = { |         params: Dict = { | ||||||
|             "macAddress": appliance.mac_address, |             "macAddress": appliance.mac_address, | ||||||
|             "applianceType": appliance.appliance_type, |             "applianceType": appliance.appliance_type, | ||||||
|         } |         } | ||||||
|         url = f"{const.API_URL}/commands/v1/statistics" |         url: str = f"{const.API_URL}/commands/v1/statistics" | ||||||
|         async with self._hon.get(url, params=params) as response: |         async with self._hon.get(url, params=params) as response: | ||||||
|             return (await response.json()).get("payload", {}) |             return (await response.json()).get("payload", {}) | ||||||
|  |  | ||||||
|     async def send_command(self, appliance, command, parameters, ancillary_parameters): |     async def send_command( | ||||||
|         now = datetime.utcnow().isoformat() |         self, | ||||||
|         data = { |         appliance: HonAppliance, | ||||||
|  |         command: str, | ||||||
|  |         parameters: Dict, | ||||||
|  |         ancillary_parameters: Dict, | ||||||
|  |     ) -> bool: | ||||||
|  |         now: str = datetime.utcnow().isoformat() | ||||||
|  |         data: Dict = { | ||||||
|             "macAddress": appliance.mac_address, |             "macAddress": appliance.mac_address, | ||||||
|             "timestamp": f"{now[:-3]}Z", |             "timestamp": f"{now[:-3]}Z", | ||||||
|             "commandName": command, |             "commandName": command, | ||||||
|             "transactionId": f"{appliance.mac_address}_{now[:-3]}Z", |             "transactionId": f"{appliance.mac_address}_{now[:-3]}Z", | ||||||
|             "applianceOptions": appliance.commands_options, |             "applianceOptions": appliance.commands_options, | ||||||
|             "appliance": self._hon.device.get(), |             "device": self._hon.device.get(mobile=True), | ||||||
|             "attributes": { |             "attributes": { | ||||||
|                 "channel": "mobileApp", |                 "channel": "mobileApp", | ||||||
|                 "origin": "standardProgram", |                 "origin": "standardProgram", | ||||||
| @ -112,36 +164,37 @@ class HonAPI: | |||||||
|             "parameters": parameters, |             "parameters": parameters, | ||||||
|             "applianceType": appliance.appliance_type, |             "applianceType": appliance.appliance_type, | ||||||
|         } |         } | ||||||
|         url = f"{const.API_URL}/commands/v1/send" |         url: str = f"{const.API_URL}/commands/v1/send" | ||||||
|         async with self._hon.post(url, json=data) as resp: |         async with self._hon.post(url, json=data) as response: | ||||||
|             json_data = await resp.json() |             json_data: Dict = await response.json() | ||||||
|             if json_data.get("payload", {}).get("resultCode") == "0": |             if json_data.get("payload", {}).get("resultCode") == "0": | ||||||
|                 return True |                 return True | ||||||
|  |             _LOGGER.error(await response.text()) | ||||||
|         return False |         return False | ||||||
|  |  | ||||||
|     async def appliance_configuration(self): |     async def appliance_configuration(self) -> Dict: | ||||||
|         url = f"{const.API_URL}/config/v1/appliance-configuration" |         url: str = f"{const.API_URL}/config/v1/program-list-rules" | ||||||
|         async with self._hon_anonymous.get(url) as response: |         async with self._hon_anonymous.get(url) as response: | ||||||
|             result = await response.json() |             result: Dict = await response.json() | ||||||
|             if result and (data := result.get("payload")): |             if result and (data := result.get("payload")): | ||||||
|                 return data |                 return data | ||||||
|         return {} |         return {} | ||||||
|  |  | ||||||
|     async def app_config(self, language="en", beta=True): |     async def app_config(self, language: str = "en", beta: bool = True) -> Dict: | ||||||
|         url = f"{const.API_URL}/app-config" |         url: str = f"{const.API_URL}/app-config" | ||||||
|         payload = { |         payload_data: Dict = { | ||||||
|             "languageCode": language, |             "languageCode": language, | ||||||
|             "beta": beta, |             "beta": beta, | ||||||
|             "appVersion": const.APP_VERSION, |             "appVersion": const.APP_VERSION, | ||||||
|             "os": const.OS, |             "os": const.OS, | ||||||
|         } |         } | ||||||
|         payload = json.dumps(payload, separators=(",", ":")) |         payload: str = json.dumps(payload_data, separators=(",", ":")) | ||||||
|         async with self._hon_anonymous.post(url, data=payload) as response: |         async with self._hon_anonymous.post(url, data=payload) as response: | ||||||
|             if (result := await response.json()) and (data := result.get("payload")): |             if (result := await response.json()) and (data := result.get("payload")): | ||||||
|                 return data |                 return data | ||||||
|         return {} |         return {} | ||||||
|  |  | ||||||
|     async def translation_keys(self, language="en"): |     async def translation_keys(self, language: str = "en") -> Dict: | ||||||
|         config = await self.app_config(language=language) |         config = await self.app_config(language=language) | ||||||
|         if url := config.get("language", {}).get("jsonPath"): |         if url := config.get("language", {}).get("jsonPath"): | ||||||
|             async with self._hon_anonymous.get(url) as response: |             async with self._hon_anonymous.get(url) as response: | ||||||
| @ -149,8 +202,8 @@ class HonAPI: | |||||||
|                     return result |                     return result | ||||||
|         return {} |         return {} | ||||||
|  |  | ||||||
|     async def close(self): |     async def close(self) -> None: | ||||||
|         if self._hon: |         if self._hon_handler is not None: | ||||||
|             await self._hon.close() |             await self._hon_handler.close() | ||||||
|         if self._hon_anonymous: |         if self._hon_anonymous_handler is not None: | ||||||
|             await self._hon_anonymous.close() |             await self._hon_anonymous_handler.close() | ||||||
|  | |||||||
| @ -3,226 +3,276 @@ import logging | |||||||
| import re | import re | ||||||
| import secrets | import secrets | ||||||
| import urllib | import urllib | ||||||
| from pprint import pformat | from contextlib import suppress | ||||||
|  | from dataclasses import dataclass | ||||||
|  | from datetime import datetime, timedelta | ||||||
|  | from typing import Dict, Optional | ||||||
| from urllib import parse | from urllib import parse | ||||||
|  | from urllib.parse import quote | ||||||
|  |  | ||||||
|  | from aiohttp import ClientResponse | ||||||
| from yarl import URL | from yarl import URL | ||||||
|  |  | ||||||
| from pyhon import const | from pyhon import const, exceptions | ||||||
| from pyhon.exceptions import HonAuthenticationError | from pyhon.connection.handler.auth import HonAuthConnectionHandler | ||||||
|  |  | ||||||
| _LOGGER = logging.getLogger(__name__) | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @dataclass | ||||||
|  | class HonLoginData: | ||||||
|  |     url: str = "" | ||||||
|  |     email: str = "" | ||||||
|  |     password: str = "" | ||||||
|  |     fw_uid: str = "" | ||||||
|  |     loaded: Optional[Dict] = None | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonAuth: | class HonAuth: | ||||||
|  |     _TOKEN_EXPIRES_AFTER_HOURS = 8 | ||||||
|  |     _TOKEN_EXPIRE_WARNING_HOURS = 7 | ||||||
|  |  | ||||||
|     def __init__(self, session, email, password, device) -> None: |     def __init__(self, session, email, password, device) -> None: | ||||||
|         self._session = session |         self._session = session | ||||||
|         self._email = email |         self._request = HonAuthConnectionHandler(session) | ||||||
|         self._password = password |         self._login_data = HonLoginData() | ||||||
|  |         self._login_data.email = email | ||||||
|  |         self._login_data.password = password | ||||||
|         self._access_token = "" |         self._access_token = "" | ||||||
|         self._refresh_token = "" |         self._refresh_token = "" | ||||||
|         self._cognito_token = "" |         self._cognito_token = "" | ||||||
|         self._id_token = "" |         self._id_token = "" | ||||||
|         self._device = device |         self._device = device | ||||||
|         self._called_urls = [] |         self._expires: datetime = datetime.utcnow() | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def cognito_token(self): |     def cognito_token(self) -> str: | ||||||
|         return self._cognito_token |         return self._cognito_token | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def id_token(self): |     def id_token(self) -> str: | ||||||
|         return self._id_token |         return self._id_token | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def access_token(self): |     def access_token(self) -> str: | ||||||
|         return self._access_token |         return self._access_token | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def refresh_token(self): |     def refresh_token(self) -> str: | ||||||
|         return self._refresh_token |         return self._refresh_token | ||||||
|  |  | ||||||
|     async def _error_logger(self, response, fail=True): |     def _check_token_expiration(self, hours: int) -> bool: | ||||||
|         result = "hOn Authentication Error\n" |         return datetime.utcnow() >= self._expires + timedelta(hours=hours) | ||||||
|         for i, (status, url) in enumerate(self._called_urls): |  | ||||||
|             result += f" {i + 1: 2d}     {status} - {url}\n" |  | ||||||
|         result += f"ERROR - {response.status} - {response.request_info.url}\n" |  | ||||||
|         result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" |  | ||||||
|         _LOGGER.error(result) |  | ||||||
|         if fail: |  | ||||||
|             raise HonAuthenticationError("Can't login") |  | ||||||
|  |  | ||||||
|     async def _load_login(self): |     @property | ||||||
|  |     def token_is_expired(self) -> bool: | ||||||
|  |         return self._check_token_expiration(self._TOKEN_EXPIRES_AFTER_HOURS) | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def token_expires_soon(self) -> bool: | ||||||
|  |         return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS) | ||||||
|  |  | ||||||
|  |     async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None: | ||||||
|  |         output = "hOn Authentication Error\n" | ||||||
|  |         for i, (status, url) in enumerate(self._request.called_urls): | ||||||
|  |             output += f" {i + 1: 2d}     {status} - {url}\n" | ||||||
|  |         output += f"ERROR - {response.status} - {response.request_info.url}\n" | ||||||
|  |         output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" | ||||||
|  |         _LOGGER.error(output) | ||||||
|  |         if fail: | ||||||
|  |             raise exceptions.HonAuthenticationError("Can't login") | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def _generate_nonce() -> str: | ||||||
|         nonce = secrets.token_hex(16) |         nonce = secrets.token_hex(16) | ||||||
|         nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" |         return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" | ||||||
|  |  | ||||||
|  |     async def _load_login(self) -> bool: | ||||||
|  |         login_url = await self._introduce() | ||||||
|  |         login_url = await self._handle_redirects(login_url) | ||||||
|  |         return await self._login_url(login_url) | ||||||
|  |  | ||||||
|  |     async def _introduce(self) -> str: | ||||||
|  |         redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done") | ||||||
|         params = { |         params = { | ||||||
|             "response_type": "token+id_token", |             "response_type": "token+id_token", | ||||||
|             "client_id": const.CLIENT_ID, |             "client_id": const.CLIENT_ID, | ||||||
|             "redirect_uri": urllib.parse.quote( |             "redirect_uri": redirect_uri, | ||||||
|                 f"{const.APP}://mobilesdk/detect/oauth/done" |  | ||||||
|             ), |  | ||||||
|             "display": "touch", |             "display": "touch", | ||||||
|             "scope": "api openid refresh_token web", |             "scope": "api openid refresh_token web", | ||||||
|             "nonce": nonce, |             "nonce": self._generate_nonce(), | ||||||
|         } |         } | ||||||
|         params = "&".join([f"{k}={v}" for k, v in params.items()]) |         params_encode = "&".join([f"{k}={v}" for k, v in params.items()]) | ||||||
|         async with self._session.get( |         url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}" | ||||||
|             f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" |         async with self._request.get(url) as response: | ||||||
|         ) as response: |             text = await response.text() | ||||||
|             self._called_urls.append((response.status, response.request_info.url)) |             self._expires = datetime.utcnow() | ||||||
|             if not (login_url := re.findall("url = '(.+?)'", await response.text())): |             if not (login_url := re.findall("url = '(.+?)'", text)): | ||||||
|  |                 if "oauth/done#access_token=" in text: | ||||||
|  |                     self._parse_token_data(text) | ||||||
|  |                     raise exceptions.HonNoAuthenticationNeeded() | ||||||
|                 await self._error_logger(response) |                 await self._error_logger(response) | ||||||
|                 return False |         return login_url[0] | ||||||
|         async with self._session.get(login_url[0], allow_redirects=False) as redirect1: |  | ||||||
|             self._called_urls.append((redirect1.status, redirect1.request_info.url)) |     async def _manual_redirect(self, url: str) -> str: | ||||||
|             if not (url := redirect1.headers.get("Location")): |         async with self._request.get(url, allow_redirects=False) as response: | ||||||
|                 await self._error_logger(redirect1) |             if not (new_location := response.headers.get("Location", "")): | ||||||
|                 return False |                 await self._error_logger(response) | ||||||
|         async with self._session.get(url, allow_redirects=False) as redirect2: |         return new_location | ||||||
|             self._called_urls.append((redirect2.status, redirect2.request_info.url)) |  | ||||||
|             if not ( |     async def _handle_redirects(self, login_url) -> str: | ||||||
|                 url := redirect2.headers.get("Location") |         redirect1 = await self._manual_redirect(login_url) | ||||||
|                 + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" |         redirect2 = await self._manual_redirect(redirect1) | ||||||
|             ): |         return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn" | ||||||
|                 await self._error_logger(redirect2) |  | ||||||
|                 return False |     async def _login_url(self, login_url: str) -> bool: | ||||||
|         async with self._session.get(URL(url, encoded=True)) as login_screen: |         headers = {"user-agent": const.USER_AGENT} | ||||||
|             self._called_urls.append( |         url = URL(login_url, encoded=True) | ||||||
|                 (login_screen.status, login_screen.request_info.url) |         async with self._request.get(url, headers=headers) as response: | ||||||
|             ) |             text = await response.text() | ||||||
|             if context := re.findall( |             if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text): | ||||||
|                 '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() |  | ||||||
|             ): |  | ||||||
|                 fw_uid, loaded_str = context[0] |                 fw_uid, loaded_str = context[0] | ||||||
|                 loaded = json.loads(loaded_str) |                 self._login_data.fw_uid = fw_uid | ||||||
|                 login_url = login_url[0].replace( |                 self._login_data.loaded = json.loads(loaded_str) | ||||||
|  |                 self._login_data.url = login_url.replace( | ||||||
|                     "/".join(const.AUTH_API.split("/")[:-1]), "" |                     "/".join(const.AUTH_API.split("/")[:-1]), "" | ||||||
|                 ) |                 ) | ||||||
|                 return fw_uid, loaded, login_url |                 return True | ||||||
|             await self._error_logger(login_screen) |             await self._error_logger(response) | ||||||
|         return False |         return False | ||||||
|  |  | ||||||
|     async def _login(self, fw_uid, loaded, login_url): |     async def _login(self) -> str: | ||||||
|         data = { |         start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1] | ||||||
|             "message": { |         start_url = parse.unquote(start_url).split("%3D")[0] | ||||||
|                 "actions": [ |         action = { | ||||||
|                     { |  | ||||||
|             "id": "79;a", |             "id": "79;a", | ||||||
|             "descriptor": "apex://LightningLoginCustomController/ACTION$login", |             "descriptor": "apex://LightningLoginCustomController/ACTION$login", | ||||||
|             "callingDescriptor": "markup://c:loginForm", |             "callingDescriptor": "markup://c:loginForm", | ||||||
|             "params": { |             "params": { | ||||||
|                             "username": self._email, |                 "username": quote(self._login_data.email), | ||||||
|                             "password": self._password, |                 "password": quote(self._login_data.password), | ||||||
|                             "startUrl": parse.unquote( |                 "startUrl": start_url, | ||||||
|                                 login_url.split("startURL=")[-1] |  | ||||||
|                             ).split("%3D")[0], |  | ||||||
|             }, |             }, | ||||||
|         } |         } | ||||||
|                 ] |         data = { | ||||||
|             }, |             "message": {"actions": [action]}, | ||||||
|             "aura.context": { |             "aura.context": { | ||||||
|                 "mode": "PROD", |                 "mode": "PROD", | ||||||
|                 "fwuid": fw_uid, |                 "fwuid": self._login_data.fw_uid, | ||||||
|                 "app": "siteforce:loginApp2", |                 "app": "siteforce:loginApp2", | ||||||
|                 "loaded": loaded, |                 "loaded": self._login_data.loaded, | ||||||
|                 "dn": [], |                 "dn": [], | ||||||
|                 "globals": {}, |                 "globals": {}, | ||||||
|                 "uad": False, |                 "uad": False, | ||||||
|             }, |             }, | ||||||
|             "aura.pageURI": login_url, |             "aura.pageURI": self._login_data.url, | ||||||
|             "aura.token": None, |             "aura.token": None, | ||||||
|         } |         } | ||||||
|         params = {"r": 3, "other.LightningLoginCustom.login": 1} |         params = {"r": 3, "other.LightningLoginCustom.login": 1} | ||||||
|         async with self._session.post( |         async with self._request.post( | ||||||
|             const.AUTH_API + "/s/sfsites/aura", |             const.AUTH_API + "/s/sfsites/aura", | ||||||
|             headers={"Content-Type": "application/x-www-form-urlencoded"}, |             headers={"Content-Type": "application/x-www-form-urlencoded"}, | ||||||
|             data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), |             data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), | ||||||
|             params=params, |             params=params, | ||||||
|         ) as response: |         ) as response: | ||||||
|             self._called_urls.append((response.status, response.request_info.url)) |  | ||||||
|             if response.status == 200: |             if response.status == 200: | ||||||
|                 try: |                 with suppress(json.JSONDecodeError, KeyError): | ||||||
|                     data = await response.json() |                     result = await response.json() | ||||||
|                     return data["events"][0]["attributes"]["values"]["url"] |                     return result["events"][0]["attributes"]["values"]["url"] | ||||||
|                 except json.JSONDecodeError: |  | ||||||
|                     pass |  | ||||||
|                 except KeyError: |  | ||||||
|                     _LOGGER.error( |  | ||||||
|                         "Can't get login url - %s", pformat(await response.json()) |  | ||||||
|                     ) |  | ||||||
|             await self._error_logger(response) |             await self._error_logger(response) | ||||||
|             return "" |             return "" | ||||||
|  |  | ||||||
|     async def _get_token(self, url): |     def _parse_token_data(self, text: str) -> bool: | ||||||
|         async with self._session.get(url) as response: |  | ||||||
|             self._called_urls.append((response.status, response.request_info.url)) |  | ||||||
|             if response.status != 200: |  | ||||||
|                 await self._error_logger(response) |  | ||||||
|                 return False |  | ||||||
|             url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text()) |  | ||||||
|             if not url: |  | ||||||
|                 await self._error_logger(response) |  | ||||||
|                 return False |  | ||||||
|         if "ProgressiveLogin" in url[0]: |  | ||||||
|             async with self._session.get(url[0]) as response: |  | ||||||
|                 self._called_urls.append((response.status, response.request_info.url)) |  | ||||||
|                 if response.status != 200: |  | ||||||
|                     await self._error_logger(response) |  | ||||||
|                     return False |  | ||||||
|                 url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text()) |  | ||||||
|         url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] |  | ||||||
|         async with self._session.get(url) as response: |  | ||||||
|             self._called_urls.append((response.status, response.request_info.url)) |  | ||||||
|             if response.status != 200: |  | ||||||
|                 await self._error_logger(response) |  | ||||||
|                 return False |  | ||||||
|             text = await response.text() |  | ||||||
|         if access_token := re.findall("access_token=(.*?)&", text): |         if access_token := re.findall("access_token=(.*?)&", text): | ||||||
|             self._access_token = access_token[0] |             self._access_token = access_token[0] | ||||||
|         if refresh_token := re.findall("refresh_token=(.*?)&", text): |         if refresh_token := re.findall("refresh_token=(.*?)&", text): | ||||||
|             self._refresh_token = refresh_token[0] |             self._refresh_token = refresh_token[0] | ||||||
|         if id_token := re.findall("id_token=(.*?)&", text): |         if id_token := re.findall("id_token=(.*?)&", text): | ||||||
|             self._id_token = id_token[0] |             self._id_token = id_token[0] | ||||||
|  |         return True if access_token and refresh_token and id_token else False | ||||||
|  |  | ||||||
|  |     async def _get_token(self, url: str) -> bool: | ||||||
|  |         async with self._request.get(url) as response: | ||||||
|  |             if response.status != 200: | ||||||
|  |                 await self._error_logger(response) | ||||||
|  |                 return False | ||||||
|  |             url_search = re.findall( | ||||||
|  |                 "href\\s*=\\s*[\"'](.+?)[\"']", await response.text() | ||||||
|  |             ) | ||||||
|  |             if not url_search: | ||||||
|  |                 await self._error_logger(response) | ||||||
|  |                 return False | ||||||
|  |         if "ProgressiveLogin" in url_search[0]: | ||||||
|  |             async with self._request.get(url_search[0]) as response: | ||||||
|  |                 if response.status != 200: | ||||||
|  |                     await self._error_logger(response) | ||||||
|  |                     return False | ||||||
|  |                 url_search = re.findall( | ||||||
|  |                     "href\\s*=\\s*[\"'](.*?)[\"']", await response.text() | ||||||
|  |                 ) | ||||||
|  |         url = "/".join(const.AUTH_API.split("/")[:-1]) + url_search[0] | ||||||
|  |         async with self._request.get(url) as response: | ||||||
|  |             if response.status != 200: | ||||||
|  |                 await self._error_logger(response) | ||||||
|  |                 return False | ||||||
|  |             if not self._parse_token_data(await response.text()): | ||||||
|  |                 await self._error_logger(response) | ||||||
|  |                 return False | ||||||
|         return True |         return True | ||||||
|  |  | ||||||
|     async def authorize(self): |     async def _api_auth(self) -> bool: | ||||||
|         if login_site := await self._load_login(): |  | ||||||
|             fw_uid, loaded, login_url = login_site |  | ||||||
|         else: |  | ||||||
|             return False |  | ||||||
|         if not (url := await self._login(fw_uid, loaded, login_url)): |  | ||||||
|             return False |  | ||||||
|         if not await self._get_token(url): |  | ||||||
|             return False |  | ||||||
|  |  | ||||||
|         post_headers = {"id-token": self._id_token} |         post_headers = {"id-token": self._id_token} | ||||||
|         data = self._device.get() |         data = self._device.get() | ||||||
|         async with self._session.post( |         async with self._request.post( | ||||||
|             f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data |             f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data | ||||||
|         ) as response: |         ) as response: | ||||||
|             self._called_urls.append((response.status, response.request_info.url)) |  | ||||||
|             try: |             try: | ||||||
|                 json_data = await response.json() |                 json_data = await response.json() | ||||||
|             except json.JSONDecodeError: |             except json.JSONDecodeError: | ||||||
|                 await self._error_logger(response) |                 await self._error_logger(response) | ||||||
|                 return False |                 return False | ||||||
|             self._cognito_token = json_data["cognitoUser"]["Token"] |             self._cognito_token = json_data.get("cognitoUser", {}).get("Token", "") | ||||||
|  |             if not self._cognito_token: | ||||||
|  |                 _LOGGER.error(json_data) | ||||||
|  |                 raise exceptions.HonAuthenticationError() | ||||||
|         return True |         return True | ||||||
|  |  | ||||||
|     async def refresh(self): |     async def authenticate(self) -> None: | ||||||
|  |         self.clear() | ||||||
|  |         try: | ||||||
|  |             if not await self._load_login(): | ||||||
|  |                 raise exceptions.HonAuthenticationError("Can't open login page") | ||||||
|  |             if not (url := await self._login()): | ||||||
|  |                 raise exceptions.HonAuthenticationError("Can't login") | ||||||
|  |             if not await self._get_token(url): | ||||||
|  |                 raise exceptions.HonAuthenticationError("Can't get token") | ||||||
|  |             if not await self._api_auth(): | ||||||
|  |                 raise exceptions.HonAuthenticationError("Can't get api token") | ||||||
|  |         except exceptions.HonNoAuthenticationNeeded: | ||||||
|  |             return | ||||||
|  |  | ||||||
|  |     async def refresh(self) -> bool: | ||||||
|         params = { |         params = { | ||||||
|             "client_id": const.CLIENT_ID, |             "client_id": const.CLIENT_ID, | ||||||
|             "refresh_token": self._refresh_token, |             "refresh_token": self._refresh_token, | ||||||
|             "grant_type": "refresh_token", |             "grant_type": "refresh_token", | ||||||
|         } |         } | ||||||
|         async with self._session.post( |         async with self._request.post( | ||||||
|             f"{const.AUTH_API}/services/oauth2/token", params=params |             f"{const.AUTH_API}/services/oauth2/token", params=params | ||||||
|         ) as response: |         ) as response: | ||||||
|             self._called_urls.append((response.status, response.request_info.url)) |  | ||||||
|             if response.status >= 400: |             if response.status >= 400: | ||||||
|                 await self._error_logger(response, fail=False) |                 await self._error_logger(response, fail=False) | ||||||
|                 return False |                 return False | ||||||
|             data = await response.json() |             data = await response.json() | ||||||
|  |         self._expires = datetime.utcnow() | ||||||
|         self._id_token = data["id_token"] |         self._id_token = data["id_token"] | ||||||
|         self._access_token = data["access_token"] |         self._access_token = data["access_token"] | ||||||
|         return True |         return await self._api_auth() | ||||||
|  |  | ||||||
|  |     def clear(self) -> None: | ||||||
|  |         self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2]) | ||||||
|  |         self._request.called_urls = [] | ||||||
|  |         self._cognito_token = "" | ||||||
|  |         self._id_token = "" | ||||||
|  |         self._access_token = "" | ||||||
|  |         self._refresh_token = "" | ||||||
|  | |||||||
| @ -1,41 +1,43 @@ | |||||||
| import secrets | import secrets | ||||||
|  | from typing import Dict | ||||||
|  |  | ||||||
| from pyhon import const | from pyhon import const | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonDevice: | class HonDevice: | ||||||
|     def __init__(self): |     def __init__(self) -> None: | ||||||
|         self._app_version = const.APP_VERSION |         self._app_version: str = const.APP_VERSION | ||||||
|         self._os_version = const.OS_VERSION |         self._os_version: int = const.OS_VERSION | ||||||
|         self._os = const.OS |         self._os: str = const.OS | ||||||
|         self._device_model = const.DEVICE_MODEL |         self._device_model: str = const.DEVICE_MODEL | ||||||
|         self._mobile_id = secrets.token_hex(8) |         self._mobile_id: str = secrets.token_hex(8) | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def app_version(self): |     def app_version(self) -> str: | ||||||
|         return self._app_version |         return self._app_version | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def os_version(self): |     def os_version(self) -> int: | ||||||
|         return self._os_version |         return self._os_version | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def os(self): |     def os(self) -> str: | ||||||
|         return self._os |         return self._os | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def device_model(self): |     def device_model(self) -> str: | ||||||
|         return self._device_model |         return self._device_model | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def mobile_id(self): |     def mobile_id(self) -> str: | ||||||
|         return self._mobile_id |         return self._mobile_id | ||||||
|  |  | ||||||
|     def get(self): |     def get(self, mobile: bool = False) -> Dict: | ||||||
|         return { |         result = { | ||||||
|             "appVersion": self.app_version, |             "appVersion": self.app_version, | ||||||
|             "mobileId": self.mobile_id, |             "mobileId": self.mobile_id, | ||||||
|             "osVersion": self.os_version, |  | ||||||
|             "os": self.os, |             "os": self.os, | ||||||
|  |             "osVersion": self.os_version, | ||||||
|             "deviceModel": self.device_model, |             "deviceModel": self.device_model, | ||||||
|         } |         } | ||||||
|  |         return (result | {"mobileOs": result.pop("os")}) if mobile else result | ||||||
|  | |||||||
| @ -1,128 +0,0 @@ | |||||||
| import json |  | ||||||
| from contextlib import asynccontextmanager |  | ||||||
|  |  | ||||||
| import aiohttp |  | ||||||
|  |  | ||||||
| from pyhon import const |  | ||||||
| from pyhon.connection.auth import HonAuth, _LOGGER |  | ||||||
| from pyhon.connection.device import HonDevice |  | ||||||
| from pyhon.exceptions import HonAuthenticationError |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonBaseConnectionHandler: |  | ||||||
|     _HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"} |  | ||||||
|  |  | ||||||
|     def __init__(self, session=None): |  | ||||||
|         self._session = session |  | ||||||
|         self._auth = None |  | ||||||
|  |  | ||||||
|     async def __aenter__(self): |  | ||||||
|         return await self.create() |  | ||||||
|  |  | ||||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb): |  | ||||||
|         await self.close() |  | ||||||
|  |  | ||||||
|     async def create(self): |  | ||||||
|         self._session = aiohttp.ClientSession(headers=self._HEADERS) |  | ||||||
|         return self |  | ||||||
|  |  | ||||||
|     @asynccontextmanager |  | ||||||
|     async def _intercept(self, method, *args, loop=0, **kwargs): |  | ||||||
|         raise NotImplementedError |  | ||||||
|  |  | ||||||
|     @asynccontextmanager |  | ||||||
|     async def get(self, *args, **kwargs): |  | ||||||
|         async with self._intercept(self._session.get, *args, **kwargs) as response: |  | ||||||
|             yield response |  | ||||||
|  |  | ||||||
|     @asynccontextmanager |  | ||||||
|     async def post(self, *args, **kwargs): |  | ||||||
|         async with self._intercept(self._session.post, *args, **kwargs) as response: |  | ||||||
|             yield response |  | ||||||
|  |  | ||||||
|     async def close(self): |  | ||||||
|         await self._session.close() |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonConnectionHandler(HonBaseConnectionHandler): |  | ||||||
|     def __init__(self, email, password, session=None): |  | ||||||
|         super().__init__(session=session) |  | ||||||
|         self._device = HonDevice() |  | ||||||
|         self._email = email |  | ||||||
|         self._password = password |  | ||||||
|         if not self._email: |  | ||||||
|             raise HonAuthenticationError("An email address must be specified") |  | ||||||
|         if not self._password: |  | ||||||
|             raise HonAuthenticationError("A password address must be specified") |  | ||||||
|         self._request_headers = {} |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def device(self): |  | ||||||
|         return self._device |  | ||||||
|  |  | ||||||
|     async def create(self): |  | ||||||
|         await super().create() |  | ||||||
|         self._auth = HonAuth(self._session, self._email, self._password, self._device) |  | ||||||
|         return self |  | ||||||
|  |  | ||||||
|     async def _check_headers(self, headers): |  | ||||||
|         if ( |  | ||||||
|             "cognito-token" not in self._request_headers |  | ||||||
|             or "id-token" not in self._request_headers |  | ||||||
|         ): |  | ||||||
|             if await self._auth.authorize(): |  | ||||||
|                 self._request_headers["cognito-token"] = self._auth.cognito_token |  | ||||||
|                 self._request_headers["id-token"] = self._auth.id_token |  | ||||||
|             else: |  | ||||||
|                 raise HonAuthenticationError("Can't login") |  | ||||||
|         return {h: v for h, v in self._request_headers.items() if h not in headers} |  | ||||||
|  |  | ||||||
|     @asynccontextmanager |  | ||||||
|     async def _intercept(self, method, *args, loop=0, **kwargs): |  | ||||||
|         kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) |  | ||||||
|         async with method(*args, **kwargs) as response: |  | ||||||
|             if response.status == 403 and not loop: |  | ||||||
|                 _LOGGER.info("Try refreshing token...") |  | ||||||
|                 await self._auth.refresh() |  | ||||||
|                 yield await self._intercept(method, *args, loop=loop + 1, **kwargs) |  | ||||||
|             elif response.status == 403 and loop < 2: |  | ||||||
|                 _LOGGER.warning( |  | ||||||
|                     "%s - Error %s - %s", |  | ||||||
|                     response.request_info.url, |  | ||||||
|                     response.status, |  | ||||||
|                     await response.text(), |  | ||||||
|                 ) |  | ||||||
|                 await self.create() |  | ||||||
|                 yield await self._intercept(method, *args, loop=loop + 1, **kwargs) |  | ||||||
|             elif loop >= 2: |  | ||||||
|                 _LOGGER.error( |  | ||||||
|                     "%s - Error %s - %s", |  | ||||||
|                     response.request_info.url, |  | ||||||
|                     response.status, |  | ||||||
|                     await response.text(), |  | ||||||
|                 ) |  | ||||||
|                 raise HonAuthenticationError("Login failure") |  | ||||||
|             else: |  | ||||||
|                 try: |  | ||||||
|                     await response.json() |  | ||||||
|                     yield response |  | ||||||
|                 except json.JSONDecodeError: |  | ||||||
|                     _LOGGER.warning( |  | ||||||
|                         "%s - JsonDecodeError %s - %s", |  | ||||||
|                         response.request_info.url, |  | ||||||
|                         response.status, |  | ||||||
|                         await response.text(), |  | ||||||
|                     ) |  | ||||||
|                     yield {} |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonAnonymousConnectionHandler(HonBaseConnectionHandler): |  | ||||||
|     _HEADERS = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} |  | ||||||
|  |  | ||||||
|     @asynccontextmanager |  | ||||||
|     async def _intercept(self, method, *args, loop=0, **kwargs): |  | ||||||
|         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS |  | ||||||
|         async with method(*args, **kwargs) as response: |  | ||||||
|             if response.status == 403: |  | ||||||
|                 _LOGGER.error("Can't authenticate anymore") |  | ||||||
|             yield response |  | ||||||
							
								
								
									
										0
									
								
								pyhon/connection/handler/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								pyhon/connection/handler/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										21
									
								
								pyhon/connection/handler/anonym.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								pyhon/connection/handler/anonym.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,21 @@ | |||||||
|  | import logging | ||||||
|  | from collections.abc import AsyncIterator | ||||||
|  | from contextlib import asynccontextmanager | ||||||
|  | from typing import Callable, Dict | ||||||
|  |  | ||||||
|  | from pyhon import const | ||||||
|  | from pyhon.connection.handler.base import ConnectionHandler | ||||||
|  |  | ||||||
|  | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class HonAnonymousConnectionHandler(ConnectionHandler): | ||||||
|  |     _HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} | ||||||
|  |  | ||||||
|  |     @asynccontextmanager | ||||||
|  |     async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator: | ||||||
|  |         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS | ||||||
|  |         async with method(*args, **kwargs) as response: | ||||||
|  |             if response.status == 403: | ||||||
|  |                 _LOGGER.error("Can't authenticate anymore") | ||||||
|  |             yield response | ||||||
							
								
								
									
										36
									
								
								pyhon/connection/handler/auth.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								pyhon/connection/handler/auth.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,36 @@ | |||||||
|  | import logging | ||||||
|  | from collections.abc import AsyncIterator | ||||||
|  | from contextlib import asynccontextmanager | ||||||
|  | from typing import Optional, Callable, List, Tuple | ||||||
|  |  | ||||||
|  | import aiohttp | ||||||
|  |  | ||||||
|  | from pyhon import const | ||||||
|  | from pyhon.connection.handler.base import ConnectionHandler | ||||||
|  |  | ||||||
|  | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class HonAuthConnectionHandler(ConnectionHandler): | ||||||
|  |     _HEADERS = {"user-agent": const.USER_AGENT} | ||||||
|  |  | ||||||
|  |     def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: | ||||||
|  |         super().__init__(session) | ||||||
|  |         self._called_urls: List[Tuple[int, str]] = [] | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def called_urls(self) -> List[Tuple[int, str]]: | ||||||
|  |         return self._called_urls | ||||||
|  |  | ||||||
|  |     @called_urls.setter | ||||||
|  |     def called_urls(self, called_urls: List[Tuple[int, str]]) -> None: | ||||||
|  |         self._called_urls = called_urls | ||||||
|  |  | ||||||
|  |     @asynccontextmanager | ||||||
|  |     async def _intercept( | ||||||
|  |         self, method: Callable, *args, loop: int = 0, **kwargs | ||||||
|  |     ) -> AsyncIterator: | ||||||
|  |         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS | ||||||
|  |         async with method(*args, **kwargs) as response: | ||||||
|  |             self._called_urls.append((response.status, response.request_info.url)) | ||||||
|  |             yield response | ||||||
							
								
								
									
										57
									
								
								pyhon/connection/handler/base.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										57
									
								
								pyhon/connection/handler/base.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,57 @@ | |||||||
|  | import logging | ||||||
|  | from collections.abc import AsyncIterator | ||||||
|  | from contextlib import asynccontextmanager | ||||||
|  | from typing import Optional, Callable, Dict | ||||||
|  |  | ||||||
|  | import aiohttp | ||||||
|  | from typing_extensions import Self | ||||||
|  |  | ||||||
|  | from pyhon import const, exceptions | ||||||
|  |  | ||||||
|  | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class ConnectionHandler: | ||||||
|  |     _HEADERS: Dict = { | ||||||
|  |         "user-agent": const.USER_AGENT, | ||||||
|  |         "Content-Type": "application/json", | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: | ||||||
|  |         self._create_session: bool = session is None | ||||||
|  |         self._session: Optional[aiohttp.ClientSession] = session | ||||||
|  |  | ||||||
|  |     async def __aenter__(self) -> Self: | ||||||
|  |         return await self.create() | ||||||
|  |  | ||||||
|  |     async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: | ||||||
|  |         await self.close() | ||||||
|  |  | ||||||
|  |     async def create(self) -> Self: | ||||||
|  |         if self._create_session: | ||||||
|  |             self._session = aiohttp.ClientSession() | ||||||
|  |         return self | ||||||
|  |  | ||||||
|  |     @asynccontextmanager | ||||||
|  |     def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs): | ||||||
|  |         raise NotImplementedError | ||||||
|  |  | ||||||
|  |     @asynccontextmanager | ||||||
|  |     async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]: | ||||||
|  |         if self._session is None: | ||||||
|  |             raise exceptions.NoSessionException() | ||||||
|  |         response: aiohttp.ClientResponse | ||||||
|  |         async with self._intercept(self._session.get, *args, **kwargs) as response: | ||||||
|  |             yield response | ||||||
|  |  | ||||||
|  |     @asynccontextmanager | ||||||
|  |     async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]: | ||||||
|  |         if self._session is None: | ||||||
|  |             raise exceptions.NoSessionException() | ||||||
|  |         response: aiohttp.ClientResponse | ||||||
|  |         async with self._intercept(self._session.post, *args, **kwargs) as response: | ||||||
|  |             yield response | ||||||
|  |  | ||||||
|  |     async def close(self) -> None: | ||||||
|  |         if self._create_session and self._session is not None: | ||||||
|  |             await self._session.close() | ||||||
							
								
								
									
										102
									
								
								pyhon/connection/handler/hon.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										102
									
								
								pyhon/connection/handler/hon.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,102 @@ | |||||||
|  | import json | ||||||
|  | import logging | ||||||
|  | from collections.abc import AsyncIterator | ||||||
|  | from contextlib import asynccontextmanager | ||||||
|  | from typing import Optional, Callable, Dict | ||||||
|  |  | ||||||
|  | import aiohttp | ||||||
|  | from typing_extensions import Self | ||||||
|  |  | ||||||
|  | from pyhon.connection.auth import HonAuth | ||||||
|  | from pyhon.connection.device import HonDevice | ||||||
|  | from pyhon.connection.handler.base import ConnectionHandler | ||||||
|  | from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException | ||||||
|  |  | ||||||
|  | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class HonConnectionHandler(ConnectionHandler): | ||||||
|  |     def __init__( | ||||||
|  |         self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None | ||||||
|  |     ) -> None: | ||||||
|  |         super().__init__(session=session) | ||||||
|  |         self._device: HonDevice = HonDevice() | ||||||
|  |         self._email: str = email | ||||||
|  |         self._password: str = password | ||||||
|  |         if not self._email: | ||||||
|  |             raise HonAuthenticationError("An email address must be specified") | ||||||
|  |         if not self._password: | ||||||
|  |             raise HonAuthenticationError("A password address must be specified") | ||||||
|  |         self._auth: Optional[HonAuth] = None | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def auth(self) -> HonAuth: | ||||||
|  |         if self._auth is None: | ||||||
|  |             raise NoAuthenticationException() | ||||||
|  |         return self._auth | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def device(self) -> HonDevice: | ||||||
|  |         return self._device | ||||||
|  |  | ||||||
|  |     async def create(self) -> Self: | ||||||
|  |         await super().create() | ||||||
|  |         self._auth = HonAuth(self._session, self._email, self._password, self._device) | ||||||
|  |         return self | ||||||
|  |  | ||||||
|  |     async def _check_headers(self, headers: Dict) -> Dict: | ||||||
|  |         if not (self.auth.cognito_token and self.auth.id_token): | ||||||
|  |             await self.auth.authenticate() | ||||||
|  |         headers["cognito-token"] = self.auth.cognito_token | ||||||
|  |         headers["id-token"] = self.auth.id_token | ||||||
|  |         return self._HEADERS | headers | ||||||
|  |  | ||||||
|  |     @asynccontextmanager | ||||||
|  |     async def _intercept( | ||||||
|  |         self, method: Callable, *args, loop: int = 0, **kwargs | ||||||
|  |     ) -> AsyncIterator: | ||||||
|  |         kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) | ||||||
|  |         async with method(*args, **kwargs) as response: | ||||||
|  |             if ( | ||||||
|  |                 self.auth.token_expires_soon or response.status in [401, 403] | ||||||
|  |             ) and loop == 0: | ||||||
|  |                 _LOGGER.info("Try refreshing token...") | ||||||
|  |                 await self.auth.refresh() | ||||||
|  |                 async with self._intercept( | ||||||
|  |                     method, *args, loop=loop + 1, **kwargs | ||||||
|  |                 ) as result: | ||||||
|  |                     yield result | ||||||
|  |             elif ( | ||||||
|  |                 self.auth.token_is_expired or response.status in [401, 403] | ||||||
|  |             ) and loop == 1: | ||||||
|  |                 _LOGGER.warning( | ||||||
|  |                     "%s - Error %s - %s", | ||||||
|  |                     response.request_info.url, | ||||||
|  |                     response.status, | ||||||
|  |                     await response.text(), | ||||||
|  |                 ) | ||||||
|  |                 await self.create() | ||||||
|  |                 async with self._intercept( | ||||||
|  |                     method, *args, loop=loop + 1, **kwargs | ||||||
|  |                 ) as result: | ||||||
|  |                     yield result | ||||||
|  |             elif loop >= 2: | ||||||
|  |                 _LOGGER.error( | ||||||
|  |                     "%s - Error %s - %s", | ||||||
|  |                     response.request_info.url, | ||||||
|  |                     response.status, | ||||||
|  |                     await response.text(), | ||||||
|  |                 ) | ||||||
|  |                 raise HonAuthenticationError("Login failure") | ||||||
|  |             else: | ||||||
|  |                 try: | ||||||
|  |                     await response.json() | ||||||
|  |                     yield response | ||||||
|  |                 except json.JSONDecodeError: | ||||||
|  |                     _LOGGER.warning( | ||||||
|  |                         "%s - JsonDecodeError %s - %s", | ||||||
|  |                         response.request_info.url, | ||||||
|  |                         response.status, | ||||||
|  |                         await response.text(), | ||||||
|  |                     ) | ||||||
|  |                     raise HonAuthenticationError("Decode Error") | ||||||
| @ -1,2 +1,14 @@ | |||||||
| class HonAuthenticationError(Exception): | class HonAuthenticationError(Exception): | ||||||
|     pass |     pass | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class HonNoAuthenticationNeeded(Exception): | ||||||
|  |     pass | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class NoSessionException(Exception): | ||||||
|  |     pass | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class NoAuthenticationException(Exception): | ||||||
|  |     pass | ||||||
|  | |||||||
							
								
								
									
										63
									
								
								pyhon/helper.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								pyhon/helper.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,63 @@ | |||||||
|  | def key_print(data, key="", start=True): | ||||||
|  |     result = "" | ||||||
|  |     if isinstance(data, list): | ||||||
|  |         for i, value in enumerate(data): | ||||||
|  |             result += key_print(value, key=f"{key}.{i}", start=False) | ||||||
|  |     elif isinstance(data, dict): | ||||||
|  |         for k, value in sorted(data.items()): | ||||||
|  |             result += key_print(value, key=k if start else f"{key}.{k}", start=False) | ||||||
|  |     else: | ||||||
|  |         result += f"{key}: {data}\n" | ||||||
|  |     return result | ||||||
|  |  | ||||||
|  |  | ||||||
|  | # yaml.dump() would be done the same, but needs an additional dependency... | ||||||
|  | def pretty_print(data, key="", intend=0, is_list=False, whitespace="  "): | ||||||
|  |     result = "" | ||||||
|  |     if isinstance(data, list): | ||||||
|  |         if key: | ||||||
|  |             result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n" | ||||||
|  |             intend += 1 | ||||||
|  |         for i, value in enumerate(data): | ||||||
|  |             result += pretty_print( | ||||||
|  |                 value, intend=intend, is_list=True, whitespace=whitespace | ||||||
|  |             ) | ||||||
|  |     elif isinstance(data, dict): | ||||||
|  |         if key: | ||||||
|  |             result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n" | ||||||
|  |             intend += 1 | ||||||
|  |         for i, (key, value) in enumerate(sorted(data.items())): | ||||||
|  |             if is_list and not i: | ||||||
|  |                 result += pretty_print( | ||||||
|  |                     value, key=key, intend=intend, is_list=True, whitespace=whitespace | ||||||
|  |                 ) | ||||||
|  |             elif is_list: | ||||||
|  |                 result += pretty_print( | ||||||
|  |                     value, key=key, intend=intend + 1, whitespace=whitespace | ||||||
|  |                 ) | ||||||
|  |             else: | ||||||
|  |                 result += pretty_print( | ||||||
|  |                     value, key=key, intend=intend, whitespace=whitespace | ||||||
|  |                 ) | ||||||
|  |     else: | ||||||
|  |         result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n" | ||||||
|  |     return result | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def create_command(commands, concat=False): | ||||||
|  |     result = {} | ||||||
|  |     for name, command in commands.items(): | ||||||
|  |         if not concat: | ||||||
|  |             result[name] = {} | ||||||
|  |         for parameter, data in command.available_settings.items(): | ||||||
|  |             if data.typology == "enum": | ||||||
|  |                 value = data.values | ||||||
|  |             elif data.typology == "range": | ||||||
|  |                 value = {"min": data.min, "max": data.max, "step": data.step} | ||||||
|  |             else: | ||||||
|  |                 continue | ||||||
|  |             if not concat: | ||||||
|  |                 result[name][parameter] = value | ||||||
|  |             else: | ||||||
|  |                 result[f"{name}.{parameter}"] = value | ||||||
|  |     return result | ||||||
							
								
								
									
										58
									
								
								pyhon/hon.py
									
									
									
									
									
								
							
							
						
						
									
										58
									
								
								pyhon/hon.py
									
									
									
									
									
								
							| @ -1,25 +1,40 @@ | |||||||
| import asyncio | import asyncio | ||||||
| from typing import List | from types import TracebackType | ||||||
|  | from typing import List, Optional, Dict, Any, Type | ||||||
|  |  | ||||||
| from pyhon import HonAPI | from aiohttp import ClientSession | ||||||
|  | from typing_extensions import Self | ||||||
|  |  | ||||||
|  | from pyhon import HonAPI, exceptions | ||||||
| from pyhon.appliance import HonAppliance | from pyhon.appliance import HonAppliance | ||||||
|  |  | ||||||
|  |  | ||||||
| class Hon: | class Hon: | ||||||
|     def __init__(self, email, password, session=None): |     def __init__(self, email: str, password: str, session: ClientSession | None = None): | ||||||
|         self._email = email |         self._email: str = email | ||||||
|         self._password = password |         self._password: str = password | ||||||
|         self._session = session |         self._session: ClientSession | None = session | ||||||
|         self._appliances = [] |         self._appliances: List[HonAppliance] = [] | ||||||
|         self._api = None |         self._api: Optional[HonAPI] = None | ||||||
|  |  | ||||||
|     async def __aenter__(self): |     async def __aenter__(self) -> Self: | ||||||
|         return await self.create() |         return await self.create() | ||||||
|  |  | ||||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb): |     async def __aexit__( | ||||||
|  |         self, | ||||||
|  |         exc_type: Optional[Type[BaseException]], | ||||||
|  |         exc: Optional[BaseException], | ||||||
|  |         traceback: Optional[TracebackType], | ||||||
|  |     ) -> None: | ||||||
|         await self.close() |         await self.close() | ||||||
|  |  | ||||||
|     async def create(self): |     @property | ||||||
|  |     def api(self) -> HonAPI: | ||||||
|  |         if self._api is None: | ||||||
|  |             raise exceptions.NoAuthenticationException | ||||||
|  |         return self._api | ||||||
|  |  | ||||||
|  |     async def create(self) -> Self: | ||||||
|         self._api = await HonAPI( |         self._api = await HonAPI( | ||||||
|             self._email, self._password, session=self._session |             self._email, self._password, session=self._session | ||||||
|         ).create() |         ).create() | ||||||
| @ -30,11 +45,10 @@ class Hon: | |||||||
|     def appliances(self) -> List[HonAppliance]: |     def appliances(self) -> List[HonAppliance]: | ||||||
|         return self._appliances |         return self._appliances | ||||||
|  |  | ||||||
|     async def setup(self): |     async def _create_appliance(self, appliance_data: Dict[str, Any], zone=0) -> None: | ||||||
|         for appliance in (await self._api.load_appliances())["payload"]["appliances"]: |         appliance = HonAppliance(self._api, appliance_data, zone=zone) | ||||||
|             appliance = HonAppliance(self._api, appliance) |         if appliance.mac_address == "": | ||||||
|             if appliance.mac_address is None: |             return | ||||||
|                 continue |  | ||||||
|         await asyncio.gather( |         await asyncio.gather( | ||||||
|             *[ |             *[ | ||||||
|                 appliance.load_attributes(), |                 appliance.load_attributes(), | ||||||
| @ -44,5 +58,13 @@ class Hon: | |||||||
|         ) |         ) | ||||||
|         self._appliances.append(appliance) |         self._appliances.append(appliance) | ||||||
|  |  | ||||||
|     async def close(self): |     async def setup(self) -> None: | ||||||
|         await self._api.close() |         appliance: Dict | ||||||
|  |         for appliance in (await self.api.load_appliances())["payload"]["appliances"]: | ||||||
|  |             if (zones := int(appliance.get("zone", "0"))) > 1: | ||||||
|  |                 for zone in range(zones): | ||||||
|  |                     await self._create_appliance(appliance.copy(), zone=zone + 1) | ||||||
|  |             await self._create_appliance(appliance) | ||||||
|  |  | ||||||
|  |     async def close(self) -> None: | ||||||
|  |         await self.api.close() | ||||||
|  | |||||||
| @ -1,157 +0,0 @@ | |||||||
| import re |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def str_to_float(string): |  | ||||||
|     try: |  | ||||||
|         return int(string) |  | ||||||
|     except ValueError: |  | ||||||
|         return float(str(string.replace(",", "."))) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonParameter: |  | ||||||
|     def __init__(self, key, attributes): |  | ||||||
|         self._key = key |  | ||||||
|         self._category = attributes.get("category") |  | ||||||
|         self._typology = attributes.get("typology") |  | ||||||
|         self._mandatory = attributes.get("mandatory") |  | ||||||
|         self._value = "" |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def key(self): |  | ||||||
|         return self._key |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def value(self): |  | ||||||
|         return self._value if self._value is not None else "0" |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def category(self): |  | ||||||
|         return self._category |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def typology(self): |  | ||||||
|         return self._typology |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def mandatory(self): |  | ||||||
|         return self._mandatory |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonParameterFixed(HonParameter): |  | ||||||
|     def __init__(self, key, attributes): |  | ||||||
|         super().__init__(key, attributes) |  | ||||||
|         self._value = attributes.get("fixedValue", None) |  | ||||||
|  |  | ||||||
|     def __repr__(self): |  | ||||||
|         return f"{self.__class__} (<{self.key}> fixed)" |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def value(self): |  | ||||||
|         return self._value if self._value is not None else "0" |  | ||||||
|  |  | ||||||
|     @value.setter |  | ||||||
|     def value(self, value): |  | ||||||
|         if not value == self._value: |  | ||||||
|             raise ValueError("Can't change fixed value") |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonParameterRange(HonParameter): |  | ||||||
|     def __init__(self, key, attributes): |  | ||||||
|         super().__init__(key, attributes) |  | ||||||
|         self._min = str_to_float(attributes["minimumValue"]) |  | ||||||
|         self._max = str_to_float(attributes["maximumValue"]) |  | ||||||
|         self._step = str_to_float(attributes["incrementValue"]) |  | ||||||
|         self._default = str_to_float(attributes.get("defaultValue", self._min)) |  | ||||||
|         self._value = self._default |  | ||||||
|  |  | ||||||
|     def __repr__(self): |  | ||||||
|         return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])" |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def min(self): |  | ||||||
|         return self._min |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def max(self): |  | ||||||
|         return self._max |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def step(self): |  | ||||||
|         return self._step |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def value(self): |  | ||||||
|         return self._value if self._value is not None else self._min |  | ||||||
|  |  | ||||||
|     @value.setter |  | ||||||
|     def value(self, value): |  | ||||||
|         value = str_to_float(value) |  | ||||||
|         if self._min <= value <= self._max and not value % self._step: |  | ||||||
|             self._value = value |  | ||||||
|         else: |  | ||||||
|             raise ValueError( |  | ||||||
|                 f"Allowed: min {self._min} max {self._max} step {self._step}" |  | ||||||
|             ) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonParameterEnum(HonParameter): |  | ||||||
|     def __init__(self, key, attributes): |  | ||||||
|         super().__init__(key, attributes) |  | ||||||
|         self._default = attributes.get("defaultValue") |  | ||||||
|         self._value = self._default or "0" |  | ||||||
|         self._values = attributes.get("enumValues") |  | ||||||
|  |  | ||||||
|     def __repr__(self): |  | ||||||
|         return f"{self.__class__} (<{self.key}> {self.values})" |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def values(self): |  | ||||||
|         return [str(value) for value in self._values] |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def value(self): |  | ||||||
|         return self._value if self._value is not None else self.values[0] |  | ||||||
|  |  | ||||||
|     @value.setter |  | ||||||
|     def value(self, value): |  | ||||||
|         if value in self.values: |  | ||||||
|             self._value = value |  | ||||||
|         else: |  | ||||||
|             raise ValueError(f"Allowed values {self._value}") |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class HonParameterProgram(HonParameterEnum): |  | ||||||
|     def __init__(self, key, command): |  | ||||||
|         super().__init__(key, {}) |  | ||||||
|         self._command = command |  | ||||||
|         self._value = command._program |  | ||||||
|         self._values = command._multi |  | ||||||
|         self._typology = "enum" |  | ||||||
|         self._filter = "" |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def value(self): |  | ||||||
|         return self._value |  | ||||||
|  |  | ||||||
|     @value.setter |  | ||||||
|     def value(self, value): |  | ||||||
|         if value in self.values: |  | ||||||
|             self._command.set_program(value) |  | ||||||
|         else: |  | ||||||
|             raise ValueError(f"Allowed values {self._values}") |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def filter(self): |  | ||||||
|         return self._filter |  | ||||||
|  |  | ||||||
|     @filter.setter |  | ||||||
|     def filter(self, filter): |  | ||||||
|         self._filter = filter |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def values(self): |  | ||||||
|         values = [] |  | ||||||
|         for value in self._values: |  | ||||||
|             if not self._filter or re.findall(self._filter, str(value)): |  | ||||||
|                 values.append(str(value)) |  | ||||||
|         return sorted(values) |  | ||||||
							
								
								
									
										0
									
								
								pyhon/parameter/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								pyhon/parameter/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										39
									
								
								pyhon/parameter/base.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								pyhon/parameter/base.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,39 @@ | |||||||
|  | from typing import Dict, Any, List | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class HonParameter: | ||||||
|  |     def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: | ||||||
|  |         self._key = key | ||||||
|  |         self._category: str = attributes.get("category", "") | ||||||
|  |         self._typology: str = attributes.get("typology", "") | ||||||
|  |         self._mandatory: int = attributes.get("mandatory", 0) | ||||||
|  |         self._value: str | float = "" | ||||||
|  |         self._group: str = group | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def key(self) -> str: | ||||||
|  |         return self._key | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def value(self) -> str | float: | ||||||
|  |         return self._value if self._value is not None else "0" | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def values(self) -> List[str]: | ||||||
|  |         return [str(self.value)] | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def category(self) -> str: | ||||||
|  |         return self._category | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def typology(self) -> str: | ||||||
|  |         return self._typology | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def mandatory(self) -> int: | ||||||
|  |         return self._mandatory | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def group(self) -> str: | ||||||
|  |         return self._group | ||||||
							
								
								
									
										29
									
								
								pyhon/parameter/enum.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										29
									
								
								pyhon/parameter/enum.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,29 @@ | |||||||
|  | from typing import Dict, Any, List | ||||||
|  |  | ||||||
|  | from pyhon.parameter.base import HonParameter | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class HonParameterEnum(HonParameter): | ||||||
|  |     def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: | ||||||
|  |         super().__init__(key, attributes, group) | ||||||
|  |         self._default = attributes.get("defaultValue") | ||||||
|  |         self._value = self._default or "0" | ||||||
|  |         self._values: List[str] = attributes.get("enumValues", []) | ||||||
|  |  | ||||||
|  |     def __repr__(self) -> str: | ||||||
|  |         return f"{self.__class__} (<{self.key}> {self.values})" | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def values(self) -> List[str]: | ||||||
|  |         return [str(value) for value in self._values] | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def value(self) -> str | float: | ||||||
|  |         return self._value if self._value is not None else self.values[0] | ||||||
|  |  | ||||||
|  |     @value.setter | ||||||
|  |     def value(self, value: str) -> None: | ||||||
|  |         if value in self.values: | ||||||
|  |             self._value = value | ||||||
|  |         else: | ||||||
|  |             raise ValueError(f"Allowed values {self._value}") | ||||||
							
								
								
									
										21
									
								
								pyhon/parameter/fixed.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								pyhon/parameter/fixed.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,21 @@ | |||||||
|  | from typing import Dict, Any | ||||||
|  |  | ||||||
|  | from pyhon.parameter.base import HonParameter | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class HonParameterFixed(HonParameter): | ||||||
|  |     def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: | ||||||
|  |         super().__init__(key, attributes, group) | ||||||
|  |         self._value = attributes.get("fixedValue", None) | ||||||
|  |  | ||||||
|  |     def __repr__(self) -> str: | ||||||
|  |         return f"{self.__class__} (<{self.key}> fixed)" | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def value(self) -> str | float: | ||||||
|  |         return self._value if self._value is not None else "0" | ||||||
|  |  | ||||||
|  |     @value.setter | ||||||
|  |     def value(self, value: str | float) -> None: | ||||||
|  |         # Fixed values seems being not so fixed as thought | ||||||
|  |         self._value = value | ||||||
							
								
								
									
										45
									
								
								pyhon/parameter/program.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								pyhon/parameter/program.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,45 @@ | |||||||
|  | from typing import List, TYPE_CHECKING, Dict | ||||||
|  |  | ||||||
|  | from pyhon.parameter.enum import HonParameterEnum | ||||||
|  |  | ||||||
|  | if TYPE_CHECKING: | ||||||
|  |     from pyhon.commands import HonCommand | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class HonParameterProgram(HonParameterEnum): | ||||||
|  |     _FILTER = ["iot_recipe", "iot_guided"] | ||||||
|  |  | ||||||
|  |     def __init__(self, key: str, command: "HonCommand", group: str) -> None: | ||||||
|  |         super().__init__(key, {}, group) | ||||||
|  |         self._command = command | ||||||
|  |         if "PROGRAM" in command.category: | ||||||
|  |             self._value: str = command.category.split(".")[-1].lower() | ||||||
|  |         else: | ||||||
|  |             self._value: str = command.category | ||||||
|  |         self._programs: Dict[str, "HonCommand"] = command.categories | ||||||
|  |         self._typology: str = "enum" | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def value(self) -> str | float: | ||||||
|  |         return self._value | ||||||
|  |  | ||||||
|  |     @value.setter | ||||||
|  |     def value(self, value: str) -> None: | ||||||
|  |         if value in self.values: | ||||||
|  |             self._command.category = value | ||||||
|  |         else: | ||||||
|  |             raise ValueError(f"Allowed values {self.values}") | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def values(self) -> List[str]: | ||||||
|  |         values = [v for v in self._programs if all(f not in v for f in self._FILTER)] | ||||||
|  |         return sorted(values) | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def ids(self): | ||||||
|  |         values = { | ||||||
|  |             int(p.parameters["prCode"].value): n | ||||||
|  |             for i, (n, p) in enumerate(self._programs.items()) | ||||||
|  |             if "iot_" not in n and p.parameters.get("prCode") | ||||||
|  |         } | ||||||
|  |         return dict(sorted(values.items())) | ||||||
							
								
								
									
										53
									
								
								pyhon/parameter/range.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								pyhon/parameter/range.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,53 @@ | |||||||
|  | from typing import Dict, Any, List | ||||||
|  |  | ||||||
|  | from pyhon.parameter.base import HonParameter | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def str_to_float(string: str | float) -> float: | ||||||
|  |     try: | ||||||
|  |         return int(string) | ||||||
|  |     except ValueError: | ||||||
|  |         return float(str(string).replace(",", ".")) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class HonParameterRange(HonParameter): | ||||||
|  |     def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: | ||||||
|  |         super().__init__(key, attributes, group) | ||||||
|  |         self._min: float = str_to_float(attributes["minimumValue"]) | ||||||
|  |         self._max: float = str_to_float(attributes["maximumValue"]) | ||||||
|  |         self._step: float = str_to_float(attributes["incrementValue"]) | ||||||
|  |         self._default: float = str_to_float(attributes.get("defaultValue", self._min)) | ||||||
|  |         self._value: float = self._default | ||||||
|  |  | ||||||
|  |     def __repr__(self): | ||||||
|  |         return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])" | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def min(self) -> float: | ||||||
|  |         return self._min | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def max(self) -> float: | ||||||
|  |         return self._max | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def step(self) -> float: | ||||||
|  |         return self._step | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def value(self) -> float: | ||||||
|  |         return self._value if self._value is not None else self._min | ||||||
|  |  | ||||||
|  |     @value.setter | ||||||
|  |     def value(self, value: float) -> None: | ||||||
|  |         value = str_to_float(value) | ||||||
|  |         if self._min <= value <= self._max and not value % self._step: | ||||||
|  |             self._value = value | ||||||
|  |         else: | ||||||
|  |             raise ValueError( | ||||||
|  |                 f"Allowed: min {self._min} max {self._max} step {self._step}" | ||||||
|  |             ) | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def values(self) -> List[str]: | ||||||
|  |         return [str(i) for i in range(int(self.min), int(self.max) + 1, int(self.step))] | ||||||
| @ -1 +1,2 @@ | |||||||
| aiohttp | aiohttp==3.8.4 | ||||||
|  | yarl==1.8.2 | ||||||
|  | |||||||
							
								
								
									
										4
									
								
								requirements_dev.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								requirements_dev.txt
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,4 @@ | |||||||
|  | black==23.3.0 | ||||||
|  | flake8==6.0.0 | ||||||
|  | mypy==1.2.0 | ||||||
|  | pylint==2.17.2 | ||||||
							
								
								
									
										2
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								setup.py
									
									
									
									
									
								
							| @ -7,7 +7,7 @@ with open("README.md", "r") as f: | |||||||
|  |  | ||||||
| setup( | setup( | ||||||
|     name="pyhOn", |     name="pyhOn", | ||||||
|     version="0.6.4", |     version="0.10.2", | ||||||
|     author="Andre Basche", |     author="Andre Basche", | ||||||
|     description="Control hOn devices with python", |     description="Control hOn devices with python", | ||||||
|     long_description=long_description, |     long_description=long_description, | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	