diff --git a/plugins/alwaysonline.py b/plugins/alwaysonline.py
index 4e75996..2e21bb7 100644
--- a/plugins/alwaysonline.py
+++ b/plugins/alwaysonline.py
@@ -15,7 +15,7 @@ import requests
 from decouple import config
 from elasticsearch import Elasticsearch, NotFoundError
 import hashlib
-from datetime import datetime
+from datetime import datetime, UTC
 from base import Extension, Logger

 logger = Logger(name="wayback")
@@ -30,12 +30,12 @@ except Exception as e:
 es = Elasticsearch([es_host])


-def generate_id(url):
+def generate_id(url: str):
     """Generate a unique ID for a URL by hashing it."""
     return hashlib.sha256(url.encode("utf-8")).hexdigest()


-def get_cached_page_from_google(url):
+def get_cached_page_from_google(url: str):
     status_code, content = (0, b"")

     # Google Cache URL
@@ -54,7 +54,7 @@ def get_cached_page_from_google(url):


 # API documentation: https://archive.org/help/wayback_api.php
-def get_cached_page_from_wayback(url):
+def get_cached_page_from_wayback(url: str):
     status_code, content = (0, b"")

     # Wayback Machine API URL
@@ -93,7 +93,7 @@ def get_cached_page_from_wayback(url):
     return status_code, content


-def get_cached_page_from_elasticsearch(url):
+def get_cached_page_from_elasticsearch(url: str):
     url_id = generate_id(url)
     try:
         result = es.get(index=es_index, id=url_id)
@@ -106,9 +106,9 @@ def get_cached_page_from_elasticsearch(url):
         return 502, b""


-def cache_to_elasticsearch(url, data):
+def cache_to_elasticsearch(url: str, data: bytes):
     url_id = generate_id(url)
-    timestamp = datetime.utcnow().isoformat()
+    timestamp = datetime.now(UTC).isoformat()
     try:
         es.index(
             index=es_index,
@@ -123,7 +123,7 @@ def cache_to_elasticsearch(url, data):
         logger.error(f"Error caching to Elasticsearch: {e}")


-def get_page_from_origin_server(url):
+def get_page_from_origin_server(url: str):
     try:
         response = requests.get(url)
         return response.status_code, response.content
@@ -137,7 +137,7 @@ class AlwaysOnline(Extension):
         self.connection_type = "alwaysonline"
         self.buffer_size = 8192

-    def connect(self, conn, data, webserver, port, scheme, method, url):
+    def connect(self, conn: socket.socket, data: bytes, webserver: bytes, port: bytes, scheme: bytes, method: bytes, url: bytes):
         logger.info("[*] Connecting... Connecting...")

         connected = False
@@ -146,20 +146,20 @@ class AlwaysOnline(Extension):
         cache_hit = 0
         buffered = b""

-        def sendall(sock, conn, data):
+        def sendall(_sock: socket.socket, _conn: socket.socket, _data: bytes):
             # send first chuck
-            sock.send(data)
-            if len(data) < self.buffer_size:
+            _sock.send(_data)
+            if len(_data) < self.buffer_size:
                 return

             # send following chunks
-            conn.settimeout(1)
+            _conn.settimeout(1)
             while True:
                 try:
-                    chunk = conn.recv(self.buffer_size)
+                    chunk = _conn.recv(self.buffer_size)
                     if not chunk:
                         break
-                    sock.send(chunk)
+                    _sock.send(chunk)
                 except:
                     break

diff --git a/plugins/bio.py b/plugins/bio.py
index 1633ae6..1805812 100644
--- a/plugins/bio.py
+++ b/plugins/bio.py
@@ -9,13 +9,14 @@
 # Updated at: 2024-07-02
 #

+from socket import socket
 from Bio.Seq import Seq
 from Bio.SeqUtils import gc_fraction

 from base import Extension


-def _analyze_sequence(sequence) -> dict[str, str]:
+def _analyze_sequence(sequence: str) -> dict[str, str]:
     """
     Analyze a given DNA sequence to provide various nucleotide transformations and translations.
@@ -41,7 +42,7 @@ def _analyze_sequence(sequence) -> dict[str, str]:
     )


-def _gc_content_calculation(sequence) -> dict[str, str]:
+def _gc_content_calculation(sequence: str) -> dict[str, str]:
     """
     Calculate the GC content of a given DNA sequence and return it as a float.
@@ -63,7 +64,7 @@ class PyBio(Extension):
     def dispatch(self, type, id, params, conn):
         conn.send(b"Greeting! dispatch")

-    def analyze_sequence(self, type, id, params, conn):
+    def analyze_sequence(self, type, id, params, conn: socket):
         """
         Analyze a DNA sequence provided in the params dictionary.
@@ -91,7 +92,7 @@ class PyBio(Extension):
         result = _analyze_sequence(params["sequence"])
         return result

-    def gc_content_calculation(self, type, id, params, conn):
+    def gc_content_calculation(self, type, id, params, conn: socket):
         """
         Calculate the GC content for a given DNA sequence provided in the params dictionary.
diff --git a/plugins/container.py b/plugins/container.py
index a74b4bd..3c27e97 100644
--- a/plugins/container.py
+++ b/plugins/container.py
@@ -11,7 +11,7 @@
 #

 import docker
-
+from socket import socket
 from base import Extension, Logger

 logger = Logger("Container")
@@ -36,21 +36,21 @@ class Container(Extension):
         # docker
         self.client = docker.from_env()

-    def dispatch(self, type, id, params, conn):
+    def dispatch(self, type, id, params, conn: socket):
         logger.info("[*] Greeting! dispatch")
         conn.send(b"Greeting! dispatch")

-    def container_cteate(self, type, id, params, conn):
+    def container_cteate(self, type, id, params, conn: socket):
         # todo: -
         return b"[*] Created"

-    def container_start(self, type, id, params, conn):
+    def container_start(self, type, id, params, conn: socket):
         name = params["name"]
         container = self.client.containers.get(name)
         container.start()

-    def container_run(self, type, id, params, conn):
+    def container_run(self, type, id, params, conn: socket):
         devices = params["devices"]
         image = params["image"]
         devices = params["devices"]
@@ -70,7 +70,7 @@ class Container(Extension):

         logger.info("[*] Running...")
         return b"[*] Running..."

-    def container_stop(self, type, id, params, conn):
+    def container_stop(self, type, id, params, conn: socket):
         name = params["name"]
         container = self.client.containers.get(name)
@@ -79,32 +79,32 @@ class Container(Extension):
         logger.info("[*] Stopped")
         return b"[*] Stopped"

-    def container_pause(self, type, id, params, conn):
+    def container_pause(self, type, id, params, conn: socket):
         name = params["name"]
         container = self.client.containers.get(name)
         container.pause()
         return b"[*] Paused"

-    def container_unpause(self, type, id, params, conn):
+    def container_unpause(self, type, id, params, conn: socket):
         name = params["name"]
         container = self.client.containers.get(name)
         container.unpause()
         return b"[*] Unpaused"

-    def container_restart(self, type, id, params, conn):
+    def container_restart(self, type, id, params, conn: socket):
         name = params["name"]
         container = self.client.containers.get(name)
         container.restart()
         return b"[*] Restarted"

-    def container_kill(self, type, id, params, conn):
+    def container_kill(self, type, id, params, conn: socket):
         # TODO: -
         return b"[*] Killed"

-    def container_remove(self, type, id, params, conn):
+    def container_remove(self, type, id, params, conn: socket):
         name = params["name"]
         container = self.client.containers.get(name)
diff --git a/smtp.py b/smtp.py
index 8e0382d..fc8ff14 100644
--- a/smtp.py
+++ b/smtp.py
@@ -20,7 +20,7 @@ from requests.auth import HTTPBasicAuth
 from base import (
     extract_credentials,
     jsonrpc2_encode,
-    Logger,
+    Logger, jsonrpc2_decode,
 )

 logger = Logger(name="smtp")
@@ -47,8 +47,8 @@ class CaterpillarSMTPHandler:
         self.smtp_version = "0.1.6"

     async def handle_DATA(self, server, session, envelope):
-        mailfrom = envelope.mail_from
-        rcpttos = envelope.rcpt_tos
+        mail_from = envelope.mail_from
+        rcpt_tos = envelope.rcpt_tos
         data = envelope.content

         message = EmailMessage()
@@ -65,7 +65,7 @@ class CaterpillarSMTPHandler:
             },
             "data": {
                 "to": to,
-                "from": mailfrom,
+                "from": mail_from,
                 "subject": subject,
                 "message": data.decode("utf-8"),
             },
@@ -81,11 +81,11 @@ class CaterpillarSMTPHandler:
                 auth=auth,
             )
             if response.status_code == 200:
-                type, id, rpcdata = jsonrpc2_decode(response.text)
-                if rpcdata["success"]:
+                _type, _id, rpc_data = jsonrpc2_decode(response.text)
+                if rpc_data["success"]:
                     logger.info("[*] Email sent successfully.")
                 else:
-                    raise Exception(f"({rpcdata['code']}) {rpcdata['message']}")
+                    raise Exception(f"({rpc_data['code']}) {rpc_data['message']}")
             else:
                 raise Exception(f"Status {response.status_code}")
         except Exception as e:
diff --git a/web.py b/web.py
index a639a3f..8b0f752 100644
--- a/web.py
+++ b/web.py
@@ -49,18 +49,18 @@ def process_jsonrpc2():
     conn = Connection(request)

     # JSON-RPC 2.0 request
-    jsondata = request.get_json(silent=True)
-    if jsondata["jsonrpc"] == "2.0":
+    json_data = request.get_json(silent=True)
+    if json_data["jsonrpc"] == "2.0":
         return Extension.dispatch_rpcmethod(
-            jsondata["method"], "call", jsondata["id"], jsondata["params"], conn
+            json_data["method"], "call", json_data["id"], json_data["params"], conn
         )

     # when error
-    return jsonrpc2_error_encode({"message": "Not vaild JSON-RPC 2.0 request"})
+    return jsonrpc2_error_encode({"message": "Not valid JSON-RPC 2.0 request"})


-def jsonrpc2_server(conn, id, method, params):
-    return Extension.dispatch_rpcmethod(method, "call", id, params, conn)
+def jsonrpc2_server(conn, _id, method, params):
+    return Extension.dispatch_rpcmethod(method, "call", _id, params, conn)


 class Connection: