forked from ddrilling/AsbCloudServer
194 lines
5.5 KiB
C#
194 lines
5.5 KiB
C#
using AsbCloudApp.Services;
|
|
using Microsoft.Extensions.Configuration;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace AsbCloudInfrastructure.Services
|
|
{
|
|
/// <summary>
/// Builds a queue of background works and limits how many of them run at the same time.
/// </summary>
/// <remarks>
/// Works are kept in a dictionary keyed by id, so enqueueing a work with an id that is
/// still waiting replaces the waiting one. A dispatch loop is started on demand by
/// <c>Enqueue</c> and ends by itself once the queue is empty.
/// </remarks>
public class BackgroundWorkerService : IDisposable, IBackgroundWorkerService
{
    private readonly Worker[] workers;
    private readonly Dictionary<string, Work> works = new Dictionary<string, Work>();

    // Guards works, isRunning, cts and task: they are touched both by callers of the
    // public API and by the dispatch loop running on the thread pool.
    private readonly object sync = new object();

    private bool isRunning = false;
    private CancellationTokenSource cts;
    private Task task;

    /// <summary>
    /// Creates the service and its fixed pool of workers.
    /// </summary>
    /// <param name="configuration">
    /// Configuration that is read for the "BackgroundWorkersCount" value (4 when absent).
    /// </param>
    public BackgroundWorkerService(IConfiguration configuration)
    {
        // At least one worker is required: with zero workers the dispatch loop would
        // poll forever, and a negative count cannot be used as an array size.
        var workersCount = Math.Max(1, configuration.GetValue("BackgroundWorkersCount", 4));
        workers = new Worker[workersCount];
        for (int i = 0; i < workers.Length; i++)
            workers[i] = new Worker();
    }

    /// <summary>
    /// Enqueues a work with a generated id.
    /// </summary>
    /// <param name="func">Delegate to run; receives the work id and a cancellation token.</param>
    /// <returns>Id assigned to the work.</returns>
    public string Enqueue(Func<string, CancellationToken, Task> func)
    {
        var work = new Work
        {
            ActionAsync = func
        };
        return Enqueue(work);
    }

    /// <summary>
    /// Enqueues a work under the given id.
    /// </summary>
    /// <param name="id">Id of the work; a waiting work with the same id is replaced.</param>
    /// <param name="func">Delegate to run; receives the work id and a cancellation token.</param>
    /// <returns>The same <paramref name="id"/>.</returns>
    public string Enqueue(string id, Func<string, CancellationToken, Task> func)
    {
        var work = new Work(id, func);
        return Enqueue(work);
    }

    /// <summary>
    /// Enqueues a work under the given id together with an error handler.
    /// </summary>
    /// <param name="id">Id of the work; a waiting work with the same id is replaced.</param>
    /// <param name="func">Delegate to run; receives the work id and a cancellation token.</param>
    /// <param name="onError">Delegate stored on the work as its error handler.</param>
    /// <returns>The same <paramref name="id"/>.</returns>
    public string Enqueue(string id, Func<string, CancellationToken, Task> func, Func<string, Exception, CancellationToken, Task> onError)
    {
        var work = new Work(id, func)
        {
            OnErrorAsync = onError
        };
        return Enqueue(work);
    }

    /// <summary>
    /// Stores the work in the queue and starts the dispatch loop if it is not running.
    /// </summary>
    private string Enqueue(Work work)
    {
        lock (sync)
        {
            works[work.Id] = work;
            if (!isRunning)
            {
                isRunning = true;

                // The previous loop (if any) has already finished with its source.
                cts?.Dispose();
                cts = new CancellationTokenSource();

                // Capture the token now: the cts field may be replaced or cleared
                // before the thread pool gets to run the delegate.
                var token = cts.Token;
                task = Task.Run(() => ExecuteAsync(token), token);
            }
        }
        return work.Id;
    }

    /// <summary>
    /// Removes and returns one waiting work. Must be called while holding <c>sync</c>
    /// and only when the queue is not empty.
    /// </summary>
    private Work Dequeue()
    {
        var item = works.First();
        works.Remove(item.Key);
        return item.Value;
    }

    /// <summary>
    /// Removes a work that is still waiting in the queue.
    /// </summary>
    /// <param name="id">Id of the work.</param>
    /// <returns>true if a waiting work with this id was found and removed.</returns>
    public bool TryRemove(string id)
    {
        lock (sync)
            return works.Remove(id);
    }

    /// <summary>
    /// Tells whether a work with the given id is still waiting in the queue.
    /// </summary>
    /// <param name="id">Id of the work.</param>
    public bool Contains(string id)
    {
        lock (sync)
            return works.ContainsKey(id);
    }

    /// <summary>
    /// Dispatch loop: hands waiting works to free workers until the queue is empty
    /// or <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="token">Token that stops the loop.</param>
    protected async Task ExecuteAsync(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                bool dispatched;
                lock (sync)
                {
                    if (works.Count == 0)
                    {
                        // The emptiness check and the flag reset happen under one lock,
                        // so a concurrent Enqueue either gets its work dispatched by this
                        // loop or sees isRunning == false and starts a new loop.
                        ReleaseLoop(token);
                        return;
                    }

                    var freeWorker = workers.FirstOrDefault(w => !w.IsBusy);
                    dispatched = freeWorker is not null;
                    if (dispatched)
                        freeWorker.Start(Dequeue());
                }

                if (!dispatched)
                    await Task.Delay(10, token).ConfigureAwait(false);
            }
        }
        finally
        {
            // Also reached when Task.Delay is cancelled; without this the flag would
            // stay set and no later Enqueue could restart the loop.
            lock (sync)
                ReleaseLoop(token);
        }
    }

    /// <summary>
    /// Clears the running flag, but only when <paramref name="token"/> belongs to the
    /// current loop, so a finishing stale loop cannot clear the flag of a newer one.
    /// Must be called while holding <c>sync</c>.
    /// </summary>
    private void ReleaseLoop(CancellationToken token)
    {
        if (cts is not null && cts.Token == token)
            isRunning = false;
    }

    /// <summary>
    /// Cancels the dispatch loop and stops the workers. Never throws.
    /// </summary>
    public void Dispose()
    {
        CancellationTokenSource currentCts;
        Task currentTask;
        lock (sync)
        {
            currentCts = cts;
            currentTask = task;
            cts = null;
            task = null;
            isRunning = false;
        }

        try
        {
            currentCts?.Cancel();

            // Give the loop a moment to notice the cancellation. The task itself is
            // deliberately not disposed: Task.Dispose throws if the task is still running.
            currentTask?.Wait(1);
        }
        catch (AggregateException)
        {
            // Expected: the loop task ends as cancelled when stopped inside Task.Delay.
        }
        catch (ObjectDisposedException)
        {
            // The source was already disposed; nothing left to cancel.
        }
        finally
        {
            currentCts?.Dispose();
        }

        foreach (var worker in workers)
        {
            try
            {
                worker.Dispose();
            }
            catch (Exception ex)
            {
                // Dispose must not throw; report and continue with the other workers.
                Trace.TraceError(ex.Message);
            }
        }

        GC.SuppressFinalize(this);
    }
}
|
|
|
|
/// <summary>
/// Runs one work on the thread pool and reports through <see cref="IsBusy"/>
/// whether that work is still in progress.
/// </summary>
class Worker : IDisposable
{
    // How long a work may run before it is treated as failed and asked to stop.
    private static readonly TimeSpan WorkTimeout = TimeSpan.FromMinutes(2);

    // Cancellation source and task of the current run; null when nothing is running.
    private CancellationTokenSource cts;
    private Task task;

    /// <summary>true from <see cref="Start"/> until the work finishes or <see cref="Stop"/> is called.</summary>
    public bool IsBusy { get; private set; }

    /// <summary>
    /// Stops the current work, if any.
    /// </summary>
    public void Dispose()
    {
        Stop();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Starts executing <paramref name="work"/> on the thread pool and marks the worker busy.
    /// Exceptions thrown by the work (including the timeout) are written to the trace and
    /// passed to the work's <c>OnErrorAsync</c> handler when one is set.
    /// </summary>
    /// <param name="work">Work to execute.</param>
    public void Start(Work work)
    {
        IsBusy = true;

        // The run keeps its own references: the cts field can be cleared by Stop()
        // or replaced by the next Start() while this run is still finishing.
        var runCts = new CancellationTokenSource();
        var token = runCts.Token;
        cts = runCts;

        task = Task.Run(async () =>
        {
            Task actionTask = null;
            try
            {
                actionTask = work.ActionAsync(work.Id, token);
                await actionTask.WaitAsync(WorkTimeout, token).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Trace.TraceError(ex.Message);

                if (work.OnErrorAsync is not null)
                {
                    try
                    {
                        await work.OnErrorAsync(work.Id, ex, token).ConfigureAwait(false);
                    }
                    catch (Exception exOnErrorHandler)
                    {
                        Trace.TraceError(exOnErrorHandler.Message);
                    }
                }
            }
            finally
            {
                // The wait gave up (timeout) but the action is still running:
                // signal it to stop instead of leaving it running unobserved.
                if (actionTask is { IsCompleted: false })
                    TryCancel(runCts);

                // Release the worker only if this run still owns it; after Stop()
                // the worker may already be executing another work.
                var ownsWorker = Interlocked.CompareExchange(ref cts, null, runCts) == runCts;
                runCts.Dispose();
                if (ownsWorker)
                    IsBusy = false;
            }
        }, token);
    }

    /// <summary>
    /// Requests cancellation of the current work, waits briefly for it and marks the
    /// worker free. Never throws.
    /// </summary>
    public void Stop()
    {
        var runCts = Interlocked.Exchange(ref cts, null);
        var runTask = task;
        task = null;

        if (runCts is not null)
            TryCancel(runCts);

        try
        {
            runTask?.Wait(1);
        }
        catch (AggregateException)
        {
            // Expected when the run was cancelled before its delegate started.
        }

        // A run that is still executing disposes its own source when it finishes.
        if (runTask is null || runTask.IsCompleted)
            runCts?.Dispose();

        IsBusy = false;
    }

    /// <summary>
    /// Cancels <paramref name="source"/>, tolerating a source that was already disposed.
    /// </summary>
    private static void TryCancel(CancellationTokenSource source)
    {
        try
        {
            source.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The run has already finished and released its source.
        }
        catch (AggregateException ex)
        {
            // A callback registered on the token threw; report and carry on.
            Trace.TraceError(ex.Message);
        }
    }
}
|
|
/// <summary>
/// A unit of background work: an id, the delegate to execute and an optional error handler.
/// </summary>
class Work
{
    /// <summary>
    /// Creates a work whose id is a newly generated GUID string and whose delegates are not set.
    /// </summary>
    public Work()
        : this(Guid.NewGuid().ToString(), null)
    {
    }

    /// <summary>
    /// Creates a work with the given id and delegate.
    /// </summary>
    /// <param name="id">Id of the work.</param>
    /// <param name="actionAsync">Delegate to execute; receives the work id and a cancellation token.</param>
    public Work(string id, Func<string, CancellationToken, Task> actionAsync)
    {
        ActionAsync = actionAsync;
        Id = id;
    }

    /// <summary>Id of the work; assigned once by the constructor.</summary>
    public string Id { get; }

    /// <summary>Delegate to execute; receives the work id and a cancellation token.</summary>
    public Func<string, CancellationToken, Task> ActionAsync { get; set; }

    /// <summary>Optional error handler; receives the work id, the exception and a cancellation token.</summary>
    public Func<string, Exception, CancellationToken, Task> OnErrorAsync { get; set; }
}
|
|
}
|