You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

473 lines
15 KiB

using Avalonia.Threading;
using Microsoft.Extensions.Logging;
using ReactiveUI;
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.ObjectModel;
using System.IO;
using System.IO.Hashing;
using System.Net;
using System.Net.Sockets;
using System.Reactive;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace AuroraDesk.Presentation.ViewModels.Pages;
public sealed class TcpClientSessionViewModel : ReactiveObject, IDisposable
{
// Upper bound for the frame-length field; checked for outgoing frames before sending
// and for incoming frames before a receive buffer is rented.
private const int MaxFrameLength = 1024 * 1024; // 1 MB safety cap
// Optional logger; every call site uses the null-conditional operator.
private readonly ILogger<TcpClientSessionViewModel>? _logger;
// Connection resources. They are created in ConnectAsync and released in DisconnectInternal.
private TcpClient? _tcpClient;
private NetworkStream? _networkStream;
// Cancels the background receive loop started by ConnectAsync.
private CancellationTokenSource? _receiveCancellationTokenSource;
// Running index for entries added to ReceivedMessages; reset by ClearMessages.
private int _receivedMessageIndex;
// Backing fields for the change-notifying properties of the same name.
private string _sessionName;
private string _serverIp = "127.0.0.1";
private int _serverPort = 5000;
private int _localPort;
private bool _enableCrc;
private int _frameType = 1;
private bool _isConnected;
private bool _isAutoScrollToBottom = true;
private string _statusMessage = "未连接";
private string _message = string.Empty;
private bool _isBusy;
/// <summary>
/// Creates a session view-model, initialises the message collections and wires up
/// the commands together with their can-execute conditions.
/// </summary>
/// <param name="sessionName">Initial value of <see cref="SessionName"/>.</param>
/// <param name="logger">Optional logger; logging is skipped when it is <c>null</c>.</param>
public TcpClientSessionViewModel(
    string sessionName,
    ILogger<TcpClientSessionViewModel>? logger = null)
{
    _sessionName = sessionName;
    _logger = logger;

    SentMessages = new();
    ReceivedMessages = new();

    // Connecting is possible only while neither connected nor busy.
    var canConnect = this.WhenAnyValue(
        vm => vm.IsConnected,
        vm => vm.IsBusy,
        (isConnected, isBusy) => !(isConnected || isBusy));
    ConnectCommand = ReactiveCommand.CreateFromTask(ConnectAsync, canConnect);

    // Disconnecting requires an idle, connected session.
    var canDisconnect = this.WhenAnyValue(
        vm => vm.IsConnected,
        vm => vm.IsBusy,
        (isConnected, isBusy) => isConnected && !isBusy);
    DisconnectCommand = ReactiveCommand.Create(Disconnect, canDisconnect);

    // Sending additionally requires a non-blank message.
    var canSend = this.WhenAnyValue(
        vm => vm.IsConnected,
        vm => vm.Message,
        vm => vm.IsBusy,
        (isConnected, text, isBusy) => isConnected && !isBusy && !string.IsNullOrWhiteSpace(text));
    SendMessageCommand = ReactiveCommand.CreateFromTask(SendMessageAsync, canSend);

    var canReconnect = this.WhenAnyValue(vm => vm.IsBusy, isBusy => !isBusy);
    ReconnectCommand = ReactiveCommand.CreateFromTask(ReconnectAsync, canReconnect);

    ClearMessagesCommand = ReactiveCommand.Create(ClearMessages);
}
/// <summary>Display lines for frames that were sent successfully.</summary>
public ObservableCollection<string> SentMessages { get; }
/// <summary>Display entries for received frames and for system notices raised by the receive loop.</summary>
public ObservableCollection<TcpReceivedMessageEntry> ReceivedMessages { get; }
/// <summary>Opens the connection; enabled while the session is neither connected nor busy.</summary>
public ReactiveCommand<Unit, Unit> ConnectCommand { get; }
/// <summary>Closes the connection; enabled while the session is connected and not busy.</summary>
public ReactiveCommand<Unit, Unit> DisconnectCommand { get; }
/// <summary>Sends <see cref="Message"/>; enabled while connected, not busy and the message is not blank.</summary>
public ReactiveCommand<Unit, Unit> SendMessageCommand { get; }
/// <summary>Tears the connection down and connects again; enabled while not busy.</summary>
public ReactiveCommand<Unit, Unit> ReconnectCommand { get; }
/// <summary>Empties both message collections and resets the received-message index.</summary>
public ReactiveCommand<Unit, Unit> ClearMessagesCommand { get; }
/// <summary>Name of this session; also included in log messages.</summary>
public string SessionName
{
    get { return _sessionName; }
    set { this.RaiseAndSetIfChanged(ref _sessionName, value); }
}

/// <summary>Server address used by the next connect attempt; must parse as an IP address.</summary>
public string ServerIp
{
    get { return _serverIp; }
    set { this.RaiseAndSetIfChanged(ref _serverIp, value); }
}

/// <summary>Server port used by the next connect attempt; validated to 1-65535 when connecting.</summary>
public int ServerPort
{
    get { return _serverPort; }
    set { this.RaiseAndSetIfChanged(ref _serverPort, value); }
}

/// <summary>Local port of the socket, captured after a successful connect.</summary>
public int LocalPort
{
    get { return _localPort; }
    private set { this.RaiseAndSetIfChanged(ref _localPort, value); }
}

/// <summary>When set, outgoing frames carry a CRC-32 trailer and have bit 15 of the frame type set.</summary>
public bool EnableCrc
{
    get { return _enableCrc; }
    set { this.RaiseAndSetIfChanged(ref _enableCrc, value); }
}

/// <summary>
/// Base frame type for outgoing frames. Values are clamped to 0..0x7FFF because the
/// top bit of the 16-bit type field is used as the CRC flag.
/// </summary>
public int FrameType
{
    get { return _frameType; }
    set { this.RaiseAndSetIfChanged(ref _frameType, Math.Clamp(value, 0, 0x7FFF)); }
}

/// <summary>Whether the session currently holds an open connection.</summary>
public bool IsConnected
{
    get { return _isConnected; }
    private set { this.RaiseAndSetIfChanged(ref _isConnected, value); }
}

/// <summary>Auto-scroll preference; not read in this class (presumably consumed by the view).</summary>
public bool IsAutoScrollToBottom
{
    get { return _isAutoScrollToBottom; }
    set { this.RaiseAndSetIfChanged(ref _isAutoScrollToBottom, value); }
}

/// <summary>Human-readable outcome of the most recent connect, send or disconnect.</summary>
public string StatusMessage
{
    get { return _statusMessage; }
    private set { this.RaiseAndSetIfChanged(ref _statusMessage, value); }
}

/// <summary>Text to send; cleared after a successful send.</summary>
public string Message
{
    get { return _message; }
    set { this.RaiseAndSetIfChanged(ref _message, value); }
}

/// <summary>True while a connect, send or teardown is running; gates the commands.</summary>
public bool IsBusy
{
    get { return _isBusy; }
    private set { this.RaiseAndSetIfChanged(ref _isBusy, value); }
}

/// <summary>Raised after the message collections change (send, receive, system notice, clear).</summary>
public event EventHandler? MetricsUpdated;
/// <summary>
/// Validates <see cref="ServerIp"/> and <see cref="ServerPort"/>, opens the TCP connection
/// and starts the background receive loop.
/// </summary>
/// <remarks>
/// Does nothing when the session is already connected or busy. Failures are reported
/// through <see cref="StatusMessage"/> and the logger; they are not rethrown.
/// </remarks>
private async Task ConnectAsync()
{
    if (IsConnected || IsBusy)
    {
        return;
    }

    // Holds a client that has been created but is not yet owned by the _tcpClient field,
    // so that a failed connect attempt does not leak its socket.
    TcpClient? pendingClient = null;
    try
    {
        IsBusy = true;

        // Snapshot the endpoint so validation, connect and status text all use the same
        // values even if the bound properties are edited while the connect is in flight.
        var host = ServerIp;
        var port = ServerPort;

        if (!IPAddress.TryParse(host, out _))
        {
            StatusMessage = $"无效的服务器地址: {host}";
            return;
        }

        if (port is < 1 or > 65535)
        {
            StatusMessage = $"无效的端口号: {port}";
            return;
        }

        var client = new TcpClient();
        pendingClient = client;
        await client.ConnectAsync(host, port);

        _tcpClient = client;
        pendingClient = null; // ownership moved to the field; DisconnectInternal releases it from here on
        _networkStream = client.GetStream();
        LocalPort = ((IPEndPoint)client.Client.LocalEndPoint!).Port;
        IsConnected = true;
        StatusMessage = $"已连接至 {host}:{port} (本地端口 {LocalPort})";
        _logger?.LogInformation("TCP 会话 {Session} 已连接到 {Server}:{Port}", SessionName, host, port);

        // Take the token now. Reading the field inside the lambda would happen later on a
        // pool thread, when the field may already be null or the source disposed.
        var receiveCancellation = new CancellationTokenSource();
        _receiveCancellationTokenSource = receiveCancellation;
        var receiveToken = receiveCancellation.Token;
        _ = Task.Run(() => ReceiveLoopAsync(receiveToken));
    }
    catch (Exception ex)
    {
        _logger?.LogError(ex, "TCP 会话 {Session} 连接失败", SessionName);
        StatusMessage = $"连接失败: {ex.Message}";
        pendingClient?.Dispose();
        DisconnectInternal();
    }
    finally
    {
        IsBusy = false;
    }
}
/// <summary>
/// Handler for <see cref="DisconnectCommand"/>: closes the connection, updates the
/// status text and logs the event. Has no effect when not connected.
/// </summary>
private void Disconnect()
{
    if (IsConnected)
    {
        DisconnectInternal();
        StatusMessage = "已断开连接";
        _logger?.LogInformation("TCP 会话 {Session} 已断开", SessionName);
    }
}
/// <summary>
/// Handler for <see cref="ReconnectCommand"/>: releases the current connection, if any,
/// and then runs the normal connect sequence. Skipped while the session is busy.
/// </summary>
private async Task ReconnectAsync()
{
    if (!IsBusy)
    {
        DisconnectInternal();
        await ConnectAsync();
    }
}
/// <summary>
/// Encodes <see cref="Message"/> as one frame, writes it to the server and records the
/// result in <see cref="SentMessages"/>.
/// </summary>
/// <remarks>
/// Wire layout, all integers big-endian: 4-byte frame length, 2-byte frame type, UTF-8
/// payload, then an optional 4-byte CRC-32 of the payload. The frame length counts
/// everything after the length prefix. Bit 15 of the frame type is set when the CRC
/// trailer is present. Failures are reported through <see cref="StatusMessage"/> and the
/// logger; they are not rethrown.
/// </remarks>
private async Task SendMessageAsync()
{
    // Use a local copy: DisconnectInternal clears the field and may run while this
    // method is suspended at an await.
    var stream = _networkStream;
    if (!IsConnected || stream is null)
    {
        StatusMessage = "尚未连接,无法发送";
        return;
    }

    try
    {
        IsBusy = true;
        var payloadBytes = Encoding.UTF8.GetBytes(Message);
        var hasPayload = payloadBytes.Length > 0;
        var hasCrc = EnableCrc;
        var baseType = (ushort)FrameType;
        var frameType = hasCrc ? (ushort)(baseType | 0x8000) : baseType;
        var frameLength = 2 + payloadBytes.Length + (hasCrc ? 4 : 0);
        if (frameLength > MaxFrameLength)
        {
            StatusMessage = $"帧长度超出限制 ({frameLength} 字节)";
            return;
        }

        var totalLength = 4 + frameLength;
        var buffer = ArrayPool<byte>.Shared.Rent(totalLength);
        try
        {
            // Rented arrays may be larger than requested, so only the first totalLength bytes are used.
            WriteFrame(buffer.AsSpan(0, totalLength), frameType, payloadBytes, hasCrc);
            await stream.WriteAsync(buffer.AsMemory(0, totalLength));
            await stream.FlushAsync();

            var timestamp = DateTime.Now.ToString("HH:mm:ss");
            var payloadPreview = hasPayload ? Encoding.UTF8.GetString(payloadBytes) : string.Empty;
            var hexPreview = hasPayload ? BitConverter.ToString(payloadBytes).Replace("-", " ") : "无";
            // Both branches of the CRC indicator used to be empty strings, so the flag was
            // never visible; "无" matches the wording used for received frames without CRC.
            var crcLabel = hasCrc ? "有" : "无";
            var displayMessage = $"[{timestamp}] 类型=0x{frameType:X4} CRC={crcLabel} PayloadLen={payloadBytes.Length} 文本=\"{payloadPreview}\" HEX={hexPreview}";
            await Dispatcher.UIThread.InvokeAsync(() =>
            {
                SentMessages.Add(displayMessage);
                Message = string.Empty;
                MetricsUpdated?.Invoke(this, EventArgs.Empty);
            });
            StatusMessage = "发送成功";
            _logger?.LogInformation("TCP 会话 {Session} 已发送 {Length} 字节,类型 0x{Type:X4}", SessionName, payloadBytes.Length, frameType);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
    catch (Exception ex)
    {
        _logger?.LogError(ex, "TCP 会话 {Session} 发送失败", SessionName);
        StatusMessage = $"发送失败: {ex.Message}";
    }
    finally
    {
        IsBusy = false;
    }
}

/// <summary>
/// Serialises one frame into <paramref name="destination"/>, which must be exactly
/// 4 + 2 + payload length (+ 4 when <paramref name="appendCrc"/> is set) bytes long.
/// Kept synchronous so that no span lives inside the async caller.
/// </summary>
private static void WriteFrame(Span<byte> destination, ushort frameType, ReadOnlySpan<byte> payload, bool appendCrc)
{
    // The length field excludes its own four bytes.
    BinaryPrimitives.WriteInt32BigEndian(destination[..4], destination.Length - 4);
    BinaryPrimitives.WriteUInt16BigEndian(destination.Slice(4, 2), frameType);
    payload.CopyTo(destination.Slice(6, payload.Length));
    if (appendCrc)
    {
        BinaryPrimitives.WriteUInt32BigEndian(destination.Slice(6 + payload.Length, 4), Crc32.HashToUInt32(payload));
    }
}
/// <summary>
/// Reads length-prefixed frames from the connection until the peer closes it, a read
/// fails, the framing is violated or <paramref name="cancellationToken"/> is cancelled,
/// and appends one entry to <see cref="ReceivedMessages"/> per frame.
/// </summary>
/// <param name="cancellationToken">Cancelled by <c>DisconnectInternal</c> to stop the loop.</param>
/// <remarks>
/// When the loop ends it closes the session only if the session still uses the stream
/// this loop was reading from. After a reconnect the fields belong to a newer connection
/// and must not be touched by the old loop.
/// </remarks>
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
{
    // Bind the loop to the stream that is current at start-up; the field can be cleared
    // or replaced while the loop is suspended at an await.
    var stream = _networkStream;
    if (stream is null)
    {
        return;
    }

    var lengthBuffer = new byte[4];
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            if (!await ReadExactAsync(stream, lengthBuffer, cancellationToken))
            {
                break;
            }

            var frameLength = BinaryPrimitives.ReadInt32BigEndian(lengthBuffer);
            // A frame holds at least the 2-byte type, so a length of exactly 2 (type only,
            // empty payload) is legal.
            if (frameLength < 2 || frameLength > MaxFrameLength)
            {
                await AppendSystemMessageAsync($"收到非法帧长度: {frameLength}");
                // The protocol has no resynchronisation marker: after a bad length prefix the
                // next frame boundary is unknown, so reading on would decode garbage.
                break;
            }

            var frameBuffer = ArrayPool<byte>.Shared.Rent(frameLength);
            try
            {
                if (!await ReadExactAsync(stream, frameBuffer.AsMemory(0, frameLength), cancellationToken))
                {
                    break;
                }

                if (!TryDescribeFrame(frameBuffer.AsSpan(0, frameLength), out var description))
                {
                    // The frame was consumed completely, so the stream is still aligned.
                    await AppendSystemMessageAsync(description);
                    continue;
                }

                var index = Interlocked.Increment(ref _receivedMessageIndex);
                await Dispatcher.UIThread.InvokeAsync(() =>
                {
                    ReceivedMessages.Add(new TcpReceivedMessageEntry(index, description));
                    MetricsUpdated?.Invoke(this, EventArgs.Empty);
                });
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(frameBuffer);
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation is the normal way to stop the loop.
    }
    catch (Exception) when (cancellationToken.IsCancellationRequested)
    {
        // The socket is closed right after cancellation; errors from that are expected.
    }
    catch (IOException ex)
    {
        _logger?.LogWarning(ex, "TCP 会话 {Session} 读取时发生 IO 异常", SessionName);
        await AppendSystemMessageAsync($"读取失败: {ex.Message}");
    }
    catch (Exception ex)
    {
        _logger?.LogError(ex, "TCP 会话 {Session} 接收数据出错", SessionName);
        await AppendSystemMessageAsync($"接收异常: {ex.Message}");
    }
    finally
    {
        // Run the teardown on the UI thread so property notifications are raised there, and
        // only when this loop's stream is still the session's stream. Otherwise an explicit
        // disconnect already cleaned up, or a reconnect installed a new connection.
        await Dispatcher.UIThread.InvokeAsync(() =>
        {
            if (ReferenceEquals(_networkStream, stream))
            {
                DisconnectInternal();
                StatusMessage = "连接已关闭";
            }
        });
    }
}

/// <summary>
/// Decodes one frame body (everything after the length prefix) into its display text.
/// </summary>
/// <param name="frame">2-byte big-endian type, payload, and a 4-byte CRC-32 when bit 15 of the type is set.</param>
/// <param name="description">The display text on success; the error text when the frame is too short.</param>
/// <returns><c>false</c> when the frame is too short to hold the CRC its type announces.</returns>
private static bool TryDescribeFrame(ReadOnlySpan<byte> frame, out string description)
{
    var rawType = BinaryPrimitives.ReadUInt16BigEndian(frame[..2]);
    var hasCrc = (rawType & 0x8000) != 0;
    var baseType = (ushort)(rawType & 0x7FFF);
    var payloadLength = frame.Length - 2 - (hasCrc ? 4 : 0);
    if (payloadLength < 0)
    {
        description = $"帧格式错误,payload 长度为负: {payloadLength}";
        return false;
    }

    var payload = frame.Slice(2, payloadLength);
    var timestamp = DateTime.Now.ToString("HH:mm:ss");
    var payloadText = payload.Length > 0 ? Encoding.UTF8.GetString(payload) : string.Empty;
    var payloadHex = payload.Length > 0 ? BitConverter.ToString(payload.ToArray()).Replace("-", " ") : "无";
    var crcInfo = "无";
    if (hasCrc)
    {
        var receivedCrc = BinaryPrimitives.ReadUInt32BigEndian(frame.Slice(2 + payloadLength, 4));
        var calculatedCrc = Crc32.HashToUInt32(payload);
        // Both branches of the verdict used to be empty strings, so the result of the
        // comparison was never shown.
        var verdict = receivedCrc == calculatedCrc ? "通过" : "失败";
        crcInfo = $"0x{receivedCrc:X8} (校验{verdict})";
    }

    description = $"[{timestamp}] 类型=0x{rawType:X4} 基础=0x{baseType:X4} CRC={crcInfo} PayloadLen={payload.Length} 文本=\"{payloadText}\" HEX={payloadHex}";
    return true;
}
/// <summary>
/// Reads from <paramref name="stream"/> until <paramref name="buffer"/> is completely filled.
/// </summary>
/// <param name="stream">Stream to read from.</param>
/// <param name="buffer">Destination; its length is the number of bytes to read.</param>
/// <param name="token">Passed to every underlying read.</param>
/// <returns>
/// <c>true</c> once the buffer is full (immediately for an empty buffer);
/// <c>false</c> when a read returns zero bytes before that.
/// </returns>
private async Task<bool> ReadExactAsync(NetworkStream stream, Memory<byte> buffer, CancellationToken token)
{
    // Shrink the window from the front after every partial read.
    var unfilled = buffer;
    while (!unfilled.IsEmpty)
    {
        var received = await stream.ReadAsync(unfilled, token);
        if (received == 0)
        {
            return false;
        }

        unfilled = unfilled.Slice(received);
    }

    return true;
}
/// <summary>
/// Adds a "[系统]"-prefixed notice to <see cref="ReceivedMessages"/> on the UI thread
/// and raises <see cref="MetricsUpdated"/>.
/// </summary>
/// <param name="message">Notice text without the prefix.</param>
private async Task AppendSystemMessageAsync(string message)
{
    var text = $"[系统] {message}";
    await Dispatcher.UIThread.InvokeAsync(() =>
    {
        // The index is taken inside the UI callback, in the order the notices are shown.
        var entry = new TcpReceivedMessageEntry(Interlocked.Increment(ref _receivedMessageIndex), text);
        ReceivedMessages.Add(entry);
        MetricsUpdated?.Invoke(this, EventArgs.Empty);
    });
}
/// <summary>
/// Handler for <see cref="ClearMessagesCommand"/>: empties both message collections,
/// restarts the received-message numbering and raises <see cref="MetricsUpdated"/>.
/// </summary>
private void ClearMessages()
{
    SentMessages.Clear();
    ReceivedMessages.Clear();
    Interlocked.Exchange(ref _receivedMessageIndex, 0);

    var handler = MetricsUpdated;
    if (handler is not null)
    {
        handler(this, EventArgs.Empty);
    }

    _logger?.LogInformation("TCP 会话 {Session} 已清空消息", SessionName);
}
/// <summary>
/// Stops the receive loop and releases the stream and the client. Safe to call when
/// nothing is connected, and safe to call more than once.
/// </summary>
/// <remarks>
/// Errors raised while releasing a resource are logged and do not prevent the remaining
/// resources from being released or <see cref="IsConnected"/> from being reset.
/// </remarks>
private void DisconnectInternal()
{
    // Detach the resources atomically before touching them. When two callers overlap
    // (for example a manual disconnect and the end of the receive loop), each instance is
    // released by exactly one of them instead of one caller using what the other disposed.
    var cancellation = Interlocked.Exchange(ref _receiveCancellationTokenSource, null);
    var stream = Interlocked.Exchange(ref _networkStream, null);
    var client = Interlocked.Exchange(ref _tcpClient, null);

    // Runs one release step; a failure is logged and must not skip the steps after it.
    void Release(Action step)
    {
        try
        {
            step();
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "TCP 会话 {Session} 断开连接时出错", SessionName);
        }
    }

    try
    {
        IsBusy = true;
        Release(() => cancellation?.Cancel());
        Release(() => cancellation?.Dispose());
        // Dispose also closes, so a separate Close call is not needed.
        Release(() => stream?.Dispose());
        Release(() => client?.Dispose());
        IsConnected = false;
    }
    catch (Exception ex)
    {
        _logger?.LogError(ex, "TCP 会话 {Session} 断开连接时出错", SessionName);
    }
    finally
    {
        IsBusy = false;
    }
}
/// <summary>Releases the connection resources held by this session.</summary>
public void Dispose() => DisconnectInternal();
}
/// <summary>One entry of the received-message list.</summary>
/// <param name="Index">Sequence number assigned when the entry was created.</param>
/// <param name="DisplayText">Formatted text shown for the entry.</param>
public sealed record TcpReceivedMessageEntry(
    int Index,
    string DisplayText);