일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Python
- 퀄컴
- 미국주식
- Bestin
- 매터
- 해외주식
- ConnectedHomeIP
- 국내주식
- homebridge
- Home Assistant
- 코스피
- 힐스테이트 광교산
- 월패드
- 홈네트워크
- 현대통신
- 애플
- Espressif
- Apple
- RS-485
- raspberry pi
- 티스토리챌린지
- 나스닥
- MQTT
- 배당
- matter
- 공모주
- SK텔레콤
- esp32
- 오블완
- 파이썬
- Today
- Total
YOGYUI
Homebridge - HTTP 액세서리 MQTT로 Migration 본문
서버 코드 유지보수성 강화를 위해 기존의 HTTP 기반 액세서리들을 MQTT 기반으로 바꾸고자 결심했다
(HTTP 기반 액세서리는 디바이스 개별로 http-nofitication listener 포트를 할당해야 하는데, MQTT 기반에서는 subscribe만 하면 되기 때문에 서버 부하도 줄일 수 있을 것으로 판단)
1. 플러그인 설치
Homebridge UI에서 mqttthing을 검색해서 Homebridge Mqttthing 플러그인을 설치
mqttthing으로 만들 수 있는 액세서리 종류는 다음과 같다
거의 모든 종류의 디바이스를 mqttthing 플러그인 하나로 만들어낼 수 있다
config.json의 액세서리 중 HTTP 기반으로 구현한 액세서리를 mqtt 기반으로 변경해준다
예시)
[HTTP 기반 액세서리 (스위치)]
{
"accessory": "HTTP-SWITCH",
"name": "Kitchen Light1",
"switchType": "stateful",
"onUrl": {
"url": "http://localhost:9999/light",
"method": "POST",
"body": {
"room": 1,
"light": 0,
"command": "on"
},
"headers": {
"Accept": "application/json"
}
},
"offUrl": {
"url": "http://localhost:9999/light",
"method": "POST",
"body": {
"room": 1,
"light": 0,
"command": "off"
},
"headers": {
"Accept": "application/json"
}
},
"statusUrl": {
"url": "http://localhost:9999/light",
"method": "POST",
"body": {
"room": 1,
"light": 0,
"status": 0
},
"headers": {
"Accept": "application/json"
}
},
"notificationID": "switch_room1_light1",
"notificationPassword": "password1",
"auth": {
"username": "yogyui",
"password": "password2"
}
}
(on, off 명령 및 status 쿼리용 url에 대한 정보 및 notification 서버 정보까지 적어야 하다보니 액세서리 한개를 위한 코드가 꽤나 길다)
[MQTT 기반 액세서리 (스위치)]
{
"accessory": "mqttthing",
"type": "switch",
"name": "Kitchen Light1 (MQTT)",
"url": "mqtt:://localhost:1883",
"username": "yogyui",
"password": "12345678",
"topics": {
"getOn": {
"topic": "home/ipark/light/state/1/0",
"apply": "return JSON.parse(message).state;"
},
"setOn": {
"topic": "home/ipark/light/command/1/0",
"apply": "return JSON.stringify({state: message});"
}
},
"integerValue": true,
"onValue": 1,
"offValue": 0,
"history": true,
"logMqtt": true
}
url은 mosquitto mqtt broker의 주소를 적어준다 (mosquitto 관련 링크)
topic은 mqttthing 도큐먼트를 참고해서 구현 (액세서리별로 깔끔하게 정리가 잘 되어있다)
2. 코드 변경
(일부만 발췌, 모듈화로 구현한다고 했는데도 너무 길다)
# HomeDef.py
import threading
from abc import ABCMeta, abstractmethod
from typing import List
import paho.mqtt.client as mqtt
from Serial485.SerialComm import SerialComm
from Serial485.EnergyParser import EnergyParser
class Device:
__metaclass__ = ABCMeta
name: str = 'Device'
room_index: int = 0
init: bool = False
state: int = 0 # mostly, 0 is OFF and 1 is ON
state_prev: int = 0
packet_set_state_on: str = ''
packet_set_state_off: str = ''
packet_get_state: str = ''
mqtt_client: mqtt.Client = None
mqtt_publish_topic: str = ''
mqtt_subscribe_topic: str = ''
def __init__(self, name: str = 'Device', **kwargs):
self.name = name
if 'room_index' in kwargs.keys():
self.room_index = kwargs['room_index']
self.mqtt_client = kwargs.get('mqtt_client')
writeLog('Device Initialized >> Name: {}, Room Index: {}'.format(self.name, self.room_index), self)
@abstractmethod
def publish_mqtt(self):
pass
class Light(Device):
def __init__(self, name: str = 'Device', index: int = 0, **kwargs):
self.index = index
super().__init__(name, **kwargs)
def publish_mqtt(self):
obj = {"state": self.state}
self.mqtt_client.publish(self.mqtt_publish_topic, json.dumps(obj), 1)
class Room:
name: str = 'Room'
index: int = 0
lights: List[Light]
def __init__(self, name: str = 'Room', index: int = 0, light_count: int = 0, **kwargs):
self.name = name
self.index = index
self.lights = list()
for i in range(light_count):
self.lights.append(Light(name='Light {}'.format(i + 1), index=i, room_index=self.index, mqtt_client=kwargs.get('mqtt_client')))
@property
def light_count(self):
return len(self.lights)
class ThreadMonitoring(threading.Thread):
_keepAlive: bool = True
def __init__(self, device_list: List[Device], publish_interval: int = 60):
threading.Thread.__init__(self)
self._device_list = device_list
self._publish_interval = publish_interval
def run(self):
tm = time.perf_counter()
while self._keepAlive:
if time.perf_counter() - tm > self._publish_interval:
for dev in self._device_list:
dev.publish_mqtt()
tm = time.perf_counter()
time.sleep(100)
def stop(self):
self._keepAlive = False
class Home:
name: str = 'Home'
device_list: List[Device]
rooms: List[Room]
serial_baud: int = 9600
serial_485_energy_port: str = ''
mqtt_client: mqtt.Client
mqtt_host: str = 'localhost'
mqtt_port: int = 1883
def __init__(self, room_info: List, name: str = 'Home'):
self.name = name
self.device_list = list()
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_message = self.onMqttClientMessage
self.rooms = list()
self.mqtt_client.connect(self.mqtt_host, self.mqtt_port)
self.mqtt_client.loop_start()
self.mqtt_client.subscribe("home/ipark/light/command/1/0")
self.serial_485_energy = SerialComm('Energy')
self.parser_energy = EnergyParser(self.serial_485_energy)
self.parser_energy.sig_parse.connect(self.onParserEnergyResult)
self.startThreadMonitoring()
def release(self):
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
self.stopThreadMonitoring()
def onParserEnergyResult(self, chunk: bytearray):
if len(chunk) < 7:
return
header = chunk[1] # [0x31, 0x41, 0x42, 0xD1]
command = chunk[3]
if header == 0x31 and command in [0x81, 0x91]:
# 방 조명 패킷
room_idx = chunk[5] & 0x0F
room = self.rooms[room_idx]
for i in range(room.light_count):
dev = room.lights[i]
dev.state = (chunk[6] & (0x01 << i)) >> i
# notification
if dev.state != dev.state_prev or not dev.init:
dev.publish_mqtt()
dev.init = True
dev.state_prev = dev.state
def onMqttClientMessage(self, client, userdata, message):
topic = message.topic
msg_dict = json.loads(message.payload.decode("utf-8"))
if 'light/command' in topic:
splt = topic.split('/')
room_idx = int(splt[-2])
dev_idx = int(splt[-1])
target = msg_dict['state']
self.set_light_state(dev, target, room_idx, dev_idx)
def sendSerialEnergyPacket(self, packet: str):
if self.serial_485_energy.isConnected():
self.serial_485_energy.sendData(bytearray([int(x, 16) for x in packet.split(' ')]))
def set_light_state(self, dev: Light, target: int, room_idx: int, dev_idx: int):
cnt = 0
packet1 = dev.packet_set_state_on if target else dev.packet_set_state_off
packet2 = dev.packet_get_state
for _ in range(self._retry_cnt):
if dev.state == target:
break
func(packet1)
cnt += 1
time.sleep(0.2)
if dev.state == target:
break
self.sendSerialEnergyPacket(packet2)
time.sleep(0.2)
writeLog('set_light_state::send # = {}'.format(cnt), self)
time.sleep(self._delay_response)
dev.publish_mqtt()
def startThreadMonitoring(self):
if self.thread_monitoring is None:
self.thread_monitoring = ThreadMonitoring(self.device_list)
self.thread_monitoring.setDaemon(True)
self.thread_monitoring.start()
def stopThreadMonitoring(self):
if self.thread_monitoring is not None:
self.thread_monitoring.stop()
Home 객체가 mqtt client를 가지며, 각 액세서리별로 지정된 토픽을 구독(Subscribe)해서 홈브릿지 액세서리의 '명령'을 처리하고, 현재 상태를 발행(Publish)하도록 구현했다
HTTP 액세서리와 달리 플러그인이 단독으로 주기적 상태 쿼리를 하지 않으므로, Home 객체가 능동적으로 일정 시간마다 현재의 상태를 발행할 수 있도록 쓰레드도 하나 추가했다
전체 시스템은 다음과 같이 도식화할 수 있다
액세서리가 제대로 추가되었는지 확인
정상적으로 작동하는 것도 확인했으니, 앞으로는 정말 특별한 경우가 아니라면 MQTT 기반으로 모든 액세서리를 추가하도록 하자
'홈네트워크(IoT) > 일반' 카테고리의 다른 글
라즈베리파이 - Home Assistant 컨테이너 설치하기 (0) | 2022.02.18 |
---|---|
라즈베리파이 - Home Assistant OS 설치하기 (0) | 2022.02.17 |
Apple 홈킷 - 외부 대기오염(Air-quality) 센서 추가하기 (0) | 2022.01.14 |
라즈베리파이로 Apple HomeKit 카메라 만들기 (Homebridge) (0) | 2021.08.04 |
Apple 홈킷 - Weather Station 추가하기 (0) | 2021.01.04 |