mirror of
https://github.com/gnh1201/caterpillar.git
synced 2024-11-26 07:21:46 +00:00
Merge pull request #41 from AkiaCode/serial
Some checks failed
Ruff / ruff (push) Has been cancelled
Some checks failed
Ruff / ruff (push) Has been cancelled
Implement simple serial connector
This commit is contained in:
commit
6353cb69ad
61
plugins/serial.py
Normal file
61
plugins/serial.py
Normal file
|
@ -0,0 +1,61 @@
|
|||
#!/usr/bin/python3
|
||||
#
|
||||
# serial.py
|
||||
# Serial integration plugin for Caterpillar Proxy
|
||||
#
|
||||
# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
|
||||
# Teakwoo Kim <catry.me@gmail.com>
|
||||
# https://github.com/gnh1201/caterpillar
|
||||
# Created at: 2024-08-11
|
||||
# Updated at: 2024-08-11
|
||||
#
|
||||
|
||||
import serial
|
||||
from decouple import config
|
||||
from base import Extension, Logger
|
||||
|
||||
logger = Logger(name="serial")
|
||||
|
||||
try:
|
||||
client_encoding = config("CLIENT_ENCODING")
|
||||
except Exception as e:
|
||||
logger.error("[*] Invalid configuration", exc_info=e)
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class Serial(Extension):
|
||||
def __init__(self):
|
||||
self.type = "connector"
|
||||
self.connection_type = "serial"
|
||||
|
||||
def dispatch(self, type, id, params, conn):
|
||||
logger.info("[*] Greeting! dispatch")
|
||||
conn.send(b"Greeting! dispatch")
|
||||
|
||||
def connect(self, conn, data, webserver, port, scheme, method, url):
|
||||
connected = False
|
||||
ser = None
|
||||
try:
|
||||
port_path = url.decode(client_encoding).replace('/', '')
|
||||
if not ser:
|
||||
ser = serial.Serial(port_path, baudrate=9600, timeout=2)
|
||||
connected = True
|
||||
logger.debug(f"Connected to {port_path} at 9600 baudrate")
|
||||
|
||||
ser.write(data)
|
||||
logger.debug(f"Data sent to {port_path}: {data}")
|
||||
|
||||
ser_data = ser.read_all()
|
||||
logger.debug(f"Data received: {ser_data}")
|
||||
|
||||
if ser_data:
|
||||
conn.send(ser_data.decode(client_encoding))
|
||||
except serial.SerialException as e:
|
||||
logger.error(f"Failed to connect to {port}", exc_info=e)
|
||||
finally:
|
||||
if ser and ser.is_open:
|
||||
ser.close()
|
||||
logger.debug(f"Serial port {port_path} closed")
|
||||
return connected
|
Loading…
Reference in New Issue
Block a user