You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

451 lines
14 KiB

1 month ago
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AuroraDesk.Core.Entities;
using AuroraDesk.Core.Interfaces;
using Microsoft.Extensions.Logging;
namespace AuroraDesk.Infrastructure.Services;
/// <summary>
/// 基于 plink.exe 的多会话管理服务,实现实时消息收发
/// </summary>
public sealed class PlinkSessionService : IPlinkSessionService
{
private readonly ConcurrentDictionary<Guid, SessionContext> _sessions = new();
private readonly ILogger<PlinkSessionService>? _logger;
private readonly TimeSpan _shutdownGracePeriod = TimeSpan.FromSeconds(2);
public PlinkSessionService(ILogger<PlinkSessionService>? logger = null)
{
_logger = logger;
}
public async Task<PlinkSessionInfo> StartSessionAsync(PlinkSessionOptions options, CancellationToken cancellationToken = default)
{
if (options is null)
{
throw new ArgumentNullException(nameof(options));
}
ValidateOptions(options);
var sessionId = Guid.NewGuid();
var displayName = string.IsNullOrWhiteSpace(options.DisplayName)
? $"{options.UserName}@{options.Host}:{options.Port}"
: options.DisplayName;
var info = new PlinkSessionInfo(sessionId, displayName, options.Host, options.Port, options.UserName, PlinkSessionStatus.Connecting, DateTime.UtcNow);
var context = new SessionContext(info, options);
if (!_sessions.TryAdd(sessionId, context))
{
throw new InvalidOperationException("无法注册新的 Plink 会话。");
}
try
{
await StartProcessAsync(context, cancellationToken).ConfigureAwait(false);
context.UpdateStatus(PlinkSessionStatus.Connected);
_logger?.LogInformation("Plink 会话 {SessionId} 已连接到 {User}@{Host}:{Port}", sessionId, options.UserName, options.Host, options.Port);
return context.Info;
}
catch (Exception ex)
{
context.UpdateStatus(PlinkSessionStatus.Error);
context.PublishSystemMessage($"启动会话失败: {ex.Message}", isError: true);
_sessions.TryRemove(sessionId, out _);
context.Dispose();
_logger?.LogError(ex, "启动 Plink 会话失败: {User}@{Host}:{Port}", options.UserName, options.Host, options.Port);
throw;
}
}
public async Task StopSessionAsync(Guid sessionId)
{
if (!_sessions.TryRemove(sessionId, out var context))
{
return;
}
context.UpdateStatus(PlinkSessionStatus.Disconnected);
context.PublishSystemMessage("会话已手动关闭。");
try
{
await ShutdownProcessAsync(context).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger?.LogWarning(ex, "关闭 Plink 会话时出现异常: {SessionId}", sessionId);
}
finally
{
context.Complete();
context.Dispose();
}
}
public async Task<bool> SendAsync(Guid sessionId, string payload, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(payload))
{
return false;
}
if (!_sessions.TryGetValue(sessionId, out var context))
{
return false;
}
if (context.StatusSubject.Value != PlinkSessionStatus.Connected || context.Process is null || context.Process.HasExited)
{
context.PublishSystemMessage("会话未连接,无法发送数据。", isError: true);
return false;
}
await context.SendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await context.Process.StandardInput.WriteLineAsync(payload).ConfigureAwait(false);
await context.Process.StandardInput.FlushAsync().ConfigureAwait(false);
context.PublishMessage(payload, PlinkMessageDirection.Outgoing);
return true;
}
catch (Exception ex)
{
context.PublishSystemMessage($"发送失败: {ex.Message}", isError: true);
_logger?.LogError(ex, "Plink 会话 {SessionId} 发送数据失败。", sessionId);
return false;
}
finally
{
context.SendLock.Release();
}
}
public IReadOnlyCollection<PlinkSessionInfo> GetSessions()
{
return _sessions.Values.Select(context => context.Info).ToArray();
}
public IObservable<PlinkSessionStatus> ObserveStatus(Guid sessionId)
{
if (_sessions.TryGetValue(sessionId, out var context))
{
return context.StatusSubject.AsObservable();
}
throw new KeyNotFoundException($"未找到会话 {sessionId}");
}
public IObservable<PlinkMessage> ObserveMessages(Guid sessionId)
{
if (_sessions.TryGetValue(sessionId, out var context))
{
return context.MessageSubject.AsObservable();
}
throw new KeyNotFoundException($"未找到会话 {sessionId}");
}
public void Dispose()
{
foreach (var sessionId in _sessions.Keys.ToArray())
{
try
{
StopSessionAsync(sessionId).GetAwaiter().GetResult();
}
catch (Exception ex)
{
_logger?.LogWarning(ex, "释放 PlinkSessionService 时关闭会话 {SessionId} 失败。", sessionId);
}
}
}
private static void ValidateOptions(PlinkSessionOptions options)
{
if (string.IsNullOrWhiteSpace(options.Host))
{
throw new ArgumentException("Host 不能为空。", nameof(options));
}
if (options.Port is < 1 or > 65535)
{
throw new ArgumentException("端口必须在 1~65535 之间。", nameof(options));
}
if (string.IsNullOrWhiteSpace(options.UserName))
{
throw new ArgumentException("UserName 不能为空。", nameof(options));
}
}
private async Task StartProcessAsync(SessionContext context, CancellationToken cancellationToken)
{
var options = context.Options;
var startInfo = new ProcessStartInfo
{
FileName = string.IsNullOrWhiteSpace(options.PlinkExecutablePath) ? "plink" : options.PlinkExecutablePath,
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = true
};
if (!string.IsNullOrWhiteSpace(options.PrivateKeyPath))
{
startInfo.ArgumentList.Add("-i");
startInfo.ArgumentList.Add(options.PrivateKeyPath);
}
startInfo.ArgumentList.Add("-ssh");
startInfo.ArgumentList.Add("-P");
startInfo.ArgumentList.Add(options.Port.ToString());
startInfo.ArgumentList.Add("-l");
startInfo.ArgumentList.Add(options.UserName);
if (!string.IsNullOrWhiteSpace(options.Password))
{
startInfo.ArgumentList.Add("-pw");
startInfo.ArgumentList.Add(options.Password);
}
if (options.AllowAnyHostKey)
{
startInfo.ArgumentList.Add("-batch");
startInfo.ArgumentList.Add("-noagent");
}
if (!string.IsNullOrWhiteSpace(options.AdditionalArguments))
{
// 将附加参数拆分为 tokens,简化处理避免 shell 注入
foreach (var token in SplitArguments(options.AdditionalArguments))
{
startInfo.ArgumentList.Add(token);
}
}
startInfo.ArgumentList.Add(options.Host);
var process = new Process
{
StartInfo = startInfo,
EnableRaisingEvents = true
};
var processStarted = process.Start();
if (!processStarted)
{
throw new InvalidOperationException("无法启动 plink 进程。");
}
process.StandardInput.AutoFlush = true;
context.AttachProcess(process);
process.Exited += (_, _) =>
{
context.UpdateStatus(PlinkSessionStatus.Disconnected);
context.PublishSystemMessage("plink 进程已退出。");
// 如果由外部事件触发退出,则确保从集合中移除
_sessions.TryRemove(context.Info.SessionId, out _);
context.Complete();
context.Dispose();
};
_ = Task.Run(() => PumpStreamAsync(process.StandardOutput, context, PlinkMessageDirection.Incoming), cancellationToken);
_ = Task.Run(() => PumpStreamAsync(process.StandardError, context, PlinkMessageDirection.System, isErrorChannel: true), cancellationToken);
await Task.Delay(150, cancellationToken).ConfigureAwait(false);
}
private async Task ShutdownProcessAsync(SessionContext context)
{
if (context.Process is null)
{
return;
}
try
{
if (!context.Process.HasExited)
{
await context.SendLock.WaitAsync().ConfigureAwait(false);
try
{
await context.Process.StandardInput.WriteLineAsync("exit").ConfigureAwait(false);
await context.Process.StandardInput.FlushAsync().ConfigureAwait(false);
}
finally
{
context.SendLock.Release();
}
if (!context.Process.WaitForExit((int)_shutdownGracePeriod.TotalMilliseconds))
{
context.Process.Kill(entireProcessTree: true);
}
}
}
catch (InvalidOperationException)
{
// 进程可能已经退出
}
}
private static async Task PumpStreamAsync(StreamReader reader, SessionContext context, PlinkMessageDirection direction, bool isErrorChannel = false)
{
try
{
while (true)
{
var line = await reader.ReadLineAsync().ConfigureAwait(false);
if (line is null)
{
break;
}
context.PublishMessage(line, direction, isErrorChannel);
}
}
catch (ObjectDisposedException)
{
// reader 已释放
}
catch (IOException)
{
// 通信流关闭
}
catch (Exception ex)
{
context.PublishSystemMessage($"读取 {(isErrorChannel ? "" : "")} 流失败: {ex.Message}", isError: true);
}
}
private static IEnumerable<string> SplitArguments(string arguments)
{
if (string.IsNullOrWhiteSpace(arguments))
{
yield break;
}
var current = new StringBuilder();
var inQuotes = false;
foreach (var ch in arguments)
{
switch (ch)
{
case '"':
inQuotes = !inQuotes;
break;
case ' ' when !inQuotes:
if (current.Length > 0)
{
yield return current.ToString();
current.Clear();
}
break;
default:
current.Append(ch);
break;
}
}
if (current.Length > 0)
{
yield return current.ToString();
}
}
private sealed class SessionContext : IDisposable
{
private bool _disposed;
public SessionContext(PlinkSessionInfo info, PlinkSessionOptions options)
{
Info = info;
Options = options;
StatusSubject = new BehaviorSubject<PlinkSessionStatus>(info.Status);
MessageSubject = new ReplaySubject<PlinkMessage>(bufferSize: 200);
}
public PlinkSessionInfo Info { get; private set; }
public PlinkSessionOptions Options { get; }
public Process? Process { get; private set; }
public BehaviorSubject<PlinkSessionStatus> StatusSubject { get; }
public ReplaySubject<PlinkMessage> MessageSubject { get; }
public SemaphoreSlim SendLock { get; } = new(1, 1);
public void AttachProcess(Process process)
{
Process = process ?? throw new ArgumentNullException(nameof(process));
}
public void UpdateStatus(PlinkSessionStatus status)
{
Info = Info with { Status = status };
StatusSubject.OnNext(status);
}
public void PublishMessage(string content, PlinkMessageDirection direction, bool isErrorChannel = false)
{
var message = new PlinkMessage(Info.SessionId, direction, content, DateTime.UtcNow, isErrorChannel && direction == PlinkMessageDirection.System);
MessageSubject.OnNext(message);
}
public void PublishSystemMessage(string content, bool isError = false)
{
PublishMessage(content, PlinkMessageDirection.System, isError);
}
public void Complete()
{
if (!MessageSubject.IsDisposed)
{
MessageSubject.OnCompleted();
}
if (!StatusSubject.IsDisposed)
{
StatusSubject.OnCompleted();
}
}
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
try
{
Process?.Dispose();
}
catch
{
// ignored
}
SendLock.Dispose();
MessageSubject.Dispose();
StatusSubject.Dispose();
}
}
}