YOGYUI

Bestin 홈네트워크 RS485 패킷 체크섬 계산 알고리즘을 찾다! 본문

홈네트워크(IoT)/광교아이파크

Bestin 홈네트워크 RS485 패킷 체크섬 계산 알고리즘을 찾다!

요겨 2022. 12. 5. 00:37
반응형

지난주, 아이파크에 거주하시는 분(할윈, harwin 님)과의 카카오톡 대화 중 의미심장한 내용을 전해들었다

harwin님과의 카톡 대화 내용

바로 Bestin 홈네트워크의 RS-485 패킷 중 마지막 바이트 (편의상 체크섬 바이트)를 계산하는 알고리즘에 대한 내용!

 

Github의 laz-라는 유저께서 지난 8월 16일에 gist로 올려놓은 코드에 관련 내용이 나와있다https://gist.github.com/laz-/a507af756e13e64ed3aaceb236b5ad49

단순한 XOR SUM을 약간 비틀었을 뿐인 단순한 알고리즘인데, 이걸 찾아낸 게 신통방통하다 ㅋㅋㅋ

나도 가끔 심심하면 요리조리 연구해봤는데, 도저히 답이 안나와서 거의 반 포기상태였는데 ㅠ 대단!

(256 타임스탬프 각각에 대해서 LUT까지 만들어봤는데 도저히 안되겠더라 ~.,~)

 

계산 알고리즘은 간단하다

- 3 (0x03)으로 값 초기화
- 패킷의 각 바이트를 순차적으로 XOR(Executive OR)한 뒤 1을 더하고 8비트로 truncation

각 loop마다 XOR를 하면서 1을 더하는 심플한 알고리즘인데... 정체를 알고나니 굉장히 허무하다 ㅋㅋ


예를 들어보자

주방(방 인덱스 1)의 첫번째 조명을 켜는 명령 패킷은 다음과 같다

02 31 0D 01 D0 01 81 00 00 00 00 04 76

※ 여기서 패킷의 5번째 바이트 0xD0는 베스틴 월패드가 디바이스랑 패킷을 주고 받을 때마다 값이 1씩 올라가는 값으로, Spin code (스핀 코드)라 부르는 게 일반적이지만 나는 편의상 그동안 타임스탬프(timestamp)라 불러왔다

조명 패킷 관련 글은 광교아이파크::조명 Apple 홈킷 연동 (3) 참고

 

광교아이파크::조명 Apple 홈킷 연동 (3)

[3] RS-485 Signal Hooking 최신버전 라즈비안 깔아두면 FT232 계열은 드라이버 별도로 설치하지 않아도 장치가 잘 인식된다 pyserial 패키지 이용해서 우선 어떤 패킷이 RS-485 통신 라인에 실리는지 후킹해

yogyui.tistory.com

 

이제 마지막 바이트 0x76 값이 위 알고리즘대로 얻어지는 지 확인해보자

packet = bytearray([0x02, 0x31, 0x0D, 0x01, 0xD0, 0x01, 0x81, 0x00, 0x00, 0x00, 0x00, 0x04, 0x76])

result = 0x03
for i, p in enumerate(packet[:-1]):
    print('Step {}: {:02X} ^ {:02X} = '.format(i + 1, result, p), end=' ')
    result = (result ^ p) + 1
    result = result & 0xFF
    print('{:02X}'.format(result))
Step 1: 03 ^ 02 =  02
Step 2: 02 ^ 31 =  34
Step 3: 34 ^ 0D =  3A
Step 4: 3A ^ 01 =  3C
Step 5: 3C ^ D0 =  ED
Step 6: ED ^ 01 =  ED
Step 7: ED ^ 81 =  6D
Step 8: 6D ^ 00 =  6E
Step 9: 6E ^ 00 =  6F
Step 10: 6F ^ 00 =  70
Step 11: 70 ^ 00 =  71
Step 12: 71 ^ 04 =  76

Wow~ 계산 결과와 패킷 마지막 바이트가 일치한다

 

과연 우연의 일치일까?

엘리베이터 하행 호출 패킷 중 몇개를 가져와서 일치하는지 여부도 확인해보자

※ 엘리베이터 명령은 다른 디바이스랑 다르게, 월패드의 타임스탬프와 싱크가 맞지 않으면 호출이 되지 않아서 타임스탬프 256개에 대한 패킷을 전부 추출한 노가다를 한 경험이 있다 ㅋㅋㅋ 관련 내용은 링크 참고

광교아이파크::엘리베이터 Apple 홈킷 연동 (3)

 

광교아이파크::엘리베이터 Apple 홈킷 연동 (3)

3. Implementation 우선, 256개 Timestamp에 대한 하행 호출 패킷을 모으기 위해 다음과 같이 코드를 짜봤다 (패킷을 리스트에 담는데, 동일한 timestamp 값을 모은적이 있다면 패스, 리스트 자체를 직렬화해

yogyui.tistory.com

 

우선, 파이썬에서 for문을 쓰면 멍청하다는 소리를 듣기 십상이므로, 있어보이게 reduce 함수 (functools 모듈)를 사용해서 계산 함수를 예쁘게 만들어보자

from functools import reduce

def calculate_bestin_checksum(packet: bytearray) -> int:
    return reduce(lambda x, y: ((x ^ y) + 1) & 0xFF, packet, 0x03)

엘리베이터 호출 패킷 중 5개만 임의로 가져와서 테스트해보자

(5번째 바이트 타임스탬프 값에 따라 패킷 마지막 바이트 값이 바뀌는 것을 알 수 있다)

packets = [
    bytearray([0x02, 0xC1, 0x0C, 0x91, 0x1C, 0x10, 0x03, 0x00, 0x02, 0x01, 0x02, 0x58]),
    bytearray([0x02, 0xC1, 0x0C, 0x91, 0x35, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02, 0x85]),
    bytearray([0x02, 0xC1, 0x0C, 0x91, 0xDC, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02, 0x9A]),
    bytearray([0x02, 0xC1, 0x0C, 0x91, 0xF0, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02, 0xBE]),
    bytearray([0x02, 0xC1, 0x0C, 0x91, 0xFC, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02, 0xBA])
]

for packet in packets:
    checksum = calculate_bestin_checksum(packet[:-1])
    print('calculated: {:02X}, real value: {:02X}'.format(checksum, packet[-1]))
calculated: 58, real value: 58
calculated: 85, real value: 85
calculated: 9A, real value: 9A
calculated: BE, real value: BE
calculated: BA, real value: BA

와...

진작 알았다면 개고생할 필요가 없었던 것이었던 것이었드래요~


내가 광교아이파크 살 때는 조명, 아울렛(전기 콘센트), 도시가스 밸브, 온수 난방, 환기 (전열교환기), 엘리베이터 총 6종의 디바이스를 RS-485로 제어했었는데, 각 디바이스별로 명령 및 상태 조회 패킷을 동적으로 생성하는 함수를 만들어보자 (이전에는 로컬 XML 파일에 각 디바이스별 패킷을 기록해두고 코드가 불러오는 방식으로 구현했었다)

- 방이 몇개던, 조명이 몇개던 마음껏 제어할 수 있게 됐다

- 더이상 bestin 홈네트워크 환경에서 살고 있지 않다는 건 함정.. ㅠ

class Device:
    room_index: int = 0
    
    def __init__(self, name: str = 'Device', **kwargs):
        if 'room_index' in kwargs.keys():
            self.room_index = kwargs['room_index']
            
    def make_packet_common(self, header: int, length: int, packet_type: int, timestamp: int = 0) -> bytearray:
        packet = bytearray([
            0x02, 
            header & 0xFF, 
            length & 0xFF, 
            packet_type & 0xFF, 
            timestamp & 0xFF
        ])
        packet.extend(bytearray([0] * (length - 5)))
        return packet

    @abstractmethod
    def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
        pass

    @abstractmethod
    def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
        pass

조명, 아울렛, 난방은 make_packet_common 함수로 공통화할 수 있고, 전열교환기와 가스밸브 및 엘리베이터는 각각의 Rule대로 패킷을 생성해주면 된다

(조명, 아울렛, 난방은 각 방별로 디바이스들이 배정되어 있으며 여러 개의 디바이스가 존재할 수 있으므로 디바이스 인덱스 및 방 인덱스에 따라 동적으로 생성해줘야 한다 - 난방은 방마다 1개만 존재)

make_packet_set_state, make_packet_query_state는 명령, 상태 조회 패킷을 만들어주는 부모 메서드로, device를 상속받는 자식 클래스들 각각에서 따로 구현(메서드 오버라이딩)해주면 된다

※ 각 디바이스별 패킷 명세는 링크로 걸어둔 블로그 글들을 참고~ 다시 일일이 규칙을 적자니 너무 귀찮다 ㅠ

1. 조명

광교아이파크::조명 Apple 홈킷 연동 (3)

 

광교아이파크::조명 Apple 홈킷 연동 (3)

[3] RS-485 Signal Hooking 최신버전 라즈비안 깔아두면 FT232 계열은 드라이버 별도로 설치하지 않아도 장치가 잘 인식된다 pyserial 패키지 이용해서 우선 어떤 패킷이 RS-485 통신 라인에 실리는지 후킹해

yogyui.tistory.com

class Light(Device):
    def __init__(self, name: str = 'Light', index: int = 0, **kwargs):
        self.index = index
        super().__init__(name, **kwargs)
    
    def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
        packet = self.make_packet_common(0x31, 13, 0x01, timestamp)
        packet[5] = self.room_index & 0x0F
        packet[6] = 0x01 << self.index
        if target:
            packet[6] += 0x80
            packet[11] = 0x04
        else:
            packet[11] = 0x00
        packet[12] = calculate_bestin_checksum(packet[:-1])
        return packet

    def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
        packet = self.make_packet_common(0x31, 7, 0x11, timestamp)
        packet[5] = self.room_index & 0x0F
        packet[6] = calculate_bestin_checksum(packet[:-1])
        return packet

2. 아울렛

광교아이파크::전원콘센트 Apple 홈킷 연동 (1)

 

광교아이파크::전원콘센트 Apple 홈킷 연동 (1)

최초로 Bestin을 Apple 홈킷과 연동했던 거실 및 각 방의 조명 제어를 구현할 때, 게이트웨이의 'Energy' RS-485 포트로 전송되는 패킷 중 조명과 관련된 바이트만 해석했었다 (관련 링크 참고) 그 때 해

yogyui.tistory.com

class Outlet(Device):
    def __init__(self, name: str = 'Outlet', index: int = 0, **kwargs):
        self.index = index
        super().__init__(name, **kwargs)
    
    def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
        packet = self.make_packet_common(0x31, 13, 0x01, timestamp)
        packet[5] = self.room_index & 0x0F
        packet[7] = 0x01 << self.index
        if target:
            packet[7] += 0x80
            packet[11] = 0x09 << self.index  # 확실하지 않음...
        else:
            packet[11] = 0x00
        packet[12] = calculate_bestin_checksum(packet[:-1])
        return packet

    def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
        # 조명 쿼리 패킷과 동일한 것으로 판단됨 (어차피 응답 패킷에 조명/아울렛 정보가 같이 담겨있음)
        packet = self.make_packet_common(0x31, 7, 0x11, timestamp)
        packet[5] = self.room_index & 0x0F
        packet[6] = calculate_bestin_checksum(packet[:-1])
        return packet

3. 난방

광교아이파크::난방 Apple 홈킷 연동 (2)

 

광교아이파크::난방 Apple 홈킷 연동 (2)

[2] 온도 설정 패킷 모으기 조명과 마찬가지로, 온도 설정 패킷 전송 시 최후 바이트가 '모종의 규칙'을 따르지 않으면 정상적으로 동작하지 않았다 규칙 판단은 나중으로 미루고 일단은 모든 온

yogyui.tistory.com

난방은 5도 ~ 40도까지 0.5도 단위로 희망온도를 설정하는 패킷이 필요해서, 각 방별로 71개씩 총 213개 패킷을 캡쳐해서 모으는 노가다를 했었다... ㅋㅋ 이젠 희망온도만 가지고 얼마든지 패킷을 쉽게 만들 수 있다!

class Thermostat(Device):
    def __init__(self, name: str = 'Thermostat', **kwargs):
        super().__init__(name, **kwargs)
    
    def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
        packet = self.make_packet_common(0x28, 14, 0x12, timestamp)
        packet[5] = self.room_index & 0x0F
        if target:
            packet[6] = 0x01
        else:
            packet[6] = 0x02
        packet[13] = calculate_bestin_checksum(packet[:-1])
        return packet

    def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
        packet = self.make_packet_common(0x28, 7, 0x11, timestamp)
        packet[5] = self.room_index & 0x0F
        packet[6] = calculate_bestin_checksum(packet[:-1])
        return packet
    
    def make_packet_set_temperature(self, target: float, timestamp: int = 0) -> bytearray:
        packet = self.make_packet_common(0x28, 14, 0x12, timestamp)
        packet[5] = self.room_index & 0x0F
        value_int = int(target)
        value_float = target - value_int
        packet[7] = value_int & 0xFF
        if value_float != 0:
            packet[7] += 0x40
        packet[13] = calculate_bestin_checksum(packet[:-1])
        return packet

4. 가스 밸브

광교아이파크::가스 Apple 홈킷 연동 (1)

 

광교아이파크::가스 Apple 홈킷 연동 (1)

[1] Prepare 주방 가스레인지 옆 도시가스관을 보니 전자식 도시가스 개폐장치(신우전자, SV-20H)가 설치되어 있다 수동식 개폐기가 기존 가스관에 있어서 이중으로 차단...이 되고 있다 차단기 상단

yogyui.tistory.com

가스밸브는 '닫기' 명령에 해당하는 패킷만 만들어주면 된다

class GasValve(Device):
    def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
        if target:
            return bytearray([])  # not allowed open valve
        packet = bytearray([0x02, 0x31, 0x02, timestamp & 0xFF])
        packet.extend(bytearray([0x00] * 5))
        packet.append(calculate_bestin_checksum(packet))
        return packet

    def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
        packet = bytearray([0x02, 0x31, 0x00, timestamp & 0xFF])
        packet.extend(bytearray([0x00] * 5))
        packet.append(calculate_bestin_checksum(packet))
        return packet

5. 전열교환기

광교아이파크::환기(전열교환기) Apple 홈킷 연동 (1)

 

광교아이파크::환기(전열교환기) Apple 홈킷 연동 (1)

[1] Prepare 다용도실 천장에 설치된 환기장치 스티커 확대해보니 Bestin 제품 (아이파크 시공사 = HDC, Bestin = HDC아이콘트롤스 브랜드) 단지 내 홈네트워크 구축 위해 자체 제작하는 라인업이 있네 (아

yogyui.tistory.com

전열교환기는 On/Off 외에 3단계 풍량 조절, 자연환기 모드 설정, 타이머 설정 기능 등 부가 기능들이 있는데, 나는 풍량 조절만 구현해뒀었다 (혹시 다른 기능들에 대한 구현도 필요한 분이 계시다면 댓글로 남겨주시길~)

<22.12.19 수정> - 자연환기 설정, 타이머 설정 패킷 생성 추가

class Ventilator(Device):
    def make_packet_set_state(self, target: int, timestamp: int = 0) -> bytearray:
        packet = bytearray([0x02, 0x61, 0x01, timestamp & 0xFF, 0x00])
        packet.append(0x01) if target else packet.append(0x00)
        packet.extend([0x01, 0x00, 0x00])
        packet.append(calculate_bestin_checksum(packet))
        return packet

    def make_packet_query_state(self, timestamp: int = 0) -> bytearray:
        packet = bytearray([0x02, 0x61, 0x00, timestamp & 0xFF, 0x00])
        packet.extend([0x00, 0x00, 0x00, 0x00])
        packet.append(calculate_bestin_checksum(packet))
        return packet
    
    def make_packet_set_rotation_speed(self, target: int, timestamp: int = 0) -> bytearray:
        target = max(1, min(3, target))
        packet = bytearray([0x02, 0x61, 0x03, timestamp & 0xFF, 0x00, 0x00, target, 0x00, 0x00])
        packet.append(calculate_bestin_checksum(packet))
        return packet
    
    def make_packet_set_natural(self, target: int, timestamp: int = 0) -> bytearray:
        """
        자연환기 On/Off
        :param target: 0=Off, 1=On
        """
        packet = bytearray([0x02, 0x61, 0x07, timestamp & 0xFF, 0x00])
        packet.append(0x10) if target else packet.append(0x00)    
        packet.extend([0x00, 0x00, 0x00])
        packet.append(calculate_bestin_checksum(packet))
        return packet

    def make_packet_set_timer(self, value: int, timestamp: int = 0) -> bytearray:
        """
        타이머 설정
        :param value: 0=Off, others=timer value (unit=minute)
        """
        packet = bytearray([0x02, 0x61, 0x04, timestamp & 0xFF, 0x00])
        packet.append(value & 0xFF)
        packet.extend([0x00, 0x00, 0x00])
        packet.append(calculate_bestin_checksum(packet))
        return packet

6. 엘리베이터

광교아이파크::엘리베이터 Apple 홈킷 연동 (3)

 

광교아이파크::엘리베이터 Apple 홈킷 연동 (3)

3. Implementation 우선, 256개 Timestamp에 대한 하행 호출 패킷을 모으기 위해 다음과 같이 코드를 짜봤다 (패킷을 리스트에 담는데, 동일한 timestamp 값을 모은적이 있다면 패스, 리스트 자체를 직렬화해

yogyui.tistory.com

엘리베이터 호출 패킷 생성 구문은 smart RS-485 포트 파서 구문 내에 다음과 같이 구현했다

(파서에서 디바이스 인스턴스에 접근하는 구문을 만들기가 너무 귀찮아서... ㅋㅋ)

상행/하행 호출 각각 256개씩 타임스탬프별로 패킷 캡쳐한 거 생각하면 대단히 무식하긴 했다 ㅋㅋ 궁여지책이긴 했지만..

class SmartSendParser(PacketParser):
    def make_packet_call_down(self, timestamp: int) -> bytearray:
        packet = bytearray([0x02, 0xC1, 0x0C, 0x91, timestamp, 0x10, 0x01, 0x00, 0x02, 0x01, 0x02])
        packet.append(calculate_bestin_checksum(packet))
        return packet

    def make_packet_call_up(self, timestamp: int) -> bytearray:
        packet = bytearray([0x02, 0xC1, 0x0C, 0x91, timestamp, 0x20, 0x01, 0x00, 0x02, 0x01, 0x02])
        packet.append(calculate_bestin_checksum(packet))
        return packet

이제까지 언급한 타임스탬프는 컨트롤, 에너지, 스마트 RS-485 각 포트별로 월패드와 디바이스가 주고받는 패킷을 파서 객체가 패킷 캡쳐하면서 내부 변수에 저장하고 있다가, 명령 패킷 호출 시에 1을 더해서 패킷을 만드는 방식으로 구현하긴 했다.. 사실 엘리베이터 말고는 타임스탬프 싱크가 맞을 필요가 없긴 하지만~
[파서의 타임스탬프를 불러와 패킷을 만드는 예시 코드]
class ThreadCommand(threading.Thread):
    def set_light_outlet_state(self, dev: Union[Light, Outlet], target: int, parser: PacketParser):
        packet = dev.make_packet_set_state(target, parser.get_packet_timestamp() + 1)
        parser.sendPacket(packet)
        packet = dev.make_packet_query_state(parser.get_packet_timestamp() + 1)
        parser.sendPacket(packet)
        dev.publish_mqtt()

일단 이제까지 작성한 코드는 깃허브 저장소의 별도 브랜치에 커밋해뒀다

(브랜치명: bestin-checksum-calc, commit id: 043c29d7bfd23650074b2d5150c6de69d100cb0d)

https://github.com/YOGYUI/HomeNetwork/tree/bestin-checksum-calc

 

GitHub - YOGYUI/HomeNetwork: HomeNetwork(Homebridge) Repo

HomeNetwork(Homebridge) Repo. Contribute to YOGYUI/HomeNetwork development by creating an account on GitHub.

github.com

앞서 언급했듯이, 안타깝게도 지금은 아이파크를 떠난 지 어언 6개월가량 되었기 때문에 실물 테스트를 할 수 있는 환경이 갖춰져있지 않다 (시뮬레이션 환경을 얼마든지 만들 수는 있는데... 시간이 아깝기도 하고 의지도 없기에 PASS ㅎㅎ)

작동 여부를 확인하지 않고 머리 속으로만 그린 코드를 2시간 정도에 걸쳐서 마구 작성한 코드라 제대로 작동하는지도 확신이 서질 않는다;; 

혹시나 위 코드로 작동하는 분들이 계시다면, 디버깅을 부탁드리는 바.. 문제가 있다면 블로그 댓글/방명록이나 이메일 (lee2002w@gmail.com)로 내용을 공유해주시면 감사하겠습니다~

끝~! (or to be continued...)

 

반응형