using System.Threading.Channels;
using CoreAgent.WebSocketTransport.Interfaces;
namespace CoreAgent.WebSocketTransport.Services;
/// <summary>
/// Message channel implementation backed by a bounded <see cref="Channel{T}"/>.
/// Provides message-queue operations that delegate to the channel's reader and writer.
/// </summary>
/// <typeparam name="T">The message type.</typeparam>
public class ChannelMessageChannel<T> : IMessageChannel<T>
{
    private readonly ChannelWriter<T> _writer;
    private readonly ChannelReader<T> _reader;
    private volatile bool _disposed;
    private readonly object _disposeLock = new object();

    /// <summary>Gets the capacity the channel was created with.</summary>
    public int Capacity { get; }

    /// <summary>Gets the number of messages currently available to read.</summary>
    public int Count => _reader.Count;

    /// <summary>Gets whether the reader's completion task has finished.</summary>
    public bool IsCompleted => _reader.Completion.IsCompleted;

    /// <summary>
    /// Creates a bounded channel with the given capacity.
    /// </summary>
    /// <param name="capacity">Channel capacity; must be greater than zero.</param>
    /// <param name="fullMode">How writes behave when the queue is full.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is zero or negative.</exception>
    public ChannelMessageChannel(int capacity, BoundedChannelFullMode fullMode = BoundedChannelFullMode.DropOldest)
    {
        if (capacity <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(capacity), "容量必须大于0");
        }

        Capacity = capacity;

        var options = new BoundedChannelOptions(capacity)
        {
            SingleWriter = false, // multiple writers are allowed
            // NOTE(review): SingleReader = true tells the channel that reads never run
            // concurrently. Clear() also reads, so it must not race with ReadAsync/TryRead
            // - TODO confirm callers honor this, otherwise set this to false.
            SingleReader = true,
            FullMode = fullMode,
            AllowSynchronousContinuations = false
        };

        Channel<T> channel = Channel.CreateBounded<T>(options);
        _writer = channel.Writer;
        _reader = channel.Reader;
    }

    /// <summary>
    /// Asynchronously writes a message to the channel.
    /// </summary>
    /// <param name="message">The message to write; must not be null.</param>
    /// <param name="cancellationToken">Token used to cancel the write.</param>
    /// <returns>A task that completes when the write has been performed.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public ValueTask WriteAsync(T message, CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        return _writer.WriteAsync(message, cancellationToken);
    }

    /// <summary>
    /// Attempts to write a message without blocking.
    /// </summary>
    /// <param name="message">The message to write; must not be null.</param>
    /// <returns><c>true</c> if the channel accepted the message; otherwise <c>false</c>.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public bool TryWrite(T message)
    {
        ThrowIfDisposed();
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        return _writer.TryWrite(message);
    }

    /// <summary>
    /// Asynchronously reads the next message from the channel.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the read.</param>
    /// <returns>A task producing the message that was read.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        return _reader.ReadAsync(cancellationToken);
    }

    /// <summary>
    /// Attempts to read a message without blocking.
    /// </summary>
    /// <param name="message">Receives the message when one was read.</param>
    /// <returns><c>true</c> if a message was read; otherwise <c>false</c>.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public bool TryRead(out T message)
    {
        ThrowIfDisposed();
        return _reader.TryRead(out message);
    }

    /// <summary>
    /// Waits until a message is available to read.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the wait.</param>
    /// <returns>The result of the underlying reader's <c>WaitToReadAsync</c>.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        return _reader.WaitToReadAsync(cancellationToken);
    }

    /// <summary>
    /// Completes the channel so that it no longer accepts new messages.
    /// Calling this on an already-completed channel is a no-op.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public void Complete()
    {
        ThrowIfDisposed();

        // TryComplete returns false instead of throwing when the channel is already
        // completed, so no exception has to be raised and swallowed.
        _writer.TryComplete();
    }

    /// <summary>
    /// Discards every message currently queued in the channel.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public void Clear()
    {
        ThrowIfDisposed();

        // Read and drop messages until the queue reports empty.
        while (_reader.TryRead(out _))
        {
        }
    }

    /// <summary>
    /// Releases the channel: marks it disposed and completes the writer.
    /// Safe to call more than once; only the first call has an effect.
    /// </summary>
    public void Dispose()
    {
        lock (_disposeLock)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            // TryComplete does not throw when the writer was completed earlier,
            // which keeps Dispose exception-free without a catch-all handler.
            _writer.TryComplete();

            GC.SuppressFinalize(this);
        }
    }

    /// <summary>
    /// Throws if this instance has already been disposed.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(ChannelMessageChannel<T>));
        }
    }
}