Add AlwaysOnline feature with Elasticsearch

This commit is contained in:
Namhyeon Go 2024-07-31 15:36:16 +09:00
parent 6fa48ac64b
commit b845fe9356
3 changed files with 93 additions and 25 deletions

View File

@@ -9,10 +9,13 @@
# Created at: 2024-07-31 # Created at: 2024-07-31
# Updated at: 2024-07-31 # Updated at: 2024-07-31
# #
import socket
import ssl
import requests import requests
from decouple import config from decouple import config
from elasticsearch import Elasticsearch, NotFoundError from elasticsearch import Elasticsearch, NotFoundError
import hashlib import hashlib
from datetime import datetime
from base import Extension, Logger from base import Extension, Logger
logger = Logger(name="wayback") logger = Logger(name="wayback")
@@ -31,7 +34,7 @@ def generate_id(url):
return hashlib.sha256(url.encode('utf-8')).hexdigest() return hashlib.sha256(url.encode('utf-8')).hexdigest()
def get_cached_page_from_google(url): def get_cached_page_from_google(url):
status_code, text = (0, "") status_code, content = (0, b"")
# Google Cache URL # Google Cache URL
google_cache_url = "https://webcache.googleusercontent.com/search?q=cache:" + url google_cache_url = "https://webcache.googleusercontent.com/search?q=cache:" + url
@@ -41,15 +44,15 @@ def get_cached_page_from_google(url):
# Check if the request was successful (status code 200) # Check if the request was successful (status code 200)
if response.status_code == 200: if response.status_code == 200:
text = response.text # Extract content from response content = response.content # Extract content from response
else: else:
status_code = response.status_code status_code = response.status_code
return status_code, text return status_code, content
# API documentation: https://archive.org/help/wayback_api.php # API documentation: https://archive.org/help/wayback_api.php
def get_cached_page_from_wayback(url): def get_cached_page_from_wayback(url):
status_code, text = (0, "") status_code, content = (0, b"")
# Wayback Machine API URL # Wayback Machine API URL
wayback_api_url = "http://archive.org/wayback/available?url=" + url wayback_api_url = "http://archive.org/wayback/available?url=" + url
@@ -74,7 +77,7 @@ def get_cached_page_from_wayback(url):
archived_page_response = requests.get(archived_url) archived_page_response = requests.get(archived_url)
status_code = archived_page_response.status_code status_code = archived_page_response.status_code
if status_code == 200: if status_code == 200:
text = archived_page_response.text content = archived_page_response.content
else: else:
status_code = 404 status_code = 404
else: else:
@@ -84,63 +87,126 @@ def get_cached_page_from_wayback(url):
else: else:
status_code = response.status_code status_code = response.status_code
return status_code, text return status_code, content
def get_cached_page_from_elasticsearch(url): def get_cached_page_from_elasticsearch(url):
url_id = generate_id(url) url_id = generate_id(url)
try: try:
result = es.get(index=es_index, id=url_id) result = es.get(index=es_index, id=url_id)
return 200, result['_source']['content'] logger.info(result['_source'])
return 200, result['_source']['content'].encode(client_encoding)
except NotFoundError: except NotFoundError:
return 404, "" return 404, b""
except Exception as e: except Exception as e:
logger.error(f"Error fetching from Elasticsearch: {e}") logger.error(f"Error fetching from Elasticsearch: {e}")
return 500, "" return 502, b""
def cache_to_elasticsearch(url, data): def cache_to_elasticsearch(url, data):
url_id = generate_id(url) url_id = generate_id(url)
timestamp = datetime.utcnow().isoformat()
try: try:
es.index(index=es_index, id=url_id, body={"content": data.decode(client_encoding)}) es.index(index=es_index, id=url_id, body={
"url": url,
"content": data.decode(client_encoding),
"timestamp": timestamp
})
except Exception as e: except Exception as e:
logger.error(f"Error caching to Elasticsearch: {e}") logger.error(f"Error caching to Elasticsearch: {e}")
def get_page_from_origin_server(url):
try:
response = requests.get(url)
return response.status_code, response.content
except Exception as e:
return 502, str(e).encode(client_encoding)
class AlwaysOnline(Extension): class AlwaysOnline(Extension):
def __init__(self): def __init__(self):
self.type = "connector" # this is a connector self.type = "connector" # this is a connector
self.connection_type = "alwaysonline" self.connection_type = "alwaysonline"
self.buffer_size = 8192;
def connect(self, conn, data, webserver, port, scheme, method, url): def connect(self, conn, data, webserver, port, scheme, method, url):
logger.info("[*] Connecting... Connecting...")
connected = False connected = False
is_ssl = scheme in [b"https", b"tls", b"ssl"]
cache_hit = 0 cache_hit = 0
buffered = b"" buffered = b""
def sendall(sock, conn, data):
# send first chuck
sock.send(data)
if len(data) < self.buffer_size:
return
# send following chunks
conn.settimeout(1)
while True:
try:
chunk = conn.recv(self.buffer_size)
if not chunk:
break
sock.send(chunk)
except:
break
target_url = url.decode(client_encoding) target_url = url.decode(client_encoding)
target_scheme = scheme.decode(client_encoding)
target_webserver = webserver.decode(client_encoding)
if method == "GET": if "://" not in target_url:
target_url = f"{target_scheme}://{target_webserver}:{port}{target_url}"
if method == b"GET":
if not connected: if not connected:
status_code, text = get_cached_page_from_elasticsearch(target_url) logger.info("Trying get data from Elasticsearch...")
status_code, content = get_cached_page_from_elasticsearch(target_url)
if status_code == 200: if status_code == 200:
buffered += text.encode(client_encoding) buffered += content
cache_hit += 1 cache_hit += 1
connected = True connected = True
if not connected: if not connected:
status_code, text = get_cached_page_from_google(target_url) logger.info("Trying get data from Wayback Machine...")
status_code, content = get_cached_page_from_wayback(target_url)
if status_code == 200: if status_code == 200:
buffered += text.encode(client_encoding) buffered += content
cache_hit += 1 cache_hit += 1
connected = True connected = True
if not connected: if not connected:
status_code, text = get_cached_page_from_wayback(target_url) logger.info("Trying get data from Google Website Cache...")
status_code, content = get_cached_page_from_google(target_url)
if status_code == 200: if status_code == 200:
buffered += text.encode(client_encoding) buffered += content
cache_hit += 1 cache_hit += 1
connected = True connected = True
conn.send(buffered)
if cache_hit == 0 and buffered:
cache_to_elasticsearch(target_url, buffered)
return connected if cache_hit == 0:
status_code, content = get_page_from_origin_server(target_url)
buffered += content
cache_to_elasticsearch(target_url, buffered)
conn.send(buffered)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if is_ssl:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = context.wrap_socket(
sock, server_hostname=webserver.decode(client_encoding)
)
sock.connect((webserver, port))
# sock.sendall(data)
sendall(sock, conn, data)
else:
sock.connect((webserver, port))
# sock.sendall(data)
sendall(sock, conn, data)
return connected

View File

@@ -87,6 +87,7 @@ class Wayback(Extension):
self.connection_type = "wayback" self.connection_type = "wayback"
def connect(self, conn, data, webserver, port, scheme, method, url): def connect(self, conn, data, webserver, port, scheme, method, url):
logger.info("[*] Connecting... Connecting...")
connected = False connected = False
target_url = url.decode(client_encoding) target_url = url.decode(client_encoding)

View File

@@ -50,7 +50,7 @@ try:
_username, _password, server_url = extract_credentials( _username, _password, server_url = extract_credentials(
config("SERVER_URL", default="") config("SERVER_URL", default="")
) )
server_connection_type = config("SERVER_CONNECTION_TYPE", default="") server_connection_type = config("SERVER_CONNECTION_TYPE", default="proxy")
cakey = config("CA_KEY", default="ca.key") cakey = config("CA_KEY", default="ca.key")
cacert = config("CA_CERT", default="ca.crt") cacert = config("CA_CERT", default="ca.crt")
certkey = config("CERT_KEY", default="cert.key") certkey = config("CERT_KEY", default="cert.key")
@@ -337,7 +337,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
break break
# localhost mode # localhost mode
if server_url == "localhost": if server_url == "localhost" and server_connection_type == "proxy":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if is_ssl: if is_ssl:
@@ -593,6 +593,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
else: else:
connector = Extension.get_connector(server_connection_type) connector = Extension.get_connector(server_connection_type)
if connector: if connector:
logger.info("[*] Connecting...")
connector.connect(conn, data, webserver, port, scheme, method, url) connector.connect(conn, data, webserver, port, scheme, method, url)
else: else:
raise Exception("Unsupported connection type") raise Exception("Unsupported connection type")