feat: refactoring typed programing

This commit is contained in:
Euiseo Cha 2024-08-31 15:48:35 +09:00
parent 93e0b4edd9
commit 910e5e4ed5
No known key found for this signature in database
GPG Key ID: 39F74FF9CEA87CC9
5 changed files with 44 additions and 43 deletions

View File

@ -15,7 +15,7 @@ import requests
from decouple import config
from elasticsearch import Elasticsearch, NotFoundError
import hashlib
from datetime import datetime
from datetime import datetime, UTC
from base import Extension, Logger
logger = Logger(name="wayback")
@ -30,12 +30,12 @@ except Exception as e:
es = Elasticsearch([es_host])
def generate_id(url):
def generate_id(url: str):
"""Generate a unique ID for a URL by hashing it."""
return hashlib.sha256(url.encode("utf-8")).hexdigest()
def get_cached_page_from_google(url):
def get_cached_page_from_google(url: str):
status_code, content = (0, b"")
# Google Cache URL
@ -54,7 +54,7 @@ def get_cached_page_from_google(url):
# API documentation: https://archive.org/help/wayback_api.php
def get_cached_page_from_wayback(url):
def get_cached_page_from_wayback(url: str):
status_code, content = (0, b"")
# Wayback Machine API URL
@ -93,7 +93,7 @@ def get_cached_page_from_wayback(url):
return status_code, content
def get_cached_page_from_elasticsearch(url):
def get_cached_page_from_elasticsearch(url: str):
url_id = generate_id(url)
try:
result = es.get(index=es_index, id=url_id)
@ -106,9 +106,9 @@ def get_cached_page_from_elasticsearch(url):
return 502, b""
def cache_to_elasticsearch(url, data):
def cache_to_elasticsearch(url: str, data: bytes):
url_id = generate_id(url)
timestamp = datetime.utcnow().isoformat()
timestamp = datetime.now(UTC).timestamp()
try:
es.index(
index=es_index,
@ -123,7 +123,7 @@ def cache_to_elasticsearch(url, data):
logger.error(f"Error caching to Elasticsearch: {e}")
def get_page_from_origin_server(url):
def get_page_from_origin_server(url: str):
try:
response = requests.get(url)
return response.status_code, response.content
@ -137,7 +137,7 @@ class AlwaysOnline(Extension):
self.connection_type = "alwaysonline"
self.buffer_size = 8192
def connect(self, conn, data, webserver, port, scheme, method, url):
def connect(self, conn: socket.socket, data: bytes, webserver: bytes, port: bytes, scheme: bytes, method: bytes, url: bytes):
logger.info("[*] Connecting... Connecting...")
connected = False
@ -146,20 +146,20 @@ class AlwaysOnline(Extension):
cache_hit = 0
buffered = b""
def sendall(sock, conn, data):
def sendall(_sock: socket.socket, _conn: socket.socket, _data: bytes):
# send first chuck
sock.send(data)
if len(data) < self.buffer_size:
sock.send(_data)
if len(_data) < self.buffer_size:
return
# send following chunks
conn.settimeout(1)
_conn.settimeout(1)
while True:
try:
chunk = conn.recv(self.buffer_size)
chunk = _conn.recv(self.buffer_size)
if not chunk:
break
sock.send(chunk)
_sock.send(chunk)
except:
break

View File

@ -9,13 +9,14 @@
# Updated at: 2024-07-02
#
from socket import socket
from Bio.Seq import Seq
from Bio.SeqUtils import gc_fraction
from base import Extension
def _analyze_sequence(sequence) -> dict[str, str]:
def _analyze_sequence(sequence: str) -> dict[str, str]:
"""
Analyze a given DNA sequence to provide various nucleotide transformations and translations.
@ -41,7 +42,7 @@ def _analyze_sequence(sequence) -> dict[str, str]:
)
def _gc_content_calculation(sequence) -> dict[str, str]:
def _gc_content_calculation(sequence: str) -> dict[str, str]:
"""
Calculate the GC content of a given DNA sequence and return it as a float.
@ -63,7 +64,7 @@ class PyBio(Extension):
def dispatch(self, type, id, params, conn):
conn.send(b"Greeting! dispatch")
def analyze_sequence(self, type, id, params, conn):
def analyze_sequence(self, type, id, params, conn: socket):
"""
Analyze a DNA sequence provided in the params dictionary.
@ -91,7 +92,7 @@ class PyBio(Extension):
result = _analyze_sequence(params["sequence"])
return result
def gc_content_calculation(self, type, id, params, conn):
def gc_content_calculation(self, type, id, params, conn: socket):
"""
Calculate the GC content for a given DNA sequence provided in the params dictionary.

View File

@ -11,7 +11,7 @@
#
import docker
from socket import socket
from base import Extension, Logger
logger = Logger("Container")
@ -36,21 +36,21 @@ class Container(Extension):
# docker
self.client = docker.from_env()
def dispatch(self, type, id, params, conn):
def dispatch(self, type, id, params, conn: socket):
logger.info("[*] Greeting! dispatch")
conn.send(b"Greeting! dispatch")
def container_cteate(self, type, id, params, conn):
def container_cteate(self, type, id, params, conn: socket):
# todo: -
return b"[*] Created"
def container_start(self, type, id, params, conn):
def container_start(self, type, id, params, conn: socket):
name = params["name"]
container = self.client.containers.get(name)
container.start()
def container_run(self, type, id, params, conn):
def container_run(self, type, id, params, conn: socket):
devices = params["devices"]
image = params["image"]
devices = params["devices"]
@ -70,7 +70,7 @@ class Container(Extension):
logger.info("[*] Running...")
return b"[*] Running..."
def container_stop(self, type, id, params, conn):
def container_stop(self, type, id, params, conn: socket):
name = params["name"]
container = self.client.containers.get(name)
@ -79,32 +79,32 @@ class Container(Extension):
logger.info("[*] Stopped")
return b"[*] Stopped"
def container_pause(self, type, id, params, conn):
def container_pause(self, type, id, params, conn: socket):
name = params["name"]
container = self.client.containers.get(name)
container.pause()
return b"[*] Paused"
def container_unpause(self, type, id, params, conn):
def container_unpause(self, type, id, params, conn: socket):
name = params["name"]
container = self.client.containers.get(name)
container.unpause()
return b"[*] Unpaused"
def container_restart(self, type, id, params, conn):
def container_restart(self, type, id, params, conn: socket):
name = params["name"]
container = self.client.containers.get(name)
container.restart()
return b"[*] Restarted"
def container_kill(self, type, id, params, conn):
def container_kill(self, type, id, params, conn: socket):
# TODO: -
return b"[*] Killed"
def container_remove(self, type, id, params, conn):
def container_remove(self, type, id, params, conn: socket):
name = params["name"]
container = self.client.containers.get(name)

14
smtp.py
View File

@ -20,7 +20,7 @@ from requests.auth import HTTPBasicAuth
from base import (
extract_credentials,
jsonrpc2_encode,
Logger,
Logger, jsonrpc2_decode,
)
logger = Logger(name="smtp")
@ -47,8 +47,8 @@ class CaterpillarSMTPHandler:
self.smtp_version = "0.1.6"
async def handle_DATA(self, server, session, envelope):
mailfrom = envelope.mail_from
rcpttos = envelope.rcpt_tos
mail_from = envelope.mail_from
rcpt_tos = envelope.rcpt_tos
data = envelope.content
message = EmailMessage()
@ -65,7 +65,7 @@ class CaterpillarSMTPHandler:
},
"data": {
"to": to,
"from": mailfrom,
"from": mail_from,
"subject": subject,
"message": data.decode("utf-8"),
},
@ -81,11 +81,11 @@ class CaterpillarSMTPHandler:
auth=auth,
)
if response.status_code == 200:
type, id, rpcdata = jsonrpc2_decode(response.text)
if rpcdata["success"]:
_type, _id, rpc_data = jsonrpc2_decode(response.text)
if rpc_data["success"]:
logger.info("[*] Email sent successfully.")
else:
raise Exception(f"({rpcdata['code']}) {rpcdata['message']}")
raise Exception(f"({rpc_data['code']}) {rpc_data['message']}")
else:
raise Exception(f"Status {response.status_code}")
except Exception as e:

12
web.py
View File

@ -49,18 +49,18 @@ def process_jsonrpc2():
conn = Connection(request)
# JSON-RPC 2.0 request
jsondata = request.get_json(silent=True)
if jsondata["jsonrpc"] == "2.0":
json_data = request.get_json(silent=True)
if json_data["jsonrpc"] == "2.0":
return Extension.dispatch_rpcmethod(
jsondata["method"], "call", jsondata["id"], jsondata["params"], conn
json_data["method"], "call", json_data["id"], json_data["params"], conn
)
# when error
return jsonrpc2_error_encode({"message": "Not vaild JSON-RPC 2.0 request"})
return jsonrpc2_error_encode({"message": "Not valid JSON-RPC 2.0 request"})
def jsonrpc2_server(conn, id, method, params):
return Extension.dispatch_rpcmethod(method, "call", id, params, conn)
def jsonrpc2_server(conn, _id, method, params):
return Extension.dispatch_rpcmethod(method, "call", _id, params, conn)
class Connection: