implement simple serial connector

This commit is contained in:
AkiaCode 2024-08-11 02:29:27 +09:00 committed by GitHub
parent 724f9f071e
commit bdd6615670
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

61
plugins/serial.py Normal file
View File

@ -0,0 +1,61 @@
#!/usr/bin/python3
#
# serial.py
# Serial integration plugin for Caterpillar Proxy
#
# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
# Namyheon Go (Catswords Research) <gnh1201@gmail.com>
# https://github.com/gnh1201/caterpillar
# Created at: 2024-08-11
# Updated at: 2024-08-11
#
import serial
from decouple import config
from base import Extension, Logger
logger = Logger(name="serial")
try:
client_encoding = config("CLIENT_ENCODING")
except Exception as e:
logger.error("[*] Invalid configuration", exc_info=e)
import logging
logger = logging.getLogger(__name__)
class Serial(Extension):
def __init__(self):
self.type = "connector"
self.connection_type = "serial"
def dispatch(self, type, id, params, conn):
logger.info("[*] Greeting! dispatch")
conn.send(b"Greeting! dispatch")
def connect(self, conn, data, webserver, port, scheme, method, url):
connected = False
ser = None
try:
port_path = url.decode(client_encoding).replace('/', '')
if not ser:
ser = serial.Serial(port_path, baudrate=9600, timeout=2)
connected = True
logger.debug(f"Connected to {port_path} at 9600 baudrate")
ser.write(data)
logger.debug(f"Data sent to {port_path}: {data}")
ser_data = ser.read_all()
logger.debug(f"Data received: {ser_data}")
if ser_data:
conn.send(ser_data.decode(client_encoding))
except serial.SerialException as e:
logger.error(f"Failed to connect to {port}", exc_info=e)
finally:
if ser and ser.is_open:
ser.close()
logger.debug(f"Serial port {port_path} closed")
return connected