BackgroundWorker Move periodic services

This commit is contained in:
ngfrolov 2022-12-02 17:18:16 +05:00
parent 71aff8d1ec
commit 8e9baf22d8
9 changed files with 82 additions and 151 deletions

View File

@ -6,6 +6,7 @@ using System.Threading.Tasks;
namespace AsbCloudApp.Services
{
#nullable enable
/// <summary>
/// Сервис рапортов
/// </summary>
@ -57,5 +58,7 @@ namespace AsbCloudApp.Services
/// <param name="token"></param>
/// <returns></returns>
Task<IEnumerable<ReportPropertiesDto>> GetAllReportsByWellAsync(int idWell, CancellationToken token);
#nullable disable
}
}

View File

@ -1,4 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using AsbCloudInfrastructure.Services.DetectOperations;
using AsbCloudInfrastructure.Services.Subsystems;
using AsbCloudInfrastructure.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Diagnostics;
@ -18,13 +21,10 @@ namespace AsbCloudInfrastructure.Background
private static readonly TimeSpan exceptionHandleTimeout = TimeSpan.FromSeconds(2);
private readonly IServiceProvider serviceProvider;
private readonly WorkQueue workQueue = new WorkQueue();
public string? CurrentWorkId;
public BackgroundWorker(IServiceProvider serviceProvider)
{
this.serviceProvider = serviceProvider;
#warning move StartAsync(CancellationToken.None).Wait() to THE factory
Task.Delay(1_000)
.ContinueWith(_=> StartAsync(CancellationToken.None).Wait());
}
/// <summary>
@ -69,11 +69,12 @@ namespace AsbCloudInfrastructure.Background
await Task.Delay(executePeriod, token);
continue;
}
CurrentWorkId = work.Id;
using var scope = serviceProvider.CreateScope();
try
{
Trace.TraceInformation($"Backgroud work:\"{work.Id}\" start.");
var task = work.ActionAsync(work.Id, scope.ServiceProvider, token);
await task.WaitAsync(work.Timeout, token);
@ -90,8 +91,8 @@ namespace AsbCloudInfrastructure.Background
token);
await task.WaitAsync(exceptionHandleTimeout, token);
}
}
}
CurrentWorkId = null;
await Task.Delay(minDelay, token);
}
}

View File

@ -98,9 +98,6 @@ namespace AsbCloudInfrastructure
services.AddScoped<IAsbCloudDbContext>(provider => provider.GetService<AsbCloudDbContext>());
services.AddScoped<IEmailService, EmailService>();
services.AddHostedService<OperationDetectionBackgroundService>();
services.AddHostedService<SubsystemOperationTimeBackgroundService>();
services.AddHostedService<LimitingParameterBackgroundService>();
services.AddSingleton(new WitsInfoService());
services.AddSingleton(new InstantDataRepository());
services.AddSingleton(provider=> TelemetryDataCache<TelemetryDataSaubDto>.GetInstance<TelemetryDataSaub>(configuration));

View File

@ -268,7 +268,7 @@ namespace AsbCloudInfrastructure.Services.DetectOperations
return query;
}
private DetectedOperationDto Convert(DetectedOperation operation, WellDto well, IEnumerable<OperationValueDto> operationValues, IEnumerable<ScheduleDto> schedules)
private static DetectedOperationDto Convert(DetectedOperation operation, WellDto well, IEnumerable<OperationValueDto> operationValues, IEnumerable<ScheduleDto> schedules)
{
var dto = operation.Adapt<DetectedOperationDto>();
dto.IdWell = well.Id;

View File

@ -1,7 +1,5 @@
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@ -9,14 +7,16 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AsbCloudInfrastructure.Services.DetectOperations.Detectors;
using AsbCloudInfrastructure.Background;
using Microsoft.Extensions.DependencyInjection;
namespace AsbCloudInfrastructure.Services.DetectOperations
{
#nullable enable
public class OperationDetectionBackgroundService : BackgroundService
public static class OperationDetectionWorkFactory
{
private readonly string connectionString;
private readonly TimeSpan period = TimeSpan.FromHours(1);
private const string workId = "Operation detection";
private static readonly TimeSpan workPeriod = TimeSpan.FromMinutes(30);
private static readonly DetectorAbstract[] detectors = new DetectorAbstract[]
{
@ -31,49 +31,18 @@ namespace AsbCloudInfrastructure.Services.DetectOperations
new DetectorTemplatingWhileDrilling(),
};
public OperationDetectionBackgroundService(IConfiguration configuration)
{
connectionString = configuration.GetConnectionString("DefaultConnection");
public static WorkPeriodic MakeWork()
{
var workPeriodic = new WorkPeriodic(workId, WorkAction, workPeriod);
workPeriodic.Timeout = TimeSpan.FromMinutes(30);
return workPeriodic;
}
protected override async Task ExecuteAsync(CancellationToken token = default)
// TODO: Разделить этот акшн на более мелкие части И использовать telemetryServiceData<..> вместо прямого обращения к БД.
private static async Task WorkAction(string _, IServiceProvider serviceProvider, CancellationToken token)
{
var timeToStartAnalysis = DateTime.Now;
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql(connectionString)
.Options;
using var db = serviceProvider.GetRequiredService<IAsbCloudDbContext>();
while (!token.IsCancellationRequested)
{
if (DateTime.Now > timeToStartAnalysis)
{
timeToStartAnalysis = DateTime.Now + period;
try
{
using var context = new AsbCloudDbContext(options);
var added = await DetectedAllTelemetriesAsync(context, token);
Trace.TraceInformation($"Total detection complete. Added {added} operations.");
}
catch (Exception ex)
{
Trace.TraceError(ex.Message);
}
GC.Collect();
}
var ms = (int)(timeToStartAnalysis - DateTime.Now).TotalMilliseconds;
ms = ms > 100 ? ms : 100;
await Task.Delay(ms, token).ConfigureAwait(false);
}
}
public override async Task StopAsync(CancellationToken token)
{
await base.StopAsync(token).ConfigureAwait(false);
}
private static async Task<int> DetectedAllTelemetriesAsync(IAsbCloudDbContext db, CancellationToken token)
{
var lastDetectedDates = await db.DetectedOperations
.GroupBy(o => o.IdTelemetry)
.Select(g => new
@ -88,7 +57,7 @@ namespace AsbCloudInfrastructure.Services.DetectOperations
.Select(t => t.Id)
.ToListAsync(token);
var JounedlastDetectedDates = telemetryIds
var joinedlastDetectedDates = telemetryIds
.GroupJoin(lastDetectedDates,
t => t,
o => o.IdTelemetry,
@ -97,8 +66,9 @@ namespace AsbCloudInfrastructure.Services.DetectOperations
IdTelemetry = outer,
inner.SingleOrDefault()?.LastDate,
});
var affected = 0;
foreach (var item in JounedlastDetectedDates)
foreach (var item in joinedlastDetectedDates)
{
var stopwatch = Stopwatch.StartNew();
var newOperations = await DetectOperationsAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
@ -109,7 +79,6 @@ namespace AsbCloudInfrastructure.Services.DetectOperations
affected += await db.SaveChangesAsync(token);
}
}
return affected;
}
private static async Task<IEnumerable<DetectedOperation>> DetectOperationsAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)

View File

@ -1,61 +1,37 @@
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using System;
using System.Data.Common;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using AsbCloudInfrastructure.Background;
using Microsoft.Extensions.DependencyInjection;
namespace AsbCloudInfrastructure.Services
{
#nullable enable
internal class LimitingParameterBackgroundService : BackgroundService
internal static class LimitingParameterCalcWorkFactory
{
private readonly string connectionString;
private readonly TimeSpan period = TimeSpan.FromHours(1);
private const string workId = "Limiting parameter calc";
private static readonly TimeSpan workPeriod = TimeSpan.FromMinutes(30);
public LimitingParameterBackgroundService(IConfiguration configuration)
public static WorkPeriodic MakeWork()
{
connectionString = configuration.GetConnectionString("DefaultConnection");
}
protected override async Task ExecuteAsync(CancellationToken token)
{
var timeToStart = DateTime.Now;
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql(connectionString)
.Options;
while (!token.IsCancellationRequested)
var workPeriodic = new WorkPeriodic(workId, WorkAction, workPeriod)
{
if (DateTime.Now > timeToStart)
{
timeToStart = DateTime.Now + period;
try
{
using var context = new AsbCloudDbContext(options);
var added = await LimitingParameterAsync(context, token);
Trace.TraceInformation($"Total limiting parameter complete. Added {added} limiting parameters.");
}
catch (Exception ex)
{
Trace.TraceError(ex.Message);
}
GC.Collect();
}
var ms = (int)(timeToStart - DateTime.Now).TotalMilliseconds;
ms = ms > 100 ? ms : 100;
await Task.Delay(ms, token).ConfigureAwait(false);
}
Timeout = TimeSpan.FromMinutes(30)
};
return workPeriodic;
}
private static async Task<int> LimitingParameterAsync(IAsbCloudDbContext context, CancellationToken token)
// TODO: Разделить этот акшн на более мелкие части И использовать telemetryServiceData<..> вместо прямого обращения к БД.
private static async Task WorkAction(string _, IServiceProvider serviceProvider, CancellationToken token)
{
var lastDetectedDates = await context.LimitingParameter
using var db = serviceProvider.GetRequiredService<IAsbCloudDbContext>();
var lastDetectedDates = await db.LimitingParameter
.GroupBy(o => o.IdTelemetry)
.Select(g => new
{
@ -64,7 +40,7 @@ namespace AsbCloudInfrastructure.Services
})
.ToListAsync(token);
var telemetryIds = await context.Telemetries
var telemetryIds = await db.Telemetries
.Where(t => t.Info != null && t.TimeZone != null)
.Select(t => t.Id)
.ToListAsync(token);
@ -79,17 +55,15 @@ namespace AsbCloudInfrastructure.Services
inner.SingleOrDefault()?.LastDate,
});
var affected = 0;
foreach (var item in telemetryLastDetectedDates)
{
var newLimitingParameters = await GetLimitingParameterAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, context, token);
var newLimitingParameters = await GetLimitingParameterAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
if (newLimitingParameters?.Any() == true)
{
context.LimitingParameter.AddRange(newLimitingParameters);
affected += await context.SaveChangesAsync(token);
db.LimitingParameter.AddRange(newLimitingParameters);
await db.SaveChangesAsync(token);
}
}
return affected;
}
private static async Task<IEnumerable<LimitingParameter>> GetLimitingParameterAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)

View File

@ -41,6 +41,7 @@ namespace AsbCloudInfrastructure.Services.SAUB
public TelemetryTracker(IConfiguration configuration, IMemoryCache memoryCache)
{
// TODO: make this background work
var contextOptions = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql(configuration.GetConnectionString("DefaultConnection"))
.Options;

View File

@ -1,9 +1,9 @@
using AsbCloudDb.Model;
using AsbCloudDb.Model.Subsystems;
using AsbCloudInfrastructure.Background;
using AsbCloudInfrastructure.Services.Subsystems.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Data;
@ -16,56 +16,30 @@ using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services.Subsystems
{
#nullable enable
internal class SubsystemOperationTimeBackgroundService : BackgroundService
internal static class SubsystemOperationTimeCalcWorkFactory
{
private readonly string connectionString;
private readonly TimeSpan period = TimeSpan.FromHours(1);
private const string workId = "Subsystem operation time calc";
private static readonly TimeSpan workPeriod = TimeSpan.FromMinutes(30);
private const int idSubsytemTorqueMaster = 65537;
private const int idSubsytemSpinMaster = 65536;
private const int idSubsytemAkb = 1;
private const int idSubsytemMse = 2;
public SubsystemOperationTimeBackgroundService(IConfiguration configuration)
public static WorkPeriodic MakeWork()
{
connectionString = configuration.GetConnectionString("DefaultConnection");
}
protected override async Task ExecuteAsync(CancellationToken token)
{
var timeToStart = DateTime.Now;
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql(connectionString)
.Options;
while (!token.IsCancellationRequested)
var workPeriodic = new WorkPeriodic(workId, WorkAction, workPeriod)
{
if (DateTime.Now > timeToStart)
{
timeToStart = DateTime.Now + period;
try
{
using var context = new AsbCloudDbContext(options);
var added = await OperationTimeAllTelemetriesAsync(context, token);
Trace.TraceInformation($"Total subsystem operation time complete. Added {added} operations time.");
}
catch (Exception ex)
{
Trace.TraceError(ex.Message);
}
GC.Collect();
}
var ms = (int)(timeToStart - DateTime.Now).TotalMilliseconds;
ms = ms > 100 ? ms : 100;
await Task.Delay(ms, token).ConfigureAwait(false);
}
Timeout = TimeSpan.FromMinutes(30)
};
return workPeriodic;
}
public override async Task StopAsync(CancellationToken token)
// TODO: Разделить этот акшн на более мелкие части И использовать telemetryServiceData<..> вместо прямого обращения к БД.
private static async Task WorkAction(string _, IServiceProvider serviceProvider, CancellationToken token)
{
await base.StopAsync(token).ConfigureAwait(false);
}
using var db = serviceProvider.GetRequiredService<IAsbCloudDbContext>();
private static async Task<int> OperationTimeAllTelemetriesAsync(IAsbCloudDbContext db, CancellationToken token)
{
var lastDetectedDates = await db.SubsystemOperationTimes
.GroupBy(o => o.IdTelemetry)
.Select(g => new
@ -90,23 +64,21 @@ namespace AsbCloudInfrastructure.Services.Subsystems
inner.SingleOrDefault()?.LastDate,
});
var affected = 0;
foreach (var item in telemetryLastDetectedDates)
{
var newOperationsSaub = await OperationTimeSaubAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
if (newOperationsSaub?.Any() == true)
{
db.SubsystemOperationTimes.AddRange(newOperationsSaub);
affected += await db.SaveChangesAsync(token);
await db.SaveChangesAsync(token);
}
var newOperationsSpin = await OperationTimeSpinAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
if (newOperationsSpin?.Any() == true)
{
db.SubsystemOperationTimes.AddRange(newOperationsSpin);
affected += await db.SaveChangesAsync(token);
await db.SaveChangesAsync(token);
}
}
return affected;
}
private static async Task<DbDataReader> ExecuteReaderAsync(IAsbCloudDbContext db, string query, CancellationToken token)

View File

@ -1,9 +1,14 @@
using AsbCloudApp.Services;
using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services.DetectOperations;
using AsbCloudInfrastructure.Services.Subsystems;
using AsbCloudInfrastructure.Services;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading.Tasks;
using System.Threading;
namespace AsbCloudInfrastructure
{
@ -12,13 +17,22 @@ namespace AsbCloudInfrastructure
public static void BeforeRunHandler(IHost host)
{
using var scope = host.Services.CreateScope();
var context = scope.ServiceProvider.GetService<IAsbCloudDbContext>();
context.Database.SetCommandTimeout(TimeSpan.FromSeconds(2 * 60));
var provider = scope.ServiceProvider;
var context = provider.GetService<IAsbCloudDbContext>();
context.Database.SetCommandTimeout(TimeSpan.FromSeconds(2 * 60));
context.Database.Migrate();
var wellService = scope.ServiceProvider.GetService<IWellService>();
wellService.EnshureTimezonesIsSetAsync(System.Threading.CancellationToken.None).Wait();
var wellService = provider.GetRequiredService<IWellService>();
wellService.EnshureTimezonesIsSetAsync(CancellationToken.None).Wait();// TODO: make this background work
var backgroundWorker = provider.GetRequiredService<Background.BackgroundWorker>();
backgroundWorker.Push(OperationDetectionWorkFactory.MakeWork());
backgroundWorker.Push(SubsystemOperationTimeCalcWorkFactory.MakeWork());
backgroundWorker.Push(LimitingParameterCalcWorkFactory.MakeWork());
Task.Delay(1_000)
.ContinueWith(async (_) => await backgroundWorker.StartAsync(CancellationToken.None));
}
}
}