mirror of
https://github.com/gnh1201/welsonjs.git
synced 2025-10-31 21:07:30 +00:00
Merge pull request #313 from gnh1201/revert-310-dev
Revert "Update WebSocketManager.cs"
This commit is contained in:
commit
6bf280d38f
|
|
@ -2,11 +2,9 @@
|
|||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
// SPDX-FileCopyrightText: 2025 Catswords OSS and WelsonJS Contributors
|
||||
// https://github.com/gnh1201/welsonjs
|
||||
//
|
||||
//
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.Collections.Concurrent;
|
||||
using System.IO;
|
||||
using System.Net.WebSockets;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
|
|
@ -23,8 +21,6 @@ namespace WelsonJS.Launcher
|
|||
public string Host;
|
||||
public int Port;
|
||||
public string Path;
|
||||
// Ensures that send/receive is serialized per socket
|
||||
public readonly SemaphoreSlim IoLock = new SemaphoreSlim(1, 1);
|
||||
}
|
||||
|
||||
private readonly ConcurrentDictionary<string, Entry> _pool = new ConcurrentDictionary<string, Entry>();
|
||||
|
|
@ -40,36 +36,33 @@ namespace WelsonJS.Launcher
|
|||
}
|
||||
}
|
||||
|
||||
// Get an existing open WebSocket entry or create a new one
|
||||
private async Task<Entry> GetOrCreateAsync(string host, int port, string path)
|
||||
// Get an open WebSocket or connect a new one
|
||||
public async Task<ClientWebSocket> GetOrCreateAsync(string host, int port, string path)
|
||||
{
|
||||
string key = MakeKey(host, port, path);
|
||||
|
||||
if (_pool.TryGetValue(key, out var entry))
|
||||
{
|
||||
var sock = entry.Socket;
|
||||
if (sock != null && sock.State == WebSocketState.Open)
|
||||
return entry;
|
||||
|
||||
Remove(host, port, path);
|
||||
if (sock == null || sock.State != WebSocketState.Open)
|
||||
{
|
||||
Remove(host, port, path);
|
||||
}
|
||||
else
|
||||
{
|
||||
return sock;
|
||||
}
|
||||
}
|
||||
|
||||
var newSock = new ClientWebSocket();
|
||||
|
||||
// Build the WebSocket URI safely
|
||||
var ub = new UriBuilder
|
||||
{
|
||||
Scheme = "ws",
|
||||
Host = host,
|
||||
Port = port,
|
||||
Path = string.IsNullOrEmpty(path) ? "/" : (path.StartsWith("/") ? path : "/" + path)
|
||||
};
|
||||
var uri = new Uri($"ws://{host}:{port}/{path}");
|
||||
|
||||
try
|
||||
{
|
||||
await newSock.ConnectAsync(ub.Uri, CancellationToken.None);
|
||||
await newSock.ConnectAsync(uri, CancellationToken.None);
|
||||
|
||||
var newEntry = new Entry
|
||||
_pool[key] = new Entry
|
||||
{
|
||||
Socket = newSock,
|
||||
Host = host,
|
||||
|
|
@ -77,8 +70,7 @@ namespace WelsonJS.Launcher
|
|||
Path = path
|
||||
};
|
||||
|
||||
_pool[key] = newEntry;
|
||||
return newEntry;
|
||||
return newSock;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
|
@ -100,27 +92,22 @@ namespace WelsonJS.Launcher
|
|||
entry.Socket?.Dispose();
|
||||
}
|
||||
catch { /* Ignore dispose exceptions */ }
|
||||
finally
|
||||
{
|
||||
try { entry.IoLock?.Dispose(); } catch { }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send a message and receive a response, with automatic retry on first failure
|
||||
public async Task<string> SendAndReceiveAsync(string host, int port, string path, string message, int timeoutSec, int maxMessageBytes = 8 * 1024 * 1024)
|
||||
// Send and receive with automatic retry on first failure
|
||||
public async Task<string> SendAndReceiveAsync(string host, int port, string path, string message, int timeoutSec)
|
||||
{
|
||||
byte[] buf = Encoding.UTF8.GetBytes(message);
|
||||
var cts = timeoutSec > 0
|
||||
? new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSec))
|
||||
: new CancellationTokenSource();
|
||||
|
||||
byte[] buf = Encoding.UTF8.GetBytes(message);
|
||||
|
||||
for (int attempt = 0; attempt < 2; attempt++)
|
||||
{
|
||||
try
|
||||
{
|
||||
return await TrySendAndReceiveAsync(host, port, path, buf, cts.Token, maxMessageBytes);
|
||||
return await TrySendAndReceiveAsync(host, port, path, buf, cts.Token);
|
||||
}
|
||||
catch
|
||||
{
|
||||
|
|
@ -132,65 +119,22 @@ namespace WelsonJS.Launcher
|
|||
throw new InvalidOperationException("Unreachable");
|
||||
}
|
||||
|
||||
// Actual send/receive logic with full-frame accumulation until EndOfMessage
|
||||
private async Task<string> TrySendAndReceiveAsync(string host, int port, string path, byte[] sendBuf, CancellationToken token, int maxMessageBytes)
|
||||
// Actual send and receive implementation
|
||||
private async Task<string> TrySendAndReceiveAsync(string host, int port, string path, byte[] buf, CancellationToken token)
|
||||
{
|
||||
try
|
||||
{
|
||||
var entry = await GetOrCreateAsync(host, port, path);
|
||||
var sock = entry.Socket;
|
||||
var sock = await GetOrCreateAsync(host, port, path);
|
||||
|
||||
if (sock.State != WebSocketState.Open)
|
||||
throw new WebSocketException("WebSocket is not in an open state");
|
||||
|
||||
await entry.IoLock.WaitAsync(token);
|
||||
try
|
||||
{
|
||||
// Send message (single-frame; can be split if needed)
|
||||
await sock.SendAsync(new ArraySegment<byte>(sendBuf), WebSocketMessageType.Text, true, token);
|
||||
await sock.SendAsync(new ArraySegment<byte>(buf), WebSocketMessageType.Text, true, token);
|
||||
|
||||
// Receive message until EndOfMessage is reached
|
||||
var buffer = ArrayPool<byte>.Shared.Rent(8192);
|
||||
try
|
||||
{
|
||||
using (var ms = new MemoryStream())
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
var res = await sock.ReceiveAsync(new ArraySegment<byte>(buffer), token);
|
||||
byte[] recv = new byte[4096];
|
||||
var result = await sock.ReceiveAsync(new ArraySegment<byte>(recv), token);
|
||||
|
||||
if (res.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
// Server requested closure
|
||||
try { await sock.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing as requested by server", token); } catch { }
|
||||
throw new WebSocketException($"WebSocket closed by server: {sock.CloseStatus} {sock.CloseStatusDescription}");
|
||||
}
|
||||
|
||||
if (res.Count > 0)
|
||||
{
|
||||
ms.Write(buffer, 0, res.Count);
|
||||
|
||||
if (ms.Length > maxMessageBytes)
|
||||
throw new InvalidOperationException($"Received message exceeds limit ({maxMessageBytes} bytes).");
|
||||
}
|
||||
|
||||
if (res.EndOfMessage)
|
||||
break;
|
||||
}
|
||||
|
||||
// Convert UTF-8 encoded text message to string
|
||||
return Encoding.UTF8.GetString(ms.ToArray());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(buffer);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
entry.IoLock.Release();
|
||||
}
|
||||
return Encoding.UTF8.GetString(recv, 0, result.Count);
|
||||
}
|
||||
catch (WebSocketException ex)
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user