Merge pull request #310 from gnh1201/dev

Update WebSocketManager.cs
This commit is contained in:
Namhyeon Go 2025-08-14 03:13:53 +09:00 committed by GitHub
commit e1a9c79a75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -4,7 +4,9 @@
// https://github.com/gnh1201/welsonjs // https://github.com/gnh1201/welsonjs
// //
using System; using System;
using System.Buffers;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.IO;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Security.Cryptography; using System.Security.Cryptography;
using System.Text; using System.Text;
@ -21,6 +23,8 @@ namespace WelsonJS.Launcher
public string Host; public string Host;
public int Port; public int Port;
public string Path; public string Path;
// Ensures that send/receive is serialized per socket
public readonly SemaphoreSlim IoLock = new SemaphoreSlim(1, 1);
} }
private readonly ConcurrentDictionary<string, Entry> _pool = new ConcurrentDictionary<string, Entry>(); private readonly ConcurrentDictionary<string, Entry> _pool = new ConcurrentDictionary<string, Entry>();
@ -36,33 +40,36 @@ namespace WelsonJS.Launcher
} }
} }
// Get an open WebSocket or connect a new one // Get an existing open WebSocket entry or create a new one
public async Task<ClientWebSocket> GetOrCreateAsync(string host, int port, string path) private async Task<Entry> GetOrCreateAsync(string host, int port, string path)
{ {
string key = MakeKey(host, port, path); string key = MakeKey(host, port, path);
if (_pool.TryGetValue(key, out var entry)) if (_pool.TryGetValue(key, out var entry))
{ {
var sock = entry.Socket; var sock = entry.Socket;
if (sock != null && sock.State == WebSocketState.Open)
return entry;
if (sock == null || sock.State != WebSocketState.Open) Remove(host, port, path);
{
Remove(host, port, path);
}
else
{
return sock;
}
} }
var newSock = new ClientWebSocket(); var newSock = new ClientWebSocket();
var uri = new Uri($"ws://{host}:{port}/{path}");
// Build the WebSocket URI safely
var ub = new UriBuilder
{
Scheme = "ws",
Host = host,
Port = port,
Path = string.IsNullOrEmpty(path) ? "/" : (path.StartsWith("/") ? path : "/" + path)
};
try try
{ {
await newSock.ConnectAsync(uri, CancellationToken.None); await newSock.ConnectAsync(ub.Uri, CancellationToken.None);
_pool[key] = new Entry var newEntry = new Entry
{ {
Socket = newSock, Socket = newSock,
Host = host, Host = host,
@ -70,7 +77,8 @@ namespace WelsonJS.Launcher
Path = path Path = path
}; };
return newSock; _pool[key] = newEntry;
return newEntry;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -92,22 +100,27 @@ namespace WelsonJS.Launcher
entry.Socket?.Dispose(); entry.Socket?.Dispose();
} }
catch { /* Ignore dispose exceptions */ } catch { /* Ignore dispose exceptions */ }
finally
{
try { entry.IoLock?.Dispose(); } catch { }
}
} }
} }
// Send and receive with automatic retry on first failure // Send a message and receive a response, with automatic retry on first failure
public async Task<string> SendAndReceiveAsync(string host, int port, string path, string message, int timeoutSec) public async Task<string> SendAndReceiveAsync(string host, int port, string path, string message, int timeoutSec, int maxMessageBytes = 8 * 1024 * 1024)
{ {
byte[] buf = Encoding.UTF8.GetBytes(message);
var cts = timeoutSec > 0 var cts = timeoutSec > 0
? new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSec)) ? new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSec))
: new CancellationTokenSource(); : new CancellationTokenSource();
byte[] buf = Encoding.UTF8.GetBytes(message);
for (int attempt = 0; attempt < 2; attempt++) for (int attempt = 0; attempt < 2; attempt++)
{ {
try try
{ {
return await TrySendAndReceiveAsync(host, port, path, buf, cts.Token); return await TrySendAndReceiveAsync(host, port, path, buf, cts.Token, maxMessageBytes);
} }
catch catch
{ {
@ -119,22 +132,65 @@ namespace WelsonJS.Launcher
throw new InvalidOperationException("Unreachable"); throw new InvalidOperationException("Unreachable");
} }
// Actual send and receive implementation // Actual send/receive logic with full-frame accumulation until EndOfMessage
private async Task<string> TrySendAndReceiveAsync(string host, int port, string path, byte[] buf, CancellationToken token) private async Task<string> TrySendAndReceiveAsync(string host, int port, string path, byte[] sendBuf, CancellationToken token, int maxMessageBytes)
{ {
try try
{ {
var sock = await GetOrCreateAsync(host, port, path); var entry = await GetOrCreateAsync(host, port, path);
var sock = entry.Socket;
if (sock.State != WebSocketState.Open) if (sock.State != WebSocketState.Open)
throw new WebSocketException("WebSocket is not in an open state"); throw new WebSocketException("WebSocket is not in an open state");
await sock.SendAsync(new ArraySegment<byte>(buf), WebSocketMessageType.Text, true, token); await entry.IoLock.WaitAsync(token);
try
{
// Send message (single-frame; can be split if needed)
await sock.SendAsync(new ArraySegment<byte>(sendBuf), WebSocketMessageType.Text, true, token);
byte[] recv = new byte[4096]; // Receive message until EndOfMessage is reached
var result = await sock.ReceiveAsync(new ArraySegment<byte>(recv), token); var buffer = ArrayPool<byte>.Shared.Rent(8192);
try
{
using (var ms = new MemoryStream())
{
while (true)
{
var res = await sock.ReceiveAsync(new ArraySegment<byte>(buffer), token);
return Encoding.UTF8.GetString(recv, 0, result.Count); if (res.MessageType == WebSocketMessageType.Close)
{
// Server requested closure
try { await sock.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing as requested by server", token); } catch { }
throw new WebSocketException($"WebSocket closed by server: {sock.CloseStatus} {sock.CloseStatusDescription}");
}
if (res.Count > 0)
{
ms.Write(buffer, 0, res.Count);
if (ms.Length > maxMessageBytes)
throw new InvalidOperationException($"Received message exceeds limit ({maxMessageBytes} bytes).");
}
if (res.EndOfMessage)
break;
}
// Convert UTF-8 encoded text message to string
return Encoding.UTF8.GetString(ms.ToArray());
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
finally
{
entry.IoLock.Release();
}
} }
catch (WebSocketException ex) catch (WebSocketException ex)
{ {