|
|
@ -14,9 +14,17 @@ public class MessageChannelManager : IMessageChannelManager |
|
|
|
private volatile bool _disposed; |
|
|
|
private readonly object _disposeLock = new object(); |
|
|
|
|
|
|
|
public IMessageChannel<ProtocolMessage> SendChannel { get; } |
|
|
|
public IMessageChannel<object> ReceiveChannel { get; } |
|
|
|
public IMessageChannel<HeartbeatMessage> PriorityChannel { get; } |
|
|
|
private IMessageChannel<ProtocolMessage> _sendChannel; |
|
|
|
private IMessageChannel<object> _receiveChannel; |
|
|
|
private IMessageChannel<HeartbeatMessage> _priorityChannel; |
|
|
|
|
|
|
|
public IMessageChannel<ProtocolMessage> SendChannel => _sendChannel; |
|
|
|
public IMessageChannel<object> ReceiveChannel => _receiveChannel; |
|
|
|
public IMessageChannel<HeartbeatMessage> PriorityChannel => _priorityChannel; |
|
|
|
|
|
|
|
private readonly int _sendChannelCapacity; |
|
|
|
private readonly int _receiveChannelCapacity; |
|
|
|
private readonly int _priorityChannelCapacity; |
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
/// 构造函数
|
|
|
@ -27,24 +35,97 @@ public class MessageChannelManager : IMessageChannelManager |
|
|
|
/// <param name="priorityChannelCapacity">优先级通道容量</param>
|
|
|
|
public MessageChannelManager( |
|
|
|
ILogger<MessageChannelManager> logger, |
|
|
|
int sendChannelCapacity = 1000, |
|
|
|
int receiveChannelCapacity = 1000, |
|
|
|
int priorityChannelCapacity = 100) |
|
|
|
int sendChannelCapacity, |
|
|
|
int receiveChannelCapacity, |
|
|
|
int priorityChannelCapacity) |
|
|
|
{ |
|
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
SendChannel = new ChannelMessageChannel<ProtocolMessage>(sendChannelCapacity); |
|
|
|
ReceiveChannel = new ChannelMessageChannel<object>(receiveChannelCapacity); |
|
|
|
PriorityChannel = new ChannelMessageChannel<HeartbeatMessage>(priorityChannelCapacity); |
|
|
|
// 验证容量参数
|
|
|
|
if (sendChannelCapacity <= 0) |
|
|
|
throw new ArgumentOutOfRangeException(nameof(sendChannelCapacity), "发送通道容量必须大于0"); |
|
|
|
if (receiveChannelCapacity <= 0) |
|
|
|
throw new ArgumentOutOfRangeException(nameof(receiveChannelCapacity), "接收通道容量必须大于0"); |
|
|
|
if (priorityChannelCapacity <= 0) |
|
|
|
throw new ArgumentOutOfRangeException(nameof(priorityChannelCapacity), "优先级通道容量必须大于0"); |
|
|
|
|
|
|
|
_sendChannelCapacity = sendChannelCapacity; |
|
|
|
_receiveChannelCapacity = receiveChannelCapacity; |
|
|
|
_priorityChannelCapacity = priorityChannelCapacity; |
|
|
|
|
|
|
|
_logger.LogInformation("消息通道管理器已创建 - 发送通道容量: {SendCapacity}, 接收通道容量: {ReceiveCapacity}, 优先级通道容量: {PriorityCapacity}", |
|
|
|
sendChannelCapacity, receiveChannelCapacity, priorityChannelCapacity); |
|
|
|
} |
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
/// 创建所有通道
|
|
|
|
/// </summary>
|
|
|
|
public void CreateChannels() |
|
|
|
{ |
|
|
|
ThrowIfDisposed(); |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
// 检查通道是否已存在,如果存在则先释放
|
|
|
|
if (_sendChannel != null || _receiveChannel != null || _priorityChannel != null) |
|
|
|
{ |
|
|
|
_logger.LogWarning("通道已存在,先释放现有通道"); |
|
|
|
ReleaseChannels(); |
|
|
|
} |
|
|
|
|
|
|
|
_sendChannel = new ChannelMessageChannel<ProtocolMessage>(_sendChannelCapacity); |
|
|
|
_receiveChannel = new ChannelMessageChannel<object>(_receiveChannelCapacity); |
|
|
|
_priorityChannel = new ChannelMessageChannel<HeartbeatMessage>(_priorityChannelCapacity); |
|
|
|
|
|
|
|
_logger.LogInformation("所有通道已创建 - 发送通道容量: {SendCapacity}, 接收通道容量: {ReceiveCapacity}, 优先级通道容量: {PriorityCapacity}", |
|
|
|
_sendChannelCapacity, _receiveChannelCapacity, _priorityChannelCapacity); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "创建通道失败"); |
|
|
|
throw; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
/// 释放所有通道(完全释放资源)
|
|
|
|
/// </summary>
|
|
|
|
public void ReleaseChannels() |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
_logger.LogDebug("开始释放所有通道资源"); |
|
|
|
|
|
|
|
// 检查是否有通道需要释放
|
|
|
|
if (_sendChannel == null && _receiveChannel == null && _priorityChannel == null) |
|
|
|
{ |
|
|
|
_logger.LogDebug("没有通道需要释放"); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
// 先完成所有通道(标记不再接受新消息)
|
|
|
|
if (_sendChannel != null && !_sendChannel.IsCompleted) |
|
|
|
_sendChannel.Complete(); |
|
|
|
if (_receiveChannel != null && !_receiveChannel.IsCompleted) |
|
|
|
_receiveChannel.Complete(); |
|
|
|
if (_priorityChannel != null && !_priorityChannel.IsCompleted) |
|
|
|
_priorityChannel.Complete(); |
|
|
|
|
|
|
|
// 然后释放通道资源
|
|
|
|
_sendChannel?.Dispose(); |
|
|
|
_receiveChannel?.Dispose(); |
|
|
|
_priorityChannel?.Dispose(); |
|
|
|
|
|
|
|
// 清空引用
|
|
|
|
_sendChannel = null; |
|
|
|
_receiveChannel = null; |
|
|
|
_priorityChannel = null; |
|
|
|
|
|
|
|
_logger.LogInformation("所有通道资源已完全释放"); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "创建消息通道管理器失败"); |
|
|
|
_logger.LogError(ex, "释放通道资源失败"); |
|
|
|
throw; |
|
|
|
} |
|
|
|
} |
|
|
@ -79,7 +160,7 @@ public class MessageChannelManager : IMessageChannelManager |
|
|
|
} |
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
/// 清空所有通道
|
|
|
|
/// 清空所有通道中的消息(保持通道可用)
|
|
|
|
/// </summary>
|
|
|
|
public void ClearAllChannels() |
|
|
|
{ |
|
|
@ -87,21 +168,32 @@ public class MessageChannelManager : IMessageChannelManager |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
SendChannel.Clear(); |
|
|
|
ReceiveChannel.Clear(); |
|
|
|
PriorityChannel.Clear(); |
|
|
|
// 检查通道是否存在
|
|
|
|
if (_sendChannel == null && _receiveChannel == null && _priorityChannel == null) |
|
|
|
{ |
|
|
|
_logger.LogWarning("通道不存在,跳过清空操作"); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
// 清空各个通道中的消息
|
|
|
|
if (_sendChannel != null) |
|
|
|
_sendChannel.Clear(); |
|
|
|
if (_receiveChannel != null) |
|
|
|
_receiveChannel.Clear(); |
|
|
|
if (_priorityChannel != null) |
|
|
|
_priorityChannel.Clear(); |
|
|
|
|
|
|
|
_logger.LogInformation("所有通道已清空"); |
|
|
|
_logger.LogInformation("所有通道中的消息已清空"); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "清空通道失败"); |
|
|
|
_logger.LogError(ex, "清空通道消息失败"); |
|
|
|
throw; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
/// 完成所有通道
|
|
|
|
/// 完成所有通道(标记不再接受新消息,但保持可读)
|
|
|
|
/// </summary>
|
|
|
|
public void CompleteAllChannels() |
|
|
|
{ |
|
|
@ -109,11 +201,22 @@ public class MessageChannelManager : IMessageChannelManager |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
SendChannel.Complete(); |
|
|
|
ReceiveChannel.Complete(); |
|
|
|
PriorityChannel.Complete(); |
|
|
|
// 检查通道是否存在
|
|
|
|
if (_sendChannel == null && _receiveChannel == null && _priorityChannel == null) |
|
|
|
{ |
|
|
|
_logger.LogWarning("通道不存在,跳过完成操作"); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
// 完成各个通道(标记不再接受新消息)
|
|
|
|
if (_sendChannel != null && !_sendChannel.IsCompleted) |
|
|
|
_sendChannel.Complete(); |
|
|
|
if (_receiveChannel != null && !_receiveChannel.IsCompleted) |
|
|
|
_receiveChannel.Complete(); |
|
|
|
if (_priorityChannel != null && !_priorityChannel.IsCompleted) |
|
|
|
_priorityChannel.Complete(); |
|
|
|
|
|
|
|
_logger.LogInformation("所有通道已完成"); |
|
|
|
_logger.LogInformation("所有通道已完成(不再接受新消息)"); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
@ -122,6 +225,8 @@ public class MessageChannelManager : IMessageChannelManager |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
/// 释放资源
|
|
|
|
/// </summary>
|
|
|
@ -135,13 +240,8 @@ public class MessageChannelManager : IMessageChannelManager |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
// 先完成所有通道
|
|
|
|
CompleteAllChannels(); |
|
|
|
|
|
|
|
// 然后释放通道资源
|
|
|
|
SendChannel?.Dispose(); |
|
|
|
ReceiveChannel?.Dispose(); |
|
|
|
PriorityChannel?.Dispose(); |
|
|
|
// 释放所有通道
|
|
|
|
ReleaseChannels(); |
|
|
|
|
|
|
|
_logger.LogInformation("消息通道管理器已释放"); |
|
|
|
} |
|
|
|