You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

182 lines
4.7 KiB

using System.Threading.Channels;
using CoreAgent.WebSocketTransport.Interfaces;
namespace CoreAgent.WebSocketTransport.Services;
/// <summary>
/// <see cref="IMessageChannel{T}"/> implementation backed by a bounded
/// <see cref="Channel{T}"/>. All operations delegate to the underlying channel's
/// reader and writer after checking that this instance has not been disposed.
/// </summary>
/// <typeparam name="T">Type of the messages carried by the channel.</typeparam>
public class ChannelMessageChannel<T> : IMessageChannel<T>
{
    private readonly ChannelWriter<T> _writer;
    private readonly ChannelReader<T> _reader;
    private readonly object _disposeLock = new object();

    // volatile: read without the lock by ThrowIfDisposed, written under _disposeLock.
    private volatile bool _disposed;

    /// <summary>Maximum number of messages the channel was created to hold.</summary>
    public int Capacity { get; }

    /// <summary>Number of messages currently available to read.</summary>
    public int Count => _reader.Count;

    /// <summary>
    /// <c>true</c> once the reader's completion task has finished.
    /// </summary>
    public bool IsCompleted => _reader.Completion.IsCompleted;

    /// <summary>
    /// Creates a bounded channel with the given capacity.
    /// </summary>
    /// <param name="capacity">Channel capacity; must be greater than zero.</param>
    /// <param name="fullMode">Behavior applied when writing to a full channel.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="capacity"/> is zero or negative.
    /// </exception>
    public ChannelMessageChannel(int capacity, BoundedChannelFullMode fullMode = BoundedChannelFullMode.DropOldest)
    {
        if (capacity <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(capacity), "容量必须大于0");
        }

        Capacity = capacity;

        var options = new BoundedChannelOptions(capacity)
        {
            SingleWriter = false, // multiple writers are allowed
            // Declares a single reader.
            // NOTE(review): Clear() also reads from the channel; confirm it is never
            // invoked concurrently with the consumer.
            SingleReader = true,
            FullMode = fullMode,
            AllowSynchronousContinuations = false
        };

        Channel<T> channel = Channel.CreateBounded<T>(options);
        _writer = channel.Writer;
        _reader = channel.Reader;
    }

    /// <summary>
    /// Writes a message asynchronously.
    /// </summary>
    /// <param name="message">Message to write; must not be <c>null</c>.</param>
    /// <param name="cancellationToken">Token used to cancel the write.</param>
    /// <returns>A task-like value representing the pending write.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
    public ValueTask WriteAsync(T message, CancellationToken cancellationToken = default)
    {
        // Both checks throw synchronously, before any ValueTask is returned.
        ThrowIfDisposed();
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        return _writer.WriteAsync(message, cancellationToken);
    }

    /// <summary>
    /// Attempts to write a message without waiting.
    /// </summary>
    /// <param name="message">Message to write; must not be <c>null</c>.</param>
    /// <returns>The result of the underlying writer's <c>TryWrite</c>.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
    public bool TryWrite(T message)
    {
        ThrowIfDisposed();
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        return _writer.TryWrite(message);
    }

    /// <summary>
    /// Reads a message asynchronously.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the read.</param>
    /// <returns>A task-like value that yields the message read.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        return _reader.ReadAsync(cancellationToken);
    }

    /// <summary>
    /// Attempts to read a message without waiting.
    /// </summary>
    /// <param name="message">Receives the message when one was read.</param>
    /// <returns>The result of the underlying reader's <c>TryRead</c>.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public bool TryRead(out T message)
    {
        ThrowIfDisposed();
        return _reader.TryRead(out message);
    }

    /// <summary>
    /// Waits until a message is available to read.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the wait.</param>
    /// <returns>The result of the underlying reader's <c>WaitToReadAsync</c>.</returns>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        return _reader.WaitToReadAsync(cancellationToken);
    }

    /// <summary>
    /// Marks the channel as complete so that no further messages are accepted.
    /// Calling this on an already-completed channel is a no-op.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public void Complete()
    {
        ThrowIfDisposed();

        // TryComplete returns false instead of throwing when the channel is already
        // completed, so no exception handling is required here.
        _writer.TryComplete();
    }

    /// <summary>
    /// Reads and discards messages until the reader reports that none are available.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The channel has been disposed.</exception>
    public void Clear()
    {
        ThrowIfDisposed();

        while (_reader.TryRead(out _))
        {
            // Intentionally empty: every message read here is dropped.
        }
    }

    /// <summary>
    /// Completes the writer and marks this instance as disposed. Safe to call more
    /// than once; only the first call has any effect.
    /// </summary>
    public void Dispose()
    {
        lock (_disposeLock)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            // TryComplete never throws for an already-completed channel, which keeps
            // Dispose exception-free without a catch block.
            _writer.TryComplete();

            GC.SuppressFinalize(this);
        }
    }

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> when this instance has been disposed.
    /// </summary>
    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(ChannelMessageChannel<T>));
        }
    }
}