일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 월패드
- ConnectedHomeIP
- 공모주
- 매터
- Python
- 국내주식
- raspberry pi
- Home Assistant
- 힐스테이트 광교산
- 티스토리챌린지
- 엔비디아
- homebridge
- 나스닥
- 현대통신
- matter
- 미국주식
- 오블완
- 홈네트워크
- 배당
- MQTT
- Apple
- Bestin
- RS-485
- 퀄컴
- 파이썬
- esp32
- 해외주식
- 코스피
- 애플
- Espressif
- Today
- Total
YOGYUI
광교아이파크::조명 Apple 홈킷 연동 (4) 본문
[4] HTTP 서버 구축 테스트
라즈베리파이에 연결된 RS-485 to USB 디바이스를 외부에서 접근 가능하게 하는 여러 방법 중 가장 보편적인 방법은 HTTP 서버를 구축하는 것이라고 생각했다 (외부에서 인터넷을 통한 접근, 궁극적으로는 아이폰/아이패드 활용)
JAVA, Node.js, 혹은 Python Flask, Python Django 등 다양한 프레임워크로 서버 구축을 해보았는데, 경험상 빠른 프로토타이핑에는 Flask가 제일 좋아 보여 선택했다 (시리얼 통신 프로토타이핑을 파이썬으로 하는 바람에...)
추후 안정성이나 사용자 인증같은 부가기능을 사용하기 위해 Node로 교체할 지 여부는 TODO로 남겨놓기로 한다
https://flask.palletsprojects.com/en/1.1.x/
서버 구축 전에, RS485 패킷 파싱 구문을 모듈화 구현해보았다
(SerialComm 클래스는 https://yogyui.tistory.com/entry/시리얼-통신-모듈-커스터마이징 참고)
# Parser.py
from abc import abstractmethod, ABCMeta
from SerialComm import SerialComm
from Define import Callback, writeLog
class Parser():
__metaclass__ = ABCMeta
buffer: bytearray
max_buffer_size: int = 200
def __init__(self, ser: SerialComm):
super().__init__()
self.buffer = bytearray()
self.sig_parse = Callback(bytearray)
ser.sig_send_data.connect(self.onSendData)
ser.sig_recv_data.connect(self.onRecvData)
def release(self):
self.buffer.clear()
def onSendData(self, data: bytes):
msg = ' '.join(['%02X' % x for x in data])
writeLog("Send >> {}".format(msg), self)
def onRecvData(self, data: bytes):
if len(self.buffer) > self.max_buffer_size:
self.buffer.clear()
self.buffer.extend(data)
self.handlePacket()
@abstractmethod
def handlePacket(self):
pass
# EnergyParser.py
from Parser import Parser
class EnergyParser(Parser):
def handlePacket(self):
idx = self.buffer.find(0x2)
if idx > 0:
self.buffer = self.buffer[idx:]
if len(self.buffer) >= 3:
packetLen = self.buffer[2]
if len(self.buffer) >= max(packetLen, 5):
header = self.buffer[1]
timestamp = self.buffer[4]
if header == 0xD1 and packetLen == 0x30:
idx_lst = []
next_ts = (timestamp + 1) & 0xFF
# 02 D1 30 XX 시작 패킷은 전체 패킷이 다 수신되지 않는다...
for i in range(len(self.buffer)):
if self.buffer[i] == next_ts:
idx_lst.append(i)
for idx in idx_lst:
if idx >= 4 and len(self.buffer) > idx + 4 and self.buffer[idx - 4] == 0x2:
chunk = self.buffer[:idx - 4]
self.sig_parse.emit(chunk)
self.buffer = self.buffer[idx - 4:]
packetLen = self.buffer[2] if len(self.buffer) >= 3 else 0
break
chunk = self.buffer[:packetLen]
self.sig_parse.emit(chunk)
self.buffer = self.buffer[packetLen:]
기대 사항: 서버 프로젝트에서 패킷 파서 객체만 선언하면 바로 사용 가능 + 코드 유지보수 유연화
웹브라우저를 통한 조명 제어 테스트를 위해 다음과 같이 Flask 코드 구현 (목표: 컴퓨터방의 첫번째 조명만 제어)
# app.py
import time
from SerialComm import SerialComm
from EnergyParser import EnergyParser
from flask import Flask, render_template, redirect, url_for
ser_energy = SerialComm()
par_energy = EnergyParser(ser_energy)
light_on = False
def parse_energy_result(packet: bytearray):
global light_on
if len(packet) < 7:
return
if packet[1] == 0x31 and packet[3] == 0x91:
room_idx = packet[5] & 0x0F
if room_idx == 3:
light_on = bool(packet[6] & 0x1)
par_energy.sig_parse.connect(parse_energy_result)
app = Flask(__name__)
def checkConn():
if not ser_energy.isConnected():
ser_energy.connect('/dev/ttyUSB0', 9600)
def sendPacket(packet: str):
checkConn()
ser_energy.sendData(bytearray([int(x, 16) for x in packet.split(' ')]))
@app.route('/')
def index():
global light_on
checkConn()
return render_template('index.html', light_on=light_on)
@app.route('/light/on', methods=['GET'])
def light_turn_on():
global light_on
checkConn()
while not light_on:
# 상태가 변할때까지 반복 전송
sendPacket('02 31 0D 01 3B 03 81 00 00 00 00 04 97')
time.sleep(0.5)
return redirect(url_for('index'))
@app.route('/light/off', methods=['GET'])
def light_turn_off():
global light_on
checkConn()
while light_on:
# 상태가 변할때까지 반복 전송
sendPacket('02 31 0D 01 43 03 01 00 00 00 00 00 8B')
time.sleep(0.5)
return redirect(url_for('index'))
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9999, debug=True)
템플릿은 간단하게 form만 추가
<!DOCTYPE html>
<html>
<head>
<title>Test Page</title>
</head>
<body>
<a style="font-size: 8em;">
LIGHT:
{% if light_on %}
<a style="font-size: 8em; color: red;">ON</a>
{% else %}
<a style="font-size: 8em; color: blue;">OFF</a>
{% endif %}
</a>
<form method="get" action="/light/on">
<input type="submit" value="ON" name="" style="font-size: 8em;">
</form>
<br>
<form method="get" action="/light/off">
<input type="submit" value="OFF" name="" style="font-size: 8em;">
</form>
</body>
</html>
외부에서 접속 가능하도록 공유기 포트포워딩 설정도 추가해주자
서버 코드를 실행하자
pi@raspberrypi:~ $ /bin/python3 /home/pi/Desktop/Project/app.py
* Serving Flask app "app" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://0.0.0.0:9999/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 207-151-518
외부 기기 (맥북)에서 라즈베리파이 IP 및 포트로 접속해보면 index.html 템플릿 화면을 볼 수 있다
성공!
홈킷 연동을 위한 준비는 80% 되었다
남은건 Homebridge와의 연동 테스트!
[시리즈 링크]
'홈네트워크(IoT) > 광교아이파크' 카테고리의 다른 글
광교아이파크::조명 Apple 홈킷 연동 (6) (0) | 2021.01.01 |
---|---|
광교아이파크::조명 Apple 홈킷 연동 (5) (0) | 2021.01.01 |
광교아이파크::조명 Apple 홈킷 연동 (3) (2) | 2020.12.31 |
광교아이파크::조명 Apple 홈킷 연동 (2) (0) | 2020.12.31 |
광교아이파크::조명 Apple 홈킷 연동 (1) (0) | 2020.12.31 |