DD.WellWorkover.Cloud/AsbCloudInfrastructure/Services/BackgroundWorkerService.cs
2022-02-28 14:44:26 +05:00

195 lines
5.5 KiB
C#

using AsbCloudApp.Data;
using AsbCloudApp.Services;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services
{
/// <summary>
/// Сервис выстраивает очередь из фоновых задач. Ограничивает количество одновременно выполняющихся задач.
/// </summary>
public class BackgroundWorkerService : IDisposable, IBackgroundWorkerService
{
private readonly Worker[] workers;
private readonly Dictionary<string, Work> works = new Dictionary<string, Work>();
private bool isRunning = false;
private CancellationTokenSource cts;
private Task task;
public BackgroundWorkerService(IConfiguration configuration)
{
var workersCount = configuration.GetValue("BackgroundWorkersCount", 4);
workers = new Worker[workersCount];
for (int i = 0; i < workers.Length; i++)
workers[i] = new Worker();
}
~BackgroundWorkerService()
{
Dispose();
}
public string Enqueue(Func<string, CancellationToken, Task> func)
{
var work = new Work
{
ActionAsync = func
};
return Enqueue(work);
}
public string Enqueue(string id, Func<string, CancellationToken, Task> func)
{
var work = new Work(id, func);
return Enqueue(work);
}
public string Enqueue(string id, Func<string, CancellationToken, Task> func, Func<string, Exception, CancellationToken, Task> onError)
{
var work = new Work(id, func)
{
OnErrorAsync = onError
};
return Enqueue(work);
}
string Enqueue(Work work)
{
works[work.Id] = work;
if (!isRunning)
{
isRunning = true;
cts = new CancellationTokenSource();
task = Task.Run(() => ExecuteAsync(cts.Token), cts.Token);
}
return work.Id;
}
private Work Dequeue()
{
var item = works.First();
works.Remove(item.Key);
return item.Value;
}
public bool TryRemove(string id)
=> works.Remove(id);
public bool Contains(string id)
=> works.ContainsKey(id);
protected async Task ExecuteAsync(CancellationToken token)
{
while (works.Any() && !token.IsCancellationRequested)
{
var freeworker = workers.FirstOrDefault(w => !w.IsBusy);
if (freeworker is not null)
{
var work = Dequeue();
freeworker.Start(work);
}
else
await Task.Delay(10, token).ConfigureAwait(false);
}
isRunning = false;
}
public void Dispose()
{
cts?.Cancel();
task?.Wait(1);
task?.Dispose();
cts?.Dispose();
task = null;
cts = null;
GC.SuppressFinalize(this);
}
}
class Worker : IDisposable
{
private CancellationTokenSource cts;
private Task task;
public bool IsBusy { get; private set; }
~Worker()
{
Dispose();
}
public void Dispose()
{
Stop();
GC.SuppressFinalize(this);
}
public void Start(Work work)
{
IsBusy = true;
cts = new CancellationTokenSource();
task = Task.Run(async () =>
{
try
{
var actionTask = work.ActionAsync(work.Id, cts.Token);
await actionTask.WaitAsync(TimeSpan.FromMinutes(2), cts.Token);
}
catch (Exception ex)
{
Trace.TraceError(ex.Message);
if (work.OnErrorAsync is not null)
{
try
{
await work.OnErrorAsync(work.Id, ex, cts.Token).ConfigureAwait(false);
}
catch (Exception exOnErrorHandler)
{
Trace.TraceError(exOnErrorHandler.Message);
}
}
}
finally
{
cts?.Dispose();
cts = null;
IsBusy = false;
}
}, cts.Token);
}
public void Stop()
{
cts?.Cancel();
task?.Wait(1);
task = null;
cts?.Dispose();
cts = null;
IsBusy = false;
}
}
class Work
{
public string Id { get; private set; }
public Func<string, CancellationToken, Task> ActionAsync { get; set; }
public Func<string, Exception, CancellationToken, Task> OnErrorAsync { get; set; }
public Work()
{
Id = Guid.NewGuid().ToString();
}
public Work(string id, Func<string, CancellationToken, Task> actionAsync)
{
Id = id;
ActionAsync = actionAsync;
}
}
}