forked from ddrilling/AsbCloudServer
Add new background service.
This commit is contained in:
parent
ddcade44a3
commit
f61db91dd2
@ -0,0 +1,247 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace AsbCloudInfrastructure.Services.Background
|
||||
{
|
||||
# nullable enable
|
||||
public class BackgroundWorkerService : BackgroundService
|
||||
{
|
||||
private static readonly TimeSpan executePeriod = TimeSpan.FromSeconds(10);
|
||||
private static readonly TimeSpan minDelay = TimeSpan.FromSeconds(2);
|
||||
private static readonly TimeSpan exceptionHandleTimeout = TimeSpan.FromSeconds(2);
|
||||
private readonly IServiceProvider serviceProvider;
|
||||
private readonly WorkQueue workQueue = new WorkQueue();
|
||||
|
||||
public BackgroundWorkerService(IServiceProvider serviceProvider)
|
||||
{
|
||||
this.serviceProvider = serviceProvider;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Добавление задачи в очередь.
|
||||
/// Не периодические задачи будут выполняться вперед.
|
||||
/// </summary>
|
||||
/// <param name="work"></param>
|
||||
/// <exception cref="ArgumentException">Id mast be unique</exception>
|
||||
public void Push(WorkBase work)
|
||||
{
|
||||
workQueue.Push(work);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken token)
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
var dateStart = DateTime.Now;
|
||||
var work = workQueue.Pop();
|
||||
if (work is null)
|
||||
{
|
||||
await Task.Delay(executePeriod, token);
|
||||
continue;
|
||||
}
|
||||
|
||||
using (IServiceScope scope = serviceProvider.CreateScope())
|
||||
{
|
||||
try
|
||||
{
|
||||
var task = work.ActionAsync(work.Id, scope.ServiceProvider, token);
|
||||
await task.WaitAsync(work.Timeout, token);
|
||||
|
||||
work.ExecutionTime = DateTime.Now - dateStart;
|
||||
Trace.TraceInformation($"Backgroud work:\"{work.Id}\" done. ExecutionTime: {work.ExecutionTime:hh\\:mm\\:ss\\.fff}");
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
Trace.TraceError($"Backgroud work:\"{work.Id}\" throw exception: {exception.Message}");
|
||||
if (work.OnErrorAsync is not null)
|
||||
{
|
||||
using var task = Task.Run(
|
||||
async () => await work.OnErrorAsync(work.Id, exception, token),
|
||||
token);
|
||||
await task.WaitAsync(exceptionHandleTimeout, token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await Task.Delay(minDelay, token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// <para>
|
||||
/// Очередь работ
|
||||
/// </para>
|
||||
/// Не периодические задачи будут возвращаться первыми, как самые приоритетные.
|
||||
/// </summary>
|
||||
public class WorkQueue
|
||||
{
|
||||
private Queue<WorkBase> Primary = new (8);
|
||||
private readonly List<WorkPeriodic> Periodic = new (8);
|
||||
internal TimeSpan MaxTimeToNextWork { get; set; } = TimeSpan.FromSeconds(20);
|
||||
|
||||
/// <summary>
|
||||
/// Добавление работы.
|
||||
/// </summary>
|
||||
/// <param name="work"></param>
|
||||
/// <exception cref="ArgumentException">Id mast be unique</exception>
|
||||
public void Push(WorkBase work)
|
||||
{
|
||||
if (Periodic.Any(w => w.Id == work.Id))
|
||||
throw new ArgumentException("work.Id is not unique", nameof(work));
|
||||
|
||||
if (Primary.Any(w => w.Id == work.Id))
|
||||
throw new ArgumentException("work.Id is not unique", nameof(work));
|
||||
|
||||
if (work is WorkPeriodic workPeriodic)
|
||||
{
|
||||
Periodic.Add(workPeriodic);
|
||||
return;
|
||||
}
|
||||
|
||||
Primary.Enqueue(work);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Удаление работы по ID
|
||||
/// </summary>
|
||||
/// <param name="id"></param>
|
||||
/// <returns></returns>
|
||||
public bool Delete(string id)
|
||||
{
|
||||
var workPeriodic = Periodic.FirstOrDefault(w => w.Id == id);
|
||||
if(workPeriodic is not null)
|
||||
{
|
||||
Periodic.Remove(workPeriodic);
|
||||
return true;
|
||||
}
|
||||
|
||||
var work = Primary.FirstOrDefault(w => w.Id == id);
|
||||
if (work is not null)
|
||||
{
|
||||
Primary = new Queue<WorkBase>(Primary.Where(w => w.Id != id));
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// <para>
|
||||
/// Возвращает приоритетную задачу.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Если приоритетные закончились, то ищет ближайшую периодическую.
|
||||
/// Если до старта ближайшей периодической работы меньше 20 сек,
|
||||
/// то этой задаче устанавливается время последнего запуска в now и она возвращается.
|
||||
/// Если больше 20 сек, то возвращается null.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
/// <param name="maxTimeToNextWork"></param>
|
||||
/// <returns></returns>
|
||||
public WorkBase? Pop(TimeSpan? maxTimeToNextWork = null)
|
||||
{
|
||||
if (Primary.Any())
|
||||
return Primary.Dequeue();
|
||||
|
||||
var maxTimeToNextWorkLocal = maxTimeToNextWork ?? MaxTimeToNextWork;
|
||||
var work = GetNextPeriodic();
|
||||
if (work is null || work.NextStart - DateTime.Now > maxTimeToNextWorkLocal)
|
||||
return null;
|
||||
|
||||
work.LastStart = DateTime.Now;
|
||||
return work;
|
||||
}
|
||||
|
||||
private WorkPeriodic? GetNextPeriodic()
|
||||
{
|
||||
var work = Periodic
|
||||
.OrderBy(w => w.NextStart)
|
||||
.FirstOrDefault();
|
||||
return work;
|
||||
}
|
||||
}
|
||||
|
||||
public class WorkBase
|
||||
{
|
||||
/// <summary>
|
||||
/// Идентификатор работы. Должен быть уникальным. Используется в логах и передается в колбэки.
|
||||
/// </summary>
|
||||
public string Id { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Делегат работы.
|
||||
/// <para>
|
||||
/// Параметры:
|
||||
/// <list type="number">
|
||||
/// <item>
|
||||
/// <term>string</term>
|
||||
/// <description>Id Идентификатор работы</description>
|
||||
/// </item>
|
||||
/// <item>
|
||||
/// <term>IServiceProvider</term>
|
||||
/// <description>Поставщик сервисов</description>
|
||||
/// </item>
|
||||
/// <item>
|
||||
/// <term>CancellationToken</term>
|
||||
/// <description>Токен отмены задачи</description>
|
||||
/// </item>
|
||||
/// </list>
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public Func<string, IServiceProvider, CancellationToken, Task> ActionAsync { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Делегат обработки ошибки.
|
||||
/// Не должен выполняться долго.
|
||||
/// </summary>
|
||||
public Func<string, Exception, CancellationToken, Task>? OnErrorAsync { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// максимально допустимое время выполнения работы
|
||||
/// </summary>
|
||||
public TimeSpan Timeout { get; set; } = TimeSpan.FromMinutes(1);
|
||||
|
||||
/// <summary>
|
||||
/// Фактическое время успешного выполнения работы
|
||||
/// </summary>
|
||||
public TimeSpan? ExecutionTime { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Время последнего запуска
|
||||
/// </summary>
|
||||
public DateTime LastStart { get; set; }
|
||||
|
||||
public WorkBase(string id, Func<string, IServiceProvider, CancellationToken, Task> actionAsync)
|
||||
{
|
||||
Id = id;
|
||||
ActionAsync = actionAsync;
|
||||
}
|
||||
}
|
||||
|
||||
public class WorkPeriodic : WorkBase
|
||||
{
|
||||
/// <summary>
|
||||
/// Период выполнения задачи
|
||||
/// </summary>
|
||||
public TimeSpan Period { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Время следующего запуска
|
||||
/// </summary>
|
||||
public DateTime NextStart => LastStart + Period;
|
||||
|
||||
public WorkPeriodic(string id, Func<string, IServiceProvider, CancellationToken, Task> actionAsync, TimeSpan period)
|
||||
:base(id, actionAsync)
|
||||
{
|
||||
Period = period;
|
||||
}
|
||||
}
|
||||
#nullable disable
|
||||
}
|
@ -0,0 +1,120 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Moq;
|
||||
using System;
|
||||
using AsbCloudInfrastructure.Services.Background;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
using Org.BouncyCastle.Asn1.X509.Qualified;
|
||||
|
||||
namespace AsbCloudWebApi.Tests.ServicesTests
|
||||
{
|
||||
public class BackgroundWorkerServiceTest
|
||||
{
|
||||
private readonly Mock<IServiceProvider> mockServiceProvider;
|
||||
private readonly Mock<IServiceScopeFactory> mockServiceScopeFactory;
|
||||
|
||||
public BackgroundWorkerServiceTest()
|
||||
{
|
||||
var mockServiceScope = new Mock<IServiceScope>();
|
||||
mockServiceScopeFactory = new Mock<IServiceScopeFactory>();
|
||||
mockServiceProvider = new Mock<IServiceProvider>();
|
||||
|
||||
mockServiceScope.SetReturnsDefault(mockServiceProvider.Object);
|
||||
mockServiceProvider.SetReturnsDefault(mockServiceScopeFactory.Object);
|
||||
mockServiceProvider.Setup(s=>s.GetService(It.IsAny<Type>()))
|
||||
.Returns(mockServiceScopeFactory.Object);
|
||||
mockServiceScopeFactory.SetReturnsDefault(mockServiceScope.Object);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Push_makes_new_scope_after_start()
|
||||
{
|
||||
mockServiceScopeFactory.Invocations.Clear();
|
||||
|
||||
var backgroundService = new BackgroundWorkerService(mockServiceProvider.Object);
|
||||
var work = new WorkBase("", (_, _, _) => Task.CompletedTask );
|
||||
backgroundService.Push(work);
|
||||
await backgroundService.StartAsync(CancellationToken.None);
|
||||
await Task.Delay(10);
|
||||
|
||||
mockServiceScopeFactory.Verify(f => f.CreateScope());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Push_makes_primary_work_done()
|
||||
{
|
||||
var backgroundService = new BackgroundWorkerService(mockServiceProvider.Object);
|
||||
var workDone = false;
|
||||
var work = new WorkBase("", (_, _, _) =>
|
||||
{
|
||||
workDone = true;
|
||||
return Task.CompletedTask;
|
||||
});
|
||||
backgroundService.Push(work);
|
||||
await backgroundService.StartAsync(CancellationToken.None);
|
||||
await Task.Delay(10);
|
||||
|
||||
Assert.True(workDone);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Push_makes_pperiodic_work_done()
|
||||
{
|
||||
var backgroundService = new BackgroundWorkerService(mockServiceProvider.Object);
|
||||
var workDone = false;
|
||||
var work = new WorkPeriodic("", (_, _, _) =>
|
||||
{
|
||||
workDone = true;
|
||||
return Task.CompletedTask;
|
||||
},
|
||||
TimeSpan.FromMilliseconds(10));
|
||||
backgroundService.Push(work);
|
||||
await backgroundService.StartAsync(CancellationToken.None);
|
||||
await Task.Delay(10);
|
||||
|
||||
Assert.True(workDone);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Aborts_long_work()
|
||||
{
|
||||
var backgroundService = new BackgroundWorkerService(mockServiceProvider.Object);
|
||||
var workCanceled = false;
|
||||
var work = new WorkBase("", async(_, _, _) => await Task.Delay(1000000));
|
||||
work.Timeout = TimeSpan.FromMilliseconds(1);
|
||||
work.OnErrorAsync = async (id, ex, token) =>
|
||||
{
|
||||
workCanceled = ex is System.TimeoutException;
|
||||
await Task.CompletedTask;
|
||||
};
|
||||
|
||||
backgroundService.Push(work);
|
||||
await backgroundService.StartAsync(CancellationToken.None);
|
||||
await Task.Delay(20*4);
|
||||
|
||||
Assert.True(workCanceled);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Execution_continues_after_work_exception()
|
||||
{
|
||||
var backgroundService = new BackgroundWorkerService(mockServiceProvider.Object);
|
||||
var work2done = false;
|
||||
var work1 = new WorkBase("1", (_, _, _) => throw new Exception());
|
||||
var work2 = new WorkBase("2", (_, _, _) =>
|
||||
{
|
||||
work2done = true;
|
||||
return Task.CompletedTask;
|
||||
});
|
||||
|
||||
backgroundService.Push(work1);
|
||||
backgroundService.Push(work2);
|
||||
|
||||
await backgroundService.StartAsync(CancellationToken.None);
|
||||
await Task.Delay(2_100);
|
||||
|
||||
Assert.True(work2done);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,109 @@
|
||||
using AsbCloudInfrastructure.Services.Background;
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
|
||||
namespace AsbCloudWebApi.Tests.ServicesTests
|
||||
{
|
||||
public class BackgroundWorkerService_WorkQueue_Test
|
||||
{
|
||||
private readonly TimeSpan period = TimeSpan.FromSeconds(10);
|
||||
private readonly Func<string, IServiceProvider, CancellationToken, Task> somAction = (string id, IServiceProvider scope, CancellationToken token) => Task.CompletedTask;
|
||||
|
||||
[Fact]
|
||||
public void Push_not_unique_id_should_throw()
|
||||
{
|
||||
var work1 = new WorkPeriodic("1", somAction, TimeSpan.FromSeconds(30));
|
||||
var work2 = new WorkBase("1", somAction);
|
||||
|
||||
var queue = new WorkQueue();
|
||||
queue.Push(work1);
|
||||
|
||||
Assert.Throws<ArgumentException>(
|
||||
() => queue.Push(work2));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Pop_should_return_null()
|
||||
{
|
||||
var work1 = new WorkPeriodic("1", somAction, TimeSpan.FromSeconds(30))
|
||||
{ LastStart = DateTime.Now };
|
||||
|
||||
var queue = new WorkQueue();
|
||||
queue.Push(work1);
|
||||
var workpoPoped= queue.Pop();
|
||||
|
||||
Assert.Null(workpoPoped);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Pop_primary_first()
|
||||
{
|
||||
var work1 = new WorkBase("1", somAction);
|
||||
var work2 = new WorkPeriodic("1", somAction, period);
|
||||
|
||||
var queue = new WorkQueue();
|
||||
queue.Push(work2);
|
||||
queue.Push(work1);
|
||||
var workpoPoped= queue.Pop();
|
||||
|
||||
Assert.Equal(work1, workpoPoped);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Pop_second_after_delete_first()
|
||||
{
|
||||
var work1 = new WorkPeriodic("1", somAction, period);
|
||||
var work2 = new WorkPeriodic("2", somAction, period);
|
||||
|
||||
var queue = new WorkQueue();
|
||||
queue.Push(work1);
|
||||
queue.Push(work2);
|
||||
queue.Delete("1");
|
||||
|
||||
var workpoPoped= queue.Pop();
|
||||
|
||||
Assert.Equal(work2, workpoPoped);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Pop_closest_to_nextStart()
|
||||
{
|
||||
var work1 = new WorkPeriodic("1", somAction, period) {
|
||||
LastStart = DateTime.Now,
|
||||
};
|
||||
var work2 = new WorkPeriodic("2", somAction, period);
|
||||
|
||||
var queue = new WorkQueue();
|
||||
queue.Push(work1);
|
||||
queue.Push(work2);
|
||||
|
||||
var workpoPoped= queue.Pop();
|
||||
|
||||
Assert.Equal(work2, workpoPoped);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Pop_closest_to_explicit_nextStart()
|
||||
{
|
||||
var baseTime = DateTime.Now - period;
|
||||
var work1 = new WorkPeriodic("1", somAction, period)
|
||||
{
|
||||
LastStart = baseTime - TimeSpan.FromSeconds(-1),
|
||||
};
|
||||
var work2 = new WorkPeriodic("2", somAction, period)
|
||||
{
|
||||
LastStart = baseTime,
|
||||
};
|
||||
|
||||
var queue = new WorkQueue();
|
||||
queue.Push(work1);
|
||||
queue.Push(work2);
|
||||
|
||||
var workpoPoped= queue.Pop();
|
||||
|
||||
Assert.Equal(work2, workpoPoped);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user