Notice
Recent Posts
Recent Comments
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
 |  |  | 1 | 2 | 3 | 4 |
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |  |
Tags
- Espressif
- MQTT
- 월패드
- 배당
- 해외주식
- esp32
- 공모주
- Python
- ConnectedHomeIP
- Home Assistant
- 오블완
- matter
- Apple
- Bestin
- 매터
- homebridge
- 엔비디아
- 힐스테이트 광교산
- 미국주식
- 파이썬
- 현대통신
- 코스피
- 홈네트워크
- RS-485
- 애플
- 퀄컴
- raspberry pi
- 국내주식
- 나스닥
- 티스토리챌린지
Archives
- Today
- Total
YOGYUI
광교아이파크::난방 Apple 홈킷 연동 (3) 본문
반응형
[3] 서버 구현하기
기존에 구현해둔 Flask 서버 코드에 난방 관련 코드를 추가하자
아직 어떤 homebridge plug-in을 쓸지 결정하지 않았으므로 뼈대만 구축해둔다
- RoomInfo 클래스에 난방 관련 (On/Off, 현재 온도, 설정 온도) 속성 추가
- Serial 통신 객체 추가
- Serial Packet Parsing (난방 관련) 구문 추가
# app.py
import os
import sys
import time
import queue
import requests
from typing import List
from flask import Flask, request, json, render_template
from common import RoomInfo
from Serial485.SerialComm import SerialComm
from Serial485.EnergyParser import EnergyParser
from Serial485.ControlParser import ControlParser
# Device nodes of the two RS-485 adapters, one per bus.
PORT_485_ENERGY = '/dev/ttyUSB0'
PORT_485_CONTROL = '/dev/ttyUSB1'

# One serial link object per bus; the checkSerial*Conn() helpers connect them
# on demand.
ser_energy = SerialComm()
ser_control = SerialComm()

# The heating set-point packets are stored one per line in a text file that
# sits next to this script.
packet_path = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'heat_setting_packets.txt',
)
with open(packet_path, 'r') as fp:
    # Lines keep their trailing newline here; it is removed where they are used.
    heat_setting_packets = fp.readlines()
# room 0: null, 1: bedroom, 2: pc room
# NOTE(review): the RoomInfo argument is presumably the number of lights in
# the room (rooms[1] gets four light packets, rooms[2]/[3] two) - confirm
# against common.RoomInfo.
rooms = [RoomInfo(), RoomInfo(4), RoomInfo(2), RoomInfo(2)]

# --- room 1: light packets (index = light number - 1) ---
rooms[1].packet_light_on[0] = '02 31 0D 01 D0 01 81 00 00 00 00 04 76'
rooms[1].packet_light_off[0] = '02 31 0D 01 D7 01 01 00 00 00 00 00 F5'
rooms[1].packet_light_on[1] = '02 31 0D 01 58 01 82 00 00 00 00 04 E9'
rooms[1].packet_light_off[1] = '02 31 0D 01 5F 01 02 00 00 00 00 00 6A'
rooms[1].packet_light_on[2] = '02 31 0D 01 5C 01 84 00 00 00 00 04 EF'
rooms[1].packet_light_off[2] = '02 31 0D 01 63 01 04 00 00 00 00 00 6C'
rooms[1].packet_light_on[3] = '02 31 0D 01 2B 01 88 00 00 00 00 04 94'
rooms[1].packet_light_off[3] = '02 31 0D 01 33 01 08 00 00 00 00 00 20'
rooms[1].packet_light_status = '02 31 07 11 9B 01 C0'
# --- room 1: heating on/off packets ---
rooms[1].packet_heat_on = '02 28 0E 12 E9 01 01 00 00 00 00 00 00 E3'
rooms[1].packet_heat_off = '02 28 0E 12 F7 01 02 00 00 00 00 00 00 C8'

# --- room 2: light packets ---
rooms[2].packet_light_on[0] = '02 31 0D 01 8C 02 81 00 00 00 00 04 3F'
rooms[2].packet_light_off[0] = '02 31 0D 01 93 02 01 00 00 00 00 00 B8'
rooms[2].packet_light_on[1] = '02 31 0D 01 7B 02 82 00 00 00 00 04 CB'
rooms[2].packet_light_off[1] = '02 31 0D 01 84 02 02 00 00 00 00 00 C4'
rooms[2].packet_light_status = '02 31 07 11 93 02 B5'
# --- room 2: heating on/off packets ---
rooms[2].packet_heat_on = '02 28 0E 12 D3 02 01 00 00 00 00 00 00 EE'
rooms[2].packet_heat_off = '02 28 0E 12 DD 02 02 00 00 00 00 00 00 F5'

# --- room 3: light packets ---
rooms[3].packet_light_on[0] = '02 31 0D 01 3B 03 81 00 00 00 00 04 97'
rooms[3].packet_light_off[0] = '02 31 0D 01 43 03 01 00 00 00 00 00 8B'
rooms[3].packet_light_on[1] = '02 31 0D 01 76 03 82 00 00 00 00 04 D5'
rooms[3].packet_light_off[1] = '02 31 0D 01 7E 03 02 00 00 00 00 00 49'
rooms[3].packet_light_status = '02 31 07 11 94 03 B1'
# --- room 3: heating on/off packets ---
rooms[3].packet_heat_on = '02 28 0E 12 7E 03 01 00 00 00 00 00 00 58'
rooms[3].packet_heat_off = '02 28 0E 12 87 03 02 00 00 00 00 00 00 BA'

# The packet file holds three consecutive runs of 71 lines: lines 0-70 belong
# to room 1, 71-141 to room 2 and 142-212 to room 3.
# NOTE(review): 71 entries presumably correspond to one packet per selectable
# set-point temperature - confirm against heat_setting_packets.txt.
for room_no in (1, 2, 3):
    first_line = 71 * (room_no - 1)
    for i in range(71):
        rooms[room_no].packet_heat_temp_set[i] = heat_setting_packets[first_line + i].replace('\n', '')
def parse_energy_result(chunk: bytearray):
    """Update the cached light states from one energy-bus packet.

    Packets shorter than 7 bytes, or whose header byte (index 1) is not 0x31
    or whose command byte (index 3) is not 0x81/0x91, are ignored.  For a
    matching packet the low nibble of byte 5 selects the entry in ``rooms``
    and bit ``i`` of byte 6 is stored as the on/off state of light ``i``.

    Whenever a light's state differs from the last notified state (or has
    never been notified), the new state is POSTed to the notification URL.

    Args:
        chunk: Raw packet bytes.

    Returns:
        None.  Any unexpected error is printed and swallowed so that a bad
        packet cannot propagate an exception to the caller.
    """
    try:
        if len(chunk) < 7:
            return
        header = chunk[1]
        command = chunk[3]
        if header == 0x31 and command in [0x81, 0x91]:
            room_idx = chunk[5] & 0x0F
            room = rooms[room_idx]
            for i in range(room.light_count):
                # Always refresh the cached state first: process() polls it.
                room.lights[i] = bool(chunk[6] & (0x01 << i))
                if room.lights[i] == room.lights_prev[i] and room.lights_init[i]:
                    # Already notified with this value; nothing to do.
                    continue
                # notification
                url = "http://0.0.0.0:----/"
                url += "switch_room{}_light{}?password=----".format(room_idx, i + 1)
                json_obj = {"characteristic": "On", "value": room.lights[i]}
                print('Notify: {}, {}'.format(url, json_obj))
                try:
                    # requests never times out by default; bound the wait so
                    # an unresponsive receiver cannot stall packet handling.
                    requests.post(url, json=json_obj, timeout=5)
                except requests.RequestException as e:
                    # Leave lights_init/lights_prev untouched so the
                    # notification is retried on the next status packet, and
                    # carry on so the remaining lights are still updated.
                    print(e)
                    continue
                room.lights_init[i] = True
                room.lights_prev[i] = room.lights[i]
    except Exception as e:
        print(e)
def checkSerialEnergyConn():
    """Connect the energy-bus serial link unless it is already connected."""
    if ser_energy.isConnected():
        return
    # NOTE(review): 9600 is presumably the baud rate - confirm against
    # SerialComm.connect.
    ser_energy.connect(PORT_485_ENERGY, 9600)
def sendSerialEnergyPacket(packet: str):
    """Send a hex-string packet over the energy-bus serial link.

    The link is connected first if necessary.

    Args:
        packet: Byte values written in hexadecimal and separated by
            whitespace, e.g. ``'02 31 07 11 9B 01 C0'``.

    Raises:
        ValueError: If a token is not a hexadecimal value in 0-255, or if
            the packet contains no bytes at all.
    """
    checkSerialEnergyConn()
    # split() with no separator ignores repeated, leading and trailing
    # whitespace; split(' ') would yield empty tokens that int() rejects.
    payload = bytearray(int(token, 16) for token in packet.split())
    if not payload:
        raise ValueError('packet contains no bytes')
    ser_energy.sendData(payload)
def parse_control_result(chunk: bytearray):
    """Update the cached heating state from one control-bus packet.

    Only packets of at least 10 bytes whose header byte (index 1) is 0x28 and
    whose command byte (index 3) is 0x91 or 0x92 are processed.  The low
    nibble of byte 5 selects the entry in ``rooms``; byte 6 carries the
    on/off state, byte 7 the set-point and byte 9 the current temperature.

    Args:
        chunk: Raw packet bytes.

    Returns:
        None.  Any error is printed and swallowed.
    """
    try:
        if len(chunk) < 10:
            return
        if chunk[1] != 0x28 or chunk[3] not in (0x91, 0x92):
            return

        room_idx = chunk[5] & 0x0F
        room = rooms[room_idx]

        # 0x02 -> heating off, 0x11 -> heating on; any other value leaves the
        # cached state as it was.
        state = chunk[6]
        if state == 0x02:
            room.heat = False
        elif state == 0x11:
            room.heat = True

        # Set-point: low six bits are whole degrees, bit 6 adds half a degree.
        setpoint = chunk[7]
        half_degree = 0.5 if setpoint & 0x40 else 0.0
        room.heat_temp_setting = (setpoint & 0x3F) + half_degree
        # Current temperature is transmitted in tenths of a degree.
        room.heat_temp_current = chunk[9] / 10.0

        # notification
        if room.heat_prev != room.heat or room.heat_temp_setting != room.heat_temp_setting_prev:
            # TODO:
            pass
        room.heat_prev = room.heat
        room.heat_temp_setting_prev = room.heat_temp_setting
    except Exception as e:
        print(e)
def checkSerialControlConn():
    """Connect the control-bus serial link unless it is already connected."""
    if ser_control.isConnected():
        return
    # NOTE(review): 9600 is presumably the baud rate - confirm against
    # SerialComm.connect.
    ser_control.connect(PORT_485_CONTROL, 9600)
def sendSerialControlPacket(packet: str):
    """Send a hex-string packet over the control-bus serial link.

    The link is connected first if necessary.

    Args:
        packet: Byte values written in hexadecimal and separated by
            whitespace, e.g. ``'02 28 0E 12 E9 01 01 00 00 00 00 00 00 E3'``.

    Raises:
        ValueError: If a token is not a hexadecimal value in 0-255, or if
            the packet contains no bytes at all.
    """
    checkSerialControlConn()
    # split() with no separator ignores repeated, leading and trailing
    # whitespace; split(' ') would yield empty tokens that int() rejects.
    payload = bytearray(int(token, 16) for token in packet.split())
    if not payload:
        raise ValueError('packet contains no bytes')
    ser_control.sendData(payload)
class MyFlaskApp(Flask):
    """Flask application that checks both serial links before it starts serving."""

    def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
        """Ensure the serial links are connected, then start the Flask server.

        All arguments are forwarded unchanged to ``Flask.run``.
        """
        # Connect (if needed) inside an application context, before serving.
        with self.app_context():
            checkSerialEnergyConn()
            checkSerialControlConn()
        super().run(host=host, port=port, debug=debug, load_dotenv=load_dotenv, **options)


app = MyFlaskApp(__name__)
def _parse_request_params():
    """Return the JSON parameters of the current request as a dict.

    Accepts a JSON request body, a body of the form ``json={...}``, or a bare
    ``{...}`` body sent without a JSON content type.  Anything else, including
    an empty body or a JSON value that is not an object, yields ``{}``.
    """
    if request.is_json:
        # Decode explicitly instead of passing encoding= to json.loads: that
        # keyword has been removed from both Flask's and the standard loader.
        params = json.loads(request.get_data().decode('utf-8'))
    else:
        req = request.get_data().decode(encoding='utf-8')
        req = req.replace('\r', '').replace('\n', '')
        if req.startswith('json='):
            # Strip the 5-character "json=" prefix.
            params = json.loads(req[5:])
        elif req.startswith('{') and req.endswith('}'):
            # startswith/endswith are safe on an empty body, unlike req[0].
            try:
                params = json.loads(req)
            except Exception:
                params = {}
        else:
            params = {}
    # The handler only understands JSON objects.
    return params if isinstance(params, dict) else {}


@app.route('/process', methods=['POST'])
def process():
    """Handle a POSTed light command or light status query.

    Recognised keys in the request parameters:
        room:    index into ``rooms``.
        light:   index of the light inside that room.
        command: ``'on'`` (case-insensitive) switches the light on, any other
                 value switches it off.
        status:  when present (and ``command`` is not), the light state is
                 returned.
        heat:    reserved; not implemented yet.

    Returns:
        ``'1'`` or ``'0'`` for a status query, otherwise an empty string.
    """
    response = ""
    params = _parse_request_params()
    print('POST json: {}'.format(params))
    if 'room' in params:
        room_idx = params['room']
        if 'light' in params:
            light_idx = params['light']
            if 'command' in params:
                if params['command'].lower() == 'on':
                    packet = rooms[room_idx].packet_light_on[light_idx]
                    target = True
                else:
                    packet = rooms[room_idx].packet_light_off[light_idx]
                    target = False
                # Resend the command and poll the status until the cached
                # state (updated by the energy-bus parser) matches the target.
                # NOTE(review): this loop has no retry limit, so the request
                # never returns if the light does not report the target state.
                while rooms[room_idx].lights[light_idx] != target:
                    sendSerialEnergyPacket(packet)
                    time.sleep(0.25)
                    sendSerialEnergyPacket(rooms[room_idx].packet_light_status)
                    time.sleep(0.25)
            elif 'status' in params:
                response = '{}'.format(int(rooms[room_idx].lights[light_idx]))
        if 'heat' in params:
            # TODO:
            pass
    return response
if __name__ == '__main__':
par_energy = EnergyParser(ser_energy)
par_energy.sig_parse.connect(parse_energy_result)
par_control = ControlParser(ser_control)
par_control.sig_parse.connect(parse_control_result)
app.run(host='0.0.0.0', port=----, debug=False)
ser_energy.release()
ser_control.release()
[시리즈 링크]
반응형
'홈네트워크(IoT) > 광교아이파크' 카테고리의 다른 글
광교아이파크::난방 Apple 홈킷 연동 (5) - Final (0) | 2021.01.03 |
---|---|
광교아이파크::난방 Apple 홈킷 연동 (4) (0) | 2021.01.02 |
광교아이파크::난방 Apple 홈킷 연동 (2) (0) | 2021.01.02 |
광교아이파크::난방 Apple 홈킷 연동 (1) (0) | 2021.01.02 |
광교아이파크::조명 Apple 홈킷 연동 (7) - Final (6) | 2021.01.01 |