mirror of
https://github.com/gnh1201/caterpillar.git
synced 2025-03-11 16:35:14 +00:00
Update server.py, plugins/fediverse.py
This commit is contained in:
parent
18f928dbe8
commit
7156b934cb
269
plugins/fediverse.py
Normal file
269
plugins/fediverse.py
Normal file
|
@ -0,0 +1,269 @@
|
|||
#!/usr/bin/python3
|
||||
#
|
||||
# fediverse.py
|
||||
# Fediverse (Mastodon, Misskey, Pleroma, ...) SPAM filter plugin for Caterpillar
|
||||
#
|
||||
# Caterpillar - The simple and parasitic web proxy with spam filter
|
||||
# Namyheon Go (Catswords Research) <gnh1201@gmail.com>
|
||||
# https://github.com/gnh1201/caterpillar
|
||||
# Created at: 2022-10-06
|
||||
# Updated at: 2024-12-28
|
||||
#
|
||||
|
||||
import io
|
||||
import re
|
||||
import requests
|
||||
|
||||
from decouple import config
|
||||
from PIL import Image
|
||||
|
||||
from server import Filter
|
||||
|
||||
try:
|
||||
truecaptcha_userid = config('TRUECAPTCHA_USERID') # truecaptcha.org
|
||||
truecaptcha_apikey = config('TRUECAPTCHA_APIKEY') # truecaptcha.org
|
||||
librey_apiurl = config("LIBREY_APIURL") # https://github.com/Ahwxorg/librey
|
||||
except
|
||||
pass
|
||||
|
||||
class Fediverse(Filter):
|
||||
def __init__(self):
|
||||
# Load data to use KnownWords4 strategy
|
||||
# Download data: https://github.com/dwyl/english-words
|
||||
self.known_words = []
|
||||
with open("words_alpha.txt", "r") as file:
|
||||
words = file.readlines()
|
||||
self.known_words = [word.strip() for word in words if len(word.strip()) > 3]
|
||||
print ("[*] Data loaded to use KnownWords4 strategy")
|
||||
|
||||
def test(self, data):
|
||||
filtered = False
|
||||
|
||||
# prevent cache confusing
|
||||
if data.find(b'<title>Welcome to nginx!</title>') > -1:
|
||||
return True
|
||||
|
||||
# allowed conditions
|
||||
if method == b'GET' or url.find(b'/api') > -1:
|
||||
return False
|
||||
|
||||
# convert to text
|
||||
data_length = len(data)
|
||||
text = data.decode(client_encoding, errors='ignore')
|
||||
error_rate = (data_length - len(text)) / data_length
|
||||
if error_rate > 0.2: # it is a binary data
|
||||
return False
|
||||
|
||||
# check ID with K-Anonymity strategy
|
||||
pattern = r'\b(?:(?<=\/@)|(?<=acct:))([a-zA-Z0-9]{10})\b'
|
||||
matches = list(set(re.findall(pattern, text)))
|
||||
if len(matches) > 0:
|
||||
print ("[*] Found ID: %s" % (', '.join(matches)))
|
||||
try:
|
||||
filtered = not all(map(self.pwnedpasswords_test, matches))
|
||||
except Exception as e:
|
||||
print ("[*] K-Anonymity strategy not working! %s" % (str(e)))
|
||||
filtered = True
|
||||
|
||||
# feedback
|
||||
if filtered and len(matches) > 0:
|
||||
score = 0
|
||||
strategies = []
|
||||
|
||||
# check ID with VowelRatio10 strategy
|
||||
def vowel_ratio_test(s):
|
||||
ratio = self.calculate_vowel_ratio(s)
|
||||
return ratio > 0.2 and ratio < 0.8
|
||||
if all(map(vowel_ratio_test, matches)):
|
||||
score += 1
|
||||
strategies.append('VowelRatio10')
|
||||
|
||||
# check ID with Palindrome4 strategy
|
||||
if all(map(self.has_palindrome, matches)):
|
||||
score += 1
|
||||
strategies.append('Palindrome4')
|
||||
|
||||
# check ID with KnownWords4 strategy
|
||||
if all(map(self.has_known_word, matches)):
|
||||
score += 2
|
||||
strategies.append('KnownWords4')
|
||||
|
||||
# check ID with SearchEngine3 strategy
|
||||
if librey_apiurl != '' and all(map(self.search_engine_test, matches)):
|
||||
score += 1
|
||||
strategies.append('SearchEngine3')
|
||||
|
||||
# check ID with RepeatedNumbers3 strategy
|
||||
if all(map(self.repeated_numbers_test, matches)):
|
||||
score += 1
|
||||
strategies.append('RepeatedNumbers3')
|
||||
|
||||
# logging score
|
||||
with open('score.log', 'a') as file:
|
||||
file.write("%s\t%s\t%s\r\n" % ('+'.join(matches), str(score), '+'.join(strategies)))
|
||||
|
||||
# make decision
|
||||
if score > 1:
|
||||
filtered = False
|
||||
|
||||
# check an attached images (check images with Not-CAPTCHA strategy)
|
||||
if truecaptcha_userid != '' and not filtered and len(matches) > 0:
|
||||
def webp_to_png_base64(url):
|
||||
try:
|
||||
response = requests.get(url)
|
||||
img = Image.open(io.BytesIO(response.content))
|
||||
img_png = img.convert("RGBA")
|
||||
buffered = io.BytesIO()
|
||||
img_png.save(buffered, format="PNG")
|
||||
encoded_image = base64.b64encode(buffered.getvalue()).decode(client_encoding)
|
||||
return encoded_image
|
||||
except:
|
||||
return None
|
||||
|
||||
urls = re.findall(r'https://[^\s"]+\.webp', text)
|
||||
if len(urls) > 0:
|
||||
for url in urls:
|
||||
if filtered:
|
||||
break
|
||||
|
||||
print ("[*] downloading... %s" % (url))
|
||||
encoded_image = webp_to_png_base64(url)
|
||||
print ("[*] downloaded.")
|
||||
if encoded_image:
|
||||
print ("[*] solving...")
|
||||
try:
|
||||
solved = truecaptcha_solve(encoded_image)
|
||||
if solved:
|
||||
print ("[*] solved: %s" % (solved))
|
||||
filtered = filtered or (solved.lower() in ['ctkpaarr', 'spam'])
|
||||
else:
|
||||
print ("[*] not solved")
|
||||
except Exception as e:
|
||||
print ("[*] Not CAPTCHA strategy not working! %s" % (str(e)))
|
||||
|
||||
return filtered
|
||||
|
||||
# Strategy: K-Anonymity test - use api.pwnedpasswords.com
|
||||
def pwnedpasswords_test(self, s):
|
||||
# convert to lowercase
|
||||
s = s.lower()
|
||||
|
||||
# SHA1 of the password
|
||||
p_sha1 = hashlib.sha1(s.encode()).hexdigest()
|
||||
|
||||
# First 5 char of SHA1 for k-anonymity API use
|
||||
f5_sha1 = p_sha1[:5]
|
||||
|
||||
# Last 5 char of SHA1 to match API output
|
||||
l5_sha1 = p_sha1[-5:]
|
||||
|
||||
# Making GET request using Requests library
|
||||
response = requests.get(f'https://api.pwnedpasswords.com/range/{f5_sha1}')
|
||||
|
||||
# Checking if request was successful
|
||||
if response.status_code == 200:
|
||||
# Parsing response text
|
||||
hashes = response.text.split('\r\n')
|
||||
|
||||
# Using list comprehension to find matching hashes
|
||||
matching_hashes = [line.split(':')[0] for line in hashes if line.endswith(l5_sha1)]
|
||||
|
||||
# If there are matching hashes, return True, else return False
|
||||
return bool(matching_hashes)
|
||||
else:
|
||||
raise Exception("api.pwnedpasswords.com response status: %s" % (str(response.status_code)))
|
||||
|
||||
return False
|
||||
|
||||
# Strategy: Not-CAPTCHA - use truecaptcha.org
|
||||
def truecaptcha_solve(self, encoded_image):
|
||||
url = 'https://api.apitruecaptcha.org/one/gettext'
|
||||
data = {
|
||||
'userid': truecaptcha_userid,
|
||||
'apikey': truecaptcha_apikey,
|
||||
'data': encoded_image,
|
||||
'mode': 'human'
|
||||
}
|
||||
response = requests.post(url = url, json = data)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
|
||||
if 'error_message' in data:
|
||||
print ("[*] Error: %s" % (data['error_message']))
|
||||
return None
|
||||
if 'result' in data:
|
||||
return data['result']
|
||||
else:
|
||||
raise Exception("api.apitruecaptcha.org response status: %s" % (str(response.status_code)))
|
||||
|
||||
return None
|
||||
|
||||
# Strategy: VowelRatio10
|
||||
def calculate_vowel_ratio(self, s):
|
||||
# Calculate the length of the string.
|
||||
length = len(s)
|
||||
if length == 0:
|
||||
return 0.0
|
||||
|
||||
# Count the number of vowels ('a', 'e', 'i', 'o', 'u', 'w', 'y') in the string.
|
||||
vowel_count = sum(1 for char in s if char.lower() in 'aeiouwy')
|
||||
|
||||
# Define vowel-ending patterns
|
||||
vowel_ending_patterns = ['ang', 'eng', 'ing', 'ong', 'ung', 'ank', 'ink', 'dge']
|
||||
|
||||
# Count the occurrences of vowel-ending patterns in the string.
|
||||
vowel_count += sum(s.count(pattern) for pattern in vowel_ending_patterns)
|
||||
|
||||
# Calculate the ratio of vowels to the total length of the string.
|
||||
vowel_ratio = vowel_count / length
|
||||
|
||||
return vowel_ratio
|
||||
|
||||
# Strategy: Palindrome4
|
||||
def has_palindrome(self, input_string):
|
||||
def is_palindrome(s):
|
||||
return s == s[::-1]
|
||||
|
||||
input_string = input_string.lower()
|
||||
n = len(input_string)
|
||||
for i in range(n):
|
||||
for j in range(i + 4, n + 1): # Find substrings of at least 5 characters
|
||||
substring = input_string[i:j]
|
||||
if is_palindrome(substring):
|
||||
return True
|
||||
return False
|
||||
|
||||
# Strategy: KnownWords4
|
||||
def has_known_word(self, input_string):
|
||||
def is_known_word(s):
|
||||
return s in self.known_words
|
||||
|
||||
input_string = input_string.lower()
|
||||
n = len(input_string)
|
||||
for i in range(n):
|
||||
for j in range(i + 4, n + 1): # Find substrings of at least 5 characters
|
||||
substring = input_string[i:j]
|
||||
if is_known_word(substring):
|
||||
return True
|
||||
return False
|
||||
|
||||
# Strategy: SearchEngine3
|
||||
def search_engine_test(self, s):
|
||||
url = "%s/api.php?q=%s" % (librey_apiurl, s)
|
||||
response = requests.get(url, verify=False)
|
||||
if response.status_code != 200:
|
||||
return False
|
||||
|
||||
data = response.json()
|
||||
|
||||
if 'results_source' in data:
|
||||
del data['results_source']
|
||||
|
||||
num_results = len(data)
|
||||
|
||||
return num_results > 2
|
||||
|
||||
# Strategy: RepeatedNumbers3
|
||||
def repeated_numbers_test(self, s):
|
||||
return bool(re.search(r'\d{3,}', s))
|
344
server.py
344
server.py
|
@ -1,8 +1,13 @@
|
|||
#!/usr/bin/python3
|
||||
#
|
||||
# server.py
|
||||
#
|
||||
# Caterpillar - The simple and parasitic web proxy with spam filter
|
||||
# Namyheon Go (Catswords Research) <gnh1201@gmail.com>
|
||||
# https://github.com/gnh1201/caterpillar
|
||||
# Created at: 2022-10-06
|
||||
# Updated at: 2024-12-28
|
||||
#
|
||||
|
||||
import argparse
|
||||
import socket
|
||||
|
@ -13,16 +18,11 @@ import base64
|
|||
import json
|
||||
import ssl
|
||||
import time
|
||||
import re
|
||||
import hashlib
|
||||
import resource
|
||||
import traceback
|
||||
import io
|
||||
import textwrap
|
||||
from subprocess import Popen, PIPE
|
||||
from datetime import datetime
|
||||
from platform import python_version
|
||||
from PIL import Image
|
||||
|
||||
import requests
|
||||
from decouple import config
|
||||
|
@ -39,11 +39,6 @@ try:
|
|||
client_encoding = config('CLIENT_ENCODING')
|
||||
local_domain = config('LOCAL_DOMAIN')
|
||||
proxy_pass = config('PROXY_PASS')
|
||||
mastodon_server = config('MASTODON_SERVER') # catswords.social
|
||||
mastodon_user_token = config('MASTODON_USER_TOKEN') # catswords.social
|
||||
truecaptcha_userid = config('TRUECAPTCHA_USERID') # truecaptcha.org
|
||||
truecaptcha_apikey = config('TRUECAPTCHA_APIKEY') # truecaptcha.org
|
||||
librey_apiurl = config("LIBREY_APIURL") # https://github.com/Ahwxorg/librey
|
||||
except KeyboardInterrupt:
|
||||
print("\n[*] User has requested an interrupt")
|
||||
print("[*] Application Exiting.....")
|
||||
|
@ -60,41 +55,6 @@ buffer_size = args.buffer_size
|
|||
accepted_relay = {}
|
||||
resolved_address_list = []
|
||||
|
||||
# https://stackoverflow.com/questions/25475906/set-ulimit-c-from-outside-shell
|
||||
resource.setrlimit(
|
||||
resource.RLIMIT_CORE,
|
||||
(resource.RLIM_INFINITY, resource.RLIM_INFINITY))
|
||||
|
||||
# load data to use KnownWords4 strategy
|
||||
# Download data: https://github.com/dwyl/english-words
|
||||
known_words = []
|
||||
if os.path.exists("words_alpha.txt"):
|
||||
with open("words_alpha.txt", "r") as file:
|
||||
words = file.readlines()
|
||||
known_words = [word.strip() for word in words if len(word.strip()) > 3]
|
||||
print ("[*] data loaded to use KnownWords4 strategy")
|
||||
|
||||
def start(): #Main Program
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.bind(('', listening_port))
|
||||
sock.listen(max_connection)
|
||||
print("[*] Server started successfully [ %d ]" %(listening_port))
|
||||
except Exception:
|
||||
print("[*] Unable to Initialize Socket")
|
||||
print(Exception)
|
||||
sys.exit(2)
|
||||
|
||||
while True:
|
||||
try:
|
||||
conn, addr = sock.accept() #Accept connection from client browser
|
||||
data = conn.recv(buffer_size) #Recieve client data
|
||||
start_new_thread(conn_string, (conn, data, addr)) #Starting a thread
|
||||
except KeyboardInterrupt:
|
||||
sock.close()
|
||||
print("\n[*] Graceful Shutdown")
|
||||
sys.exit(1)
|
||||
|
||||
def jsonrpc2_create_id(data):
|
||||
return hashlib.sha1(json.dumps(data).encode(client_encoding)).hexdigest()
|
||||
|
||||
|
@ -214,123 +174,9 @@ def proxy_connect(webserver, conn):
|
|||
def proxy_check_filtered(data, webserver, port, scheme, method, url):
|
||||
filtered = False
|
||||
|
||||
# prevent cache confusing
|
||||
if data.find(b'<title>Welcome to nginx!</title>') > -1:
|
||||
return True
|
||||
|
||||
# allowed conditions
|
||||
if method == b'GET' or url.find(b'/api') > -1:
|
||||
return False
|
||||
|
||||
# convert to text
|
||||
data_length = len(data)
|
||||
text = data.decode(client_encoding, errors='ignore')
|
||||
error_rate = (data_length - len(text)) / data_length
|
||||
if error_rate > 0.2: # it is a binary data
|
||||
return False
|
||||
|
||||
# check ID with K-Anonymity strategy
|
||||
pattern = r'\b(?:(?<=\/@)|(?<=acct:))([a-zA-Z0-9]{10})\b'
|
||||
matches = list(set(re.findall(pattern, text)))
|
||||
if len(matches) > 0:
|
||||
print ("[*] Found ID: %s" % (', '.join(matches)))
|
||||
try:
|
||||
filtered = not all(map(pwnedpasswords_test, matches))
|
||||
except Exception as e:
|
||||
print ("[*] K-Anonymity strategy not working! %s" % (str(e)))
|
||||
filtered = True
|
||||
|
||||
# feedback
|
||||
if filtered and len(matches) > 0:
|
||||
score = 0
|
||||
strategies = []
|
||||
|
||||
# check ID with VowelRatio10 strategy
|
||||
def vowel_ratio_test(s):
|
||||
ratio = calculate_vowel_ratio(s)
|
||||
return ratio > 0.2 and ratio < 0.8
|
||||
if all(map(vowel_ratio_test, matches)):
|
||||
score += 1
|
||||
strategies.append('VowelRatio10')
|
||||
|
||||
# check ID with Palindrome4 strategy
|
||||
if all(map(has_palindrome, matches)):
|
||||
score += 1
|
||||
strategies.append('Palindrome4')
|
||||
|
||||
# check ID with KnownWords4 strategy
|
||||
if all(map(has_known_word, matches)):
|
||||
score += 2
|
||||
strategies.append('KnownWords4')
|
||||
|
||||
# check ID with SearchEngine3 strategy
|
||||
if librey_apiurl != '' and all(map(search_engine_test, matches)):
|
||||
score += 1
|
||||
strategies.append('SearchEngine3')
|
||||
|
||||
# check ID with RepeatedNumbers3 strategy
|
||||
if all(map(repeated_numbers_test, matches)):
|
||||
score += 1
|
||||
strategies.append('RepeatedNumbers3')
|
||||
|
||||
# logging score
|
||||
with open('score.log', 'a') as file:
|
||||
file.write("%s\t%s\t%s\r\n" % ('+'.join(matches), str(score), '+'.join(strategies)))
|
||||
|
||||
# make decision
|
||||
if score > 1:
|
||||
filtered = False
|
||||
|
||||
# check an attached images (check images with Not-CAPTCHA strategy)
|
||||
if truecaptcha_userid != '' and not filtered and len(matches) > 0:
|
||||
def webp_to_png_base64(url):
|
||||
try:
|
||||
response = requests.get(url)
|
||||
img = Image.open(io.BytesIO(response.content))
|
||||
img_png = img.convert("RGBA")
|
||||
buffered = io.BytesIO()
|
||||
img_png.save(buffered, format="PNG")
|
||||
encoded_image = base64.b64encode(buffered.getvalue()).decode(client_encoding)
|
||||
return encoded_image
|
||||
except:
|
||||
return None
|
||||
|
||||
urls = re.findall(r'https://[^\s"]+\.webp', text)
|
||||
if len(urls) > 0:
|
||||
for url in urls:
|
||||
if filtered:
|
||||
break
|
||||
|
||||
print ("[*] downloading... %s" % (url))
|
||||
encoded_image = webp_to_png_base64(url)
|
||||
print ("[*] downloaded.")
|
||||
if encoded_image:
|
||||
print ("[*] solving...")
|
||||
try:
|
||||
solved = truecaptcha_solve(encoded_image)
|
||||
if solved:
|
||||
print ("[*] solved: %s" % (solved))
|
||||
filtered = filtered or (solved.lower() in ['ctkpaarr', 'spam'])
|
||||
else:
|
||||
print ("[*] not solved")
|
||||
except Exception as e:
|
||||
print ("[*] Not CAPTCHA strategy not working! %s" % (str(e)))
|
||||
|
||||
# take action
|
||||
if filtered:
|
||||
print ("[*] Filtered from %s:%s" % (webserver.decode(client_encoding), str(port)))
|
||||
|
||||
try:
|
||||
savedir = './savedfiles'
|
||||
if not os.path.exists(savedir):
|
||||
os.makedirs(savedir)
|
||||
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
file_path = os.path.join(savedir, ("%s_%s.bin" % (current_time, webserver.decode(client_encoding))))
|
||||
with open(file_path, 'wb') as file:
|
||||
file.write(data)
|
||||
print ("[*] Saved the file: %s" % (file_path))
|
||||
except Exception as e:
|
||||
print ("[*] Failed to save the file: %s" % (str(e)))
|
||||
for f in Filter.get_filters():
|
||||
if not filtered:
|
||||
filtered = f.test(data)
|
||||
|
||||
return filtered
|
||||
|
||||
|
@ -599,153 +445,45 @@ def add_filtered_host(domain, ip_address):
|
|||
lines.append(f"{ip_address}\t{domain}\n")
|
||||
with open(hosts_path, 'w') as file:
|
||||
file.writelines(lines)
|
||||
if mastodon_user_token != '': # notify to catswords.social
|
||||
post_status_to_mastodon(f"[{mastodon_server} user]\r\n\r\n{domain} is a domain with suspicious spam activity.\r\n\r\n#catswords")
|
||||
|
||||
# notify to mastodon server
|
||||
def post_status_to_mastodon(text, media_ids=None, poll_options=None, poll_expires_in=None, scheduled_at=None, idempotency_key=None):
|
||||
url = f"https://{mastodon_server}/api/v1/statuses"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {user_token}",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
}
|
||||
form_data = {
|
||||
"status": text,
|
||||
"media_ids[]": media_ids,
|
||||
"poll[options][]": poll_options,
|
||||
"poll[expires_in]": poll_expires_in,
|
||||
"scheduled_at": scheduled_at,
|
||||
}
|
||||
if idempotency_key:
|
||||
headers["Idempotency-Key"] = idempotency_key
|
||||
def start(): #Main Program
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.bind(('', listening_port))
|
||||
sock.listen(max_connection)
|
||||
print("[*] Server started successfully [ %d ]" %(listening_port))
|
||||
except Exception:
|
||||
print("[*] Unable to Initialize Socket")
|
||||
print(Exception)
|
||||
sys.exit(2)
|
||||
|
||||
response = requests.post(url, headers=headers, data=form_data)
|
||||
return response.json()
|
||||
while True:
|
||||
try:
|
||||
conn, addr = sock.accept() #Accept connection from client browser
|
||||
data = conn.recv(buffer_size) #Recieve client data
|
||||
start_new_thread(conn_string, (conn, data, addr)) #Starting a thread
|
||||
except KeyboardInterrupt:
|
||||
sock.close()
|
||||
print("\n[*] Graceful Shutdown")
|
||||
sys.exit(1)
|
||||
|
||||
# Strategy: K-Anonymity test - use api.pwnedpasswords.com
|
||||
def pwnedpasswords_test(s):
|
||||
# convert to lowercase
|
||||
s = s.lower()
|
||||
class Filter():
|
||||
filters = []
|
||||
|
||||
# SHA1 of the password
|
||||
p_sha1 = hashlib.sha1(s.encode()).hexdigest()
|
||||
@classmethod
|
||||
def register(cls, f):
|
||||
cls.filters.append(f)
|
||||
|
||||
# First 5 char of SHA1 for k-anonymity API use
|
||||
f5_sha1 = p_sha1[:5]
|
||||
@classmethod
|
||||
def get_filters(cls):
|
||||
return cls.filters
|
||||
|
||||
# Last 5 char of SHA1 to match API output
|
||||
l5_sha1 = p_sha1[-5:]
|
||||
|
||||
# Making GET request using Requests library
|
||||
response = requests.get(f'https://api.pwnedpasswords.com/range/{f5_sha1}')
|
||||
|
||||
# Checking if request was successful
|
||||
if response.status_code == 200:
|
||||
# Parsing response text
|
||||
hashes = response.text.split('\r\n')
|
||||
|
||||
# Using list comprehension to find matching hashes
|
||||
matching_hashes = [line.split(':')[0] for line in hashes if line.endswith(l5_sha1)]
|
||||
|
||||
# If there are matching hashes, return True, else return False
|
||||
return bool(matching_hashes)
|
||||
else:
|
||||
raise Exception("api.pwnedpasswords.com response status: %s" % (str(response.status_code)))
|
||||
|
||||
return False
|
||||
|
||||
# Strategy: Not-CAPTCHA - use truecaptcha.org
|
||||
def truecaptcha_solve(encoded_image):
|
||||
url = 'https://api.apitruecaptcha.org/one/gettext'
|
||||
data = {
|
||||
'userid': truecaptcha_userid,
|
||||
'apikey': truecaptcha_apikey,
|
||||
'data': encoded_image,
|
||||
'mode': 'human'
|
||||
}
|
||||
response = requests.post(url = url, json = data)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
|
||||
if 'error_message' in data:
|
||||
print ("[*] Error: %s" % (data['error_message']))
|
||||
return None
|
||||
if 'result' in data:
|
||||
return data['result']
|
||||
else:
|
||||
raise Exception("api.apitruecaptcha.org response status: %s" % (str(response.status_code)))
|
||||
|
||||
return None
|
||||
|
||||
# Strategy: VowelRatio10
|
||||
def calculate_vowel_ratio(s):
|
||||
# Calculate the length of the string.
|
||||
length = len(s)
|
||||
if length == 0:
|
||||
return 0.0
|
||||
|
||||
# Count the number of vowels ('a', 'e', 'i', 'o', 'u', 'w', 'y') in the string.
|
||||
vowel_count = sum(1 for char in s if char.lower() in 'aeiouwy')
|
||||
|
||||
# Define vowel-ending patterns
|
||||
vowel_ending_patterns = ['ang', 'eng', 'ing', 'ong', 'ung', 'ank', 'ink', 'dge']
|
||||
|
||||
# Count the occurrences of vowel-ending patterns in the string.
|
||||
vowel_count += sum(s.count(pattern) for pattern in vowel_ending_patterns)
|
||||
|
||||
# Calculate the ratio of vowels to the total length of the string.
|
||||
vowel_ratio = vowel_count / length
|
||||
|
||||
return vowel_ratio
|
||||
|
||||
# Strategy: Palindrome4
|
||||
def has_palindrome(input_string):
|
||||
def is_palindrome(s):
|
||||
return s == s[::-1]
|
||||
|
||||
input_string = input_string.lower()
|
||||
n = len(input_string)
|
||||
for i in range(n):
|
||||
for j in range(i + 4, n + 1): # Find substrings of at least 5 characters
|
||||
substring = input_string[i:j]
|
||||
if is_palindrome(substring):
|
||||
return True
|
||||
return False
|
||||
|
||||
# Strategy: KnownWords4
|
||||
def has_known_word(input_string):
|
||||
def is_known_word(s):
|
||||
return s in known_words
|
||||
|
||||
input_string = input_string.lower()
|
||||
n = len(input_string)
|
||||
for i in range(n):
|
||||
for j in range(i + 4, n + 1): # Find substrings of at least 5 characters
|
||||
substring = input_string[i:j]
|
||||
if is_known_word(substring):
|
||||
return True
|
||||
return False
|
||||
|
||||
# Strategy: SearchEngine3
|
||||
def search_engine_test(s):
|
||||
url = "%s/api.php?q=%s" % (librey_apiurl, s)
|
||||
response = requests.get(url, verify=False)
|
||||
if response.status_code != 200:
|
||||
return False
|
||||
|
||||
data = response.json()
|
||||
|
||||
if 'results_source' in data:
|
||||
del data['results_source']
|
||||
|
||||
num_results = len(data)
|
||||
|
||||
return num_results > 2
|
||||
|
||||
# Strategy: RepeatedNumbers3
|
||||
def repeated_numbers_test(s):
|
||||
return bool(re.search(r'\d{3,}', s))
|
||||
def test(self, data):
|
||||
print ("[*] Not implemented")
|
||||
|
||||
if __name__== "__main__":
|
||||
# load filters
|
||||
Filter.register(importlib.import_module("plugins.fediverse").Fediverse())
|
||||
|
||||
# start
|
||||
start()
|
||||
|
|
Loading…
Reference in New Issue
Block a user