YOGYUI

Pyserial 시리얼 통신 모듈 커스터마이징 본문

Software/Python

Pyserial 시리얼 통신 모듈 커스터마이징

요겨 2021. 1. 1. 00:40
반응형

나중에 시간날때 Git에 올려야겠다...

[Structure]

---- 

-------- Define.py

-------- Threads.py

-------- SerialComm.py

# Define.py
import datetime
import threading

def checkAgrumentType(obj, arg):
    if type(obj) == arg:
        return True
    if arg == object:
        return True
    if arg in obj.__class__.__bases__:
        return True
    return False

class Callback(object):
    _args = None
    _callback = None

    def __init__(self, *args):
        self._args = args

    def connect(self, callback):
        self._callback = callback

    def emit(self, *args):
        if len(args) != len(self._args):
            raise Exception('Callback::Argument Length Mismatch')
        arglen = len(args)
        if arglen > 0:
            validTypes = [checkAgrumentType(args[i], self._args[i]) for i in range(arglen)]
            if sum(validTypes) != arglen:
                raise Exception('Callback::Argument Type Mismatch (Definition: {}, Call: {})'.format(self._args, args))
        if self._callback is not None:
            self._callback(*args)

def timestampToString(timestamp: datetime.datetime):
    h = timestamp.hour
    m = timestamp.minute
    s = timestamp.second
    us = timestamp.microsecond
    return '%02d:%02d:%02d.%06d' % (h, m, s, us)

def getCurTimeStr():
    return '<%s>' % timestampToString(datetime.datetime.now())

def writeLog(strMsg: str, obj: object = None):
    strTime = getCurTimeStr()
    if obj is not None:
        if isinstance(obj, threading.Thread):
            if obj.ident is not None:
                strObj = ' [%s (0x%X)]' % (type(obj).__name__, obj.ident)
            else:
                strObj = ' [%s (0x%X)]' % (type(obj).__name__, id(obj))
        else:
            strObj = ' [%s (0x%X)]' % (type(obj).__name__, id(obj))
    else:
        strObj = ''

    msg = strTime + strObj + ' ' + strMsg
    print(msg)
# Threads.py
import time
import queue
import serial
import threading
import traceback
from Define import Callback, writeLog

class ThreadSend(threading.Thread):
    _keepAlive: bool = True

    def __init__(self, serial_: serial.Serial, queue_: queue.Queue):
        threading.Thread.__init__(self)
        self.sig_send_data = Callback(bytes)
        self.sig_terminated = Callback()
        self.sig_exception = Callback(str)
        self._serial = serial_
        self._queue = queue_

    def run(self):
        writeLog('Started', self)
        while self._keepAlive:
            try:
                if not self._queue.empty():
                    data = self._queue.get()
                    sendLen = len(data)
                    while sendLen > 0:
                        nLen = self._serial.write(data[(len(data) - sendLen):])
                        sData = data[(len(data) - sendLen):(len(data) - sendLen + nLen)]
                        self.sig_send_data.emit(sData)
                        sendLen -= nLen
                else:
                    time.sleep(1e-3)
            except Exception as e:
                writeLog('Exception::{}'.format(e), self)
                traceback.print_exc()
                self.sig_exception.emit(str(e))
        writeLog('Terminated', self)
        self.sig_terminated.emit()
    
    def stop(self):
        self._keepAlive = False

class ThreadReceive(threading.Thread):
    _keepAlive: bool = True

    def __init__(self, serial_: serial.Serial, queue_: queue.Queue):
        threading.Thread.__init__(self)
        self.sig_terminated = Callback()
        self.sig_exception = Callback(str)
        self._serial = serial_
        self._queue = queue_
    
    def run(self):
        writeLog('Started', self)
        while self._keepAlive:
            try:
                if self._serial.isOpen():
                    if self._serial.in_waiting > 0:
                        rcv = self._serial.read(self._serial.in_waiting)
                        self._queue.put(rcv)
                    else:
                        time.sleep(1e-3)
                else:
                    time.sleep(1e-3)
            except Exception as e:
                writeLog('Exception::{}'.format(e), self)
                traceback.print_exc()
                self.sig_exception.emit(str(e))
                # break
        writeLog('Terminated', self)
        self.sig_terminated.emit()
    
    def stop(self):
        self._keepAlive = False

class ThreadCheck(threading.Thread):
    _keepAlive: bool = True

    def __init__(self, queue_: queue.Queue):
        threading.Thread.__init__(self)
        self.sig_get = Callback(bytes)
        self.sig_terminated = Callback()
        self.sig_exception = Callback(str)
        self._queue = queue_
    
    def run(self):
        writeLog('Started', self)
        while self._keepAlive:
            try:
                if not self._queue.empty():
                    self.sig_get.emit(self._queue.get())
                else:
                    time.sleep(1e-3)
            except Exception as e:
                writeLog('Exception::{}'.format(e), self)
                traceback.print_exc()
                self.sig_exception.emit(str(e))
        writeLog('Terminated', self)
        self.sig_terminated.emit()
    
    def stop(self):
        self._keepAlive = False
# SerialComm.py
import queue
import serial
from typing import Union
from Define import writeLog, Callback
from Threads import ThreadSend, ThreadReceive, ThreadCheck


class SerialComm():
    _serial: serial.Serial
    _threadSend: Union[ThreadSend, None] = None
    _threadRecv: Union[ThreadReceive, None] = None
    _threadCheck: Union[ThreadCheck, None] = None

    def __init__(self):
        super().__init__()
        self.sig_connected = Callback()
        self.sig_disconnected = Callback()
        self.sig_send_data = Callback(bytes)
        self.sig_recv_data = Callback(bytes)
        self.sig_exception = Callback(str)

        self._serial = serial.Serial(timeout=0)
        self._serial.bytesize = 8
        self._serial.parity = 'N'
        self._serial.stopbits = 1

        self._queue_send = queue.Queue()
        self._queue_recv = queue.Queue()
    
    def release(self):
        self.disconnect()

    def connect(self, port: str, baudrate: int) -> bool:
        try:
            if self._serial.isOpen():
                return False
            
            self._serial.port = port
            self._serial.baudrate = baudrate
            self._serial.open()
            if self._serial.isOpen():
                self.clearQueues()
                self.startThreads()
                self.sig_connected.emit()
                writeLog('Connected to <{}> (baud: {})'.format(port, baudrate), self)
                return True

            return False
        except Exception as e:
            writeLog('Exception::{}'.format(e), self)
            self.sig_exception.emit(str(e))

    def disconnect(self):
        try:
            if self._serial.isOpen():
                self.stopThreads()
                self._serial.close()
                self.sig_disconnected.emit()
                writeLog('Disconnected', self)
        except Exception as e:
            writeLog('Exception::{}'.format(e), self)
            self.sig_exception.emit(str(e))    

    def isConnected(self) -> bool:
        try:
            return self._serial.isOpen()
        except Exception as e:
            writeLog('Exception::{}'.format(e), self)
            return False

    def startThreads(self):
        if self._threadSend is None:
            self._threadSend = ThreadSend(self._serial, self._queue_send)
            self._threadSend.sig_send_data.connect(self.onSendData)
            self._threadSend.sig_terminated.connect(self.onThreadSendTermanted)
            self._threadSend.sig_exception.connect(self.onException)
            self._threadSend.setDaemon(True)
            self._threadSend.start()
        
        if self._threadCheck is None:
            self._threadCheck = ThreadCheck(self._queue_recv)
            self._threadCheck.sig_get.connect(self.onRecvData)
            self._threadCheck.sig_terminated.connect(self.onThreadCheckTermanted)
            self._threadCheck.sig_exception.connect(self.onException)
            self._threadCheck.setDaemon(True)
            self._threadCheck.start()

        if self._threadRecv is None:
            self._threadRecv = ThreadReceive(self._serial, self._queue_recv)
            self._threadRecv.sig_terminated.connect(self.onThreadRecvTermanted)
            self._threadRecv.sig_exception.connect(self.onException)
            self._threadRecv.setDaemon(True)
            self._threadRecv.start()

    def stopThreads(self):
        if self._threadSend is not None:
            self._threadSend.stop()
        if self._threadRecv is not None:
            self._threadRecv.stop()
        if self._threadCheck is not None:
            self._threadCheck.stop()

    def clearQueues(self):
        while not self._queue_send.empty():
            self._queue_send.get()
        while not self._queue_recv.empty():
            self._queue_recv.get()

    def sendData(self, data: Union[bytes, bytearray, str]):
        try:
            if not self.isConnected():
                return
            if isinstance(data, str):
                sData = bytearray()
                sData.extend(map(ord, data))
                sData = bytes(sData)
                self._queue_send.put(sData)
            elif isinstance(data, bytes) or isinstance(data, bytearray):
                sData = bytes(data)
                self._queue_send.put(sData)
        except Exception as e:
            writeLog('Exception::{}'.format(e), self)
            self.sig_exception.emit(str(e))  

    def onSendData(self, data: bytes):
        self.sig_send_data.emit(data)

    def onRecvData(self, data: bytes):
        self.sig_recv_data.emit(data)

    def onException(self, msg: str):
        self.sig_exception.emit(msg)

    def onThreadSendTermanted(self):
        del self._threadSend
        self._threadSend = None
    
    def onThreadRecvTermanted(self):
        del self._threadRecv
        self._threadRecv = None
    
    def onThreadCheckTermanted(self):
        del self._threadCheck
        self._threadCheck = None

    def reset_input_buffer(self):
        self._serial.reset_input_buffer()

    @property
    def port(self) -> str:
        return self._serial.port

    @property
    def baudrate(self) -> int:
        return self._serial.baudrate
반응형