YOGYUI

힐스테이트 광교산::주방 비디오폰 연동 - HEMS(에너지 모니터링) 본문

홈네트워크(IoT)/힐스테이트 광교산

힐스테이트 광교산::주방 비디오폰 연동 - HEMS(에너지 모니터링)

요겨 2022. 10. 25. 23:55
반응형

지난 9월 초 주방에 설치된 (주)동영엠텍의 DM-D5102QMS 주방용 TV폰의 RS-485 패킷 해석 의뢰를 맡아 진행한 바 있다

힐스테이트 광교산::주방 비디오폰 RS-485 패킷 해석

 

힐스테이트 광교산::주방 비디오폰 RS-485 패킷 해석

지난주 금요일(09/02) 밤에 이메일을 한통 받았다 제목만으로도 가슴을 설레게 하는(?) 그런 메일 ㅋㅋㅋ 너무나도 공손하게 보내셔서 끝까지 정독할 수 밖에 없는 메일이었다 ^^ 요점은 월패드와

yogyui.tistory.com

주방 비디오폰과 거실 메인 월패드(HDHN-2000)와 RS-485 통신이 이루어지며, 통신선을 통해 현관문의 비디오폰 및 도어락 제어와 공동현관문 제어가 가능한 것을 알 수 있었다

개별 현관문 및 공동 현관문 제어는 Composite Video 라인을 통해 실시간 영상도 받아볼 수 있다
또한, 외부에서 호출되는 상태에 대한 정보도 얻어볼 수 있다
Homebridge랑 연동하면 다음과 같이 스마트폰으로 현관 비디오폰 영상을 받아볼 수 있다
이와 관련된 코드는 아직 정리작업 중..
(ffserver와 ffmpeg를 파이썬 코드 내에서 subprocess로 호출하는 구문으로 전환 작업 중)
정리가 끝나는대로 포스팅 예정~

뿐만 아니라, 주방 비디오폰에서는 HEMS라고 해서 전기/수도/가스/온수/난방 사용량에 대한 통계 정보를 디스플레이하는 기능이 있는데, 이 또한 거실 월패드와의 RS-485 통신을 통해 얻어오는 것을 확인할 수 있었다

1. RS-485 하드웨어 연결

주방 비디오폰과 거실 월패드는 RJ-45 커넥터와 UTP 케이블을 통해 연결되어 있으며, UTP 케이블의 Pin Map은 입주 초기 월패드 분해 작업 시 확인한 바 있다

힐스테이트 광교산::홈네트워크 월패드 뜯어보기

 

힐스테이트 광교산::홈네트워크 월패드 뜯어보기

며칠전 힐스테이트 광교산 아파트로 이사왔다 (내집마련!) 3일정도 이사짐 정리를 하는데도 끝이 보이지 않는데다 몸살까지 겹쳐서 잠시 쉴 겸 힐스테이트는 어떤 홈네트워크 시스템을 쓰는지

yogyui.tistory.com

UTP케이블은 586B 규격인 것을 확인

https://honhon20.tistory.com/92

핀맵을 정리해보면 다음과 같다

  • 1번 (+흰): GND
  • 2번 (): SVS > Composite Video 시그널
  • 3번 (+흰): LS2 > 음성 신호로 추정
  • 4번 (): SC2B > RS-485 통신선(B)
  • 5번 (+흰): SC2A > RS-485 통신선(A)
  • 6번 (): LS1 > 음성 신호로 추정
  • 7번 (+흰): GND
  • 8번 (): SUB12 > 현관 도어폰 전원 (+12V, 주방 비디오폰에서는 의미없음)

UTP 1:2 Y형 커플러RS-45 개발용 키트를 활용해서 USB-RS485 컨버터에 4번과 5번 선을 극성에 맞게 연결해준다

※ 나중에 시간 여유되는대로 RS-485, Composite Video, 음성 모두 통합된 임베디드 보드 제작해볼 계획!

2. RS-485 패킷 명세

지난 패킷 해석 의뢰 작업 시, RS-485 통신선의 Start Bit의 시간 폭이 약 260us로 측정되었으며, 역으로 환산하면 약 3840bps가 통신 속도인 것을 확인했다 (databit=8, stopbit=1, parity=none)

※ 따라서, RS-485 컨버터가 해당 baudrate가 지원가능한지 확인해야 한다!

틈나는대로 패킷 캡쳐하면서 HEMS로 디스플레이되는 값들을 비교해본 결과 정리한 패킷 명세는 다음과 같다

  • Packet Prefix: 0x7F
  • Packet Suffix: 0xEE

2.1. 쿼리 패킷 (주방 비디오폰 월패드)

패킷 포맷: 7F E0 XY 00 EE

  • 두번째 바이트 상위 4비트 = 0xE, 하위 4비트 = 0x0
  • 세번째 바이트의 상위 4비트값(X) = 카테고리
    1 = 전기
    2 = 수도
    3 = 가스
    4 = 온수
    5 = 난방
  • 세번째 바이트의 하위 4비트값(Y) = 쿼리 타입
    1 = 3달간 우리집 사용량 이력
    2 = 3달간 동일평수 평균 사용량 이력
    3 = 3달간 요금 이력
    4 = 3달간 CO2 배출량 이력
    5 = 목표량
    7 = 실시간 사용량

[예시]

현재 실시간 전기 사용량 이력 쿼리: 7F E0 17 00 EE

3달간 우리집 온수 사용량 이력 쿼리: 7F E0 41 00 EE

3달간 동일평수 평균 난방 사용량 이력 쿼리: 7F E0 52 00 EE


2.2. 응답 패킷 (주방 비디오폰 ← 월패드)

패킷 포맷: 7F E1 XY ZZ [...] PP EE

  • 두번째 바이트 상위 4비트 = 0xE, 하위 4비트 = 0x1
  • 세번째 바이트의 상위 4비트값(X) = 카테고리, 하위 4비트값(Y) = 쿼리 타입
    (2.1. 쿼리 패킷에 명시된 값과 동일)
  • 4번째 바이트 = 데이터 길이 N (03 혹은 09)
    - 쿼리 타입이 1~4일 경우 09
    - 쿼리 타입이 5 혹은 7일 경우 03
  • 5번째 바이트 ~ (5 + N) 바이트 = 데이터 
    - 값은 3바이트 단위로 끊어서 해석하면 된다 (Big-endian, 정수형)
  • (5 + N + 1)번째 바이트 = 체크섬 (XOR Sum)

[예시]

7F E1 17 03 00 03 46 CF EE

현재 실시간 전기 사용량: 0x000346 = 838 (W)

7F E1 11 09 00 01 2D 00 01 92 00 02 0D 36 EE

이번달 우리집 전기 사용량: 0x00012D = 301 (kWh)

전월 우리집 전기 사용량: 0x000192 = 402 (kWh)

전전월 우리집 전기 사용량: 0x00020D = 525 (kWh)

7F E1 22 09 00 00 06 00 00 09 00 00 09 B3 EE

이번달 동일평수 평균 수도 사용량: 0x000006 = 6 (㎥)

전월 동일평수 평균 수도 사용량: 0x000009 = 9 (㎥)

전전월 동일평수 평균 수도 사용량: 0x000009 = 9 (㎥)


주방 비디오폰의 HEMS 모니터링 기능이 활성화되어 있지 않은 상태에서, USB-RS485 컨버터로 쿼리 패킷을 보내도 정상적으로 응답 패킷이 월패드로부터 수신되므로, 비디오폰 전원이 꺼진 상태에서도 HEMS 값을 주기적으로 받아올 수 있다!

 

안타깝게도, 거실 월패드에 표시되는 값은 실수형 값인데 반해 주방 비디오폰으로 전송하는 값은 소수점은 truncation한 정수형 값들이라 정확도가 떨어지는 단점이 있다

(거실 월패드에 디스플레이되는 값은 외부에 설치된 계측기 혹은 단지 서버로부터 가져올텐데, 이 값들이 실제로 관리비로 부과되는 사용량과 얼마나 일치하는지는 또다른 문제이니, 정확도 문제는 무시하도록 하자 ㅎㅎ...)

3. 코드 개발

패킷 구조가 간단하므로 파이썬으로 쉽게 구현할 수 있다

우선 3번째 패킷의 상위 4비트, 하위 4비트 각각에 대한 열거형 클래스를 정의해주자

from enum import IntEnum

class HEMSDevType(IntEnum):
    Electricity = 1  # 전기
    Water = 2  # 수도
    Gas = 3  # 가스
    HotWater = 4  # 온수
    Heating = 5  # 난방

class HEMSCategory(IntEnum):
    History = 1  # 우리집 사용량 이력 (3달간, 단위: kWh/L/MWh)
    OtherAverage = 2  # 동일평수 평균 사용량 이력 (3달간, 단위: kWh/L/MWh)
    Fee = 3  # 요금 이력 (3달간, 단위: 천원)
    CO2 = 4  # CO2 배출량 이력 (3달간, 단위: kg)
    Target = 5  # 목표량
    Current = 7  # 현재 실시간 사용량

그리고 주방 비디오폰 클래스를 만든 뒤, 쿼리 패킷을 만드는 메서드를 만들어주자

XOR Checksum 계산도 필요없기 때문에 코드 2줄이면 끝~

class SubPhone(Device):
    def makePacketQueryHEMS(self, devtype: HEMSDevType, category: HEMSCategory) -> bytearray:
        command = ((devtype.value & 0x0F) << 4) | (category.value & 0x0F)
        return bytearray([0x7F, 0xE0, command, 0x00, 0xEE])

그리고 RS-485 패킷 파서 클래스도 만들어주자

class ParserSubPhone(PacketParser):
    def handlePacket(self):
        # 3840 baudrate
        # packet format: 7F XX XX XX EE
        idx = self.buffer.find(0x7F)
        if idx > 0:
            self.buffer = self.buffer[idx:]
        if len(self.buffer) >= 2:
            idx2 = self.buffer.find(0xEE)
            if idx2 > 0:
                self.line_busy = False
                packet = self.buffer[:idx2 + 1]
                self.interpretPacket(packet)
                self.buffer = self.buffer[idx2 + 1:]
    
    def interpretPacket(self, packet: bytearray):
        if (packet[1] & 0xF0) == 0xE0:
            self.handleHEMS(packet)
    
    def handleHEMS(self, packet: bytearray):
        packet_type = packet[1] & 0x0F
        if packet_type == 0x00:
            # 쿼리 패킷 (서브폰 -> 월패드)
            pass
        elif packet_type == 0x01:
            result = {'device': 'hems', 'packet': packet}
            notify: bool = True
            # 응답 패킷 (월패드 -> 서브폰)
            devtype = HEMSDevType((packet[2] & 0xF0) >> 4)
            category = HEMSCategory(packet[2] & 0x0F)
            if category.value in [1, 2, 3, 4]:
                v1 = int.from_bytes(packet[4:7], byteorder='big', signed=False)
                v2 = int.from_bytes(packet[7:10], byteorder='big', signed=False)
                v3 = int.from_bytes(packet[10:13], byteorder='big', signed=False)
                result[f'{devtype.name.lower()}_{category.name.lower()}_cur_month'] = v1
                result[f'{devtype.name.lower()}_{category.name.lower()}_1m_ago'] = v2
                result[f'{devtype.name.lower()}_{category.name.lower()}_2m_ago'] = v3
            elif category.value in [5, 7]:
                v = int.from_bytes(packet[4:7], byteorder='big', signed=False)
                result[f'{devtype.name.lower()}_{category.name.lower()}'] = v
            if notify:
                self.sig_parse_result.emit(result)

그리고 웹서버로 HEMS 값을 확인할 수 있는 디버깅 툴도 만들어봤다

(웹페이지 및 실시간 HEMS 쿼리 쓰레드 소스코드는 굳이 첨부하지 않는다~ 깃허브 저장소 참고!)

실제 HEMS에 디스플레이되는 값과 동일한지 비교해보자

(월패드에 디스플레이되는 값에서 소수점 혹은 백원단위는 버려지는 것을 알 수 있다)

 

여담: 확실히 동일평수 평균 사용량에 비해 우리집 전기 사용량이 30~40% 가량 많다

(아무래도 일반 가정집에 비하면 NAS도 돌리고, SBC도 여러대 돌리고 있고, 홈IoT 허브도 많고, 집안 곳곳에서 모션 센서 + 조명 스트립 연동도 되고 있고... 변명거리는 차고 넘친다 ㅠ 지구야 미안해...)

 

전체 코드는 깃허브 메인 브랜치에 커밋 완료!

https://github.com/YOGYUI/HomeNetwork/tree/main/Hillstate-Gwanggyosan

 

GitHub - YOGYUI/HomeNetwork: HomeNetwork(Homebridge) Repo

HomeNetwork(Homebridge) Repo. Contribute to YOGYUI/HomeNetwork development by creating an account on GitHub.

github.com

4. Home Assistant 연동

Home 클래스 내부에서 HEMS 값들을 MQTT 메시지로 발행하는 구문을 추가했다

- MQTT로 발행하는 값은 실시간 전력 사용량(단위 W)만 하는걸로~

(HEMS를 별도의 클래스로 구현하려니 영 귀찮아서...ㅋㅋ;)

class Home:
    topic_hems_publish: str = ''
    
    def __init__(self, name: str = 'Home', init_service: bool = True):
        self.hems_info = dict()
        # 주방 비디오폰(서브폰) 포트
        self.rs485_subphone_config = RS485Config()
        self.rs485_subphone = RS485Comm('RS485-SubPhone')
        self.rs485_subphone.sig_connected.connect(self.onRS485SubPhoneConnected)
        self.rs485_list.append(self.rs485_subphone)
        self.parser_subphone = ParserSubPhone(self.rs485_subphone)
        self.parser_subphone.sig_parse_result.connect(lambda x: self.queue_parse_result.put(x))
        self.parser_list.append(self.parser_subphone)
    
    def initialize(self, init_service: bool, connect_rs485: bool):
        self.subphone = SubPhone(name="SubPhone", mqtt_client=self.mqtt_client)
    
    def initRS485Connection(self):
        rs485_list = [
            ('light', self.rs485_light_config, self.rs485_light),
            ('various', self.rs485_various_config, self.rs485_various),
            ('subphone', self.rs485_subphone_config, self.rs485_subphone)
        ]

        for elem in rs485_list:
            name, cfg, rs485 = elem
            try:
                if cfg.enable:
                    rs485.setType(cfg.comm_type)
                    if cfg.comm_type == RS485HwType.Serial:
                        port, baud = cfg.serial_port, cfg.serial_baud
                        rs485.connect(port, baud)
                    elif cfg.comm_type == RS485HwType.Socket:
                        ipaddr, port = cfg.socket_ipaddr, cfg.socket_port
                        rs485.connect(ipaddr, port)
                else:
                    writeLog(f"rs485 '{name}' is disabled", self)
            except Exception as e:
                writeLog(f"Failed to initialize '{name}' rs485 connection ({e})", self)
                continue
        
        if self.thread_timer is not None:
            self.thread_timer.set_home_initialized()
    
    def loadConfig(self, filepath: str):
        if not os.path.isfile(filepath):
            return
        root = ET.parse(filepath).getroot()

        node = root.find('hems')
        try:
            mqtt_node = node.find('mqtt')
            self.topic_hems_publish = mqtt_node.find('publish').text
        except Exception as e:
            writeLog(f"Failed to load HEMS config ({e})", self)  
    
    def handlePacketParseResult(self, result: dict):
        dev_type = result.get('device')
        if dev_type == 'hems':
            result.pop('device')
            self.hems_info['last_recv_time'] = datetime.datetime.now()
            for key in list(result.keys()):
                self.hems_info[key] = result.get(key)
                if key in ['electricity_current']:
                    topic = self.topic_hems_publish + f'/{key}'
                    value = result.get(key)
                    if value == 0:
                        writeLog(f"zero power consumption? >> {prettifyPacket(result.get('packet'))}", self)
                    self.mqtt_client.publish(topic, json.dumps({"value": value}), 1)

HEMS 발행 토픽은 "home/hillstate/hems/state"로 정했다

이제 HomeAssistant의 설정 파일(configuration.yaml)에서 sensor 액세서리와 integration 액세서리를 추가해주자

sensor:  
  - platform: mqtt
    name: "전력 사용량"
    unique_id: "hems_electricity_current"
    state_topic: "home/hillstate/hems/state/electricity_current"
    unit_of_measurement: W
    value_template: "{{ value_json.value }}"
    device_class: power
    state_class: measurement
  
  - platform: integration
    source: sensor.jeonryeog_sayongryang  
    name: "전력 총 사용량"
    unit_time: h
    unit_prefix: k
    round: 3

설정파일 수정 후 HA를 재시작한 뒤 '기록 그래프' 항목을 보면 실시간 전력 사용량에 대한 그래프를 확인할 수 있다

냉장고 + 냉동고 + 공기청정기 + 가습기 + NAS + 얼음정수기 등 몇몇 디바이스들(??)만 동작하는 (집을 비운 혹은 새벽) 환경에서도 400W 수준의 전력이 소모되는 추세를 보이고 있다.. 세대 전기료가 괜히 많이 나오는게 아니구만? ㅠ_ㅠ

로그 디스플레이 기간을 길게 뽑아보면 하루별 전력 사용량 추세를 확인할 수 있다

(확실히 세탁기/건조기/식기세척기를 구동하면 전력소비가 급격히 치솟는다.. 세탁기/건조기 동시에 돌리면 3000W는 우습게 넘겨버리네;)

물론 데스크탑 PC로 게임이라도 돌리면 1000W 가까운 추가 전력소모가 발생한다

integration 액세서리를 통해 초당 전력소모량(W)를 누적하여 하루 단위로 KWh로 환산해 '에너지'탭에 디스플레이도 할 수 있다 (일/주/월/년별 누적 사용량을 확인할 수 있다)

관리비가 부담스럽게 느껴진다면, 전력 모니터링을 통해 어떤 디바이스가 동작할 때 소모전력이 많은지 체크한 뒤 사용방법 혹은 기기교체 등의 대책을 마련하는게 가능해보인다~

(각 방별 콘센트에서의 소모 전력이 모니터링되지 않는다는 점은 많이 아쉽다 ㅠ)

 

끝~!

반응형