caterpillar/base.py

295 lines
8.5 KiB
Python
Raw Normal View History

2024-05-19 17:19:54 +00:00
#!/usr/bin/python3
#
# base.py
2024-05-19 17:20:12 +00:00
# base (common) file
2024-05-19 17:19:54 +00:00
#
# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
2024-05-19 17:19:54 +00:00
# Namyheon Go (Catswords Research) <gnh1201@gmail.com>
# Euiseo Cha (Wonkwang University) <zeroday0619_dev@outlook.com>
2024-05-19 17:19:54 +00:00
# https://github.com/gnh1201/caterpillar
# Created at: 2024-05-20
2024-11-12 07:10:20 +00:00
# Updated at: 2024-10-12
2024-05-19 17:19:54 +00:00
#
import logging
2024-05-19 17:16:49 +00:00
import hashlib
import json
import os
2024-05-20 15:04:39 +00:00
import re
import importlib
2024-07-11 10:03:34 +00:00
import subprocess
import platform
2024-05-19 17:16:49 +00:00
2024-08-31 05:37:21 +00:00
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Union, List
2024-07-11 10:02:08 +00:00
# Text encoding used when JSON-RPC text is converted to bytes (for hashing
# request ids and for sending messages over a connection).
client_encoding = "utf-8"
2024-05-19 17:16:49 +00:00
2024-07-11 07:02:51 +00:00
2024-05-20 07:18:09 +00:00
# Matches "[scheme://]username:password@rest".  The password may not contain
# "/" so that a port followed by a path holding "@" (for example
# "http://host:8080/a@b") is not mistaken for embedded credentials.
_CREDENTIALS_PATTERN = re.compile(
    r"(?P<scheme>\w+://)?(?P<username>[^:/]+):(?P<password>[^@/]+)@(?P<url>.+)"
)


def extract_credentials(url):
    """Split ``username:password@`` credentials out of a URL.

    Args:
        url: URL text, optionally prefixed with a scheme and optionally
            carrying ``username:password@`` before the host.

    Returns:
        A ``(username, password, url)`` tuple.  When credentials are found,
        the returned URL has them removed and is prefixed with the original
        scheme, or with ``"https://"`` if the input had none.  When no
        credentials are found the result is ``(None, None, url)`` with the
        input returned unchanged.
    """
    match = _CREDENTIALS_PATTERN.match(url)
    if match is None:
        return None, None, url

    # A missing scheme falls back to HTTPS.
    scheme = match.group("scheme") or "https://"
    return (
        match.group("username"),
        match.group("password"),
        scheme + match.group("url"),
    )
2024-07-11 07:02:51 +00:00
2024-05-19 17:16:49 +00:00
def jsonrpc2_create_id(data):
    """Return the SHA-1 hex digest of *data* serialized as JSON.

    The JSON text is encoded with ``client_encoding`` before hashing.
    """
    serialized = json.dumps(data)
    digest = hashlib.sha1(serialized.encode(client_encoding))
    return digest.hexdigest()
2024-07-11 07:02:51 +00:00
def jsonrpc2_encode(method, params=None):
    """Build a JSON-RPC 2.0 request message.

    Args:
        method: Name of the remote method.
        params: Parameters for the call; serialized as ``null`` when omitted.

    Returns:
        A ``(id, text)`` tuple: the generated request id and the JSON text of
        the request, which carries that same id.
    """
    payload = {"jsonrpc": "2.0", "method": method, "params": params}
    # The id is derived from the payload *before* the "id" key is added.
    request_id = jsonrpc2_create_id(payload)
    payload["id"] = request_id
    return request_id, json.dumps(payload)
2024-07-11 07:02:51 +00:00
2024-07-12 01:14:39 +00:00
def jsonrpc2_decode(text):
    """Parse a JSON-RPC 2.0 response message.

    Args:
        text: JSON text of the message.

    Returns:
        A ``(type, id, rpcdata)`` tuple.  ``type`` is ``"error"`` when the
        message has an ``"error"`` member, otherwise ``"result"`` when it has
        a ``"result"`` member, otherwise ``None``.  ``id`` is the message's
        ``"id"`` member (``None`` if absent) and ``rpcdata`` is the value
        stored under ``type`` (``None`` when ``type`` is ``None``).
    """
    message = json.loads(text)

    # "error" takes precedence over "result" when both are present.
    kind = None
    for candidate in ("error", "result"):
        if candidate in message:
            kind = candidate
            break

    message_id = message.get("id")
    if kind:
        payload = message.get(kind)
    else:
        payload = None
    return kind, message_id, payload
2024-07-11 07:02:51 +00:00
2024-07-11 10:02:08 +00:00
def jsonrpc2_result_encode(result, id=""):
    """Serialize a JSON-RPC 2.0 success response.

    Args:
        result: Value to place in the ``"result"`` member.
        id: Identifier of the request being answered; defaults to ``""``.

    Returns:
        The response as JSON text.
    """
    return json.dumps({"jsonrpc": "2.0", "result": result, "id": id})
2024-07-11 07:02:51 +00:00
2024-07-11 10:02:08 +00:00
def jsonrpc2_error_encode(error, id=""):
    """Serialize a JSON-RPC 2.0 error response.

    Args:
        error: Value to place in the ``"error"`` member.
        id: Identifier of the request being answered; defaults to ``""``.

    Returns:
        The response as JSON text.
    """
    return json.dumps({"jsonrpc": "2.0", "error": error, "id": id})
2024-08-31 05:37:21 +00:00
2024-07-11 10:03:34 +00:00
def find_openssl_binpath():
    """Locate the OpenSSL executable.

    On Windows the usual ``OpenSSL-Win64`` / ``OpenSSL-Win32`` install
    folders under the Program Files directories are checked first, in a
    fixed order.  If none of them exists (or on any other platform) the
    executable is looked up on ``PATH``.

    Returns:
        The path of the executable that was found, or the bare command name
        ``"openssl"`` when nothing was found.
    """
    # Local import keeps this function self-contained; shutil.which replaces
    # spawning the external `which` program, which is not always available.
    import shutil

    if platform.system() == "Windows":
        program_files = os.getenv("ProgramFiles", "C:\\Program Files")
        program_files_x86 = os.getenv("ProgramFiles(x86)", "C:\\Program Files (x86)")
        program_w6432 = os.getenv("ProgramW6432", "C:\\Program Files")

        # (root directory, install folder) pairs, in lookup order.
        candidates = [
            (program_files, "OpenSSL-Win64"),
            (program_files, "OpenSSL-Win32"),
            (program_files_x86, "OpenSSL-Win32"),
            (program_w6432, "OpenSSL-Win64"),
            (program_w6432, "OpenSSL-Win32"),
        ]
        for root, folder in candidates:
            path = os.path.join(root, folder, "bin", "openssl.exe")
            if os.path.exists(path):
                return path

    found = shutil.which("openssl")
    return found if found else "openssl"
2024-08-31 05:37:21 +00:00
class ExtensionType:
    """Record of the attributes an extension carries.

    A new instance has ``type``, ``method`` and ``connection_type`` set to
    ``None`` (each is annotated as a string in this module) and
    ``exported_methods`` set to a fresh empty list.
    """

    def __init__(self):
        # Both string fields start unset.
        self.type = self.method = None
        # A new list per instance, so instances never share it.
        self.exported_methods: list[str] = []
        self.connection_type: str = None
2024-07-11 10:02:08 +00:00
class Extension:
    """Base class for plugins and class-level registry of plugin instances.

    The class attributes ``extensions``, ``protocols`` and ``buffer_size``
    are shared state that the classmethods below read and update.  Instances
    carry ``type``, ``method``, ``exported_methods`` and ``connection_type``;
    the lookups below select registered instances by those attributes.
    """

    # Registered plugin instances.  The annotation is quoted so it is not
    # evaluated when the class is created.
    extensions: "list[ExtensionType]" = []
    # Protocol names added through set_protocol().
    protocols = []
    # Number of bytes requested per recv() call in readall().
    buffer_size = 8192

    @classmethod
    def set_protocol(cls, protocol):
        """Append *protocol* to the shared list of protocols."""
        cls.protocols.append(protocol)

    @classmethod
    def set_buffer_size(cls, _buffer_size):
        """Set the chunk size used by :meth:`readall`."""
        cls.buffer_size = _buffer_size

    @classmethod
    def register(cls, s):
        """Import, instantiate and register the plugin named by *s*.

        Args:
            s: ``"module.ClassName"``; the module is imported as
                ``plugins.<module>``.  Surrounding whitespace is ignored and
                any further dotted parts are ignored.

        Raises:
            ValueError: *s* does not contain a ``"."``.
            ImportError: the module cannot be imported, the class is missing,
                or creating the instance raised ``ImportError`` or
                ``AttributeError``.
        """
        parts = s.strip().split(".")
        if len(parts) < 2:
            raise ValueError(
                "extension must be given as 'module.ClassName', got " + repr(s)
            )
        module_name, class_name = parts[0:2]
        module_path = "plugins." + module_name

        try:
            module = importlib.import_module(module_path)
            _class = getattr(module, class_name)
            cls.extensions.append(_class())
        except (ImportError, AttributeError) as e:
            # Chain the cause so the underlying failure stays visible.
            raise ImportError(class_name + " in the extension " + module_name) from e

    @classmethod
    def get_filters(cls):
        """Return the registered extensions whose ``type`` is ``"filter"``."""
        return [extension for extension in cls.extensions if extension.type == "filter"]

    @classmethod
    def get_rpcmethod(cls, method):
        """Return the first ``"rpcmethod"`` extension that handles *method*.

        An extension handles *method* when it equals the extension's
        ``method`` attribute or is contained in its ``exported_methods``.
        Returns ``None`` when no extension matches.
        """
        for extension in cls.extensions:
            try:
                is_exported_method = (method == extension.method) or (
                    method in extension.exported_methods
                )
            except (AttributeError, TypeError):
                # Missing attributes or a non-container exported_methods
                # simply mean "does not handle this method".
                is_exported_method = False

            if extension.type == "rpcmethod" and is_exported_method:
                return extension

        return None

    @classmethod
    def dispatch_rpcmethod(cls, method, type, id, params, conn):
        """Invoke the handler registered for *method*.

        If the matching extension's own ``method`` equals *method*, its
        ``dispatch`` is called; otherwise the attribute named *method* on the
        extension is called when it exists.  Returns the handler's result, or
        ``None`` when there is no handler.
        """
        rpcmethod = cls.get_rpcmethod(method)
        if rpcmethod:
            if rpcmethod.method == method:
                # NOTE(review): conn is passed as the fourth positional
                # argument, which lines up with `method` (not `conn`) in the
                # base `dispatch` signature below - confirm against the
                # signatures plugins actually use before changing this.
                return rpcmethod.dispatch(type, id, params, conn)
            else:
                f = getattr(rpcmethod, method, None)
                if f:
                    return f(type, id, params, conn)

    @classmethod
    def get_connector(cls, connection_type):
        """Return the first ``"connector"`` extension with *connection_type*.

        Returns ``None`` when no registered connector matches.
        """
        for extension in cls.extensions:
            if (
                extension.type == "connector"
                and extension.connection_type == connection_type
            ):
                return extension
        return None

    @classmethod
    def test_connector(cls, data):
        """Return the first ``"connector"`` extension whose prelude starts *data*.

        Connectors without a ``prelude`` attribute (or with it set to
        ``None``) are skipped.  Returns ``None`` when nothing matches.
        """
        for extension in cls.extensions:
            if extension.type != "connector":
                continue
            prelude = getattr(extension, "prelude", None)
            # startswith only inspects the head of data instead of searching
            # the whole buffer.
            if prelude is not None and data.startswith(prelude):
                return extension
        return None

    @classmethod
    def send_accept(cls, conn, method, success=True):
        """Acknowledge a request.

        When ``"tcp"`` is among the protocols, a JSON-RPC request named
        ``"<method>_accept"`` with params ``{"success": success}`` is sent on
        *conn*.  A line naming the first configured protocol is then printed.
        """
        if "tcp" in cls.protocols:
            _, message = jsonrpc2_encode(f"{method}_accept", {"success": success})
            conn.send(message.encode(client_encoding))

        print(f"Accepted request with {cls.protocols[0]} protocol")

    @classmethod
    def readall(cls, conn):
        """Read the whole request body from *conn*.

        With ``"tcp"`` among the protocols, ``conn.recv`` is called with
        ``buffer_size`` until it returns an empty chunk, and the received
        bytes are returned.  Otherwise, with ``"http"`` among the protocols,
        the content of the uploaded file named ``"file"`` in
        ``conn.request.files`` is returned, or ``b""`` when there is no such
        file.  Returns ``None`` when neither protocol is configured.
        """
        if "tcp" in cls.protocols:
            chunks = []
            while True:
                try:
                    chunk = conn.recv(cls.buffer_size)
                except ConnectionError:
                    # The peer reset or aborted the connection; recv would
                    # keep failing, so stop instead of spinning forever.
                    break
                except OSError:
                    # Other socket errors (e.g. timeout, would-block) are
                    # retried, as before.
                    continue
                if not chunk:
                    break
                chunks.append(chunk)
            return b"".join(chunks)

        elif "http" in cls.protocols:
            # empty bytes when no file was uploaded
            if "file" not in conn.request.files:
                return b""

            # read the uploaded file in binary mode
            file = conn.request.files["file"]
            return file.read()

        return None

    def __init__(self):
        self.type = None
        self.method = None
        self.exported_methods = []
        self.connection_type = None

    def test(self, filtered, data, webserver, port, scheme, method, url):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError

    def dispatch(self, type, id, params, method=None, conn=None):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError

    def connect(self, conn, data, webserver, port, scheme, method, url):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError
class Logger(logging.Logger):
    """Logger that writes every record to the console stream and to a file.

    On construction a ``logs`` directory is created in the current working
    directory if needed, and two handlers sharing one formatter are
    attached: a ``StreamHandler`` and a ``FileHandler`` writing to
    ``logs/<name>-<YYYY-MM-DD>.log`` (the date is taken in UTC).
    """

    def __init__(self, name: str, level: int = logging.NOTSET):
        """Create the logger and attach its stream and file handlers.

        Args:
            name: Logger name; also the prefix of the log file name.
            level: Logging threshold passed to ``logging.Logger``.
        """
        super().__init__(name, level)
        self.formatter = logging.Formatter(
            "[%(asctime)s] %(levelname)s %(module)s: %(message)s"
        )

        # exist_ok avoids the window between "does it exist?" and "create
        # it" in which another logger could create the directory first.
        os.makedirs("logs", exist_ok=True)

        stream_handler = logging.StreamHandler()
        file_handler = logging.FileHandler(
            os.path.join("logs", name + "-" + self._generate_timestamp() + ".log")
        )

        handlers = [stream_handler, file_handler]
        self._set_formatters(handlers)
        self._add_handlers(handlers)

    @staticmethod
    def _generate_timestamp():
        """Return the current UTC date formatted as ``YYYY-MM-DD``."""
        return datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")

    def _set_formatters(
        self, handlers: List[Union[logging.StreamHandler, logging.FileHandler]]
    ):
        """Apply this logger's formatter to each handler in *handlers*."""
        for handler in handlers:
            handler.setFormatter(self.formatter)

    def _add_handlers(
        self, handlers: List[Union[logging.StreamHandler, logging.FileHandler]]
    ):
        """Attach each handler in *handlers* to this logger."""
        for handler in handlers:
            self.addHandler(handler)