#!/usr/bin/python3 # # fediverse.py # Fediverse (Mastodon, Misskey, Pleroma, ...) SPAM filter plugin for Caterpillar Proxy # # Caterpillar Proxy - The simple and parasitic web proxy with SPAM filter (formerly, php-httpproxy) # Namyheon Go (Catswords Research) # https://github.com/gnh1201/caterpillar # # Created in: 2022-10-06 # Updated in: 2024-06-05 # import io import re import requests import os.path from decouple import config from PIL import Image from server import Extension try: client_encoding = config('CLIENT_ENCODING', default='utf-8') truecaptcha_userid = config('TRUECAPTCHA_USERID') # truecaptcha.org truecaptcha_apikey = config('TRUECAPTCHA_APIKEY') # truecaptcha.org dictionary_file = config('DICTIONARY_FILE', default='words_alpha.txt') # https://github.com/dwyl/english-words librey_apiurl = config('LIBREY_APIURL', default='https://search.catswords.net') # https://github.com/Ahwxorg/librey except Exception as e: print ("[*] Invaild configration: %s" % (str(e))) class Fediverse(Extension): def __init__(self): self.type = "filter" # this is a filter # Load data to use KnownWords4 strategy # Download data: https://github.com/dwyl/english-words self.known_words = [] if dictionary_file != '' and os.path.isfile(dictionary_file): with open(dictionary_file, "r") as file: words = file.readlines() self.known_words = [word.strip() for word in words if len(word.strip()) > 3] print ("[*] Data loaded to use KnownWords4 strategy") def test(self, filtered, data, webserver, port, scheme, method, url): # prevent cache confusing if data.find(b'Welcome to nginx!') > -1: return True # allowed conditions if method == b'GET' or url.find(b'/api') > -1: return False # convert to text data_length = len(data) text = data.decode(client_encoding, errors='ignore') error_rate = (data_length - len(text)) / data_length if error_rate > 0.2: # it is a binary data return False # check ID with K-Anonymity strategy pattern = r'\b(?:(?<=\/@)|(?<=acct:))([a-zA-Z0-9]{10})\b' matches = list(set(re.findall(pattern, text))) if len(matches) > 0: print ("[*] Found ID: %s" % (', '.join(matches))) try: filtered = not all(map(self.pwnedpasswords_test, matches)) except Exception as e: print ("[*] K-Anonymity strategy not working! %s" % (str(e))) filtered = True # feedback if filtered and len(matches) > 0: score = 0 strategies = [] # check ID with VowelRatio10 strategy def vowel_ratio_test(s): ratio = self.calculate_vowel_ratio(s) return ratio > 0.2 and ratio < 0.8 if all(map(vowel_ratio_test, matches)): score += 1 strategies.append('VowelRatio10') # check ID with Palindrome4 strategy if all(map(self.has_palindrome, matches)): score += 1 strategies.append('Palindrome4') # check ID with KnownWords4 strategy if all(map(self.has_known_word, matches)): score += 2 strategies.append('KnownWords4') # check ID with SearchEngine3 strategy if librey_apiurl != '' and all(map(self.search_engine_test, matches)): score += 1 strategies.append('SearchEngine3') # check ID with RepeatedNumbers3 strategy if all(map(self.repeated_numbers_test, matches)): score += 1 strategies.append('RepeatedNumbers3') # logging score with open('score.log', 'a') as file: file.write("%s\t%s\t%s\r\n" % ('+'.join(matches), str(score), '+'.join(strategies))) # make decision if score > 1: filtered = False # check an attached images (check images with Not-CAPTCHA strategy) if truecaptcha_userid != '' and not filtered and len(matches) > 0: def webp_to_png_base64(url): try: response = requests.get(url) img = Image.open(io.BytesIO(response.content)) img_png = img.convert("RGBA") buffered = io.BytesIO() img_png.save(buffered, format="PNG") encoded_image = base64.b64encode(buffered.getvalue()).decode(client_encoding) return encoded_image except: return None urls = re.findall(r'https://[^\s"]+\.webp', text) if len(urls) > 0: for url in urls: if filtered: break print ("[*] downloading... %s" % (url)) encoded_image = webp_to_png_base64(url) print ("[*] downloaded.") if encoded_image: print ("[*] solving...") try: solved = truecaptcha_solve(encoded_image) if solved: print ("[*] solved: %s" % (solved)) filtered = filtered or (solved.lower() in ['ctkpaarr', 'spam']) else: print ("[*] not solved") except Exception as e: print ("[*] Not CAPTCHA strategy not working! %s" % (str(e))) return filtered # Strategy: K-Anonymity test - use api.pwnedpasswords.com def pwnedpasswords_test(self, s): # convert to lowercase s = s.lower() # SHA1 of the password p_sha1 = hashlib.sha1(s.encode()).hexdigest() # First 5 char of SHA1 for k-anonymity API use f5_sha1 = p_sha1[:5] # Last 5 char of SHA1 to match API output l5_sha1 = p_sha1[-5:] # Making GET request using Requests library response = requests.get(f'https://api.pwnedpasswords.com/range/{f5_sha1}') # Checking if request was successful if response.status_code == 200: # Parsing response text hashes = response.text.split('\r\n') # Using list comprehension to find matching hashes matching_hashes = [line.split(':')[0] for line in hashes if line.endswith(l5_sha1)] # If there are matching hashes, return True, else return False return bool(matching_hashes) else: raise Exception("api.pwnedpasswords.com response status: %s" % (str(response.status_code))) return False # Strategy: Not-CAPTCHA - use truecaptcha.org def truecaptcha_solve(self, encoded_image): url = 'https://api.apitruecaptcha.org/one/gettext' data = { 'userid': truecaptcha_userid, 'apikey': truecaptcha_apikey, 'data': encoded_image, 'mode': 'human' } response = requests.post(url = url, json = data) if response.status_code == 200: data = response.json() if 'error_message' in data: print ("[*] Error: %s" % (data['error_message'])) return None if 'result' in data: return data['result'] else: raise Exception("api.apitruecaptcha.org response status: %s" % (str(response.status_code))) return None # Strategy: VowelRatio10 def calculate_vowel_ratio(self, s): # Calculate the length of the string. length = len(s) if length == 0: return 0.0 # Count the number of vowels ('a', 'e', 'i', 'o', 'u', 'w', 'y') in the string. vowel_count = sum(1 for char in s if char.lower() in 'aeiouwy') # Define vowel-ending patterns vowel_ending_patterns = ['ang', 'eng', 'ing', 'ong', 'ung', 'ank', 'ink', 'dge'] # Count the occurrences of vowel-ending patterns in the string. vowel_count += sum(s.count(pattern) for pattern in vowel_ending_patterns) # Calculate the ratio of vowels to the total length of the string. vowel_ratio = vowel_count / length return vowel_ratio # Strategy: Palindrome4 def has_palindrome(self, input_string): def is_palindrome(s): return s == s[::-1] input_string = input_string.lower() n = len(input_string) for i in range(n): for j in range(i + 4, n + 1): # Find substrings of at least 5 characters substring = input_string[i:j] if is_palindrome(substring): return True return False # Strategy: KnownWords4 def has_known_word(self, input_string): def is_known_word(s): return s in self.known_words input_string = input_string.lower() n = len(input_string) for i in range(n): for j in range(i + 4, n + 1): # Find substrings of at least 5 characters substring = input_string[i:j] if is_known_word(substring): return True return False # Strategy: SearchEngine3 def search_engine_test(self, s): url = "%s/api.php?q=%s" % (librey_apiurl, s) response = requests.get(url, verify=False) if response.status_code != 200: return False data = response.json() if 'results_source' in data: del data['results_source'] num_results = len(data) return num_results > 2 # Strategy: RepeatedNumbers3 def repeated_numbers_test(self, s): return bool(re.search(r'\d{3,}', s))