YOGYUI

힐스테이트 광교산::주방 비디오폰 세대현관문/공동현관문 기능 분리 (HomeAssistant) 본문

홈네트워크(IoT)/힐스테이트 광교산

힐스테이트 광교산::주방 비디오폰 세대현관문/공동현관문 기능 분리 (HomeAssistant)

요겨 2024. 2. 5. 23:00
반응형

현대통신 RS-485 연동코드: 주방 비디오폰 세대현관문/공동현관문 호출 상태 및 문열기 명령 기능 분리

힐스테이트 광교산에서 사용하려고 만든 현대통신 RS-485 연동 코드가 이래저래 입소문(?)을 타면서 소스코드를 클론해 사용하는 유저분들이 조금씩 늘어감에 따라 기능 수정 및 보완 요구사항도 발생하고 있다 ^^

 

가장 최근 요청받은 이슈는 Baudrate 3840을 사용하는 주방 서브폰의 세대현관문 및 공동현관문 열림 제어 시 각각의 엔티티를 분리함과 동시에 초인종 호출 중 상태를 홈어시스턴트(HomeAssistant, HA)에 연동하고 싶다는 사항이다

 

기존에는 주방 서브폰의 하드웨어적 특성을 따라 세대현관문과 공동현관문을 분리하지 않고 하나로 묶어서 '초인종 호출중 상태 표시' 및 '문열림' 기능을 하게 구현해뒀었는데, RS-485 패킷 자체는 둘을 서로 분리할 수 있기 때문에 이번 기회에 대대적으로 코드를 수정해봤다

주방 서브폰 FSM State Diagram

 

 

코드는 깃허브 리포지터리에 커밋 아이디 37a4fd240eccdf0a10efb8701670b4bbb91f1d43 push해둔 상태

https://github.com/YOGYUI/HomeNetwork/commit/37a4fd240eccdf0a10efb8701670b4bbb91f1d43

 

- 주방 서브폰: 세대현관문/공동현관문 상태 및 mqtt pub/sub 분리 · YOGYUI/HomeNetwork@37a4fd2

- 주방 서브폰: 세대현관문/공동현관문 HA Lock, Sensor 엔티티 config 추가

github.com

본 포스팅에서는 어떤 변경사항이 있었는지 간단하게 짚어보도록 한다

1. 상태 변수 분리

class SubPhone(Device):
    # 세대현관문 호출 상태, 문잠김 상태
    state_lock_front: StateDoorLock = StateDoorLock.Secured
    state_ringing_front: int = 0
    state_ringing_front_prev: int = 0
    # 공동현관문 호출 상태, 문잠김 상태
    state_lock_communal: StateDoorLock = StateDoorLock.Secured
    state_ringring_communal: int = 0
    state_ringring_communal_prev: int = 0
    
    def updateState(self, _: int, **kwargs):
        ringing_front = kwargs.get('ringing_front')
        if ringing_front is not None:
            if ringing_front:
                self.state_ringing_front = 1
            else:
                self.state_ringing_front = 0
            self.publishMQTT()
            self.state_ringing_front_prev = self.state_ringing_front
        ringing_communal = kwargs.get('ringing_communal')
        if ringing_communal is not None:
            if ringing_communal:
                self.state_ringring_communal = 1
            else:
                self.state_ringring_communal = 0
            self.publishMQTT()
            self.state_ringring_communal_prev = self.state_ringring_communal
        lock_front = kwargs.get('lock_front')
        if lock_front is not None:
            self.state_lock_front = StateDoorLock(lock_front)
            self.publishMQTT()
        lock_communal = kwargs.get('lock_communal')
        if lock_communal is not None:
            self.publishMQTT()
            writeLog(f"Lock Communal: {self.state_lock_communal.name}", self)

기존에는 SubPhone 클래스 내에 멤버변수로 세대현관문/공동현관문 어느 것이든 하나라도 호출중인지 여부를 가리키는 state_ringing 하나만 갖고 있었는데, 엔티티 분리를 위해 state_ringing_front, state_ringing_communal 두 개의 멤버변수로 분리해 패킷 파싱 결과에 따라 값을 변경하도록 수정했다

2. 패킷 파서(Packet Parser) 수정

class PacketParser:
    def interpretPacket(self, packet: bytearray):
        if self.type_interpret == ParserType.REGULAR:
            # 중략
        elif self.type_interpret == ParserType.SUBPHONE:
            if (packet[1] & 0xF0) == 0xB0:  # 현관 도어폰 호출
                packet_info['device'] = 'front door'
                self.handleFrontDoor(packet)
            elif (packet[1] & 0xF0) == 0x50:  # 공동 현관문 호출
                packet_info['device'] = 'communal door'
                self.handleCommunalDoor(packet)
    
    def handleFrontDoor(self, packet: bytearray):
        result = {'device': DeviceType.SUBPHONE}
        notify: bool = True
        if packet[1] == 0xB5:
            # 현관 도어폰 초인종 호출 (월패드 -> 서브폰)
            result['ringing_front'] = 1
        elif packet[1] == 0xB6:
            # 현관 도어폰 초인종 호출 종료 (월패드 -> 서브폰)
            result['ringing_front'] = 0
        elif packet[1] == 0xB9:
            # 서브폰에서 현관 통화 시작 (서브폰 -> 월패드)
            result['streaming'] = 1
        elif packet[1] == 0xBA:
            # 서브폰에서 현관 통화 종료 (서브폰 -> 월패드)
            result['streaming'] = 0
        elif packet[1] == 0xB4:
            # 서브폰에서 현관문 열림 명령 (서브폰 -> 월패드)
            result['lock_front'] = 0  # Unsecured
        elif packet[1] in [0xBB, 0xB8]:
            # 현관 도어폰 통화 종료
            result['ringing_front'] = 0
            result['streaming'] = 0
            result['lock_front'] = 1  # Secured
        if notify:
            self.updateDeviceState(result)

    def handleCommunalDoor(self, packet: bytearray):
        result = {'device': DeviceType.SUBPHONE}
        notify: bool = True
        if packet[1] == 0x5A:
            # 공동현관문 호출 (월패드 -> 서브폰)
            result['ringing_communal'] = 1
        elif packet[1] == 0x5C:
            # 공동현관문 호출 종료 (월패드 -> 서브폰)
            result['ringing_communal'] = 0
        elif packet[1] == 0x5E:
            # 공동현관문 통화 종료
            result['ringing_communal'] = 0
            result['streaming'] = 0
            result['lock_communal'] = 1  # Secured
        if notify:
            self.updateDeviceState(result)

현대통신 월패드와 동영엠텍 주방 서브폰간의 RS-485 통신 패킷 명세는 아래 글을 참고

힐스테이트 광교산::주방 비디오폰 연동 - 세대 및 공동 현관문 제어 (애플 홈킷)

 

힐스테이트 광교산::주방 비디오폰 연동 - 세대 및 공동 현관문 제어 (애플 홈킷)

주방에 설치되어 있는 (주)동영엠텍의 DM-D5102QMS 주방용 TV폰의 RS-485 패킷 해석 및 응용 관련 글을 2개 올린 바 있다 힐스테이트 광교산::주방 비디오폰 RS-485 패킷 해석 힐스테이트 광교산::주방 비

yogyui.tistory.com

패킷 해석 클래스인 PacketParser는 기존에도 handleFrontDoor, handleCommunalDoor로 메서드가 분리되어 있었는데, 디바이스 상태 업데이트 시 넘겨주는 딕셔너리의 키가 분리되어 있지를 않았다

딕셔너리 키를 'lock_front', 'ringing_front', 'lock_communal', 'ringing_communal'과 같이 분리하도록 수정

class Home:
    def updateDeviceState(self, result: dict):
        dev_type: DeviceType = result.get('device')
        dev_idx: int = result.get('index')
        room_idx: int = result.get('room_index')
        device = self.findDevice(dev_type, dev_idx, room_idx)
        if dev_type is DeviceType.SUBPHONE:
            device.updateState(
                0, 
                ringing_front=result.get('ringing_front'),
                ringing_communal=result.get('ringing_communal'),
                streaming=result.get('streaming'),
                doorlock=result.get('doorlock'),
                lock_front=result.get('lock_front'),
                lock_communal=result.get('lock_communal')
            )

Home 클래스의 디바이스 상태 업데이트 메서드인 updateDeviceState 내에서도 서브폰 관련 업데이트 시 호출 인자로 lock_front와 lock_communal을 추가했다 (기존 인자 doorlock은 올드 버전 사용 유저의 호환성을 위해 남겨둠)

3. MQTT 토픽 분리

class SubPhone(Device):
    def publishMQTT(self):
        obj = {
            "streaming_state": self.state_streaming,
            "lock_front_state": self.state_lock_front.name,
            "lock_communal_state": self.state_lock_communal.name
        }
        self.mqtt_client.publish(self.mqtt_publish_topic, json.dumps(obj), 1)
        
        if self.state_ringing_front != self.state_ringing_front_prev:
            obj = {"state": self.state_ringing_front}
            self.mqtt_client.publish(self.mqtt_publish_topic + '/doorbell/front', json.dumps(obj), 1)

        if self.state_ringring_communal != self.state_ringring_communal_prev:
            obj = {"state": self.state_ringring_communal}
            self.mqtt_client.publish(self.mqtt_publish_topic + '/doorbell/communal', json.dumps(obj), 1)

 

현재 상태를 업데이트하기 위해 발행(publish)하는 MQTT 토픽은 총 3종류로 구분했다

  • home/state/subphone/0/0: 영상 송출중 여부, 세대현관문 잠김 여부, 공동현관문 잠김 여부
  • home/state/subphone/0/0/doorbell/front: 세대현관문 초인종 호출중 여부
  • home/state/subphone/0/0/doorbell/communal: 공동현관문 초인종 호출중 여부

이리저리 테스트하다보니 모두 하나의 토픽으로 묶는 것보다 초인종 호출중 여부 상태는 별도의 토픽으로 관리하는게 더 합리적이라 생각해 굳이 나눠서 구현했다 (사실 왜 이렇게 너저분하게 짰는지 기억이 잘 나질 않는다 -_-;;)

class Home:
    def onMqttCommandSubPhone(self, topic: str, message: dict):
        splt = topic.split('/')
        room_idx = int(splt[-2])
        dev_idx = int(splt[-1])
        device = self.findDevice(DeviceType.SUBPHONE, dev_idx, room_idx)
        if 'streaming_state' in message.keys():
            self.command(
                device=device,
                category='streaming',
                target=message['streaming_state']
            )
        if 'lock_front_state' in message.keys():
            self.command(
                device=device,
                category='lock_front',
                target=message['lock_front_state']
            )
        if 'lock_communal_state' in message.keys():
            self.command(
                device=device,
                category='lock_communal',
                target=message['lock_communal_state']
            )

 

구독(subscribe)하는 MQTT 토픽은 기존과 동일하게 home/command/subphone/0/0 으로 한개로 고정하고, json 포맷 페이로드의 키를 구분해 명령을 기능별로 별도로 수행하게 구현했다 (기존의 doorlock을 lock_front_state와 lock_communal_state로 구분해 두 개의 문을 별도로 열게 수정)

※ 공동현관문은 어차피 호출중 상태가 아닌 경우에는 집안에서 패킷으로 열 수는 없다

class ThreadCommandQueue(threading.Thread):
    def run(self):
        while self._keepAlive:
            if not self._queue.empty():
                elem = self._queue.get()
                dev = elem.get('device')
                category = elem.get('category')
                target = elem.get('target')
                parser = elem.get('parser')
                if isinstance(dev, SubPhone):
                    if category == 'streaming':
                        self.set_subphone_streaming_state(dev, target, parser)
                    elif category == 'lock_front':
                        self.set_subphone_lock_front_state(dev, target, parser)
                    elif category == 'lock_communal':
                        self.set_subphone_lock_communal_state(dev, target, parser)
    
    def set_subphone_lock_front_state(self, dev: SubPhone, target: str, parser: PacketParser):
        if target == "Unsecured":
            dev.updateState(0, lock_front=0)  # 0: Unsecured
            packet_open = dev.makePacketOpenFrontDoor()
            if not dev.state_streaming:
                parser.sendPacket(dev.makePacketSetVideoStreamingState(1))
                time.sleep(0.2)
            parser.sendPacket(packet_open)
            time.sleep(0.2)
            parser.sendPacket(dev.makePacketSetVideoStreamingState(0))
        elif target == "Secured":
            dev.updateState(0, lock_front=1)  # 1: Secured
    
    def set_subphone_lock_communal_state(self, dev: SubPhone, target: str, parser: PacketParser):
        if target == "Unsecured":
            dev.updateState(0, lock_communal=0)  # 0: Unsecured
            packet_open = dev.makePacketOpenCommunalDoor()
            prev_state_ringing = dev.state_ringring_communal
            if not dev.state_streaming:
                parser.sendPacket(dev.makePacketSetVideoStreamingState(1))
                time.sleep(0.2)
            parser.sendPacket(packet_open)
            time.sleep(0.2)
            parser.sendPacket(dev.makePacketSetVideoStreamingState(0))
            if not prev_state_ringing:
                dev.updateState(0, lock_communal=1)  # 1: Secured
        elif target == "Secured":
            dev.updateState(0, lock_communal=1)  # 1: Secured

디바이스 제어 핸들링 스레드는 별다른 것 없이 세대현관문, 공동현관문 열기 관련 메서드 2개로 분리한 것이 전부이다

※ 세대현관문은 호출중 상태가 아니라도 주방 서브폰의 영상 송출 상태로 만든 뒤 열기가 가능한 것이 특징

4. HA MQTT Discovery 엔티티 추가

class SubPhone(Device):
    def configMQTT(self):
        # 세대현관문 도어락 엔티티 추가
        topic = f'{self.ha_discovery_prefix}/lock/{self.unique_id}_front/config'
        obj = {
            "name": self.name + " Lock (Front)",
            "object_id": self.unique_id + "_lock_front",
            "unique_id": self.unique_id + "_lock_front",
            "state_topic": self.mqtt_publish_topic,
            "command_topic": self.mqtt_subscribe_topic,
            "value_template": '{{ value_json.lock_front_state }}',
            "payload_lock": '{ "lock_front_state": "Secured" }',
            "payload_unlock": '{ "lock_front_state": "Unsecured" }',
            "state_locked": "Secured",
            "state_unlocked": "Unsecured",
            "state_jammed": "Jammed",
            "icon": "mdi:door-closed-lock",
        }
        self.mqtt_client.publish(topic, json.dumps(obj), 1, True)

        # 세대현관문 바이너리 센서(호출 중) 엔티티 추가
        topic = f'{self.ha_discovery_prefix}/binary_sensor/{self.unique_id}_front/config'
        obj = {
            "name": self.name + " Ringing (front)",
            "object_id": self.unique_id + "_ringing_front",
            "unique_id": self.unique_id + "_ringing_front",
            "state_topic": self.mqtt_publish_topic + '/doorbell/front',
            "value_template": '{ "state": {{ value_json.state }} }',
            "payload_on": '{ "state": 1 }',
            "payload_off": '{ "state": 0 }',
            "device_class": "sound",
        }
        self.mqtt_client.publish(topic, json.dumps(obj), 1, True)

        # 공동현관문 도어락 엔티티 추가
        topic = f'{self.ha_discovery_prefix}/lock/{self.unique_id}_communal/config'
        obj = {
            "name": self.name + " Lock (Communal)",
            "object_id": self.unique_id + "_lock_communal",
            "unique_id": self.unique_id + "_lock_communal",
            "state_topic": self.mqtt_publish_topic,
            "command_topic": self.mqtt_subscribe_topic,
            "value_template": '{{ value_json.lock_communal_state }}',
            "payload_lock": '{ "lock_communal_state": "Secured" }',
            "payload_unlock": '{ "lock_communal_state": "Unsecured" }',
            "state_locked": "Secured",
            "state_unlocked": "Unsecured",
            "state_jammed": "Jammed",
            "icon": "mdi:door-closed-lock",
        }
        self.mqtt_client.publish(topic, json.dumps(obj), 1, True)

        # 공동현관문 바이너리 센서(호출 중) 엔티티 추가
        topic = f'{self.ha_discovery_prefix}/binary_sensor/{self.unique_id}_communal/config'
        obj = {
            "name": self.name + " Ringing (Communal)",
            "object_id": self.unique_id + "_ringing_communal",
            "unique_id": self.unique_id + "_ringing_communal",
            "state_topic": self.mqtt_publish_topic + '/doorbell/communal',
            "value_template": '{ "state": {{ value_json.state }} }',
            "payload_on": '{ "state": 1 }',
            "payload_off": '{ "state": 0 }',
            "device_class": "sound",
        }
        self.mqtt_client.publish(topic, json.dumps(obj), 1, True)

이제는 당연하게 python 앱 실행 시 자동으로 HA의 MQTT Discovery 기능을 위해 config 토픽은 발행해 HA에 구성요소가 자동으로 추가되게 해줬다

(세대현관문/공동현관문 문열림 기능은 lock으로, 초인종 호출중 상태는 binary_sensor로 간단하게 만들어봤다)

앱을 실행하면 자동으로 4개의 구성요소가 추가된다

  • 구성요소 이름: Kitchen Subphone Lock (Front)
    구성요소 unique id: lock.subphone_0_0_lock_front
    타입: lock
  • 구성요소 이름: Kitchen Subphone Lock (Communal)
    구성요소 unique id: lock.subphone_0_0_lock_communal
    타입: lock
  • 구성요소 이름: Kitchen Subphone Ringing (Front)
    구성요소 unique id: lock.subphone_0_0_ringing_front
    타입: binary_sensor
  • 구성요소 이름: Kitchen Subphone Ringing (Communal)
    구성요소 unique id: lock.subphone_0_0_ringing_communal
    타입: binary_sensor

구성요소 편집기에서 이름을 입맛에 맞게 바꿔주면 된다

대시보드에 4개 엔티티만 모아봤다 ㅋㅋ

5. HA 자동화(automation) 추가

요구사항에서 공동현관문 관련 자동화를 구현하고자 한다는 내용이 있었는데, 추측해보자면 공동현관문이 호출되면 자동으로 공동현관문이 열리게 하고 싶다는게 아닐까 생각한다

 

HA에서는 이와 관련된 자동화를 쉽게 구현할 수 있다

트리거로 '공동현관문 초인종 호출이 시작되었을 경우'를 만들기 위해 binary_sensor가 이전은 'off' 상태에서 이후 'on'이 될 때 자동화가 실행되도록 만들었다 (경과 시간을 설정하면 일정 시간 후 문이 열리게 만들 수 있음)

동작으로 서비스 중 lock.unlock을 호출하게 만들면 끝~!

조건으로 재실 센서 등을 활용해 "내가 집에 있을 때" 같은 컨디션들을 추가해주면 된다

5.1. 테스트

공동현관문에서 세대호출을 누르면 즉시 문이 열린다 ㅋㅋ

음식 배달이나 택배 수령 시 꽤 유용하게 쓰일 것 같긴 하다

※ 나는 항상 애플워치를 차고 다니며, 주방 서브폰의 영상도 받아볼 수 있기 때문에 방문자 얼굴을 보고 Siri 음성명령으로 공동현관문을 열어준다

 

끝~!

반응형
Comments