From 707f2a638a21ed2f9d8900398228212d0d23caf9 Mon Sep 17 00:00:00 2001 From: ngfrolov Date: Tue, 11 Oct 2022 17:04:26 +0500 Subject: [PATCH] Add ReduceSamplingController. --- AsbCloudApp/Data/JobDto.cs | 56 ++++++ .../Services/IReduceSamplingService.cs | 43 +++++ AsbCloudInfrastructure/DependencyInjection.cs | 1 + .../Services/ReduceSamplingService.cs | 174 ++++++++++-------- .../Controllers/ReduceSamplingController.cs | 78 ++++++++ ConsoleApp1/Program.cs | 10 +- 6 files changed, 282 insertions(+), 80 deletions(-) create mode 100644 AsbCloudApp/Data/JobDto.cs create mode 100644 AsbCloudApp/Services/IReduceSamplingService.cs create mode 100644 AsbCloudWebApi/Controllers/ReduceSamplingController.cs diff --git a/AsbCloudApp/Data/JobDto.cs b/AsbCloudApp/Data/JobDto.cs new file mode 100644 index 00000000..2e9b336a --- /dev/null +++ b/AsbCloudApp/Data/JobDto.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections; + +namespace AsbCloudApp.Data +{ +#nullable enable + /// + /// Состояние фоновой задачи + /// + public enum JobState + { + /// + /// Ожидает в очереди на выполнение + /// + Waiting, + /// + /// выполняется + /// + Working, + /// + /// успешно выполнена + /// + Done, + /// + /// завершена с ошибкой + /// + Fail + }; + + /// + /// работа фоновой задачи + /// + public class JobDto + { + /// + /// идентификатор + /// + public int Id { get; set; } + + /// + /// Состояние + /// + public JobState State { get; set; } + + /// + /// результат выполнения + /// + public Hashtable? Results { get; set; } + + /// + /// Исключение, если возникла ошибка + /// + public string? Error { get; set; } + } +#nullable disable +} diff --git a/AsbCloudApp/Services/IReduceSamplingService.cs b/AsbCloudApp/Services/IReduceSamplingService.cs new file mode 100644 index 00000000..a0220c58 --- /dev/null +++ b/AsbCloudApp/Services/IReduceSamplingService.cs @@ -0,0 +1,43 @@ +using AsbCloudApp.Data; +using System.Collections.Generic; + +namespace AsbCloudApp.Services +{ +#nullable enable + /// + /// Делегат обновления состояния задачи + /// + /// + public delegate void OnJobProgressDelagate(JobDto job); + + /// + /// Сервис прореживания архива БД. + /// Удаляет часть телеметрии. + /// Понижает частоту записей в БД с 1 запись за 1 сек до 1 запись за N сек. + /// + public interface IReduceSamplingService + { + /// + /// Получить все задания. Задания удаляются минимум через 10 сек после выполнения, возможно позднее. + /// + /// Enumerable of JobDto or empty + IEnumerable GetJobs(); + + /// + /// Получить состояние определенной задачи + /// + /// + /// + JobDto? GetOrDefaultState(int idTelemetry); + + /// + /// Создать задачу прореживанию архива и добавить её в очередь на выполнение + /// + /// телеметрия для прореживания + /// колбек процесса выполнения + /// созданная задача или задача из очереди + /// задача добавлена == true + bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate onProgress, out JobDto jobDto); + } +#nullable disable +} \ No newline at end of file diff --git a/AsbCloudInfrastructure/DependencyInjection.cs b/AsbCloudInfrastructure/DependencyInjection.cs index 1c69736d..421e550c 100644 --- a/AsbCloudInfrastructure/DependencyInjection.cs +++ b/AsbCloudInfrastructure/DependencyInjection.cs @@ -102,6 +102,7 @@ namespace AsbCloudInfrastructure services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(provider => ReduceSamplingService.GetInstance(configuration)); services.AddTransient(); services.AddTransient(); diff --git a/AsbCloudInfrastructure/Services/ReduceSamplingService.cs b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs index 46489fec..d9c11e0d 100644 --- a/AsbCloudInfrastructure/Services/ReduceSamplingService.cs +++ b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs @@ -1,9 +1,12 @@ -using AsbCloudDb; +using AsbCloudApp.Data; +using AsbCloudApp.Services; +using AsbCloudDb; using AsbCloudDb.Model; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -11,27 +14,22 @@ using System.Threading.Tasks; namespace AsbCloudInfrastructure.Services { #nullable enable - public class JobDto + class JobHandle { - public enum JobState { Waiting, Working, Done, Fail }; - public int Id { get; internal set; } - public JobState State { get; internal set; } - public object? Result { get; internal set; } - public Exception? Error { get; internal set; } + public int Id => Job.Id; + public JobDto Job { get; set; } = null!; + public DateTime DateCreate { get; set; } = DateTime.Now; + public OnJobProgressDelagate? OnProgress { get; set; } } - class JobWithProgress: JobDto + public class ReduceSamplingService : IReduceSamplingService { - public ReduceSamplingService.OnProgressDelagate OnProgress { get; set; } = null!; - } - - public class ReduceSamplingService - { - public delegate void OnProgressDelagate(JobDto job); + private const string jobResultKeyDeleted = "deleted"; + private const string jobResultKeyTotal = "total"; private static ReduceSamplingService? instance; private readonly string connectionString; private const int ratio = 5; - private readonly List jobs = new(5); + private readonly List jobHandlers = new(5); private bool isHandling; private CancellationTokenSource? cancellationTokenSource; private Task? task; @@ -46,62 +44,73 @@ namespace AsbCloudInfrastructure.Services Stop(); } + /// + /// Get singleton Instance + /// + /// + /// public static ReduceSamplingService GetInstance(IConfiguration configuration) { - if (instance is null) - instance = new(configuration); + instance ??= new(configuration); return instance; } - public int TryEnqueueRediceSamplingJob(int idTelemetry, OnProgressDelagate onProgress) + /// + public bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate? onProgress, out JobDto jobDto) { - var result = 0; - lock (jobs) + lock (jobHandlers) { - if (jobs.Any(j => j.Id == idTelemetry)) - result = - 1; + var oldJob = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry); + if (oldJob is not null) + { + jobDto = oldJob.Job; + return false; + } - var job = new JobWithProgress + jobDto = new JobDto { Id = idTelemetry, - State = JobDto.JobState.Waiting, + State = JobState.Waiting, + Results = new(), + }; + var jobHandler = new JobHandle + { + Job = jobDto, OnProgress = onProgress, }; - jobs.Add(job); - result = jobs.Count; + jobHandlers.Add(jobHandler); } EnsureHandleQueueStarted(); - return result; + return true; } - public JobDto? GetState(int idTelemetry) + /// + public JobDto? GetOrDefaultState(int idTelemetry) { - JobWithProgress? job; - lock (jobs) + JobHandle? jobHandler; + lock (jobHandlers) { - job = jobs.FirstOrDefault(j=>j.Id == idTelemetry); + jobHandler = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry); } - return job; + return jobHandler?.Job; } - public IEnumerable GetJobs() - { - return jobs; - } + /// + public IEnumerable GetJobs() => jobHandlers.Select(j=>j.Job); - private bool TryTakeWaitingJob(out JobWithProgress? job) + private bool TryTakeWaitingJob(out JobHandle? job) { - lock (jobs) + lock (jobHandlers) { - job = jobs.FirstOrDefault(j => j.State == JobDto.JobState.Waiting); + job = jobHandlers.FirstOrDefault(j => j.Job.State == JobState.Waiting); } return job is not null; } private void EnsureHandleQueueStarted() { - if(isHandling) + if (isHandling) return; isHandling = true; cancellationTokenSource = new CancellationTokenSource(); @@ -112,33 +121,43 @@ namespace AsbCloudInfrastructure.Services private async Task HandleJobs(CancellationToken token) { - while (TryTakeWaitingJob(out JobWithProgress? job)) + while (TryTakeWaitingJob(out JobHandle? jobHandler)) { - job!.State = JobDto.JobState.Working; + jobHandler!.Job.State = JobState.Working; try { - await RediceSamplingSaubAsync(job, token); - await RediceSamplingSpinAsync(job, token); - job.State = JobDto.JobState.Done; + await RediceSamplingSaubAsync(jobHandler, token); + await RediceSamplingSpinAsync(jobHandler, token); + jobHandler.Job.State = JobState.Done; } catch (Exception exception) { - job.State = JobDto.JobState.Fail; - job.Error = exception; - job.OnProgress.Invoke(job); + jobHandler.Job.State = JobState.Fail; + jobHandler.Job.Results = null; + jobHandler.Job.Error = exception.Message; + jobHandler.OnProgress?.Invoke(jobHandler.Job); + } + + if (!jobHandlers.Any(j => j.Job.State == JobState.Waiting)) + { + var sw = Stopwatch.StartNew(); + await VacuumAsync(token); + sw.Stop(); + if (sw.ElapsedMilliseconds < 10_000) + { + var delayMs = 10_000 - (int)sw.ElapsedMilliseconds; + await Task.Delay(delayMs, token); + } + CleanJobs(); } - await Task.Delay(100, token); } - await VacuumAsync(token); - await Task.Delay(1_000, token); - CleanJobs(); } private void CleanJobs() { - lock (jobs) + lock (jobHandlers) { - jobs.RemoveAll(j => j.State == JobDto.JobState.Done || j.State == JobDto.JobState.Fail); + jobHandlers.RemoveAll(j => j.Job.State == JobState.Done || j.Job.State == JobState.Fail); } } @@ -154,17 +173,11 @@ namespace AsbCloudInfrastructure.Services cancellationTokenSource?.Cancel(); task?.Wait(1_000); task = null; + cancellationTokenSource?.Dispose(); cancellationTokenSource = null; } - /// - /// Прореживание данных телеметрии САУБ. - /// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0. - /// - /// Id телеметрии - /// желаемое отношение оставшихся записей к исходным - /// - private Task RediceSamplingSaubAsync(JobWithProgress job, CancellationToken token) + private Task RediceSamplingSaubAsync(JobHandle job, CancellationToken token) { const int ramLimit = 10 * 1024 * 1024; const int dataItemSize = 345; // by profiler @@ -204,13 +217,13 @@ namespace AsbCloudInfrastructure.Services return RediceSamplingAsync( job, - chankSize, - sqlSelectTemplate, + chankSize, + sqlSelectTemplate, sqlDeleteTemplate, token); } - private Task RediceSamplingSpinAsync(JobWithProgress job, CancellationToken token) + private Task RediceSamplingSpinAsync(JobHandle job, CancellationToken token) { const int ramLimit = 10 * 1024 * 1024; const int dataItemSize = 345; // by profiler @@ -251,26 +264,30 @@ namespace AsbCloudInfrastructure.Services } private async Task RediceSamplingAsync( - JobWithProgress job, - int chankSize, + JobHandle jobHandle, + int chankSize, string sqlSelectTemplate, string sqlDeleteTemplate, CancellationToken token) - where TEntity: class, ITelemetryData + where TEntity : class, AsbCloudDb.Model.ITelemetryData { using var db = MakeContext(); var dbset = db.Set(); var deleted = 0; - var oldCount = await dbset.Where(t => t.IdTelemetry == job.Id).CountAsync(token); + var totalCount = await dbset.Where(t => t.IdTelemetry == jobHandle.Id).CountAsync(token); - if (job.Result is Tuple tupleResult) - { - deleted += tupleResult.Item1; - oldCount += tupleResult.Item2; - } - job.Result = (deleted, oldCount); - job.OnProgress?.Invoke(job); + var result = jobHandle.Job.Results!; + if (result[jobResultKeyDeleted] is int previousDeleted) + deleted += previousDeleted; + + if (result[jobResultKeyTotal] is int previousCount) + totalCount += previousCount; + + result[jobResultKeyDeleted] = deleted; + result[jobResultKeyTotal] = totalCount; + + jobHandle.OnProgress?.Invoke(jobHandle.Job); var startDate = DateTimeOffset.MinValue; do @@ -297,8 +314,9 @@ namespace AsbCloudInfrastructure.Services startDate = lastDate; deleted += currentDeleted; - job.Result = (deleted, oldCount); - job.OnProgress?.Invoke(job); + result[jobResultKeyDeleted] = deleted; + result[jobResultKeyTotal] = totalCount; + jobHandle.OnProgress?.Invoke(jobHandle.Job); } while (true); return; diff --git a/AsbCloudWebApi/Controllers/ReduceSamplingController.cs b/AsbCloudWebApi/Controllers/ReduceSamplingController.cs new file mode 100644 index 00000000..ddb99b2d --- /dev/null +++ b/AsbCloudWebApi/Controllers/ReduceSamplingController.cs @@ -0,0 +1,78 @@ +using AsbCloudApp.Services; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using System.Collections.Generic; +using System.Threading.Tasks; +using AsbCloudApp.Data; +using System.Linq; +using AsbCloudWebApi.SignalR; +using Microsoft.AspNetCore.SignalR; + +namespace AsbCloudWebApi.Controllers +{ + /// + /// Редактор кустов для админки + /// + [Route("api/admin/[controller]")] + [ApiController] + [Authorize] + public class ReduceSamplingController: ControllerBase + { + private readonly IReduceSamplingService service; + private readonly IHubContext telemetryHubContext; + private const string sirnalRGroupName = "ReduceSampling"; + private const string sirnalRMethodOnProgress = "OnProgress"; + + public ReduceSamplingController( + IReduceSamplingService service, + IHubContext telemetryHubContext ) + { + this.service = service; + this.telemetryHubContext = telemetryHubContext; + } + + /// + /// Получить все задания. Задания удаляются минимум через 10 сек после выполнения, возможно позднее. + /// + /// + /// + [HttpGet] + public virtual ActionResult> GetAll(int idTelemetry) + { + var result = service.GetJobs(); + if (result.Any()) + return Ok(result); + else + return NoContent(); + } + + /// + /// Получить состояние определенной задачи + /// + /// + /// + [HttpGet("{idTelemetry}")] + public virtual ActionResult GetOrDefault(int idTelemetry) + { + var result = service.GetOrDefaultState(idTelemetry); + return Ok(result); + } + + /// + /// Создать задачу прореживанию архива и добавить её в очередь на выполнение. + /// Если задача есть в очереди, она же и возвращается, но подписка не происходит. + /// + [HttpPost] + [Permission] + public virtual ActionResult Enqueue(int idTelemetry) + { + void onProgress(JobDto job) => + Task.Run(async () => + await telemetryHubContext.Clients.Group(sirnalRGroupName) + .SendAsync(sirnalRMethodOnProgress, job)); + + service.TryEnqueueRediceSamplingJob(idTelemetry, onProgress, out JobDto job); + return Ok(job); + } + } +} diff --git a/ConsoleApp1/Program.cs b/ConsoleApp1/Program.cs index 11c0ca13..b3d1d6fb 100644 --- a/ConsoleApp1/Program.cs +++ b/ConsoleApp1/Program.cs @@ -8,6 +8,7 @@ using DocumentFormat.OpenXml.Wordprocessing; using Microsoft.EntityFrameworkCore; using Org.BouncyCastle.Utilities.Collections; using System; +using System.Collections; using System.Collections.Generic; using System.IO; using System.Linq; @@ -24,8 +25,13 @@ namespace ConsoleApp1 // use ServiceFactory to make services static void Main(/*string[] args*/) { - Task.Run(() => { throw new Exception(); }) - .ContinueWith(t => { Console.WriteLine("Dooom"); }); + var h = new Hashtable(); + h.Add("name", 1); + h.Add("name2", "66"); + var v = h["v"]; + + var s = System.Text.Json.JsonSerializer.Serialize(h); + Console.WriteLine($"total time: ms"); Console.ReadLine(); }