From a055976125c705aa4e5f627447bc52651ba7fedb Mon Sep 17 00:00:00 2001 From: "Namhyeon, Go" Date: Mon, 19 Feb 2024 07:58:44 +0900 Subject: [PATCH] Update server.py --- server.py | 50 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/server.py b/server.py index 73f4b7c..4bbf298 100644 --- a/server.py +++ b/server.py @@ -14,6 +14,7 @@ import json import ssl import time import re +import hashlib import resource import traceback from subprocess import Popen, PIPE @@ -23,6 +24,8 @@ from platform import python_version import requests from decouple import config +notify_server = 'catswords.social' + try: listening_port = config('PORT', cast=int) server_url = config('SERVER_URL') @@ -162,13 +165,13 @@ def proxy_check_filtered(data, webserver, port, scheme, method, url): # convert to text text = data.decode(client_encoding, errors='ignore') - #filtered = text.find('@misskey.io') > -1 - #filtered = filtered or text.find("https://misskey.io") > -1 - filtered = filtered or text.find('ctkpaarr') > -1 - filtered = filtered or bool(re.search(r'\b\w{10}@(?:\w+\.)+\w+\b', text)) - filtered = filtered or bool(re.search(r"https://[^\s/@]+@([a-zA-Z0-9]{10})", text)) - filtered = filtered or bool(re.search(r'https://[a-zA-Z0-9.-]+/users/[a-zA-Z0-9]{10}/statuses/[0-9]+', text)) + # ID filtering with K-Anonymity + pattern = r'[/@]([a-zA-Z0-9]{10})(?![a-zA-Z0-9])' + matches = re.findall(pattern, text) + if len(matches) > 0: + filtered = not all(map(validate_string_with_k_anonymity, matches)) + # take action if filtered: print ("[*] Filtered from %s:%s" % (webserver.decode(client_encoding), str(port))) @@ -184,10 +187,6 @@ def proxy_check_filtered(data, webserver, port, scheme, method, url): except Exception as e: print ("[*] Failed to save the file: %s" % (str(e))) - #print ("[*] ====== start preview data =====") - #print ("%s" % (text)) - #print ("[*] ====== end preview data =====") - return filtered def proxy_server(webserver, port, scheme, method, url, conn, addr, data): @@ -326,11 +325,11 @@ def add_filtered_host(domain, ip_address): with open(hosts_path, 'w') as file: file.writelines(lines) if user_token != '': # notify to catswords.social - post_status(f"[catswords.social]\r\n\r\n{domain} is a domain with suspicious spam activity.\r\n\r\n\#catswords") + post_status(f"[{notify_server} user]\r\n\r\n{domain} is a domain with suspicious spam activity.\r\n\r\n#catswords") # notify to catswords.social def post_status(text, media_ids=None, poll_options=None, poll_expires_in=None, scheduled_at=None, idempotency_key=None): - url = "https://catswords.social/api/v1/statuses" + url = f"https://{notify_server}/api/v1/statuses" headers = { "Authorization": f"Bearer {user_token}", "Content-Type": "application/x-www-form-urlencoded", @@ -348,5 +347,32 @@ def post_status(text, media_ids=None, poll_options=None, poll_expires_in=None, s response = requests.post(url, headers=headers, data=form_data) return response.json() +# k-anonymity validation +def validate_string_with_k_anonymity(s): + # SHA1 of the password + p_sha1 = hashlib.sha1(s.encode()).hexdigest() + + # First 5 char of SHA1 for k-anonymity API use + f5_sha1 = p_sha1[:5] + + # Last 5 char of SHA1 to match API output + l5_sha1 = p_sha1[-5:] + + # Making GET request using Requests library + response = requests.get(f'https://api.pwnedpasswords.com/range/{f5_sha1}') + + # Checking if request was successful + if response.status_code == 200: + # Parsing response text + hashes = response.text.split('\r\n') + + # Using list comprehension to find matching hashes + matching_hashes = [line for line in hashes if l5_sha1 in line.lower()] + + # If there are matching hashes, return True, else return False + return bool(matching_hashes) + else: + return False + if __name__== "__main__": start()