Update server.py

This commit is contained in:
Namhyeon Go 2024-02-20 11:03:25 +09:00 committed by GitHub
parent 3a0da47dc3
commit 6f9bf90359
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,7 +2,7 @@
# Namyheon Go (Catswords Research) <gnh1201@gmail.com>
# https://github.com/gnh1201/caterpillar
# Created at: 2022-10-06
# Updated at: 2024-12-19
# Updated at: 2024-12-20
import argparse
import socket
@ -177,7 +177,11 @@ def proxy_check_filtered(data, webserver, port, scheme, method, url):
matches = list(set(re.findall(pattern, text)))
if len(matches) > 0:
print ("[*] Found ID: %s" % (', '.join(matches)))
filtered = not all(map(pwnedpasswords_test, matches))
try:
filtered = not all(map(pwnedpasswords_test, matches))
except Exception as e:
print ("[*] K-Anonymity strategy not working! %s" % (str(e)))
filtered = True
# check an attached images
if not filtered and len(matches) > 0 and truecaptcha_userid != '':
@ -196,18 +200,23 @@ def proxy_check_filtered(data, webserver, port, scheme, method, url):
urls = re.findall(r'https://[^\s"]+\.webp', text)
if len(urls) > 0:
for url in urls:
if not filtered:
print ("[*] downloading... %s" % (url))
encoded_image = webp_to_png_base64(url)
print ("[*] downloaded.")
if encoded_image:
print ("[*] solving...")
if filtered:
break
print ("[*] downloading... %s" % (url))
encoded_image = webp_to_png_base64(url)
print ("[*] downloaded.")
if encoded_image:
print ("[*] solving...")
try:
solved = truecaptcha_solve(encoded_image)
if solved:
print ("[*] solved: %s" % (solved))
filtered = solved in ['ctkpaarr', 'SPAM']
filtered = solved.lower() in ['ctkpaarr', 'spam']
else:
print ("[*] not solved")
except Exception as e:
print ("[*] Not CAPTCHA strategy not working! %s" % (str(e)))
# take action
if filtered:
@ -266,7 +275,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
raise Exception("Filtered request")
sock.send(chunk)
if len(buffered) > buffer_size*2:
buffered = buffered[:-buffer_size*2] # reduce memory usage
buffered = buffered[-buffer_size*2:]
except:
break
@ -301,7 +310,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
raise Exception("Filtered response")
conn.send(chunk)
if len(buffered) > buffer_size*2:
buffered = buffered[:-buffer_size*2] # reduce memory usage
buffered = buffered[-buffer_size*2:]
i += 1
print("[*] Received %s chunks. (%s bytes per chunk)" % (str(i), str(buffer_size)))
@ -310,7 +319,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
proxy_data = {
'headers': {
"User-Agent": "php-httpproxy/0.1.4 (Client; Python " + python_version() + "; abuse@catswords.net)",
"User-Agent": "caterpillar-proxy/0.1.4 (Client; Python " + python_version() + "; abuse@catswords.net)",
},
'data': {
"data": base64.b64encode(data).decode(client_encoding),
@ -330,7 +339,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
i = 0
relay = requests.post(server_url, headers=proxy_data['headers'], data=raw_data, stream=True)
buffered = b''
buffered = b''
for chunk in relay.iter_content(chunk_size=buffer_size):
buffered += chunk
if proxy_check_filtered(buffered, webserver, port, scheme, method, url):
@ -338,7 +347,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
raise Exception("Filtered response")
conn.send(chunk)
if len(buffered) > buffer_size*2:
buffered = buffered[:-buffer_size*2] # reduce memory usage
buffered = buffered[-buffer_size*2:]
i += 1
print("[*] Received %s chunks. (%s bytes per chunk)" % (str(i), str(buffer_size)))
@ -410,25 +419,31 @@ def pwnedpasswords_test(s):
# If there are matching hashes, return True, else return False
return bool(matching_hashes)
else:
return False
raise Exception("api.pwnedpasswords.com response status: %s" % (str(response.status_code)))
return False
# TrueCaptcha - truecaptcha.org
def truecaptcha_solve(encoded_image):
url = 'https://api.apitruecaptcha.org/one/gettext'
data = {
'userid': truecaptcha_userid,
'apikey': truecaptcha_apikey,
data = {
'userid': truecaptcha_userid,
'apikey': truecaptcha_apikey,
'data': encoded_image,
'mode': 'human'
}
response = requests.post(url = url, json = data)
data = response.json()
if 'error_message' in data:
print ("[*] Error: %s" % (data['error_message']))
return None
if 'result' in data:
return data['result']
if response.status_code == 200:
data = response.json()
if 'error_message' in data:
print ("[*] Error: %s" % (data['error_message']))
return None
if 'result' in data:
return data['result']
else:
raise Exception("api.apitruecaptcha.org response status: %s" % (str(response.status_code)))
return None