Abstract connection management and add serial port support

This commit is contained in:
Namhyeon, Go 2025-10-28 13:30:48 +09:00
parent cab9013f18
commit 6e759008dd
4 changed files with 392 additions and 76 deletions

View File

@ -0,0 +1,147 @@
// ConnectionManagerBase.cs
// SPDX-License-Identifier: GPL-3.0-or-later
// SPDX-FileCopyrightText: 2025 Catswords OSS and WelsonJS Contributors
// https://github.com/gnh1201/welsonjs
//
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace WelsonJS.Launcher
{
/// <summary>
/// Provides a reusable pattern for keeping long-lived connections alive and
/// recreating them transparently when the underlying connection becomes invalid.
/// </summary>
/// <typeparam name="TParameters">A descriptor used to create a unique key for each connection.</typeparam>
/// <typeparam name="TConnection">The concrete connection type.</typeparam>
public abstract class ConnectionManagerBase<TParameters, TConnection>
where TConnection : class
{
private readonly ConcurrentDictionary<string, TConnection> _pool = new ConcurrentDictionary<string, TConnection>();
/// <summary>
/// Creates a unique cache key for the given connection parameters.
/// </summary>
protected abstract string CreateKey(TParameters parameters);
/// <summary>
/// Establishes a new connection using the provided parameters.
/// </summary>
protected abstract Task<TConnection> OpenConnectionAsync(TParameters parameters, CancellationToken token);
/// <summary>
/// Validates whether the existing connection is still usable.
/// </summary>
protected abstract bool IsConnectionValid(TConnection connection);
/// <summary>
/// Releases the resources associated with a connection instance.
/// </summary>
protected virtual void CloseConnection(TConnection connection)
{
if (connection is IDisposable disposable)
{
disposable.Dispose();
}
}
/// <summary>
/// Retrieves a cached connection or creates a new one if needed.
/// </summary>
protected async Task<TConnection> GetOrCreateAsync(TParameters parameters, CancellationToken token)
{
string key = CreateKey(parameters);
if (_pool.TryGetValue(key, out var existing) && IsConnectionValid(existing))
{
return existing;
}
RemoveInternal(key, existing);
var connection = await OpenConnectionAsync(parameters, token).ConfigureAwait(false);
_pool[key] = connection;
return connection;
}
/// <summary>
/// Removes the connection associated with the provided parameters.
/// </summary>
public void Remove(TParameters parameters)
{
string key = CreateKey(parameters);
if (_pool.TryRemove(key, out var connection))
{
CloseSafely(connection);
}
}
/// <summary>
/// Executes an action against the managed connection, retrying once if the first attempt fails.
/// </summary>
protected async Task<TResult> ExecuteWithRetryAsync<TResult>(
TParameters parameters,
Func<TConnection, CancellationToken, Task<TResult>> operation,
int maxAttempts,
CancellationToken token)
{
if (operation == null) throw new ArgumentNullException(nameof(operation));
if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
Exception lastError = null;
for (int attempt = 0; attempt < maxAttempts; attempt++)
{
token.ThrowIfCancellationRequested();
var connection = await GetOrCreateAsync(parameters, token).ConfigureAwait(false);
try
{
return await operation(connection, token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
lastError = ex;
Remove(parameters);
if (attempt == maxAttempts - 1)
{
throw;
}
}
}
throw lastError ?? new InvalidOperationException("Unreachable state in ExecuteWithRetryAsync");
}
private void RemoveInternal(string key, TConnection connection)
{
if (!string.IsNullOrEmpty(key))
{
_pool.TryRemove(key, out _);
}
if (connection != null)
{
CloseSafely(connection);
}
}
private void CloseSafely(TConnection connection)
{
try
{
CloseConnection(connection);
}
catch
{
// Ignore dispose exceptions.
}
}
}
}

View File

@ -0,0 +1,183 @@
// SerialPortManager.cs
// SPDX-License-Identifier: GPL-3.0-or-later
// SPDX-FileCopyrightText: 2025 Catswords OSS and WelsonJS Contributors
// https://github.com/gnh1201/welsonjs
//
using System;
using System.IO;
using System.IO.Ports;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace WelsonJS.Launcher
{
public sealed class SerialPortManager : ConnectionManagerBase<SerialPortManager.ConnectionParameters, SerialPort>
{
public struct ConnectionParameters
{
public ConnectionParameters(
string portName,
int baudRate,
Parity parity = Parity.None,
int dataBits = 8,
StopBits stopBits = StopBits.One,
Handshake handshake = Handshake.None,
int readTimeout = 500,
int writeTimeout = 500,
int readBufferSize = 1024)
{
if (string.IsNullOrWhiteSpace(portName)) throw new ArgumentNullException(nameof(portName));
PortName = portName;
BaudRate = baudRate;
Parity = parity;
DataBits = dataBits;
StopBits = stopBits;
Handshake = handshake;
ReadTimeout = readTimeout;
WriteTimeout = writeTimeout;
ReadBufferSize = readBufferSize > 0 ? readBufferSize : 1024;
}
public string PortName { get; }
public int BaudRate { get; }
public Parity Parity { get; }
public int DataBits { get; }
public StopBits StopBits { get; }
public Handshake Handshake { get; }
public int ReadTimeout { get; }
public int WriteTimeout { get; }
public int ReadBufferSize { get; }
}
protected override string CreateKey(ConnectionParameters parameters)
{
return string.Join(",", new object[]
{
parameters.PortName.ToUpperInvariant(),
parameters.BaudRate,
parameters.Parity,
parameters.DataBits,
parameters.StopBits,
parameters.Handshake
});
}
protected override Task<SerialPort> OpenConnectionAsync(ConnectionParameters parameters, CancellationToken token)
{
token.ThrowIfCancellationRequested();
var port = new SerialPort(parameters.PortName, parameters.BaudRate, parameters.Parity, parameters.DataBits, parameters.StopBits)
{
Handshake = parameters.Handshake,
ReadTimeout = parameters.ReadTimeout,
WriteTimeout = parameters.WriteTimeout
};
try
{
port.Open();
return Task.FromResult(port);
}
catch
{
port.Dispose();
throw;
}
}
protected override bool IsConnectionValid(SerialPort connection)
{
return connection != null && connection.IsOpen;
}
protected override void CloseConnection(SerialPort connection)
{
try
{
if (connection != null && connection.IsOpen)
{
connection.Close();
}
}
finally
{
connection?.Dispose();
}
}
public Task<TResult> ExecuteAsync<TResult>(
ConnectionParameters parameters,
Func<SerialPort, CancellationToken, Task<TResult>> operation,
int maxAttempts = 2,
CancellationToken cancellationToken = default)
{
if (operation == null) throw new ArgumentNullException(nameof(operation));
return ExecuteWithRetryAsync(parameters, operation, maxAttempts, cancellationToken);
}
public async Task<string> SendAndReceiveAsync(
ConnectionParameters parameters,
string message,
Encoding encoding,
CancellationToken cancellationToken = default)
{
if (encoding == null) throw new ArgumentNullException(nameof(encoding));
byte[] payload = encoding.GetBytes(message ?? string.Empty);
return await ExecuteWithRetryAsync(
parameters,
(port, token) => SendAndReceiveInternalAsync(port, parameters.ReadBufferSize, payload, encoding, token),
2,
cancellationToken).ConfigureAwait(false);
}
private static async Task<string> SendAndReceiveInternalAsync(
SerialPort port,
int bufferSize,
byte[] payload,
Encoding encoding,
CancellationToken token)
{
port.DiscardInBuffer();
port.DiscardOutBuffer();
if (payload.Length > 0)
{
await Task.Run(() => port.Write(payload, 0, payload.Length), token).ConfigureAwait(false);
}
using (var stream = new MemoryStream())
{
var buffer = new byte[bufferSize];
while (true)
{
try
{
int read = await Task.Run(() => port.Read(buffer, 0, buffer.Length), token).ConfigureAwait(false);
if (read > 0)
{
stream.Write(buffer, 0, read);
if (read < buffer.Length)
{
break;
}
}
else
{
break;
}
}
catch (TimeoutException)
{
break;
}
}
return encoding.GetString(stream.ToArray());
}
}
}
}

View File

@ -4,7 +4,6 @@
// https://github.com/gnh1201/welsonjs
//
using System;
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Security.Cryptography;
using System.Text;
@ -13,86 +12,73 @@ using System.Threading.Tasks;
namespace WelsonJS.Launcher
{
public class WebSocketManager
public sealed class WebSocketManager : ConnectionManagerBase<WebSocketManager.Endpoint, ClientWebSocket>
{
private class Entry
public struct Endpoint
{
public ClientWebSocket Socket;
public string Host;
public int Port;
public string Path;
public Endpoint(string host, int port, string path)
{
Host = host ?? throw new ArgumentNullException(nameof(host));
Port = port;
Path = path ?? string.Empty;
}
public string Host { get; }
public int Port { get; }
public string Path { get; }
}
private readonly ConcurrentDictionary<string, Entry> _pool = new ConcurrentDictionary<string, Entry>();
// Create a unique cache key using MD5 hash
private string MakeKey(string host, int port, string path)
protected override string CreateKey(Endpoint parameters)
{
string raw = host + ":" + port + "/" + path;
string raw = parameters.Host + ":" + parameters.Port + "/" + parameters.Path;
using (var md5 = MD5.Create())
{
byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(raw));
return BitConverter.ToString(hash).Replace("-", "").ToLower();
return BitConverter.ToString(hash).Replace("-", string.Empty).ToLowerInvariant();
}
}
// Get an open WebSocket or connect a new one
public async Task<ClientWebSocket> GetOrCreateAsync(string host, int port, string path)
protected override async Task<ClientWebSocket> OpenConnectionAsync(Endpoint parameters, CancellationToken token)
{
string key = MakeKey(host, port, path);
if (_pool.TryGetValue(key, out var entry))
{
var sock = entry.Socket;
if (sock == null || sock.State != WebSocketState.Open)
{
Remove(host, port, path);
}
else
{
return sock;
}
}
var newSock = new ClientWebSocket();
var uri = new Uri($"ws://{host}:{port}/{path}");
var socket = new ClientWebSocket();
var uri = new Uri($"ws://{parameters.Host}:{parameters.Port}/{parameters.Path}");
try
{
await newSock.ConnectAsync(uri, CancellationToken.None);
_pool[key] = new Entry
{
Socket = newSock,
Host = host,
Port = port,
Path = path
};
return newSock;
await socket.ConnectAsync(uri, token).ConfigureAwait(false);
return socket;
}
catch (Exception ex)
{
newSock.Dispose();
Remove(host, port, path);
socket.Dispose();
throw new WebSocketException("WebSocket connection failed", ex);
}
}
// Remove a socket from the pool and dispose it
protected override bool IsConnectionValid(ClientWebSocket connection)
{
return connection != null && connection.State == WebSocketState.Open;
}
protected override void CloseConnection(ClientWebSocket connection)
{
try
{
connection?.Abort();
}
catch
{
// Ignore abort exceptions.
}
finally
{
connection?.Dispose();
}
}
public void Remove(string host, int port, string path)
{
string key = MakeKey(host, port, path);
if (_pool.TryRemove(key, out var entry))
{
try
{
entry.Socket?.Abort();
entry.Socket?.Dispose();
}
catch { /* Ignore dispose exceptions */ }
}
Remove(new Endpoint(host, port, path));
}
// Send and receive with automatic retry on first failure
@ -103,35 +89,32 @@ namespace WelsonJS.Launcher
? new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSec))
: new CancellationTokenSource();
for (int attempt = 0; attempt < 2; attempt++)
try
{
try
{
return await TrySendAndReceiveAsync(host, port, path, buf, cts.Token);
}
catch
{
Remove(host, port, path);
if (attempt == 1) throw;
}
return await ExecuteWithRetryAsync(
new Endpoint(host, port, path),
(socket, token) => TrySendAndReceiveAsync(socket, buf, token),
2,
cts.Token).ConfigureAwait(false);
}
finally
{
cts.Dispose();
}
throw new InvalidOperationException("Unreachable");
}
// Actual send and receive implementation that never truncates the accumulated data.
// - Uses a fixed-size read buffer ONLY for I/O
// - Accumulates dynamically into a List<byte[]> until EndOfMessage
private async Task<string> TrySendAndReceiveAsync(string host, int port, string path, byte[] buf, CancellationToken token)
private async Task<string> TrySendAndReceiveAsync(ClientWebSocket socket, byte[] buf, CancellationToken token)
{
try
{
var sock = await GetOrCreateAsync(host, port, path);
if (sock.State != WebSocketState.Open)
if (socket.State != WebSocketState.Open)
throw new WebSocketException("WebSocket is not in an open state");
// Send request as a single text frame
await sock.SendAsync(new ArraySegment<byte>(buf), WebSocketMessageType.Text, true, token);
await socket.SendAsync(new ArraySegment<byte>(buf), WebSocketMessageType.Text, true, token).ConfigureAwait(false);
// Fixed-size read buffer for I/O (does NOT cap total message size)
byte[] readBuffer = new byte[8192];
@ -142,12 +125,12 @@ namespace WelsonJS.Launcher
while (true)
{
var res = await sock.ReceiveAsync(new ArraySegment<byte>(readBuffer), token);
var res = await socket.ReceiveAsync(new ArraySegment<byte>(readBuffer), token).ConfigureAwait(false);
if (res.MessageType == WebSocketMessageType.Close)
{
try { await sock.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing as requested by server", token); } catch { }
throw new WebSocketException($"WebSocket closed by server: {sock.CloseStatus} {sock.CloseStatusDescription}");
try { await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing as requested by server", token).ConfigureAwait(false); } catch { }
throw new WebSocketException($"WebSocket closed by server: {socket.CloseStatus} {socket.CloseStatusDescription}");
}
if (res.Count > 0)

View File

@ -80,6 +80,7 @@
<Reference Include="System.Deployment" />
<Reference Include="System.Drawing" />
<Reference Include="System.IO.Compression.FileSystem" />
<Reference Include="System.IO.Ports" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Web" />
<Reference Include="System.Windows.Forms" />
@ -87,6 +88,7 @@
<Reference Include="System.Xml.Linq" />
</ItemGroup>
<ItemGroup>
<Compile Include="ConnectionManagerBase.cs" />
<Compile Include="ICompatibleLogger.cs" />
<Compile Include="IResourceTool.cs" />
<Compile Include="JsCore.cs" />
@ -128,6 +130,7 @@
<DependentUpon>GlobalSettingsForm.cs</DependentUpon>
</Compile>
<Compile Include="ResourceServer.cs" />
<Compile Include="SerialPortManager.cs" />
<Compile Include="TraceLogger.cs" />
<Compile Include="WebSocketManager.cs" />
<EmbeddedResource Include="EnvForm.resx">