YOGYUI

LG 물걸레 로봇청소기 (코드제로 M9) + 싱크대 절수 페달 연동 (ThinQ + Homebridge) 본문

홈네트워크(IoT)/힐스테이트 광교산

LG 물걸레 로봇청소기 (코드제로 M9) + 싱크대 절수 페달 연동 (ThinQ + Homebridge)

요겨 2022. 10. 29. 22:05
반응형

싱크대 절수페달 IoT 연동 마지막 단계!

매주 아주 유용하게 쓰고 있는 LG전자의 물걸레 로봇청소기 코드제로 M9을 홈네트워크와 연동시켜보자 

 

[목표] 로봇청소기가 청소중일 때, 싱크대 수전에 물이 흐를 경우 일정 시간이 지나면 수전 밸브가 자동으로 닫히도록 기능 구현 (물낭비 방지)

 

싱크대 절수 페달 및 수전은 아래 글과 같이 IoT 환경을 구축해뒀다

힐스테이트 광교산::싱크대 절수페달 IoT 연동하기

 

힐스테이트 광교산::싱크대 절수페달 IoT 연동하기 - Final

9. PCB 제작 지난주 수요일에 주문한 부품과 PCB가 어제(09/29) 모두 도착했다 (너무 오래 걸려 ㅠㅠ) 야무지게 납땜해주자 CP2102같은 IC들은 손으로 납땜할 때 냉납 발생이 잦으니 광학 검사는 필수!

yogyui.tistory.com

 

아쉽게도 Homebridge의 ThinQ 플러그인은 로봇청소기를 지원하지 않는다

Homebridge - LG ThinQ 연동하기 (애플 홈 연동)

 

Homebridge - LG ThinQ 연동하기 (애플 홈 연동)

Homebridge의 verified된 플러그인 중에 LG전자 ThinQ 앱과 연동할 수 있는 플러그인이 있길래 설치 및 사용 후기를 남겨본다 1. 플러그인 설치 플러그인 검색창에서 'ThinQ' 키워드를 입력한 뒤, Homebridge L

yogyui.tistory.com

 

결국 ThinQ API를 사용하는 방법을 다른 개발자가 만든 소스코드를 참고하여 파이썬으로 구현 완료!

LG ThinQ REST API::파이썬 연동

 

LG ThinQ REST API::파이썬 연동

Access LG ThinQ API using Python 지난 글에서 Homebridge에 LG ThinQ 디바이스를 연동하는 방법에 대해 알아본 바 있다 Homebridge - LG ThinQ 연동하기 (애플 홈 연동) Homebridge - LG ThinQ 연동하기 (애플..

yogyui.tistory.com

1. 파이썬 코딩 (ThinQ 클래스)

ThinQ 클래스를 다음과 같이 구현해줬다

import os
import sys
import json
import hmac
import base64
import random
import hashlib
import datetime
from regex import E
import requests
import email.utils
import urllib.parse
from OpenSSL import crypto
from typing import Union, List
import paho.mqtt.client as mqtt
from Common import writeLog, Callback

class ThinQ:
    client_id: Union[str, None] = None
    user_no: Union[str, None] = None
    access_token: Union[str, None] = None

    country_code: str = 'KR'
    language_code: str = 'ko-KR'
    api_key: str = ''
    api_client_id: str = ''
    refresh_token: str = ''
    oauth_secret_key: str = ''
    app_client_id: str = ''
    app_key: str = ''

    uri_thinq1: Union[str, None] = None
    uri_thinq2: Union[str, None] = None
    uri_oauth: Union[str, None] = None

    subscribe_topics: List[str]
    mqtt_client: mqtt.Client
    
    device_discover_list: List[dict]
    robot_cleaner_dev_id: str = ''
    mqtt_topic: str = ''

    def __init__(self, **kwargs):
        self.sig_publish_mqtt = Callback(str, dict)
        
        self.subscribe_topics = list()
        self.device_discover_list = list()
        if 'country_code' in kwargs.keys():
            self.country_code = kwargs.get('country_code')
        if 'language_code' in kwargs.keys():
            self.language_code = kwargs.get('language_code')
        if 'api_key' in kwargs.keys():
            self.api_key = kwargs.get('api_key')
        if 'api_client_id' in kwargs.keys():
            self.api_client_id = kwargs.get('api_client_id')
        if 'refresh_token' in kwargs.keys():
            self.refresh_token = kwargs.get('refresh_token')
        if 'oauth_secret_key' in kwargs.keys():
            self.oauth_secret_key = kwargs.get('oauth_secret_key')
        if 'app_client_id' in kwargs.keys():
            self.app_client_id = kwargs.get('app_client_id')
        if 'app_key' in kwargs.keys():
            self.app_key = kwargs.get('app_key')
        if 'robot_cleaner_dev_id' in kwargs.keys():
            self.robot_cleaner_dev_id = kwargs.get('robot_cleaner_dev_id')
        if 'mqtt_topic' in kwargs.keys():
            self.mqtt_topic = kwargs.get('mqtt_topic')
        self.generate_rsa_csr_pemfiles()
        self.get_aws_root_ca_pem()

    def start(self):
        if not self.query_thinq_uris():
            return
        if not self.query_oauth_uris():
            return
        if not self.query_access_token():
            return
        if not self.query_user_number():
            return
        if not self.query_home_device_list():
            return
        if not self.get_certificate_from_server():
            return
        if not self.connect_mqtt_broker():
            return

    @staticmethod
    def generate_random_string(length: int) -> str:
        characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
        result = ''
        for i in range(length):
            result += characters[random.randint(0, len(characters) - 1)]
        return result

    @staticmethod
    def generate_signature(message: str, key: str) -> str:
        hmac_obj = hmac.new(key.encode(encoding='utf-8'), message.encode(encoding='utf-8'), hashlib.sha1)
        hashed = hmac_obj.digest()
        b64_encoded = base64.b64encode(hashed)
        return b64_encoded.decode(encoding='utf-8')

    def generate_default_header(self) -> dict:
        headers = {
            'x-api-key': self.api_key,
            'x-thinq-app-ver': '3.6.1200',
            'x-thinq-app-type': 'NUTS',
            'x-thinq-app-level': 'PRD',
            'x-thinq-app-os': 'ANDROID',
            'x-thinq-app-logintype': 'LGE',
            'x-service-code': 'SVC202',
            'x-country-code': self.country_code,
            'x-language-code': self.language_code,
            'x-service-phase': 'OP',
            'x-origin': 'app-native',
            'x-model-name': 'samsung/SM-G930L',
            'x-os-version': 'AOS/7.1.2',
            'x-app-version': 'LG ThinQ/3.6.12110',
            'x-message-id': self.generate_random_string(22),
            'user-agent': 'okhttp/3.14.9'
        }
        
        headers['x-client-id'] = self.api_client_id if self.client_id is None else self.client_id
        if self.user_no is not None:
            headers['x-user-no'] = self.user_no
        if self.access_token is not None:
            headers['x-emp-token'] = self.access_token
        return headers
    
    def query_thinq_uris(self) -> bool:
        result: bool
        url = "https://route.lgthinq.com:46030/v1/service/application/gateway-uri"
        response = requests.get(url, headers=self.generate_default_header())
        if response.status_code == 200:
            response_json = json.loads(response.text)
            result = response_json.get('result')
            self.uri_thinq1 = result.get('thinq1Uri')
            self.uri_thinq2 = result.get('thinq2Uri')
            elapsed = response.elapsed.microseconds
            writeLog('query thinq uri success ({:g} msec)'.format(elapsed / 1000), self)
            result = True
        else:
            writeLog(f'failed to query thinq uri ({response.status_code}, {response.text})', self)
            result = False
        return result
    
    def query_oauth_uris(self) -> bool:
        result: bool
        if self.uri_thinq1 is None:
            writeLog(f'thinq uri is not queried yet!', self)
            return False
        
        url = self.uri_thinq1 + '/common/gatewayUriList'
        headers = {
            'Accept': 'application/json',
            'x-thinq-application-key': 'wideq',
            'x-thinq-security-key': 'nuts_securitykey'
        } 
        data = {
            'lgedmRoot': {
                'countryCode': self.country_code, 
                'langCode': self.language_code
            }
        }
        response = requests.post(url, json=data, headers=headers)
        if response.status_code == 200:
            response_json = json.loads(response.text)
            lgemdRoot = response_json.get('lgedmRoot')
            self.uri_oauth = lgemdRoot.get('oauthUri')
            elapsed = response.elapsed.microseconds
            writeLog('query oauth uri success ({:g} msec)'.format(elapsed / 1000), self)
            result = True
        else:
            writeLog(f'failed to query oauth uri ({response.status_code}, {response.text})', self)
            result = False
        return result

    def query_access_token(self) -> bool:
        result: bool
        if self.uri_oauth is None:
            writeLog(f'oauth uri is not queried yet!', self)
            return False
        
        url = self.uri_oauth + '/oauth/1.0/oauth2/token'
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': self.refresh_token
        }
        requestUrl = '/oauth/1.0/oauth2/token' + '?' + urllib.parse.urlencode(data)
        now = datetime.datetime.utcnow()
        timestamp = email.utils.format_datetime(now)
        signature = self.generate_signature(f"{requestUrl}\n{timestamp}", self.oauth_secret_key)
        headers = {
            'x-lge-app-os': 'ADR',
            'x-lge-appkey': self.app_client_id,
            'x-lge-oauth-signature': signature,
            'x-lge-oauth-date': timestamp,
            'Accept': 'application/json',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        response = requests.post(url, params=data, headers=headers)
        if response.status_code == 200:
            response_json = json.loads(response.text)
            self.access_token = response_json.get('access_token')
            # expires_in_ = int(response_json.get('expires_in'))
            elapsed = response.elapsed.microseconds
            writeLog('query access token success ({:g} msec)'.format(elapsed / 1000), self)
            result = True
        else:
            writeLog(f'failed to query access token ({response.status_code}, {response.text})', self)
            result = False
        return result

    def query_user_number(self) -> bool:
        result: bool
        if self.access_token is None:
            writeLog(f'access token is not queried yet!', self)
            return False
        
        url = self.uri_oauth + '/users/profile'
        now = datetime.datetime.utcnow()
        timestamp = email.utils.format_datetime(now)
        signature = self.generate_signature(f"/users/profile\n{timestamp}", self.oauth_secret_key)
        headers = {
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + self.access_token,
            'X-Lge-Svccode': 'SVC202',
            'X-Application-Key': self.app_key,
            'lgemp-x-app-key': self.app_client_id,
            'X-Device-Type': 'M01',
            'X-Device-Platform': 'ADR',
            'x-lge-oauth-date': timestamp,
            'x-lge-oauth-signature': signature
        }
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            response_json = json.loads(response.text)
            account = response_json.get('account')
            self.user_no = account.get('userNo')
            elapsed = response.elapsed.microseconds
            writeLog('query user number success ({:g} msec)'.format(elapsed / 1000), self)
            # create client id
            obj_hash = hashlib.sha256()
            now = int(datetime.datetime.now().timestamp())
            obj_hash.update((self.user_no + f'{now}').encode(encoding='utf-8'))
            self.client_id = obj_hash.hexdigest()
            result = True
        else:
            writeLog(f'failed to query user number ({response.status_code}, {response.text})', self)
            result = False
        return result

    def query_home_device_list(self) -> bool:
        result: bool
        if self.uri_thinq2 is None:
            writeLog(f'thinq uri is not queried yet!', self)
            return False
        
        url = self.uri_thinq2 + '/service/homes'
        response = requests.get(url, headers=self.generate_default_header())
        if response.status_code == 200:
            response_json = json.loads(response.text)
            result = response_json.get('result')
            home_list = result.get('item')
            self.device_discover_list.clear()
            for obj in home_list:
                homeId = obj.get('homeId')
                url2 = self.uri_thinq2 + '/service/homes/' + homeId
                response = requests.get(url2, headers=self.generate_default_header())
                if response.status_code == 200:
                    response_json = json.loads(response.text)
                    result = response_json.get('result')
                    devices = result.get('devices')
                    self.device_discover_list.extend(devices)
            self.print_device_discover_list()
            result = True
        else:
            writeLog(f'failed to query home - device list ({response.status_code}, {response.text})', self)
            result = False
        return result

    def print_device_discover_list(self):
        writeLog(f'discovered {len(self.device_discover_list)} device(s)', self)
        for device in self.device_discover_list:
            dev_id = device.get('deviceId')
            dev_type = device.get('deviceType')
            modelName = device.get('modelName')
            alias = device.get('alias')
            writeLog(f'{alias}::{modelName}::{dev_type}::{dev_id}', self)            

    def generate_rsa_csr_pemfiles(self):
        pubkey_pem_path = os.path.join(CURPATH, 'pubkey.pem')
        privkey_pem_path = os.path.join(CURPATH, 'privkey.pem')
        csr_pem_path = os.path.join(CURPATH, 'csr.pem')
        if not os.path.isfile(pubkey_pem_path) or not os.path.isfile(privkey_pem_path) or not os.path.isfile(csr_pem_path):
            keypair = crypto.PKey()
            keypair.generate_key(crypto.TYPE_RSA, 2048)
            pubkey_pem = crypto.dump_publickey(crypto.FILETYPE_PEM, keypair).decode(encoding='utf-8')
            with open(pubkey_pem_path, 'w') as fp:
                fp.write(pubkey_pem)
            privkey_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, keypair).decode(encoding='utf-8')
            with open(privkey_pem_path, 'w') as fp:
                fp.write(privkey_pem)
            
            req = crypto.X509Req()
            req.get_subject().CN = "AWS IoT Certificate"
            req.get_subject().O = "Amazon"
            req.set_pubkey(keypair)
            req.sign(keypair, "sha256")
            csr_pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req).decode(encoding='utf-8')
            with open(csr_pem_path, 'w') as fp:
                fp.write(csr_pem)

    def get_certificate_from_server(self) -> bool:
        result: bool
        if self.uri_thinq2 is None:
            writeLog(f'thinq uri is not queried yet!', self)
            return False

        csr_pem_path = os.path.join(CURPATH, 'csr.pem')
        if not os.path.isfile(csr_pem_path):
            writeLog(f'cannot find csr pem file', self)
            return False

        url = self.uri_thinq2 + '/service/users/client'
        response = requests.post(url, headers=self.generate_default_header())
        if response.status_code == 200:
            with open(csr_pem_path, 'r') as fp:
                csr_pem = fp.read()
            url = self.uri_thinq2 + '/service/users/client/certificate'
            csr_pem = csr_pem.replace('-----BEGIN CERTIFICATE REQUEST-----', '')
            csr_pem = csr_pem.replace('-----END CERTIFICATE REQUEST-----', '')
            csr_pem = csr_pem.replace('\r\n', '')
            data = {
                'csr': csr_pem
            }
            response = requests.post(url, json=data, headers=self.generate_default_header())
            if response.status_code == 200:
                response_json = json.loads(response.text)
                result = response_json.get('result')
                certificate_pem = result.get('certificatePem')
                cert_pem_path = os.path.join(CURPATH, 'aws_cert.pem')
                with open(cert_pem_path, 'w') as fp:
                    fp.write(certificate_pem)
                self.subscribe_topics.clear()
                subscriptions = result.get('subscriptions')  # 구독할 Topic
                self.subscribe_topics.extend(subscriptions)
                elapsed = response.elapsed.microseconds
                writeLog('query certificate success ({:g} msec)'.format(elapsed / 1000), self)
                result = True
            else:
                writeLog(f'failed to query certificate ({response.status_code}, {response.text})', self)
                result = False
        else:
            writeLog(f'failed to visit service ({response.status_code}, {response.text})', self)
            result = False
        return result

    def get_aws_root_ca_pem(self) -> bool:
        result: bool
        rootca_pem_path = os.path.join(CURPATH, 'aws_root_ca.pem')
        if os.path.isfile(rootca_pem_path):
            return True

        url = 'https://www.amazontrust.com/repository/AmazonRootCA1.pem'
        response = requests.get(url)
        if response.status_code == 200:
            rootca_pem = response.text
            with open(rootca_pem_path, 'w') as fp:
                fp.write(rootca_pem)
            elapsed = response.elapsed.microseconds
            writeLog('query root CA from AWS ({:g} msec)'.format(elapsed / 1000), self)
            result = True
        else:
            writeLog(f'failed to query root CA from AWS ({response.status_code}, {response.text})', self)
            result = False
        return result

    def connect_mqtt_broker(self) -> bool:
        if self.client_id is None:
            writeLog(f'client id is not generated yet!', self)
            return False
        
        rootca_pem_path = os.path.join(CURPATH, 'aws_root_ca.pem')
        if not os.path.isfile(rootca_pem_path):
            writeLog('cannot find aws root ca pem file', self)
            return False
        cert_pem_path = os.path.join(CURPATH, 'aws_cert.pem')
        if not os.path.isfile(cert_pem_path):
            writeLog('cannot find aws cert pem file', self)
            return False
        privkey_pem_path = os.path.join(CURPATH, 'privkey.pem')
        if not os.path.isfile(privkey_pem_path):
            writeLog('cannot find private key pem file', self)
            return False
        
        # get mqtt broker host address
        url = "https://common.lgthinq.com/route"
        response = requests.get(url, headers=self.generate_default_header())
        mqttserver = ''
        if response.status_code == 200:
            response_json = json.loads(response.text)
            result = response_json.get('result')
            mqttserver = result.get('mqttServer')
        else:
            return False

        idx = mqttserver.rfind(':')
        mqtt_host = mqttserver[:idx]
        mqtt_port = int(mqttserver[idx+1:])
        self.mqtt_client = mqtt.Client(client_id=self.client_id)
        self.mqtt_client.on_connect = self.onMqttClientConnect
        self.mqtt_client.on_disconnect = self.onMqttClientDisconnect
        self.mqtt_client.on_message = self.onMqttClientMessage
        self.mqtt_client.on_subscribe = self.onMqttClientSubscribe

        self.mqtt_client.tls_set(ca_certs=rootca_pem_path, certfile=cert_pem_path, keyfile=privkey_pem_path)
        self.mqtt_client.connect(host=mqtt_host[6:], port=mqtt_port)
        self.mqtt_client.loop_start()

    def onMqttClientConnect(self, _, userdata, flags, rc):
        writeLog('connected to aws iot core (mqtt broker): {}, {}, {}'.format(userdata, flags, rc), self)
        for topic in self.subscribe_topics:
            self.mqtt_client.subscribe(topic)

    def onMqttClientDisconnect(self, _, userdata, rc):
        writeLog('disconnected from aws iot core (mqtt broker): {}, {}, {}'.format(userdata, rc), self)

    def onMqttClientSubscribe(self, _, userdata, mid, granted_qos):
        writeLog('mqtt subscribe: {}, {}, {}'.format(userdata, mid, granted_qos), self)

    def onMqttClientMessage(self, _, userdata, message):
        msg_dict = json.loads(message.payload.decode("utf-8"))
        print(msg_dict)

ThinQ API 접근을 통한 Access Token 획득, MQTT Broker (AWS IoT Core) 접속을 위한 클라이언트 ID 발급 등의 과정은 지난 글에서 다룬 것과 완전히 동일한 프로세스를 따른다 (링크)

 

이제 HomeNetwork 설정 파일(Config.xml)에 ThinQ 관련 항목도 추가해주자

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<config>
    <thinq>
        <refresh_token>refresh token from LGE login session</refresh_token>
        <api_key>VGhpblEyLjAgU0VSVklDRQ==</api_key>
        <api_client_id>c713ea8e50f657534ff8b9d373dfebfc2ed70b88285c26b8ade49868c0b164d9</api_client_id>
        <oauth_secret_key>c053c2a6ddeb7ad97cb0eed0dcb31cf8</oauth_secret_key>
        <app_client_id>LGAO221A02</app_client_id>
        <application_key>6V1V8H2BN5P9ZQGOI5DAQ92YZBDO3EK9</application_key>
        <country_code>KR</country_code>
        <language_code>ko-KR</language_code>
    </thinq>
</config>

refresh_token 값만 본인 계정의 토큰 값으로 바꿔 입력하면 된다

 

Home.py의 Home 클래스의 멤버변수로 thinq를 추가한 뒤에 초기화 단계를 다음과 같이 추가해주면 사용 준비 완료!

node = root.find('thinq')
robot_cleaner_node = node.find('robot_cleaner')
robot_cleaner_dev_id = robot_cleaner_node.find('dev_id').text
mqtt_node = node.find('mqtt')
mqtt_topic = mqtt_node.find('publish').text
self.thinq = ThinQ(
    country_code=node.find('country_code').text,
    language_code=node.find('language_code').text,
    api_key=node.find('api_key').text,
    api_client_id=node.find('api_client_id').text,
    refresh_token=node.find('refresh_token').text,
    oauth_secret_key=node.find('oauth_secret_key').text,
    app_client_id=node.find('app_client_id').text,
    app_key=node.find('application_key').text,
    robot_cleaner_dev_id=robot_cleaner_dev_id, 
    mqtt_topic=mqtt_topic
)
self.thinq.sig_publish_mqtt.connect(self.onThinqPublishMQTT)

1.1. ThinQ 앱에 등록된 Device 리스트 조회하기

ThinQ 클래스 내에 print_device_discover_list 메서드를 구현해뒀다

class ThinQ:
    def print_device_discover_list(self):
        for device in self.device_discover_list:
            dev_id = device.get('deviceId')
            dev_type = device.get('deviceType')
            modelName = device.get('modelName')
            alias = device.get('alias')
            writeLog(f'{alias}::{modelName}::{dev_type}::{dev_id}', self)

Home 초기화 단계에서 AWS IoT Core에 정상적으로 접속되면

콘솔에 위와 같이 device별 고유 ID값이 출력된다

물걸레 로봇청소기 ID가 필요하니 다음과 같이 Config.xml 파일에 추가한 뒤 ThinQ 객체 초기화 시에 인자로 넣어주도록 하자

<thinq>
    <robot_cleaner>
        <dev_id>d6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx</dev_id>
    </robot_cleaner>
    <mqtt>
        <publish>home/hillstate/thinq</publish>
    </mqtt>
</thinq>

2. 로봇 청소기 MQTT 메시지 확인

실제로 로봇 청소기를 구동시킨 뒤 MQTT 메시지가 어떤 형태로 수신되는지 확인해봤다 (JSON)

class ThinQ:
    def onMqttClientMessage(self, _, userdata, message):
        msg_dict = json.loads(message.payload.decode("utf-8"))
        print(msg_dict)

2.1.1. 청소 시작 시

{
    'data': {
        'state': {
            'reported': {
                'ROBOT_STATE': 'INITAILIZING', 
                'meta': {
                    'allDeviceInfoUpdate': False, 
                    'messageId': 'qbpr2L99QUmtOpnGRmWOhA'
                }, 
                'mid': 7315258837, 
                'online': True, 
                'static': {
                    'countryCode': 'KR', 'deviceType': '501'
                }, 
                'timestamp': 1667026681198
            }
        }
    }, 
    'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx', 
    'type': 'monitoring'
}

{
    'data': {
        'state': {
            'reported': {
                'BATT': '3', 
                'meta': {
                    'allDeviceInfoUpdate': False, 
                    'messageId': 'HtL1nQraRnO4-_6-_bFjxg'
                }, 
                'mid': 7315259186, 
                'online': True, 
                'static': {
                    'countryCode': 'KR', 
                    'deviceType': '501'
                }, 
                'timestamp': 1667026681430
            }
        }
    }, 
    'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx', 
    'type': 'monitoring'
}

{
    'data': {
        'state': {
            'reported': {
                'MODE': 'EDGE', 
                'meta': {
                    'allDeviceInfoUpdate': False, 
                    'messageId': 'lDuf_77xTCuxbyEoW6O1Pw'
                }, 
                'mid': 7315259531, 
                'online': True, 
                'static': {
                    'countryCode': 'KR', 
                    'deviceType': '501'
                }, 
                'timestamp': 1667026681909
            }
        }
    }, 
    'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx', 
    'type': 'monitoring'
}

{
    'data': {
        'state': {
            'reported': {
                'ROBOT_STATE': 'CLEAN_EDGE', 
                'meta': {
                    'allDeviceInfoUpdate': False, 
                    'messageId': 'knSFYagqRWi1m188y_aFaQ'
                }, 
                'mid': 7315271103, 
                'online': True, 
                'static': {
                    'countryCode': 'KR', 
                    'deviceType': '501'
                }, 
                'timestamp': 1667026693437
            }
        }
    }, 
    'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx', 
    'type': 'monitoring'
}

2.1.2. 청소 일시 중지 

{
    'data': {
        'state': {
            'reported': {
                'ROBOT_STATE': 'PAUSE_EDGE', 
                'meta': {
                    'allDeviceInfoUpdate': False, 
                    'messageId': 'YcVQeqfmT_W-G_R4w19aKw'
                }, 
                'mid': 7315271448, 
                'online': True, 
                'static': {
                    'countryCode': 'KR', 
                    'deviceType': '501'
                }, 
                'timestamp': 1667026693757
            }
        }
    }, 
    'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx', 
    'type': 'monitoring'
}

2.1.3. 청소 종료 및 충전대 복귀

{
    'data': {
        'state': {
            'reported': {
                'ROBOT_STATE': 'HOMING', 
                'meta': {
                    'allDeviceInfoUpdate': False, 
                    'messageId': 'XXT1tkQoQkW5Ao-DECijxA'
                }, 
                'mid': 7315272141, 
                'online': True, 
                'static': {
                    'countryCode': 'KR', 
                    'deviceType': '501'
                }, 
                'timestamp': 1667026694495
            }
        }
    }, 
    'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx', 
    'type': 'monitoring'
}

2.2. ROBOT STATE 정리 및 메시지 해석 코드 수정

위와 같이 로봇의 상태가 변경될 때, 'data' > 'state' > 'reported' > 'ROBOT_STATE' 의 문자열 값이 달라지는 것을 알 수 있었다

위 사례를 포함해 내가 이리저리 만지면서 발견한 ROBOT_STATE 도메인 값들은 다음과 같다

  • 'SLEEP': 절전
  • 'CHARGING': 충전 중
  • 'INITAILIZING': 청소 시작 초기화 상태 (로봇의 현재 위치 파악)
    ※ 내가 오타낸게 아니다... ㅋㅋㅋ 실제로 MQTT 메시지가 이렇게 수신된다;;
  • 'CLEAN_SELECT': 선택 영역 청소 시작/청소 중
  • 'CLEAN_EDGE': 전체 영역 청소 시작/청소 중, EDGE = 꼼꼼청소 모드인듯?
  • 'PAUSE_EDGE': 청소 일시 중지 
  • 'HOMING': 충전대 복귀
저런 사소한 오타는 청소기 펌웨어 개발자 혹은 ThinQ 앱 개발자의 실수로 보이는데.. (혹은 하청업체;;), 두 파트에서 서로 교차검증이 되지 않았다는 사실이 놀랍다 ㅋㅋ K-대기업의 위엄..
혹은 뭐 서로 다 알고 있는 상황에서 "이게 뭔 대수라고" 하면서 넘어갔을지도?

또한, 비정상 상황이 발생할 경우

'ROBOT_STATE': {
    'EMERGENCY': 'ROBOT_LIFT'
}

와 같이 dictionary 형으로 넘어오는 경우도 볼 수 있었다 (위 상황은 청소기를 위로 들어올려 움직이지 못하게 된 상태)

이 외에도 여러 상태값이 존재하겠지만, 어차피 나는 '청소기가 청소중인지 아닌지 여부'만 알면 되기 때문에 모든 상태를 알아낼 필요없이 다음과 같이 MQTT 메시지 해석 구문을 변경했다

class ThinQ:
    def onMqttClientMessage(self, _, userdata, message):
        msg_dict = json.loads(message.payload.decode("utf-8"))
        data = msg_dict.get('data')
        deviceId = msg_dict.get('deviceId')
        msg_type = msg_dict.get('type')
        if data is not None and deviceId is not None and msg_type is not None:
            if deviceId == self.robot_cleaner_dev_id:
                state = data.get('state')
                reported = state.get('reported')
                robot_state = reported.get('ROBOT_STATE')
                if robot_state is not None:
                    topic = self.mqtt_topic + '/robotcleaner/state'
                    if 'CLEAN' in robot_state or robot_state in ['INITAILIZING', 'HOMING']:
                        cleaning = 1
                    else:
                        cleaning = 0
                    message = {'cleaning': cleaning}
                    self.sig_publish_mqtt.emit(topic, message)

ROBOT_STATE 문자열 내에 'CLEAN'이 포함되거나, INITAILIZING, HOMING 중일 때는 로봇 청소기가 움직이는 상황으로 Notify할 수 있게 구현했다 (충전대 복귀 상황에서도 절수 페달을 건드릴 수도 있으니..)

2.3. GitHub Commit

위의 작업 내용은 모두 아래 저장소에 Commit 해두었다

Commit ID: e31b37e6bfcc9fa23cad1ba17cfdc116d8d54048

https://github.com/YOGYUI/HomeNetwork/tree/main/Hillstate-Gwanggyosan

 

GitHub - YOGYUI/HomeNetwork: HomeNetwork(Homebridge) Repo

HomeNetwork(Homebridge) Repo. Contribute to YOGYUI/HomeNetwork development by creating an account on GitHub.

github.com

3. Homebridge 액세서리 추가

액세서리는 단순하게 mqtt-thing 플러그인의 재실 센서(occupancy sensor)로 구현했다

{
    "accessory": "mqttthing",
    "type": "occupancySensor",
    "name": "Robot Cleaner Activated (MQTT)",
    "topics": {
        "getOccupancyDetected": {
            "topic": "home/hillstate/thinq/robotcleaner/state",
            "apply": "return (JSON.parse(message).cleaning == 1);"
        }
    },
    "integerValue": true,
    "onValue": 1,
    "offValue": 0,
    "logMqtt": false
}

로봇 청소기가 청소 중이라면 재실 센서가 감지되고, 감지되면 싱크대 절수 페달 자동 잠금 기능을 활성화하면 된다

 

홈브릿지 재시작 후 재실 센서가 추가된 것을 확인

4. 애플 홈 자동화 추가

애플 홈 앱에서 자동화를 추가해주자

[센서 감지가 있는 경우] → [물걸레 로봇청소기 재실 센서 선택] → [사람이 감지될 때] → [싱크대 자동 잠금] → [켜기]

 

마찬가지로 다음과 같이 비활성화 조건 추가

[센서 감지가 있는 경우] → [물걸레 로봇청소기 재실 센서 선택] → [사람이 감지되지 않을 때] → [싱크대 자동 잠금] → [끄기]

5. DEMO

로봇청소기 청소가 시작되면 재실 센서 '사람이 감지됨' 상태가 되며, 이에 따라 싱크대 자동 잠금 스위치가 활성화되는 것을 볼 수 있다

 

 

 

 

싱크대 자동 잠금 스위치가 활성화된 상태에서 절수 페달을 눌러 수전을 개방하면, 일정 시간 (5초) 뒤 자동으로 밸브가 닫혀서 물이 흐르지 않게 된다! (열려있는 시간은 ESP32 임베디드 보드로 별도로 설정 가능)

※ 로봇청소기가 절수 페달을 건드리는 걸 찍어보고 싶었는데, 굉장히 드물게 어쩌다 한 번 건드릴 정도로 빈도가 낮다보니 그냥 발로 누른 모습을 찍었다 ㅎㅎ...

 

이제 로봇청소기를 돌릴 때 싱크대 수전을 잠궜는지 확인하지 않고도 안심하고 외출을 할 수 있게 됐다!!!

6. Summary

시스템 도식은 다음과 같다

  • SBC(라즈베리파이4) 상에서 Homebridge, Mosquitto (mqtt broker) 구동 중
  • 홈네트워크 연동 코드는 파이썬으로 구동 중
  • ThinQ API를 통해 AWS IoT Core 접속 인증 받은 후 나의 계정에 할당된 MQTT topic 구독 (subscribe)
  • 로봇청소기는 자신의 상태를 AWS IoT Core로 MQTT 메시지 발행 (Publish)
  • MQTT 메시지를 통해 로봇청소기의 상태를 모니터링, 만약 로봇청소기가 움직이는 상태일 경우 JATA 절수페달 연동 보드(ESP32 기반 임베디드 보드)에 싱크대 수전 밸브 자동 잠금 기능 활성화 (Homebride + Apple Home 자동화)

2달 가까이 걸린 프로젝트가 마침내 마무리되었다

속이 아주 후련하다 ^^

 

끝~!

반응형