using System.Threading.Channels;
using CoreAgent.WebSocketTransport.Interfaces;

namespace CoreAgent.WebSocketTransport.Services;

/// <summary>
/// Message channel implementation backed by a bounded <see cref="Channel{T}"/>.
/// Exposes queue-style write/read operations on top of the channel's writer and reader.
/// </summary>
/// <typeparam name="T">Message type.</typeparam>
public class ChannelMessageChannel<T> : IMessageChannel<T>
{
    private readonly Channel<T> _channel;
    private readonly ChannelWriter<T> _writer;
    private readonly ChannelReader<T> _reader;
    private readonly object _disposeLock = new object();
    private volatile bool _disposed;

    /// <summary>Maximum number of messages the channel was created to hold.</summary>
    public int Capacity { get; }

    /// <summary>Number of messages currently available to read.</summary>
    public int Count => _reader.Count;

    /// <summary>
    /// True once the reader's completion task has finished, i.e. the writer was
    /// completed and every buffered message has been consumed.
    /// </summary>
    public bool IsCompleted => _reader.Completion.IsCompleted;

    /// <summary>
    /// Creates a bounded message channel.
    /// </summary>
    /// <param name="capacity">Channel capacity; must be greater than zero.</param>
    /// <param name="fullMode">Behavior applied when a write arrives while the channel is full.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is zero or negative.</exception>
    public ChannelMessageChannel(int capacity, BoundedChannelFullMode fullMode = BoundedChannelFullMode.DropOldest)
    {
        if (capacity <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(capacity), "容量必须大于0");
        }

        Capacity = capacity;

        var options = new BoundedChannelOptions(capacity)
        {
            // Several producers may write concurrently.
            SingleWriter = false,
            // Only one consumer is expected.
            // NOTE(review): Clear() and TryRead() also consume from the reader; confirm
            // they are only invoked by that single consumer.
            SingleReader = true,
            FullMode = fullMode,
            AllowSynchronousContinuations = false
        };

        _channel = Channel.CreateBounded<T>(options);
        _writer = _channel.Writer;
        _reader = _channel.Reader;
    }

    /// <summary>
    /// Asynchronously writes a message to the channel.
    /// </summary>
    /// <param name="message">Message to enqueue; must not be null.</param>
    /// <param name="cancellationToken">Token used to cancel the write.</param>
    /// <returns>A task-like value that completes when the write has been accepted.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public ValueTask WriteAsync(T message, CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        return _writer.WriteAsync(message, cancellationToken);
    }

    /// <summary>
    /// Attempts to write a message without waiting.
    /// </summary>
    /// <param name="message">Message to enqueue; must not be null.</param>
    /// <returns>The result of the underlying writer's <c>TryWrite</c> call.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public bool TryWrite(T message)
    {
        ThrowIfDisposed();
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        return _writer.TryWrite(message);
    }

    /// <summary>
    /// Asynchronously reads the next message from the channel.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the read.</param>
    /// <returns>The next message.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        return _reader.ReadAsync(cancellationToken);
    }

    /// <summary>
    /// Attempts to read a message without waiting.
    /// </summary>
    /// <param name="message">Receives the message when the call returns true.</param>
    /// <returns>True if a message was read; otherwise false.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public bool TryRead(out T message)
    {
        ThrowIfDisposed();
        return _reader.TryRead(out message);
    }

    /// <summary>
    /// Waits until a message is available to read.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the wait.</param>
    /// <returns>The result of the underlying reader's <c>WaitToReadAsync</c> call.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        return _reader.WaitToReadAsync(cancellationToken);
    }

    /// <summary>
    /// Marks the channel as complete so that no further messages are accepted.
    /// Calling it on an already completed channel is a no-op.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public void Complete()
    {
        ThrowIfDisposed();

        // TryComplete reports "already completed" through its return value instead of
        // throwing, so no exception has to be raised and swallowed here.
        _writer.TryComplete();
    }

    /// <summary>
    /// Discards every message currently buffered in the channel.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public void Clear()
    {
        ThrowIfDisposed();

        // TryRead returns false once the buffer is empty (or the channel is completed
        // and drained), which ends the loop.
        while (_reader.TryRead(out _))
        {
        }
    }

    /// <summary>
    /// Completes the writer and marks this instance as disposed. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        lock (_disposeLock)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            // A false result only means the channel was already completed; calling
            // Complete() afterwards would always throw, so TryComplete alone is enough.
            _writer.TryComplete();

            GC.SuppressFinalize(this);
        }
    }

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> when this instance has been disposed.
    /// </summary>
    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(ChannelMessageChannel<T>));
        }
    }
}