317 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			317 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import importlib
 | |
| import logging
 | |
| import re
 | |
| from datetime import datetime, timedelta
 | |
| from pathlib import Path
 | |
| from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload
 | |
| 
 | |
| from pyhon import diagnose, exceptions
 | |
| from pyhon.appliances.base import ApplianceBase
 | |
| from pyhon.attributes import HonAttribute
 | |
| from pyhon.command_loader import HonCommandLoader
 | |
| from pyhon.commands import HonCommand
 | |
| from pyhon.parameter.base import HonParameter
 | |
| from pyhon.parameter.enum import HonParameterEnum
 | |
| from pyhon.parameter.range import HonParameterRange
 | |
| from pyhon.typedefs import Parameter
 | |
| 
 | |
| if TYPE_CHECKING:
 | |
|     from pyhon import HonAPI
 | |
| 
 | |
| _LOGGER = logging.getLogger(__name__)
 | |
| 
 | |
| T = TypeVar("T")
 | |
| 
 | |
| 
 | |
| # pylint: disable=too-many-public-methods,too-many-instance-attributes
 | |
| class HonAppliance:
 | |
|     _MINIMAL_UPDATE_INTERVAL = 5  # seconds
 | |
| 
 | |
|     def __init__(
 | |
|         self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0
 | |
|     ) -> None:
 | |
|         if attributes := info.get("attributes"):
 | |
|             info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
 | |
|         self._info: Dict[str, Any] = info
 | |
|         self._api: Optional[HonAPI] = api
 | |
|         self._appliance_model: Dict[str, Any] = {}
 | |
| 
 | |
|         self._commands: Dict[str, HonCommand] = {}
 | |
|         self._statistics: Dict[str, Any] = {}
 | |
|         self._attributes: Dict[str, Any] = {}
 | |
|         self._zone: int = zone
 | |
|         self._additional_data: Dict[str, Any] = {}
 | |
|         self._last_update: Optional[datetime] = None
 | |
|         self._default_setting = HonParameter("", {}, "")
 | |
| 
 | |
|         try:
 | |
|             self._extra: Optional[ApplianceBase] = importlib.import_module(
 | |
|                 f"pyhon.appliances.{self.appliance_type.lower()}"
 | |
|             ).Appliance(self)
 | |
|         except ModuleNotFoundError:
 | |
|             self._extra = None
 | |
| 
 | |
|     def _get_nested_item(self, item: str) -> Any:
 | |
|         result: List[Any] | Dict[str, Any] = self.data
 | |
|         for key in item.split("."):
 | |
|             if all(k in "0123456789" for k in key) and isinstance(result, list):
 | |
|                 result = result[int(key)]
 | |
|             elif isinstance(result, dict):
 | |
|                 result = result[key]
 | |
|         return result
 | |
| 
 | |
|     def __getitem__(self, item: str) -> Any:
 | |
|         if self._zone:
 | |
|             item += f"Z{self._zone}"
 | |
|         if "." in item:
 | |
|             return self._get_nested_item(item)
 | |
|         if item in self.data:
 | |
|             return self.data[item]
 | |
|         if item in self.attributes["parameters"]:
 | |
|             return self.attributes["parameters"][item].value
 | |
|         return self.info[item]
 | |
| 
 | |
|     @overload
 | |
|     def get(self, item: str, default: None = None) -> Any:
 | |
|         ...
 | |
| 
 | |
|     @overload
 | |
|     def get(self, item: str, default: T) -> T:
 | |
|         ...
 | |
| 
 | |
|     def get(self, item: str, default: Optional[T] = None) -> Any:
 | |
|         try:
 | |
|             return self[item]
 | |
|         except (KeyError, IndexError):
 | |
|             return default
 | |
| 
 | |
|     def _check_name_zone(self, name: str, frontend: bool = True) -> str:
 | |
|         zone = " Z" if frontend else "_z"
 | |
|         attribute: str = self._info.get(name, "")
 | |
|         if attribute and self._zone:
 | |
|             return f"{attribute}{zone}{self._zone}"
 | |
|         return attribute
 | |
| 
 | |
|     @property
 | |
|     def appliance_model_id(self) -> str:
 | |
|         return str(self._info.get("applianceModelId", ""))
 | |
| 
 | |
|     @property
 | |
|     def appliance_type(self) -> str:
 | |
|         return str(self._info.get("applianceTypeName", ""))
 | |
| 
 | |
|     @property
 | |
|     def mac_address(self) -> str:
 | |
|         return str(self.info.get("macAddress", ""))
 | |
| 
 | |
|     @property
 | |
|     def unique_id(self) -> str:
 | |
|         default_mac = "xx-xx-xx-xx-xx-xx"
 | |
|         import_name = f"{self.appliance_type.lower()}_{self.appliance_model_id}"
 | |
|         result = self._check_name_zone("macAddress", frontend=False)
 | |
|         result = result.replace(default_mac, import_name)
 | |
|         return result
 | |
| 
 | |
|     @property
 | |
|     def model_name(self) -> str:
 | |
|         return self._check_name_zone("modelName")
 | |
| 
 | |
|     @property
 | |
|     def brand(self) -> str:
 | |
|         brand = self._check_name_zone("brand")
 | |
|         return brand[0].upper() + brand[1:]
 | |
| 
 | |
|     @property
 | |
|     def nick_name(self) -> str:
 | |
|         result = self._check_name_zone("nickName")
 | |
|         if not result or re.findall("^[xX1\\s-]+$", result):
 | |
|             return self.model_name
 | |
|         return result
 | |
| 
 | |
|     @property
 | |
|     def code(self) -> str:
 | |
|         code: str = self.info.get("code", "")
 | |
|         if code:
 | |
|             return code
 | |
|         serial_number: str = self.info.get("serialNumber", "")
 | |
|         return serial_number[:8] if len(serial_number) < 18 else serial_number[:11]
 | |
| 
 | |
|     @property
 | |
|     def model_id(self) -> int:
 | |
|         return int(self._info.get("applianceModelId", 0))
 | |
| 
 | |
|     @property
 | |
|     def options(self) -> Dict[str, Any]:
 | |
|         return dict(self._appliance_model.get("options", {}))
 | |
| 
 | |
|     @property
 | |
|     def commands(self) -> Dict[str, HonCommand]:
 | |
|         return self._commands
 | |
| 
 | |
|     @property
 | |
|     def attributes(self) -> Dict[str, Any]:
 | |
|         return self._attributes
 | |
| 
 | |
|     @property
 | |
|     def statistics(self) -> Dict[str, Any]:
 | |
|         return self._statistics
 | |
| 
 | |
|     @property
 | |
|     def info(self) -> Dict[str, Any]:
 | |
|         return self._info
 | |
| 
 | |
|     @property
 | |
|     def additional_data(self) -> Dict[str, Any]:
 | |
|         return self._additional_data
 | |
| 
 | |
|     @property
 | |
|     def zone(self) -> int:
 | |
|         return self._zone
 | |
| 
 | |
|     @property
 | |
|     def api(self) -> "HonAPI":
 | |
|         """api connection object"""
 | |
|         if self._api is None:
 | |
|             raise exceptions.NoAuthenticationException("Missing hOn login")
 | |
|         return self._api
 | |
| 
 | |
|     async def load_commands(self) -> None:
 | |
|         command_loader = HonCommandLoader(self.api, self)
 | |
|         await command_loader.load_commands()
 | |
|         self._commands = command_loader.commands
 | |
|         self._additional_data = command_loader.additional_data
 | |
|         self._appliance_model = command_loader.appliance_data
 | |
|         self.sync_params_to_command("settings")
 | |
| 
 | |
|     async def load_attributes(self) -> None:
 | |
|         attributes = await self.api.load_attributes(self)
 | |
|         for name, values in attributes.pop("shadow", {}).get("parameters", {}).items():
 | |
|             if name in self._attributes.get("parameters", {}):
 | |
|                 self._attributes["parameters"][name].update(values)
 | |
|             else:
 | |
|                 self._attributes.setdefault("parameters", {})[name] = HonAttribute(
 | |
|                     values
 | |
|                 )
 | |
|         self._attributes |= attributes
 | |
|         if self._extra:
 | |
|             self._attributes = self._extra.attributes(self._attributes)
 | |
| 
 | |
|     async def load_statistics(self) -> None:
 | |
|         self._statistics = await self.api.load_statistics(self)
 | |
|         self._statistics |= await self.api.load_maintenance(self)
 | |
| 
 | |
|     async def update(self, force: bool = False) -> None:
 | |
|         now = datetime.now()
 | |
|         min_age = now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
 | |
|         if force or not self._last_update or self._last_update < min_age:
 | |
|             self._last_update = now
 | |
|             await self.load_attributes()
 | |
|             self.sync_params_to_command("settings")
 | |
| 
 | |
|     @property
 | |
|     def command_parameters(self) -> Dict[str, Dict[str, str | float]]:
 | |
|         return {n: c.parameter_value for n, c in self._commands.items()}
 | |
| 
 | |
|     @property
 | |
|     def settings(self) -> Dict[str, Parameter]:
 | |
|         result: Dict[str, Parameter] = {}
 | |
|         for name, command in self._commands.items():
 | |
|             for key in command.setting_keys:
 | |
|                 setting = command.settings.get(key, self._default_setting)
 | |
|                 result[f"{name}.{key}"] = setting
 | |
|         if self._extra:
 | |
|             return self._extra.settings(result)
 | |
|         return result
 | |
| 
 | |
|     @property
 | |
|     def available_settings(self) -> List[str]:
 | |
|         result = []
 | |
|         for name, command in self._commands.items():
 | |
|             for key in command.setting_keys:
 | |
|                 result.append(f"{name}.{key}")
 | |
|         return result
 | |
| 
 | |
|     @property
 | |
|     def data(self) -> Dict[str, Any]:
 | |
|         result = {
 | |
|             "attributes": self.attributes,
 | |
|             "appliance": self.info,
 | |
|             "statistics": self.statistics,
 | |
|             "additional_data": self._additional_data,
 | |
|             **self.command_parameters,
 | |
|             **self.attributes,
 | |
|         }
 | |
|         return result
 | |
| 
 | |
|     @property
 | |
|     def diagnose(self) -> str:
 | |
|         return diagnose.yaml_export(self, anonymous=True)
 | |
| 
 | |
|     async def data_archive(self, path: Path) -> str:
 | |
|         return await diagnose.zip_archive(self, path, anonymous=True)
 | |
| 
 | |
|     def sync_command_to_params(self, command_name: str) -> None:
 | |
|         if not (command := self.commands.get(command_name)):
 | |
|             return
 | |
|         for key in self.attributes.get("parameters", {}):
 | |
|             if new := command.parameters.get(key):
 | |
|                 self.attributes["parameters"][key].update(
 | |
|                     str(new.intern_value), shield=True
 | |
|                 )
 | |
| 
 | |
|     def sync_params_to_command(self, command_name: str) -> None:
 | |
|         if not (command := self.commands.get(command_name)):
 | |
|             return
 | |
|         for key in command.setting_keys:
 | |
|             if (
 | |
|                 new := self.attributes.get("parameters", {}).get(key)
 | |
|             ) is None or new.value == "":
 | |
|                 continue
 | |
|             setting = command.settings[key]
 | |
|             try:
 | |
|                 if not isinstance(setting, HonParameterRange):
 | |
|                     command.settings[key].value = str(new.value)
 | |
|                 else:
 | |
|                     command.settings[key].value = float(new.value)
 | |
|             except ValueError as error:
 | |
|                 _LOGGER.info("Can't set %s - %s", key, error)
 | |
|                 continue
 | |
| 
 | |
|     def sync_command(
 | |
|         self,
 | |
|         main: str,
 | |
|         target: Optional[List[str] | str] = None,
 | |
|         to_sync: Optional[List[str] | bool] = None,
 | |
|     ) -> None:
 | |
|         base: Optional[HonCommand] = self.commands.get(main)
 | |
|         if not base:
 | |
|             return
 | |
|         for command, data in self.commands.items():
 | |
|             if command == main or target and command not in target:
 | |
|                 continue
 | |
| 
 | |
|             for name, target_param in data.parameters.items():
 | |
|                 if not (base_param := base.parameters.get(name)):
 | |
|                     continue
 | |
|                 if to_sync and (
 | |
|                     (isinstance(to_sync, list) and name not in to_sync)
 | |
|                     or not base_param.mandatory
 | |
|                 ):
 | |
|                     continue
 | |
|                 self.sync_parameter(base_param, target_param)
 | |
| 
 | |
|     def sync_parameter(self, main: Parameter, target: Parameter) -> None:
 | |
|         if isinstance(main, HonParameterRange) and isinstance(
 | |
|             target, HonParameterRange
 | |
|         ):
 | |
|             target.max = main.max
 | |
|             target.min = main.min
 | |
|             target.step = main.step
 | |
|         elif isinstance(target, HonParameterRange):
 | |
|             target.max = int(main.value)
 | |
|             target.min = int(main.value)
 | |
|             target.step = 1
 | |
|         elif isinstance(target, HonParameterEnum):
 | |
|             target.values = main.values
 | |
|         target.value = main.value
 |