일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 퀄컴
- Home Assistant
- matter
- esp32
- RS-485
- 파이썬
- 티스토리챌린지
- Bestin
- 미국주식
- 배당
- 홈네트워크
- ConnectedHomeIP
- 공모주
- raspberry pi
- 힐스테이트 광교산
- 해외주식
- Apple
- 코스피
- 국내주식
- Python
- 오블완
- 매터
- Espressif
- 나스닥
- MQTT
- homebridge
- 현대통신
- 애플
- 월패드
- 엔비디아
- Today
- Total
YOGYUI
힐스테이트 광교산::환기(전열교환기) - 애플 홈킷 + 구글 어시스턴트 연동 본문
지난 포스트에서 환기(전열교환기) 관련 RS-485 패킷 후킹 및 분석을 완료했다 (링크)
이제껏 구현한 조명/아울렛/가스밸브/난방과 크게 패킷 구조가 다르지 않으니 최대한 코드 구조를 재활용하면서 홈네트워크 플랫폼과 연동하도록 한다
1. python 코드 작성
깃헙 저장소 hillstate-ventilator 브랜치로 소스코드 커밋 완료
https://github.com/YOGYUI/HomeNetwork/tree/hillstate-ventilator
홈브릿지나 홈어시스턴트의 환기 관련 액세서리는 일반 선풍기(fan) 액세서리를 활용했는데, 선풍기의 풍량은 0 ~ 100으로 퍼센트(%) 단위를 사용하는 반면 전열교환기는 약, 중, 강 세단계로 풍량이 정해져있기 때문에 mqtt 메시지를 주고받을 때 이를 적절히 환산해주는 알고리즘이 필요하다
- 알고리즘이라 해서 거창한 건 아니고.. 그냥 0~30%까지는 약풍, 31 ~ 60%까지는 중풍, 61 ~ 100%까지는 강풍으로 범위 기반 인코딩 방식을 사용했다 ㅎㅎ
시리얼 패킷 파서는 가스밸브, 난방과 유사하게 짤 수 있다
class ParserVarious(SerialParser):
def interpretPacket(self, packet: bytearray):
try:
if packet[2:4] == bytearray([0x01, 0x1B]): # 가스차단기
# 가스밸브 관련 코드 (생략)
elif packet[2:4] == bytearray([0x01, 0x18]): # 난방
# 난방 관련 코드 (생략)
elif packet[2:4] == bytearray([0x01, 0x2B]): # 환기 (전열교환기)
if packet[4] == 0x01:
pass
elif packet[4] == 0x02:
pass
elif packet[4] == 0x04:
state = 0 if packet[8] == 0x02 else 1
rotation_speed = packet[9] # 0x01=약, 0x03=중, 0x07=강
result = {
'device': 'ventilator',
'state': state
}
if rotation_speed != 0:
result['rotation_speed'] = rotation_speed
self.sig_parse_result.emit(result)
except Exception as e:
writeLog('interpretPacket::Exception::{} ({})'.format(e, packet), self)
전열교환기의 작동이 중지되었을 때는 풍량이 0값으로 들어오는데 이를 굳이 디바이스 객체에 적용할 필요는 없으므로 조건문을 달아둔 게 다른 디바이스들과의 차별점
전열교환기 관련 디바이스 클래스 이름은 Ventilator로 정했고, 다른 기기들과 마찬가지로 Device 객체를 상속했다
class Ventilator(Device):
rotation_speed: int = 0
rotation_speed_prev: int = 0
def __init__(self, name: str = 'Ventilator', **kwargs):
super().__init__(name, **kwargs)
def __repr__(self):
repr_txt = f'<{self.name}({self.__class__.__name__} at {hex(id(self))})'
repr_txt += '>'
return repr_txt
def publish_mqtt(self):
obj = {"state": self.state}
if self.state:
if self.rotation_speed == 0x01:
obj['rotationspeed'] = 30
elif self.rotation_speed == 0x03:
obj['rotationspeed'] = 60
elif self.rotation_speed == 0x07:
obj['rotationspeed'] = 100
if self.mqtt_client is not None:
self.mqtt_client.publish(self.mqtt_publish_topic, json.dumps(obj), 1)
def setState(self, state: int, **kwargs):
self.state = state
if not self.init:
self.publish_mqtt()
self.init = True
if self.state != self.state_prev:
self.publish_mqtt()
self.state_prev = self.state
# 풍량 인자
rotation_speed = kwargs.get('rotation_speed')
if rotation_speed is not None:
self.rotation_speed = rotation_speed
if self.rotation_speed != self.rotation_speed_prev:
self.publish_mqtt()
self.rotation_speed_prev = self.rotation_speed
def makePacketQueryState(self) -> bytearray:
# F7 0B 01 2B 01 40 11 00 00 XX EE
# XX: Checksum (XOR SUM)
packet = bytearray([0xF7, 0x0B, 0x01, 0x2B, 0x01, 0x40])
packet.extend([0x11, 0x00, 0x00])
packet.append(self.calcXORChecksum(packet))
packet.append(0xEE)
return packet
def makePacketSetState(self, state: bool):
# F7 0B 01 2B 02 40 11 XX 00 YY EE
# XX: 0x01=On, 0x02=Off
# YY: Checksum (XOR SUM)
packet = bytearray([0xF7, 0x0B, 0x01, 0x2B, 0x02, 0x40])
# packet.append(0x10 + (self.room_index & 0x0F))
packet.append(0x11) # 환기는 거실(공간인덱스 1)에 설치된걸로 설정되어 있다
if state:
packet.extend([0x01, 0x00])
else:
packet.extend([0x02, 0x00])
packet.append(self.calcXORChecksum(packet))
packet.append(0xEE)
return packet
def makePacketSetRotationSpeed(self, rotation_speed: int):
# F7 0B 01 2B 02 42 11 XX 00 YY EE
# XX: 풍량 (0x01=약, 0x03=중, 0x07=강)
# YY: Checksum (XOR SUM)
packet = bytearray([0xF7, 0x0B, 0x01, 0x2B, 0x02, 0x42])
# packet.append(0x10 + (self.room_index & 0x0F))
packet.append(0x11) # 환기는 거실(공간인덱스 1)에 설치된걸로 설정되어 있다
packet.append(rotation_speed)
packet.append(0x00)
packet.append(self.calcXORChecksum(packet))
packet.append(0xEE)
return packet
MQTT 메시지를 발행하는 publish_mqtt, 풍량 설정 명령 패킷을 만들어는 메서드 makePacketSetRotationSpeed 두 메서드를 보면 풍량 관련 인코딩이 어떤 식으로 되는지 알 수 있다
(홈네트워크 플랫폼에서 실제 표시되는 풍량은 30%, 60%, 100% 세단계로 고정된다)
MQTT 메시지 핸들러 및 명령 큐 처리 쓰레드도 다른 기기들과 유사하게 짤 수 있다
- 홈네트워크 플랫폼에서 fan 가동을 시작할 경우 state=1, rotationspeed=100이 default로 메시지가 두 번 넘어오는데, 최초 기동 시 강풍으로 하고 싶지는 않았기에 기존 상태가 켜져 있을 경우에만 풍량 조절이 이루어지도록 구현했다
(힐스테이트 월패드는 가동 시작 시 약풍으로 가동이 시작된다)
class Home:
def onMqttClientMessage(self, _, userdata, message):
"""
Homebridge Publish, App Subscribe
사용자에 의한 명령 MQTT 토픽 핸들링
"""
if self.enable_mqtt_console_log:
writeLog('Mqtt Client Message: {}, {}'.format(userdata, message), self)
topic = message.topic
msg_dict = json.loads(message.payload.decode("utf-8"))
if 'light/command' in topic:
# 조명 관련 처리 구문 (생략)
if 'outlet/command' in topic:
# 아울렛(콘센트) 관련 처리 구문 (생략)
if 'gasvalve/command' in topic:
# 가스밸브 관련 처리 구문 (생략)
if 'thermostat/command' in topic:
# 난방 관련 처리 구문 (생략)
if 'ventilator/command' in topic:
if 'state' in msg_dict.keys():
self.command(
device=self.ventilator,
category='state',
target=msg_dict['state']
)
if 'rotationspeed' in msg_dict.keys():
if self.ventilator.state == 1:
# 전원이 켜져있을 경우에만 풍량설정 가능하도록..
# 최초 전원 ON시 풍량 '약'으로 설정!
self.command(
device=self.ventilator,
category='rotationspeed',
target=msg_dict['rotationspeed']
)
class ThreadCommandQueue(threading.Thread):
def run(self):
writeLog('Started', self)
while self._keepAlive:
if not self._queue.empty():
elem = self._queue.get()
elem_txt = '\n'
for k, v in elem.items():
elem_txt += f' {k}: {v}\n'
writeLog(f'Get Command Queue: \n{{{elem_txt}}}', self)
try:
dev = elem['device']
category = elem['category']
target = elem['target']
parser = elem['parser']
if target is None:
continue
if isinstance(dev, Light):
# 조명 관련 (생략)
elif isinstance(dev, Outlet):
# 아울렛(콘센트) 관련 (생략)
elif isinstance(dev, GasValve):
# 가스밸브 관련 (생략)
elif isinstance(dev, Thermostat):
# 난방 관련 (생략)
elif isinstance(dev, Ventilator):
if category == 'state':
self.set_state_common(dev, target, parser)
elif category == 'rotationspeed':
self.set_rotation_speed(dev, target, parser)
except Exception as e:
writeLog(str(e), self)
else:
time.sleep(1e-3)
writeLog('Terminated', self)
self.sig_terminated.emit()
def set_rotation_speed(self, dev: Ventilator, target: int, parser: SerialParser):
# Speed 값 변환 (100단계의 풍량을 세단계로 나누어 1, 3, 7 중 하나로)
if target <= 30:
conv = 0x01
elif target <= 60:
conv = 0x03
else:
conv = 0x07
cnt = 0
packet_command = dev.makePacketSetRotationSpeed(conv)
packet_query = dev.makePacketQueryState()
for _ in range(self._retry_cnt):
if dev.rotation_speed == conv:
break
parser.sendPacket(packet_command)
cnt += 1
time.sleep(0.2)
if dev.rotation_speed == conv:
break
parser.sendPacket(packet_query)
time.sleep(0.2)
if cnt > 0:
writeLog('set_rotation_speed::send # = {}'.format(cnt), self)
time.sleep(self._delay_response)
dev.publish_mqtt()
2. Homebridge, Home Assistant 액세서리 추가
homebridge 액세서리는 다음과 같은 템플릿으로 추가할 수 있다 (fan)
{
"accessory": "mqttthing",
"type": "fan",
"name": "Ventilator (MQTT)",
"url": "mosquitto broker url",
"username": "mosquitto auth id",
"password": "mosquitto auth password",
"caption": "Ventilator(MQTT)",
"topics": {
"getOn": {
"topic": "home/hillstate/ventilator/state",
"apply": "return JSON.parse(message).state;"
},
"setOn": {
"topic": "home/hillstate/ventilator/command",
"apply": "return JSON.stringify({state: message});"
},
"getRotationSpeed": {
"topic": "home/hillstate/ventilator/state",
"apply": "return JSON.parse(message).rotationspeed;"
},
"setRotationSpeed": {
"topic": "home/hillstate/ventilator/command",
"apply": "return JSON.stringify({rotationspeed: message});"
}
},
"integerValue": true,
"logMqtt": true
}
HA 액세서리도 동일한 방식으로 MQTT 방식으로 구동하게 만들면 된다
fan:
- platform: mqtt
name: "전열교환기"
unique_id: "ventilator"
state_topic: "home/hillstate/ventilator/state"
state_value_template: "{% if value_json.state %} ON {% else %} OFF {% endif %}"
command_topic: "home/hillstate/ventilator/command"
command_template: >-
{% set values = {'OFF': 0, 'ON': 1} %}
{ "state": {{ values[value] if value in values.keys() else 0 }} }
percentage_state_topic: "home/hillstate/ventilator/state"
percentage_value_template: "{{ value_json.rotationspeed }}"
percentage_command_topic: "home/hillstate/ventilator/command"
percentage_command_template: '{ "rotationspeed": {{ value }} }'
speed_range_min: 1
speed_range_max: 100
3. 작동 테스트
가스밸브, 난방과 마찬가지로 환기 역시 홈킷에서 상태 변경 후 월패드에 UI 업데이트될 때까지 레이턴시가 있다 (원인: 쿼리 주기)
반면 월패드에서 상태 변경시 아이폰에는 변경 내용이 거의 실시간으로 적용된다~
일반 공기청정기는 집안의 미세먼지나 부유물질만 걸러줄 뿐 집안에 쌓이는 이산화탄소 등 걸러줘야 하는 기체 성분은 없애주지 못하기 때문에 창문을 열어서 환기를 주기적으로 해줘야되는데, 전열교환기는 기상상황이 안좋을 때 (비가 오거나 미세먼지 농도가 심할 때 등) 집안을 환기하는데 약간의 도움이 되는 장치라 활용도가 꽤 높다
홈네트워크의 자동화(automation) 기능을 활용해서 매일 일정 시간마다 전열교환기를 작동하게 해주면 맑은 실내공기를 기대할 수 있다
나는 매일 오전 9시에 작동 (풍량=약)시키고, 오후 9시에 끄는 자동화로 구성해서 사용 중 (자동화는 간단하게 애플 홈앱으로 구성)
[TODO]
조명 On/Off아울렛(전원 콘센트) On/Off - 실시간 전력량 조회는 불가능도시가스 차단난방 제어 (On/Off, 희망 온도 설정, 현재 방 온도 가져오기)환기 (전열교환기)- 시스템 에어컨 (냉방 및 공기청정)
- 엘리베이터 호출
- 도어락 해제
- Optional: 현관 비디오폰 영상, 거실 천장 모션 센서
'홈네트워크(IoT) > 힐스테이트 광교산' 카테고리의 다른 글
힐스테이트 광교산::시스템에어컨 - 애플 홈킷 + 구글 어시스턴트 연동 (0) | 2022.06.20 |
---|---|
힐스테이트 광교산::시스템에어컨 제어 RS-485 패킷 분석 (0) | 2022.06.19 |
힐스테이트 광교산::환기(전열교환기) 제어 RS-485 패킷 분석 (0) | 2022.06.17 |
힐스테이트 광교산::난방 - 애플 홈킷 + 구글 어시스턴트 연동 (2) | 2022.06.17 |
힐스테이트 광교산::난방 제어 RS-485 패킷 분석 (0) | 2022.06.16 |