using System;
using System.Collections.Concurrent;
using System.Threading;

namespace LoraGamepad.Util;

// NOTE(review): the generic parameter lists below are reconstructed from the T1 usage in
// Push and the two type-parameter doc lines; confirm the arity/names against IPipe's declaration.

/// <summary>
/// Asynchronous pipe. Pushed items are buffered in a concurrent queue and handed to
/// <see cref="Process"/> on a lazily started background worker thread. The worker retires
/// after the queue has stayed empty for the configured idle time and is started again by
/// the next <see cref="Push"/>.
/// </summary>
/// <typeparam name="T1">Type of the data flowing in.</typeparam>
/// <typeparam name="T2">Type of the data flowing out.</typeparam>
public abstract class IAsyncPipe<T1, T2> : IPipe<T1, T2>
{
    private readonly ConcurrentQueue<T1> _dataQueue = new();

    // Guards every transition of _isWorking so that at most one worker exists and no
    // queued item is left behind when the worker retires.
    private readonly object _gate = new();

    // True while a worker thread is alive; only read or written while holding _gate.
    private bool _isWorking;

    // UTC ticks of the most recent Push. Kept as a long and accessed through Volatile so
    // reads from other threads are atomic even on 32-bit runtimes. 0 == DateTime.MinValue.
    private long _urgeTicks;

    // Last moment the worker saw pending data; only touched by the worker thread.
    private DateTime _workTime;

    private readonly string _threadName;
    private readonly TimeSpan _waitWorkTimeMax;

    /// <summary>
    /// Creates the pipe. No thread is started until the first <see cref="Push"/>.
    /// </summary>
    /// <param name="threadName">Name given to the background worker thread.</param>
    /// <param name="waitWorkMills">
    /// How long, in milliseconds, the worker keeps running with an empty queue before it stops.
    /// </param>
    protected IAsyncPipe(string threadName = "IAsyncPipe", long waitWorkMills = 3000)
    {
        _threadName = threadName;
        _waitWorkTimeMax = TimeSpan.FromMilliseconds(waitWorkMills);
    }

    /// <summary>
    /// Records the push time and makes sure a worker thread is running.
    /// </summary>
    private void UrgeWork()
    {
        Volatile.Write(ref _urgeTicks, DateTime.UtcNow.Ticks);

        lock (_gate)
        {
            if (_isWorking)
            {
                // A worker is already running; it will pick the new item up.
                return;
            }

            _isWorking = true;
        }

        var thread = new Thread(() =>
        {
            // Keep processing until the idle timeout lets the worker retire.
            while (!TryStrike())
            {
                Process(_dataQueue);
            }
        })
        {
            IsBackground = true,
            Name = _threadName
        };
        thread.Start();
    }

    /// <summary>
    /// Decides whether the worker may stop.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the queue has been empty for longer than the idle timeout and the
    /// worker has been marked as stopped; otherwise <c>false</c>.
    /// </returns>
    private bool TryStrike()
    {
        var now = DateTime.UtcNow;

        if (!_dataQueue.IsEmpty)
        {
            // Work is pending: refresh the idle clock and keep going.
            _workTime = now;
            return false;
        }

        if (now - _workTime <= _waitWorkTimeMax)
        {
            // Idle, but not for long enough yet.
            return false;
        }

        lock (_gate)
        {
            // Re-check under the lock: a Push may have enqueued an item after the check
            // above and then found _isWorking still true. Retiring now would strand it.
            if (!_dataQueue.IsEmpty)
            {
                _workTime = now;
                return false;
            }

            _isWorking = false;
            return true;
        }
    }

    /// <summary>
    /// Consumes pending items. Called repeatedly on the worker thread, including while the
    /// queue is empty during the idle window.
    /// </summary>
    /// <param name="queue">The queue holding the pushed items.</param>
    protected abstract void Process(ConcurrentQueue<T1> queue);

    /// <summary>
    /// Enqueues an item and wakes the worker. A <c>null</c> item is ignored.
    /// </summary>
    /// <param name="pack">The item to enqueue.</param>
    public override void Push(T1 pack)
    {
        if (pack is null)
        {
            return;
        }

        // Enqueue first, then urge: TryStrike re-checks the queue under the same lock, so
        // the item is either seen by the current worker or triggers a new one.
        _dataQueue.Enqueue(pack);
        UrgeWork();
    }

    /// <summary>
    /// Tells whether no item has been pushed for longer than the given time.
    /// </summary>
    /// <param name="time">The quiet period to test for.</param>
    /// <returns>
    /// <c>true</c> if the time elapsed since the last <see cref="Push"/> exceeds
    /// <paramref name="time"/>; also <c>true</c> when nothing has been pushed yet.
    /// </returns>
    protected bool IsPushStop(TimeSpan time)
    {
        return DateTime.UtcNow.Ticks - Volatile.Read(ref _urgeTicks) > time.Ticks;
    }
}