diff --git a/server.py b/server.py index a484c96..b7aaa5d 100644 --- a/server.py +++ b/server.py @@ -2,7 +2,7 @@ # Namyheon Go (Catswords Research) # https://github.com/gnh1201/caterpillar # Created at: 2022-10-06 -# Updated at: 2024-12-19 +# Updated at: 2024-12-20 import argparse import socket @@ -177,7 +177,11 @@ def proxy_check_filtered(data, webserver, port, scheme, method, url): matches = list(set(re.findall(pattern, text))) if len(matches) > 0: print ("[*] Found ID: %s" % (', '.join(matches))) - filtered = not all(map(pwnedpasswords_test, matches)) + try: + filtered = not all(map(pwnedpasswords_test, matches)) + except Exception as e: + print ("[*] K-Anonymity strategy not working! %s" % (str(e))) + filtered = True # check an attached images if not filtered and len(matches) > 0 and truecaptcha_userid != '': @@ -196,18 +200,23 @@ def proxy_check_filtered(data, webserver, port, scheme, method, url): urls = re.findall(r'https://[^\s"]+\.webp', text) if len(urls) > 0: for url in urls: - if not filtered: - print ("[*] downloading... %s" % (url)) - encoded_image = webp_to_png_base64(url) - print ("[*] downloaded.") - if encoded_image: - print ("[*] solving...") + if filtered: + break + + print ("[*] downloading... %s" % (url)) + encoded_image = webp_to_png_base64(url) + print ("[*] downloaded.") + if encoded_image: + print ("[*] solving...") + try: solved = truecaptcha_solve(encoded_image) if solved: print ("[*] solved: %s" % (solved)) - filtered = solved in ['ctkpaarr', 'SPAM'] + filtered = solved.lower() in ['ctkpaarr', 'spam'] else: print ("[*] not solved") + except Exception as e: + print ("[*] Not CAPTCHA strategy not working! %s" % (str(e))) # take action if filtered: @@ -266,7 +275,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data): raise Exception("Filtered request") sock.send(chunk) if len(buffered) > buffer_size*2: - buffered = buffered[:-buffer_size*2] # reduce memory usage + buffered = buffered[-buffer_size*2:] except: break @@ -301,7 +310,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data): raise Exception("Filtered response") conn.send(chunk) if len(buffered) > buffer_size*2: - buffered = buffered[:-buffer_size*2] # reduce memory usage + buffered = buffered[-buffer_size*2:] i += 1 print("[*] Received %s chunks. (%s bytes per chunk)" % (str(i), str(buffer_size))) @@ -310,7 +319,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data): proxy_data = { 'headers': { - "User-Agent": "php-httpproxy/0.1.4 (Client; Python " + python_version() + "; abuse@catswords.net)", + "User-Agent": "caterpillar-proxy/0.1.4 (Client; Python " + python_version() + "; abuse@catswords.net)", }, 'data': { "data": base64.b64encode(data).decode(client_encoding), @@ -330,7 +339,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data): i = 0 relay = requests.post(server_url, headers=proxy_data['headers'], data=raw_data, stream=True) - buffered = b'' + buffered = b'' for chunk in relay.iter_content(chunk_size=buffer_size): buffered += chunk if proxy_check_filtered(buffered, webserver, port, scheme, method, url): @@ -338,7 +347,7 @@ def proxy_server(webserver, port, scheme, method, url, conn, addr, data): raise Exception("Filtered response") conn.send(chunk) if len(buffered) > buffer_size*2: - buffered = buffered[:-buffer_size*2] # reduce memory usage + buffered = buffered[-buffer_size*2:] i += 1 print("[*] Received %s chunks. (%s bytes per chunk)" % (str(i), str(buffer_size))) @@ -410,25 +419,31 @@ def pwnedpasswords_test(s): # If there are matching hashes, return True, else return False return bool(matching_hashes) else: - return False + raise Exception("api.pwnedpasswords.com response status: %s" % (str(response.status_code))) + + return False # TrueCaptcha - truecaptcha.org def truecaptcha_solve(encoded_image): url = 'https://api.apitruecaptcha.org/one/gettext' - data = { - 'userid': truecaptcha_userid, - 'apikey': truecaptcha_apikey, + data = { + 'userid': truecaptcha_userid, + 'apikey': truecaptcha_apikey, 'data': encoded_image, 'mode': 'human' } response = requests.post(url = url, json = data) - data = response.json() - if 'error_message' in data: - print ("[*] Error: %s" % (data['error_message'])) - return None - if 'result' in data: - return data['result'] + if response.status_code == 200: + data = response.json() + + if 'error_message' in data: + print ("[*] Error: %s" % (data['error_message'])) + return None + if 'result' in data: + return data['result'] + else: + raise Exception("api.apitruecaptcha.org response status: %s" % (str(response.status_code))) return None