Remove all plug-ins

Namhyeon Go 2024-10-18 22:06:47 +09:00
parent a7371b1fa2
commit acc6393658
7 changed files with 0 additions and 964 deletions

View File

@@ -1,223 +0,0 @@
#!/usr/bin/python3
#
# alwaysonline.py
# Always Online implementation for Caterpillar Proxy
#
# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
# Namhyeon Go (Catswords Research) <gnh1201@gmail.com>
# https://github.com/gnh1201/caterpillar
# Created at: 2024-07-31
# Updated at: 2024-07-31
#
import socket
import ssl
import requests
from decouple import config
from elasticsearch import Elasticsearch, NotFoundError
import hashlib
from datetime import datetime, UTC

from base import Extension, Logger

logger = Logger(name="alwaysonline")

try:
    client_encoding = config("CLIENT_ENCODING")
    es_host = config("ES_HOST")
    es_index = config("ES_INDEX")
except Exception as e:
    logger.error("[*] Invalid configuration", exc_info=e)

es = Elasticsearch([es_host])

def generate_id(url: str):
    """Generate a unique ID for a URL by hashing it."""
    return hashlib.sha256(url.encode("utf-8")).hexdigest()

def get_cached_page_from_google(url: str):
    status_code, content = (0, b"")

    # Google Cache URL
    google_cache_url = "https://webcache.googleusercontent.com/search?q=cache:" + url

    # Send a GET request to the Google Cache URL
    response = requests.get(google_cache_url)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        content = response.content  # Extract the content from the response
    else:
        status_code = response.status_code

    return status_code, content

# API documentation: https://archive.org/help/wayback_api.php
def get_cached_page_from_wayback(url: str):
    status_code, content = (0, b"")

    # Wayback Machine API URL
    wayback_api_url = "http://archive.org/wayback/available?url=" + url

    # Send a GET request to the Wayback Machine API
    response = requests.get(wayback_api_url)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        try:
            # Parse the JSON response
            data = response.json()
            archived_snapshots = data.get("archived_snapshots", {})
            closest_snapshot = archived_snapshots.get("closest", {})

            # Check if the URL is available in the archive
            if closest_snapshot:
                archived_url = closest_snapshot.get("url", "")

                # If the URL is available, fetch the content of the archived page
                if archived_url:
                    archived_page_response = requests.get(archived_url)
                    status_code = archived_page_response.status_code
                    if status_code == 200:
                        content = archived_page_response.content
                else:
                    status_code = 404
            else:
                status_code = 404
        except Exception:
            status_code = 502
    else:
        status_code = response.status_code

    return status_code, content

def get_cached_page_from_elasticsearch(url: str):
    url_id = generate_id(url)
    try:
        result = es.get(index=es_index, id=url_id)
        logger.info(result["_source"])
        return 200, result["_source"]["content"].encode(client_encoding)
    except NotFoundError:
        return 404, b""
    except Exception as e:
        logger.error(f"Error fetching from Elasticsearch: {e}")
        return 502, b""

def cache_to_elasticsearch(url: str, data: bytes):
    url_id = generate_id(url)
    timestamp = datetime.now(UTC).timestamp()
    try:
        es.index(
            index=es_index,
            id=url_id,
            body={
                "url": url,
                "content": data.decode(client_encoding),
                "timestamp": timestamp,
            },
        )
    except Exception as e:
        logger.error(f"Error caching to Elasticsearch: {e}")

def get_page_from_origin_server(url: str):
    try:
        response = requests.get(url)
        return response.status_code, response.content
    except Exception as e:
        return 502, str(e).encode(client_encoding)

class AlwaysOnline(Extension):
    def __init__(self):
        self.type = "connector"  # this is a connector
        self.connection_type = "alwaysonline"
        self.buffer_size = 8192

    def connect(self, conn: socket.socket, data: bytes, webserver: bytes, port: bytes, scheme: bytes, method: bytes, url: bytes):
        logger.info("[*] Connecting...")
        connected = False
        is_ssl = scheme in [b"https", b"tls", b"ssl"]
        cache_hit = 0
        buffered = b""

        def sendall(_sock: socket.socket, _conn: socket.socket, _data: bytes):
            # send the first chunk
            _sock.send(_data)
            if len(_data) < self.buffer_size:
                return

            # send the following chunks
            _conn.settimeout(1)
            while True:
                try:
                    chunk = _conn.recv(self.buffer_size)
                    if not chunk:
                        break
                    _sock.send(chunk)
                except Exception:
                    break

        target_url = url.decode(client_encoding)
        target_scheme = scheme.decode(client_encoding)
        target_webserver = webserver.decode(client_encoding)

        if "://" not in target_url:
            target_url = f"{target_scheme}://{target_webserver}:{port}{target_url}"

        if method == b"GET":
            if not connected:
                logger.info("Trying to get data from Elasticsearch...")
                status_code, content = get_cached_page_from_elasticsearch(target_url)
                if status_code == 200:
                    buffered += content
                    cache_hit += 1
                    connected = True

            if not connected:
                logger.info("Trying to get data from the Wayback Machine...")
                status_code, content = get_cached_page_from_wayback(target_url)
                if status_code == 200:
                    buffered += content
                    cache_hit += 1
                    connected = True

            if not connected:
                logger.info("Trying to get data from the Google Web Cache...")
                status_code, content = get_cached_page_from_google(target_url)
                if status_code == 200:
                    buffered += content
                    cache_hit += 1
                    connected = True

            if cache_hit == 0:
                status_code, content = get_page_from_origin_server(target_url)
                buffered += content
                cache_to_elasticsearch(target_url, buffered)

            conn.send(buffered)
        else:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            if is_ssl:
                context = ssl.create_default_context()
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                sock = context.wrap_socket(
                    sock, server_hostname=webserver.decode(client_encoding)
                )

            sock.connect((webserver, port))
            # sock.sendall(data)
            sendall(sock, conn, data)

        return connected
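
The distinctive piece of this plug-in is the hash-keyed Elasticsearch cache: every URL is reduced to a stable document ID with SHA-256, and the page body is written and read back under that ID. A minimal standalone sketch of that round trip, assuming a reachable Elasticsearch instance; the host URL and index name below are placeholders, not values from the plug-in's configuration:

import hashlib
from datetime import datetime, UTC
from elasticsearch import Elasticsearch, NotFoundError

es = Elasticsearch(["http://localhost:9200"])  # assumed local instance
index_name = "caterpillar-cache"  # hypothetical index name

def url_to_id(url: str) -> str:
    # same idea as generate_id(): a stable document ID derived from the URL
    return hashlib.sha256(url.encode("utf-8")).hexdigest()

def put_page(url: str, content: str):
    es.index(index=index_name, id=url_to_id(url), body={
        "url": url,
        "content": content,
        "timestamp": datetime.now(UTC).timestamp(),
    })

def get_page(url: str):
    try:
        return es.get(index=index_name, id=url_to_id(url))["_source"]["content"]
    except NotFoundError:
        return None  # cache miss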

View File

@@ -1,108 +0,0 @@
#!/usr/bin/python3
#
# bio.py
# Biopython plugin for Caterpillar Proxy
#
# Euiseo Cha (Wonkwang University) <zeroday0619_dev@outlook.com>
# https://github.com/gnh1201/caterpillar
# Created at: 2024-07-02
# Updated at: 2024-07-02
#
from socket import socket

from Bio.Seq import Seq
from Bio.SeqUtils import gc_fraction

from base import Extension

def _analyze_sequence(sequence: str) -> dict[str, str]:
    """
    Analyze a given DNA sequence to provide various nucleotide transformations and translations.

    :param sequence: DNA sequence (string) to be analyzed.
    :return: Dictionary containing the following analyses of the sequence:
             - complement: DNA complement of the sequence.
             - complement_rna: RNA complement of the sequence.
             - reverse_complement: Reverse complement of the DNA sequence.
             - reverse_complement_rna: Reverse complement of the RNA sequence.
             - transcription: Transcription of the DNA sequence to RNA.
             - translation: Translation of the RNA sequence to an amino acid sequence.
             - back_transcribe: Back-transcription of the RNA sequence to DNA.
    """
    sequence_object = Seq(sequence)
    return dict(
        complement=str(sequence_object.complement()),
        complement_rna=str(sequence_object.complement_rna()),
        reverse_complement=str(sequence_object.reverse_complement()),
        reverse_complement_rna=str(sequence_object.reverse_complement_rna()),
        transcription=str(sequence_object.transcribe()),
        translation=str(sequence_object.translate()),
        back_transcribe=str(sequence_object.back_transcribe()),
    )

def _gc_content_calculation(sequence: str) -> dict[str, float]:
    """
    Calculate the GC content of a given DNA sequence and return it as a float.

    :param sequence: DNA sequence (string) for which to calculate the GC content.
    :return: Dictionary containing the GC content as a float.
    """
    gc_content = gc_fraction(sequence)
    return dict(
        gc_content=gc_content,
    )

class PyBio(Extension):
    def __init__(self):
        self.type = "rpcmethod"
        self.method = "analyze_sequence_init"
        self.exported_methods = ["analyze_sequence", "gc_content_calculation"]

    def dispatch(self, type, id, params, conn):
        conn.send(b"Greeting! dispatch")

    def analyze_sequence(self, type, id, params, conn: socket):
        """
        Analyze a DNA sequence provided in the params dictionary.

        :param type: Not used in this function.
        :param id: Not used in this function.
        :param params: Dictionary containing the DNA sequence with the key "sequence".
                       Example: {"sequence": "ATGCGTACGTAGCTAGCTAGCGTAGCTAGCTGACT"}
        :param conn: Not used in this function.
        :return: Dictionary containing various analyses of the DNA sequence:
                 - back_transcribe: Back-transcription of the RNA sequence to DNA.
                 - complement: DNA complement of the sequence.
                 - complement_rna: RNA complement of the sequence.
                 - reverse_complement: Reverse complement of the DNA sequence.
                 - reverse_complement_rna: Reverse complement of the RNA sequence.
                 - transcription: Transcription of the DNA sequence to RNA.
                 - translation: Translation of the RNA sequence to an amino acid sequence.
                 Example: {"back_transcribe": "ATGCGTACGTAGCTAGCTAGCGTAGCTAGCTGACT",
                           "complement": "TACGCATGCATCGATCGATCGCATCGATCGACTGA",
                           "complement_rna": "UACGCAUGCAUCGAUCGAUCGCAUCGAUCGACUGA",
                           "reverse_complement": "AGTCAGCTAGCTACGCTAGCTAGCTACGTACGCAT",
                           "reverse_complement_rna": "AGUCAGCUAGCUACGCUAGCUAGCUACGUACGCAU",
                           "transcription": "AUGCGUACGUAGCUAGCUAGCGUAGCUAGCUGACU",
                           "translation": "MRT*LASVAS*"}
        """
        result = _analyze_sequence(params["sequence"])
        return result

    def gc_content_calculation(self, type, id, params, conn: socket):
        """
        Calculate the GC content for a given DNA sequence provided in the params dictionary.

        :param type: Not used in this function.
        :param id: Not used in this function.
        :param params: Dictionary containing the DNA sequence with the key "sequence".
                       Example: {"sequence": "ATGCGTACGTAGCTAGCTAGCGTAGCTAGCTGACT"}
        :param conn: Not used in this function.
        :return: Dictionary containing the GC content as a float.
                 Example: {"gc_content": 0.5142857142857142}
        """
        result = _gc_content_calculation(params["sequence"])
        return result
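
The plug-in is a thin RPC wrapper over Biopython's Seq API. A minimal sketch of the underlying calls, reusing the sample sequence from the docstring above (expected outputs taken from that docstring):

from Bio.Seq import Seq
from Bio.SeqUtils import gc_fraction

seq = Seq("ATGCGTACGTAGCTAGCTAGCGTAGCTAGCTGACT")
print(str(seq.complement()))  # TACGCATGCATCGATCGATCGCATCGATCGACTGA
print(str(seq.transcribe()))  # AUGCGUACGUAGCUAGCUAGCGUAGCUAGCUGACU
print(str(seq.translate()))   # MRT*LASVAS* (length is not a multiple of 3, so Biopython warns about a partial codon)
print(gc_fraction(seq))       # 0.5142857142857142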

View File

@@ -1,112 +0,0 @@
#!/usr/bin/python3
#
# container.py
# Linux Container (e.g. Docker) plugin for Caterpillar Proxy
#
# Caterpillar Proxy - The simple and parasitic web proxy with SPAM filter
# Namhyeon Go (Catswords Research) <gnh1201@gmail.com>
# https://github.com/gnh1201/caterpillar
# Created at: 2024-03-04
# Updated at: 2024-07-06
#
import docker

from socket import socket

from base import Extension, Logger

logger = Logger("Container")

class Container(Extension):
    def __init__(self):
        self.type = "rpcmethod"
        self.method = "container_init"
        self.exported_methods = [
            "container_create",
            "container_start",
            "container_run",
            "container_stop",
            "container_pause",
            "container_unpause",
            "container_restart",
            "container_kill",
            "container_remove",
        ]

        # docker
        self.client = docker.from_env()

    def dispatch(self, type, id, params, conn: socket):
        logger.info("[*] Greeting! dispatch")
        conn.send(b"Greeting! dispatch")

    def container_create(self, type, id, params, conn: socket):
        # TODO: implement container creation
        return b"[*] Created"

    def container_start(self, type, id, params, conn: socket):
        name = params["name"]
        container = self.client.containers.get(name)
        container.start()

    def container_run(self, type, id, params, conn: socket):
        image = params["image"]
        devices = params["devices"]
        name = params["name"]
        environment = params["environment"]
        volumes = params["volumes"]
        container = self.client.containers.run(
            image,
            devices=devices,
            name=name,
            volumes=volumes,
            environment=environment,
            detach=True,
        )
        container.logs()
        logger.info("[*] Running...")
        return b"[*] Running..."

    def container_stop(self, type, id, params, conn: socket):
        name = params["name"]
        container = self.client.containers.get(name)
        container.stop()
        logger.info("[*] Stopped")
        return b"[*] Stopped"

    def container_pause(self, type, id, params, conn: socket):
        name = params["name"]
        container = self.client.containers.get(name)
        container.pause()
        return b"[*] Paused"

    def container_unpause(self, type, id, params, conn: socket):
        name = params["name"]
        container = self.client.containers.get(name)
        container.unpause()
        return b"[*] Unpaused"

    def container_restart(self, type, id, params, conn: socket):
        name = params["name"]
        container = self.client.containers.get(name)
        container.restart()
        return b"[*] Restarted"

    def container_kill(self, type, id, params, conn: socket):
        # TODO: implement container kill
        return b"[*] Killed"

    def container_remove(self, type, id, params, conn: socket):
        name = params["name"]
        container = self.client.containers.get(name)
        container.remove()
        return b"[*] Removed"

View File

@@ -1,317 +0,0 @@
#!/usr/bin/python3
#
# fediverse.py
# Fediverse (Mastodon, Misskey, Pleroma, ...) SPAM filter plugin for Caterpillar Proxy
#
# Caterpillar Proxy - The simple and parasitic web proxy with SPAM filter (formerly, php-httpproxy)
# Namhyeon Go (Catswords Research) <abuse@catswords.net>
# https://github.com/gnh1201/caterpillar
# https://github.com/gnh1201/caterpillar/wiki/Fediverse
#
# Created at: 2022-10-06
# Updated at: 2024-10-08
#
import base64
import hashlib
import io
import re
import requests
import os.path
import logging

from decouple import config
from PIL import Image

from base import Extension, Logger

logger = Logger(name="fediverse", level=logging.WARNING)

try:
    client_encoding = config("CLIENT_ENCODING", default="utf-8")
    truecaptcha_userid = config("TRUECAPTCHA_USERID")  # truecaptcha.org
    truecaptcha_apikey = config("TRUECAPTCHA_APIKEY")  # truecaptcha.org
    dictionary_file = config(
        "DICTIONARY_FILE", default="words_alpha.txt"
    )  # https://github.com/dwyl/english-words
    librey_apiurl = config(
        "LIBREY_APIURL", default="https://serp.catswords.net"
    )  # https://github.com/Ahwxorg/librey
    bad_domain = config("BAD_DOMAIN", default="")
except Exception as e:
    logger.error("[*] Invalid configuration", exc_info=e)

class Fediverse(Extension):
    def __init__(self):
        self.type = "filter"  # this is a filter

        # Load the data used by the KnownWords4 strategy
        # Download: https://github.com/dwyl/english-words
        self.known_words = []
        if dictionary_file != "" and os.path.isfile(dictionary_file):
            with open(dictionary_file, "r") as file:
                words = file.readlines()
                self.known_words = [
                    word.strip() for word in words if len(word.strip()) > 3
                ]
            logger.info("[*] Data loaded for the KnownWords4 strategy")

    def test(self, filtered, data, webserver, port, scheme, method, url):
        # prevent cache confusion
        if data.find(b"<title>Welcome to nginx!</title>") > -1:
            return True

        # allowed conditions
        if method == b"GET" or url.find(b"/api") > -1:
            return False

        # convert to text
        data_length = len(data)
        text = data.decode(client_encoding, errors="ignore")
        error_rate = (data_length - len(text)) / data_length
        if error_rate > 0.2:  # it is binary data
            return False

        # check if the text contains any of the bad domains
        bad_domains = list(filter(None, map(str.strip, bad_domain.split(","))))
        if bad_domains and re.search(
            r"https://(" + "|".join(re.escape(domain) for domain in bad_domains) + ")",
            text,
        ):
            logger.warning("[*] Found a domain with a bad reputation.")
            logger.warning("[*] BLOCKED MESSAGE: %s" % (text))
            return True

        # check IDs with the K-Anonymity strategy
        pattern = r"\b(?:(?<=\/@)|(?<=acct:))([a-zA-Z0-9]{10})\b"
        matches = list(set(re.findall(pattern, text)))
        if len(matches) > 0:
            try:
                filtered = not all(map(self.pwnedpasswords_test, matches))
                if filtered:
                    logger.warning("[*] Found a suspicious ID: %s" % (", ".join(matches)))
            except Exception as e:
                logger.error("[*] K-Anonymity strategy not working!", exc_info=e)
                filtered = True

        # feedback
        if filtered and len(matches) > 0:
            score = 0
            strategies = []

            # check IDs with the VowelRatio10 strategy
            def vowel_ratio_test(s):
                ratio = self.calculate_vowel_ratio(s)
                return ratio > 0.2 and ratio < 0.8

            if all(map(vowel_ratio_test, matches)):
                score += 1
                strategies.append("VowelRatio10")

            # check IDs with the Palindrome4 strategy
            if all(map(self.has_palindrome, matches)):
                score += 1
                strategies.append("Palindrome4")

            # check IDs with the KnownWords4 strategy
            if all(map(self.has_known_word, matches)):
                score += 2
                strategies.append("KnownWords4")

            # check IDs with the SearchEngine3 strategy
            if librey_apiurl != "" and all(map(self.search_engine_test, matches)):
                score += 1
                strategies.append("SearchEngine3")

            # check IDs with the RepeatedNumbers3 strategy
            if all(map(self.repeated_numbers_test, matches)):
                score += 1
                strategies.append("RepeatedNumbers3")

            # log the score
            with open("score.log", "a") as file:
                file.write(
                    "%s\t%s\t%s\r\n"
                    % ("+".join(matches), str(score), "+".join(strategies))
                )

            # make a decision
            if score > 1:
                filtered = False

        # check attached images (check images with the Not-CAPTCHA strategy)
        if truecaptcha_userid != "" and not filtered and len(matches) > 0:

            def webp_to_png_base64(url):
                try:
                    response = requests.get(url)
                    img = Image.open(io.BytesIO(response.content))
                    img_png = img.convert("RGBA")
                    buffered = io.BytesIO()
                    img_png.save(buffered, format="PNG")
                    encoded_image = base64.b64encode(buffered.getvalue()).decode(
                        "ascii"
                    )
                    return encoded_image
                except Exception:
                    return None

            urls = re.findall(r'https://[^\s"]+\.webp', text)
            if len(urls) > 0:
                for url in urls:
                    if filtered:
                        break

                    logger.info("[*] downloading... %s" % (url))
                    encoded_image = webp_to_png_base64(url)
                    logger.info("[*] downloaded.")

                    if encoded_image:
                        logger.info("[*] solving...")
                        try:
                            solved = self.truecaptcha_solve(encoded_image)
                            if solved:
                                logger.info("[*] solved: %s" % (solved))
                                filtered = filtered or (
                                    solved.lower() in ["ctkpaarr", "spam"]
                                )
                            else:
                                logger.info("[*] not solved")
                        except Exception as e:
                            logger.error(
                                "[*] Not-CAPTCHA strategy not working!", exc_info=e
                            )

        if filtered:
            logger.warning("[*] BLOCKED MESSAGE: %s" % (text))

        return filtered

    # Strategy: K-Anonymity test - use api.pwnedpasswords.com
    def pwnedpasswords_test(self, s):
        # convert to lowercase
        s = s.lower()

        # SHA-1 of the string
        p_sha1 = hashlib.sha1(s.encode()).hexdigest()

        # first 5 chars of the SHA-1, for the k-anonymity API
        f5_sha1 = p_sha1[:5]

        # last 5 chars of the SHA-1, to match against the API output
        l5_sha1 = p_sha1[-5:]

        # make a GET request with the requests library
        response = requests.get(f"https://api.pwnedpasswords.com/range/{f5_sha1}")

        # check if the request was successful
        if response.status_code == 200:
            # parse the response text
            hashes = response.text.split("\r\n")

            # find matching hashes with a list comprehension
            matching_hashes = [
                line.split(":")[0] for line in hashes if line.endswith(l5_sha1)
            ]

            # if there are matching hashes, return True, else False
            return bool(matching_hashes)
        else:
            raise Exception(
                "api.pwnedpasswords.com response status: %s"
                % (str(response.status_code))
            )

    # Strategy: Not-CAPTCHA - use truecaptcha.org
    def truecaptcha_solve(self, encoded_image):
        url = "https://api.apitruecaptcha.org/one/gettext"
        data = {
            "userid": truecaptcha_userid,
            "apikey": truecaptcha_apikey,
            "data": encoded_image,
            "mode": "human",
            "case": "lower",
        }
        response = requests.post(url=url, json=data)

        if response.status_code == 200:
            data = response.json()

            if "error_message" in data:
                logger.error("[*] Error: %s" % (data["error_message"]))
                return None
            if "result" in data:
                return data["result"]
        else:
            raise Exception(
                "api.apitruecaptcha.org response status: %s"
                % (str(response.status_code))
            )

        return None

    # Strategy: VowelRatio10
    def calculate_vowel_ratio(self, s):
        # calculate the length of the string
        length = len(s)
        if length == 0:
            return 0.0

        # count the vowels ('a', 'e', 'i', 'o', 'u', 'w', 'y') in the string
        vowel_count = sum(1 for char in s if char.lower() in "aeiouwy")

        # count the occurrences of vowel-ending patterns in the string
        vowel_ending_patterns = ["ang", "eng", "ing", "ong", "ung", "ank", "ink", "dge"]
        vowel_count += sum(s.count(pattern) for pattern in vowel_ending_patterns)

        # calculate the ratio of vowels to the total length of the string
        vowel_ratio = vowel_count / length

        return vowel_ratio

    # Strategy: Palindrome4
    def has_palindrome(self, input_string):
        def is_palindrome(s):
            return s == s[::-1]

        input_string = input_string.lower()
        n = len(input_string)
        for i in range(n):
            for j in range(i + 4, n + 1):  # find substrings of at least 4 characters
                substring = input_string[i:j]
                if is_palindrome(substring):
                    return True
        return False

    # Strategy: KnownWords4
    def has_known_word(self, input_string):
        def is_known_word(s):
            return s in self.known_words

        input_string = input_string.lower()
        n = len(input_string)
        for i in range(n):
            for j in range(i + 4, n + 1):  # find substrings of at least 4 characters
                substring = input_string[i:j]
                if is_known_word(substring):
                    return True
        return False

    # Strategy: SearchEngine3
    def search_engine_test(self, s):
        url = "%s/api.php?q=%s" % (librey_apiurl, s)
        response = requests.get(url, verify=False)
        if response.status_code != 200:
            return False

        data = response.json()

        if "results_source" in data:
            del data["results_source"]

        num_results = len(data)

        return num_results > 2

    # Strategy: RepeatedNumbers3
    def repeated_numbers_test(self, s):
        return bool(re.search(r"\d{3,}", s))
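
The K-Anonymity strategy reuses the api.pwnedpasswords.com range API: only the first five characters of the SHA-1 leave the machine, and matching happens locally. Note that pwnedpasswords_test() above compares only the last five characters of the hash; the sketch below matches the full 35-character suffix, which is the standard form of the protocol:

import hashlib
import requests

def k_anonymity_hit(s: str) -> bool:
    sha1 = hashlib.sha1(s.lower().encode()).hexdigest().upper()
    prefix, suffix = sha1[:5], sha1[5:]
    response = requests.get(f"https://api.pwnedpasswords.com/range/{prefix}")
    response.raise_for_status()
    # the API returns one "SUFFIX:COUNT" pair per line for the given prefix
    return any(line.split(":")[0] == suffix for line in response.text.splitlines())

print(k_anonymity_hit("password"))  # a very common string, so this prints True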

View File

@@ -1,35 +0,0 @@
#!/usr/bin/python3
#
# portscanner.py
# NMAP port scanning wrapper for Caterpillar Proxy
#
# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
# Namhyeon Go (Catswords Research) <gnh1201@gmail.com>
# https://github.com/gnh1201/caterpillar
# Created at: 2022-01-26 (from github.com/gnh1201/welsonjs)
# Updated at: 2024-07-11
#
import nmap

from base import Extension

class PortScanner(Extension):
    def __init__(self):
        self.type = "rpcmethod"
        self.method = "scan_ports_by_hosts"
        self.exported_methods = []

    def dispatch(self, type, id, params, conn):
        hosts = params["hosts"]
        binpath = params["binpath"]
        nm = nmap.PortScanner(nmap_search_path=(binpath,))
        result = nm.scan(hosts=hosts, arguments="-T5 -sV -p0-65535 --max-retries 0")
        return result
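
The wrapper delegates entirely to python-nmap, which in turn shells out to the nmap binary. A minimal sketch, assuming nmap is installed and on the default search path; the host and port range are placeholders (and far narrower than the plug-in's 0-65535 sweep):

import nmap

nm = nmap.PortScanner()  # raises PortScannerError if the nmap binary is missing
result = nm.scan(hosts="127.0.0.1", arguments="-T4 -p 22-443")
print(result["nmap"]["scanstats"])
for host in nm.all_hosts():
    print(host, nm[host].state())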

View File

@@ -1,62 +0,0 @@
#!/usr/bin/python3
#
# serial.py
# Serial integration plugin for Caterpillar Proxy
#
# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
# Teakwoo Kim <catry.me@gmail.com>
# https://github.com/gnh1201/caterpillar
# Created at: 2024-08-11
# Updated at: 2024-08-11
#
import serial
from decouple import config

from base import Extension, Logger

logger = Logger(name="serial")

try:
    client_encoding = config("CLIENT_ENCODING")
except Exception as e:
    logger.error("[*] Invalid configuration", exc_info=e)

class Serial(Extension):
    def __init__(self):
        self.type = "connector"
        self.connection_type = "serial"

    def dispatch(self, type, id, params, conn):
        logger.info("[*] Greeting! dispatch")
        conn.send(b"Greeting! dispatch")

    def connect(self, conn, data, webserver, port, scheme, method, url):
        connected = False
        ser = None
        try:
            port_path = url.decode(client_encoding).replace("/", "")
            if not ser:
                ser = serial.Serial(port_path, baudrate=9600, timeout=2)
                connected = True
                logger.debug(f"Connected to {port_path} at 9600 baud")

            ser.write(data)
            logger.debug(f"Data sent to {port_path}: {data}")

            ser_data = ser.read_all()
            logger.debug(f"Data received: {ser_data}")
            if ser_data:
                conn.send(ser_data)  # ser_data is already bytes
        except serial.SerialException as e:
            logger.error(f"Failed to connect to {port}", exc_info=e)
        finally:
            if ser and ser.is_open:
                ser.close()
                logger.debug(f"Serial port {port_path} closed")

        return connected
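
The connector is a plain write-then-read over pyserial. A minimal standalone sketch, assuming a device is attached; the device path and command are placeholders:

import serial

with serial.Serial("/dev/ttyUSB0", baudrate=9600, timeout=2) as ser:
    ser.write(b"AT\r\n")    # example command for a modem-like device
    reply = ser.read_all()  # drain whatever has arrived in the buffer
    print(reply)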

View File

@@ -1,107 +0,0 @@
#!/usr/bin/python3
#
# wayback.py
# Cached previous page (e.g. Wayback Machine) integration plugin for Caterpillar Proxy
#
# Caterpillar Proxy - The simple and parasitic web proxy with SPAM filter
# Namhyeon Go (Catswords Research) <gnh1201@gmail.com>
# https://github.com/gnh1201/caterpillar
# Created at: 2024-03-13
# Updated at: 2024-07-06
#
import requests
from decouple import config

from base import Extension, Logger

logger = Logger(name="wayback")

try:
    client_encoding = config("CLIENT_ENCODING")
except Exception as e:
    logger.error("[*] Invalid configuration", exc_info=e)

def get_cached_page_from_google(url):
    status_code, text = (0, "")

    # Google Cache URL
    google_cache_url = "https://webcache.googleusercontent.com/search?q=cache:" + url

    # Send a GET request to the Google Cache URL
    response = requests.get(google_cache_url)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        text = response.text  # Extract the content from the response
    else:
        status_code = response.status_code

    return status_code, text

# API documentation: https://archive.org/help/wayback_api.php
def get_cached_page_from_wayback(url):
    status_code, text = (0, "")

    # Wayback Machine API URL
    wayback_api_url = "http://archive.org/wayback/available?url=" + url

    # Send a GET request to the Wayback Machine API
    response = requests.get(wayback_api_url)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        try:
            # Parse the JSON response
            data = response.json()
            archived_snapshots = data.get("archived_snapshots", {})
            closest_snapshot = archived_snapshots.get("closest", {})

            # Check if the URL is available in the archive
            if closest_snapshot:
                archived_url = closest_snapshot.get("url", "")

                # If the URL is available, fetch the content of the archived page
                if archived_url:
                    archived_page_response = requests.get(archived_url)
                    status_code = archived_page_response.status_code
                    if status_code == 200:
                        text = archived_page_response.text
                else:
                    status_code = 404
            else:
                status_code = 404
        except Exception:
            status_code = 502
    else:
        status_code = response.status_code

    return status_code, text

class Wayback(Extension):
    def __init__(self):
        self.type = "connector"  # this is a connector
        self.connection_type = "wayback"

    def connect(self, conn, data, webserver, port, scheme, method, url):
        logger.info("[*] Connecting...")
        connected = False

        target_url = url.decode(client_encoding)

        if not connected:
            status_code, text = get_cached_page_from_google(target_url)
            if status_code == 200:
                conn.send(text.encode(client_encoding))
                connected = True

        if not connected:
            status_code, text = get_cached_page_from_wayback(target_url)
            if status_code == 200:
                conn.send(text.encode(client_encoding))
                connected = True

        return connected
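
The Wayback Machine lookup is a two-step process: ask the availability API for the closest snapshot, then fetch the snapshot URL it returns. A minimal sketch of the first step, using the same endpoint as the plug-in:

import requests

def wayback_snapshot_url(url: str):
    response = requests.get("http://archive.org/wayback/available", params={"url": url})
    response.raise_for_status()
    closest = response.json().get("archived_snapshots", {}).get("closest", {})
    return closest.get("url") or None

print(wayback_snapshot_url("example.com"))  # an archive URL, or None if never crawled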