YOGYUI

힐스테이트 광교산::난방 - 애플 홈킷 + 구글 어시스턴트 연동 본문

홈네트워크(IoT)/힐스테이트 광교산

힐스테이트 광교산::난방 - 애플 홈킷 + 구글 어시스턴트 연동

요겨 2022. 6. 17. 00:11
반응형

지난 포스트에서 난방 관련 RS-485 패킷 후킹 및 분석을 완료했다 (링크)

시리얼 패킷 파싱 및 명령 패킷 생성 python 코드를 작성하고 홈네트워크 플랫폼 액세서리를 추가해주자

1. python 코드 작성

깃헙 저장소 hillstate-thermostat 브랜치로 소스코드를 커밋했다

https://github.com/YOGYUI/HomeNetwork/tree/hillstate-thermostat

 

GitHub - YOGYUI/HomeNetwork: HomeNetwork(Homebridge) Repo

HomeNetwork(Homebridge) Repo. Contribute to YOGYUI/HomeNetwork development by creating an account on GitHub.

github.com

시리얼 패킷 파서는 다음과 같이 변경해줬다 (기존에 클래스 이름을 ParserGas라고 했는데, ParserVarious로 변경했다.. 마땅한 이름이 안떠올라 ㅠㅠ)

class ParserVarious(SerialParser):    
    def interpretPacket(self, packet: bytearray):
        try:
            if packet[2:4] == bytearray([0x01, 0x1B]):  # 가스차단기
                if packet[4] == 0x01:  # 상태 쿼리
                    pass
                elif packet[4] == 0x02:  # 상태 변경 명령
                    pass
                elif packet[4] == 0x04:  # 상태 응답
                    state = 0 if packet[8] == 0x03 else 1
                    result = {
                        'device': 'gasvalve',
                        'state': state
                    }
                    self.sig_parse_result.emit(result)
            elif packet[2:4] == bytearray([0x01, 0x18]):  # 난방
                room_idx = packet[6] & 0x0F
                if packet[4] == 0x01:  # 상태 쿼리
                    pass
                elif packet[4] == 0x02:  # On/Off, 온도 변경 명령
                    pass
                elif packet[4] == 0x04:  # 상태 응답
                    if room_idx == 0:  # 일반 쿼리 (존재하는 모든 디바이스)
                        thermostat_count = (len(packet) - 10) // 3
                        for idx in range(thermostat_count):
                            dev_packet = packet[8 + idx * 3: 8 + (idx + 1) * 3]
                            if dev_packet[0] != 0x00:  # 0이면 존재하지 않는 디바이스
                                state = 0 if dev_packet[0] == 0x04 else 1                            
                                temp_current = dev_packet[1]  # 현재 온도
                                temp_config = dev_packet[2]  # 설정 온도
                                result = {
                                    'device': 'thermostat',
                                    'room_index': idx + 1,
                                    'state': state,
                                    'temp_current': temp_current,
                                    'temp_config': temp_config
                                }
                                self.sig_parse_result.emit(result)
                    else:  # 상태 변경 명령 직후 응답
                        if packet[5] in [0x45, 0x46]:  # 0x46: On/Off 설정 변경에 대한 응답, 0x45: 온도 설정 변경에 대한 응답
                            state = 0 if packet[8] == 0x04 else 1
                            temp_current = packet[9]  # 현재 온도
                            temp_config = packet[10]  # 설정 온도
                            result = {
                                'device': 'thermostat',
                                'room_index': room_idx,
                                'state': state,
                                'temp_current': temp_current,
                                'temp_config': temp_config
                            }
                            self.sig_parse_result.emit(result)
        except Exception as e:
            writeLog('interpretPacket::Exception::{} ({})'.format(e, packet), self)

라인수가 많아서 복잡해보이지만, 찬찬히 살펴보면 상태, 현재온도, 설정온도 3개 파싱하는게 전부인 것을 알 수 있다

 

난방 관련 디바이스 클래스는 Thermostat으로 이름을 정했으며, 온도 설정 시리얼 패킷 생성 메서드가 추가된 것 말고는 다른 Device 상속 객체들과 구조는 유사하다

class Thermostat(Device):
    temp_current: int = 0  # 현재 온도
    temp_current_prev: int = 0  # 현재 온도 버퍼
    temp_config: int = 0  # 난방 설정 온도
    temp_config_prev: int = 0  # 난방 설정 온도 버퍼
    temp_range: List[int]  # 설정 가능한 온도값의 범위

    def __init__(self, name: str = 'Thermostat', **kwargs):
        super().__init__(name, **kwargs)
        self.temp_range = [0, 100]
    
    def __repr__(self):
        repr_txt = f'<{self.name}({self.__class__.__name__} at {hex(id(self))})'
        repr_txt += f' Room Idx: {self.room_index}'
        repr_txt += '>'
        return repr_txt
    
    def publish_mqtt(self):
        obj = {
            "state": 'HEAT' if self.state == 1 else 'OFF',
            "currentTemperature": self.temp_current, 
            "targetTemperature": self.temp_config
        }
        if self.mqtt_client is not None:
            self.mqtt_client.publish(self.mqtt_publish_topic, json.dumps(obj), 1)
    
    def setTemperatureRange(self, range_min: int, range_max: int):
        self.temp_range[0] = range_min
        self.temp_range[1] = range_max
        self.temp_current = max(range_min, min(range_max, self.temp_current))
        self.temp_config = max(range_min, min(range_max, self.temp_config))

    def setState(self, state: int, **kwargs):
        self.state = state
        if not self.init:
            self.publish_mqtt()
            self.init = True
        if self.state != self.state_prev:
            self.publish_mqtt()
        self.state_prev = self.state
        # 온도값 인자
        temp_current = kwargs.get('temp_current')
        if temp_current is not None:
            self.temp_current = temp_current
            if self.temp_current != self.temp_current_prev:
                self.publish_mqtt()
            self.temp_current_prev = self.temp_current
        temp_config = kwargs.get('temp_config')
        if temp_config is not None:
            self.temp_config = temp_config
            if self.temp_config != self.temp_config_prev:
                self.publish_mqtt()
            self.temp_config_prev = self.temp_config
    
    def makePacketQueryState(self) -> bytearray:
        # F7 0B 01 18 01 46 10 00 00 XX EE
        # XX: Checksum (XOR SUM)
        packet = bytearray([0xF7, 0x0B, 0x01, 0x18, 0x01, 0x46])
        packet.extend([0x10, 0x00, 0x00])
        packet.append(self.calcXORChecksum(packet))
        packet.append(0xEE)
        return packet

    def makePacketSetState(self, state: bool):
        # F7 0B 01 18 02 46 XX YY 00 ZZ EE
        # XX: 상위 4비트 = 1, 하위 4비트 = Room Index
        # YY: 0x01=On, 0x04=Off
        # ZZ: Checksum (XOR SUM)
        packet = bytearray([0xF7, 0x0B, 0x01, 0x18, 0x02, 0x46])
        packet.append(0x10 + (self.room_index & 0x0F))
        if state:
            packet.extend([0x01, 0x00])
        else:
            packet.extend([0x04, 0x00])
        packet.append(self.calcXORChecksum(packet))
        packet.append(0xEE)
        return packet
        
    def makePacketSetTemperature(self, temperature: int):
        # F7 0B 01 18 02 45 XX YY 00 ZZ EE
        # XX: 상위 4비트 = 1, 하위 4비트 = Room Index
        # YY: 온도 설정값
        # ZZ: Checksum (XOR SUM)
        packet = bytearray([0xF7, 0x0B, 0x01, 0x18, 0x02, 0x45])
        packet.append(0x10 + (self.room_index & 0x0F))
        packet.extend([temperature & 0xFF, 0x00])
        packet.append(self.calcXORChecksum(packet))
        packet.append(0xEE)
        return packet

 

주방과 같이 난방 디바이스가 없는 공간도 있으므로, Room 객체는 초기화할 때 난방 기기가 있는지 여부를 플래그 처리하도록 하자

class Room:
    name: str = 'Room'
    index: int = 0
    lights: List[Light]
    outlets: List[Outlet]
    has_thermostat: bool = False
    thermostat: Union[Thermostat, None] = None

    def __init__(
        self, name: str = 'Room', 
        index: int = 0, 
        light_count: int = 0, 
        outlet_count: int = 0,
        has_thermostat: bool = False,
        **kwargs
    ):
        self.name = name
        self.index = index
        # 중략
        self.has_thermostat = has_thermostat
        if self.has_thermostat:
            self.thermostat = Thermostat(
                name=f'Thermostat',
                room_index=self.index, 
                mqtt_client=kwargs.get('mqtt_client')
            )

        writeLog(f'Room Created >> {self}', self)

 

난방 관련 명령은 작동 On/Off 외에 온도 설정 기능도 있으므로 MQTT 메시지 핸들러 구문 및 명령 큐 처리 쓰레드에 메서드 한개를 추가해주자

class Home:
    def onMqttClientMessage(self, _, userdata, message):
        """
        Homebridge Publish, App Subscribe
        사용자에 의한 명령 MQTT 토픽 핸들링
        """
        if self.enable_mqtt_console_log:
            writeLog('Mqtt Client Message: {}, {}'.format(userdata, message), self)
        topic = message.topic
        msg_dict = json.loads(message.payload.decode("utf-8"))
        if 'thermostat/command' in topic:
            splt = topic.split('/')
            room_idx = int(splt[-1])
            room = self.getRoomObjectByIndex(room_idx)
            if room is not None and room.has_thermostat:
                if 'state' in msg_dict.keys():
                    self.command(
                        device=room.thermostat,
                        category='state',
                        target=msg_dict['state']
                    )
                if 'targetTemperature' in msg_dict.keys():
                    self.command(
                        device=room.thermostat,
                        category='temperature',
                        target=msg_dict['targetTemperature']
                    )
class ThreadCommandQueue(threading.Thread):
    _keepAlive: bool = True

    def __init__(self, queue_: queue.Queue):
        threading.Thread.__init__(self, name='Command Queue Thread')
        self._queue = queue_
        self._retry_cnt = 10
        self._delay_response = 0.4
        self.sig_terminated = Callback()

    def run(self):
        writeLog('Started', self)
        while self._keepAlive:
            if not self._queue.empty():
                elem = self._queue.get()
                elem_txt = '\n'
                for k, v in elem.items():
                    elem_txt += f'  {k}: {v}\n'
                writeLog(f'Get Command Queue: \n{{{elem_txt}}}', self)
                try:
                    dev = elem['device']
                    category = elem['category']
                    target = elem['target']
                    parser = elem['parser']
                    if target is None:
                        continue

                    if isinstance(dev, Thermostat):
                        if category == 'state':
                            if target == 'OFF':
                                self.set_state_common(dev, 0, parser)
                            elif target == 'HEAT':
                                self.set_state_common(dev, 1, parser)
                        elif category == 'temperature':
                            self.set_temperature(dev, target, parser)
                except Exception as e:
                    writeLog(str(e), self)
            else:
                time.sleep(1e-3)
        writeLog('Terminated', self)
        self.sig_terminated.emit()

    def stop(self):
        self._keepAlive = False

    def set_state_common(self, dev: Device, target: int, parser: SerialParser):
        cnt = 0
        packet_command = dev.makePacketSetState(bool(target))
        packet_query = dev.makePacketQueryState()
        for _ in range(self._retry_cnt):
            if dev.state == target:
                break
            parser.sendPacket(packet_command)
            cnt += 1
            time.sleep(0.2)
            if dev.state == target:
                break
            parser.sendPacket(packet_query)
            time.sleep(0.2)
        if cnt > 0:
            writeLog('set_state_common::send # = {}'.format(cnt), self)
            time.sleep(self._delay_response)
        dev.publish_mqtt()

    def set_temperature(self, dev: Thermostat, target: float, parser: SerialParser):
        # 힐스테이트는 온도값 범위가 정수형이므로 올림처리해준다
        target_temp = math.ceil(target)
        cnt = 0
        packet_command = dev.makePacketSetTemperature(target_temp)
        packet_query = dev.makePacketQueryState()
        for _ in range(self._retry_cnt):
            if dev.temp_config == target_temp:
                break
            parser.sendPacket(packet_command)
            cnt += 1
            time.sleep(0.2)
            if dev.temp_config == target_temp:
                break
            parser.sendPacket(packet_query)
            time.sleep(0.2)
        if cnt > 0:
            writeLog('set_temperature::send # = {}'.format(cnt), self)
            time.sleep(self._delay_response)
        dev.publish_mqtt()

후술하겠지만, 난방 관련 상태 설정 메시지는 0, 1 boolean값이 아니라 'OFF', 'HEAT'와 같이 문자열 형태로 전달되므로 적절히 처리해줘야 한다

또한 힐스테이트에 설치된 난방기기들은 정수형으로 온도값을 처리하므로, Homebridge나 Home Assistant가 Floating형 온도값을 보내줄 때 이를 올림 처리 (math.ceil)해서 희망 온도를 설정할 수 있도록 해줬다

(ex: 24.5도 설정 명령 내려올 때 25도로 올림)

 

2. Homebridge, Home Assistant 액세서리 추가

마찬가지로, 조명/아울렛/가스밸브와는 달리 동작 On/Off 뿐만 아니라 온도 설정 및 현재 온도 가져오기 기능도 구현해야 하기 때문에 액세서리 스크립트도 조금은 복잡하다

Homebridge MQTT-Thing 플러그인의 Thermostat 아이템 설명을 참고해서 작성해주자

https://github.com/arachnetech/homebridge-mqttthing/blob/master/docs/Accessories.md#thermostat

{
    "accessory": "mqttthing",
    "type": "thermostat",
    "name": "Livingroom Thermostat (MQTT)",
    "url": "mosquitto broker url",
    "username": "mosquitto auth id",
    "password": "mosquitto auth password",
    "topics": {
        "getCurrentHeatingCoolingState": {
            "topic": "home/hillstate/thermostat/state/1",
            "apply": "return JSON.parse(message).state;"
        },
        "setTargetHeatingCoolingState": {
            "topic": "home/hillstate/thermostat/command/1",
            "apply": "return JSON.stringify({state: message});"
        },
        "getTargetHeatingCoolingState": {
            "topic": "home/hillstate/thermostat/state/1",
            "apply": "return JSON.parse(message).state;"
         },
        "getCurrentTemperature": {
            "topic": "home/hillstate/thermostat/state/1",
            "apply": "return JSON.parse(message).currentTemperature;"
        },
        "setTargetTemperature": {
            "topic": "home/hillstate/thermostat/command/1",
            "apply": "return JSON.stringify({targetTemperature: message});"
        },
        "getTargetTemperature": {
            "topic": "home/hillstate/thermostat/state/1",
            "apply": "return JSON.parse(message).targetTemperature;"
        }
    },
    "minTemperature": 18,
    "maxTemperature": 30,
    "restrictHeatingCoolingState": [
        0,
        1
    ],
    "logMqtt": true
}

가동 상태 조회/명령, 현재 온도 조회, 희망 온도 조회/설정에 대한 6개 종류의 MQTT 토픽이 필요한데, 모두 /state, /command 두 종류 토픽 안에 메시지를 json 형태로 모든 정보를 담는 방향으로 구현했다 (python 코드와 매칭)

에어컨은 따로 액세서리를 구현할 거라서 HeatingCoolingState는 0, 1 로 'OFF', 'HEAT' 두 상태만 설정

 

climate:
  - platform: mqtt
    name: "거실 난방"
    unique_id: "livingroom_thermostat"
    modes:
      - "off"
      - "heat"
    mode_state_topic: "home/hillstate/thermostat/state/1"
    mode_state_template: "{{ value_json.state.lower() }}"
    mode_command_topic: "home/hillstate/thermostat/command/1"
    mode_command_template: >-
      {% set values = {'off': '"OFF"', 'heat': '"HEAT"'} %}
      { "state": {{ values[value] if value in values.keys() else "OFF" }} }
    temperature_state_topic: "home/hillstate/thermostat/state/1"
    temperature_state_template: "{{ value_json.targetTemperature }}"
    temperature_command_topic: "home/hillstate/thermostat/command/1"
    temperature_command_template: '{ "targetTemperature": {{ value }} }'
    current_temperature_topic: "home/hillstate/thermostat/state/1"
    current_temperature_template: "{{ value_json.currentTemperature }}"
    min_temp: 18
    max_temp: 30
    precision: 1

Home Assistant 설정도 유사한 느낌으로 스크립트를 작성할 수 있다

HA는 제어 해상도(precision)을 정할 수 있는데, Homebridge는 그런 기능이 없어서 아쉽다 ㅠ (찾아보면 있을래나?)

 

거실, 침실, 서재, 컴퓨터방 4개 영역에 대해 액세서리를 다 추가해주자 (topic의 공간 인덱스만 바꾸면 된다)

Homebridge 액세서리
Home Assistant 액세서리 추가

3. 작동 테스트

월패드 UI의 상태 변경 적용에 상당한 레이턴시가 있다

가스밸브와 마찬가지로, 여러 기기들이 하나의 RS-485포트로 쿼리-응답이 이뤄지다보니 동일 기기간 쿼리 시간차가 5초정도 간격이 있으니 어쩔 수 없다 (그래도 월패드에서 변경했을 때 홈킷에는 즉각적으로 변경이 된다)

 

주방 싱크대 아래를 보니 난방용 온수 모듈 6개가 옹기종기 모여있다

허니웰社의 MC3000A 바닥난방용 밸브 구동기를 제어하는게 핵심인 것 같다

https://blog.daum.net/aratech2018/44

모듈 6개의 케이블은 모두 좌측에 있는 허니웰사의 제어모듈에 연결되어 있다

 

아이폰으로 난방을 켜고 밸브가 제대로 작동하는지 확인해봤다

명령이 전달되고 거의 25초 후에 밸브가 열렸다 (월패드로 제어해도 마찬가지)

난방 밸브 제어모듈이 늦게 응답하는건지, 원래 난방이란게 이렇게 늦게 제어되는건지... 알수는 없지만 일단 정상적으로 열린다는 걸 확인했다는데 의의를 둔다 ㅎㅎ

 

마지막으로 거실, 침실, 컴퓨터방, 서재 4공간 모두 하나씩 난방을 켜보면서 밸브가 어떤게 열리는지 확인해봤다

거실이랑 침실은 공간이 넓어서 그런지 배관이 두 개 깔려있는 것으로 보인다 (바닥난방쪽은 영 문외한이라.. ㅋㅋ)

사전점검 때 난방을 안켜봐서 정작 골고루 잘 따뜻해지는지는 잘 모르겠다 ㅠ

겨울에 켜보고 문제있다 싶으면 하자수리 신청하지 뭐...


차근차근 진도를 잘 빼고 있다

나름 최선을 다해서 객체지향 구현을 해뒀기에 코딩 시간은 10~20분정도밖에 안걸린다 (이게 다 광교아이파크때 삽질을 거듭하며 고생한 결과)

패킷 자체도 구조가 명확하고 체크섬 계산 알고리즘도 쉬워서 따로 명령 패킷을 캡쳐해두지 않고 동적으로 패킷 생성이 가능하기 때문에 시간도 얼마 안걸리고 재미도 쏠쏠하다

[TODO]

  • 조명 On/Off
  • 아울렛(전원 콘센트) On/Off - 실시간 전력량 조회는 불가능
  • 도시가스 차단
  • 난방 제어 (On/Off, 희망 온도 설정, 현재 방 온도 가져오기)
  • 시스템 에어컨 (냉방 및 공기청정)
  • 환기 (전열교환기)
  • 엘리베이터 호출
  • 도어락 해제
  • Optional: 현관 비디오폰 영상, 거실 천장 모션 센서

 

반응형
Comments