weekend test

This commit is contained in:
ngfrolov 2023-11-03 17:02:44 +05:00
parent 4f45cfc1dd
commit 7f92f07423
Signed by untrusted user who does not match committer: ng.frolov
GPG Key ID: E99907A0357B29A7
30 changed files with 362 additions and 159 deletions

View File

@ -172,7 +172,7 @@ namespace AsbCloudApp.Data
if (progress.HasValue)
CurrentState.Progress = progress.Value;
Trace.TraceInformation($"{WorkNameForTrace} state: {newState} [{100*progress:#}%]");
Trace.TraceInformation($"{WorkNameForTrace} state[{100*progress:#}%]: {newState}");
}
/// <summary>

View File

@ -1,7 +1,9 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -12,13 +14,37 @@ namespace AsbCloudInfrastructure.Background;
/// </summary>
public class BackgroundWorker : BackgroundService
{
private static readonly TimeSpan executePeriod = TimeSpan.FromSeconds(10);
private static readonly TimeSpan minDelay = TimeSpan.FromSeconds(2);
private readonly TimeSpan minDelay = TimeSpan.FromSeconds(1);
private readonly IServiceProvider serviceProvider;
public WorkStore WorkStore { get; } = new WorkStore();
/// <summary>
/// Очередь работ
/// </summary>
private Queue<Work> works = new(8);
/// <summary>
/// Список периодических работ
/// </summary>
public IEnumerable<Work> Works => works;
/// <summary>
/// Работа выполняемая в данный момент
/// </summary>
public Work? CurrentWork;
/// <summary>
/// последние 16 завершившиеся с ошибкой
/// </summary>
public CyclycArray<Work> Felled { get; } = new(16);
/// <summary>
/// последние 16 успешно завершенных
/// </summary>
public CyclycArray<Work> Done { get; } = new(16);
/// <summary>
/// Ошибка в главном цикле, никогда не должна появляться
/// </summary>
public string MainLoopLastException { get; private set; } = string.Empty;
public BackgroundWorker(IServiceProvider serviceProvider)
@ -26,28 +52,31 @@ public class BackgroundWorker : BackgroundService
this.serviceProvider = serviceProvider;
}
/// <summary>
/// Добавить в очередь
/// </summary>
/// <param name="work"></param>
public void Enqueue(Work work)
{
works.Enqueue(work);
if (ExecuteTask is null || ExecuteTask.IsCompleted)
StartAsync(CancellationToken.None).Wait();
}
protected override async Task ExecuteAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
while (!token.IsCancellationRequested && works.TryDequeue(out CurrentWork))
{
try
{
var work = WorkStore.GetNext();
if (work is null)
{
await Task.Delay(executePeriod, token);
continue;
}
CurrentWork = work;
using var scope = serviceProvider.CreateScope();
var result = await work.Start(scope.ServiceProvider, token);
var result = await CurrentWork.Start(scope.ServiceProvider, token);
if (!result)
WorkStore.Felled.Add(work);
Felled.Add(CurrentWork);
else
WorkStore.Done.Add(work);
Done.Add(CurrentWork);
CurrentWork = null;
await Task.Delay(minDelay, token);
@ -64,4 +93,21 @@ public class BackgroundWorker : BackgroundService
}
}
}
/// <summary>
/// Удаление работы по ID из одноразовой очереди
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public bool TryRemoveFromRunOnceQueue(string id)
{
var work = Works.FirstOrDefault(w => w.Id == id);
if (work is not null)
{
works = new Queue<Work>(Works.Where(w => w.Id != id));
return true;
}
return false;
}
}

View File

@ -0,0 +1,113 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Background;
/// <summary>
/// Сервис для фонового выполнения периодической работы
/// </summary>
public class PeriodicBackgroundWorker : BackgroundService
{
private static readonly TimeSpan executePeriod = TimeSpan.FromSeconds(10);
private static readonly TimeSpan minDelay = TimeSpan.FromSeconds(1);
private readonly IServiceProvider serviceProvider;
private readonly List<WorkPeriodic> works = new(8);
/// <summary>
/// Список периодических работ
/// </summary>
public IEnumerable<WorkPeriodic> Works => works;
/// <summary>
/// Работа выполняемая в данный момент
/// </summary>
public Work? CurrentWork;
/// <summary>
/// Ошибка в главном цикле, никогда не должна появляться
/// </summary>
public string MainLoopLastException { get; private set; } = string.Empty;
public PeriodicBackgroundWorker(IServiceProvider serviceProvider)
{
this.serviceProvider = serviceProvider;
}
protected override async Task ExecuteAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
var periodicWork = GetNext();
if (periodicWork is null)
{
await Task.Delay(executePeriod, token);
continue;
}
CurrentWork = periodicWork.Work;
using var scope = serviceProvider.CreateScope();
var result = await periodicWork.Work.Start(scope.ServiceProvider, token);
CurrentWork = null;
await Task.Delay(minDelay, token);
}
catch (Exception ex)
{
MainLoopLastException = $"BackgroundWorker " +
$"MainLoopLastException: \r\n" +
$"date: {DateTime.Now:O}\r\n" +
$"message: {ex.Message}\r\n" +
$"inner: {ex.InnerException?.Message}\r\n" +
$"stackTrace: {ex.StackTrace}";
Trace.TraceError(MainLoopLastException);
}
}
}
/// <summary>
/// Добавить фоновую работу выполняющуюся с заданным периодом
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="period"></param>
public void Add<T>(TimeSpan period)
where T : Work, new()
{
var work = new T();
var periodic = new WorkPeriodic(work, period);
works.Add(periodic);
}
/// <summary>
/// Добавить фоновую работу выполняющуюся с заданным периодом
/// </summary>
/// <param name="work"></param>
/// <param name="period"></param>
public void Add(Work work, TimeSpan period)
{
var periodic = new WorkPeriodic(work, period);
works.Add(periodic);
}
private WorkPeriodic? GetNext()
{
var work = works
.OrderBy(w => w.NextStart)
.FirstOrDefault();
if (work is null || work.NextStart > DateTime.Now)
return null;
return work;
}
}

View File

@ -13,6 +13,8 @@ namespace AsbCloudInfrastructure.Background;
/// </summary>
public abstract class Work : BackgroundWorkDto
{
private CancellationTokenSource? stoppingCts;
private sealed class WorkBase : Work
{
private Func<string, IServiceProvider, Action<string, double?>, CancellationToken, Task> ActionAsync { get; }
@ -69,8 +71,9 @@ public abstract class Work : BackgroundWorkDto
SetStatusStart();
try
{
var task = Action(Id, services, UpdateStatus, token);
await task.WaitAsync(Timeout, token);
stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(token);
var task = Action(Id, services, UpdateStatus, stoppingCts.Token);
await task.WaitAsync(Timeout, stoppingCts.Token);
SetStatusComplete();
return true;
}
@ -97,6 +100,11 @@ public abstract class Work : BackgroundWorkDto
return false;
}
public void Stop()
{
stoppingCts?.Cancel();
}
private static string FormatExceptionMessage(Exception exception)
{
var firstException = FirstException(exception);

View File

@ -1,110 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace AsbCloudInfrastructure.Background;
/// <summary>
/// <para>
/// Очередь работ
/// </para>
/// Не периодические задачи будут возвращаться первыми, как самые приоритетные.
/// </summary>
public class WorkStore
{
private readonly List<WorkPeriodic> periodics = new(8);
/// <summary>
/// Список периодических задач
/// </summary>
public IEnumerable<WorkPeriodic> Periodics => periodics;
/// <summary>
/// Работы выполняемые один раз
/// </summary>
public Queue<Work> RunOnceQueue { get; private set; } = new(8);
/// <summary>
/// последние 16 завершившиеся с ошибкой
/// </summary>
public CyclycArray<Work> Felled { get; } = new(16);
/// <summary>
/// последние 16 успешно завершенных
/// </summary>
public CyclycArray<Work> Done { get; } = new(16);
/// <summary>
/// Добавить фоновую работу выполняющуюся с заданным периодом
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="period"></param>
public void AddPeriodic<T>(TimeSpan period)
where T : Work, new()
{
var work = new T();
var periodic = new WorkPeriodic(work, period);
periodics.Add(periodic);
}
/// <summary>
/// Добавить фоновую работу выполняющуюся с заданным периодом
/// </summary>
/// <param name="work"></param>
/// <param name="period"></param>
public void AddPeriodic(Work work, TimeSpan period)
{
var periodic = new WorkPeriodic(work, period);
periodics.Add(periodic);
}
/// <summary>
/// Удаление работы по ID из одноразовой очереди
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public bool TryRemoveFromRunOnceQueue(string id)
{
var work = RunOnceQueue.FirstOrDefault(w => w.Id == id);
if (work is not null)
{
RunOnceQueue = new Queue<Work>(RunOnceQueue.Where(w => w.Id != id));
return true;
}
return false;
}
/// <summary>
/// <para>
/// Возвращает приоритетную задачу.
/// </para>
/// <para>
/// Если приоритетные закончились, то ищет ближайшую периодическую.
/// Если до старта ближайшей периодической работы меньше 20 сек,
/// то этой задаче устанавливается время последнего запуска в now и она возвращается.
/// Если больше 20 сек, то возвращается null.
/// </para>
/// </summary>
/// <param name="maxTimeToNextWork"></param>
/// <returns></returns>
public Work? GetNext()
{
if (RunOnceQueue.Any())
return RunOnceQueue.Dequeue();
var work = GetNextPeriodic();
if (work is null || work.NextStart > DateTime.Now)
return null;
return work.Work;
}
private WorkPeriodic? GetNextPeriodic()
{
var work = Periodics
.OrderBy(w => w.NextStart)
.FirstOrDefault();
return work;
}
}

View File

@ -513,7 +513,7 @@ namespace AsbCloudInfrastructure.Services.DrillingProgram
if (state.IdState == idStateCreating)
{
var workId = MakeWorkId(idWell);
if (!backgroundWorker.WorkStore.RunOnceQueue.Any(w => w.Id == workId))
if (!backgroundWorker.Works.Any(w => w.Id == workId))
{
var well = (await wellService.GetOrDefaultAsync(idWell, token))!;
var resultFileName = $"Программа бурения {well.Cluster} {well.Caption}.pdf";
@ -542,7 +542,7 @@ namespace AsbCloudInfrastructure.Services.DrillingProgram
var work = Work.CreateByDelegate(workId, workAction);
work.OnErrorAsync = onErrorAction;
backgroundWorker.WorkStore.RunOnceQueue.Enqueue(work);
backgroundWorker.Enqueue(work);
}
}
}
@ -556,7 +556,7 @@ namespace AsbCloudInfrastructure.Services.DrillingProgram
private async Task<int> RemoveDrillingProgramAsync(int idWell, CancellationToken token)
{
var workId = MakeWorkId(idWell);
backgroundWorker.WorkStore.TryRemoveFromRunOnceQueue(workId);
backgroundWorker.TryRemoveFromRunOnceQueue(workId);
var filesIds = await context.Files
.Where(f => f.IdWell == idWell &&

View File

@ -52,12 +52,12 @@ namespace AsbCloudInfrastructure.Services.Email
}
var workId = MakeWorkId(notification.IdUser, notification.Title, notification.Message);
if (!backgroundWorker.WorkStore.RunOnceQueue.Any(w=>w.Id==workId))
if (!backgroundWorker.Works.Any(w=>w.Id==workId))
{
var workAction = MakeEmailSendWorkAction(notification);
var work = Work.CreateByDelegate(workId, workAction);
backgroundWorker.WorkStore.RunOnceQueue.Enqueue(work);
backgroundWorker.Enqueue(work);
}
return Task.CompletedTask;

View File

@ -95,7 +95,7 @@ namespace AsbCloudInfrastructure.Services
};
var work = Work.CreateByDelegate(workId, workAction);
backgroundWorkerService.WorkStore.RunOnceQueue.Enqueue(work);
backgroundWorkerService.Enqueue(work);
progressHandler.Invoke(new ReportProgressDto
{

View File

@ -52,7 +52,7 @@ namespace AsbCloudInfrastructure.Services.SAUB
await instance.InitializeCacheFromDBAsync<TEntity>(db, onProgress, token);
});
work.Timeout = TimeSpan.FromMinutes(15);
worker.WorkStore.RunOnceQueue.Enqueue(work);
worker.Enqueue(work);
}
return instance;
}

View File

@ -33,12 +33,12 @@ namespace AsbCloudInfrastructure
_ = provider.GetRequiredService<ITelemetryDataCache<TelemetryDataSaubDto>>();
_ = provider.GetRequiredService<ITelemetryDataCache<TelemetryDataSpinDto>>();
var backgroundWorker = provider.GetRequiredService<BackgroundWorker>();
backgroundWorker.WorkStore.AddPeriodic<WellInfoService.WorkWellInfoUpdate>(TimeSpan.FromMinutes(30));
backgroundWorker.WorkStore.AddPeriodic<WorkOperationDetection>(TimeSpan.FromMinutes(15));
backgroundWorker.WorkStore.AddPeriodic<WorkSubsystemOperationTimeCalc>(TimeSpan.FromMinutes(30));
backgroundWorker.WorkStore.AddPeriodic<WorkLimitingParameterCalc>(TimeSpan.FromMinutes(30));
backgroundWorker.WorkStore.AddPeriodic(MakeMemoryMonitoringWork(), TimeSpan.FromMinutes(1));
var backgroundWorker = provider.GetRequiredService<PeriodicBackgroundWorker>();
backgroundWorker.Add<WellInfoService.WorkWellInfoUpdate>(TimeSpan.FromMinutes(30));
backgroundWorker.Add<WorkOperationDetection>(TimeSpan.FromMinutes(15));
backgroundWorker.Add<WorkSubsystemOperationTimeCalc>(TimeSpan.FromMinutes(30));
backgroundWorker.Add<WorkLimitingParameterCalc>(TimeSpan.FromMinutes(30));
backgroundWorker.Add(MakeMemoryMonitoringWork(), TimeSpan.FromMinutes(1));
var notificationBackgroundWorker = provider.GetRequiredService<NotificationBackgroundWorker>();

View File

@ -53,11 +53,21 @@ namespace AsbCloudWebApi.Tests.Middlware
throw new NotImplementedException();
}
public DatesRangeDto? GetRange(int idWell)
{
throw new NotImplementedException();
}
public Task<DatesRangeDto?> GetRangeAsync(int idWell, DateTimeOffset start, DateTimeOffset end, CancellationToken token)
{
throw new NotImplementedException();
}
public Task<DatesRangeDto?> GetRangeAsync(int idWell, DateTimeOffset geDate, DateTimeOffset? leDate, CancellationToken token)
{
throw new NotImplementedException();
}
public Task<IEnumerable<TelemetryDataSaubStatDto>> GetTelemetryDataStatAsync(int idTelemetry, CancellationToken token) => throw new NotImplementedException();
public Task<Stream> GetZippedCsv(int idWell, DateTime beginDate, DateTime endDate, CancellationToken token)

View File

@ -0,0 +1,93 @@
using AsbCloudApp.Data;
using AsbCloudApp.Data.SAUB;
using AsbCloudApp.Repositories;
using AsbCloudApp.Requests;
using AsbCloudApp.Services;
using AsbCloudInfrastructure.Background;
using AsbCloudInfrastructure.Services;
using AsbCloudInfrastructure.Services.SAUB;
using Microsoft.Extensions.DependencyInjection;
using NSubstitute;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace AsbCloudWebApi.Tests.Services;
public class BackgroundWorkertest
{
private IServiceProvider provider;
private BackgroundWorker service;
public BackgroundWorkertest()
{
provider = Substitute.For<IServiceProvider, ISupportRequiredService>();
var serviceScope = Substitute.For<IServiceScope>();
var serviceScopeFactory = Substitute.For<IServiceScopeFactory>();
serviceScopeFactory.CreateScope().Returns(serviceScope);
((ISupportRequiredService)provider).GetRequiredService(typeof(IServiceScopeFactory)).Returns(serviceScopeFactory);
service = new BackgroundWorker(provider);
typeof(BackgroundWorker)
.GetField("minDelay", BindingFlags.NonPublic | BindingFlags.Instance)?
.SetValue(service, TimeSpan.FromMilliseconds(1));
}
[Fact]
public async Task Enqueue_n_works()
{
var workCount = 10;
var result = 0;
Task workAction(string id, IServiceProvider services, Action<string, double?> callback, CancellationToken token)
{
result++;
return Task.Delay(1);
}
//act
for (int i = 0; i < workCount; i++)
{
var work = Work.CreateByDelegate(i.ToString(), workAction);
service.Enqueue(work);
}
var waitI = workCount;
await Task.Delay(1_000);
//while (waitI-- > 0 && service.ExecuteTask is not null && service.ExecuteTask.IsCompleted)
// await Task.Delay(4);
//assert
Assert.Equal(workCount, result);
}
[Fact]
public async Task Enqueue_continues_after_exceptions()
{
var expectadResult = 42;
var result = 0;
Task workAction(string id, IServiceProvider services, Action<string, double?> callback, CancellationToken token)
{
result = expectadResult;
return Task.CompletedTask;
}
Task failAction(string id, IServiceProvider services, Action<string, double?> callback, CancellationToken token)
=> throw new Exception();
var goodWork = Work.CreateByDelegate("", workAction);
var badWork = Work.CreateByDelegate("", failAction);
badWork.OnErrorAsync = (id, exception, token) => throw new Exception();
//act
service.Enqueue(badWork);
service.Enqueue(goodWork);
await Task.Delay(1200);
//assert
Assert.Equal(expectadResult, result);
}
}

View File

@ -366,7 +366,7 @@ namespace AsbCloudWebApi.Tests.ServicesTests
var state = await service.GetStateAsync(idWell, publisher1.Id, CancellationToken.None);
Assert.Equal(2, state.IdState);
backgroundWorkerMock.Verify(s => s.WorkStore.RunOnceQueue.Enqueue(It.IsAny<Work>()));
backgroundWorkerMock.Verify(s => s.Enqueue(It.IsAny<Work>()));
}
[Fact]

View File

@ -14,23 +14,22 @@ namespace AsbCloudWebApi.Controllers
[ApiController]
public class BackgroundWorkController : ControllerBase
{
private readonly BackgroundWorker backgroundWorker;
private readonly BackgroundWorker worker;
public BackgroundWorkController(BackgroundWorker backgroundWorker)
public BackgroundWorkController(BackgroundWorker worker)
{
this.backgroundWorker = backgroundWorker;
this.worker = worker;
}
[HttpGet]
public IActionResult GetAll()
{
var result = new {
CurrentWork = (BackgroundWorkDto?)backgroundWorker.CurrentWork,
backgroundWorker.MainLoopLastException,
RunOnceQueue = backgroundWorker.WorkStore.RunOnceQueue.Select(work => (BackgroundWorkDto)work),
Periodics = backgroundWorker.WorkStore.Periodics.Select(work => (BackgroundWorkDto)work.Work),
Done = backgroundWorker.WorkStore.Done.Select(work => (BackgroundWorkDto)work),
Felled = backgroundWorker.WorkStore.Felled.Select(work => (BackgroundWorkDto)work),
CurrentWork = (BackgroundWorkDto?)worker.CurrentWork,
worker.MainLoopLastException,
RunOnceQueue = worker.Works.Select(work => (BackgroundWorkDto)work),
Done = worker.Done.Select(work => (BackgroundWorkDto)work),
Felled = worker.Felled.Select(work => (BackgroundWorkDto)work),
};
return Ok(result);
}
@ -38,7 +37,7 @@ namespace AsbCloudWebApi.Controllers
[HttpGet("current")]
public IActionResult GetCurrent()
{
var work = backgroundWorker.CurrentWork;
var work = worker.CurrentWork;
if (work == null)
return NoContent();
@ -48,22 +47,22 @@ namespace AsbCloudWebApi.Controllers
[HttpGet("failed")]
public IActionResult GetFelled()
{
var result = backgroundWorker.WorkStore.Felled.Select(work => (BackgroundWorkDto)work);
var result = worker.Felled.Select(work => (BackgroundWorkDto)work);
return Ok(result);
}
[HttpGet("done")]
public IActionResult GetDone()
{
var result = backgroundWorker.WorkStore.Done.Select(work => (BackgroundWorkDto)work);
var result = worker.Done.Select(work => (BackgroundWorkDto)work);
return Ok(result);
}
[HttpPost("restart"), Obsolete("temporary method")]
public async Task<IActionResult> RestartAsync(CancellationToken token)
{
await backgroundWorker.StopAsync(token);
await backgroundWorker.StartAsync(token);
await worker.StopAsync(token);
await worker.StartAsync(token);
return Ok();
}
}

View File

@ -0,0 +1,44 @@
using AsbCloudApp.Data;
using AsbCloudInfrastructure.Background;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudWebApi.Controllers
{
[Route("api/[controller]")]
[Authorize]
[ApiController]
public class PeriodicBackgroundWorkerController : ControllerBase
{
private readonly PeriodicBackgroundWorker worker;
public PeriodicBackgroundWorkerController(PeriodicBackgroundWorker worker)
{
this.worker = worker;
}
[HttpGet]
public IActionResult GetAll()
{
var result = new
{
currentWork = (BackgroundWorkDto?)worker.CurrentWork,
worker.MainLoopLastException,
works = worker.Works.Select(work => (BackgroundWorkDto)work.Work),
};
return Ok(result);
}
[HttpPost("restart"), Obsolete("temporary method")]
public async Task<IActionResult> RestartAsync(CancellationToken token)
{
await worker.StopAsync(token);
await worker.StartAsync(token);
return Ok();
}
}
}

View File

@ -26,7 +26,7 @@ namespace AsbCloudWebApi.SignalR
await base.AddToGroup(groupName);
var workId = groupName.Replace("Report_", "");
var work = backgroundWorker.WorkStore.RunOnceQueue.FirstOrDefault(work => work.Id == workId);
var work = backgroundWorker.Works.FirstOrDefault(work => work.Id == workId);
var progress = new ReportProgressDto()
{

View File

@ -29,12 +29,12 @@ public class SignalRNotificationTransportService : INotificationTransportService
{
var workId = HashCode.Combine(notifications.Select(n => n.Id)).ToString("x");
if (backgroundWorker.WorkStore.RunOnceQueue.Any(w => w.Id == workId))
if (backgroundWorker.Works.Any(w => w.Id == workId))
return Task.CompletedTask;
var workAction = MakeSignalRSendWorkAction(notifications);
var work = Work.CreateByDelegate(workId, workAction);
backgroundWorker.WorkStore.RunOnceQueue.Enqueue(work);
backgroundWorker.Enqueue(work);
return Task.CompletedTask;
}