Merge pull request #40 from gnh1201/elasticsearch

Implement the Always Online Cache with Elasticsearch
Namhyeon Go 2024-08-01 15:16:56 +09:00 committed by GitHub
commit 724f9f071e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 221 additions and 17 deletions
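The new plugin reads its settings with python-decouple at import time, so the proxy's .env (or environment) has to provide the Elasticsearch connection values next to the existing client encoding. A minimal sketch, assuming a local single-node Elasticsearch; the host URL and index name below are placeholders, not values fixed by this commit:

CLIENT_ENCODING=utf-8
ES_HOST=http://localhost:9200
ES_INDEX=alwaysonline_cache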

View File

@@ -115,7 +115,7 @@ def find_openssl_binpath():
             path = result.stdout.decode().strip()
             if path:
                 return path
-    except Exception as e:
+    except Exception:
         pass
     return "openssl"
@@ -143,7 +143,7 @@ class Extension:
                 module = importlib.import_module(module_path)
                 _class = getattr(module, class_name)
                 cls.extensions.append(_class())
-            except (ImportError, AttributeError) as e:
+            except (ImportError, AttributeError):
                 raise ImportError(class_name + " in the extension " + module_name)

     @classmethod
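The extension loader touched above resolves plugin classes dynamically with importlib. As a minimal standalone illustration of that mechanism, assuming the module path resolves to the new plugins/alwaysonline.py (the calls below are only a sketch of the lookup, not the repository's actual registration API):

import importlib

# resolve the module and class the same way base.py does
module = importlib.import_module("plugins.alwaysonline")
plugin_class = getattr(module, "AlwaysOnline")
plugin = plugin_class()
print(plugin.connection_type)  # "alwaysonline"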

plugins/alwaysonline.py (new file, 212 lines)
View File

@@ -0,0 +1,212 @@
#!/usr/bin/python3
#
# alwaysonline.py
# Always Online implementation for Caterpillar Proxy
#
# Caterpillar Proxy - The simple web debugging proxy (formerly, php-httpproxy)
# Namhyeon Go (Catswords Research) <gnh1201@gmail.com>
# https://github.com/gnh1201/caterpillar
# Created at: 2024-07-31
# Updated at: 2024-07-31
#
import socket
import ssl
import requests
from decouple import config
from elasticsearch import Elasticsearch, NotFoundError
import hashlib
from datetime import datetime

from base import Extension, Logger

logger = Logger(name="alwaysonline")

try:
    client_encoding = config("CLIENT_ENCODING")
    es_host = config("ES_HOST")
    es_index = config("ES_INDEX")
except Exception as e:
    logger.error("[*] Invalid configuration", exc_info=e)

es = Elasticsearch([es_host])


def generate_id(url):
    """Generate a unique ID for a URL by hashing it."""
    return hashlib.sha256(url.encode('utf-8')).hexdigest()
def get_cached_page_from_google(url):
    status_code, content = (0, b"")

    # Google Cache URL
    google_cache_url = "https://webcache.googleusercontent.com/search?q=cache:" + url

    # Send a GET request to the Google Cache URL
    response = requests.get(google_cache_url)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        content = response.content  # Extract content from response
    else:
        status_code = response.status_code

    return status_code, content


# API documentation: https://archive.org/help/wayback_api.php
def get_cached_page_from_wayback(url):
    status_code, content = (0, b"")

    # Wayback Machine API URL
    wayback_api_url = "http://archive.org/wayback/available?url=" + url

    # Send a GET request to the Wayback Machine API
    response = requests.get(wayback_api_url)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        try:
            # Parse JSON response
            data = response.json()
            archived_snapshots = data.get("archived_snapshots", {})
            closest_snapshot = archived_snapshots.get("closest", {})

            # Check if the URL is available in the archive
            if closest_snapshot:
                archived_url = closest_snapshot.get("url", "")

                # If a URL is available, fetch the content of the archived page
                if archived_url:
                    archived_page_response = requests.get(archived_url)
                    status_code = archived_page_response.status_code
                    if status_code == 200:
                        content = archived_page_response.content
                else:
                    status_code = 404
            else:
                status_code = 404
        except Exception:
            status_code = 502
    else:
        status_code = response.status_code

    return status_code, content
def get_cached_page_from_elasticsearch(url):
    url_id = generate_id(url)
    try:
        result = es.get(index=es_index, id=url_id)
        logger.info(result['_source'])
        return 200, result['_source']['content'].encode(client_encoding)
    except NotFoundError:
        return 404, b""
    except Exception as e:
        logger.error(f"Error fetching from Elasticsearch: {e}")
        return 502, b""


def cache_to_elasticsearch(url, data):
    url_id = generate_id(url)
    timestamp = datetime.utcnow().isoformat()
    try:
        es.index(index=es_index, id=url_id, body={
            "url": url,
            "content": data.decode(client_encoding),
            "timestamp": timestamp
        })
    except Exception as e:
        logger.error(f"Error caching to Elasticsearch: {e}")


def get_page_from_origin_server(url):
    try:
        response = requests.get(url)
        return response.status_code, response.content
    except Exception as e:
        return 502, str(e).encode(client_encoding)
class AlwaysOnline(Extension):
    def __init__(self):
        self.type = "connector"  # this is a connector
        self.connection_type = "alwaysonline"
        self.buffer_size = 8192

    def connect(self, conn, data, webserver, port, scheme, method, url):
        logger.info("[*] Connecting...")

        connected = False
        is_ssl = scheme in [b"https", b"tls", b"ssl"]
        cache_hit = 0
        buffered = b""

        def sendall(sock, conn, data):
            # send the first chunk
            sock.send(data)
            if len(data) < self.buffer_size:
                return

            # send the following chunks
            conn.settimeout(1)
            while True:
                try:
                    chunk = conn.recv(self.buffer_size)
                    if not chunk:
                        break
                    sock.send(chunk)
                except Exception:
                    break

        target_url = url.decode(client_encoding)
        target_scheme = scheme.decode(client_encoding)
        target_webserver = webserver.decode(client_encoding)

        if "://" not in target_url:
            target_url = f"{target_scheme}://{target_webserver}:{port}{target_url}"

        if method == b"GET":
            if not connected:
                logger.info("Trying to get data from Elasticsearch...")
                status_code, content = get_cached_page_from_elasticsearch(target_url)
                if status_code == 200:
                    buffered += content
                    cache_hit += 1
                    connected = True

            if not connected:
                logger.info("Trying to get data from the Wayback Machine...")
                status_code, content = get_cached_page_from_wayback(target_url)
                if status_code == 200:
                    buffered += content
                    cache_hit += 1
                    connected = True

            if not connected:
                logger.info("Trying to get data from the Google Web Cache...")
                status_code, content = get_cached_page_from_google(target_url)
                if status_code == 200:
                    buffered += content
                    cache_hit += 1
                    connected = True

            if cache_hit == 0:
                status_code, content = get_page_from_origin_server(target_url)
                buffered += content
                cache_to_elasticsearch(target_url, buffered)

            conn.send(buffered)
        else:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            if is_ssl:
                context = ssl.create_default_context()
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE

                sock = context.wrap_socket(
                    sock, server_hostname=webserver.decode(client_encoding)
                )
                sock.connect((webserver, port))
                # sock.sendall(data)
                sendall(sock, conn, data)
            else:
                sock.connect((webserver, port))
                # sock.sendall(data)
                sendall(sock, conn, data)

        return connected
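Each cached page is indexed under the SHA-256 hash of its URL with url, content, and timestamp fields, so a cached entry can be inspected with the same client the plugin uses. A quick check might look like the following sketch; the host, index name, and URL are examples only (es.get raises NotFoundError when the page has not been cached yet):

import hashlib
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # example ES_HOST value

url = "https://example.org/"  # example URL
url_id = hashlib.sha256(url.encode("utf-8")).hexdigest()

doc = es.get(index="alwaysonline_cache", id=url_id)  # example ES_INDEX value
print(doc["_source"]["timestamp"], len(doc["_source"]["content"]))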

View File

@@ -9,7 +9,6 @@
 # Updated at: 2024-07-02
 #
-import json
 from Bio.Seq import Seq
 from Bio.SeqUtils import gc_fraction

View File

@@ -11,7 +11,6 @@
 #
 import sys
 import nmap
-import json
 from base import Extension

View File

@@ -87,6 +87,7 @@ class Wayback(Extension):
         self.connection_type = "wayback"

     def connect(self, conn, data, webserver, port, scheme, method, url):
+        logger.info("[*] Connecting...")
         connected = False

         target_url = url.decode(client_encoding)

View File

@@ -1,2 +1,3 @@
 ruff==0.5.1
+elasticsearch==8.14.0
 yara-python==4.5.1
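The only new runtime dependency is the official Elasticsearch client pinned above; assuming the project is installed the usual way, it comes in with:

pip install -r requirements.txt    # or: pip install elasticsearch==8.14.0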

View File

@@ -20,13 +20,11 @@ import base64
 import json
 import ssl
 import time
-import hashlib
 import traceback
 import textwrap
 from datetime import datetime
 from platform import python_version
-import re
 import requests
 from requests.auth import HTTPBasicAuth
 from urllib.parse import urlparse
@@ -35,9 +33,7 @@ from decouple import config
 from base import (
     Extension,
     extract_credentials,
-    jsonrpc2_create_id,
     jsonrpc2_encode,
-    jsonrpc2_result_encode,
     find_openssl_binpath,
     Logger,
 )
@@ -50,7 +46,7 @@ try:
     _username, _password, server_url = extract_credentials(
         config("SERVER_URL", default="")
     )
-    server_connection_type = config("SERVER_CONNECTION_TYPE", default="")
+    server_connection_type = config("SERVER_CONNECTION_TYPE", default="proxy")
     cakey = config("CA_KEY", default="ca.key")
     cacert = config("CA_CERT", default="ca.crt")
     certkey = config("CERT_KEY", default="cert.key")
@@ -337,7 +333,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
             break

     # localhost mode
-    if server_url == "localhost":
+    if server_url == "localhost" and server_connection_type == "proxy":
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

         if is_ssl:
@@ -467,7 +463,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
                 result = query.json()["result"]
                 resolved_address_list.append(result["data"])
                 logger.info("[*] resolved IP: %s" % (result["data"]))
-            except requests.exceptions.ReadTimeout as e:
+            except requests.exceptions.ReadTimeout:
                 pass
         proxy_data["data"]["client_address"] = resolved_address_list[0]
@@ -503,7 +499,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
         logger.info("[*] waiting for the relay... %s" % id)
         max_reties = 30
         t = 0
-        while t < max_reties and not id in accepted_relay:
+        while t < max_reties and id not in accepted_relay:
             time.sleep(1)
             t += 1
         if t < max_reties:
@@ -593,6 +589,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
     else:
         connector = Extension.get_connector(server_connection_type)
         if connector:
+            logger.info("[*] Connecting...")
             connector.connect(conn, data, webserver, port, scheme, method, url)
         else:
             raise Exception("Unsupported connection type")
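With this dispatch, the cache becomes active when the proxy is told to use the new connector. Assuming get_connector matches on the connection_type each extension declares, and that the plugin is registered with the extension loader (not shown in this diff), the relevant .env entry would be:

SERVER_CONNECTION_TYPE=alwaysonline

GET requests are then answered from Elasticsearch, the Wayback Machine, or the Google web cache, in that order; a full miss falls back to the origin server and the response is written back to Elasticsearch, while all other methods are tunneled to the origin as before.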

View File

@@ -11,20 +11,15 @@
 #
 import asyncio
 from aiosmtpd.controller import Controller
-from aiosmtpd.handlers import Message
 from email.message import EmailMessage
-import re
 import sys
-import json
 import requests
 from platform import python_version
 from decouple import config
 from requests.auth import HTTPBasicAuth
 from base import (
     extract_credentials,
-    jsonrpc2_create_id,
     jsonrpc2_encode,
-    jsonrpc2_result_encode,
     Logger,
 )