YOGYUI

힐스테이트 광교산::엘리베이터 - 애플 홈킷 + 구글 어시스턴트 연동 본문

홈네트워크(IoT)/힐스테이트 광교산

힐스테이트 광교산::엘리베이터 - 애플 홈킷 + 구글 어시스턴트 연동

요겨 2022. 6. 24. 01:22
반응형

지난 포스트에서 엘리베이터 호출 관련 RS-485 패킷 후킹 및 분석을 완료했다(링크)

python 시리얼 패킷 파서 및 MQTT 메시지 핸들러 구문을 구현하고, Homebridge 및 Home Assistant 액세서를 추가해서 홈네트워크 플랫폼과 연동을 시켜보자

1. python 코드 작성

깃헙 저장소 hillstate-elevator 브랜치로 소스코드 커밋 완료

https://github.com/YOGYUI/HomeNetwork/tree/hillstate-elevator

 

GitHub - YOGYUI/HomeNetwork: HomeNetwork(Homebridge) Repo

HomeNetwork(Homebridge) Repo. Contribute to YOGYUI/HomeNetwork development by creating an account on GitHub.

github.com

 

시리얼 패킷 파서 구문은 지난번 패킷 후킹때 구현했던 것과 크게 차이가 없다

class ParserVarious(SerialParser):    
    def interpretPacket(self, packet: bytearray):
        try:
            if packet[3] == 0x18:  # 난방
                self.handleThermostat(packet)
            elif packet[3] == 0x1B:  # 가스차단기
                self.handleGasValve(packet)
            elif packet[3] == 0x1C:  # 시스템에어컨
                self.handleAirconditioner(packet)
            elif packet[3] == 0x2B:  # 환기 (전열교환기)
                self.handleVentilator(packet)
            elif packet[3] == 0x34:  # 엘리베이터
                self.handleElevator(packet)
            else:
                if packet[4] == 0x02:
                    print(self.prettifyPacket(packet))
        except Exception as e:
            writeLog('interpretPacket::Exception::{} ({})'.format(e, packet), self)

    def handleElevator(self, packet: bytearray):
        if packet[4] == 0x01:  # 상태 쿼리 (월패드 -> 복도 미니패드)
            state = packet[8] & 0x0F  # 0 = idle, 1 = arrived, 5 = moving(up), 6 = moving(down)
            elevator_index = (packet[8] & 0xF0) >> 4  # 0x0A or 0x0B
            floor = ['??', '??']
            if elevator_index == 0x0A:
                floor[0] = '{:02X}'.format(packet[9])
            elif elevator_index == 0x0B:
                floor[1] = '{:02X}'.format(packet[9])
            result = {
                'device': 'elevator',
                'state': state,
                'floor': floor
            }
            self.sig_parse_result.emit(result)
        elif packet[4] == 0x02:
            pass
        elif packet[4] == 0x04:  # 상태 응답 (복도 미니패드 -> 월패드)
            state = packet[8] & 0x0F  # 0 = idle, 1 = arrived, 5 = moving(up), 6 = moving(down)
            result = {
                'device': 'elevator',
                'state': state
            }
            self.sig_parse_result.emit(result)

엘리베이터의 상태(state)는 총 4가지로 나뉘게 되며, 각 값에 따른 의미는 다음과 같다

  • 0: IDLE, 호출되지 않은 상태
  • 1: ARRIVED, 도착한 상태
  • 5: MOVING, 하행 호출 후 이동중인 상태
  • 6: MOVING, 상행 호출 후 이동중인 상태

이 값들은 패킷 9번째 바이트의 하위 4비트값을 토대로 추출한 값이며, 이를 그대로 homebridge와 home assistant 액세서리들에도 적용하도록 하자 (따로 인코딩하기 귀찮고 헷갈린다)

 

엘리베이터 관련 객체는 다음과 같이 구현해봤다

class Elevator(Device):
    time_arrived: float = 0.
    time_threshold_arrived_change: float = 10.
    floor_list: List[str]
    moving_list: List[bool]

    def __init__(self, name: str = 'Elevator', count: int = 2, **kwargs):
        super().__init__(name, **kwargs)
        self.floor_list = ['??'] * count
        self.moving_list = [False] * count
    
    def __repr__(self):
        repr_txt = f'<{self.name}({self.__class__.__name__} at {hex(id(self))})'
        repr_txt += '>'
        return repr_txt
    
    def publish_mqtt(self):
        obj = {"state": self.state}
        if self.mqtt_client is not None:
            self.mqtt_client.publish(self.mqtt_publish_topic, json.dumps(obj), 1)
    
    def updateState(self, state: int, **kwargs):
        self.state = state  # 0 = idle, 1 = arrived, 5 = moving(up), 6 = moving(down)
        if not self.init:
            self.publish_mqtt()
            self.init = True
        if self.state != self.state_prev:
            if self.state == 1:
                self.time_arrived = time.perf_counter()
                self.publish_mqtt()
                self.state_prev = self.state
            else:
                if self.state_prev == 1:
                    # 도착 후 상태가 다시 idle로 바뀔 때 시간차가 적으면 occupancy sensor가 즉시 off가 되어
                    # notification이 제대로 되지 않는 문제가 있어, 상태 변화 딜레이를 줘야한다
                    time_elapsed_last_arrived = time.perf_counter() - self.time_arrived
                    if time_elapsed_last_arrived > self.time_threshold_arrived_change:
                        self.publish_mqtt()
                        self.state_prev = self.state
                else:
                    self.publish_mqtt()
                    self.state_prev = self.state
        if 'floor' in kwargs.keys():
            floor = kwargs.get('floor')
            try:
                for i in range(len(self.floor_list)):
                    if floor[i] != '??':
                        self.floor_list[i] = floor[i]
            except Exception:
                pass
    
    def makePacketCallDownside(self) -> bytearray:
        # 하행 호출
        # F7 0B 01 34 02 41 10 06 00 XX EE
        packet = bytearray([0xF7, 0x0B, 0x01, 0x34])
        packet.append(0x02)
        packet.extend([0x41, 0x10, 0x06, 0x00])
        packet.append(self.calcXORChecksum(packet))
        packet.append(0xEE)
        return packet

엘리베이터가 이동중일 때 현재 층수(floor)도 일단 구현은 해봤는데, 패킷에 나타나는 값이 의미하는걸 정확히 파악하지 못해서 딱히 특별한 용도로 사용하지는 않는다 ㅎㅎ

또한, 패킷 파서에 의해 엘리베이터가 도착(state=1) 후 1초도 되지 않아 IDLE(state=0)로 상태가 변하게 되는데, 즉시 상태를 변화시키면 도착 알림 센서를 트리거할 시간이 짧아 제대로 작동하지 않는 경우가 있어 도착 후 10초간 상태를 유지하도록 간단하게 트릭을 추가해줬다

 

class ThreadCommandQueue(threading.Thread):
    _keepAlive: bool = True

    def __init__(self, queue_: queue.Queue):
        threading.Thread.__init__(self, name='Command Queue Thread')
        self._queue = queue_
        self._retry_cnt = 10
        self._delay_response = 0.4
        self.sig_terminated = Callback()

    def run(self):
        writeLog('Started', self)
        while self._keepAlive:
            if not self._queue.empty():
                elem = self._queue.get()
                elem_txt = '\n'
                for k, v in elem.items():
                    elem_txt += f'  {k}: {v}\n'
                writeLog(f'Get Command Queue: \n{{{elem_txt}}}', self)
                try:
                    dev = elem['device']
                    category = elem['category']
                    target = elem['target']
                    parser = elem['parser']
                    if target is None:
                        continue

                    if isinstance(dev, Light):
                        # 생략
                    elif isinstance(dev, Outlet):
                        # 생략
                    elif isinstance(dev, GasValve):
                        # 생략
                    elif isinstance(dev, Thermostat):
                        # 생략
                    elif isinstance(dev, Ventilator):
                        # 생략
                    elif isinstance(dev, AirConditioner):
                        # 생략
                    elif isinstance(dev, Elevator):
                        if category == 'state':
                            self.set_elevator_call(dev, target, parser)
                except Exception as e:
                    writeLog(str(e), self)
            else:
                time.sleep(1e-3)
        writeLog('Terminated', self)
        self.sig_terminated.emit()

    def stop(self):
        self._keepAlive = False

    def set_elevator_call(self, dev: Elevator, target: int, parser: SerialParser):
        cnt = 0
        if target == 5:
            packet_command = dev.makePacketCallUpside()
        elif target == 6:
            packet_command = dev.makePacketCallDownside()
        else:
            return
        while cnt < self._retry_cnt:
            if dev.state == target:
                break
            if parser.isSerialLineBusy():
                time.sleep(1e-3)  # prevent cpu occupation
                continue
            parser.sendPacket(packet_command)
            cnt += 1
            time.sleep(0.2)  # wait for parsing response
        if cnt > 0:
            writeLog('set_elevator_call({})::send # = {}'.format(target, cnt), self)
            time.sleep(self._delay_response)
        dev.publish_mqtt()

명령 큐 쓰레드에서는 엘리베이터 호출 관련 메서드를 따로 만들어주고, 목표 상태값(target)은 5 혹은 6으로 받을 수 있게 구현해줬다 (어차피 6=상행 호출은 작동하지도 않지만 ㅠ)

2. Homebridge, Home Assistant 액세서리 추가

광교아이파크때와 마찬가지로 호출용 switch, 그리고 도착을 알리기 위해 occupancy sensor 두 개의 액세서리를 활용하기로 했다 (나중에 좀 더 합리적인 아이템으로 바꾸는 방법을 찾아봐야겠다;;)

{
    "accessory": "mqttthing",
    "type": "switch",
    "name": "Elevator Call Downside (MQTT)",
    "url": "mosquitto broker url",
    "username": "mosquitto auth id",
    "password": "mosquitto auth password",
    "topics": {
        "getOn": {
            "topic": "home/hillstate/elevator/state",
            "apply": "return (JSON.parse(message).state == 6);"
        },
        "setOn": {
            "topic": "home/hillstate/elevator/command",
            "apply": "return JSON.stringify({state: 6});"
        }
    },
    "integerValue": true,
    "onValue": 1,
    "offValue": 0,
    "logMqtt": true
},
{
    "accessory": "mqttthing",
    "type": "occupancySensor",
    "name": "Elevator Arrived (MQTT)",
    "url": "mosquitto broker url",
    "username": "mosquitto auth id",
    "password": "mosquitto auth password",
    "topics": {
        "getOccupancyDetected": {
            "topic": "home/hillstate/elevator/state",
            "apply": "return (JSON.parse(message).state == 1);"
        }
    },
    "integerValue": true,
    "onValue": 1,
    "offValue": 0,
    "logMqtt": true
}

Homebridge 액세서리 추가

 

Home Assistant도 유사하게 추가해줬다 (안드로이드 기기는 거의 쓰지 않다보니 더 대충 구현해버렸다 ㅋㅋ)

HA 커뮤니티를 보면 엘리베이터가 도착하면 스피커(구글 홈 미니)로 도착했다는 음성 메시지를 호출하는 등 고급 기능들을 구현한 사람들이 많던데.. 나중에 시간내서 한번 따라해봐야겠다 ㅎㅎ

3. 작동 테스트

엘리베이터 호출(하행) - 도착 시 알림 모두 정상적으로 동작한다 (물론 Siri 음성명령도 잘 먹힌다!)



아이폰으로 호출을 하면 마치 복도의 제어패드에서 사람이 엘리베이터 호출 버튼을 누른 것과 같은 동작을 하는 것을 알 수 있다 (즉, 명령 패킷으로는 복도 제어패드가 토글되는 동작을 프로그래밍으로 구현할 수 있는 것을 알 수 있다)


이제 홈네트워크 연동 작업이 마무리단계다 

아마 RS-485 포트 2개를 통해 연계할 수 있는 건 거의 다 한 느낌인데.. 월패드로 이것저것 건드려보면서 더 할만한게 남았나 살펴봐야겠다 (아직 4번째 바이트가 0x2A랑 0x48인 패킷의 정체도 남았다!)

 

[TODO]

  • 조명 On/Off
  • 아울렛(전원 콘센트) On/Off - 실시간 전력량 조회는 불가능
  • 도시가스 차단
  • 난방 제어 (On/Off, 희망 온도 설정, 현재 방 온도 가져오기)
  • 환기 (전열교환기)
  • 시스템 에어컨 (냉방 및 공기청정)
  • 엘리베이터 호출
  • 도어락 해제
  • Optional: 현관 비디오폰 영상, 거실 천장 모션 센서
반응형
Comments