YOGYUI

Homebridge - HTTP 액세서리 MQTT로 Migration 본문

홈네트워크(IoT)/일반

Homebridge - HTTP 액세서리 MQTT로 Migration

요겨 2021. 1. 18. 20:27
반응형

https://mqtt.org/

서버 코드 유지보수성 강화를 위해 기존의 HTTP 기반 액세서리들을 MQTT 기반으로 바꾸고자 결심했다

(HTTP 기반 액세서리는 디바이스 개별로 http-nofitication listener 포트를 할당해야 하는데, MQTT 기반에서는 subscribe만 하면 되기 때문에 서버 부하도 줄일 수 있을 것으로 판단)

1. 플러그인 설치

homebridge 플러그인 설치

Homebridge UI에서 mqttthing을 검색해서 Homebridge Mqttthing 플러그인을 설치

mqttthing으로 만들 수 있는 액세서리 종류는 다음과 같다

mqttthing 지원 액세서리 종류

거의 모든 종류의 디바이스를 mqttthing 플러그인 하나로 만들어낼 수 있다

config.json의 액세서리 중 HTTP 기반으로 구현한 액세서리를 mqtt 기반으로 변경해준다

 

예시)

[HTTP 기반 액세서리 (스위치)]

{
    "accessory": "HTTP-SWITCH",
    "name": "Kitchen Light1",
    "switchType": "stateful",
    "onUrl": {
        "url": "http://localhost:9999/light",
        "method": "POST",
        "body": {
            "room": 1,
            "light": 0,
            "command": "on"
        },
        "headers": {
            "Accept": "application/json"
        }
    },
    "offUrl": {
        "url": "http://localhost:9999/light",
        "method": "POST",
        "body": {
            "room": 1,
            "light": 0,
            "command": "off"
        },
        "headers": {
            "Accept": "application/json"
        }
    },
    "statusUrl": {
        "url": "http://localhost:9999/light",
        "method": "POST",
        "body": {
            "room": 1,
            "light": 0,
            "status": 0
        },
        "headers": {
            "Accept": "application/json"
        }
    },
    "notificationID": "switch_room1_light1",
    "notificationPassword": "password1",
    "auth": {
        "username": "yogyui",
        "password": "password2"
    }
}

(on, off 명령 및 status 쿼리용 url에 대한 정보 및 notification 서버 정보까지 적어야 하다보니 액세서리 한개를 위한 코드가 꽤나 길다)

 

[MQTT 기반 액세서리 (스위치)]

{
    "accessory": "mqttthing",
    "type": "switch",
    "name": "Kitchen Light1 (MQTT)",
    "url": "mqtt:://localhost:1883",
    "username": "yogyui",
    "password": "12345678",
    "topics": {
        "getOn": {
            "topic": "home/ipark/light/state/1/0",
            "apply": "return JSON.parse(message).state;"
            },
        "setOn": {
            "topic": "home/ipark/light/command/1/0",
            "apply": "return JSON.stringify({state: message});"
        }
    },
    "integerValue": true,
    "onValue": 1,
    "offValue": 0,
    "history": true,
    "logMqtt": true
}

url은 mosquitto mqtt broker의 주소를 적어준다 (mosquitto 관련 링크)

topic은 mqttthing 도큐먼트를 참고해서 구현 (액세서리별로 깔끔하게 정리가 잘 되어있다)

mqttthing switch 액세서리 관련 도큐먼트

2. 코드 변경

(일부만 발췌, 모듈화로 구현한다고 했는데도 너무 길다)

# HomeDef.py
import threading
from abc import ABCMeta, abstractmethod
from typing import List
import paho.mqtt.client as mqtt
from Serial485.SerialComm import SerialComm
from Serial485.EnergyParser import EnergyParser

class Device:
    __metaclass__ = ABCMeta

    name: str = 'Device'
    room_index: int = 0
    init: bool = False
    state: int = 0  # mostly, 0 is OFF and 1 is ON
    state_prev: int = 0
    packet_set_state_on: str = ''
    packet_set_state_off: str = ''
    packet_get_state: str = ''
    mqtt_client: mqtt.Client = None
    mqtt_publish_topic: str = ''
    mqtt_subscribe_topic: str = ''

    def __init__(self, name: str = 'Device', **kwargs):
        self.name = name
        if 'room_index' in kwargs.keys():
            self.room_index = kwargs['room_index']
        self.mqtt_client = kwargs.get('mqtt_client')
        writeLog('Device Initialized >> Name: {}, Room Index: {}'.format(self.name, self.room_index), self)
    
    @abstractmethod
    def publish_mqtt(self):
        pass

class Light(Device):
    def __init__(self, name: str = 'Device', index: int = 0, **kwargs):
        self.index = index
        super().__init__(name, **kwargs)

    def publish_mqtt(self):
        obj = {"state": self.state}
        self.mqtt_client.publish(self.mqtt_publish_topic, json.dumps(obj), 1)

class Room:
    name: str = 'Room'
    index: int = 0
    lights: List[Light]

    def __init__(self, name: str = 'Room', index: int = 0, light_count: int = 0, **kwargs):
        self.name = name
        self.index = index
        self.lights = list()
        for i in range(light_count):
            self.lights.append(Light(name='Light {}'.format(i + 1), index=i, room_index=self.index, mqtt_client=kwargs.get('mqtt_client')))

    @property
    def light_count(self):
        return len(self.lights)

class ThreadMonitoring(threading.Thread):
    _keepAlive: bool = True

    def __init__(self, device_list: List[Device], publish_interval: int = 60):
        threading.Thread.__init__(self)
        self._device_list = device_list
        self._publish_interval = publish_interval
    
    def run(self):
        tm = time.perf_counter()
        while self._keepAlive:
            if time.perf_counter() - tm > self._publish_interval:
                for dev in self._device_list:
                    dev.publish_mqtt()
                tm = time.perf_counter()
            time.sleep(100)

    def stop(self):
        self._keepAlive = False

class Home:
    name: str = 'Home'
    device_list: List[Device]
    rooms: List[Room]
    serial_baud: int = 9600
    serial_485_energy_port: str = ''
    mqtt_client: mqtt.Client
    mqtt_host: str = 'localhost'
    mqtt_port: int = 1883
    
    def __init__(self, room_info: List, name: str = 'Home'):
        self.name = name
        self.device_list = list()
        
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.on_message = self.onMqttClientMessage
        self.rooms = list()
        
        self.mqtt_client.connect(self.mqtt_host, self.mqtt_port)
        self.mqtt_client.loop_start()
        self.mqtt_client.subscribe("home/ipark/light/command/1/0")
        
        self.serial_485_energy = SerialComm('Energy')
        self.parser_energy = EnergyParser(self.serial_485_energy)
        self.parser_energy.sig_parse.connect(self.onParserEnergyResult)
        
        self.startThreadMonitoring()

    def release(self):
        self.mqtt_client.loop_stop()
        self.mqtt_client.disconnect()
        self.stopThreadMonitoring()
        
    def onParserEnergyResult(self, chunk: bytearray):
        if len(chunk) < 7:
            return
        header = chunk[1]  # [0x31, 0x41, 0x42, 0xD1]
        command = chunk[3]
        if header == 0x31 and command in [0x81, 0x91]:
            # 방 조명 패킷
            room_idx = chunk[5] & 0x0F
            room = self.rooms[room_idx]
            for i in range(room.light_count):
                dev = room.lights[i]
                dev.state = (chunk[6] & (0x01 << i)) >> i
                # notification
                if dev.state != dev.state_prev or not dev.init:
                    dev.publish_mqtt()
                    dev.init = True
                dev.state_prev = dev.state
            
    def onMqttClientMessage(self, client, userdata, message):
        topic = message.topic
        msg_dict =  json.loads(message.payload.decode("utf-8"))
        if 'light/command' in topic:
            splt = topic.split('/')
            room_idx = int(splt[-2])
            dev_idx = int(splt[-1])
            target = msg_dict['state']
            self.set_light_state(dev, target, room_idx, dev_idx)
     
    def sendSerialEnergyPacket(self, packet: str):
        if self.serial_485_energy.isConnected():
            self.serial_485_energy.sendData(bytearray([int(x, 16) for x in packet.split(' ')]))
    
    def set_light_state(self, dev: Light, target: int, room_idx: int, dev_idx: int):
        cnt = 0
        packet1 = dev.packet_set_state_on if target else dev.packet_set_state_off
        packet2 = dev.packet_get_state
        for _ in range(self._retry_cnt):
            if dev.state == target:
                break
            func(packet1)
            cnt += 1
            time.sleep(0.2)
            if dev.state == target:
                break
            self.sendSerialEnergyPacket(packet2)
            time.sleep(0.2)
        writeLog('set_light_state::send # = {}'.format(cnt), self)
        time.sleep(self._delay_response)
        dev.publish_mqtt()
    
    def startThreadMonitoring(self):
        if self.thread_monitoring is None:
            self.thread_monitoring = ThreadMonitoring(self.device_list)
            self.thread_monitoring.setDaemon(True)
            self.thread_monitoring.start()

    def stopThreadMonitoring(self):
        if self.thread_monitoring is not None:
            self.thread_monitoring.stop()

Home 객체가 mqtt client를 가지며, 각 액세서리별로 지정된 토픽을 구독(Subscribe)해서 홈브릿지 액세서리의 '명령'을 처리하고, 현재 상태를 발행(Publish)하도록 구현했다

HTTP 액세서리와 달리 플러그인이 단독으로 주기적 상태 쿼리를 하지 않으므로, Home 객체가 능동적으로 일정 시간마다 현재의 상태를 발행할 수 있도록 쓰레드도 하나 추가했다

 

전체 시스템은 다음과 같이 도식화할 수 있다

System Schematic

 

액세서리가 제대로 추가되었는지 확인

mqttthing 액세서리 (스위치)

정상적으로 작동하는 것도 확인했으니, 앞으로는 정말 특별한 경우가 아니라면 MQTT 기반으로 모든 액세서리를 추가하도록 하자

반응형