mirror of
https://github.com/gnh1201/welsonjs.git
synced 2025-11-27 10:00:57 +00:00
Merge pull request #304 from gnh1201/dev
Refactor WebSocketManager for improved clarity and reliability
This commit is contained in:
commit
13cc674433
|
|
@ -5,7 +5,6 @@
|
||||||
//
|
//
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.IO;
|
|
||||||
using System.Net.WebSockets;
|
using System.Net.WebSockets;
|
||||||
using System.Security.Cryptography;
|
using System.Security.Cryptography;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
|
@ -16,115 +15,113 @@ namespace WelsonJS.Launcher
|
||||||
{
|
{
|
||||||
public class WebSocketManager
|
public class WebSocketManager
|
||||||
{
|
{
|
||||||
private class WebSocketEntry
|
private class Entry
|
||||||
{
|
{
|
||||||
public ClientWebSocket Socket { get; set; }
|
public ClientWebSocket Socket;
|
||||||
public string Host { get; set; }
|
public string Host;
|
||||||
public int Port { get; set; }
|
public int Port;
|
||||||
public string Path { get; set; }
|
public string Path;
|
||||||
}
|
}
|
||||||
|
|
||||||
private readonly ConcurrentDictionary<string, WebSocketEntry> _wsPool = new ConcurrentDictionary<string, WebSocketEntry>();
|
private readonly ConcurrentDictionary<string, Entry> _pool = new ConcurrentDictionary<string, Entry>();
|
||||||
|
|
||||||
|
// Create a unique cache key based on host, port, and path using MD5
|
||||||
private string MakeKey(string host, int port, string path)
|
private string MakeKey(string host, int port, string path)
|
||||||
{
|
{
|
||||||
// To create a unique key for the WebSocket connection
|
string raw = host + ":" + port + "/" + path;
|
||||||
string input = host + ":" + port + "/" + path;
|
|
||||||
using (var md5 = MD5.Create())
|
using (var md5 = MD5.Create())
|
||||||
{
|
{
|
||||||
byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(input));
|
byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(raw));
|
||||||
return BitConverter.ToString(hash).Replace("-", "").ToLower();
|
return BitConverter.ToString(hash).Replace("-", "").ToLower();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get existing WebSocket if valid, otherwise connect and store a new one
|
||||||
public async Task<ClientWebSocket> GetOrCreateAsync(string host, int port, string path)
|
public async Task<ClientWebSocket> GetOrCreateAsync(string host, int port, string path)
|
||||||
{
|
{
|
||||||
string key = MakeKey(host, port, path);
|
string key = MakeKey(host, port, path);
|
||||||
|
|
||||||
if (_wsPool.TryGetValue(key, out var entry) && entry.Socket?.State == WebSocketState.Open)
|
if (_pool.TryGetValue(key, out var entry))
|
||||||
return entry.Socket;
|
|
||||||
|
|
||||||
// 재연결 필요
|
|
||||||
if (entry != null)
|
|
||||||
{
|
{
|
||||||
_wsPool.TryRemove(key, out _);
|
var sock = entry.Socket;
|
||||||
entry.Socket?.Dispose();
|
if (sock != null && sock.State == WebSocketState.Open)
|
||||||
|
return sock;
|
||||||
|
|
||||||
|
// Remove stale or broken socket
|
||||||
|
Remove(host, port, path);
|
||||||
}
|
}
|
||||||
|
|
||||||
var ws = new ClientWebSocket();
|
var newSock = new ClientWebSocket();
|
||||||
Uri uri = new Uri($"ws://{host}:{port}/{path}");
|
var uri = new Uri($"ws://{host}:{port}/{path}");
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await ws.ConnectAsync(uri, CancellationToken.None);
|
await newSock.ConnectAsync(uri, CancellationToken.None);
|
||||||
_wsPool[key] = new WebSocketEntry
|
_pool[key] = new Entry
|
||||||
{
|
{
|
||||||
Socket = ws,
|
Socket = newSock,
|
||||||
Host = host,
|
Host = host,
|
||||||
Port = port,
|
Port = port,
|
||||||
Path = path
|
Path = path
|
||||||
};
|
};
|
||||||
return ws;
|
return newSock;
|
||||||
}
|
}
|
||||||
catch
|
catch
|
||||||
{
|
{
|
||||||
ws.Dispose();
|
newSock.Dispose();
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove WebSocket from the pool and dispose
|
||||||
public void Remove(string host, int port, string path)
|
public void Remove(string host, int port, string path)
|
||||||
{
|
{
|
||||||
string key = MakeKey(host, port, path);
|
string key = MakeKey(host, port, path);
|
||||||
if (_wsPool.TryRemove(key, out var entry))
|
if (_pool.TryRemove(key, out var entry))
|
||||||
{
|
{
|
||||||
entry.Socket?.Abort();
|
try
|
||||||
entry.Socket?.Dispose();
|
{
|
||||||
|
entry.Socket?.Abort();
|
||||||
|
entry.Socket?.Dispose();
|
||||||
|
}
|
||||||
|
catch { /* Ignore errors */ }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<bool> SendWithReconnectAsync(string host, int port, string path, byte[] message, CancellationToken token)
|
// Send message and receive response with 1 retry on failure
|
||||||
|
public async Task<string> SendAndReceiveAsync(string host, int port, string path, string message, int timeoutSec)
|
||||||
{
|
{
|
||||||
ClientWebSocket ws;
|
byte[] buf = Encoding.UTF8.GetBytes(message);
|
||||||
|
var cts = timeoutSec > 0
|
||||||
|
? new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSec))
|
||||||
|
: new CancellationTokenSource();
|
||||||
|
|
||||||
try
|
for (int attempt = 0; attempt < 2; attempt++)
|
||||||
{
|
{
|
||||||
ws = await GetOrCreateAsync(host, port, path);
|
|
||||||
await ws.SendAsync(new ArraySegment<byte>(message), WebSocketMessageType.Text, true, token);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
catch
|
|
||||||
{
|
|
||||||
Remove(host, port, path);
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
ws = await GetOrCreateAsync(host, port, path);
|
return await TrySendAndReceiveAsync(host, port, path, buf, cts.Token);
|
||||||
await ws.SendAsync(new ArraySegment<byte>(message), WebSocketMessageType.Text, true, token);
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
catch
|
catch
|
||||||
{
|
{
|
||||||
Remove(host, port, path);
|
Remove(host, port, path);
|
||||||
return false;
|
if (attempt == 1) throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
throw new InvalidOperationException("Unreachable");
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<string> SendAndReceiveAsync(string host, int port, string path, string message, int timeoutSeconds, int bufferSize = 65536)
|
// Internal helper for sending and receiving data
|
||||||
|
private async Task<string> TrySendAndReceiveAsync(string host, int port, string path, byte[] buf, CancellationToken token)
|
||||||
{
|
{
|
||||||
var buffer = Encoding.UTF8.GetBytes(message);
|
var sock = await GetOrCreateAsync(host, port, path);
|
||||||
CancellationTokenSource cts = timeoutSeconds > 0
|
await sock.SendAsync(new ArraySegment<byte>(buf), WebSocketMessageType.Text, true, token);
|
||||||
? new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds))
|
|
||||||
: new CancellationTokenSource();
|
|
||||||
|
|
||||||
if (!await SendWithReconnectAsync(host, port, path, buffer, cts.Token))
|
byte[] recv = new byte[4096];
|
||||||
throw new IOException("Failed to send after reconnect");
|
var result = await sock.ReceiveAsync(new ArraySegment<byte>(recv), token);
|
||||||
|
|
||||||
ClientWebSocket ws = await GetOrCreateAsync(host, port, path);
|
return Encoding.UTF8.GetString(recv, 0, result.Count);
|
||||||
|
|
||||||
byte[] recvBuffer = new byte[bufferSize];
|
|
||||||
WebSocketReceiveResult result = await ws.ReceiveAsync(new ArraySegment<byte>(recvBuffer), cts.Token);
|
|
||||||
return Encoding.UTF8.GetString(recvBuffer, 0, result.Count);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user