Update server.py

This commit is contained in:
Namhyeon Go 2024-02-19 07:58:44 +09:00 committed by GitHub
parent 5bc6c7aaf6
commit a055976125
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -14,6 +14,7 @@ import json
import ssl
import time
import re
import hashlib
import resource
import traceback
from subprocess import Popen, PIPE
@ -23,6 +24,8 @@ from platform import python_version
import requests
from decouple import config
notify_server = 'catswords.social'
try:
listening_port = config('PORT', cast=int)
server_url = config('SERVER_URL')
@ -162,13 +165,13 @@ def proxy_check_filtered(data, webserver, port, scheme, method, url):
# convert to text
text = data.decode(client_encoding, errors='ignore')
#filtered = text.find('@misskey.io') > -1
#filtered = filtered or text.find("https://misskey.io") > -1
filtered = filtered or text.find('ctkpaarr') > -1
filtered = filtered or bool(re.search(r'\b\w{10}@(?:\w+\.)+\w+\b', text))
filtered = filtered or bool(re.search(r"https://[^\s/@]+@([a-zA-Z0-9]{10})", text))
filtered = filtered or bool(re.search(r'https://[a-zA-Z0-9.-]+/users/[a-zA-Z0-9]{10}/statuses/[0-9]+', text))
# ID filtering with K-Anonymity
pattern = r'[/@]([a-zA-Z0-9]{10})(?![a-zA-Z0-9])'
matches = re.findall(pattern, text)
if len(matches) > 0:
filtered = not all(map(validate_string_with_k_anonymity, matches))
# take action
if filtered:
print ("[*] Filtered from %s:%s" % (webserver.decode(client_encoding), str(port)))
@ -184,10 +187,6 @@ def proxy_check_filtered(data, webserver, port, scheme, method, url):
except Exception as e:
print ("[*] Failed to save the file: %s" % (str(e)))
#print ("[*] ====== start preview data =====")
#print ("%s" % (text))
#print ("[*] ====== end preview data =====")
return filtered
def proxy_server(webserver, port, scheme, method, url, conn, addr, data):
@ -326,11 +325,11 @@ def add_filtered_host(domain, ip_address):
with open(hosts_path, 'w') as file:
file.writelines(lines)
if user_token != '': # notify to catswords.social
post_status(f"[catswords.social]\r\n\r\n{domain} is a domain with suspicious spam activity.\r\n\r\n\#catswords")
post_status(f"[{notify_server} user]\r\n\r\n{domain} is a domain with suspicious spam activity.\r\n\r\n#catswords")
# notify to catswords.social
def post_status(text, media_ids=None, poll_options=None, poll_expires_in=None, scheduled_at=None, idempotency_key=None):
url = "https://catswords.social/api/v1/statuses"
url = f"https://{notify_server}/api/v1/statuses"
headers = {
"Authorization": f"Bearer {user_token}",
"Content-Type": "application/x-www-form-urlencoded",
@ -348,5 +347,32 @@ def post_status(text, media_ids=None, poll_options=None, poll_expires_in=None, s
response = requests.post(url, headers=headers, data=form_data)
return response.json()
# k-anonymity validation
def validate_string_with_k_anonymity(s):
# SHA1 of the password
p_sha1 = hashlib.sha1(s.encode()).hexdigest()
# First 5 char of SHA1 for k-anonymity API use
f5_sha1 = p_sha1[:5]
# Last 5 char of SHA1 to match API output
l5_sha1 = p_sha1[-5:]
# Making GET request using Requests library
response = requests.get(f'https://api.pwnedpasswords.com/range/{f5_sha1}')
# Checking if request was successful
if response.status_code == 200:
# Parsing response text
hashes = response.text.split('\r\n')
# Using list comprehension to find matching hashes
matching_hashes = [line for line in hashes if l5_sha1 in line.lower()]
# If there are matching hashes, return True, else return False
return bool(matching_hashes)
else:
return False
if __name__== "__main__":
start()