You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
182 lines
4.7 KiB
182 lines
4.7 KiB
using System.Threading.Channels;
|
|
using CoreAgent.WebSocketTransport.Interfaces;
|
|
|
|
namespace CoreAgent.WebSocketTransport.Services;
|
|
|
|
/// <summary>
|
|
/// 基于 Channel 的消息通道实现
|
|
/// 提供线程安全的消息队列操作
|
|
/// </summary>
|
|
/// <typeparam name="T">消息类型</typeparam>
|
|
public class ChannelMessageChannel<T> : IMessageChannel<T>
|
|
{
|
|
private readonly Channel<T> _channel;
|
|
private readonly ChannelWriter<T> _writer;
|
|
private readonly ChannelReader<T> _reader;
|
|
private volatile bool _disposed;
|
|
private readonly object _disposeLock = new object();
|
|
|
|
public int Capacity { get; }
|
|
public int Count => _channel.Reader.Count;
|
|
public bool IsCompleted => _channel.Reader.Completion.IsCompleted;
|
|
|
|
/// <summary>
|
|
/// 构造函数
|
|
/// </summary>
|
|
/// <param name="capacity">通道容量</param>
|
|
/// <param name="fullMode">队列满时的处理模式</param>
|
|
public ChannelMessageChannel(int capacity, BoundedChannelFullMode fullMode = BoundedChannelFullMode.DropOldest)
|
|
{
|
|
if (capacity <= 0)
|
|
throw new ArgumentOutOfRangeException(nameof(capacity), "容量必须大于0");
|
|
|
|
Capacity = capacity;
|
|
|
|
var options = new BoundedChannelOptions(capacity)
|
|
{
|
|
SingleWriter = false, // 允许多个写入者
|
|
SingleReader = true, // 只允许一个读取者
|
|
FullMode = fullMode,
|
|
AllowSynchronousContinuations = false
|
|
};
|
|
|
|
_channel = Channel.CreateBounded<T>(options);
|
|
_writer = _channel.Writer;
|
|
_reader = _channel.Reader;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 异步写入消息
|
|
/// </summary>
|
|
public ValueTask WriteAsync(T message, CancellationToken cancellationToken = default)
|
|
{
|
|
ThrowIfDisposed();
|
|
|
|
if (message == null)
|
|
throw new ArgumentNullException(nameof(message));
|
|
|
|
return _writer.WriteAsync(message, cancellationToken);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 尝试写入消息(非阻塞)
|
|
/// </summary>
|
|
public bool TryWrite(T message)
|
|
{
|
|
ThrowIfDisposed();
|
|
|
|
if (message == null)
|
|
throw new ArgumentNullException(nameof(message));
|
|
|
|
return _writer.TryWrite(message);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 异步读取消息
|
|
/// </summary>
|
|
public ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
|
|
{
|
|
ThrowIfDisposed();
|
|
return _reader.ReadAsync(cancellationToken);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 尝试读取消息(非阻塞)
|
|
/// </summary>
|
|
public bool TryRead(out T message)
|
|
{
|
|
ThrowIfDisposed();
|
|
return _reader.TryRead(out message);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 等待消息可读
|
|
/// </summary>
|
|
public ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
|
|
{
|
|
ThrowIfDisposed();
|
|
return _reader.WaitToReadAsync(cancellationToken);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 完成通道,不再接受新消息
|
|
/// </summary>
|
|
public void Complete()
|
|
{
|
|
ThrowIfDisposed();
|
|
|
|
try
|
|
{
|
|
_writer.Complete();
|
|
}
|
|
catch (InvalidOperationException)
|
|
{
|
|
// 通道可能已经完成,忽略异常
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 清空队列中的所有消息
|
|
/// </summary>
|
|
public void Clear()
|
|
{
|
|
ThrowIfDisposed();
|
|
|
|
try
|
|
{
|
|
// 读取并丢弃所有消息,直到队列为空
|
|
while (_reader.TryRead(out _))
|
|
{
|
|
// 继续读取直到没有更多消息
|
|
}
|
|
}
|
|
catch (InvalidOperationException)
|
|
{
|
|
// 通道可能已完成,忽略异常
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 释放资源
|
|
/// </summary>
|
|
public void Dispose()
|
|
{
|
|
lock (_disposeLock)
|
|
{
|
|
if (!_disposed)
|
|
{
|
|
_disposed = true;
|
|
|
|
try
|
|
{
|
|
// 完成写入器
|
|
if (!_writer.TryComplete())
|
|
{
|
|
_writer.Complete();
|
|
}
|
|
}
|
|
catch (InvalidOperationException)
|
|
{
|
|
// 忽略已完成的异常
|
|
}
|
|
catch (Exception)
|
|
{
|
|
// 记录其他异常但不抛出
|
|
}
|
|
|
|
GC.SuppressFinalize(this);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 检查是否已释放
|
|
/// </summary>
|
|
private void ThrowIfDisposed()
|
|
{
|
|
if (_disposed)
|
|
{
|
|
throw new ObjectDisposedException(nameof(ChannelMessageChannel<T>));
|
|
}
|
|
}
|
|
}
|