Compare commits
	
		
			16 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 411effd814 | |||
| 04f19c4609 | |||
| 69be63df2a | |||
| 6c44aa895d | |||
| 8372c75e30 | |||
| 5b91747ec1 | |||
| 8da2018302 | |||
| 03187745bf | |||
| 461a247ad3 | |||
| 834f25a639 | |||
| 46ff9be4a2 | |||
| a1618bb18a | |||
| a957d7ac0f | |||
| f54b7b2dbf | |||
| b6ca12ebff | |||
| 4a0ee8569b | 
							
								
								
									
										5
									
								
								.github/workflows/python-check.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										5
									
								
								.github/workflows/python-check.yml
									
									
									
									
										vendored
									
									
								
							| @ -25,12 +25,15 @@ jobs: | ||||
|       run: | | ||||
|         python -m pip install --upgrade pip | ||||
|         python -m pip install -r requirements.txt | ||||
|         python -m pip install flake8 pylint black | ||||
|         python -m pip install -r requirements_dev.txt | ||||
|     - name: Lint with flake8 | ||||
|       run: | | ||||
|         # stop the build if there are Python syntax errors or undefined names | ||||
|         flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics | ||||
|         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics | ||||
|     - name: Type check with mypy | ||||
|       run: | | ||||
|         mypy pyhon/ | ||||
|     # - name: Analysing the code with pylint | ||||
|     #   run: | | ||||
|     #     pylint --max-line-length 88 $(git ls-files '*.py') | ||||
|  | ||||
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @ -4,3 +4,4 @@ __pycache__/ | ||||
| dist/ | ||||
| **/*.egg-info/ | ||||
| test* | ||||
| build/ | ||||
|  | ||||
| @ -6,7 +6,7 @@ | ||||
| [](https://www.python.org/) | ||||
| [](https://github.com/Andre0512/pyhOn/blob/main/LICENSE) | ||||
| [](https://pypistats.org/packages/pyhon)   | ||||
| Control your Haier appliances with python! | ||||
| Control your Haier, Candy and Hoover appliances with python! | ||||
| The idea behind this library is, to make the use of all available commands as simple as possible. | ||||
|  | ||||
| ## Installation | ||||
|  | ||||
| @ -1,24 +1,30 @@ | ||||
| import importlib | ||||
| from contextlib import suppress | ||||
| from typing import Optional, Dict | ||||
| from typing import Optional, Dict, Any | ||||
| from typing import TYPE_CHECKING | ||||
|  | ||||
| from pyhon import helper | ||||
| from pyhon.commands import HonCommand | ||||
| from pyhon.parameter import HonParameterFixed | ||||
| from pyhon.parameter.fixed import HonParameterFixed | ||||
|  | ||||
| if TYPE_CHECKING: | ||||
|     from pyhon import HonAPI | ||||
|  | ||||
|  | ||||
| class HonAppliance: | ||||
|     def __init__(self, api, info: Dict, zone: int = 0) -> None: | ||||
|     def __init__( | ||||
|         self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0 | ||||
|     ) -> None: | ||||
|         if attributes := info.get("attributes"): | ||||
|             info["attributes"] = {v["parName"]: v["parValue"] for v in attributes} | ||||
|         self._info = info | ||||
|         self._api = api | ||||
|         self._appliance_model = {} | ||||
|         self._info: Dict = info | ||||
|         self._api: Optional[HonAPI] = api | ||||
|         self._appliance_model: Dict = {} | ||||
|  | ||||
|         self._commands = {} | ||||
|         self._statistics = {} | ||||
|         self._attributes = {} | ||||
|         self._zone = zone | ||||
|         self._commands: Dict = {} | ||||
|         self._statistics: Dict = {} | ||||
|         self._attributes: Dict = {} | ||||
|         self._zone: int = zone | ||||
|  | ||||
|         try: | ||||
|             self._extra = importlib.import_module( | ||||
| @ -58,14 +64,18 @@ class HonAppliance: | ||||
|  | ||||
|     @property | ||||
|     def appliance_model_id(self) -> str: | ||||
|         return self._info.get("applianceModelId") | ||||
|         return self._info.get("applianceModelId", "") | ||||
|  | ||||
|     @property | ||||
|     def appliance_type(self) -> str: | ||||
|         return self._info.get("applianceTypeName") | ||||
|         return self._info.get("applianceTypeName", "") | ||||
|  | ||||
|     @property | ||||
|     def mac_address(self) -> str: | ||||
|         return self.info.get("macAddress", "") | ||||
|  | ||||
|     @property | ||||
|     def unique_id(self) -> str: | ||||
|         return self._check_name_zone("macAddress", frontend=False) | ||||
|  | ||||
|     @property | ||||
| @ -96,6 +106,10 @@ class HonAppliance: | ||||
|     def info(self): | ||||
|         return self._info | ||||
|  | ||||
|     @property | ||||
|     def zone(self) -> int: | ||||
|         return self._zone | ||||
|  | ||||
|     async def _recover_last_command_states(self, commands): | ||||
|         command_history = await self._api.command_history(self) | ||||
|         for name, command in commands.items(): | ||||
| @ -110,8 +124,8 @@ class HonAppliance: | ||||
|             if last is None: | ||||
|                 continue | ||||
|             parameters = command_history[last].get("command", {}).get("parameters", {}) | ||||
|             if command._multi and parameters.get("program"): | ||||
|                 command.set_program(parameters.pop("program").split(".")[-1].lower()) | ||||
|             if command.programs and parameters.get("program"): | ||||
|                 command.program = parameters.pop("program").split(".")[-1].lower() | ||||
|                 command = self.commands[name] | ||||
|             for key, data in command.settings.items(): | ||||
|                 if ( | ||||
| @ -135,7 +149,12 @@ class HonAppliance: | ||||
|                 for program, attr2 in attr.items(): | ||||
|                     program = program.split(".")[-1].lower() | ||||
|                     cmd = HonCommand( | ||||
|                         command, attr2, self._api, self, multi=multi, program=program | ||||
|                         command, | ||||
|                         attr2, | ||||
|                         self._api, | ||||
|                         self, | ||||
|                         programs=multi, | ||||
|                         program_name=program, | ||||
|                     ) | ||||
|                     multi[program] = cmd | ||||
|                     commands[command] = cmd | ||||
| @ -156,7 +175,9 @@ class HonAppliance: | ||||
|     def parameters(self): | ||||
|         result = {} | ||||
|         for name, command in self._commands.items(): | ||||
|             for key, parameter in command.parameters.items(): | ||||
|             for key, parameter in ( | ||||
|                 command.parameters | command.ancillary_parameters | ||||
|             ).items(): | ||||
|                 result.setdefault(name, {})[key] = parameter.value | ||||
|         return result | ||||
|  | ||||
|  | ||||
							
								
								
									
										9
									
								
								pyhon/appliances/dw.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								pyhon/appliances/dw.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,9 @@ | ||||
| class Appliance: | ||||
|     def data(self, data): | ||||
|         if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": | ||||
|             data["attributes"]["parameters"]["machMode"] = "0" | ||||
|         data["active"] = bool(data.get("attributes", {}).get("activity")) | ||||
|         return data | ||||
|  | ||||
|     def settings(self, settings): | ||||
|         return settings | ||||
| @ -1,23 +0,0 @@ | ||||
| from pyhon.parameter import HonParameterEnum | ||||
|  | ||||
|  | ||||
| class Appliance: | ||||
|     _FILTERS = { | ||||
|         "default": "^(?!iot_(?:recipe|guided))\\S+$", | ||||
|         "recipe": "iot_recipe_", | ||||
|         "guided": "iot_guided_", | ||||
|     } | ||||
|  | ||||
|     def __init__(self): | ||||
|         filters = list(self._FILTERS.values()) | ||||
|         data = {"defaultValue": filters[0], "enumValues": filters} | ||||
|         self._program_filter = HonParameterEnum("program_filter", data) | ||||
|  | ||||
|     def data(self, data): | ||||
|         return data | ||||
|  | ||||
|     def settings(self, settings): | ||||
|         settings["program_filter"] = self._program_filter | ||||
|         value = self._FILTERS[self._program_filter.value] | ||||
|         settings["startProgram.program"].filter = value | ||||
|         return settings | ||||
| @ -1,32 +1,47 @@ | ||||
| from pyhon.parameter import ( | ||||
|     HonParameterFixed, | ||||
|     HonParameterEnum, | ||||
|     HonParameterRange, | ||||
|     HonParameterProgram, | ||||
| ) | ||||
| from typing import Optional, Dict, Any, List, TYPE_CHECKING | ||||
|  | ||||
| from pyhon.parameter.base import HonParameter | ||||
| from pyhon.parameter.enum import HonParameterEnum | ||||
| from pyhon.parameter.fixed import HonParameterFixed | ||||
| from pyhon.parameter.program import HonParameterProgram | ||||
| from pyhon.parameter.range import HonParameterRange | ||||
|  | ||||
| if TYPE_CHECKING: | ||||
|     from pyhon import HonAPI | ||||
|     from pyhon.appliance import HonAppliance | ||||
|  | ||||
|  | ||||
| class HonCommand: | ||||
|     def __init__(self, name, attributes, connector, device, multi=None, program=""): | ||||
|         self._connector = connector | ||||
|         self._device = device | ||||
|         self._name = name | ||||
|         self._multi = multi or {} | ||||
|         self._program = program | ||||
|         self._description = attributes.get("description", "") | ||||
|         self._parameters = self._create_parameters(attributes.get("parameters", {})) | ||||
|         self._ancillary_parameters = self._create_parameters( | ||||
|     def __init__( | ||||
|         self, | ||||
|         name: str, | ||||
|         attributes: Dict[str, Any], | ||||
|         api: "HonAPI", | ||||
|         appliance: "HonAppliance", | ||||
|         programs: Optional[Dict[str, "HonCommand"]] = None, | ||||
|         program_name: str = "", | ||||
|     ): | ||||
|         self._api: HonAPI = api | ||||
|         self._appliance: "HonAppliance" = appliance | ||||
|         self._name: str = name | ||||
|         self._programs: Optional[Dict[str, "HonCommand"]] = programs or {} | ||||
|         self._program_name: str = program_name | ||||
|         self._description: str = attributes.get("description", "") | ||||
|         self._parameters: Dict[str, HonParameter] = self._create_parameters( | ||||
|             attributes.get("parameters", {}) | ||||
|         ) | ||||
|         self._ancillary_parameters: Dict[str, HonParameter] = self._create_parameters( | ||||
|             attributes.get("ancillaryParameters", {}) | ||||
|         ) | ||||
|  | ||||
|     def __repr__(self): | ||||
|     def __repr__(self) -> str: | ||||
|         return f"{self._name} command" | ||||
|  | ||||
|     def _create_parameters(self, parameters): | ||||
|         result = {} | ||||
|     def _create_parameters(self, parameters: Dict) -> Dict[str, HonParameter]: | ||||
|         result: Dict[str, HonParameter] = {} | ||||
|         for parameter, attributes in parameters.items(): | ||||
|             if parameter == "zoneMap" and self._device.zone: | ||||
|                 attributes["default"] = self._device.zone | ||||
|             if parameter == "zoneMap" and self._appliance.zone: | ||||
|                 attributes["default"] = self._appliance.zone | ||||
|             match attributes.get("typology"): | ||||
|                 case "range": | ||||
|                     result[parameter] = HonParameterRange(parameter, attributes) | ||||
| @ -34,39 +49,46 @@ class HonCommand: | ||||
|                     result[parameter] = HonParameterEnum(parameter, attributes) | ||||
|                 case "fixed": | ||||
|                     result[parameter] = HonParameterFixed(parameter, attributes) | ||||
|         if self._multi: | ||||
|         if self._programs: | ||||
|             result["program"] = HonParameterProgram("program", self) | ||||
|         return result | ||||
|  | ||||
|     @property | ||||
|     def parameters(self): | ||||
|     def parameters(self) -> Dict[str, HonParameter]: | ||||
|         return self._parameters | ||||
|  | ||||
|     @property | ||||
|     def ancillary_parameters(self): | ||||
|         return { | ||||
|             key: parameter.value | ||||
|             for key, parameter in self._ancillary_parameters.items() | ||||
|         } | ||||
|     def ancillary_parameters(self) -> Dict[str, HonParameter]: | ||||
|         return self._ancillary_parameters | ||||
|  | ||||
|     async def send(self): | ||||
|         parameters = { | ||||
|             name: parameter.value for name, parameter in self._parameters.items() | ||||
|         } | ||||
|         return await self._connector.send_command( | ||||
|             self._device, self._name, parameters, self.ancillary_parameters | ||||
|     async def send(self) -> bool: | ||||
|         params = {k: v.value for k, v in self._parameters.items()} | ||||
|         ancillary_params = {k: v.value for k, v in self._ancillary_parameters.items()} | ||||
|         return await self._api.send_command( | ||||
|             self._appliance, self._name, params, ancillary_params | ||||
|         ) | ||||
|  | ||||
|     def get_programs(self): | ||||
|         return self._multi | ||||
|     @property | ||||
|     def programs(self) -> Dict[str, "HonCommand"]: | ||||
|         if self._programs is None: | ||||
|             return {} | ||||
|         return self._programs | ||||
|  | ||||
|     def set_program(self, program): | ||||
|         self._device.commands[self._name] = self._multi[program] | ||||
|     @property | ||||
|     def program(self) -> str: | ||||
|         return self._program_name | ||||
|  | ||||
|     def _get_settings_keys(self, command=None): | ||||
|         command = command or self | ||||
|     @program.setter | ||||
|     def program(self, program: str) -> None: | ||||
|         self._appliance.commands[self._name] = self.programs[program] | ||||
|  | ||||
|     def _get_settings_keys(self, command: Optional["HonCommand"] = None) -> List[str]: | ||||
|         if command is None: | ||||
|             command = self | ||||
|         keys = [] | ||||
|         for key, parameter in command._parameters.items(): | ||||
|         for key, parameter in ( | ||||
|             command._parameters | command._ancillary_parameters | ||||
|         ).items(): | ||||
|             if isinstance(parameter, HonParameterFixed): | ||||
|                 continue | ||||
|             if key not in keys: | ||||
| @ -74,19 +96,22 @@ class HonCommand: | ||||
|         return keys | ||||
|  | ||||
|     @property | ||||
|     def setting_keys(self): | ||||
|         if not self._multi: | ||||
|     def setting_keys(self) -> List[str]: | ||||
|         if not self._programs: | ||||
|             return self._get_settings_keys() | ||||
|         result = [ | ||||
|             key for cmd in self._multi.values() for key in self._get_settings_keys(cmd) | ||||
|             key | ||||
|             for cmd in self._programs.values() | ||||
|             for key in self._get_settings_keys(cmd) | ||||
|         ] | ||||
|         return list(set(result + ["program"])) | ||||
|  | ||||
|     @property | ||||
|     def settings(self): | ||||
|     def settings(self) -> Dict[str, HonParameter]: | ||||
|         """Parameters with typology enum and range""" | ||||
|         return { | ||||
|             s: self._parameters.get(s) | ||||
|             s: param | ||||
|             for s in self.setting_keys | ||||
|             if self._parameters.get(s) is not None | ||||
|             if (param := self._parameters.get(s)) is not None | ||||
|             or (param := self._ancillary_parameters.get(s)) is not None | ||||
|         } | ||||
|  | ||||
| @ -2,14 +2,15 @@ import json | ||||
| import logging | ||||
| from datetime import datetime | ||||
| from typing import Dict, Optional | ||||
| from typing_extensions import Self | ||||
|  | ||||
| from aiohttp import ClientSession | ||||
| from typing_extensions import Self | ||||
|  | ||||
| from pyhon import const, exceptions | ||||
| from pyhon.appliance import HonAppliance | ||||
| from pyhon.connection.auth import HonAuth | ||||
| from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler | ||||
| from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler | ||||
| from pyhon.connection.handler.hon import HonConnectionHandler | ||||
|  | ||||
| _LOGGER = logging.getLogger() | ||||
|  | ||||
|  | ||||
| @ -3,52 +3,65 @@ import logging | ||||
| import re | ||||
| import secrets | ||||
| import urllib | ||||
| from contextlib import suppress | ||||
| from dataclasses import dataclass | ||||
| from datetime import datetime, timedelta | ||||
| from pprint import pformat | ||||
| from typing import List, Tuple | ||||
| from typing import Dict, Optional | ||||
| from urllib import parse | ||||
| from urllib.parse import quote | ||||
|  | ||||
| from aiohttp import ClientResponse | ||||
| from yarl import URL | ||||
|  | ||||
| from pyhon import const, exceptions | ||||
| from pyhon.connection.handler.auth import HonAuthConnectionHandler | ||||
|  | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| @dataclass | ||||
| class HonLoginData: | ||||
|     url: str = "" | ||||
|     email: str = "" | ||||
|     password: str = "" | ||||
|     fw_uid: str = "" | ||||
|     loaded: Optional[Dict] = None | ||||
|  | ||||
|  | ||||
| class HonAuth: | ||||
|     _TOKEN_EXPIRES_AFTER_HOURS = 8 | ||||
|     _TOKEN_EXPIRE_WARNING_HOURS = 7 | ||||
|  | ||||
|     def __init__(self, session, email, password, device) -> None: | ||||
|         self._session = session | ||||
|         self._email = email | ||||
|         self._password = password | ||||
|         self._request = HonAuthConnectionHandler(session) | ||||
|         self._login_data = HonLoginData() | ||||
|         self._login_data.email = email | ||||
|         self._login_data.password = password | ||||
|         self._access_token = "" | ||||
|         self._refresh_token = "" | ||||
|         self._cognito_token = "" | ||||
|         self._id_token = "" | ||||
|         self._device = device | ||||
|         self._called_urls: List[Tuple[int, str]] = [] | ||||
|         self._expires: datetime = datetime.utcnow() | ||||
|  | ||||
|     @property | ||||
|     def cognito_token(self): | ||||
|     def cognito_token(self) -> str: | ||||
|         return self._cognito_token | ||||
|  | ||||
|     @property | ||||
|     def id_token(self): | ||||
|     def id_token(self) -> str: | ||||
|         return self._id_token | ||||
|  | ||||
|     @property | ||||
|     def access_token(self): | ||||
|     def access_token(self) -> str: | ||||
|         return self._access_token | ||||
|  | ||||
|     @property | ||||
|     def refresh_token(self): | ||||
|     def refresh_token(self) -> str: | ||||
|         return self._refresh_token | ||||
|  | ||||
|     def _check_token_expiration(self, hours): | ||||
|     def _check_token_expiration(self, hours: int) -> bool: | ||||
|         return datetime.utcnow() >= self._expires + timedelta(hours=hours) | ||||
|  | ||||
|     @property | ||||
| @ -59,34 +72,39 @@ class HonAuth: | ||||
|     def token_expires_soon(self) -> bool: | ||||
|         return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS) | ||||
|  | ||||
|     async def _error_logger(self, response, fail=True): | ||||
|         result = "hOn Authentication Error\n" | ||||
|         for i, (status, url) in enumerate(self._called_urls): | ||||
|             result += f" {i + 1: 2d}     {status} - {url}\n" | ||||
|         result += f"ERROR - {response.status} - {response.request_info.url}\n" | ||||
|         result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" | ||||
|         _LOGGER.error(result) | ||||
|     async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None: | ||||
|         output = "hOn Authentication Error\n" | ||||
|         for i, (status, url) in enumerate(self._request.called_urls): | ||||
|             output += f" {i + 1: 2d}     {status} - {url}\n" | ||||
|         output += f"ERROR - {response.status} - {response.request_info.url}\n" | ||||
|         output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" | ||||
|         _LOGGER.error(output) | ||||
|         if fail: | ||||
|             raise exceptions.HonAuthenticationError("Can't login") | ||||
|  | ||||
|     async def _load_login(self): | ||||
|     @staticmethod | ||||
|     def _generate_nonce() -> str: | ||||
|         nonce = secrets.token_hex(16) | ||||
|         nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" | ||||
|         return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" | ||||
|  | ||||
|     async def _load_login(self) -> bool: | ||||
|         login_url = await self._introduce() | ||||
|         login_url = await self._handle_redirects(login_url) | ||||
|         return await self._login_url(login_url) | ||||
|  | ||||
|     async def _introduce(self) -> str: | ||||
|         redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done") | ||||
|         params = { | ||||
|             "response_type": "token+id_token", | ||||
|             "client_id": const.CLIENT_ID, | ||||
|             "redirect_uri": urllib.parse.quote( | ||||
|                 f"{const.APP}://mobilesdk/detect/oauth/done" | ||||
|             ), | ||||
|             "redirect_uri": redirect_uri, | ||||
|             "display": "touch", | ||||
|             "scope": "api openid refresh_token web", | ||||
|             "nonce": nonce, | ||||
|             "nonce": self._generate_nonce(), | ||||
|         } | ||||
|         params = "&".join([f"{k}={v}" for k, v in params.items()]) | ||||
|         async with self._session.get( | ||||
|             f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" | ||||
|         ) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|         params_encode = "&".join([f"{k}={v}" for k, v in params.items()]) | ||||
|         url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}" | ||||
|         async with self._request.get(url) as response: | ||||
|             text = await response.text() | ||||
|             self._expires = datetime.utcnow() | ||||
|             if not (login_url := re.findall("url = '(.+?)'", text)): | ||||
| @ -94,90 +112,77 @@ class HonAuth: | ||||
|                     self._parse_token_data(text) | ||||
|                     raise exceptions.HonNoAuthenticationNeeded() | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|         async with self._session.get(login_url[0], allow_redirects=False) as redirect1: | ||||
|             self._called_urls.append((redirect1.status, redirect1.request_info.url)) | ||||
|             if not (url := redirect1.headers.get("Location")): | ||||
|                 await self._error_logger(redirect1) | ||||
|                 return False | ||||
|         async with self._session.get(url, allow_redirects=False) as redirect2: | ||||
|             self._called_urls.append((redirect2.status, redirect2.request_info.url)) | ||||
|             if not ( | ||||
|                 url := redirect2.headers.get("Location") | ||||
|                 + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" | ||||
|             ): | ||||
|                 await self._error_logger(redirect2) | ||||
|                 return False | ||||
|         async with self._session.get( | ||||
|             URL(url, encoded=True), headers={"user-agent": const.USER_AGENT} | ||||
|         ) as login_screen: | ||||
|             self._called_urls.append( | ||||
|                 (login_screen.status, login_screen.request_info.url) | ||||
|             ) | ||||
|             if context := re.findall( | ||||
|                 '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() | ||||
|             ): | ||||
|         return login_url[0] | ||||
|  | ||||
|     async def _manual_redirect(self, url: str) -> str: | ||||
|         async with self._request.get(url, allow_redirects=False) as response: | ||||
|             if not (new_location := response.headers.get("Location", "")): | ||||
|                 await self._error_logger(response) | ||||
|         return new_location | ||||
|  | ||||
|     async def _handle_redirects(self, login_url) -> str: | ||||
|         redirect1 = await self._manual_redirect(login_url) | ||||
|         redirect2 = await self._manual_redirect(redirect1) | ||||
|         return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn" | ||||
|  | ||||
|     async def _login_url(self, login_url: str) -> bool: | ||||
|         headers = {"user-agent": const.USER_AGENT} | ||||
|         url = URL(login_url, encoded=True) | ||||
|         async with self._request.get(url, headers=headers) as response: | ||||
|             text = await response.text() | ||||
|             if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text): | ||||
|                 fw_uid, loaded_str = context[0] | ||||
|                 loaded = json.loads(loaded_str) | ||||
|                 login_url = login_url[0].replace( | ||||
|                 self._login_data.fw_uid = fw_uid | ||||
|                 self._login_data.loaded = json.loads(loaded_str) | ||||
|                 self._login_data.url = login_url.replace( | ||||
|                     "/".join(const.AUTH_API.split("/")[:-1]), "" | ||||
|                 ) | ||||
|                 return fw_uid, loaded, login_url | ||||
|             await self._error_logger(login_screen) | ||||
|                 return True | ||||
|             await self._error_logger(response) | ||||
|         return False | ||||
|  | ||||
|     async def _login(self, fw_uid, loaded, login_url): | ||||
|         data = { | ||||
|             "message": { | ||||
|                 "actions": [ | ||||
|                     { | ||||
|                         "id": "79;a", | ||||
|                         "descriptor": "apex://LightningLoginCustomController/ACTION$login", | ||||
|                         "callingDescriptor": "markup://c:loginForm", | ||||
|                         "params": { | ||||
|                             "username": quote(self._email), | ||||
|                             "password": quote(self._password), | ||||
|                             "startUrl": parse.unquote( | ||||
|                                 login_url.split("startURL=")[-1] | ||||
|                             ).split("%3D")[0], | ||||
|                         }, | ||||
|                     } | ||||
|                 ] | ||||
|     async def _login(self) -> str: | ||||
|         start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1] | ||||
|         start_url = parse.unquote(start_url).split("%3D")[0] | ||||
|         action = { | ||||
|             "id": "79;a", | ||||
|             "descriptor": "apex://LightningLoginCustomController/ACTION$login", | ||||
|             "callingDescriptor": "markup://c:loginForm", | ||||
|             "params": { | ||||
|                 "username": quote(self._login_data.email), | ||||
|                 "password": quote(self._login_data.password), | ||||
|                 "startUrl": start_url, | ||||
|             }, | ||||
|         } | ||||
|         data = { | ||||
|             "message": {"actions": [action]}, | ||||
|             "aura.context": { | ||||
|                 "mode": "PROD", | ||||
|                 "fwuid": fw_uid, | ||||
|                 "fwuid": self._login_data.fw_uid, | ||||
|                 "app": "siteforce:loginApp2", | ||||
|                 "loaded": loaded, | ||||
|                 "loaded": self._login_data.loaded, | ||||
|                 "dn": [], | ||||
|                 "globals": {}, | ||||
|                 "uad": False, | ||||
|             }, | ||||
|             "aura.pageURI": login_url, | ||||
|             "aura.pageURI": self._login_data.url, | ||||
|             "aura.token": None, | ||||
|         } | ||||
|         params = {"r": 3, "other.LightningLoginCustom.login": 1} | ||||
|         async with self._session.post( | ||||
|         async with self._request.post( | ||||
|             const.AUTH_API + "/s/sfsites/aura", | ||||
|             headers={"Content-Type": "application/x-www-form-urlencoded"}, | ||||
|             data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), | ||||
|             params=params, | ||||
|         ) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status == 200: | ||||
|                 try: | ||||
|                     data = await response.json() | ||||
|                     return data["events"][0]["attributes"]["values"]["url"] | ||||
|                 except json.JSONDecodeError: | ||||
|                     pass | ||||
|                 except KeyError: | ||||
|                     _LOGGER.error( | ||||
|                         "Can't get login url - %s", pformat(await response.json()) | ||||
|                     ) | ||||
|                 with suppress(json.JSONDecodeError, KeyError): | ||||
|                     result = await response.json() | ||||
|                     return result["events"][0]["attributes"]["values"]["url"] | ||||
|             await self._error_logger(response) | ||||
|             return "" | ||||
|  | ||||
|     def _parse_token_data(self, text): | ||||
|     def _parse_token_data(self, text: str) -> None: | ||||
|         if access_token := re.findall("access_token=(.*?)&", text): | ||||
|             self._access_token = access_token[0] | ||||
|         if refresh_token := re.findall("refresh_token=(.*?)&", text): | ||||
| @ -185,39 +190,39 @@ class HonAuth: | ||||
|         if id_token := re.findall("id_token=(.*?)&", text): | ||||
|             self._id_token = id_token[0] | ||||
|  | ||||
|     async def _get_token(self, url): | ||||
|         async with self._session.get(url) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|     async def _get_token(self, url: str) -> bool: | ||||
|         async with self._request.get(url) as response: | ||||
|             if response.status != 200: | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|             url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text()) | ||||
|             if not url: | ||||
|             url_search = re.findall( | ||||
|                 "href\\s*=\\s*[\"'](.+?)[\"']", await response.text() | ||||
|             ) | ||||
|             if not url_search: | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|         if "ProgressiveLogin" in url[0]: | ||||
|             async with self._session.get(url[0]) as response: | ||||
|                 self._called_urls.append((response.status, response.request_info.url)) | ||||
|         if "ProgressiveLogin" in url_search[0]: | ||||
|             async with self._request.get(url_search[0]) as response: | ||||
|                 if response.status != 200: | ||||
|                     await self._error_logger(response) | ||||
|                     return False | ||||
|                 url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text()) | ||||
|         url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] | ||||
|         async with self._session.get(url) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|                 url_search = re.findall( | ||||
|                     "href\\s*=\\s*[\"'](.*?)[\"']", await response.text() | ||||
|                 ) | ||||
|         url = "/".join(const.AUTH_API.split("/")[:-1]) + url_search[0] | ||||
|         async with self._request.get(url) as response: | ||||
|             if response.status != 200: | ||||
|                 await self._error_logger(response) | ||||
|                 return False | ||||
|             self._parse_token_data(await response.text()) | ||||
|         return True | ||||
|  | ||||
|     async def _api_auth(self): | ||||
|     async def _api_auth(self) -> bool: | ||||
|         post_headers = {"id-token": self._id_token} | ||||
|         data = self._device.get() | ||||
|         async with self._session.post( | ||||
|         async with self._request.post( | ||||
|             f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data | ||||
|         ) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             try: | ||||
|                 json_data = await response.json() | ||||
|             except json.JSONDecodeError: | ||||
| @ -226,12 +231,12 @@ class HonAuth: | ||||
|             self._cognito_token = json_data["cognitoUser"]["Token"] | ||||
|         return True | ||||
|  | ||||
|     async def authenticate(self): | ||||
|     async def authenticate(self) -> None: | ||||
|         self.clear() | ||||
|         try: | ||||
|             if not (login_site := await self._load_login()): | ||||
|             if not await self._load_login(): | ||||
|                 raise exceptions.HonAuthenticationError("Can't open login page") | ||||
|             if not (url := await self._login(*login_site)): | ||||
|             if not (url := await self._login()): | ||||
|                 raise exceptions.HonAuthenticationError("Can't login") | ||||
|             if not await self._get_token(url): | ||||
|                 raise exceptions.HonAuthenticationError("Can't get token") | ||||
| @ -240,16 +245,15 @@ class HonAuth: | ||||
|         except exceptions.HonNoAuthenticationNeeded: | ||||
|             return | ||||
|  | ||||
|     async def refresh(self): | ||||
|     async def refresh(self) -> bool: | ||||
|         params = { | ||||
|             "client_id": const.CLIENT_ID, | ||||
|             "refresh_token": self._refresh_token, | ||||
|             "grant_type": "refresh_token", | ||||
|         } | ||||
|         async with self._session.post( | ||||
|         async with self._request.post( | ||||
|             f"{const.AUTH_API}/services/oauth2/token", params=params | ||||
|         ) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             if response.status >= 400: | ||||
|                 await self._error_logger(response, fail=False) | ||||
|                 return False | ||||
| @ -259,9 +263,9 @@ class HonAuth: | ||||
|         self._access_token = data["access_token"] | ||||
|         return await self._api_auth() | ||||
|  | ||||
|     def clear(self): | ||||
|     def clear(self) -> None: | ||||
|         self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2]) | ||||
|         self._called_urls = [] | ||||
|         self._request.called_urls = [] | ||||
|         self._cognito_token = "" | ||||
|         self._id_token = "" | ||||
|         self._access_token = "" | ||||
|  | ||||
| @ -1,159 +0,0 @@ | ||||
| import json | ||||
| from collections.abc import Generator, AsyncIterator, Coroutine | ||||
| from contextlib import asynccontextmanager | ||||
| from typing import Optional, Callable, Dict | ||||
| from typing_extensions import Self | ||||
|  | ||||
| import aiohttp | ||||
|  | ||||
| from pyhon import const, exceptions | ||||
| from pyhon.connection.auth import HonAuth, _LOGGER | ||||
| from pyhon.connection.device import HonDevice | ||||
| from pyhon.exceptions import HonAuthenticationError | ||||
|  | ||||
|  | ||||
| class HonBaseConnectionHandler: | ||||
|     _HEADERS: Dict = { | ||||
|         "user-agent": const.USER_AGENT, | ||||
|         "Content-Type": "application/json", | ||||
|     } | ||||
|  | ||||
|     def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: | ||||
|         self._create_session: bool = session is None | ||||
|         self._session: Optional[aiohttp.ClientSession] = session | ||||
|         self._auth: Optional[HonAuth] = None | ||||
|  | ||||
|     async def __aenter__(self) -> Self: | ||||
|         return await self.create() | ||||
|  | ||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: | ||||
|         await self.close() | ||||
|  | ||||
|     @property | ||||
|     def auth(self) -> Optional[HonAuth]: | ||||
|         return self._auth | ||||
|  | ||||
|     async def create(self) -> Self: | ||||
|         if self._create_session: | ||||
|             self._session = aiohttp.ClientSession() | ||||
|         return self | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs): | ||||
|         raise NotImplementedError | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def get(self, *args, **kwargs) -> AsyncIterator[Callable]: | ||||
|         if self._session is None: | ||||
|             raise exceptions.NoSessionException() | ||||
|         response: Callable | ||||
|         async with self._intercept(self._session.get, *args, **kwargs) as response: | ||||
|             yield response | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def post(self, *args, **kwargs) -> AsyncIterator[Callable]: | ||||
|         if self._session is None: | ||||
|             raise exceptions.NoSessionException() | ||||
|         response: Callable | ||||
|         async with self._intercept(self._session.post, *args, **kwargs) as response: | ||||
|             yield response | ||||
|  | ||||
|     async def close(self) -> None: | ||||
|         if self._create_session and self._session is not None: | ||||
|             await self._session.close() | ||||
|  | ||||
|  | ||||
| class HonConnectionHandler(HonBaseConnectionHandler): | ||||
|     def __init__( | ||||
|         self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None | ||||
|     ) -> None: | ||||
|         super().__init__(session=session) | ||||
|         self._device: HonDevice = HonDevice() | ||||
|         self._email: str = email | ||||
|         self._password: str = password | ||||
|         if not self._email: | ||||
|             raise HonAuthenticationError("An email address must be specified") | ||||
|         if not self._password: | ||||
|             raise HonAuthenticationError("A password address must be specified") | ||||
|  | ||||
|     @property | ||||
|     def device(self) -> HonDevice: | ||||
|         return self._device | ||||
|  | ||||
|     async def create(self) -> Self: | ||||
|         await super().create() | ||||
|         self._auth: HonAuth = HonAuth( | ||||
|             self._session, self._email, self._password, self._device | ||||
|         ) | ||||
|         return self | ||||
|  | ||||
|     async def _check_headers(self, headers: Dict) -> Dict: | ||||
|         if not (self._auth.cognito_token and self._auth.id_token): | ||||
|             await self._auth.authenticate() | ||||
|         headers["cognito-token"] = self._auth.cognito_token | ||||
|         headers["id-token"] = self._auth.id_token | ||||
|         return self._HEADERS | headers | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def _intercept( | ||||
|         self, method: Callable, *args, loop: int = 0, **kwargs | ||||
|     ) -> AsyncIterator: | ||||
|         kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) | ||||
|         async with method(*args, **kwargs) as response: | ||||
|             if ( | ||||
|                 self._auth.token_expires_soon or response.status in [401, 403] | ||||
|             ) and loop == 0: | ||||
|                 _LOGGER.info("Try refreshing token...") | ||||
|                 await self._auth.refresh() | ||||
|                 async with self._intercept( | ||||
|                     method, *args, loop=loop + 1, **kwargs | ||||
|                 ) as result: | ||||
|                     yield result | ||||
|             elif ( | ||||
|                 self._auth.token_is_expired or response.status in [401, 403] | ||||
|             ) and loop == 1: | ||||
|                 _LOGGER.warning( | ||||
|                     "%s - Error %s - %s", | ||||
|                     response.request_info.url, | ||||
|                     response.status, | ||||
|                     await response.text(), | ||||
|                 ) | ||||
|                 await self.create() | ||||
|                 async with self._intercept( | ||||
|                     method, *args, loop=loop + 1, **kwargs | ||||
|                 ) as result: | ||||
|                     yield result | ||||
|             elif loop >= 2: | ||||
|                 _LOGGER.error( | ||||
|                     "%s - Error %s - %s", | ||||
|                     response.request_info.url, | ||||
|                     response.status, | ||||
|                     await response.text(), | ||||
|                 ) | ||||
|                 raise HonAuthenticationError("Login failure") | ||||
|             else: | ||||
|                 try: | ||||
|                     await response.json() | ||||
|                     yield response | ||||
|                 except json.JSONDecodeError: | ||||
|                     _LOGGER.warning( | ||||
|                         "%s - JsonDecodeError %s - %s", | ||||
|                         response.request_info.url, | ||||
|                         response.status, | ||||
|                         await response.text(), | ||||
|                     ) | ||||
|                     raise HonAuthenticationError("Decode Error") | ||||
|  | ||||
|  | ||||
| class HonAnonymousConnectionHandler(HonBaseConnectionHandler): | ||||
|     _HEADERS: Dict = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def _intercept( | ||||
|         self, method: Callable, *args, loop: int = 0, **kwargs | ||||
|     ) -> AsyncIterator: | ||||
|         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS | ||||
|         async with method(*args, **kwargs) as response: | ||||
|             if response.status == 403: | ||||
|                 _LOGGER.error("Can't authenticate anymore") | ||||
|             yield response | ||||
							
								
								
									
										0
									
								
								pyhon/connection/handler/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								pyhon/connection/handler/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										21
									
								
								pyhon/connection/handler/anonym.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								pyhon/connection/handler/anonym.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,21 @@ | ||||
| import logging | ||||
| from collections.abc import AsyncIterator | ||||
| from contextlib import asynccontextmanager | ||||
| from typing import Callable, Dict | ||||
|  | ||||
| from pyhon import const | ||||
| from pyhon.connection.handler.base import ConnectionHandler | ||||
|  | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| class HonAnonymousConnectionHandler(ConnectionHandler): | ||||
|     _HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator: | ||||
|         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS | ||||
|         async with method(*args, **kwargs) as response: | ||||
|             if response.status == 403: | ||||
|                 _LOGGER.error("Can't authenticate anymore") | ||||
|             yield response | ||||
							
								
								
									
										36
									
								
								pyhon/connection/handler/auth.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								pyhon/connection/handler/auth.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,36 @@ | ||||
| import logging | ||||
| from collections.abc import AsyncIterator | ||||
| from contextlib import asynccontextmanager | ||||
| from typing import Optional, Callable, List, Tuple | ||||
|  | ||||
| import aiohttp | ||||
|  | ||||
| from pyhon import const | ||||
| from pyhon.connection.handler.base import ConnectionHandler | ||||
|  | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| class HonAuthConnectionHandler(ConnectionHandler): | ||||
|     _HEADERS = {"user-agent": const.USER_AGENT} | ||||
|  | ||||
|     def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: | ||||
|         super().__init__(session) | ||||
|         self._called_urls: List[Tuple[int, str]] = [] | ||||
|  | ||||
|     @property | ||||
|     def called_urls(self) -> List[Tuple[int, str]]: | ||||
|         return self._called_urls | ||||
|  | ||||
|     @called_urls.setter | ||||
|     def called_urls(self, called_urls: List[Tuple[int, str]]) -> None: | ||||
|         self._called_urls = called_urls | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def _intercept( | ||||
|         self, method: Callable, *args, loop: int = 0, **kwargs | ||||
|     ) -> AsyncIterator: | ||||
|         kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS | ||||
|         async with method(*args, **kwargs) as response: | ||||
|             self._called_urls.append((response.status, response.request_info.url)) | ||||
|             yield response | ||||
							
								
								
									
										57
									
								
								pyhon/connection/handler/base.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										57
									
								
								pyhon/connection/handler/base.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,57 @@ | ||||
| import logging | ||||
| from collections.abc import AsyncIterator | ||||
| from contextlib import asynccontextmanager | ||||
| from typing import Optional, Callable, Dict | ||||
|  | ||||
| import aiohttp | ||||
| from typing_extensions import Self | ||||
|  | ||||
| from pyhon import const, exceptions | ||||
|  | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| class ConnectionHandler: | ||||
|     _HEADERS: Dict = { | ||||
|         "user-agent": const.USER_AGENT, | ||||
|         "Content-Type": "application/json", | ||||
|     } | ||||
|  | ||||
|     def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: | ||||
|         self._create_session: bool = session is None | ||||
|         self._session: Optional[aiohttp.ClientSession] = session | ||||
|  | ||||
|     async def __aenter__(self) -> Self: | ||||
|         return await self.create() | ||||
|  | ||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: | ||||
|         await self.close() | ||||
|  | ||||
|     async def create(self) -> Self: | ||||
|         if self._create_session: | ||||
|             self._session = aiohttp.ClientSession() | ||||
|         return self | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs): | ||||
|         raise NotImplementedError | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]: | ||||
|         if self._session is None: | ||||
|             raise exceptions.NoSessionException() | ||||
|         response: aiohttp.ClientResponse | ||||
|         async with self._intercept(self._session.get, *args, **kwargs) as response: | ||||
|             yield response | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]: | ||||
|         if self._session is None: | ||||
|             raise exceptions.NoSessionException() | ||||
|         response: aiohttp.ClientResponse | ||||
|         async with self._intercept(self._session.post, *args, **kwargs) as response: | ||||
|             yield response | ||||
|  | ||||
|     async def close(self) -> None: | ||||
|         if self._create_session and self._session is not None: | ||||
|             await self._session.close() | ||||
							
								
								
									
										102
									
								
								pyhon/connection/handler/hon.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										102
									
								
								pyhon/connection/handler/hon.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,102 @@ | ||||
| import json | ||||
| import logging | ||||
| from collections.abc import AsyncIterator | ||||
| from contextlib import asynccontextmanager | ||||
| from typing import Optional, Callable, Dict | ||||
|  | ||||
| import aiohttp | ||||
| from typing_extensions import Self | ||||
|  | ||||
| from pyhon.connection.auth import HonAuth | ||||
| from pyhon.connection.device import HonDevice | ||||
| from pyhon.connection.handler.base import ConnectionHandler | ||||
| from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException | ||||
|  | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| class HonConnectionHandler(ConnectionHandler): | ||||
|     def __init__( | ||||
|         self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None | ||||
|     ) -> None: | ||||
|         super().__init__(session=session) | ||||
|         self._device: HonDevice = HonDevice() | ||||
|         self._email: str = email | ||||
|         self._password: str = password | ||||
|         if not self._email: | ||||
|             raise HonAuthenticationError("An email address must be specified") | ||||
|         if not self._password: | ||||
|             raise HonAuthenticationError("A password address must be specified") | ||||
|         self._auth: Optional[HonAuth] = None | ||||
|  | ||||
|     @property | ||||
|     def auth(self) -> HonAuth: | ||||
|         if self._auth is None: | ||||
|             raise NoAuthenticationException() | ||||
|         return self._auth | ||||
|  | ||||
|     @property | ||||
|     def device(self) -> HonDevice: | ||||
|         return self._device | ||||
|  | ||||
|     async def create(self) -> Self: | ||||
|         await super().create() | ||||
|         self._auth = HonAuth(self._session, self._email, self._password, self._device) | ||||
|         return self | ||||
|  | ||||
|     async def _check_headers(self, headers: Dict) -> Dict: | ||||
|         if not (self.auth.cognito_token and self.auth.id_token): | ||||
|             await self.auth.authenticate() | ||||
|         headers["cognito-token"] = self.auth.cognito_token | ||||
|         headers["id-token"] = self.auth.id_token | ||||
|         return self._HEADERS | headers | ||||
|  | ||||
|     @asynccontextmanager | ||||
|     async def _intercept( | ||||
|         self, method: Callable, *args, loop: int = 0, **kwargs | ||||
|     ) -> AsyncIterator: | ||||
|         kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) | ||||
|         async with method(*args, **kwargs) as response: | ||||
|             if ( | ||||
|                 self.auth.token_expires_soon or response.status in [401, 403] | ||||
|             ) and loop == 0: | ||||
|                 _LOGGER.info("Try refreshing token...") | ||||
|                 await self.auth.refresh() | ||||
|                 async with self._intercept( | ||||
|                     method, *args, loop=loop + 1, **kwargs | ||||
|                 ) as result: | ||||
|                     yield result | ||||
|             elif ( | ||||
|                 self.auth.token_is_expired or response.status in [401, 403] | ||||
|             ) and loop == 1: | ||||
|                 _LOGGER.warning( | ||||
|                     "%s - Error %s - %s", | ||||
|                     response.request_info.url, | ||||
|                     response.status, | ||||
|                     await response.text(), | ||||
|                 ) | ||||
|                 await self.create() | ||||
|                 async with self._intercept( | ||||
|                     method, *args, loop=loop + 1, **kwargs | ||||
|                 ) as result: | ||||
|                     yield result | ||||
|             elif loop >= 2: | ||||
|                 _LOGGER.error( | ||||
|                     "%s - Error %s - %s", | ||||
|                     response.request_info.url, | ||||
|                     response.status, | ||||
|                     await response.text(), | ||||
|                 ) | ||||
|                 raise HonAuthenticationError("Login failure") | ||||
|             else: | ||||
|                 try: | ||||
|                     await response.json() | ||||
|                     yield response | ||||
|                 except json.JSONDecodeError: | ||||
|                     _LOGGER.warning( | ||||
|                         "%s - JsonDecodeError %s - %s", | ||||
|                         response.request_info.url, | ||||
|                         response.status, | ||||
|                         await response.text(), | ||||
|                     ) | ||||
|                     raise HonAuthenticationError("Decode Error") | ||||
| @ -49,7 +49,7 @@ def create_command(commands, concat=False): | ||||
|     for name, command in commands.items(): | ||||
|         if not concat: | ||||
|             result[name] = {} | ||||
|         for parameter, data in command.parameters.items(): | ||||
|         for parameter, data in command.settings.items(): | ||||
|             if data.typology == "enum": | ||||
|                 value = data.values | ||||
|             elif data.typology == "range": | ||||
|  | ||||
							
								
								
									
										32
									
								
								pyhon/hon.py
									
									
									
									
									
								
							
							
						
						
									
										32
									
								
								pyhon/hon.py
									
									
									
									
									
								
							| @ -1,8 +1,9 @@ | ||||
| import asyncio | ||||
| from typing import List, Optional, Dict | ||||
| from typing_extensions import Self | ||||
| from types import TracebackType | ||||
| from typing import List, Optional, Dict, Any, Type | ||||
|  | ||||
| from aiohttp import ClientSession | ||||
| from typing_extensions import Self | ||||
|  | ||||
| from pyhon import HonAPI, exceptions | ||||
| from pyhon.appliance import HonAppliance | ||||
| @ -16,10 +17,15 @@ class Hon: | ||||
|         self._appliances: List[HonAppliance] = [] | ||||
|         self._api: Optional[HonAPI] = None | ||||
|  | ||||
|     async def __aenter__(self): | ||||
|     async def __aenter__(self) -> Self: | ||||
|         return await self.create() | ||||
|  | ||||
|     async def __aexit__(self, exc_type, exc_val, exc_tb): | ||||
|     async def __aexit__( | ||||
|         self, | ||||
|         exc_type: Optional[Type[BaseException]], | ||||
|         exc: Optional[BaseException], | ||||
|         traceback: Optional[TracebackType], | ||||
|     ) -> None: | ||||
|         await self.close() | ||||
|  | ||||
|     @property | ||||
| @ -39,8 +45,8 @@ class Hon: | ||||
|     def appliances(self) -> List[HonAppliance]: | ||||
|         return self._appliances | ||||
|  | ||||
|     async def _create_appliance(self, appliance: Dict, zone=0) -> None: | ||||
|         appliance = HonAppliance(self._api, appliance, zone=zone) | ||||
|     async def _create_appliance(self, appliance_data: Dict[str, Any], zone=0) -> None: | ||||
|         appliance = HonAppliance(self._api, appliance_data, zone=zone) | ||||
|         if appliance.mac_address is None: | ||||
|             return | ||||
|         await asyncio.gather( | ||||
| @ -52,11 +58,13 @@ class Hon: | ||||
|         ) | ||||
|         self._appliances.append(appliance) | ||||
|  | ||||
|     async def setup(self): | ||||
|         for appliance in (await self._api.load_appliances())["payload"]["appliances"]: | ||||
|             for zone in range(int(appliance.get("zone", "0"))): | ||||
|                 await self._create_appliance(appliance, zone=zone + 1) | ||||
|     async def setup(self) -> None: | ||||
|         appliance: Dict | ||||
|         for appliance in (await self.api.load_appliances())["payload"]["appliances"]: | ||||
|             if (zones := int(appliance.get("zone", "0"))) > 1: | ||||
|                 for zone in range(zones): | ||||
|                     await self._create_appliance(appliance.copy(), zone=zone + 1) | ||||
|             await self._create_appliance(appliance) | ||||
|  | ||||
|     async def close(self): | ||||
|         await self._api.close() | ||||
|     async def close(self) -> None: | ||||
|         await self.api.close() | ||||
|  | ||||
| @ -1,157 +0,0 @@ | ||||
| import re | ||||
|  | ||||
|  | ||||
| def str_to_float(string): | ||||
|     try: | ||||
|         return int(string) | ||||
|     except ValueError: | ||||
|         return float(str(string).replace(",", ".")) | ||||
|  | ||||
|  | ||||
| class HonParameter: | ||||
|     def __init__(self, key, attributes): | ||||
|         self._key = key | ||||
|         self._category = attributes.get("category") | ||||
|         self._typology = attributes.get("typology") | ||||
|         self._mandatory = attributes.get("mandatory") | ||||
|         self._value = "" | ||||
|  | ||||
|     @property | ||||
|     def key(self): | ||||
|         return self._key | ||||
|  | ||||
|     @property | ||||
|     def value(self): | ||||
|         return self._value if self._value is not None else "0" | ||||
|  | ||||
|     @property | ||||
|     def category(self): | ||||
|         return self._category | ||||
|  | ||||
|     @property | ||||
|     def typology(self): | ||||
|         return self._typology | ||||
|  | ||||
|     @property | ||||
|     def mandatory(self): | ||||
|         return self._mandatory | ||||
|  | ||||
|  | ||||
| class HonParameterFixed(HonParameter): | ||||
|     def __init__(self, key, attributes): | ||||
|         super().__init__(key, attributes) | ||||
|         self._value = attributes.get("fixedValue", None) | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return f"{self.__class__} (<{self.key}> fixed)" | ||||
|  | ||||
|     @property | ||||
|     def value(self): | ||||
|         return self._value if self._value is not None else "0" | ||||
|  | ||||
|     @value.setter | ||||
|     def value(self, value): | ||||
|         if not value == self._value: | ||||
|             raise ValueError("Can't change fixed value") | ||||
|  | ||||
|  | ||||
| class HonParameterRange(HonParameter): | ||||
|     def __init__(self, key, attributes): | ||||
|         super().__init__(key, attributes) | ||||
|         self._min = str_to_float(attributes["minimumValue"]) | ||||
|         self._max = str_to_float(attributes["maximumValue"]) | ||||
|         self._step = str_to_float(attributes["incrementValue"]) | ||||
|         self._default = str_to_float(attributes.get("defaultValue", self._min)) | ||||
|         self._value = self._default | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])" | ||||
|  | ||||
|     @property | ||||
|     def min(self): | ||||
|         return self._min | ||||
|  | ||||
|     @property | ||||
|     def max(self): | ||||
|         return self._max | ||||
|  | ||||
|     @property | ||||
|     def step(self): | ||||
|         return self._step | ||||
|  | ||||
|     @property | ||||
|     def value(self): | ||||
|         return self._value if self._value is not None else self._min | ||||
|  | ||||
|     @value.setter | ||||
|     def value(self, value): | ||||
|         value = str_to_float(value) | ||||
|         if self._min <= value <= self._max and not value % self._step: | ||||
|             self._value = value | ||||
|         else: | ||||
|             raise ValueError( | ||||
|                 f"Allowed: min {self._min} max {self._max} step {self._step}" | ||||
|             ) | ||||
|  | ||||
|  | ||||
| class HonParameterEnum(HonParameter): | ||||
|     def __init__(self, key, attributes): | ||||
|         super().__init__(key, attributes) | ||||
|         self._default = attributes.get("defaultValue") | ||||
|         self._value = self._default or "0" | ||||
|         self._values = attributes.get("enumValues") | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return f"{self.__class__} (<{self.key}> {self.values})" | ||||
|  | ||||
|     @property | ||||
|     def values(self): | ||||
|         return [str(value) for value in self._values] | ||||
|  | ||||
|     @property | ||||
|     def value(self): | ||||
|         return self._value if self._value is not None else self.values[0] | ||||
|  | ||||
|     @value.setter | ||||
|     def value(self, value): | ||||
|         if value in self.values: | ||||
|             self._value = value | ||||
|         else: | ||||
|             raise ValueError(f"Allowed values {self._value}") | ||||
|  | ||||
|  | ||||
| class HonParameterProgram(HonParameterEnum): | ||||
|     def __init__(self, key, command): | ||||
|         super().__init__(key, {}) | ||||
|         self._command = command | ||||
|         self._value = command._program | ||||
|         self._values = command._multi | ||||
|         self._typology = "enum" | ||||
|         self._filter = "" | ||||
|  | ||||
|     @property | ||||
|     def value(self): | ||||
|         return self._value | ||||
|  | ||||
|     @value.setter | ||||
|     def value(self, value): | ||||
|         if value in self.values: | ||||
|             self._command.set_program(value) | ||||
|         else: | ||||
|             raise ValueError(f"Allowed values {self._values}") | ||||
|  | ||||
|     @property | ||||
|     def filter(self): | ||||
|         return self._filter | ||||
|  | ||||
|     @filter.setter | ||||
|     def filter(self, filter): | ||||
|         self._filter = filter | ||||
|  | ||||
|     @property | ||||
|     def values(self): | ||||
|         values = [] | ||||
|         for value in self._values: | ||||
|             if not self._filter or re.findall(self._filter, str(value)): | ||||
|                 values.append(str(value)) | ||||
|         return sorted(values) | ||||
							
								
								
									
										0
									
								
								pyhon/parameter/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								pyhon/parameter/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										30
									
								
								pyhon/parameter/base.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								pyhon/parameter/base.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,30 @@ | ||||
| from typing import Dict, Any | ||||
|  | ||||
|  | ||||
| class HonParameter: | ||||
|     def __init__(self, key: str, attributes: Dict[str, Any]) -> None: | ||||
|         self._key = key | ||||
|         self._category: str = attributes.get("category", "") | ||||
|         self._typology: str = attributes.get("typology", "") | ||||
|         self._mandatory: int = attributes.get("mandatory", 0) | ||||
|         self._value: str | float = "" | ||||
|  | ||||
|     @property | ||||
|     def key(self) -> str: | ||||
|         return self._key | ||||
|  | ||||
|     @property | ||||
|     def value(self) -> str | float: | ||||
|         return self._value if self._value is not None else "0" | ||||
|  | ||||
|     @property | ||||
|     def category(self) -> str: | ||||
|         return self._category | ||||
|  | ||||
|     @property | ||||
|     def typology(self) -> str: | ||||
|         return self._typology | ||||
|  | ||||
|     @property | ||||
|     def mandatory(self) -> int: | ||||
|         return self._mandatory | ||||
							
								
								
									
										29
									
								
								pyhon/parameter/enum.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										29
									
								
								pyhon/parameter/enum.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,29 @@ | ||||
| from typing import Dict, Any, List | ||||
|  | ||||
| from pyhon.parameter.base import HonParameter | ||||
|  | ||||
|  | ||||
| class HonParameterEnum(HonParameter): | ||||
|     def __init__(self, key: str, attributes: Dict[str, Any]) -> None: | ||||
|         super().__init__(key, attributes) | ||||
|         self._default = attributes.get("defaultValue") | ||||
|         self._value = self._default or "0" | ||||
|         self._values: List[str] = attributes.get("enumValues", []) | ||||
|  | ||||
|     def __repr__(self) -> str: | ||||
|         return f"{self.__class__} (<{self.key}> {self.values})" | ||||
|  | ||||
|     @property | ||||
|     def values(self) -> List[str]: | ||||
|         return [str(value) for value in self._values] | ||||
|  | ||||
|     @property | ||||
|     def value(self) -> str | float: | ||||
|         return self._value if self._value is not None else self.values[0] | ||||
|  | ||||
|     @value.setter | ||||
|     def value(self, value: str) -> None: | ||||
|         if value in self.values: | ||||
|             self._value = value | ||||
|         else: | ||||
|             raise ValueError(f"Allowed values {self._value}") | ||||
							
								
								
									
										21
									
								
								pyhon/parameter/fixed.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								pyhon/parameter/fixed.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,21 @@ | ||||
| from typing import Dict, Any | ||||
|  | ||||
| from pyhon.parameter.base import HonParameter | ||||
|  | ||||
|  | ||||
| class HonParameterFixed(HonParameter): | ||||
|     def __init__(self, key: str, attributes: Dict[str, Any]) -> None: | ||||
|         super().__init__(key, attributes) | ||||
|         self._value = attributes.get("fixedValue", None) | ||||
|  | ||||
|     def __repr__(self) -> str: | ||||
|         return f"{self.__class__} (<{self.key}> fixed)" | ||||
|  | ||||
|     @property | ||||
|     def value(self) -> str | float: | ||||
|         return self._value if self._value is not None else "0" | ||||
|  | ||||
|     @value.setter | ||||
|     def value(self, value: str | float) -> None: | ||||
|         # Fixed values seems being not so fixed as thought | ||||
|         self._value = value | ||||
							
								
								
									
										33
									
								
								pyhon/parameter/program.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								pyhon/parameter/program.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,33 @@ | ||||
| from typing import List, TYPE_CHECKING, Dict | ||||
|  | ||||
| from pyhon.parameter.enum import HonParameterEnum | ||||
|  | ||||
| if TYPE_CHECKING: | ||||
|     from pyhon.commands import HonCommand | ||||
|  | ||||
|  | ||||
| class HonParameterProgram(HonParameterEnum): | ||||
|     _FILTER = ["iot_recipe", "iot_guided"] | ||||
|  | ||||
|     def __init__(self, key: str, command: "HonCommand") -> None: | ||||
|         super().__init__(key, {}) | ||||
|         self._command = command | ||||
|         self._value: str = command.program | ||||
|         self._programs: Dict[str, "HonCommand"] = command.programs | ||||
|         self._typology: str = "enum" | ||||
|  | ||||
|     @property | ||||
|     def value(self) -> str | float: | ||||
|         return self._value | ||||
|  | ||||
|     @value.setter | ||||
|     def value(self, value: str) -> None: | ||||
|         if value in self.values: | ||||
|             self._command.program = value | ||||
|         else: | ||||
|             raise ValueError(f"Allowed values {self.values}") | ||||
|  | ||||
|     @property | ||||
|     def values(self) -> List[str]: | ||||
|         values = [v for v in self._programs if all(f not in v for f in self._FILTER)] | ||||
|         return sorted(values) | ||||
							
								
								
									
										49
									
								
								pyhon/parameter/range.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								pyhon/parameter/range.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,49 @@ | ||||
| from typing import Dict, Any | ||||
|  | ||||
| from pyhon.parameter.base import HonParameter | ||||
|  | ||||
|  | ||||
| def str_to_float(string: str | float) -> float: | ||||
|     try: | ||||
|         return int(string) | ||||
|     except ValueError: | ||||
|         return float(str(string).replace(",", ".")) | ||||
|  | ||||
|  | ||||
| class HonParameterRange(HonParameter): | ||||
|     def __init__(self, key: str, attributes: Dict[str, Any]) -> None: | ||||
|         super().__init__(key, attributes) | ||||
|         self._min: float = str_to_float(attributes["minimumValue"]) | ||||
|         self._max: float = str_to_float(attributes["maximumValue"]) | ||||
|         self._step: float = str_to_float(attributes["incrementValue"]) | ||||
|         self._default: float = str_to_float(attributes.get("defaultValue", self._min)) | ||||
|         self._value: float = self._default | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])" | ||||
|  | ||||
|     @property | ||||
|     def min(self) -> float: | ||||
|         return self._min | ||||
|  | ||||
|     @property | ||||
|     def max(self) -> float: | ||||
|         return self._max | ||||
|  | ||||
|     @property | ||||
|     def step(self) -> float: | ||||
|         return self._step | ||||
|  | ||||
|     @property | ||||
|     def value(self) -> float: | ||||
|         return self._value if self._value is not None else self._min | ||||
|  | ||||
|     @value.setter | ||||
|     def value(self, value: float) -> None: | ||||
|         value = str_to_float(value) | ||||
|         if self._min <= value <= self._max and not value % self._step: | ||||
|             self._value = value | ||||
|         else: | ||||
|             raise ValueError( | ||||
|                 f"Allowed: min {self._min} max {self._max} step {self._step}" | ||||
|             ) | ||||
| @ -1 +1,2 @@ | ||||
| aiohttp | ||||
| aiohttp==3.8.4 | ||||
| yarl==1.8.2 | ||||
|  | ||||
							
								
								
									
										4
									
								
								requirements_dev.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								requirements_dev.txt
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,4 @@ | ||||
| black==23.3.0 | ||||
| flake8==6.0.0 | ||||
| mypy==1.2.0 | ||||
| pylint==2.17.2 | ||||
		Reference in New Issue
	
	Block a user
	