From 6e759008dda649c4e9c7ea64a75173bc916b845a Mon Sep 17 00:00:00 2001 From: "Namhyeon, Go" Date: Tue, 28 Oct 2025 13:30:48 +0900 Subject: [PATCH] Abstract connection management and add serial port support --- .../ConnectionManagerBase.cs | 147 ++++++++++++++ .../WelsonJS.Launcher/SerialPortManager.cs | 183 ++++++++++++++++++ .../WelsonJS.Launcher/WebSocketManager.cs | 135 ++++++------- .../WelsonJS.Launcher.csproj | 3 + 4 files changed, 392 insertions(+), 76 deletions(-) create mode 100644 WelsonJS.Toolkit/WelsonJS.Launcher/ConnectionManagerBase.cs create mode 100644 WelsonJS.Toolkit/WelsonJS.Launcher/SerialPortManager.cs diff --git a/WelsonJS.Toolkit/WelsonJS.Launcher/ConnectionManagerBase.cs b/WelsonJS.Toolkit/WelsonJS.Launcher/ConnectionManagerBase.cs new file mode 100644 index 0000000..68fb8c2 --- /dev/null +++ b/WelsonJS.Toolkit/WelsonJS.Launcher/ConnectionManagerBase.cs @@ -0,0 +1,147 @@ +// ConnectionManagerBase.cs +// SPDX-License-Identifier: GPL-3.0-or-later +// SPDX-FileCopyrightText: 2025 Catswords OSS and WelsonJS Contributors +// https://github.com/gnh1201/welsonjs +// +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace WelsonJS.Launcher +{ + /// + /// Provides a reusable pattern for keeping long-lived connections alive and + /// recreating them transparently when the underlying connection becomes invalid. + /// + /// A descriptor used to create a unique key for each connection. + /// The concrete connection type. + public abstract class ConnectionManagerBase + where TConnection : class + { + private readonly ConcurrentDictionary _pool = new ConcurrentDictionary(); + + /// + /// Creates a unique cache key for the given connection parameters. + /// + protected abstract string CreateKey(TParameters parameters); + + /// + /// Establishes a new connection using the provided parameters. + /// + protected abstract Task OpenConnectionAsync(TParameters parameters, CancellationToken token); + + /// + /// Validates whether the existing connection is still usable. + /// + protected abstract bool IsConnectionValid(TConnection connection); + + /// + /// Releases the resources associated with a connection instance. + /// + protected virtual void CloseConnection(TConnection connection) + { + if (connection is IDisposable disposable) + { + disposable.Dispose(); + } + } + + /// + /// Retrieves a cached connection or creates a new one if needed. + /// + protected async Task GetOrCreateAsync(TParameters parameters, CancellationToken token) + { + string key = CreateKey(parameters); + + if (_pool.TryGetValue(key, out var existing) && IsConnectionValid(existing)) + { + return existing; + } + + RemoveInternal(key, existing); + + var connection = await OpenConnectionAsync(parameters, token).ConfigureAwait(false); + _pool[key] = connection; + return connection; + } + + /// + /// Removes the connection associated with the provided parameters. + /// + public void Remove(TParameters parameters) + { + string key = CreateKey(parameters); + if (_pool.TryRemove(key, out var connection)) + { + CloseSafely(connection); + } + } + + /// + /// Executes an action against the managed connection, retrying once if the first attempt fails. + /// + protected async Task ExecuteWithRetryAsync( + TParameters parameters, + Func> operation, + int maxAttempts, + CancellationToken token) + { + if (operation == null) throw new ArgumentNullException(nameof(operation)); + if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts)); + + Exception lastError = null; + + for (int attempt = 0; attempt < maxAttempts; attempt++) + { + token.ThrowIfCancellationRequested(); + var connection = await GetOrCreateAsync(parameters, token).ConfigureAwait(false); + + try + { + return await operation(connection, token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + lastError = ex; + Remove(parameters); + if (attempt == maxAttempts - 1) + { + throw; + } + } + } + + throw lastError ?? new InvalidOperationException("Unreachable state in ExecuteWithRetryAsync"); + } + + private void RemoveInternal(string key, TConnection connection) + { + if (!string.IsNullOrEmpty(key)) + { + _pool.TryRemove(key, out _); + } + + if (connection != null) + { + CloseSafely(connection); + } + } + + private void CloseSafely(TConnection connection) + { + try + { + CloseConnection(connection); + } + catch + { + // Ignore dispose exceptions. + } + } + } +} diff --git a/WelsonJS.Toolkit/WelsonJS.Launcher/SerialPortManager.cs b/WelsonJS.Toolkit/WelsonJS.Launcher/SerialPortManager.cs new file mode 100644 index 0000000..82d2fa3 --- /dev/null +++ b/WelsonJS.Toolkit/WelsonJS.Launcher/SerialPortManager.cs @@ -0,0 +1,183 @@ +// SerialPortManager.cs +// SPDX-License-Identifier: GPL-3.0-or-later +// SPDX-FileCopyrightText: 2025 Catswords OSS and WelsonJS Contributors +// https://github.com/gnh1201/welsonjs +// +using System; +using System.IO; +using System.IO.Ports; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace WelsonJS.Launcher +{ + public sealed class SerialPortManager : ConnectionManagerBase + { + public struct ConnectionParameters + { + public ConnectionParameters( + string portName, + int baudRate, + Parity parity = Parity.None, + int dataBits = 8, + StopBits stopBits = StopBits.One, + Handshake handshake = Handshake.None, + int readTimeout = 500, + int writeTimeout = 500, + int readBufferSize = 1024) + { + if (string.IsNullOrWhiteSpace(portName)) throw new ArgumentNullException(nameof(portName)); + + PortName = portName; + BaudRate = baudRate; + Parity = parity; + DataBits = dataBits; + StopBits = stopBits; + Handshake = handshake; + ReadTimeout = readTimeout; + WriteTimeout = writeTimeout; + ReadBufferSize = readBufferSize > 0 ? readBufferSize : 1024; + } + + public string PortName { get; } + public int BaudRate { get; } + public Parity Parity { get; } + public int DataBits { get; } + public StopBits StopBits { get; } + public Handshake Handshake { get; } + public int ReadTimeout { get; } + public int WriteTimeout { get; } + public int ReadBufferSize { get; } + } + + protected override string CreateKey(ConnectionParameters parameters) + { + return string.Join(",", new object[] + { + parameters.PortName.ToUpperInvariant(), + parameters.BaudRate, + parameters.Parity, + parameters.DataBits, + parameters.StopBits, + parameters.Handshake + }); + } + + protected override Task OpenConnectionAsync(ConnectionParameters parameters, CancellationToken token) + { + token.ThrowIfCancellationRequested(); + + var port = new SerialPort(parameters.PortName, parameters.BaudRate, parameters.Parity, parameters.DataBits, parameters.StopBits) + { + Handshake = parameters.Handshake, + ReadTimeout = parameters.ReadTimeout, + WriteTimeout = parameters.WriteTimeout + }; + + try + { + port.Open(); + return Task.FromResult(port); + } + catch + { + port.Dispose(); + throw; + } + } + + protected override bool IsConnectionValid(SerialPort connection) + { + return connection != null && connection.IsOpen; + } + + protected override void CloseConnection(SerialPort connection) + { + try + { + if (connection != null && connection.IsOpen) + { + connection.Close(); + } + } + finally + { + connection?.Dispose(); + } + } + + public Task ExecuteAsync( + ConnectionParameters parameters, + Func> operation, + int maxAttempts = 2, + CancellationToken cancellationToken = default) + { + if (operation == null) throw new ArgumentNullException(nameof(operation)); + return ExecuteWithRetryAsync(parameters, operation, maxAttempts, cancellationToken); + } + + public async Task SendAndReceiveAsync( + ConnectionParameters parameters, + string message, + Encoding encoding, + CancellationToken cancellationToken = default) + { + if (encoding == null) throw new ArgumentNullException(nameof(encoding)); + byte[] payload = encoding.GetBytes(message ?? string.Empty); + + return await ExecuteWithRetryAsync( + parameters, + (port, token) => SendAndReceiveInternalAsync(port, parameters.ReadBufferSize, payload, encoding, token), + 2, + cancellationToken).ConfigureAwait(false); + } + + private static async Task SendAndReceiveInternalAsync( + SerialPort port, + int bufferSize, + byte[] payload, + Encoding encoding, + CancellationToken token) + { + port.DiscardInBuffer(); + port.DiscardOutBuffer(); + + if (payload.Length > 0) + { + await Task.Run(() => port.Write(payload, 0, payload.Length), token).ConfigureAwait(false); + } + + using (var stream = new MemoryStream()) + { + var buffer = new byte[bufferSize]; + + while (true) + { + try + { + int read = await Task.Run(() => port.Read(buffer, 0, buffer.Length), token).ConfigureAwait(false); + if (read > 0) + { + stream.Write(buffer, 0, read); + if (read < buffer.Length) + { + break; + } + } + else + { + break; + } + } + catch (TimeoutException) + { + break; + } + } + + return encoding.GetString(stream.ToArray()); + } + } + } +} diff --git a/WelsonJS.Toolkit/WelsonJS.Launcher/WebSocketManager.cs b/WelsonJS.Toolkit/WelsonJS.Launcher/WebSocketManager.cs index 4d953c3..f34ebcb 100644 --- a/WelsonJS.Toolkit/WelsonJS.Launcher/WebSocketManager.cs +++ b/WelsonJS.Toolkit/WelsonJS.Launcher/WebSocketManager.cs @@ -4,7 +4,6 @@ // https://github.com/gnh1201/welsonjs // using System; -using System.Collections.Concurrent; using System.Net.WebSockets; using System.Security.Cryptography; using System.Text; @@ -13,86 +12,73 @@ using System.Threading.Tasks; namespace WelsonJS.Launcher { - public class WebSocketManager + public sealed class WebSocketManager : ConnectionManagerBase { - private class Entry + public struct Endpoint { - public ClientWebSocket Socket; - public string Host; - public int Port; - public string Path; + public Endpoint(string host, int port, string path) + { + Host = host ?? throw new ArgumentNullException(nameof(host)); + Port = port; + Path = path ?? string.Empty; + } + + public string Host { get; } + public int Port { get; } + public string Path { get; } } - private readonly ConcurrentDictionary _pool = new ConcurrentDictionary(); - - // Create a unique cache key using MD5 hash - private string MakeKey(string host, int port, string path) + protected override string CreateKey(Endpoint parameters) { - string raw = host + ":" + port + "/" + path; + string raw = parameters.Host + ":" + parameters.Port + "/" + parameters.Path; using (var md5 = MD5.Create()) { byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(raw)); - return BitConverter.ToString(hash).Replace("-", "").ToLower(); + return BitConverter.ToString(hash).Replace("-", string.Empty).ToLowerInvariant(); } } - // Get an open WebSocket or connect a new one - public async Task GetOrCreateAsync(string host, int port, string path) + protected override async Task OpenConnectionAsync(Endpoint parameters, CancellationToken token) { - string key = MakeKey(host, port, path); - - if (_pool.TryGetValue(key, out var entry)) - { - var sock = entry.Socket; - - if (sock == null || sock.State != WebSocketState.Open) - { - Remove(host, port, path); - } - else - { - return sock; - } - } - - var newSock = new ClientWebSocket(); - var uri = new Uri($"ws://{host}:{port}/{path}"); + var socket = new ClientWebSocket(); + var uri = new Uri($"ws://{parameters.Host}:{parameters.Port}/{parameters.Path}"); try { - await newSock.ConnectAsync(uri, CancellationToken.None); - - _pool[key] = new Entry - { - Socket = newSock, - Host = host, - Port = port, - Path = path - }; - - return newSock; + await socket.ConnectAsync(uri, token).ConfigureAwait(false); + return socket; } catch (Exception ex) { - newSock.Dispose(); - Remove(host, port, path); + socket.Dispose(); throw new WebSocketException("WebSocket connection failed", ex); } } - // Remove a socket from the pool and dispose it + protected override bool IsConnectionValid(ClientWebSocket connection) + { + return connection != null && connection.State == WebSocketState.Open; + } + + protected override void CloseConnection(ClientWebSocket connection) + { + try + { + connection?.Abort(); + } + catch + { + // Ignore abort exceptions. + } + finally + { + connection?.Dispose(); + } + } + public void Remove(string host, int port, string path) { - string key = MakeKey(host, port, path); - if (_pool.TryRemove(key, out var entry)) - { - try - { - entry.Socket?.Abort(); - entry.Socket?.Dispose(); - } - catch { /* Ignore dispose exceptions */ } - } + Remove(new Endpoint(host, port, path)); } // Send and receive with automatic retry on first failure @@ -103,35 +89,32 @@ namespace WelsonJS.Launcher ? new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSec)) : new CancellationTokenSource(); - for (int attempt = 0; attempt < 2; attempt++) + try { - try - { - return await TrySendAndReceiveAsync(host, port, path, buf, cts.Token); - } - catch - { - Remove(host, port, path); - if (attempt == 1) throw; - } + return await ExecuteWithRetryAsync( + new Endpoint(host, port, path), + (socket, token) => TrySendAndReceiveAsync(socket, buf, token), + 2, + cts.Token).ConfigureAwait(false); + } + finally + { + cts.Dispose(); } - - throw new InvalidOperationException("Unreachable"); } // Actual send and receive implementation that never truncates the accumulated data. // - Uses a fixed-size read buffer ONLY for I/O // - Accumulates dynamically into a List until EndOfMessage - private async Task TrySendAndReceiveAsync(string host, int port, string path, byte[] buf, CancellationToken token) + private async Task TrySendAndReceiveAsync(ClientWebSocket socket, byte[] buf, CancellationToken token) { try { - var sock = await GetOrCreateAsync(host, port, path); - if (sock.State != WebSocketState.Open) + if (socket.State != WebSocketState.Open) throw new WebSocketException("WebSocket is not in an open state"); // Send request as a single text frame - await sock.SendAsync(new ArraySegment(buf), WebSocketMessageType.Text, true, token); + await socket.SendAsync(new ArraySegment(buf), WebSocketMessageType.Text, true, token).ConfigureAwait(false); // Fixed-size read buffer for I/O (does NOT cap total message size) byte[] readBuffer = new byte[8192]; @@ -142,12 +125,12 @@ namespace WelsonJS.Launcher while (true) { - var res = await sock.ReceiveAsync(new ArraySegment(readBuffer), token); + var res = await socket.ReceiveAsync(new ArraySegment(readBuffer), token).ConfigureAwait(false); if (res.MessageType == WebSocketMessageType.Close) { - try { await sock.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing as requested by server", token); } catch { } - throw new WebSocketException($"WebSocket closed by server: {sock.CloseStatus} {sock.CloseStatusDescription}"); + try { await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing as requested by server", token).ConfigureAwait(false); } catch { } + throw new WebSocketException($"WebSocket closed by server: {socket.CloseStatus} {socket.CloseStatusDescription}"); } if (res.Count > 0) diff --git a/WelsonJS.Toolkit/WelsonJS.Launcher/WelsonJS.Launcher.csproj b/WelsonJS.Toolkit/WelsonJS.Launcher/WelsonJS.Launcher.csproj index 5bc8b61..e1fc1f4 100644 --- a/WelsonJS.Toolkit/WelsonJS.Launcher/WelsonJS.Launcher.csproj +++ b/WelsonJS.Toolkit/WelsonJS.Launcher/WelsonJS.Launcher.csproj @@ -80,6 +80,7 @@ + @@ -87,6 +88,7 @@ + @@ -128,6 +130,7 @@ GlobalSettingsForm.cs +