YOGYUI

광교아이파크::난방 Apple 홈킷 연동 (3) 본문

홈네트워크(IoT)/광교아이파크

광교아이파크::난방 Apple 홈킷 연동 (3)

요겨 2021. 1. 2. 19:26
반응형

[3] 서버 구현하기

기존에 구현해둔 Flask 서버 코드에 난방 관련 코드를 추가하자

아직 어떤 homebridge plug-in을 쓸지 결정하지 않았으므로 뼈대만 구축해둔다

  • RoomInfo 클래스에 난방 관련 (On/Off, 현재 온도, 설정 온도) 속성 추가
  • Serial 통신 객체 추가
  • Serial Packet Parsing (난방 관련) 구문 추가
# app.py
import os
import sys
import time
import queue
import requests
from typing import List
from flask import Flask, request, json, render_template
from common import RoomInfo
from Serial485.SerialComm import SerialComm
from Serial485.EnergyParser import EnergyParser
from Serial485.ControlParser import ControlParser

ser_energy = SerialComm()
ser_control = SerialComm()

PORT_485_ENERGY = '/dev/ttyUSB0'
PORT_485_CONTROL = '/dev/ttyUSB1'

packet_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'heat_setting_packets.txt')
with open(packet_path, 'r') as fp:
    heat_setting_packets = fp.readlines()

# room 0: null, 1: bedroom, 2: pc room
rooms = [RoomInfo(), RoomInfo(4), RoomInfo(2), RoomInfo(2)]
rooms[1].packet_light_on[0] = '02 31 0D 01 D0 01 81 00 00 00 00 04 76'
rooms[1].packet_light_off[0] = '02 31 0D 01 D7 01 01 00 00 00 00 00 F5'
rooms[1].packet_light_on[1] = '02 31 0D 01 58 01 82 00 00 00 00 04 E9'
rooms[1].packet_light_off[1] = '02 31 0D 01 5F 01 02 00 00 00 00 00 6A'
rooms[1].packet_light_on[2] = '02 31 0D 01 5C 01 84 00 00 00 00 04 EF'
rooms[1].packet_light_off[2] = '02 31 0D 01 63 01 04 00 00 00 00 00 6C'
rooms[1].packet_light_on[3] = '02 31 0D 01 2B 01 88 00 00 00 00 04 94'
rooms[1].packet_light_off[3] = '02 31 0D 01 33 01 08 00 00 00 00 00 20'
rooms[1].packet_light_status = '02 31 07 11 9B 01 C0'
rooms[1].packet_heat_on = '02 28 0E 12 E9 01 01 00 00 00 00 00 00 E3'
rooms[1].packet_heat_off = '02 28 0E 12 F7 01 02 00 00 00 00 00 00 C8'
for i in range(71):
    rooms[1].packet_heat_temp_set[i] = heat_setting_packets[i].replace('\n', '')

rooms[2].packet_light_on[0] = '02 31 0D 01 8C 02 81 00 00 00 00 04 3F'
rooms[2].packet_light_off[0] = '02 31 0D 01 93 02 01 00 00 00 00 00 B8'
rooms[2].packet_light_on[1] = '02 31 0D 01 7B 02 82 00 00 00 00 04 CB'
rooms[2].packet_light_off[1] = '02 31 0D 01 84 02 02 00 00 00 00 00 C4'
rooms[2].packet_light_status = '02 31 07 11 93 02 B5'
rooms[2].packet_heat_on = '02 28 0E 12 D3 02 01 00 00 00 00 00 00 EE'
rooms[2].packet_heat_off = '02 28 0E 12 DD 02 02 00 00 00 00 00 00 F5'
for i in range(71):
    rooms[2].packet_heat_temp_set[i] = heat_setting_packets[i + 71].replace('\n', '')

rooms[3].packet_light_on[0] = '02 31 0D 01 3B 03 81 00 00 00 00 04 97'
rooms[3].packet_light_off[0] = '02 31 0D 01 43 03 01 00 00 00 00 00 8B'
rooms[3].packet_light_on[1] = '02 31 0D 01 76 03 82 00 00 00 00 04 D5'
rooms[3].packet_light_off[1] = '02 31 0D 01 7E 03 02 00 00 00 00 00 49'
rooms[3].packet_light_status = '02 31 07 11 94 03 B1'
rooms[3].packet_heat_on = '02 28 0E 12 7E 03 01 00 00 00 00 00 00 58'
rooms[3].packet_heat_off = '02 28 0E 12 87 03 02 00 00 00 00 00 00 BA'
for i in range(71):
    rooms[3].packet_heat_temp_set[i] = heat_setting_packets[i + 71 * 2].replace('\n', '')


def parse_energy_result(chunk: bytearray):
    try:
        if len(chunk) < 7:
            return
        header = chunk[1]
        command = chunk[3]
        if header == 0x31 and command in [0x81, 0x91]:
            room_idx = chunk[5] & 0x0F
            room = rooms[room_idx]
            for i in range(room.light_count):
                room.lights[i] = bool(chunk[6] & (0x01 << i))
                # notification
                if room.lights[i] != room.lights_prev[i] or not room.lights_init[i]:
                    url = "http://0.0.0.0:----/"
                    url += "switch_room{}_light{}?password=----".format(room_idx, i + 1)
                    json_obj = {"characteristic": "On", "value": room.lights[i]}
                    print('Notify: {}, {}'.format(url, json_obj))
                    requests.post(url, json=json_obj)
                    room.lights_init[i] = True
                room.lights_prev[i] = room.lights[i]
    except Exception as e:
        print(e)

def checkSerialEnergyConn():
    if not ser_energy.isConnected():
        ser_energy.connect(PORT_485_ENERGY, 9600)

def sendSerialEnergyPacket(packet: str):
    checkSerialEnergyConn()
    ser_energy.sendData(bytearray([int(x, 16) for x in packet.split(' ')]))

def parse_control_result(chunk: bytearray):
    try:
        if len(chunk) < 10:
            return
        header = chunk[1]
        command = chunk[3]
        if header == 0x28 and command in [0x91, 0x92]:
            room_idx = chunk[5] & 0x0F
            room = rooms[room_idx]
            if chunk[6] == 0x02:
                room.heat = False
            elif chunk[6] == 0x11:
                room.heat = True
            room.heat_temp_setting = (chunk[7] & 0x3F) + (chunk[7] & 0x40 > 0) * 0.5
            room.heat_temp_current = chunk[9] / 10.0
            # notification
            if room.heat_prev != room.heat or room.heat_temp_setting != room.heat_temp_setting_prev:
                # TODO:
                pass
            room.heat_prev = room.heat
            room.heat_temp_setting_prev = room.heat_temp_setting
    except Exception as e:
        print(e)

def checkSerialControlConn():
    if not ser_control.isConnected():
        ser_control.connect(PORT_485_CONTROL, 9600)

def sendSerialControlPacket(packet: str):
    checkSerialControlConn()
    ser_control.sendData(bytearray([int(x, 16) for x in packet.split(' ')]))

class MyFlaskApp(Flask):
    def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
        with self.app_context():
            checkSerialEnergyConn()
            checkSerialControlConn()
        super(MyFlaskApp, self).run(host=host, port=port, debug=debug, load_dotenv=load_dotenv, **options)

app = MyFlaskApp(__name__)

@app.route('/process', methods=['POST'])
def process():
    response = ""
    if not request.is_json:
        req = request.get_data().decode(encoding='utf-8')
        req = req.replace('\r', '')
        req = req.replace('\n', '')
        if 'json=' in req:
            params = json.loads(req[5:])
        else:
            if req[0] == '{' and req[-1] == '}':
                try:
                    params = json.loads(req)
                except Exception:
                    params = {}
            else:
                params = {}
    else:
        params = json.loads(request.get_data(), encoding='utf-8')

    print('POST json: {}'.format(params))
    if 'room' in params.keys():
        room_idx = params['room']
        if 'light' in params.keys():
            light_idx = params['light']
            if 'command' in params.keys():
                if params['command'].lower() == 'on':
                    packet = rooms[room_idx].packet_light_on[light_idx]
                    target = True
                else:
                    packet = rooms[room_idx].packet_light_off[light_idx]
                    target = False
                while rooms[room_idx].lights[light_idx] != target:
                    sendSerialEnergyPacket(packet)
                    time.sleep(0.25)
                    sendSerialEnergyPacket(rooms[room_idx].packet_light_status)
                    time.sleep(0.25)
            elif 'status' in params.keys():
                response = '{}'.format(int(rooms[room_idx].lights[light_idx]))
        if 'heat' in params.keys():
            # TODO:
            pass
    return response

if __name__ == '__main__':
    par_energy = EnergyParser(ser_energy)
    par_energy.sig_parse.connect(parse_energy_result)
    par_control = ControlParser(ser_control)
    par_control.sig_parse.connect(parse_control_result)
    app.run(host='0.0.0.0', port=----, debug=False)
    ser_energy.release()
    ser_control.release()

[시리즈 링크]

광교아이파크::난방 Apple 홈킷 연동 (1)

광교아이파크::난방 Apple 홈킷 연동 (2)

광교아이파크::난방 Apple 홈킷 연동 (3)

광교아이파크::난방 Apple 홈킷 연동 (4)

광교아이파크::난방 Apple 홈킷 연동 (5)

반응형