일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Bestin
- Apple
- 배당
- 월패드
- 미국주식
- esp32
- 오블완
- raspberry pi
- cluster
- 파이썬
- 티스토리챌린지
- 나스닥
- 홈네트워크
- 공모주
- 현대통신
- SK텔레콤
- Python
- ConnectedHomeIP
- matter
- Espressif
- MQTT
- 매터
- 코스피
- Home Assistant
- RS-485
- homebridge
- 힐스테이트 광교산
- 해외주식
- 애플
- 국내주식
- Today
- Total
YOGYUI
LG 물걸레 로봇청소기 (코드제로 M9) + 싱크대 절수 페달 연동 (ThinQ + Homebridge) 본문
LG 물걸레 로봇청소기 (코드제로 M9) + 싱크대 절수 페달 연동 (ThinQ + Homebridge)
요겨 2022. 10. 29. 22:05
싱크대 절수페달 IoT 연동 마지막 단계!
매주 아주 유용하게 쓰고 있는 LG전자의 물걸레 로봇청소기 코드제로 M9을 홈네트워크와 연동시켜보자
[목표] 로봇청소기가 청소중일 때, 싱크대 수전에 물이 흐를 경우 일정 시간이 지나면 수전 밸브가 자동으로 닫히도록 기능 구현 (물낭비 방지)
싱크대 절수 페달 및 수전은 아래 글과 같이 IoT 환경을 구축해뒀다
아쉽게도 Homebridge의 ThinQ 플러그인은 로봇청소기를 지원하지 않는다
Homebridge - LG ThinQ 연동하기 (애플 홈 연동)
결국 ThinQ API를 사용하는 방법을 다른 개발자가 만든 소스코드를 참고하여 파이썬으로 구현 완료!
1. 파이썬 코딩 (ThinQ 클래스)
ThinQ 클래스를 다음과 같이 구현해줬다
import os
import sys
import json
import hmac
import base64
import random
import hashlib
import datetime
from regex import E
import requests
import email.utils
import urllib.parse
from OpenSSL import crypto
from typing import Union, List
import paho.mqtt.client as mqtt
from Common import writeLog, Callback
class ThinQ:
client_id: Union[str, None] = None
user_no: Union[str, None] = None
access_token: Union[str, None] = None
country_code: str = 'KR'
language_code: str = 'ko-KR'
api_key: str = ''
api_client_id: str = ''
refresh_token: str = ''
oauth_secret_key: str = ''
app_client_id: str = ''
app_key: str = ''
uri_thinq1: Union[str, None] = None
uri_thinq2: Union[str, None] = None
uri_oauth: Union[str, None] = None
subscribe_topics: List[str]
mqtt_client: mqtt.Client
device_discover_list: List[dict]
robot_cleaner_dev_id: str = ''
mqtt_topic: str = ''
def __init__(self, **kwargs):
self.sig_publish_mqtt = Callback(str, dict)
self.subscribe_topics = list()
self.device_discover_list = list()
if 'country_code' in kwargs.keys():
self.country_code = kwargs.get('country_code')
if 'language_code' in kwargs.keys():
self.language_code = kwargs.get('language_code')
if 'api_key' in kwargs.keys():
self.api_key = kwargs.get('api_key')
if 'api_client_id' in kwargs.keys():
self.api_client_id = kwargs.get('api_client_id')
if 'refresh_token' in kwargs.keys():
self.refresh_token = kwargs.get('refresh_token')
if 'oauth_secret_key' in kwargs.keys():
self.oauth_secret_key = kwargs.get('oauth_secret_key')
if 'app_client_id' in kwargs.keys():
self.app_client_id = kwargs.get('app_client_id')
if 'app_key' in kwargs.keys():
self.app_key = kwargs.get('app_key')
if 'robot_cleaner_dev_id' in kwargs.keys():
self.robot_cleaner_dev_id = kwargs.get('robot_cleaner_dev_id')
if 'mqtt_topic' in kwargs.keys():
self.mqtt_topic = kwargs.get('mqtt_topic')
self.generate_rsa_csr_pemfiles()
self.get_aws_root_ca_pem()
def start(self):
if not self.query_thinq_uris():
return
if not self.query_oauth_uris():
return
if not self.query_access_token():
return
if not self.query_user_number():
return
if not self.query_home_device_list():
return
if not self.get_certificate_from_server():
return
if not self.connect_mqtt_broker():
return
@staticmethod
def generate_random_string(length: int) -> str:
characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
result = ''
for i in range(length):
result += characters[random.randint(0, len(characters) - 1)]
return result
@staticmethod
def generate_signature(message: str, key: str) -> str:
hmac_obj = hmac.new(key.encode(encoding='utf-8'), message.encode(encoding='utf-8'), hashlib.sha1)
hashed = hmac_obj.digest()
b64_encoded = base64.b64encode(hashed)
return b64_encoded.decode(encoding='utf-8')
def generate_default_header(self) -> dict:
headers = {
'x-api-key': self.api_key,
'x-thinq-app-ver': '3.6.1200',
'x-thinq-app-type': 'NUTS',
'x-thinq-app-level': 'PRD',
'x-thinq-app-os': 'ANDROID',
'x-thinq-app-logintype': 'LGE',
'x-service-code': 'SVC202',
'x-country-code': self.country_code,
'x-language-code': self.language_code,
'x-service-phase': 'OP',
'x-origin': 'app-native',
'x-model-name': 'samsung/SM-G930L',
'x-os-version': 'AOS/7.1.2',
'x-app-version': 'LG ThinQ/3.6.12110',
'x-message-id': self.generate_random_string(22),
'user-agent': 'okhttp/3.14.9'
}
headers['x-client-id'] = self.api_client_id if self.client_id is None else self.client_id
if self.user_no is not None:
headers['x-user-no'] = self.user_no
if self.access_token is not None:
headers['x-emp-token'] = self.access_token
return headers
def query_thinq_uris(self) -> bool:
result: bool
url = "https://route.lgthinq.com:46030/v1/service/application/gateway-uri"
response = requests.get(url, headers=self.generate_default_header())
if response.status_code == 200:
response_json = json.loads(response.text)
result = response_json.get('result')
self.uri_thinq1 = result.get('thinq1Uri')
self.uri_thinq2 = result.get('thinq2Uri')
elapsed = response.elapsed.microseconds
writeLog('query thinq uri success ({:g} msec)'.format(elapsed / 1000), self)
result = True
else:
writeLog(f'failed to query thinq uri ({response.status_code}, {response.text})', self)
result = False
return result
def query_oauth_uris(self) -> bool:
result: bool
if self.uri_thinq1 is None:
writeLog(f'thinq uri is not queried yet!', self)
return False
url = self.uri_thinq1 + '/common/gatewayUriList'
headers = {
'Accept': 'application/json',
'x-thinq-application-key': 'wideq',
'x-thinq-security-key': 'nuts_securitykey'
}
data = {
'lgedmRoot': {
'countryCode': self.country_code,
'langCode': self.language_code
}
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
response_json = json.loads(response.text)
lgemdRoot = response_json.get('lgedmRoot')
self.uri_oauth = lgemdRoot.get('oauthUri')
elapsed = response.elapsed.microseconds
writeLog('query oauth uri success ({:g} msec)'.format(elapsed / 1000), self)
result = True
else:
writeLog(f'failed to query oauth uri ({response.status_code}, {response.text})', self)
result = False
return result
def query_access_token(self) -> bool:
result: bool
if self.uri_oauth is None:
writeLog(f'oauth uri is not queried yet!', self)
return False
url = self.uri_oauth + '/oauth/1.0/oauth2/token'
data = {
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token
}
requestUrl = '/oauth/1.0/oauth2/token' + '?' + urllib.parse.urlencode(data)
now = datetime.datetime.utcnow()
timestamp = email.utils.format_datetime(now)
signature = self.generate_signature(f"{requestUrl}\n{timestamp}", self.oauth_secret_key)
headers = {
'x-lge-app-os': 'ADR',
'x-lge-appkey': self.app_client_id,
'x-lge-oauth-signature': signature,
'x-lge-oauth-date': timestamp,
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded'
}
response = requests.post(url, params=data, headers=headers)
if response.status_code == 200:
response_json = json.loads(response.text)
self.access_token = response_json.get('access_token')
# expires_in_ = int(response_json.get('expires_in'))
elapsed = response.elapsed.microseconds
writeLog('query access token success ({:g} msec)'.format(elapsed / 1000), self)
result = True
else:
writeLog(f'failed to query access token ({response.status_code}, {response.text})', self)
result = False
return result
def query_user_number(self) -> bool:
result: bool
if self.access_token is None:
writeLog(f'access token is not queried yet!', self)
return False
url = self.uri_oauth + '/users/profile'
now = datetime.datetime.utcnow()
timestamp = email.utils.format_datetime(now)
signature = self.generate_signature(f"/users/profile\n{timestamp}", self.oauth_secret_key)
headers = {
'Accept': 'application/json',
'Authorization': 'Bearer ' + self.access_token,
'X-Lge-Svccode': 'SVC202',
'X-Application-Key': self.app_key,
'lgemp-x-app-key': self.app_client_id,
'X-Device-Type': 'M01',
'X-Device-Platform': 'ADR',
'x-lge-oauth-date': timestamp,
'x-lge-oauth-signature': signature
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
response_json = json.loads(response.text)
account = response_json.get('account')
self.user_no = account.get('userNo')
elapsed = response.elapsed.microseconds
writeLog('query user number success ({:g} msec)'.format(elapsed / 1000), self)
# create client id
obj_hash = hashlib.sha256()
now = int(datetime.datetime.now().timestamp())
obj_hash.update((self.user_no + f'{now}').encode(encoding='utf-8'))
self.client_id = obj_hash.hexdigest()
result = True
else:
writeLog(f'failed to query user number ({response.status_code}, {response.text})', self)
result = False
return result
def query_home_device_list(self) -> bool:
result: bool
if self.uri_thinq2 is None:
writeLog(f'thinq uri is not queried yet!', self)
return False
url = self.uri_thinq2 + '/service/homes'
response = requests.get(url, headers=self.generate_default_header())
if response.status_code == 200:
response_json = json.loads(response.text)
result = response_json.get('result')
home_list = result.get('item')
self.device_discover_list.clear()
for obj in home_list:
homeId = obj.get('homeId')
url2 = self.uri_thinq2 + '/service/homes/' + homeId
response = requests.get(url2, headers=self.generate_default_header())
if response.status_code == 200:
response_json = json.loads(response.text)
result = response_json.get('result')
devices = result.get('devices')
self.device_discover_list.extend(devices)
self.print_device_discover_list()
result = True
else:
writeLog(f'failed to query home - device list ({response.status_code}, {response.text})', self)
result = False
return result
def print_device_discover_list(self):
writeLog(f'discovered {len(self.device_discover_list)} device(s)', self)
for device in self.device_discover_list:
dev_id = device.get('deviceId')
dev_type = device.get('deviceType')
modelName = device.get('modelName')
alias = device.get('alias')
writeLog(f'{alias}::{modelName}::{dev_type}::{dev_id}', self)
def generate_rsa_csr_pemfiles(self):
pubkey_pem_path = os.path.join(CURPATH, 'pubkey.pem')
privkey_pem_path = os.path.join(CURPATH, 'privkey.pem')
csr_pem_path = os.path.join(CURPATH, 'csr.pem')
if not os.path.isfile(pubkey_pem_path) or not os.path.isfile(privkey_pem_path) or not os.path.isfile(csr_pem_path):
keypair = crypto.PKey()
keypair.generate_key(crypto.TYPE_RSA, 2048)
pubkey_pem = crypto.dump_publickey(crypto.FILETYPE_PEM, keypair).decode(encoding='utf-8')
with open(pubkey_pem_path, 'w') as fp:
fp.write(pubkey_pem)
privkey_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, keypair).decode(encoding='utf-8')
with open(privkey_pem_path, 'w') as fp:
fp.write(privkey_pem)
req = crypto.X509Req()
req.get_subject().CN = "AWS IoT Certificate"
req.get_subject().O = "Amazon"
req.set_pubkey(keypair)
req.sign(keypair, "sha256")
csr_pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req).decode(encoding='utf-8')
with open(csr_pem_path, 'w') as fp:
fp.write(csr_pem)
def get_certificate_from_server(self) -> bool:
result: bool
if self.uri_thinq2 is None:
writeLog(f'thinq uri is not queried yet!', self)
return False
csr_pem_path = os.path.join(CURPATH, 'csr.pem')
if not os.path.isfile(csr_pem_path):
writeLog(f'cannot find csr pem file', self)
return False
url = self.uri_thinq2 + '/service/users/client'
response = requests.post(url, headers=self.generate_default_header())
if response.status_code == 200:
with open(csr_pem_path, 'r') as fp:
csr_pem = fp.read()
url = self.uri_thinq2 + '/service/users/client/certificate'
csr_pem = csr_pem.replace('-----BEGIN CERTIFICATE REQUEST-----', '')
csr_pem = csr_pem.replace('-----END CERTIFICATE REQUEST-----', '')
csr_pem = csr_pem.replace('\r\n', '')
data = {
'csr': csr_pem
}
response = requests.post(url, json=data, headers=self.generate_default_header())
if response.status_code == 200:
response_json = json.loads(response.text)
result = response_json.get('result')
certificate_pem = result.get('certificatePem')
cert_pem_path = os.path.join(CURPATH, 'aws_cert.pem')
with open(cert_pem_path, 'w') as fp:
fp.write(certificate_pem)
self.subscribe_topics.clear()
subscriptions = result.get('subscriptions') # 구독할 Topic
self.subscribe_topics.extend(subscriptions)
elapsed = response.elapsed.microseconds
writeLog('query certificate success ({:g} msec)'.format(elapsed / 1000), self)
result = True
else:
writeLog(f'failed to query certificate ({response.status_code}, {response.text})', self)
result = False
else:
writeLog(f'failed to visit service ({response.status_code}, {response.text})', self)
result = False
return result
def get_aws_root_ca_pem(self) -> bool:
result: bool
rootca_pem_path = os.path.join(CURPATH, 'aws_root_ca.pem')
if os.path.isfile(rootca_pem_path):
return True
url = 'https://www.amazontrust.com/repository/AmazonRootCA1.pem'
response = requests.get(url)
if response.status_code == 200:
rootca_pem = response.text
with open(rootca_pem_path, 'w') as fp:
fp.write(rootca_pem)
elapsed = response.elapsed.microseconds
writeLog('query root CA from AWS ({:g} msec)'.format(elapsed / 1000), self)
result = True
else:
writeLog(f'failed to query root CA from AWS ({response.status_code}, {response.text})', self)
result = False
return result
def connect_mqtt_broker(self) -> bool:
if self.client_id is None:
writeLog(f'client id is not generated yet!', self)
return False
rootca_pem_path = os.path.join(CURPATH, 'aws_root_ca.pem')
if not os.path.isfile(rootca_pem_path):
writeLog('cannot find aws root ca pem file', self)
return False
cert_pem_path = os.path.join(CURPATH, 'aws_cert.pem')
if not os.path.isfile(cert_pem_path):
writeLog('cannot find aws cert pem file', self)
return False
privkey_pem_path = os.path.join(CURPATH, 'privkey.pem')
if not os.path.isfile(privkey_pem_path):
writeLog('cannot find private key pem file', self)
return False
# get mqtt broker host address
url = "https://common.lgthinq.com/route"
response = requests.get(url, headers=self.generate_default_header())
mqttserver = ''
if response.status_code == 200:
response_json = json.loads(response.text)
result = response_json.get('result')
mqttserver = result.get('mqttServer')
else:
return False
idx = mqttserver.rfind(':')
mqtt_host = mqttserver[:idx]
mqtt_port = int(mqttserver[idx+1:])
self.mqtt_client = mqtt.Client(client_id=self.client_id)
self.mqtt_client.on_connect = self.onMqttClientConnect
self.mqtt_client.on_disconnect = self.onMqttClientDisconnect
self.mqtt_client.on_message = self.onMqttClientMessage
self.mqtt_client.on_subscribe = self.onMqttClientSubscribe
self.mqtt_client.tls_set(ca_certs=rootca_pem_path, certfile=cert_pem_path, keyfile=privkey_pem_path)
self.mqtt_client.connect(host=mqtt_host[6:], port=mqtt_port)
self.mqtt_client.loop_start()
def onMqttClientConnect(self, _, userdata, flags, rc):
writeLog('connected to aws iot core (mqtt broker): {}, {}, {}'.format(userdata, flags, rc), self)
for topic in self.subscribe_topics:
self.mqtt_client.subscribe(topic)
def onMqttClientDisconnect(self, _, userdata, rc):
writeLog('disconnected from aws iot core (mqtt broker): {}, {}, {}'.format(userdata, rc), self)
def onMqttClientSubscribe(self, _, userdata, mid, granted_qos):
writeLog('mqtt subscribe: {}, {}, {}'.format(userdata, mid, granted_qos), self)
def onMqttClientMessage(self, _, userdata, message):
msg_dict = json.loads(message.payload.decode("utf-8"))
print(msg_dict)
ThinQ API 접근을 통한 Access Token 획득, MQTT Broker (AWS IoT Core) 접속을 위한 클라이언트 ID 발급 등의 과정은 지난 글에서 다룬 것과 완전히 동일한 프로세스를 따른다 (링크)
이제 HomeNetwork 설정 파일(Config.xml)에 ThinQ 관련 항목도 추가해주자
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<config>
<thinq>
<refresh_token>refresh token from LGE login session</refresh_token>
<api_key>VGhpblEyLjAgU0VSVklDRQ==</api_key>
<api_client_id>c713ea8e50f657534ff8b9d373dfebfc2ed70b88285c26b8ade49868c0b164d9</api_client_id>
<oauth_secret_key>c053c2a6ddeb7ad97cb0eed0dcb31cf8</oauth_secret_key>
<app_client_id>LGAO221A02</app_client_id>
<application_key>6V1V8H2BN5P9ZQGOI5DAQ92YZBDO3EK9</application_key>
<country_code>KR</country_code>
<language_code>ko-KR</language_code>
</thinq>
</config>
refresh_token 값만 본인 계정의 토큰 값으로 바꿔 입력하면 된다
Home.py의 Home 클래스의 멤버변수로 thinq를 추가한 뒤에 초기화 단계를 다음과 같이 추가해주면 사용 준비 완료!
node = root.find('thinq')
robot_cleaner_node = node.find('robot_cleaner')
robot_cleaner_dev_id = robot_cleaner_node.find('dev_id').text
mqtt_node = node.find('mqtt')
mqtt_topic = mqtt_node.find('publish').text
self.thinq = ThinQ(
country_code=node.find('country_code').text,
language_code=node.find('language_code').text,
api_key=node.find('api_key').text,
api_client_id=node.find('api_client_id').text,
refresh_token=node.find('refresh_token').text,
oauth_secret_key=node.find('oauth_secret_key').text,
app_client_id=node.find('app_client_id').text,
app_key=node.find('application_key').text,
robot_cleaner_dev_id=robot_cleaner_dev_id,
mqtt_topic=mqtt_topic
)
self.thinq.sig_publish_mqtt.connect(self.onThinqPublishMQTT)
1.1. ThinQ 앱에 등록된 Device 리스트 조회하기
ThinQ 클래스 내에 print_device_discover_list 메서드를 구현해뒀다
class ThinQ:
def print_device_discover_list(self):
for device in self.device_discover_list:
dev_id = device.get('deviceId')
dev_type = device.get('deviceType')
modelName = device.get('modelName')
alias = device.get('alias')
writeLog(f'{alias}::{modelName}::{dev_type}::{dev_id}', self)
Home 초기화 단계에서 AWS IoT Core에 정상적으로 접속되면
콘솔에 위와 같이 device별 고유 ID값이 출력된다
물걸레 로봇청소기 ID가 필요하니 다음과 같이 Config.xml 파일에 추가한 뒤 ThinQ 객체 초기화 시에 인자로 넣어주도록 하자
<thinq>
<robot_cleaner>
<dev_id>d6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx</dev_id>
</robot_cleaner>
<mqtt>
<publish>home/hillstate/thinq</publish>
</mqtt>
</thinq>
2. 로봇 청소기 MQTT 메시지 확인
실제로 로봇 청소기를 구동시킨 뒤 MQTT 메시지가 어떤 형태로 수신되는지 확인해봤다 (JSON)
class ThinQ:
def onMqttClientMessage(self, _, userdata, message):
msg_dict = json.loads(message.payload.decode("utf-8"))
print(msg_dict)
2.1.1. 청소 시작 시
{
'data': {
'state': {
'reported': {
'ROBOT_STATE': 'INITAILIZING',
'meta': {
'allDeviceInfoUpdate': False,
'messageId': 'qbpr2L99QUmtOpnGRmWOhA'
},
'mid': 7315258837,
'online': True,
'static': {
'countryCode': 'KR', 'deviceType': '501'
},
'timestamp': 1667026681198
}
}
},
'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
'type': 'monitoring'
}
{
'data': {
'state': {
'reported': {
'BATT': '3',
'meta': {
'allDeviceInfoUpdate': False,
'messageId': 'HtL1nQraRnO4-_6-_bFjxg'
},
'mid': 7315259186,
'online': True,
'static': {
'countryCode': 'KR',
'deviceType': '501'
},
'timestamp': 1667026681430
}
}
},
'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
'type': 'monitoring'
}
{
'data': {
'state': {
'reported': {
'MODE': 'EDGE',
'meta': {
'allDeviceInfoUpdate': False,
'messageId': 'lDuf_77xTCuxbyEoW6O1Pw'
},
'mid': 7315259531,
'online': True,
'static': {
'countryCode': 'KR',
'deviceType': '501'
},
'timestamp': 1667026681909
}
}
},
'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
'type': 'monitoring'
}
{
'data': {
'state': {
'reported': {
'ROBOT_STATE': 'CLEAN_EDGE',
'meta': {
'allDeviceInfoUpdate': False,
'messageId': 'knSFYagqRWi1m188y_aFaQ'
},
'mid': 7315271103,
'online': True,
'static': {
'countryCode': 'KR',
'deviceType': '501'
},
'timestamp': 1667026693437
}
}
},
'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
'type': 'monitoring'
}
2.1.2. 청소 일시 중지
{
'data': {
'state': {
'reported': {
'ROBOT_STATE': 'PAUSE_EDGE',
'meta': {
'allDeviceInfoUpdate': False,
'messageId': 'YcVQeqfmT_W-G_R4w19aKw'
},
'mid': 7315271448,
'online': True,
'static': {
'countryCode': 'KR',
'deviceType': '501'
},
'timestamp': 1667026693757
}
}
},
'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
'type': 'monitoring'
}
2.1.3. 청소 종료 및 충전대 복귀
{
'data': {
'state': {
'reported': {
'ROBOT_STATE': 'HOMING',
'meta': {
'allDeviceInfoUpdate': False,
'messageId': 'XXT1tkQoQkW5Ao-DECijxA'
},
'mid': 7315272141,
'online': True,
'static': {
'countryCode': 'KR',
'deviceType': '501'
},
'timestamp': 1667026694495
}
}
},
'deviceId': 'd6c1a158-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
'type': 'monitoring'
}
2.2. ROBOT STATE 정리 및 메시지 해석 코드 수정
위와 같이 로봇의 상태가 변경될 때, 'data' > 'state' > 'reported' > 'ROBOT_STATE' 의 문자열 값이 달라지는 것을 알 수 있었다
위 사례를 포함해 내가 이리저리 만지면서 발견한 ROBOT_STATE 도메인 값들은 다음과 같다
- 'SLEEP': 절전
- 'CHARGING': 충전 중
- 'INITAILIZING': 청소 시작 초기화 상태 (로봇의 현재 위치 파악)
※ 내가 오타낸게 아니다... ㅋㅋㅋ 실제로 MQTT 메시지가 이렇게 수신된다;; - 'CLEAN_SELECT': 선택 영역 청소 시작/청소 중
- 'CLEAN_EDGE': 전체 영역 청소 시작/청소 중, EDGE = 꼼꼼청소 모드인듯?
- 'PAUSE_EDGE': 청소 일시 중지
- 'HOMING': 충전대 복귀
저런 사소한 오타는 청소기 펌웨어 개발자 혹은 ThinQ 앱 개발자의 실수로 보이는데.. (혹은 하청업체;;), 두 파트에서 서로 교차검증이 되지 않았다는 사실이 놀랍다 ㅋㅋ K-대기업의 위엄..
혹은 뭐 서로 다 알고 있는 상황에서 "이게 뭔 대수라고" 하면서 넘어갔을지도?
또한, 비정상 상황이 발생할 경우
'ROBOT_STATE': {
'EMERGENCY': 'ROBOT_LIFT'
}
와 같이 dictionary 형으로 넘어오는 경우도 볼 수 있었다 (위 상황은 청소기를 위로 들어올려 움직이지 못하게 된 상태)
이 외에도 여러 상태값이 존재하겠지만, 어차피 나는 '청소기가 청소중인지 아닌지 여부'만 알면 되기 때문에 모든 상태를 알아낼 필요없이 다음과 같이 MQTT 메시지 해석 구문을 변경했다
class ThinQ:
def onMqttClientMessage(self, _, userdata, message):
msg_dict = json.loads(message.payload.decode("utf-8"))
data = msg_dict.get('data')
deviceId = msg_dict.get('deviceId')
msg_type = msg_dict.get('type')
if data is not None and deviceId is not None and msg_type is not None:
if deviceId == self.robot_cleaner_dev_id:
state = data.get('state')
reported = state.get('reported')
robot_state = reported.get('ROBOT_STATE')
if robot_state is not None:
topic = self.mqtt_topic + '/robotcleaner/state'
if 'CLEAN' in robot_state or robot_state in ['INITAILIZING', 'HOMING']:
cleaning = 1
else:
cleaning = 0
message = {'cleaning': cleaning}
self.sig_publish_mqtt.emit(topic, message)
ROBOT_STATE 문자열 내에 'CLEAN'이 포함되거나, INITAILIZING, HOMING 중일 때는 로봇 청소기가 움직이는 상황으로 Notify할 수 있게 구현했다 (충전대 복귀 상황에서도 절수 페달을 건드릴 수도 있으니..)
2.3. GitHub Commit
위의 작업 내용은 모두 아래 저장소에 Commit 해두었다
Commit ID: e31b37e6bfcc9fa23cad1ba17cfdc116d8d54048
https://github.com/YOGYUI/HomeNetwork/tree/main/Hillstate-Gwanggyosan
3. Homebridge 액세서리 추가
액세서리는 단순하게 mqtt-thing 플러그인의 재실 센서(occupancy sensor)로 구현했다
{
"accessory": "mqttthing",
"type": "occupancySensor",
"name": "Robot Cleaner Activated (MQTT)",
"topics": {
"getOccupancyDetected": {
"topic": "home/hillstate/thinq/robotcleaner/state",
"apply": "return (JSON.parse(message).cleaning == 1);"
}
},
"integerValue": true,
"onValue": 1,
"offValue": 0,
"logMqtt": false
}
로봇 청소기가 청소 중이라면 재실 센서가 감지되고, 감지되면 싱크대 절수 페달 자동 잠금 기능을 활성화하면 된다
홈브릿지 재시작 후 재실 센서가 추가된 것을 확인
4. 애플 홈 자동화 추가
애플 홈 앱에서 자동화를 추가해주자
[센서 감지가 있는 경우] → [물걸레 로봇청소기 재실 센서 선택] → [사람이 감지될 때] → [싱크대 자동 잠금] → [켜기]
마찬가지로 다음과 같이 비활성화 조건 추가
[센서 감지가 있는 경우] → [물걸레 로봇청소기 재실 센서 선택] → [사람이 감지되지 않을 때] → [싱크대 자동 잠금] → [끄기]
5. DEMO
로봇청소기 청소가 시작되면 재실 센서 '사람이 감지됨' 상태가 되며, 이에 따라 싱크대 자동 잠금 스위치가 활성화되는 것을 볼 수 있다
싱크대 자동 잠금 스위치가 활성화된 상태에서 절수 페달을 눌러 수전을 개방하면, 일정 시간 (5초) 뒤 자동으로 밸브가 닫혀서 물이 흐르지 않게 된다! (열려있는 시간은 ESP32 임베디드 보드로 별도로 설정 가능)
※ 로봇청소기가 절수 페달을 건드리는 걸 찍어보고 싶었는데, 굉장히 드물게 어쩌다 한 번 건드릴 정도로 빈도가 낮다보니 그냥 발로 누른 모습을 찍었다 ㅎㅎ...
이제 로봇청소기를 돌릴 때 싱크대 수전을 잠궜는지 확인하지 않고도 안심하고 외출을 할 수 있게 됐다!!!
6. Summary
시스템 도식은 다음과 같다
- SBC(라즈베리파이4) 상에서 Homebridge, Mosquitto (mqtt broker) 구동 중
- 홈네트워크 연동 코드는 파이썬으로 구동 중
- ThinQ API를 통해 AWS IoT Core 접속 인증 받은 후 나의 계정에 할당된 MQTT topic 구독 (subscribe)
- 로봇청소기는 자신의 상태를 AWS IoT Core로 MQTT 메시지 발행 (Publish)
- MQTT 메시지를 통해 로봇청소기의 상태를 모니터링, 만약 로봇청소기가 움직이는 상태일 경우 JATA 절수페달 연동 보드(ESP32 기반 임베디드 보드)에 싱크대 수전 밸브 자동 잠금 기능 활성화 (Homebride + Apple Home 자동화)
2달 가까이 걸린 프로젝트가 마침내 마무리되었다
속이 아주 후련하다 ^^
끝~!
'홈네트워크(IoT) > 힐스테이트 광교산' 카테고리의 다른 글
힐스테이트 광교산::공동출입문용 RFID 스티커형 태그 복사 (스마트폰) (1) | 2022.11.21 |
---|---|
힐스테이트 광교산::주방 싱크대 LED 조명 IoT 연동 (2) | 2022.11.19 |
힐스테이트 광교산::주방 비디오폰 연동 - 세대 및 공동 현관문 제어 (애플 홈킷) (0) | 2022.10.28 |
힐스테이트 광교산::주방 비디오폰 연동 - HEMS(에너지 모니터링) (0) | 2022.10.25 |
힐스테이트 광교산::싱크대 절수페달 IoT 연동하기 - Final (0) | 2022.09.30 |