일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- homebridge
- 월패드
- Bestin
- SK텔레콤
- esp32
- 코스피
- 오블완
- Apple
- Espressif
- Home Assistant
- 미국주식
- 티스토리챌린지
- 매터
- Python
- 해외주식
- raspberry pi
- ConnectedHomeIP
- MQTT
- 힐스테이트 광교산
- 파이썬
- 애플
- 배당
- 국내주식
- 홈네트워크
- cluster
- matter
- 공모주
- 나스닥
- 현대통신
- RS-485
- Today
- Total
YOGYUI
Bestin 홈네트워크 RS485 패킷 체크섬 계산 알고리즘을 찾다! 본문
지난주, 아이파크에 거주하시는 분(할윈, harwin 님)과의 카카오톡 대화 중 의미심장한 내용을 전해들었다
바로 Bestin 홈네트워크의 RS-485 패킷 중 마지막 바이트 (편의상 체크섬 바이트)를 계산하는 알고리즘에 대한 내용!
Github의 laz-라는 유저께서 지난 8월 16일에 gist로 올려놓은 코드에 관련 내용이 나와있다https://gist.github.com/laz-/a507af756e13e64ed3aaceb236b5ad49
def bestin_sum(array):
sum = 3
for b in array:
sum = ((b ^ sum) + 1) & 0xff
return sum
단순한 XOR SUM을 약간 비틀었을 뿐인 단순한 알고리즘인데, 이걸 찾아낸 게 신통방통하다 ㅋㅋㅋ
나도 가끔 심심하면 요리조리 연구해봤는데, 도저히 답이 안나와서 거의 반 포기상태였는데 ㅠ 대단!
(256 타임스탬프 각각에 대해서 LUT까지 만들어봤는데 도저히 안되겠더라 ~.,~)
계산 알고리즘은 간단하다
- 3 (0x03)으로 값 초기화
- 패킷의 각 바이트를 순차적으로 XOR(Executive OR)한 뒤 1을 더하고 8비트로 truncation
각 loop마다 XOR를 하면서 1을 더하는 심플한 알고리즘인데... 정체를 알고나니 굉장히 허무하다 ㅋㅋ
예를 들어보자
주방(방 인덱스 1)의 첫번째 조명을 켜는 명령 패킷은 다음과 같다
02 31 0D 01 D0 01 81 00 00 00 00 04 76
※ 여기서 패킷의 5번째 바이트 0xD0는 베스틴 월패드가 디바이스랑 패킷을 주고 받을 때마다 값이 1씩 올라가는 값으로, Spin code (스핀 코드)라 부르는 게 일반적이지만 나는 편의상 그동안 타임스탬프(timestamp)라 불러왔다
조명 패킷 관련 글은 광교아이파크::조명 Apple 홈킷 연동 (3) 참고
이제 마지막 바이트 0x76 값이 위 알고리즘대로 얻어지는 지 확인해보자
packet = bytearray([0x02, 0x31, 0x0D, 0x01, 0xD0, 0x01, 0x81, 0x00, 0x00, 0x00, 0x00, 0x04, 0x76])
result = 0x03
for i, p in enumerate(packet[:-1]):
print('Step {}: {:02X} ^ {:02X} = '.format(i + 1, result, p), end=' ')
result = (result ^ p) + 1
result = result & 0xFF
print('{:02X}'.format(result))
Step 1: 03 ^ 02 = 02
Step 2: 02 ^ 31 = 34
Step 3: 34 ^ 0D = 3A
Step 4: 3A ^ 01 = 3C
Step 5: 3C ^ D0 = ED
Step 6: ED ^ 01 = ED
Step 7: ED ^ 81 = 6D
Step 8: 6D ^ 00 = 6E
Step 9: 6E ^ 00 = 6F
Step 10: 6F ^ 00 = 70
Step 11: 70 ^ 00 = 71
Step 12: 71 ^ 04 = 76
Wow~ 계산 결과와 패킷 마지막 바이트가 일치한다
과연 우연의 일치일까?
엘리베이터 하행 호출 패킷 중 몇개를 가져와서 일치하는지 여부도 확인해보자
※ 엘리베이터 명령은 다른 디바이스랑 다르게, 월패드의 타임스탬프와 싱크가 맞지 않으면 호출이 되지 않아서 타임스탬프 256개에 대한 패킷을 전부 추출한 노가다를 한 경험이 있다 ㅋㅋㅋ 관련 내용은 링크 참고
우선, 파이썬에서 for문을 쓰면 멍청하다는 소리를 듣기 십상이므로, 있어보이게 reduce 함수 (functools 모듈)를 사용해서 계산 함수를 예쁘게 만들어보자
from functools import reduce
def calculate_bestin_checksum(packet: bytearray) -> int:
return reduce(lambda x, y: ((x ^ y) + 1) & 0xFF, packet, 0x03)
엘리베이터 호출 패킷 중 5개만 임의로 가져와서 테스트해보자
(5번째 바이트 타임스탬프 값에 따라 패킷 마지막 바이트 값이 바뀌는 것을 알 수 있다)
packets = [
bytearray([0x02, 0xC1, 0x0C, 0x91, 0x1C, 0x10, 0x03, 0x00, 0x02, 0x01, 0x02, 0x58]),
bytearray([0x02, 0xC1, 0x0C, 0x91, 0x35, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02, 0x85]),
bytearray([0x02, 0xC1, 0x0C, 0x91, 0xDC, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02, 0x9A]),
bytearray([0x02, 0xC1, 0x0C, 0x91, 0xF0, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02, 0xBE]),
bytearray([0x02, 0xC1, 0x0C, 0x91, 0xFC, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02, 0xBA])
]
for packet in packets:
checksum = calculate_bestin_checksum(packet[:-1])
print('calculated: {:02X}, real value: {:02X}'.format(checksum, packet[-1]))
calculated: 58, real value: 58
calculated: 85, real value: 85
calculated: 9A, real value: 9A
calculated: BE, real value: BE
calculated: BA, real value: BA
와...
진작 알았다면 개고생할 필요가 없었던 것이었던 것이었드래요~
내가 광교아이파크 살 때는 조명, 아울렛(전기 콘센트), 도시가스 밸브, 온수 난방, 환기 (전열교환기), 엘리베이터 총 6종의 디바이스를 RS-485로 제어했었는데, 각 디바이스별로 명령 및 상태 조회 패킷을 동적으로 생성하는 함수를 만들어보자 (이전에는 로컬 XML 파일에 각 디바이스별 패킷을 기록해두고 코드가 불러오는 방식으로 구현했었다)
- 방이 몇개던, 조명이 몇개던 마음껏 제어할 수 있게 됐다
- 더이상 bestin 홈네트워크 환경에서 살고 있지 않다는 건 함정.. ㅠ
class Device:
room_index: int = 0
def __init__(self, name: str = 'Device', **kwargs):
if 'room_index' in kwargs.keys():
self.room_index = kwargs['room_index']
def make_packet_common(self, header: int, length: int, packet_type: int, timestamp: int = 0) -> bytearray:
packet = bytearray([
0x02,
header & 0xFF,
length & 0xFF,
packet_type & 0xFF,
timestamp & 0xFF
])
packet.extend(bytearray([0] * (length - 5)))
return packet
@abstractmethod
def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
pass
@abstractmethod
def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
pass
조명, 아울렛, 난방은 make_packet_common 함수로 공통화할 수 있고, 전열교환기와 가스밸브 및 엘리베이터는 각각의 Rule대로 패킷을 생성해주면 된다
(조명, 아울렛, 난방은 각 방별로 디바이스들이 배정되어 있으며 여러 개의 디바이스가 존재할 수 있으므로 디바이스 인덱스 및 방 인덱스에 따라 동적으로 생성해줘야 한다 - 난방은 방마다 1개만 존재)
make_packet_set_state, make_packet_query_state는 명령, 상태 조회 패킷을 만들어주는 부모 메서드로, device를 상속받는 자식 클래스들 각각에서 따로 구현(메서드 오버라이딩)해주면 된다
※ 각 디바이스별 패킷 명세는 링크로 걸어둔 블로그 글들을 참고~ 다시 일일이 규칙을 적자니 너무 귀찮다 ㅠ
1. 조명
class Light(Device):
def __init__(self, name: str = 'Light', index: int = 0, **kwargs):
self.index = index
super().__init__(name, **kwargs)
def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
packet = self.make_packet_common(0x31, 13, 0x01, timestamp)
packet[5] = self.room_index & 0x0F
packet[6] = 0x01 << self.index
if target:
packet[6] += 0x80
packet[11] = 0x04
else:
packet[11] = 0x00
packet[12] = calculate_bestin_checksum(packet[:-1])
return packet
def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
packet = self.make_packet_common(0x31, 7, 0x11, timestamp)
packet[5] = self.room_index & 0x0F
packet[6] = calculate_bestin_checksum(packet[:-1])
return packet
2. 아울렛
class Outlet(Device):
def __init__(self, name: str = 'Outlet', index: int = 0, **kwargs):
self.index = index
super().__init__(name, **kwargs)
def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
packet = self.make_packet_common(0x31, 13, 0x01, timestamp)
packet[5] = self.room_index & 0x0F
packet[7] = 0x01 << self.index
if target:
packet[7] += 0x80
packet[11] = 0x09 << self.index # 확실하지 않음...
else:
packet[11] = 0x00
packet[12] = calculate_bestin_checksum(packet[:-1])
return packet
def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
# 조명 쿼리 패킷과 동일한 것으로 판단됨 (어차피 응답 패킷에 조명/아울렛 정보가 같이 담겨있음)
packet = self.make_packet_common(0x31, 7, 0x11, timestamp)
packet[5] = self.room_index & 0x0F
packet[6] = calculate_bestin_checksum(packet[:-1])
return packet
3. 난방
난방은 5도 ~ 40도까지 0.5도 단위로 희망온도를 설정하는 패킷이 필요해서, 각 방별로 71개씩 총 213개 패킷을 캡쳐해서 모으는 노가다를 했었다... ㅋㅋ 이젠 희망온도만 가지고 얼마든지 패킷을 쉽게 만들 수 있다!
class Thermostat(Device):
def __init__(self, name: str = 'Thermostat', **kwargs):
super().__init__(name, **kwargs)
def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
packet = self.make_packet_common(0x28, 14, 0x12, timestamp)
packet[5] = self.room_index & 0x0F
if target:
packet[6] = 0x01
else:
packet[6] = 0x02
packet[13] = calculate_bestin_checksum(packet[:-1])
return packet
def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
packet = self.make_packet_common(0x28, 7, 0x11, timestamp)
packet[5] = self.room_index & 0x0F
packet[6] = calculate_bestin_checksum(packet[:-1])
return packet
def make_packet_set_temperature(self, target: float, timestamp: int = 0) -> bytearray:
packet = self.make_packet_common(0x28, 14, 0x12, timestamp)
packet[5] = self.room_index & 0x0F
value_int = int(target)
value_float = target - value_int
packet[7] = value_int & 0xFF
if value_float != 0:
packet[7] += 0x40
packet[13] = calculate_bestin_checksum(packet[:-1])
return packet
4. 가스 밸브
가스밸브는 '닫기' 명령에 해당하는 패킷만 만들어주면 된다
class GasValve(Device):
def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
if target:
return bytearray([]) # not allowed open valve
packet = bytearray([0x02, 0x31, 0x02, timestamp & 0xFF])
packet.extend(bytearray([0x00] * 5))
packet.append(calculate_bestin_checksum(packet))
return packet
def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
packet = bytearray([0x02, 0x31, 0x00, timestamp & 0xFF])
packet.extend(bytearray([0x00] * 5))
packet.append(calculate_bestin_checksum(packet))
return packet
5. 전열교환기
광교아이파크::환기(전열교환기) Apple 홈킷 연동 (1)
전열교환기는 On/Off 외에 3단계 풍량 조절, 자연환기 모드 설정, 타이머 설정 기능 등 부가 기능들이 있는데, 나는 풍량 조절만 구현해뒀었다 (혹시 다른 기능들에 대한 구현도 필요한 분이 계시다면 댓글로 남겨주시길~)
<22.12.19 수정> - 자연환기 설정, 타이머 설정 패킷 생성 추가
class Ventilator(Device):
def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
packet = bytearray([0x02, 0x61, 0x01, timestamp & 0xFF, 0x00])
packet.append(0x01) if target else packet.append(0x00)
packet.extend([0x01, 0x00, 0x00])
packet.append(calculate_bestin_checksum(packet))
return packet
def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
packet = bytearray([0x02, 0x61, 0x00, timestamp & 0xFF, 0x00])
packet.extend([0x00, 0x00, 0x00, 0x00])
packet.append(calculate_bestin_checksum(packet))
return packet
def make_packet_set_rotation_speed(self, target: int, timestamp: int = 0) -> bytearray:
target = max(1, min(3, target))
packet = bytearray([0x02, 0x61, 0x03, timestamp & 0xFF, 0x00, 0x00, target, 0x00, 0x00])
packet.append(calculate_bestin_checksum(packet))
return packet
def make_packet_set_natural(self, target: int, timestamp: int = 0) -> bytearray:
"""
자연환기 On/Off
:param target: 0=Off, 1=On
"""
packet = bytearray([0x02, 0x61, 0x07, timestamp & 0xFF, 0x00])
packet.append(0x10) if target else packet.append(0x00)
packet.extend([0x00, 0x00, 0x00])
packet.append(calculate_bestin_checksum(packet))
return packet
def make_packet_set_timer(self, value: int, timestamp: int = 0) -> bytearray:
"""
타이머 설정
:param value: 0=Off, others=timer value (unit=minute)
"""
packet = bytearray([0x02, 0x61, 0x04, timestamp & 0xFF, 0x00])
packet.append(value & 0xFF)
packet.extend([0x00, 0x00, 0x00])
packet.append(calculate_bestin_checksum(packet))
return packet
6. 엘리베이터
엘리베이터 호출 패킷 생성 구문은 smart RS-485 포트 파서 구문 내에 다음과 같이 구현했다
(파서에서 디바이스 인스턴스에 접근하는 구문을 만들기가 너무 귀찮아서... ㅋㅋ)
상행/하행 호출 각각 256개씩 타임스탬프별로 패킷 캡쳐한 거 생각하면 대단히 무식하긴 했다 ㅋㅋ 궁여지책이긴 했지만..
class SmartSendParser(PacketParser):
def make_packet_call_down(self, timestamp: int) -> bytearray:
packet = bytearray([0x02, 0xC1, 0x0C, 0x91, timestamp, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02])
packet.append(calculate_bestin_checksum(packet))
return packet
def make_packet_call_up(self, timestamp: int) -> bytearray:
packet = bytearray([0x02, 0xC1, 0x0C, 0x91, timestamp, 0x20, 0x01, 0x00, 0x02, 0x01, 0x02])
packet.append(calculate_bestin_checksum(packet))
return packet
이제까지 언급한 타임스탬프는 컨트롤, 에너지, 스마트 RS-485 각 포트별로 월패드와 디바이스가 주고받는 패킷을 파서 객체가 패킷 캡쳐하면서 내부 변수에 저장하고 있다가, 명령 패킷 호출 시에 1을 더해서 패킷을 만드는 방식으로 구현하긴 했다.. 사실 엘리베이터 말고는 타임스탬프 싱크가 맞을 필요가 없긴 하지만~
[파서의 타임스탬프를 불러와 패킷을 만드는 예시 코드]
class ThreadCommand(threading.Thread): def set_light_outlet_state(self, dev: Union[Light, Outlet], target: int, parser: PacketParser): packet = dev.make_packet_set_state(target, parser.get_packet_timestamp() + 1) parser.sendPacket(packet) packet = dev.make_packet_query_state(parser.get_packet_timestamp() + 1) parser.sendPacket(packet) dev.publish_mqtt()
일단 이제까지 작성한 코드는 깃허브 저장소의 별도 브랜치에 커밋해뒀다
(브랜치명: bestin-checksum-calc, commit id: 043c29d7bfd23650074b2d5150c6de69d100cb0d)
https://github.com/YOGYUI/HomeNetwork/tree/bestin-checksum-calc
앞서 언급했듯이, 안타깝게도 지금은 아이파크를 떠난 지 어언 6개월가량 되었기 때문에 실물 테스트를 할 수 있는 환경이 갖춰져있지 않다 (시뮬레이션 환경을 얼마든지 만들 수는 있는데... 시간이 아깝기도 하고 의지도 없기에 PASS ㅎㅎ)
작동 여부를 확인하지 않고 머리 속으로만 그린 코드를 2시간 정도에 걸쳐서 마구 작성한 코드라 제대로 작동하는지도 확신이 서질 않는다;;
혹시나 위 코드로 작동하는 분들이 계시다면, 디버깅을 부탁드리는 바.. 문제가 있다면 블로그 댓글/방명록이나 이메일 (lee2002w@gmail.com)로 내용을 공유해주시면 감사하겠습니다~
끝~! (or to be continued...)
'홈네트워크(IoT) > 광교아이파크' 카테고리의 다른 글
광교아이파크::Bestin 현관카메라 애플 홈킷 연동 (2) - Final (2) | 2022.04.08 |
---|---|
광교아이파크::Bestin 현관카메라 애플 홈킷 연동 (1) (0) | 2022.04.08 |
광교아이파크::Bestin 현관문 도어락 연동 (2) - Final (1) | 2022.03.30 |
광교아이파크::Bestin 현관문 도어락 연동 (1) (0) | 2022.03.30 |
광교 아이파크::Bestin 홈네트워크 연동(애플, 안드로이드) 시스템 정리 (2) | 2022.02.22 |