diff --git a/plugins/alwaysonline.py b/plugins/alwaysonline.py
deleted file mode 100644
index 2e21bb7..0000000
--- a/plugins/alwaysonline.py
+++ /dev/null
@@ -1,223 +0,0 @@
-#!/usr/bin/python3
-#
-# alwaysonline.py
-# Always Online implementation for Caterpillar Proxy
-#
-# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
-# Namyheon Go (Catswords Research)
-# https://github.com/gnh1201/caterpillar
-# Created at: 2024-07-31
-# Updated at: 2024-07-31
-#
-import socket
-import ssl
-import requests
-from decouple import config
-from elasticsearch import Elasticsearch, NotFoundError
-import hashlib
-from datetime import datetime, UTC
-from base import Extension, Logger
-
-logger = Logger(name="alwaysonline")
-
-try:
-    client_encoding = config("CLIENT_ENCODING")
-    es_host = config("ES_HOST")
-    es_index = config("ES_INDEX")
-except Exception as e:
-    logger.error("[*] Invalid configuration", exc_info=e)
-
-es = Elasticsearch([es_host])
-
-
-def generate_id(url: str):
-    """Generate a unique ID for a URL by hashing it."""
-    return hashlib.sha256(url.encode("utf-8")).hexdigest()
-
-
-def get_cached_page_from_google(url: str):
-    status_code, content = (0, b"")
-
-    # Google Cache URL
-    google_cache_url = "https://webcache.googleusercontent.com/search?q=cache:" + url
-
-    # Send a GET request to the Google Cache URL
-    response = requests.get(google_cache_url)
-
-    # Check if the request was successful (status code 200)
-    if response.status_code == 200:
-        content = response.content  # Extract content from the response
-    else:
-        status_code = response.status_code
-
-    return status_code, content
-
-
-# API documentation: https://archive.org/help/wayback_api.php
-def get_cached_page_from_wayback(url: str):
-    status_code, content = (0, b"")
-
-    # Wayback Machine API URL
-    wayback_api_url = "http://archive.org/wayback/available?url=" + url
-
-    # Send a GET request to the Wayback Machine API
-    response = requests.get(wayback_api_url)
-
-    # Check if the request was successful (status code 200)
-    if response.status_code == 200:
-        try:
-            # Parse the JSON response
-            data = response.json()
-            archived_snapshots = data.get("archived_snapshots", {})
-            closest_snapshot = archived_snapshots.get("closest", {})
-
-            # Check if the URL is available in the archive
-            if closest_snapshot:
-                archived_url = closest_snapshot.get("url", "")
-
-                # If a URL is available, fetch the content of the archived page
-                if archived_url:
-                    archived_page_response = requests.get(archived_url)
-                    status_code = archived_page_response.status_code
-                    if status_code == 200:
-                        content = archived_page_response.content
-                else:
-                    status_code = 404
-            else:
-                status_code = 404
-        except Exception:
-            status_code = 502
-    else:
-        status_code = response.status_code
-
-    return status_code, content
-
-
-def get_cached_page_from_elasticsearch(url: str):
-    url_id = generate_id(url)
-    try:
-        result = es.get(index=es_index, id=url_id)
-        logger.info(result["_source"])
-        return 200, result["_source"]["content"].encode(client_encoding)
-    except NotFoundError:
-        return 404, b""
-    except Exception as e:
-        logger.error(f"Error fetching from Elasticsearch: {e}")
-        return 502, b""
-
-
-def cache_to_elasticsearch(url: str, data: bytes):
-    url_id = generate_id(url)
-    timestamp = datetime.now(UTC).timestamp()
-    try:
-        es.index(
-            index=es_index,
-            id=url_id,
-            body={
-                "url": url,
-                "content": data.decode(client_encoding),
-                "timestamp": timestamp,
-            },
-        )
-    except Exception as e:
-        logger.error(f"Error caching to Elasticsearch: {e}")
-
-
-def get_page_from_origin_server(url: str):
-    try:
-        response = requests.get(url)
-        return response.status_code, response.content
-    except Exception as e:
-        return 502, str(e).encode(client_encoding)
-
-
-class AlwaysOnline(Extension):
-    def __init__(self):
-        self.type = "connector"  # this is a connector
-        self.connection_type = "alwaysonline"
-        self.buffer_size = 8192
-
-    def connect(self, conn: socket.socket, data: bytes, webserver: bytes, port: bytes, scheme: bytes, method: bytes, url: bytes):
-        logger.info("[*] Connecting...")
-
-        connected = False
-
-        is_ssl = scheme in [b"https", b"tls", b"ssl"]
-        cache_hit = 0
-        buffered = b""
-
-        def sendall(_sock: socket.socket, _conn: socket.socket, _data: bytes):
-            # send the first chunk
-            _sock.send(_data)
-            if len(_data) < self.buffer_size:
-                return
-
-            # relay the following chunks from the client to the server
-            _conn.settimeout(1)
-            while True:
-                try:
-                    chunk = _conn.recv(self.buffer_size)
-                    if not chunk:
-                        break
-                    _sock.send(chunk)
-                except Exception:
-                    break
-
-        target_url = url.decode(client_encoding)
-        target_scheme = scheme.decode(client_encoding)
-        target_webserver = webserver.decode(client_encoding)
-
-        if "://" not in target_url:
-            target_url = f"{target_scheme}://{target_webserver}:{port}{target_url}"
-
-        if method == b"GET":
-            if not connected:
-                logger.info("Trying to get data from Elasticsearch...")
-                status_code, content = get_cached_page_from_elasticsearch(target_url)
-                if status_code == 200:
-                    buffered += content
-                    cache_hit += 1
-                    connected = True
-
-            if not connected:
-                logger.info("Trying to get data from the Wayback Machine...")
-                status_code, content = get_cached_page_from_wayback(target_url)
-                if status_code == 200:
-                    buffered += content
-                    cache_hit += 1
-                    connected = True
-
-            if not connected:
-                logger.info("Trying to get data from the Google Website Cache...")
-                status_code, content = get_cached_page_from_google(target_url)
-                if status_code == 200:
-                    buffered += content
-                    cache_hit += 1
-                    connected = True
-
-            if cache_hit == 0:
-                status_code, content = get_page_from_origin_server(target_url)
-                buffered += content
-                cache_to_elasticsearch(target_url, buffered)
-
-            conn.send(buffered)
-        else:
-            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
-            if is_ssl:
-                context = ssl.create_default_context()
-                context.check_hostname = False
-                context.verify_mode = ssl.CERT_NONE
-
-                sock = context.wrap_socket(
-                    sock, server_hostname=webserver.decode(client_encoding)
-                )
-                sock.connect((webserver, port))
-                # sock.sendall(data)
-                sendall(sock, conn, data)
-            else:
-                sock.connect((webserver, port))
-                # sock.sendall(data)
-                sendall(sock, conn, data)
-
-        return connected
diff --git a/plugins/bio.py b/plugins/bio.py
deleted file mode 100644
index 1805812..0000000
--- a/plugins/bio.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#!/usr/bin/python3
-#
-# bio.py
-# Biopython plugin for Caterpillar Proxy
-#
-# Euiseo Cha (Wonkwang University)
-# https://github.com/gnh1201/caterpillar
-# Created at: 2024-07-02
-# Updated at: 2024-07-02
-#
-
-from socket import socket
-from Bio.Seq import Seq
-from Bio.SeqUtils import gc_fraction
-
-from base import Extension
-
-
-def _analyze_sequence(sequence: str) -> dict[str, str]:
-    """
-    Analyze a given DNA sequence to provide various nucleotide transformations and translations.
-
-    :param sequence: DNA sequence (string) to be analyzed.
-    :return: Dictionary containing the following analyses of the sequence:
-             - complement: DNA complement of the sequence.
-             - complement_rna: RNA complement of the sequence.
-             - reverse_complement: Reverse complement of the DNA sequence.
-             - reverse_complement_rna: Reverse complement of the RNA sequence.
-             - transcription: Transcription of the DNA sequence to RNA.
-             - translation: Translation of the RNA sequence to an amino acid sequence.
-             - back_transcribe: Back-transcription of the RNA sequence to DNA.
-    """
-    sequence_object = Seq(sequence)
-    return dict(
-        complement=str(sequence_object.complement()),
-        complement_rna=str(sequence_object.complement_rna()),
-        reverse_complement=str(sequence_object.reverse_complement()),
-        reverse_complement_rna=str(sequence_object.reverse_complement_rna()),
-        transcription=str(sequence_object.transcribe()),
-        translation=str(sequence_object.translate()),
-        back_transcribe=str(sequence_object.back_transcribe()),
-    )
-
-
-def _gc_content_calculation(sequence: str) -> dict[str, float]:
-    """
-    Calculate the GC content of a given DNA sequence and return it as a float in a dictionary.
-
-    :param sequence: DNA sequence (string) for which to calculate the GC content.
-    :return: Dictionary containing the GC content as a float.
-    """
-    gc_content = gc_fraction(sequence)
-    return dict(
-        gc_content=gc_content,
-    )
-
-
-class PyBio(Extension):
-    def __init__(self):
-        self.type = "rpcmethod"
-        self.method = "analyze_sequence_init"
-        self.exported_methods = ["analyze_sequence", "gc_content_calculation"]
-
-    def dispatch(self, type, id, params, conn):
-        conn.send(b"Greeting! dispatch")
-
-    def analyze_sequence(self, type, id, params, conn: socket):
-        """
-        Analyze a DNA sequence provided in the params dictionary.
-
-        :param type: Not used in this function.
-        :param id: Not used in this function.
-        :param params: Dictionary containing the DNA sequence with the key "sequence".
-                       Example: {"sequence": "ATGCGTACGTAGCTAGCTAGCGTAGCTAGCTGACT"}
-        :param conn: Not used in this function.
-        :return: Dictionary containing various analyses of the DNA sequence:
-                 - back_transcribe: Back-transcription of the RNA sequence to DNA.
-                 - complement: DNA complement of the sequence.
-                 - complement_rna: RNA complement of the sequence.
-                 - reverse_complement: Reverse complement of the DNA sequence.
-                 - reverse_complement_rna: Reverse complement of the RNA sequence.
-                 - transcription: Transcription of the DNA sequence to RNA.
-                 - translation: Translation of the RNA sequence to an amino acid sequence.
-                 Example: {"back_transcribe": "ATGCGTACGTAGCTAGCTAGCGTAGCTAGCTGACT",
-                           "complement": "TACGCATGCATCGATCGATCGCATCGATCGACTGA",
-                           "complement_rna": "UACGCAUGCAUCGAUCGAUCGCAUCGAUCGACUGA",
-                           "reverse_complement": "AGTCAGCTAGCTACGCTAGCTAGCTACGTACGCAT",
-                           "reverse_complement_rna": "AGUCAGCUAGCUACGCUAGCUAGCUACGUACGCAU",
-                           "transcription": "AUGCGUACGUAGCUAGCUAGCGUAGCUAGCUGACU",
-                           "translation": "MRT*LASVAS*"}
-        """
-        result = _analyze_sequence(params["sequence"])
-        return result
-
-    def gc_content_calculation(self, type, id, params, conn: socket):
-        """
-        Calculate the GC content for a given DNA sequence provided in the params dictionary.
-
-        :param type: Not used in this function.
-        :param id: Not used in this function.
-        :param params: Dictionary containing the DNA sequence with the key "sequence".
-                       Example: {"sequence": "ATGCGTACGTAGCTAGCTAGCGTAGCTAGCTGACT"}
-        :param conn: Not used in this function.
-        :return: Dictionary containing the GC content as a float.
-                 Example: {"gc_content": 0.5142857142857142}
-        """
-        result = _gc_content_calculation(params["sequence"])
-        return result
diff --git a/plugins/container.py b/plugins/container.py
deleted file mode 100644
index 3c27e97..0000000
--- a/plugins/container.py
+++ /dev/null
@@ -1,112 +0,0 @@
-#!/usr/bin/python3
-#
-# container.py
-# Linux Container (e.g. Docker) plugin for Caterpillar Proxy
-#
-# Caterpillar Proxy - The simple and parasitic web proxy with SPAM filter
-# Namyheon Go (Catswords Research)
-# https://github.com/gnh1201/caterpillar
-# Created at: 2024-03-04
-# Updated at: 2024-07-06
-#
-
-import docker
-from socket import socket
-from base import Extension, Logger
-
-logger = Logger("Container")
-
-
-class Container(Extension):
-    def __init__(self):
-        self.type = "rpcmethod"
-        self.method = "container_init"
-        self.exported_methods = [
-            "container_create",
-            "container_start",
-            "container_run",
-            "container_stop",
-            "container_pause",
-            "container_unpause",
-            "container_restart",
-            "container_kill",
-            "container_remove",
-        ]
-
-        # docker
-        self.client = docker.from_env()
-
-    def dispatch(self, type, id, params, conn: socket):
-        logger.info("[*] Greeting! dispatch")
-        conn.send(b"Greeting! dispatch")
-
-    def container_create(self, type, id, params, conn: socket):
-        # TODO:
-
-        return b"[*] Created"
-
-    def container_start(self, type, id, params, conn: socket):
-        name = params["name"]
-
-        container = self.client.containers.get(name)
-        container.start()
-
-    def container_run(self, type, id, params, conn: socket):
-        image = params["image"]
-        devices = params["devices"]
-        name = params["name"]
-        environment = params["environment"]
-        volumes = params["volumes"]
-
-        container = self.client.containers.run(
-            image,
-            devices=devices,
-            name=name,
-            volumes=volumes,
-            environment=environment,
-            detach=True,
-        )
-        container.logs()
-        logger.info("[*] Running...")
-        return b"[*] Running..."
-
-    def container_stop(self, type, id, params, conn: socket):
-        name = params["name"]
-
-        container = self.client.containers.get(name)
-        container.stop()
-
-        logger.info("[*] Stopped")
-        return b"[*] Stopped"
-
-    def container_pause(self, type, id, params, conn: socket):
-        name = params["name"]
-
-        container = self.client.containers.get(name)
-        container.pause()
-        return b"[*] Paused"
-
-    def container_unpause(self, type, id, params, conn: socket):
-        name = params["name"]
-
-        container = self.client.containers.get(name)
-        container.unpause()
-        return b"[*] Unpaused"
-
-    def container_restart(self, type, id, params, conn: socket):
-        name = params["name"]
-
-        container = self.client.containers.get(name)
-        container.restart()
-        return b"[*] Restarted"
-
-    def container_kill(self, type, id, params, conn: socket):
-        # TODO:
-
-        return b"[*] Killed"
-
-    def container_remove(self, type, id, params, conn: socket):
-        name = params["name"]
-
-        container = self.client.containers.get(name)
-        container.remove()
-        return b"[*] Removed"
diff --git a/plugins/fediverse.py b/plugins/fediverse.py
deleted file mode 100644
index 30e578f..0000000
--- a/plugins/fediverse.py
+++ /dev/null
@@ -1,317 +0,0 @@
-#!/usr/bin/python3
-#
-# fediverse.py
-# Fediverse (Mastodon, Misskey, Pleroma, ...) SPAM filter plugin for Caterpillar Proxy
-#
-# Caterpillar Proxy - The simple and parasitic web proxy with SPAM filter (formerly, php-httpproxy)
-# Namyheon Go (Catswords Research)
-# https://github.com/gnh1201/caterpillar
-# https://github.com/gnh1201/caterpillar/wiki/Fediverse
-#
-# Created at: 2022-10-06
-# Updated at: 2024-10-08
-#
-import base64
-import hashlib
-import io
-import re
-import requests
-import os.path
-import logging
-
-from decouple import config
-from PIL import Image
-
-from base import Extension, Logger
-
-logger = Logger(name="fediverse", level=logging.WARNING)
-
-try:
-    client_encoding = config("CLIENT_ENCODING", default="utf-8")
-    truecaptcha_userid = config("TRUECAPTCHA_USERID")  # truecaptcha.org
-    truecaptcha_apikey = config("TRUECAPTCHA_APIKEY")  # truecaptcha.org
-    dictionary_file = config(
-        "DICTIONARY_FILE", default="words_alpha.txt"
-    )  # https://github.com/dwyl/english-words
-    librey_apiurl = config(
-        "LIBREY_APIURL", default="https://serp.catswords.net"
-    )  # https://github.com/Ahwxorg/librey
-    bad_domain = config("BAD_DOMAIN", default="")
-except Exception as e:
-    logger.error("[*] Invalid configuration", exc_info=e)
-
-
-class Fediverse(Extension):
-    def __init__(self):
-        self.type = "filter"  # this is a filter
-
-        # Load data to use the KnownWords4 strategy
-        # Download data: https://github.com/dwyl/english-words
-        self.known_words = []
-        if dictionary_file != "" and os.path.isfile(dictionary_file):
-            with open(dictionary_file, "r") as file:
-                words = file.readlines()
-                self.known_words = [
-                    word.strip() for word in words if len(word.strip()) > 3
-                ]
-            logger.info("[*] Data loaded to use KnownWords4 strategy")
-
-    def test(self, filtered, data, webserver, port, scheme, method, url):
-        # prevent cache confusion
-        if data.find(b"Welcome to nginx!") > -1:
-            return True
-
-        # allowed conditions
-        if method == b"GET" or url.find(b"/api") > -1:
-            return False
-
-        # convert to text
-        data_length = len(data)
-        text = data.decode(client_encoding, errors="ignore")
-        error_rate = (data_length - len(text)) / data_length
-        if error_rate > 0.2:  # it is binary data
-            return False
-
-        # check if the text contains any of the bad domains
-        # (guard against an empty list: an empty alternation would match every URL)
-        bad_domains = list(filter(None, map(str.strip, bad_domain.split(","))))
-        if bad_domains and re.search(
-            r"https://(" + "|".join(re.escape(domain) for domain in bad_domains) + ")",
-            text,
-        ):
-            logger.warning("[*] Found a bad reputation domain.")
-            logger.warning("[*] BLOCKED MESSAGE: %s" % (text))
-            return True
-
-        # check IDs with the K-Anonymity strategy
-        pattern = r"\b(?:(?<=\/@)|(?<=acct:))([a-zA-Z0-9]{10})\b"
-        matches = list(set(re.findall(pattern, text)))
-        if len(matches) > 0:
-            try:
-                filtered = not all(map(self.pwnedpasswords_test, matches))
-                if filtered:
-                    logger.warning("[*] Found Suspicious ID: %s" % (", ".join(matches)))
-            except Exception as e:
-                logger.error("[*] K-Anonymity strategy not working!", exc_info=e)
-                filtered = True
-
-        # feedback
-        if filtered and len(matches) > 0:
-            score = 0
-            strategies = []
-
-            # check ID with VowelRatio10 strategy
-            def vowel_ratio_test(s):
-                ratio = self.calculate_vowel_ratio(s)
-                return ratio > 0.2 and ratio < 0.8
-
-            if all(map(vowel_ratio_test, matches)):
-                score += 1
-                strategies.append("VowelRatio10")
-
-            # check ID with Palindrome4 strategy
-            if all(map(self.has_palindrome, matches)):
-                score += 1
-                strategies.append("Palindrome4")
-
-            # check ID with KnownWords4 strategy
-            if all(map(self.has_known_word, matches)):
-                score += 2
-                strategies.append("KnownWords4")
-
-            # check ID with SearchEngine3 strategy
-            if librey_apiurl != "" and all(map(self.search_engine_test, matches)):
-                score += 1
-                strategies.append("SearchEngine3")
-
-            # check ID with RepeatedNumbers3 strategy
-            if all(map(self.repeated_numbers_test, matches)):
-                score += 1
-                strategies.append("RepeatedNumbers3")
-
-            # logging score
-            with open("score.log", "a") as file:
-                file.write(
-                    "%s\t%s\t%s\r\n"
-                    % ("+".join(matches), str(score), "+".join(strategies))
-                )
-
-            # make decision
-            if score > 1:
-                filtered = False
-
-        # check attached images (with the Not-CAPTCHA strategy)
-        if truecaptcha_userid != "" and not filtered and len(matches) > 0:
-
-            def webp_to_png_base64(url):
-                try:
-                    response = requests.get(url)
-                    img = Image.open(io.BytesIO(response.content))
-                    img_png = img.convert("RGBA")
-                    buffered = io.BytesIO()
-                    img_png.save(buffered, format="PNG")
-                    encoded_image = base64.b64encode(buffered.getvalue()).decode(
-                        "ascii"
-                    )
-                    return encoded_image
-                except Exception:
-                    return None
-
-            urls = re.findall(r'https://[^\s"]+\.webp', text)
-            if len(urls) > 0:
-                for url in urls:
-                    if filtered:
-                        break
-
-                    logger.info("[*] downloading... %s" % (url))
-                    encoded_image = webp_to_png_base64(url)
-                    logger.info("[*] downloaded.")
-                    if encoded_image:
-                        logger.info("[*] solving...")
-                        try:
-                            solved = self.truecaptcha_solve(encoded_image)
-                            if solved:
-                                logger.info("[*] solved: %s" % (solved))
-                                filtered = filtered or (
-                                    solved.lower() in ["ctkpaarr", "spam"]
-                                )
-                            else:
-                                logger.info("[*] not solved")
-                        except Exception as e:
-                            logger.error(
-                                "[*] Not-CAPTCHA strategy not working!", exc_info=e
-                            )
-
-        if filtered:
-            logger.warning("[*] BLOCKED MESSAGE: %s" % (text))
-
-        return filtered
-
-    # Strategy: K-Anonymity test - use api.pwnedpasswords.com
-    def pwnedpasswords_test(self, s):
-        # convert to lowercase
-        s = s.lower()
-
-        # SHA1 of the password
-        p_sha1 = hashlib.sha1(s.encode()).hexdigest()
-
-        # First 5 chars of the SHA1 for k-anonymity API use
-        f5_sha1 = p_sha1[:5]
-
-        # Last 5 chars of the SHA1 to match against the API output
-        l5_sha1 = p_sha1[-5:]
-
-        # Make a GET request using the Requests library
-        response = requests.get(f"https://api.pwnedpasswords.com/range/{f5_sha1}")
-
-        # Check if the request was successful
-        if response.status_code == 200:
-            # Parse the response text
-            hashes = response.text.split("\r\n")
-
-            # Use a list comprehension to find matching hashes
-            matching_hashes = [
-                line.split(":")[0] for line in hashes if line.endswith(l5_sha1)
-            ]
-
-            # If there are matching hashes, return True, else return False
-            return bool(matching_hashes)
-        else:
-            raise Exception(
-                "api.pwnedpasswords.com response status: %s"
-                % (str(response.status_code))
-            )
-
-    # Strategy: Not-CAPTCHA - use truecaptcha.org
-    def truecaptcha_solve(self, encoded_image):
-        url = "https://api.apitruecaptcha.org/one/gettext"
-        data = {
-            "userid": truecaptcha_userid,
-            "apikey": truecaptcha_apikey,
-            "data": encoded_image,
-            "mode": "human",
-            "case": "lower",
-        }
-        response = requests.post(url=url, json=data)
-
-        if response.status_code == 200:
-            data = response.json()
-
-            if "error_message" in data:
-                logger.error("[*] Error: %s" % (data["error_message"]))
-                return None
-            if "result" in data:
-                return data["result"]
-        else:
-            raise Exception(
-                "api.apitruecaptcha.org response status: %s"
-                % (str(response.status_code))
-            )
-
-        return None
-
-    # Strategy: VowelRatio10
-    def calculate_vowel_ratio(self, s):
-        # Calculate the length of the string.
-        length = len(s)
-        if length == 0:
-            return 0.0
-
-        # Count the number of vowels ('a', 'e', 'i', 'o', 'u', 'w', 'y') in the string.
-        vowel_count = sum(1 for char in s if char.lower() in "aeiouwy")
-
-        # Define vowel-ending patterns
-        vowel_ending_patterns = ["ang", "eng", "ing", "ong", "ung", "ank", "ink", "dge"]
-
-        # Count the occurrences of vowel-ending patterns in the string.
-        vowel_count += sum(s.count(pattern) for pattern in vowel_ending_patterns)
-
-        # Calculate the ratio of vowels to the total length of the string.
-        vowel_ratio = vowel_count / length
-
-        return vowel_ratio
-
-    # Strategy: Palindrome4
-    def has_palindrome(self, input_string):
-        def is_palindrome(s):
-            return s == s[::-1]
-
-        input_string = input_string.lower()
-        n = len(input_string)
-        for i in range(n):
-            for j in range(i + 4, n + 1):  # Find substrings of at least 4 characters
-                substring = input_string[i:j]
-                if is_palindrome(substring):
-                    return True
-        return False
-
-    # Strategy: KnownWords4
-    def has_known_word(self, input_string):
-        def is_known_word(s):
-            return s in self.known_words
-
-        input_string = input_string.lower()
-        n = len(input_string)
-        for i in range(n):
-            for j in range(i + 4, n + 1):  # Find substrings of at least 4 characters
-                substring = input_string[i:j]
-                if is_known_word(substring):
-                    return True
-        return False
-
-    # Strategy: SearchEngine3
-    def search_engine_test(self, s):
-        url = "%s/api.php?q=%s" % (librey_apiurl, s)
-        response = requests.get(url, verify=False)
-        if response.status_code != 200:
-            return False
-
-        data = response.json()
-
-        if "results_source" in data:
-            del data["results_source"]
-
-        num_results = len(data)
-
-        return num_results > 2
-
-    # Strategy: RepeatedNumbers3
-    def repeated_numbers_test(self, s):
-        return bool(re.search(r"\d{3,}", s))
diff --git a/plugins/portscanner.py b/plugins/portscanner.py
deleted file mode 100644
index a0421c4..0000000
--- a/plugins/portscanner.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/usr/bin/python3
-#
-# portscanner.py
-# NMAP port scanning wrapper for Caterpillar Proxy
-#
-# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
-# Namyheon Go (Catswords Research)
-# https://github.com/gnh1201/caterpillar
-# Created at: 2022-01-26 (from github.com/gnh1201/welsonjs)
-# Updated at: 2024-07-11
-#
-import nmap
-
-from base import Extension
-
-
-class PortScanner(Extension):
-    def __init__(self):
-        self.type = "rpcmethod"
-        self.method = "scan_ports_by_hosts"
-        self.exported_methods = []
-
-    def dispatch(self, type, id, params, conn):
-        hosts = params["hosts"]
-        binpath = params["binpath"]
-
-        nm = nmap.PortScanner(nmap_search_path=(binpath,))
-        result = nm.scan(hosts=hosts, arguments="-T5 -sV -p0-65535 --max-retries 0")
-
-        return result
diff --git a/plugins/serial.py b/plugins/serial.py
deleted file mode 100644
index e83bda3..0000000
--- a/plugins/serial.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/python3
-#
-# serial.py
-# Serial integration plugin for Caterpillar Proxy
-#
-# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
-# Teakwoo Kim
-# https://github.com/gnh1201/caterpillar
-# Created at: 2024-08-11
-# Updated at: 2024-08-11
-#
-
-import serial
-from decouple import config
-from base import Extension, Logger
-
-logger = Logger(name="serial")
-
-try:
-    client_encoding = config("CLIENT_ENCODING")
-except Exception as e:
-    logger.error("[*] Invalid configuration", exc_info=e)
-
-
-class Serial(Extension):
-    def __init__(self):
-        self.type = "connector"
-        self.connection_type = "serial"
-
-    def dispatch(self, type, id, params, conn):
-        logger.info("[*] Greeting! dispatch")
-        conn.send(b"Greeting! dispatch")
-
-    def connect(self, conn, data, webserver, port, scheme, method, url):
-        connected = False
-        ser = None
-        # resolve the device path before the try block so the except/finally
-        # handlers can reference it safely
-        port_path = url.decode(client_encoding).replace("/", "")
-        try:
-            if not ser:
-                ser = serial.Serial(port_path, baudrate=9600, timeout=2)
-                connected = True
-                logger.debug(f"Connected to {port_path} at 9600 baudrate")
-
-            ser.write(data)
-            logger.debug(f"Data sent to {port_path}: {data}")
-
-            ser_data = ser.read_all()
-            logger.debug(f"Data received: {ser_data}")
-
-            if ser_data:
-                conn.send(ser_data)
-        except serial.SerialException as e:
-            logger.error(f"Failed to connect to {port_path}", exc_info=e)
-        finally:
-            if ser and ser.is_open:
-                ser.close()
-                logger.debug(f"Serial port {port_path} closed")
-        return connected
diff --git a/plugins/wayback.py b/plugins/wayback.py
deleted file mode 100644
index 41982f9..0000000
--- a/plugins/wayback.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#!/usr/bin/python3
-#
-# wayback.py
-# Cached previous page (e.g. Wayback Machine) integration plugin for Caterpillar Proxy
-#
-# Caterpillar Proxy - The simple and parasitic web proxy with SPAM filter
-# Namyheon Go (Catswords Research)
-# https://github.com/gnh1201/caterpillar
-# Created at: 2024-03-13
-# Updated at: 2024-07-06
-#
-
-import requests
-from decouple import config
-
-from base import Extension, Logger
-
-logger = Logger(name="wayback")
-
-try:
-    client_encoding = config("CLIENT_ENCODING")
-except Exception as e:
-    logger.error("[*] Invalid configuration", exc_info=e)
-
-
-def get_cached_page_from_google(url):
-    status_code, text = (0, "")
-
-    # Google Cache URL
-    google_cache_url = "https://webcache.googleusercontent.com/search?q=cache:" + url
-
-    # Send a GET request to the Google Cache URL
-    response = requests.get(google_cache_url)
-
-    # Check if the request was successful (status code 200)
-    if response.status_code == 200:
-        text = response.text  # Extract content from the response
-    else:
-        status_code = response.status_code
-
-    return status_code, text
-
-
-# API documentation: https://archive.org/help/wayback_api.php
-def get_cached_page_from_wayback(url):
-    status_code, text = (0, "")
-
-    # Wayback Machine API URL
-    wayback_api_url = "http://archive.org/wayback/available?url=" + url
-
-    # Send a GET request to the Wayback Machine API
-    response = requests.get(wayback_api_url)
-
-    # Check if the request was successful (status code 200)
-    if response.status_code == 200:
-        try:
-            # Parse the JSON response
-            data = response.json()
-            archived_snapshots = data.get("archived_snapshots", {})
-            closest_snapshot = archived_snapshots.get("closest", {})
-
-            # Check if the URL is available in the archive
-            if closest_snapshot:
-                archived_url = closest_snapshot.get("url", "")
-
-                # If a URL is available, fetch the content of the archived page
-                if archived_url:
-                    archived_page_response = requests.get(archived_url)
-                    status_code = archived_page_response.status_code
-                    if status_code == 200:
-                        text = archived_page_response.text
-                else:
-                    status_code = 404
-            else:
-                status_code = 404
-        except Exception:
-            status_code = 502
-    else:
-        status_code = response.status_code
-
-    return status_code, text
-
-
-class Wayback(Extension):
-    def __init__(self):
-        self.type = "connector"  # this is a connector
-        self.connection_type = "wayback"
-
-    def connect(self, conn, data, webserver, port, scheme, method, url):
-        logger.info("[*] Connecting...")
-        connected = False
-
-        target_url = url.decode(client_encoding)
-
-        if not connected:
-            status_code, text = get_cached_page_from_google(target_url)
-            if status_code == 200:
-                conn.send(text.encode(client_encoding))
-                connected = True
-
-        if not connected:
-            status_code, text = get_cached_page_from_wayback(target_url)
-            if status_code == 200:
-                conn.send(text.encode(client_encoding))
-                connected = True
-
-        return connected