Notice
Recent Posts
Recent Comments
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- RS-485
- Espressif
- 퀄컴
- 오블완
- Home Assistant
- esp32
- 티스토리챌린지
- 힐스테이트 광교산
- 월패드
- matter
- MQTT
- raspberry pi
- 엔비디아
- 미국주식
- 배당
- 국내주식
- 홈네트워크
- ConnectedHomeIP
- 나스닥
- 매터
- 코스피
- 현대통신
- Bestin
- Python
- Apple
- homebridge
- 파이썬
- 해외주식
- 애플
- 공모주
Archives
- Today
- Total
YOGYUI
Pyserial 시리얼 통신 모듈 커스터마이징 본문
반응형
나중에 시간 날 때 Git에 올려야겠다...
[Structure]
----
-------- Define.py
-------- Threads.py
-------- SerialComm.py
# Define.py
import datetime
import threading
def checkAgrumentType(obj, arg):
    """Return True when ``obj`` is an instance of the declared type ``arg``.

    The check follows the whole inheritance chain, so an instance of any
    subclass of ``arg`` (not only a direct child) is accepted, and every
    object is accepted when ``arg`` is ``object``.

    Args:
        obj: The value being passed to a callback.
        arg: The type the callback slot was declared with.

    Returns:
        bool: True if ``obj`` matches ``arg``, False otherwise (including the
        case where ``arg`` is not something ``isinstance`` can test against).
    """
    try:
        return isinstance(obj, arg)
    except TypeError:
        # ``arg`` is not a class (or tuple of classes); nothing can match it.
        return False
class Callback(object):
    """A minimal signal: holds one handler and type-checks emitted arguments.

    The positional arguments given to the constructor are the types that
    ``emit`` expects, in order. At most one handler is connected at a time;
    connecting again replaces the previous handler.
    """

    _args = None
    _callback = None

    def __init__(self, *args):
        # Declared argument types, kept as a tuple.
        self._args = args

    def connect(self, callback):
        """Register ``callback`` as the handler invoked by ``emit``."""
        self._callback = callback

    def emit(self, *args):
        """Validate ``args`` against the declared types and call the handler.

        Raises:
            Exception: If the number of arguments differs from the
                declaration, or any argument fails ``checkAgrumentType``.
        """
        declared = self._args
        if len(args) != len(declared):
            raise Exception('Callback::Argument Length Mismatch')
        if args:
            matches = [checkAgrumentType(value, expected)
                       for value, expected in zip(args, declared)]
            if not all(matches):
                raise Exception('Callback::Argument Type Mismatch (Definition: {}, Call: {})'.format(self._args, args))
        handler = self._callback
        if handler is None:
            # Nothing connected: emitting is a no-op after validation.
            return
        handler(*args)
def timestampToString(timestamp: datetime.datetime):
    """Format the time-of-day part of ``timestamp`` as ``HH:MM:SS.ffffff``.

    Hour, minute and second are zero-padded to two digits and the
    microsecond to six digits; the date part is not included.
    """
    fields = (
        timestamp.hour,
        timestamp.minute,
        timestamp.second,
        timestamp.microsecond,
    )
    return '%02d:%02d:%02d.%06d' % fields
def getCurTimeStr():
    """Return the current local time, formatted by ``timestampToString`` and wrapped in ``<...>``."""
    now = datetime.datetime.now()
    return '<%s>' % timestampToString(now)
def writeLog(strMsg: str, obj: object = None):
    """Print ``strMsg`` prefixed with the current time and an optional source tag.

    Args:
        strMsg: The message text.
        obj: Optional object the message originates from. When given, the
            line includes its class name and a hexadecimal identifier: the
            thread ident for a ``threading.Thread`` that has one, otherwise
            ``id(obj)``.
    """
    stamp = getCurTimeStr()
    tag = ''
    if obj is not None:
        ident = id(obj)
        # A Thread only has an ident once it has been started.
        if isinstance(obj, threading.Thread) and obj.ident is not None:
            ident = obj.ident
        tag = ' [%s (0x%X)]' % (type(obj).__name__, ident)
    print(stamp + tag + ' ' + strMsg)
# Threads.py
import time
import queue
import serial
import threading
import traceback
from Define import Callback, writeLog
class ThreadSend(threading.Thread):
    """Worker thread that drains a queue of byte strings into a serial port.

    Signals:
        sig_send_data(bytes): emitted with each chunk the port accepted.
        sig_terminated(): emitted once when ``run`` leaves its loop.
        sig_exception(str): emitted with the message of any exception caught
            inside the loop.
    """

    _keepAlive: bool = True

    def __init__(self, serial_: serial.Serial, queue_: queue.Queue):
        super().__init__()
        self.sig_send_data = Callback(bytes)
        self.sig_terminated = Callback()
        self.sig_exception = Callback(str)
        self._serial = serial_
        self._queue = queue_

    def run(self):
        """Poll the queue until ``stop`` is called, writing each item in full."""
        writeLog('Started', self)
        while self._keepAlive:
            try:
                if self._queue.empty():
                    # Nothing to send: yield briefly instead of spinning.
                    time.sleep(1e-3)
                    continue
                payload = self._queue.get()
                total = len(payload)
                offset = 0
                # write() may accept only part of the buffer, so keep
                # writing the remainder until everything has been sent.
                while offset < total:
                    written = self._serial.write(payload[offset:])
                    self.sig_send_data.emit(payload[offset:offset + written])
                    offset += written
            except Exception as e:
                writeLog('Exception::{}'.format(e), self)
                traceback.print_exc()
                self.sig_exception.emit(str(e))
        writeLog('Terminated', self)
        self.sig_terminated.emit()

    def stop(self):
        """Ask the loop in ``run`` to exit after its current iteration."""
        self._keepAlive = False
class ThreadReceive(threading.Thread):
    """Worker thread that polls a serial port and queues whatever arrives.

    Signals:
        sig_terminated(): emitted once when ``run`` leaves its loop.
        sig_exception(str): emitted with the message of any exception caught
            inside the loop.
    """

    _keepAlive: bool = True
    # Seconds to pause after a failed poll. Without a pause, a persistent
    # error would make the loop spin, printing tracebacks and emitting
    # sig_exception as fast as the CPU allows.
    _errorBackoff: float = 0.1

    def __init__(self, serial_: serial.Serial, queue_: queue.Queue):
        threading.Thread.__init__(self)
        self.sig_terminated = Callback()
        self.sig_exception = Callback(str)
        self._serial = serial_
        self._queue = queue_

    def run(self):
        """Poll the port until ``stop`` is called, queueing received bytes."""
        writeLog('Started', self)
        while self._keepAlive:
            try:
                if self._serial.isOpen():
                    pending = self._serial.in_waiting
                    if pending > 0:
                        self._queue.put(self._serial.read(pending))
                        # Data is flowing: poll again immediately.
                        continue
                # Port closed or nothing pending: yield briefly.
                time.sleep(1e-3)
            except Exception as e:
                writeLog('Exception::{}'.format(e), self)
                traceback.print_exc()
                self.sig_exception.emit(str(e))
                # Keep running (the error may be transient) but back off so
                # a persistent failure cannot turn into a busy loop.
                time.sleep(self._errorBackoff)
        writeLog('Terminated', self)
        self.sig_terminated.emit()

    def stop(self):
        """Ask the loop in ``run`` to exit after its current iteration."""
        self._keepAlive = False
class ThreadCheck(threading.Thread):
    """Worker thread that forwards queued items to a callback.

    Signals:
        sig_get(bytes): emitted with each item taken from the queue.
        sig_terminated(): emitted once when ``run`` leaves its loop.
        sig_exception(str): emitted with the message of any exception caught
            inside the loop.
    """

    _keepAlive: bool = True

    def __init__(self, queue_: queue.Queue):
        super().__init__()
        self.sig_get = Callback(bytes)
        self.sig_terminated = Callback()
        self.sig_exception = Callback(str)
        self._queue = queue_

    def run(self):
        """Poll the queue until ``stop`` is called, emitting each item."""
        writeLog('Started', self)
        while self._keepAlive:
            try:
                if self._queue.empty():
                    # Nothing queued: yield briefly instead of spinning.
                    time.sleep(1e-3)
                    continue
                item = self._queue.get()
                self.sig_get.emit(item)
            except Exception as e:
                writeLog('Exception::{}'.format(e), self)
                traceback.print_exc()
                self.sig_exception.emit(str(e))
        writeLog('Terminated', self)
        self.sig_terminated.emit()

    def stop(self):
        """Ask the loop in ``run`` to exit after its current iteration."""
        self._keepAlive = False
# SerialComm.py
import queue
import serial
from typing import Union
from Define import writeLog, Callback
from Threads import ThreadSend, ThreadReceive, ThreadCheck
class SerialComm:
    """Serial port wrapper that sends and receives on background threads.

    Outgoing data is placed on a send queue drained by a ``ThreadSend``;
    incoming data is queued by a ``ThreadReceive`` and handed to listeners
    by a ``ThreadCheck``.

    Signals:
        sig_connected(): emitted after the port was opened successfully.
        sig_disconnected(): emitted after the port was closed.
        sig_send_data(bytes): relayed from the send thread.
        sig_recv_data(bytes): relayed from the check thread.
        sig_exception(str): emitted with the message of a caught exception.
    """

    _serial: serial.Serial
    _threadSend: Union[ThreadSend, None] = None
    _threadRecv: Union[ThreadReceive, None] = None
    _threadCheck: Union[ThreadCheck, None] = None
    # Upper bound, in seconds, on how long stopThreads() waits per worker.
    _joinTimeout: float = 1.0

    def __init__(self):
        super().__init__()
        self.sig_connected = Callback()
        self.sig_disconnected = Callback()
        self.sig_send_data = Callback(bytes)
        self.sig_recv_data = Callback(bytes)
        self.sig_exception = Callback(str)
        self._serial = serial.Serial(timeout=0)
        self._serial.bytesize = 8
        self._serial.parity = 'N'
        self._serial.stopbits = 1
        self._queue_send = queue.Queue()
        self._queue_recv = queue.Queue()

    def release(self):
        """Release the port; currently the same as ``disconnect``."""
        self.disconnect()

    def connect(self, port: str, baudrate: int) -> bool:
        """Open ``port`` at ``baudrate`` and start the worker threads.

        Returns:
            bool: True when the port was opened by this call. False when it
            was already open, could not be opened, or an exception occurred
            (the exception is logged and reported through ``sig_exception``).
        """
        try:
            if self._serial.isOpen():
                return False
            self._serial.port = port
            self._serial.baudrate = baudrate
            self._serial.open()
            if not self._serial.isOpen():
                return False
            self.clearQueues()
            self.startThreads()
            self.sig_connected.emit()
            writeLog('Connected to <{}> (baud: {})'.format(port, baudrate), self)
            return True
        except Exception as e:
            writeLog('Exception::{}'.format(e), self)
            self.sig_exception.emit(str(e))
            # Honour the declared ``bool`` return type on failure as well.
            return False

    def disconnect(self):
        """Stop the worker threads and close the port if it is open."""
        try:
            if self._serial.isOpen():
                # Workers are stopped (and waited for) before closing so
                # they do not touch a port that has just been closed.
                self.stopThreads()
                self._serial.close()
                self.sig_disconnected.emit()
                writeLog('Disconnected', self)
        except Exception as e:
            writeLog('Exception::{}'.format(e), self)
            self.sig_exception.emit(str(e))

    def isConnected(self) -> bool:
        """Return True when the underlying port reports itself open."""
        try:
            return self._serial.isOpen()
        except Exception as e:
            writeLog('Exception::{}'.format(e), self)
            return False

    def startThreads(self):
        """Create and start any worker thread that is not currently set."""
        if self._threadSend is None:
            self._threadSend = ThreadSend(self._serial, self._queue_send)
            self._threadSend.sig_send_data.connect(self.onSendData)
            self._threadSend.sig_terminated.connect(self.onThreadSendTermanted)
            self._threadSend.sig_exception.connect(self.onException)
            # ``daemon`` attribute replaces the deprecated setDaemon().
            self._threadSend.daemon = True
            self._threadSend.start()
        if self._threadCheck is None:
            self._threadCheck = ThreadCheck(self._queue_recv)
            self._threadCheck.sig_get.connect(self.onRecvData)
            self._threadCheck.sig_terminated.connect(self.onThreadCheckTermanted)
            self._threadCheck.sig_exception.connect(self.onException)
            self._threadCheck.daemon = True
            self._threadCheck.start()
        if self._threadRecv is None:
            self._threadRecv = ThreadReceive(self._serial, self._queue_recv)
            self._threadRecv.sig_terminated.connect(self.onThreadRecvTermanted)
            self._threadRecv.sig_exception.connect(self.onException)
            self._threadRecv.daemon = True
            self._threadRecv.start()

    def stopThreads(self):
        """Signal every worker to stop and wait briefly for each to finish.

        Waiting matters for two reasons: ``disconnect`` closes the port right
        after this call, and ``startThreads`` only creates a worker when its
        attribute is ``None`` - which happens in the worker's termination
        callback. Each wait is capped at ``_joinTimeout`` seconds.
        """
        import threading  # local import: this module does not import it at file level

        workers = [
            worker
            for worker in (self._threadSend, self._threadRecv, self._threadCheck)
            if worker is not None
        ]
        # Signal all workers first so they wind down concurrently.
        for worker in workers:
            worker.stop()
        current = threading.current_thread()
        for worker in workers:
            # A thread cannot join itself (e.g. disconnect() called from a
            # data callback running on a worker), and join() on a thread
            # that is not alive is pointless.
            if worker is not current and worker.is_alive():
                worker.join(timeout=self._joinTimeout)

    def clearQueues(self):
        """Discard everything currently held in the send and receive queues."""
        for pending in (self._queue_send, self._queue_recv):
            try:
                # get_nowait() cannot block if another thread empties the
                # queue between the check and the read.
                while True:
                    pending.get_nowait()
            except queue.Empty:
                pass

    def sendData(self, data: Union[bytes, bytearray, str]):
        """Queue ``data`` for transmission; ignored when not connected.

        A ``str`` is converted one character per byte using each character's
        code point; ``bytes``/``bytearray`` are copied to ``bytes``. Values
        of any other type are ignored. Conversion errors are logged and
        reported through ``sig_exception``.
        """
        try:
            if not self.isConnected():
                return
            if isinstance(data, str):
                sData = bytearray()
                sData.extend(map(ord, data))
                self._queue_send.put(bytes(sData))
            elif isinstance(data, (bytes, bytearray)):
                self._queue_send.put(bytes(data))
        except Exception as e:
            writeLog('Exception::{}'.format(e), self)
            self.sig_exception.emit(str(e))

    def onSendData(self, data: bytes):
        """Relay a chunk reported by the send thread."""
        self.sig_send_data.emit(data)

    def onRecvData(self, data: bytes):
        """Relay a chunk delivered by the check thread."""
        self.sig_recv_data.emit(data)

    def onException(self, msg: str):
        """Relay an exception message reported by a worker thread."""
        self.sig_exception.emit(msg)

    def onThreadSendTermanted(self):
        """Forget the send thread once it has terminated."""
        self._threadSend = None

    def onThreadRecvTermanted(self):
        """Forget the receive thread once it has terminated."""
        self._threadRecv = None

    def onThreadCheckTermanted(self):
        """Forget the check thread once it has terminated."""
        self._threadCheck = None

    def reset_input_buffer(self):
        """Call ``reset_input_buffer`` on the underlying serial port."""
        self._serial.reset_input_buffer()

    @property
    def port(self) -> str:
        """The ``port`` attribute of the underlying serial object."""
        return self._serial.port

    @property
    def baudrate(self) -> int:
        """The ``baudrate`` attribute of the underlying serial object."""
        return self._serial.baudrate
반응형
'Software > Python' 카테고리의 다른 글
Python::구조적 패턴 매칭 - 파이썬에서 switch/case문을?! (0) | 2021.03.21 |
---|---|
Python으로 순서도 그리기 (schemdraw) (0) | 2021.01.27 |
Flask - Dynamic Page Update (Ajax) (0) | 2021.01.10 |
Flask - extension을 이용한 HTTP 인증 절차 구현 (0) | 2021.01.08 |
PyQt Serial Port List (0) | 2021.01.07 |