TProPCMonitor/LoraGamepad/Util/IAsyncPipe.cs

84 lines
2.2 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Threading;
namespace LoraGamepad.Util;
/// <summary>
/// 异步管道
/// </summary>
/// <typeparam name="T1">流入数据类型</typeparam>
/// <typeparam name="T2">流出数据类型</typeparam>
public abstract class IAsyncPipe<T1, T2> : IPipe<T1, T2> {
private readonly ConcurrentQueue<T1> _dataQueue = new();
private bool _isWorking;
private DateTime _urgeTime;
private DateTime _workTime;
private readonly string _threadName;
private readonly TimeSpan _waitWorkTimeMax;
protected IAsyncPipe(string threadName = "IAsyncPipe", long waitWorkMills = 3000) {
_threadName = threadName;
_waitWorkTimeMax = TimeSpan.FromMilliseconds(waitWorkMills);
}
/// <summary>
/// 催工
/// </summary>
private void UrgeWork() {
_urgeTime = DateTime.Now;
if (_isWorking) {
// 在干活?打扰了..
return;
}
_isWorking = true;
var thread = new Thread(() => {
// 在失业的边缘反复横跳
while (!TryStrike()) {
// 干活
Process(_dataQueue);
}
}) {
IsBackground = true,
Name = _threadName
};
thread.Start();
}
/// <summary>
/// 尝试罢工
/// </summary>
/// <returns>是否罢工成功</returns>
private bool TryStrike() {
if (!_dataQueue.IsEmpty) {
// 有活继续干
_workTime = DateTime.Now;
} else if (DateTime.Now - _workTime > _waitWorkTimeMax) {
// 闲3秒罢工
_isWorking = false;
}
return !_isWorking;
}
protected abstract void Process(ConcurrentQueue<T1> queue);
public override void Push(T1 pack) {
if (pack == null) {
return;
}
_dataQueue.Enqueue(pack);
UrgeWork();
}
/// <summary>
/// 数据是否停止流入达到指定时间
/// </summary>
/// <param name="time"></param>
/// <returns></returns>
protected bool IsPushStop(TimeSpan time) {
return DateTime.Now - _urgeTime > time;
}
}