YOGYUI

힐스테이트 광교산::환기(전열교환기) - 애플 홈킷 + 구글 어시스턴트 연동 본문

홈네트워크(IoT)/힐스테이트 광교산

힐스테이트 광교산::환기(전열교환기) - 애플 홈킷 + 구글 어시스턴트 연동

요겨 2022. 6. 18. 19:14
반응형

지난 포스트에서 환기(전열교환기) 관련 RS-485 패킷 후킹 및 분석을 완료했다 (링크)

이제껏 구현한 조명/아울렛/가스밸브/난방과 크게 패킷 구조가 다르지 않으니 최대한 코드 구조를 재활용하면서 홈네트워크 플랫폼과 연동하도록 한다

1. python 코드 작성

깃헙 저장소 hillstate-ventilator 브랜치로 소스코드 커밋 완료

https://github.com/YOGYUI/HomeNetwork/tree/hillstate-ventilator

 

GitHub - YOGYUI/HomeNetwork: HomeNetwork(Homebridge) Repo

HomeNetwork(Homebridge) Repo. Contribute to YOGYUI/HomeNetwork development by creating an account on GitHub.

github.com

홈브릿지나 홈어시스턴트의 환기 관련 액세서리는 일반 선풍기(fan) 액세서리를 활용했는데, 선풍기의 풍량은 0 ~ 100으로 퍼센트(%) 단위를 사용하는 반면 전열교환기는 약, 중, 강 세단계로 풍량이 정해져있기 때문에 mqtt 메시지를 주고받을 때 이를 적절히 환산해주는 알고리즘이 필요하다

- 알고리즘이라 해서 거창한 건 아니고.. 그냥 0~30%까지는 약풍, 31 ~ 60%까지는 중풍, 61 ~ 100%까지는 강풍으로 범위 기반 인코딩 방식을 사용했다 ㅎㅎ

 

시리얼 패킷 파서는 가스밸브, 난방과 유사하게 짤 수 있다

class ParserVarious(SerialParser):    
    def interpretPacket(self, packet: bytearray):
        try:
            if packet[2:4] == bytearray([0x01, 0x1B]):  # 가스차단기
                # 가스밸브 관련 코드 (생략)
            elif packet[2:4] == bytearray([0x01, 0x18]):  # 난방
                # 난방 관련 코드 (생략)
            elif packet[2:4] == bytearray([0x01, 0x2B]):  # 환기 (전열교환기)
                if packet[4] == 0x01:
                    pass
                elif packet[4] == 0x02:
                    pass
                elif packet[4] == 0x04:
                    state = 0 if packet[8] == 0x02 else 1
                    rotation_speed = packet[9]  # 0x01=약, 0x03=중, 0x07=강
                    result = {
                        'device': 'ventilator',
                        'state': state
                    }
                    if rotation_speed != 0:
                        result['rotation_speed'] = rotation_speed
                    self.sig_parse_result.emit(result)
        except Exception as e:
            writeLog('interpretPacket::Exception::{} ({})'.format(e, packet), self)

전열교환기의 작동이 중지되었을 때는 풍량이 0값으로 들어오는데 이를 굳이 디바이스 객체에 적용할 필요는 없으므로 조건문을 달아둔 게 다른 디바이스들과의 차별점

 

전열교환기 관련 디바이스 클래스 이름은 Ventilator로 정했고, 다른 기기들과 마찬가지로 Device 객체를 상속했다

class Ventilator(Device):
    rotation_speed: int = 0
    rotation_speed_prev: int = 0

    def __init__(self, name: str = 'Ventilator', **kwargs):
        super().__init__(name, **kwargs)
    
    def __repr__(self):
        repr_txt = f'<{self.name}({self.__class__.__name__} at {hex(id(self))})'
        repr_txt += '>'
        return repr_txt
    
    def publish_mqtt(self):
        obj = {"state": self.state}
        if self.state:
            if self.rotation_speed == 0x01:
                obj['rotationspeed'] = 30
            elif self.rotation_speed == 0x03:
                obj['rotationspeed'] = 60
            elif self.rotation_speed == 0x07:
                obj['rotationspeed'] = 100
        if self.mqtt_client is not None:
            self.mqtt_client.publish(self.mqtt_publish_topic, json.dumps(obj), 1)
    
    def setState(self, state: int, **kwargs):
        self.state = state
        if not self.init:
            self.publish_mqtt()
            self.init = True
        if self.state != self.state_prev:
            self.publish_mqtt()
        self.state_prev = self.state
        # 풍량 인자
        rotation_speed = kwargs.get('rotation_speed')
        if rotation_speed is not None:
            self.rotation_speed = rotation_speed
            if self.rotation_speed != self.rotation_speed_prev:
                self.publish_mqtt()
            self.rotation_speed_prev = self.rotation_speed
    
    def makePacketQueryState(self) -> bytearray:
        # F7 0B 01 2B 01 40 11 00 00 XX EE
        # XX: Checksum (XOR SUM)
        packet = bytearray([0xF7, 0x0B, 0x01, 0x2B, 0x01, 0x40])
        packet.extend([0x11, 0x00, 0x00])
        packet.append(self.calcXORChecksum(packet))
        packet.append(0xEE)
        return packet

    def makePacketSetState(self, state: bool):
        # F7 0B 01 2B 02 40 11 XX 00 YY EE
        # XX: 0x01=On, 0x02=Off
        # YY: Checksum (XOR SUM)
        packet = bytearray([0xF7, 0x0B, 0x01, 0x2B, 0x02, 0x40])
        # packet.append(0x10 + (self.room_index & 0x0F))
        packet.append(0x11)  # 환기는 거실(공간인덱스 1)에 설치된걸로 설정되어 있다
        if state:
            packet.extend([0x01, 0x00])
        else:
            packet.extend([0x02, 0x00])
        packet.append(self.calcXORChecksum(packet))
        packet.append(0xEE)
        return packet

    def makePacketSetRotationSpeed(self, rotation_speed: int):
        # F7 0B 01 2B 02 42 11 XX 00 YY EE
        # XX: 풍량 (0x01=약, 0x03=중, 0x07=강)
        # YY: Checksum (XOR SUM)
        packet = bytearray([0xF7, 0x0B, 0x01, 0x2B, 0x02, 0x42])
        # packet.append(0x10 + (self.room_index & 0x0F))
        packet.append(0x11)  # 환기는 거실(공간인덱스 1)에 설치된걸로 설정되어 있다
        packet.append(rotation_speed)
        packet.append(0x00)
        packet.append(self.calcXORChecksum(packet))
        packet.append(0xEE)
        return packet

MQTT 메시지를 발행하는 publish_mqtt, 풍량 설정 명령 패킷을 만들어는 메서드 makePacketSetRotationSpeed 두 메서드를 보면 풍량 관련 인코딩이 어떤 식으로 되는지 알 수 있다

(홈네트워크 플랫폼에서 실제 표시되는 풍량은 30%, 60%, 100% 세단계로 고정된다)

 

MQTT 메시지 핸들러 및 명령 큐 처리 쓰레드도 다른 기기들과 유사하게 짤 수 있다

  • 홈네트워크 플랫폼에서 fan 가동을 시작할 경우 state=1, rotationspeed=100이 default로 메시지가 두 번 넘어오는데, 최초 기동 시 강풍으로 하고 싶지는 않았기에 기존 상태가 켜져 있을 경우에만 풍량 조절이 이루어지도록 구현했다
    (힐스테이트 월패드는 가동 시작 시 약풍으로 가동이 시작된다)
class Home:
    def onMqttClientMessage(self, _, userdata, message):
        """
        Homebridge Publish, App Subscribe
        사용자에 의한 명령 MQTT 토픽 핸들링
        """
        if self.enable_mqtt_console_log:
            writeLog('Mqtt Client Message: {}, {}'.format(userdata, message), self)
        topic = message.topic
        msg_dict = json.loads(message.payload.decode("utf-8"))
        if 'light/command' in topic:
            # 조명 관련 처리 구문 (생략)
        if 'outlet/command' in topic:
            # 아울렛(콘센트) 관련 처리 구문 (생략)
        if 'gasvalve/command' in topic:
            # 가스밸브 관련 처리 구문 (생략)
        if 'thermostat/command' in topic:
            # 난방 관련 처리 구문 (생략)
        if 'ventilator/command' in topic:
            if 'state' in msg_dict.keys():
                self.command(
                    device=self.ventilator,
                    category='state',
                    target=msg_dict['state']
                )
            if 'rotationspeed' in msg_dict.keys():
                if self.ventilator.state == 1:
                    # 전원이 켜져있을 경우에만 풍량설정 가능하도록..
                    # 최초 전원 ON시 풍량 '약'으로 설정!
                    self.command(
                        device=self.ventilator,
                        category='rotationspeed',
                        target=msg_dict['rotationspeed']
                    )
class ThreadCommandQueue(threading.Thread):
    def run(self):
        writeLog('Started', self)
        while self._keepAlive:
            if not self._queue.empty():
                elem = self._queue.get()
                elem_txt = '\n'
                for k, v in elem.items():
                    elem_txt += f'  {k}: {v}\n'
                writeLog(f'Get Command Queue: \n{{{elem_txt}}}', self)
                try:
                    dev = elem['device']
                    category = elem['category']
                    target = elem['target']
                    parser = elem['parser']
                    if target is None:
                        continue

                    if isinstance(dev, Light):
                        # 조명 관련 (생략)
                    elif isinstance(dev, Outlet):
                        # 아울렛(콘센트) 관련 (생략)
                    elif isinstance(dev, GasValve):
                        # 가스밸브 관련 (생략)
                    elif isinstance(dev, Thermostat):
                        # 난방 관련 (생략)
                    elif isinstance(dev, Ventilator):
                        if category == 'state':
                            self.set_state_common(dev, target, parser)
                        elif category == 'rotationspeed':
                            self.set_rotation_speed(dev, target, parser)
                except Exception as e:
                    writeLog(str(e), self)
            else:
                time.sleep(1e-3)
        writeLog('Terminated', self)
        self.sig_terminated.emit()
    
    def set_rotation_speed(self, dev: Ventilator, target: int, parser: SerialParser):
        # Speed 값 변환 (100단계의 풍량을 세단계로 나누어 1, 3, 7 중 하나로)
        if target <= 30:
            conv = 0x01
        elif target <= 60:
            conv = 0x03
        else:
            conv = 0x07
        cnt = 0
        packet_command = dev.makePacketSetRotationSpeed(conv)
        packet_query = dev.makePacketQueryState()
        for _ in range(self._retry_cnt):
            if dev.rotation_speed == conv:
                break
            parser.sendPacket(packet_command)
            cnt += 1
            time.sleep(0.2)
            if dev.rotation_speed == conv:
                break
            parser.sendPacket(packet_query)
            time.sleep(0.2)
        if cnt > 0:
            writeLog('set_rotation_speed::send # = {}'.format(cnt), self)
            time.sleep(self._delay_response)
        dev.publish_mqtt()

2. Homebridge, Home Assistant 액세서리 추가

homebridge 액세서리는 다음과 같은 템플릿으로 추가할 수 있다 (fan)

{
    "accessory": "mqttthing",
    "type": "fan",
    "name": "Ventilator (MQTT)",
    "url": "mosquitto broker url",
    "username": "mosquitto auth id",
    "password": "mosquitto auth password",
    "caption": "Ventilator(MQTT)",
    "topics": {
        "getOn": {
            "topic": "home/hillstate/ventilator/state",
            "apply": "return JSON.parse(message).state;"
        },
        "setOn": {
            "topic": "home/hillstate/ventilator/command",
            "apply": "return JSON.stringify({state: message});"
        },
        "getRotationSpeed": {
            "topic": "home/hillstate/ventilator/state",
            "apply": "return JSON.parse(message).rotationspeed;"
        },
        "setRotationSpeed": {
            "topic": "home/hillstate/ventilator/command",
            "apply": "return JSON.stringify({rotationspeed: message});"
        }
    },
    "integerValue": true,
    "logMqtt": true
}

homebridge 액세서리 추가

 

HA 액세서리도 동일한 방식으로 MQTT 방식으로 구동하게 만들면 된다

fan:
  - platform: mqtt
    name: "전열교환기"
    unique_id: "ventilator"
    state_topic: "home/hillstate/ventilator/state"
    state_value_template: "{% if value_json.state %} ON {% else %} OFF {% endif %}"
    command_topic: "home/hillstate/ventilator/command"
    command_template: >- 
      {% set values = {'OFF': 0, 'ON': 1} %}
      { "state": {{ values[value] if value in values.keys() else 0 }} }
    percentage_state_topic: "home/hillstate/ventilator/state"
    percentage_value_template: "{{ value_json.rotationspeed }}"
    percentage_command_topic: "home/hillstate/ventilator/command"
    percentage_command_template: '{ "rotationspeed": {{ value }} }'
    speed_range_min: 1
    speed_range_max: 100

Home Assistant 액세서리 추가

3. 작동 테스트

가스밸브, 난방과 마찬가지로 환기 역시 홈킷에서 상태 변경 후 월패드에 UI 업데이트될 때까지 레이턴시가 있다 (원인: 쿼리 주기)

 

반면 월패드에서 상태 변경시 아이폰에는 변경 내용이 거의 실시간으로 적용된다~


일반 공기청정기는 집안의 미세먼지나 부유물질만 걸러줄 뿐 집안에 쌓이는 이산화탄소 등 걸러줘야 하는 기체 성분은 없애주지 못하기 때문에 창문을 열어서 환기를 주기적으로 해줘야되는데, 전열교환기는 기상상황이 안좋을 때 (비가 오거나 미세먼지 농도가 심할 때 등) 집안을 환기하는데 약간의 도움이 되는 장치라 활용도가 꽤 높다

 

홈네트워크의 자동화(automation) 기능을 활용해서 매일 일정 시간마다 전열교환기를 작동하게 해주면 맑은 실내공기를 기대할 수 있다

나는 매일 오전 9시에 작동 (풍량=약)시키고, 오후 9시에 끄는 자동화로 구성해서 사용 중 (자동화는 간단하게 애플 홈앱으로 구성)

 

[TODO]

  • 조명 On/Off
  • 아울렛(전원 콘센트) On/Off - 실시간 전력량 조회는 불가능
  • 도시가스 차단
  • 난방 제어 (On/Off, 희망 온도 설정, 현재 방 온도 가져오기)
  • 환기 (전열교환기)
  • 시스템 에어컨 (냉방 및 공기청정)
  • 엘리베이터 호출
  • 도어락 해제
  • Optional: 현관 비디오폰 영상, 거실 천장 모션 센서

 

 

 

반응형