From ea1eb20f821782cb583a17b864f56e999ad550d7 Mon Sep 17 00:00:00 2001 From: ngfrolov Date: Thu, 6 Oct 2022 13:49:20 +0500 Subject: [PATCH 1/4] 1-st try --- .../Services/SAUB/TelemetryDataBaseService.cs | 13 +- .../Services/SAUB/TelemetryDataSaubService.cs | 44 ++++++ .../Services/SAUB/TelemetryDataSpinService.cs | 7 + AsbCloudInfrastructure/Startup.cs | 1 + .../AsbCloudWebApi.Tests.csproj | 2 + ConsoleApp1/ConsoleApp1.csproj | 1 + ConsoleApp1/Program.cs | 140 +++++++++++------- 7 files changed, 153 insertions(+), 55 deletions(-) diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs index ebfe6822..5231e573 100644 --- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs @@ -16,7 +16,7 @@ namespace AsbCloudInfrastructure.Services.SAUB where TDto : AsbCloudApp.Data.ITelemetryData where TModel : class, AsbCloudDb.Model.ITelemetryData { - private readonly IAsbCloudDbContext db; + protected readonly IAsbCloudDbContext db; private readonly ITelemetryService telemetryService; protected readonly CacheTable cacheTelemetry; protected readonly CacheTable cacheTelemetryUsers; @@ -170,5 +170,16 @@ namespace AsbCloudInfrastructure.Services.SAUB return offset; } + + /// + /// Прореживание данных телеметрии. + /// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0. + /// Из-за возможности запуска повторного прореживания можно ограничить величину разрыва по времени параметром maxDateGapSec. + /// + /// Id телеметрии + /// желаемое отношение оставшихся записей к исходным + /// ограничение разрыва по времени + /// + public abstract Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token); } } diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs index 2941b1ca..a6330757 100644 --- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs @@ -2,7 +2,12 @@ using AsbCloudApp.Services; using AsbCloudDb.Model; using AsbCloudInfrastructure.Services.Cache; +using DocumentFormat.OpenXml.Drawing.Charts; using Mapster; +using Microsoft.EntityFrameworkCore; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace AsbCloudInfrastructure.Services.SAUB { @@ -34,5 +39,44 @@ namespace AsbCloudInfrastructure.Services.SAUB dto.DateTime = src.DateTime.ToRemoteDateTime(timezoneOffset); return dto; } + + public override async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token) + { + const int ramLimit = 50 * 1024 * 1024; + const int dataItemSize = 345; // by profiler + const int itemsCountLimit = ramLimit / dataItemSize; // ~ 150_000, iterations count ~ 46 + const int maxWellDepthGap = 1; + + var dbset = db.Set(); + + var sql = + "select" + + " *" + + "from" + + " (select" + + " *," + + " rank() over win1 as row_num," + + " lag(\"date\", 1) over win1 as lag_date," + + " lag(\"mode\", 1) over win1 as lag_mode," + + " lag(mse_state, 1) over win1 as lag_mse_state," + + " lag(well_depth, 1) over win1 as lag_well_depth," + + " lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator" + + " from t_telemetry_data_saub" + + $" where id_telemetry = {idTelemetry}" + + " window win1 
as (order by \"date\")" + + " ) as ttt" + + "where" + + $" (row_num % {ratio}) = 0" + + " or \"mode\" != lag_mode" + + $" or(\"date\" - lag_date) > interval '{maxDateGapSec} second'" + + $" or well_depth - lag_well_depth > {maxWellDepthGap}" + + " or mse_state != lag_mse_state" + + " or id_feed_regulator != lag_id_feed_regulator;"; + + var query = dbset.FromSqlRaw(sql); + + await Task.Delay(0); + return (0, 0); + } } } diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs index c33b7cab..b555286b 100644 --- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs @@ -3,6 +3,8 @@ using AsbCloudApp.Services; using AsbCloudDb.Model; using AsbCloudInfrastructure.Services.Cache; using Mapster; +using System.Threading; +using System.Threading.Tasks; namespace AsbCloudInfrastructure.Services.SAUB { @@ -28,5 +30,10 @@ namespace AsbCloudInfrastructure.Services.SAUB dto.DateTime = src.DateTime.ToRemoteDateTime(timezoneOffset); return dto; } + + public override Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token) + { + throw new System.NotImplementedException(); + } } } diff --git a/AsbCloudInfrastructure/Startup.cs b/AsbCloudInfrastructure/Startup.cs index 884a3383..5ccb0df7 100644 --- a/AsbCloudInfrastructure/Startup.cs +++ b/AsbCloudInfrastructure/Startup.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using System; +using System.Linq; namespace AsbCloudInfrastructure { diff --git a/AsbCloudWebApi.Tests/AsbCloudWebApi.Tests.csproj b/AsbCloudWebApi.Tests/AsbCloudWebApi.Tests.csproj index f8b181c5..0f182ff4 100644 --- a/AsbCloudWebApi.Tests/AsbCloudWebApi.Tests.csproj +++ b/AsbCloudWebApi.Tests/AsbCloudWebApi.Tests.csproj @@ -4,6 +4,8 @@ net6.0 false + + enable diff --git a/ConsoleApp1/ConsoleApp1.csproj b/ConsoleApp1/ConsoleApp1.csproj index da0e1e97..c0c5ae85 100644 --- a/ConsoleApp1/ConsoleApp1.csproj +++ b/ConsoleApp1/ConsoleApp1.csproj @@ -4,6 +4,7 @@ Exe net6.0 ConsoleApp1.Program + enable diff --git a/ConsoleApp1/Program.cs b/ConsoleApp1/Program.cs index 5b345220..c1fe0dc8 100644 --- a/ConsoleApp1/Program.cs +++ b/ConsoleApp1/Program.cs @@ -1,72 +1,104 @@ using AsbCloudApp.Data; using AsbCloudApp.Data.DailyReport; +using AsbCloudDb.Model; using AsbCloudInfrastructure.Services.DailyReport; using ClosedXML.Excel; +using DocumentFormat.OpenXml.Wordprocessing; +using Microsoft.EntityFrameworkCore; using System; using System.IO; - +using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace ConsoleApp1 { class Program { + private static AsbCloudDbContext db = ServiceFactory.Context; + + public static async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token) + { + const int ramLimit = 30 * 1024 * 1024; + const int dataItemSize = 345; // by profiler + const int itemsCountLimit = ramLimit / dataItemSize; // ~ 90_000 + const int maxWellDepthGap = 1; + + ratio = ratio > 0 ? ratio : 5; + maxDateGapSec = maxDateGapSec > 0 ? 
maxDateGapSec : 9; + + var dbset = db.Set(); + + var sqlSelect = + "select " + + " * " + + "from " + + " (select " + + " *, " + + " rank() over win1 as row_num, " + + " lag(\"date\", 1) over win1 as lag_date, " + + " lag(\"mode\", 1) over win1 as lag_mode, " + + " lag(mse_state, 1) over win1 as lag_mse_state, " + + " lag(well_depth, 1) over win1 as lag_well_depth, " + + " lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " + + " from t_telemetry_data_saub " + + $" where id_telemetry = {idTelemetry} and \"date\" > {{0}}" + + " window win1 as (order by \"date\") " + + " ) as ttt " + + "where " + + $" (row_num % {ratio}) = 0 " + + " or \"mode\" != lag_mode " + + $" or(\"date\" - lag_date) > interval '{maxDateGapSec} second' " + + $" or well_depth - lag_well_depth > {maxWellDepthGap} " + + " or mse_state != lag_mse_state " + + " or id_feed_regulator != lag_id_feed_regulator " + + "order by \"date\" "; + + var sqlDelete = "delete " + + "from t_telemetry_data_saub " + + $"where id_telemetry = {idTelemetry} and \"date\" between {{0}} and {{1}};"; + + var startDate = DateTimeOffset.MinValue; + var iteration = 0; + var deleted = 0; + var saved = 0; + do + { + var query = dbset + .FromSqlRaw(sqlSelect, startDate) + .AsNoTracking(); + + var data = await query + .Take(itemsCountLimit) + .ToArrayAsync(token); + + var currentDataCount = data.Length; + if (currentDataCount == 0) + break; + + var lastDate = data.Last().DateTime; + + var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDelete, new object[]{ startDate, lastDate}.AsEnumerable(), token); + if (currentDeleted == 0) + break; + + dbset.AddRange(data); + await db.SaveChangesAsync(token); + + startDate = lastDate; + deleted += currentDeleted; + saved += currentDataCount; + iteration++; + } while (true); + + return (deleted, saved); + } + // use ServiceFactory to make services static void Main(/*string[] args*/) { - - - var block = new HeadDto() - { - AzimuthAngle = 12, - WellName = "WellName", - ClusterName = "clusterName", - Customer = "customer", - Contractor = "Contractor", - ReportDate = DateTime.Now, - WellDepthIntervalFinishDate = 27.5, - WellDepthIntervalStartDate = 26.5, - BottomholeDepth = 66.6 - }; - var block2 = new BhaDto() - { - BHADescription = "sadasdasdasdasdasdjlaskjdaksjdlasdlalskdklj" - }; - var block3 = new SaubDto(); - var bloks = new DailyReportDto() - { - Head = block, - Saub = block3 - }; - - - - var service = new DailyReportMakerExcel(); - var stream = service.MakeReportFromBlocks(bloks); - var filename = "____.xlsx"; - if (File.Exists(filename)) - File.Delete(filename); - using var fileStream = File.OpenWrite(filename); - stream.CopyTo(fileStream); - - return; - - - - - - - //var ms = MakeReportFromBlocks(block,block3); - ////File.Create("", MakeReportFromBlocks(block)); - //using var file = new FileStream("file.xlsx", FileMode.Create, System.IO.FileAccess.Write); - //byte[] bytes = new byte[ms.Length]; - //ms.Read(bytes, 0, (int)ms.Length); - //file.Write(bytes, 0, bytes.Length); - //ms.Close(); + RediceSamplingAsync(183, 5, 10, CancellationToken.None).Wait(); } - - - - } } From 4d42c9e5add73a257c7af59895cef1d189573711 Mon Sep 17 00:00:00 2001 From: ngfrolov Date: Mon, 10 Oct 2022 12:39:54 +0500 Subject: [PATCH 2/4] poc debagged. 
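Each iteration of the proof-of-concept loop now selects a chunk of rows to keep with the window-function query, deletes the covered date range, re-inserts the kept rows through the new ExecInsertAsync extension, and reports progress after every chunk. As a plain-C# illustration of the keep-rule the SQL encodes (a hypothetical sketch only: the Row record below merely mirrors the columns the query compares, its property names are assumed and it is not the real TelemetryDataSaub entity):

    using System;
    using System.Collections.Generic;

    // Mirrors only the columns the query compares; property names are assumed.
    public record Row(DateTimeOffset Date, int Mode, int MseState, double WellDepth, int IdFeedRegulator);

    public static class ThinningRule
    {
        // Keep every ratio-th row (rank() over (order by "date") is 1-based), plus any row that
        // differs from its predecessor in mode / MSE state / feed regulator, or that follows a
        // time gap of maxDateGapSec seconds or more, or a depth jump greater than maxWellDepthGap.
        public static IEnumerable<Row> Thin(IReadOnlyList<Row> ordered, int ratio, int maxDateGapSec, double maxWellDepthGap)
        {
            for (var i = 0; i < ordered.Count; i++)
            {
                var cur = ordered[i];
                if ((i + 1) % ratio == 0) { yield return cur; continue; }
                if (i == 0) continue; // lag() is NULL for the first row, so no other condition can fire
                var prev = ordered[i - 1];
                if (cur.Mode != prev.Mode
                    || cur.MseState != prev.MseState
                    || cur.IdFeedRegulator != prev.IdFeedRegulator
                    || (cur.Date - prev.Date).TotalSeconds >= maxDateGapSec
                    || cur.WellDepth - prev.WellDepth > maxWellDepthGap)
                    yield return cur;
            }
        }
    }

The database applies the same rule server-side, so only the surviving rows ever travel to the client before being re-inserted.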
--- AsbCloudDb/EFExtentions.cs | 17 +++++++++++++++++ ConsoleApp1/Program.cs | 31 ++++++++++++++++++++----------- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/AsbCloudDb/EFExtentions.cs b/AsbCloudDb/EFExtentions.cs index db66be04..f0523294 100644 --- a/AsbCloudDb/EFExtentions.cs +++ b/AsbCloudDb/EFExtentions.cs @@ -69,6 +69,15 @@ namespace AsbCloudDb return database.ExecuteSqlRawAsync(query, token); } + public static Task ExecInsertAsync(this Microsoft.EntityFrameworkCore.Infrastructure.DatabaseFacade database, DbSet dbSet, IEnumerable items, CancellationToken token) + where T : class + { + var factory = GetQueryStringFactory(dbSet); + var query = factory.MakeInsertSql(items); + + return database.ExecuteSqlRawAsync(query, token); + } + public static string GetTableName(this DbSet dbSet) where T : class { @@ -154,6 +163,14 @@ namespace AsbCloudDb return builder.ToString(); } + public string MakeInsertSql(IEnumerable items) + { + var builder = new StringBuilder(insertHeader, 7); + BuildRows(builder, items); + builder.Append(';'); + return builder.ToString(); + } + private StringBuilder BuildRows(StringBuilder builder, IEnumerable items) { var list = items.ToList(); diff --git a/ConsoleApp1/Program.cs b/ConsoleApp1/Program.cs index c1fe0dc8..3bb591fc 100644 --- a/ConsoleApp1/Program.cs +++ b/ConsoleApp1/Program.cs @@ -1,5 +1,6 @@ using AsbCloudApp.Data; using AsbCloudApp.Data.DailyReport; +using AsbCloudDb; using AsbCloudDb.Model; using AsbCloudInfrastructure.Services.DailyReport; using ClosedXML.Excel; @@ -18,17 +19,21 @@ namespace ConsoleApp1 { private static AsbCloudDbContext db = ServiceFactory.Context; - public static async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token) + public delegate void OnProgress(int handled, int total); + + public static async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, OnProgress onProgress, CancellationToken token) { - const int ramLimit = 30 * 1024 * 1024; + const int ramLimit = 10 * 1024 * 1024; const int dataItemSize = 345; // by profiler const int itemsCountLimit = ramLimit / dataItemSize; // ~ 90_000 - const int maxWellDepthGap = 1; + const double maxWellDepthGap = 0.1; ratio = ratio > 0 ? ratio : 5; maxDateGapSec = maxDateGapSec > 0 ? 
maxDateGapSec : 9; var dbset = db.Set(); + var oldCount = await dbset.Where(t => t.IdTelemetry == idTelemetry).CountAsync(token); + onProgress?.Invoke(0, oldCount); var sqlSelect = "select " + @@ -49,8 +54,8 @@ namespace ConsoleApp1 "where " + $" (row_num % {ratio}) = 0 " + " or \"mode\" != lag_mode " + - $" or(\"date\" - lag_date) > interval '{maxDateGapSec} second' " + - $" or well_depth - lag_well_depth > {maxWellDepthGap} " + + $" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " + + $" or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " + " or mse_state != lag_mse_state " + " or id_feed_regulator != lag_id_feed_regulator " + "order by \"date\" "; @@ -60,7 +65,6 @@ namespace ConsoleApp1 $"where id_telemetry = {idTelemetry} and \"date\" between {{0}} and {{1}};"; var startDate = DateTimeOffset.MinValue; - var iteration = 0; var deleted = 0; var saved = 0; do @@ -83,22 +87,27 @@ namespace ConsoleApp1 if (currentDeleted == 0) break; - dbset.AddRange(data); - await db.SaveChangesAsync(token); + await db.Database.ExecInsertAsync(dbset, data, token); startDate = lastDate; deleted += currentDeleted; saved += currentDataCount; - iteration++; + onProgress?.Invoke(deleted, oldCount); } while (true); - return (deleted, saved); + return (oldCount, saved); } // use ServiceFactory to make services static void Main(/*string[] args*/) { - RediceSamplingAsync(183, 5, 10, CancellationToken.None).Wait(); + var i = 0; + var sw = new System.Diagnostics.Stopwatch(); + sw.Start(); + var result = RediceSamplingAsync(94, 5, 5, (p, t) => { Console.WriteLine($"{i++:0000}\t{p:00_000_000}\t{t:00_000_000}\t{1d*p/t:00.00}"); }, CancellationToken.None).Result; + sw.Stop(); + Console.WriteLine($"result: saved {result.newCount} old = {result.oldCount} ratio = {1d*result.oldCount/result.newCount}"); + Console.WriteLine($"total time: {sw.ElapsedMilliseconds} ms"); } } } From 8cbcd9a115eeba1f7f13be54d9288ff7ab67e585 Mon Sep 17 00:00:00 2001 From: ngfrolov Date: Tue, 11 Oct 2022 09:02:53 +0500 Subject: [PATCH 3/4] =?UTF-8?q?=D0=BF=D0=B5=D1=80=D0=B2=D0=B0=D1=8F=20?= =?UTF-8?q?=D0=B2=D0=B5=D1=80=D1=81=D0=B8=D1=8F=20Reduce=20service?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Services/ReduceSamplingService.cs | 317 ++++++++++++++++++ .../Services/SAUB/TelemetryDataBaseService.cs | 11 +- .../Services/SAUB/TelemetryDataSaubService.cs | 39 --- .../Services/SAUB/TelemetryDataSpinService.cs | 5 - ConsoleApp1/Program.cs | 94 +----- 5 files changed, 325 insertions(+), 141 deletions(-) create mode 100644 AsbCloudInfrastructure/Services/ReduceSamplingService.cs diff --git a/AsbCloudInfrastructure/Services/ReduceSamplingService.cs b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs new file mode 100644 index 00000000..46489fec --- /dev/null +++ b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs @@ -0,0 +1,317 @@ +using AsbCloudDb; +using AsbCloudDb.Model; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace AsbCloudInfrastructure.Services +{ +#nullable enable + public class JobDto + { + public enum JobState { Waiting, Working, Done, Fail }; + public int Id { get; internal set; } + public JobState State { get; internal set; } + public object? Result { get; internal set; } + public Exception? 
Error { get; internal set; } + } + + class JobWithProgress: JobDto + { + public ReduceSamplingService.OnProgressDelagate OnProgress { get; set; } = null!; + } + + public class ReduceSamplingService + { + public delegate void OnProgressDelagate(JobDto job); + private static ReduceSamplingService? instance; + private readonly string connectionString; + private const int ratio = 5; + private readonly List jobs = new(5); + private bool isHandling; + private CancellationTokenSource? cancellationTokenSource; + private Task? task; + + private ReduceSamplingService(IConfiguration configuration) + { + connectionString = configuration.GetConnectionString("DefaultConnection"); + } + + ~ReduceSamplingService() + { + Stop(); + } + + public static ReduceSamplingService GetInstance(IConfiguration configuration) + { + if (instance is null) + instance = new(configuration); + return instance; + } + + public int TryEnqueueRediceSamplingJob(int idTelemetry, OnProgressDelagate onProgress) + { + var result = 0; + lock (jobs) + { + if (jobs.Any(j => j.Id == idTelemetry)) + result = - 1; + + var job = new JobWithProgress + { + Id = idTelemetry, + State = JobDto.JobState.Waiting, + OnProgress = onProgress, + }; + + jobs.Add(job); + result = jobs.Count; + } + EnsureHandleQueueStarted(); + return result; + } + + public JobDto? GetState(int idTelemetry) + { + JobWithProgress? job; + lock (jobs) + { + job = jobs.FirstOrDefault(j=>j.Id == idTelemetry); + } + return job; + } + + public IEnumerable GetJobs() + { + return jobs; + } + + private bool TryTakeWaitingJob(out JobWithProgress? job) + { + lock (jobs) + { + job = jobs.FirstOrDefault(j => j.State == JobDto.JobState.Waiting); + } + return job is not null; + } + + private void EnsureHandleQueueStarted() + { + if(isHandling) + return; + isHandling = true; + cancellationTokenSource = new CancellationTokenSource(); + var token = cancellationTokenSource.Token; + task = Task.Run(async () => await HandleJobs(token)) + .ContinueWith(_ => isHandling = false); + } + + private async Task HandleJobs(CancellationToken token) + { + while (TryTakeWaitingJob(out JobWithProgress? job)) + { + job!.State = JobDto.JobState.Working; + try + { + await RediceSamplingSaubAsync(job, token); + await RediceSamplingSpinAsync(job, token); + job.State = JobDto.JobState.Done; + } + catch (Exception exception) + { + job.State = JobDto.JobState.Fail; + job.Error = exception; + job.OnProgress.Invoke(job); + } + await Task.Delay(100, token); + } + await VacuumAsync(token); + await Task.Delay(1_000, token); + CleanJobs(); + } + + private void CleanJobs() + { + lock (jobs) + { + jobs.RemoveAll(j => j.State == JobDto.JobState.Done || j.State == JobDto.JobState.Fail); + } + } + + private async Task VacuumAsync(CancellationToken token) + { + using var db = MakeContext(); + var sqlVacuum = "vacuum (SKIP_LOCKED);"; + await db.Database.ExecuteSqlRawAsync(sqlVacuum, token); + } + + private void Stop() + { + cancellationTokenSource?.Cancel(); + task?.Wait(1_000); + task = null; + cancellationTokenSource = null; + } + + /// + /// Прореживание данных телеметрии САУБ. + /// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0. 
+ /// + /// Id телеметрии + /// желаемое отношение оставшихся записей к исходным + /// + private Task RediceSamplingSaubAsync(JobWithProgress job, CancellationToken token) + { + const int ramLimit = 10 * 1024 * 1024; + const int dataItemSize = 345; // by profiler + const int chankSize = ramLimit / dataItemSize; // ~ 90_000 + const double maxWellDepthGap = 0.1; + + var maxDateGapSec = ratio; + + var sqlSelectTemplate = + "select " + + " * " + + "from " + + " (select " + + " *, " + + " rank() over win1 as row_num, " + + " lag(\"date\", 1) over win1 as lag_date, " + + " lag(\"mode\", 1) over win1 as lag_mode, " + + " lag(mse_state, 1) over win1 as lag_mse_state, " + + " lag(well_depth, 1) over win1 as lag_well_depth, " + + " lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " + + " from t_telemetry_data_saub " + + $" where id_telemetry = {job.Id} and \"date\" > {{0}}" + + " window win1 as (order by \"date\") " + + " ) as t_1 " + + "where " + + $" (row_num % {ratio}) = 0 " + + " or \"mode\" != lag_mode " + + $" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " + + $" or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " + + " or mse_state != lag_mse_state " + + " or id_feed_regulator != lag_id_feed_regulator " + + "order by \"date\" "; + + var sqlDeleteTemplate = "delete " + + "from t_telemetry_data_saub " + + $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};"; + + return RediceSamplingAsync( + job, + chankSize, + sqlSelectTemplate, + sqlDeleteTemplate, + token); + } + + private Task RediceSamplingSpinAsync(JobWithProgress job, CancellationToken token) + { + const int ramLimit = 10 * 1024 * 1024; + const int dataItemSize = 345; // by profiler + const int chankSize = ramLimit / dataItemSize; // ~ 90_000 + var maxDateGapSec = ratio; + + var sqlSelectTemplate = + "select " + + " * " + + "from " + + " (select " + + " *, " + + " rank() over win1 as row_num, " + + " lag(\"date\", 1) over win1 as lag_date, " + + " lag(\"mode\", 1) over win1 as lag_mode, " + + " lag(state, 1) over win1 as lag_state " + + " from t_telemetry_data_spin " + + $" where id_telemetry = {job.Id} and \"date\" > {{0}}" + + " window win1 as (order by \"date\") " + + " ) as t_1 " + + "where " + + $" (row_num % {ratio}) = 0 " + + " or \"mode\" != lag_mode " + + $" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " + + " or state != lag_state " + + "order by \"date\" "; + + var sqlDeleteTemplate = "delete " + + "from t_telemetry_data_spin " + + $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};"; + + return RediceSamplingAsync( + job, + chankSize, + sqlSelectTemplate, + sqlDeleteTemplate, + token); + } + + private async Task RediceSamplingAsync( + JobWithProgress job, + int chankSize, + string sqlSelectTemplate, + string sqlDeleteTemplate, + CancellationToken token) + where TEntity: class, ITelemetryData + { + using var db = MakeContext(); + var dbset = db.Set(); + + var deleted = 0; + var oldCount = await dbset.Where(t => t.IdTelemetry == job.Id).CountAsync(token); + + if (job.Result is Tuple tupleResult) + { + deleted += tupleResult.Item1; + oldCount += tupleResult.Item2; + } + job.Result = (deleted, oldCount); + job.OnProgress?.Invoke(job); + var startDate = DateTimeOffset.MinValue; + + do + { + var query = dbset + .FromSqlRaw(sqlSelectTemplate, startDate) + .AsNoTracking(); + + var data = await query + .Take(chankSize) + .ToArrayAsync(token); + + var currentDataCount = data.Length; + if (currentDataCount == 0) + break; + + var lastDate = 
data.Last().DateTime; + + var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDeleteTemplate, new object[] { startDate, lastDate }.AsEnumerable(), token); + if (currentDeleted == 0) + break; + + await db.Database.ExecInsertAsync(dbset, data, token); + + startDate = lastDate; + deleted += currentDeleted; + job.Result = (deleted, oldCount); + job.OnProgress?.Invoke(job); + } while (true); + + return; + } + + private AsbCloudDbContext MakeContext() + { + var options = new DbContextOptionsBuilder() + .UseNpgsql(connectionString) + .Options; + + return new AsbCloudDbContext(options); + } + } +#nullable disable +} diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs index 5231e573..285ac3d8 100644 --- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs @@ -171,15 +171,6 @@ namespace AsbCloudInfrastructure.Services.SAUB return offset; } - /// - /// Прореживание данных телеметрии. - /// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0. - /// Из-за возможности запуска повторного прореживания можно ограничить величину разрыва по времени параметром maxDateGapSec. - /// - /// Id телеметрии - /// желаемое отношение оставшихся записей к исходным - /// ограничение разрыва по времени - /// - public abstract Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token); + } } diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs index a6330757..419c5831 100644 --- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs @@ -39,44 +39,5 @@ namespace AsbCloudInfrastructure.Services.SAUB dto.DateTime = src.DateTime.ToRemoteDateTime(timezoneOffset); return dto; } - - public override async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token) - { - const int ramLimit = 50 * 1024 * 1024; - const int dataItemSize = 345; // by profiler - const int itemsCountLimit = ramLimit / dataItemSize; // ~ 150_000, iterations count ~ 46 - const int maxWellDepthGap = 1; - - var dbset = db.Set(); - - var sql = - "select" + - " *" + - "from" + - " (select" + - " *," + - " rank() over win1 as row_num," + - " lag(\"date\", 1) over win1 as lag_date," + - " lag(\"mode\", 1) over win1 as lag_mode," + - " lag(mse_state, 1) over win1 as lag_mse_state," + - " lag(well_depth, 1) over win1 as lag_well_depth," + - " lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator" + - " from t_telemetry_data_saub" + - $" where id_telemetry = {idTelemetry}" + - " window win1 as (order by \"date\")" + - " ) as ttt" + - "where" + - $" (row_num % {ratio}) = 0" + - " or \"mode\" != lag_mode" + - $" or(\"date\" - lag_date) > interval '{maxDateGapSec} second'" + - $" or well_depth - lag_well_depth > {maxWellDepthGap}" + - " or mse_state != lag_mse_state" + - " or id_feed_regulator != lag_id_feed_regulator;"; - - var query = dbset.FromSqlRaw(sql); - - await Task.Delay(0); - return (0, 0); - } } } diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs index b555286b..5cd51161 100644 --- 
a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs @@ -30,10 +30,5 @@ namespace AsbCloudInfrastructure.Services.SAUB dto.DateTime = src.DateTime.ToRemoteDateTime(timezoneOffset); return dto; } - - public override Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token) - { - throw new System.NotImplementedException(); - } } } diff --git a/ConsoleApp1/Program.cs b/ConsoleApp1/Program.cs index 3bb591fc..11c0ca13 100644 --- a/ConsoleApp1/Program.cs +++ b/ConsoleApp1/Program.cs @@ -6,7 +6,9 @@ using AsbCloudInfrastructure.Services.DailyReport; using ClosedXML.Excel; using DocumentFormat.OpenXml.Wordprocessing; using Microsoft.EntityFrameworkCore; +using Org.BouncyCastle.Utilities.Collections; using System; +using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading; @@ -18,96 +20,14 @@ namespace ConsoleApp1 class Program { private static AsbCloudDbContext db = ServiceFactory.Context; - - public delegate void OnProgress(int handled, int total); - - public static async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, OnProgress onProgress, CancellationToken token) - { - const int ramLimit = 10 * 1024 * 1024; - const int dataItemSize = 345; // by profiler - const int itemsCountLimit = ramLimit / dataItemSize; // ~ 90_000 - const double maxWellDepthGap = 0.1; - - ratio = ratio > 0 ? ratio : 5; - maxDateGapSec = maxDateGapSec > 0 ? maxDateGapSec : 9; - - var dbset = db.Set(); - var oldCount = await dbset.Where(t => t.IdTelemetry == idTelemetry).CountAsync(token); - onProgress?.Invoke(0, oldCount); - - var sqlSelect = - "select " + - " * " + - "from " + - " (select " + - " *, " + - " rank() over win1 as row_num, " + - " lag(\"date\", 1) over win1 as lag_date, " + - " lag(\"mode\", 1) over win1 as lag_mode, " + - " lag(mse_state, 1) over win1 as lag_mse_state, " + - " lag(well_depth, 1) over win1 as lag_well_depth, " + - " lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " + - " from t_telemetry_data_saub " + - $" where id_telemetry = {idTelemetry} and \"date\" > {{0}}" + - " window win1 as (order by \"date\") " + - " ) as ttt " + - "where " + - $" (row_num % {ratio}) = 0 " + - " or \"mode\" != lag_mode " + - $" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " + - $" or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " + - " or mse_state != lag_mse_state " + - " or id_feed_regulator != lag_id_feed_regulator " + - "order by \"date\" "; - - var sqlDelete = "delete " + - "from t_telemetry_data_saub " + - $"where id_telemetry = {idTelemetry} and \"date\" between {{0}} and {{1}};"; - - var startDate = DateTimeOffset.MinValue; - var deleted = 0; - var saved = 0; - do - { - var query = dbset - .FromSqlRaw(sqlSelect, startDate) - .AsNoTracking(); - - var data = await query - .Take(itemsCountLimit) - .ToArrayAsync(token); - - var currentDataCount = data.Length; - if (currentDataCount == 0) - break; - - var lastDate = data.Last().DateTime; - - var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDelete, new object[]{ startDate, lastDate}.AsEnumerable(), token); - if (currentDeleted == 0) - break; - - await db.Database.ExecInsertAsync(dbset, data, token); - - startDate = lastDate; - deleted += currentDeleted; - saved += currentDataCount; - onProgress?.Invoke(deleted, oldCount); - } while (true); - - return (oldCount, saved); - } - + 
// use ServiceFactory to make services static void Main(/*string[] args*/) { - var i = 0; - var sw = new System.Diagnostics.Stopwatch(); - sw.Start(); - var result = RediceSamplingAsync(94, 5, 5, (p, t) => { Console.WriteLine($"{i++:0000}\t{p:00_000_000}\t{t:00_000_000}\t{1d*p/t:00.00}"); }, CancellationToken.None).Result; - sw.Stop(); - Console.WriteLine($"result: saved {result.newCount} old = {result.oldCount} ratio = {1d*result.oldCount/result.newCount}"); - Console.WriteLine($"total time: {sw.ElapsedMilliseconds} ms"); + Task.Run(() => { throw new Exception(); }) + .ContinueWith(t => { Console.WriteLine("Dooom"); }); + Console.WriteLine($"total time: ms"); + Console.ReadLine(); } } } From 707f2a638a21ed2f9d8900398228212d0d23caf9 Mon Sep 17 00:00:00 2001 From: ngfrolov Date: Tue, 11 Oct 2022 17:04:26 +0500 Subject: [PATCH 4/4] Add ReduceSamplingController. --- AsbCloudApp/Data/JobDto.cs | 56 ++++++ .../Services/IReduceSamplingService.cs | 43 +++++ AsbCloudInfrastructure/DependencyInjection.cs | 1 + .../Services/ReduceSamplingService.cs | 174 ++++++++++-------- .../Controllers/ReduceSamplingController.cs | 78 ++++++++ ConsoleApp1/Program.cs | 10 +- 6 files changed, 282 insertions(+), 80 deletions(-) create mode 100644 AsbCloudApp/Data/JobDto.cs create mode 100644 AsbCloudApp/Services/IReduceSamplingService.cs create mode 100644 AsbCloudWebApi/Controllers/ReduceSamplingController.cs diff --git a/AsbCloudApp/Data/JobDto.cs b/AsbCloudApp/Data/JobDto.cs new file mode 100644 index 00000000..2e9b336a --- /dev/null +++ b/AsbCloudApp/Data/JobDto.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections; + +namespace AsbCloudApp.Data +{ +#nullable enable + /// + /// Состояние фоновой задачи + /// + public enum JobState + { + /// + /// Ожидает в очереди на выполнение + /// + Waiting, + /// + /// выполняется + /// + Working, + /// + /// успешно выполнена + /// + Done, + /// + /// завершена с ошибкой + /// + Fail + }; + + /// + /// работа фоновой задачи + /// + public class JobDto + { + /// + /// идентификатор + /// + public int Id { get; set; } + + /// + /// Состояние + /// + public JobState State { get; set; } + + /// + /// результат выполнения + /// + public Hashtable? Results { get; set; } + + /// + /// Исключение, если возникла ошибка + /// + public string? Error { get; set; } + } +#nullable disable +} diff --git a/AsbCloudApp/Services/IReduceSamplingService.cs b/AsbCloudApp/Services/IReduceSamplingService.cs new file mode 100644 index 00000000..a0220c58 --- /dev/null +++ b/AsbCloudApp/Services/IReduceSamplingService.cs @@ -0,0 +1,43 @@ +using AsbCloudApp.Data; +using System.Collections.Generic; + +namespace AsbCloudApp.Services +{ +#nullable enable + /// + /// Делегат обновления состояния задачи + /// + /// + public delegate void OnJobProgressDelagate(JobDto job); + + /// + /// Сервис прореживания архива БД. + /// Удаляет часть телеметрии. + /// Понижает частоту записей в БД с 1 запись за 1 сек до 1 запись за N сек. + /// + public interface IReduceSamplingService + { + /// + /// Получить все задания. Задания удаляются минимум через 10 сек после выполнения, возможно позднее. + /// + /// Enumerable of JobDto or empty + IEnumerable GetJobs(); + + /// + /// Получить состояние определенной задачи + /// + /// + /// + JobDto? 
GetOrDefaultState(int idTelemetry); + + /// + /// Создать задачу прореживанию архива и добавить её в очередь на выполнение + /// + /// телеметрия для прореживания + /// колбек процесса выполнения + /// созданная задача или задача из очереди + /// задача добавлена == true + bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate onProgress, out JobDto jobDto); + } +#nullable disable +} \ No newline at end of file diff --git a/AsbCloudInfrastructure/DependencyInjection.cs b/AsbCloudInfrastructure/DependencyInjection.cs index 1c69736d..421e550c 100644 --- a/AsbCloudInfrastructure/DependencyInjection.cs +++ b/AsbCloudInfrastructure/DependencyInjection.cs @@ -102,6 +102,7 @@ namespace AsbCloudInfrastructure services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(provider => ReduceSamplingService.GetInstance(configuration)); services.AddTransient(); services.AddTransient(); diff --git a/AsbCloudInfrastructure/Services/ReduceSamplingService.cs b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs index 46489fec..d9c11e0d 100644 --- a/AsbCloudInfrastructure/Services/ReduceSamplingService.cs +++ b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs @@ -1,9 +1,12 @@ -using AsbCloudDb; +using AsbCloudApp.Data; +using AsbCloudApp.Services; +using AsbCloudDb; using AsbCloudDb.Model; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -11,27 +14,22 @@ using System.Threading.Tasks; namespace AsbCloudInfrastructure.Services { #nullable enable - public class JobDto + class JobHandle { - public enum JobState { Waiting, Working, Done, Fail }; - public int Id { get; internal set; } - public JobState State { get; internal set; } - public object? Result { get; internal set; } - public Exception? Error { get; internal set; } + public int Id => Job.Id; + public JobDto Job { get; set; } = null!; + public DateTime DateCreate { get; set; } = DateTime.Now; + public OnJobProgressDelagate? OnProgress { get; set; } } - class JobWithProgress: JobDto + public class ReduceSamplingService : IReduceSamplingService { - public ReduceSamplingService.OnProgressDelagate OnProgress { get; set; } = null!; - } - - public class ReduceSamplingService - { - public delegate void OnProgressDelagate(JobDto job); + private const string jobResultKeyDeleted = "deleted"; + private const string jobResultKeyTotal = "total"; private static ReduceSamplingService? instance; private readonly string connectionString; private const int ratio = 5; - private readonly List jobs = new(5); + private readonly List jobHandlers = new(5); private bool isHandling; private CancellationTokenSource? cancellationTokenSource; private Task? task; @@ -46,62 +44,73 @@ namespace AsbCloudInfrastructure.Services Stop(); } + /// + /// Get singleton Instance + /// + /// + /// public static ReduceSamplingService GetInstance(IConfiguration configuration) { - if (instance is null) - instance = new(configuration); + instance ??= new(configuration); return instance; } - public int TryEnqueueRediceSamplingJob(int idTelemetry, OnProgressDelagate onProgress) + /// + public bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate? 
onProgress, out JobDto jobDto) { - var result = 0; - lock (jobs) + lock (jobHandlers) { - if (jobs.Any(j => j.Id == idTelemetry)) - result = - 1; + var oldJob = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry); + if (oldJob is not null) + { + jobDto = oldJob.Job; + return false; + } - var job = new JobWithProgress + jobDto = new JobDto { Id = idTelemetry, - State = JobDto.JobState.Waiting, + State = JobState.Waiting, + Results = new(), + }; + var jobHandler = new JobHandle + { + Job = jobDto, OnProgress = onProgress, }; - jobs.Add(job); - result = jobs.Count; + jobHandlers.Add(jobHandler); } EnsureHandleQueueStarted(); - return result; + return true; } - public JobDto? GetState(int idTelemetry) + /// + public JobDto? GetOrDefaultState(int idTelemetry) { - JobWithProgress? job; - lock (jobs) + JobHandle? jobHandler; + lock (jobHandlers) { - job = jobs.FirstOrDefault(j=>j.Id == idTelemetry); + jobHandler = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry); } - return job; + return jobHandler?.Job; } - public IEnumerable GetJobs() - { - return jobs; - } + /// + public IEnumerable GetJobs() => jobHandlers.Select(j=>j.Job); - private bool TryTakeWaitingJob(out JobWithProgress? job) + private bool TryTakeWaitingJob(out JobHandle? job) { - lock (jobs) + lock (jobHandlers) { - job = jobs.FirstOrDefault(j => j.State == JobDto.JobState.Waiting); + job = jobHandlers.FirstOrDefault(j => j.Job.State == JobState.Waiting); } return job is not null; } private void EnsureHandleQueueStarted() { - if(isHandling) + if (isHandling) return; isHandling = true; cancellationTokenSource = new CancellationTokenSource(); @@ -112,33 +121,43 @@ namespace AsbCloudInfrastructure.Services private async Task HandleJobs(CancellationToken token) { - while (TryTakeWaitingJob(out JobWithProgress? job)) + while (TryTakeWaitingJob(out JobHandle? jobHandler)) { - job!.State = JobDto.JobState.Working; + jobHandler!.Job.State = JobState.Working; try { - await RediceSamplingSaubAsync(job, token); - await RediceSamplingSpinAsync(job, token); - job.State = JobDto.JobState.Done; + await RediceSamplingSaubAsync(jobHandler, token); + await RediceSamplingSpinAsync(jobHandler, token); + jobHandler.Job.State = JobState.Done; } catch (Exception exception) { - job.State = JobDto.JobState.Fail; - job.Error = exception; - job.OnProgress.Invoke(job); + jobHandler.Job.State = JobState.Fail; + jobHandler.Job.Results = null; + jobHandler.Job.Error = exception.Message; + jobHandler.OnProgress?.Invoke(jobHandler.Job); + } + + if (!jobHandlers.Any(j => j.Job.State == JobState.Waiting)) + { + var sw = Stopwatch.StartNew(); + await VacuumAsync(token); + sw.Stop(); + if (sw.ElapsedMilliseconds < 10_000) + { + var delayMs = 10_000 - (int)sw.ElapsedMilliseconds; + await Task.Delay(delayMs, token); + } + CleanJobs(); } - await Task.Delay(100, token); } - await VacuumAsync(token); - await Task.Delay(1_000, token); - CleanJobs(); } private void CleanJobs() { - lock (jobs) + lock (jobHandlers) { - jobs.RemoveAll(j => j.State == JobDto.JobState.Done || j.State == JobDto.JobState.Fail); + jobHandlers.RemoveAll(j => j.Job.State == JobState.Done || j.Job.State == JobState.Fail); } } @@ -154,17 +173,11 @@ namespace AsbCloudInfrastructure.Services cancellationTokenSource?.Cancel(); task?.Wait(1_000); task = null; + cancellationTokenSource?.Dispose(); cancellationTokenSource = null; } - /// - /// Прореживание данных телеметрии САУБ. - /// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0. 
- /// - /// Id телеметрии - /// желаемое отношение оставшихся записей к исходным - /// - private Task RediceSamplingSaubAsync(JobWithProgress job, CancellationToken token) + private Task RediceSamplingSaubAsync(JobHandle job, CancellationToken token) { const int ramLimit = 10 * 1024 * 1024; const int dataItemSize = 345; // by profiler @@ -204,13 +217,13 @@ namespace AsbCloudInfrastructure.Services return RediceSamplingAsync( job, - chankSize, - sqlSelectTemplate, + chankSize, + sqlSelectTemplate, sqlDeleteTemplate, token); } - private Task RediceSamplingSpinAsync(JobWithProgress job, CancellationToken token) + private Task RediceSamplingSpinAsync(JobHandle job, CancellationToken token) { const int ramLimit = 10 * 1024 * 1024; const int dataItemSize = 345; // by profiler @@ -251,26 +264,30 @@ namespace AsbCloudInfrastructure.Services } private async Task RediceSamplingAsync( - JobWithProgress job, - int chankSize, + JobHandle jobHandle, + int chankSize, string sqlSelectTemplate, string sqlDeleteTemplate, CancellationToken token) - where TEntity: class, ITelemetryData + where TEntity : class, AsbCloudDb.Model.ITelemetryData { using var db = MakeContext(); var dbset = db.Set(); var deleted = 0; - var oldCount = await dbset.Where(t => t.IdTelemetry == job.Id).CountAsync(token); + var totalCount = await dbset.Where(t => t.IdTelemetry == jobHandle.Id).CountAsync(token); - if (job.Result is Tuple tupleResult) - { - deleted += tupleResult.Item1; - oldCount += tupleResult.Item2; - } - job.Result = (deleted, oldCount); - job.OnProgress?.Invoke(job); + var result = jobHandle.Job.Results!; + if (result[jobResultKeyDeleted] is int previousDeleted) + deleted += previousDeleted; + + if (result[jobResultKeyTotal] is int previousCount) + totalCount += previousCount; + + result[jobResultKeyDeleted] = deleted; + result[jobResultKeyTotal] = totalCount; + + jobHandle.OnProgress?.Invoke(jobHandle.Job); var startDate = DateTimeOffset.MinValue; do @@ -297,8 +314,9 @@ namespace AsbCloudInfrastructure.Services startDate = lastDate; deleted += currentDeleted; - job.Result = (deleted, oldCount); - job.OnProgress?.Invoke(job); + result[jobResultKeyDeleted] = deleted; + result[jobResultKeyTotal] = totalCount; + jobHandle.OnProgress?.Invoke(jobHandle.Job); } while (true); return; diff --git a/AsbCloudWebApi/Controllers/ReduceSamplingController.cs b/AsbCloudWebApi/Controllers/ReduceSamplingController.cs new file mode 100644 index 00000000..ddb99b2d --- /dev/null +++ b/AsbCloudWebApi/Controllers/ReduceSamplingController.cs @@ -0,0 +1,78 @@ +using AsbCloudApp.Services; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using System.Collections.Generic; +using System.Threading.Tasks; +using AsbCloudApp.Data; +using System.Linq; +using AsbCloudWebApi.SignalR; +using Microsoft.AspNetCore.SignalR; + +namespace AsbCloudWebApi.Controllers +{ + /// + /// Редактор кустов для админки + /// + [Route("api/admin/[controller]")] + [ApiController] + [Authorize] + public class ReduceSamplingController: ControllerBase + { + private readonly IReduceSamplingService service; + private readonly IHubContext telemetryHubContext; + private const string sirnalRGroupName = "ReduceSampling"; + private const string sirnalRMethodOnProgress = "OnProgress"; + + public ReduceSamplingController( + IReduceSamplingService service, + IHubContext telemetryHubContext ) + { + this.service = service; + this.telemetryHubContext = telemetryHubContext; + } + + /// + /// Получить все задания. 
Задания удаляются минимум через 10 сек после выполнения, возможно позднее. + /// + /// + /// + [HttpGet] + public virtual ActionResult> GetAll(int idTelemetry) + { + var result = service.GetJobs(); + if (result.Any()) + return Ok(result); + else + return NoContent(); + } + + /// + /// Получить состояние определенной задачи + /// + /// + /// + [HttpGet("{idTelemetry}")] + public virtual ActionResult GetOrDefault(int idTelemetry) + { + var result = service.GetOrDefaultState(idTelemetry); + return Ok(result); + } + + /// + /// Создать задачу прореживанию архива и добавить её в очередь на выполнение. + /// Если задача есть в очереди, она же и возвращается, но подписка не происходит. + /// + [HttpPost] + [Permission] + public virtual ActionResult Enqueue(int idTelemetry) + { + void onProgress(JobDto job) => + Task.Run(async () => + await telemetryHubContext.Clients.Group(sirnalRGroupName) + .SendAsync(sirnalRMethodOnProgress, job)); + + service.TryEnqueueRediceSamplingJob(idTelemetry, onProgress, out JobDto job); + return Ok(job); + } + } +} diff --git a/ConsoleApp1/Program.cs b/ConsoleApp1/Program.cs index 11c0ca13..b3d1d6fb 100644 --- a/ConsoleApp1/Program.cs +++ b/ConsoleApp1/Program.cs @@ -8,6 +8,7 @@ using DocumentFormat.OpenXml.Wordprocessing; using Microsoft.EntityFrameworkCore; using Org.BouncyCastle.Utilities.Collections; using System; +using System.Collections; using System.Collections.Generic; using System.IO; using System.Linq; @@ -24,8 +25,13 @@ namespace ConsoleApp1 // use ServiceFactory to make services static void Main(/*string[] args*/) { - Task.Run(() => { throw new Exception(); }) - .ContinueWith(t => { Console.WriteLine("Dooom"); }); + var h = new Hashtable(); + h.Add("name", 1); + h.Add("name2", "66"); + var v = h["v"]; + + var s = System.Text.Json.JsonSerializer.Serialize(h); + Console.WriteLine($"total time: ms"); Console.ReadLine(); }
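Outside this patch set, a caller of the new service might look roughly like the sketch below. It is hedged: GetInstance, TryEnqueueRediceSamplingJob and the "deleted"/"total" result keys are the ones added above, but the configuration wiring (Microsoft.Extensions.Configuration.Json with a ConnectionStrings:DefaultConnection entry) and the idTelemetry value 42 are placeholders.

    using AsbCloudApp.Data;
    using AsbCloudInfrastructure.Services;
    using Microsoft.Extensions.Configuration;
    using System;

    // appsettings.json is assumed to hold ConnectionStrings:DefaultConnection,
    // the same key the service constructor reads.
    var configuration = new ConfigurationBuilder()
        .AddJsonFile("appsettings.json")
        .Build();

    var service = ReduceSamplingService.GetInstance(configuration);

    // Enqueue a thinning job for telemetry 42 and log progress from the callback.
    var added = service.TryEnqueueRediceSamplingJob(
        42,
        progress => Console.WriteLine(
            $"{progress.Id}: {progress.State}, deleted/total = {progress.Results?["deleted"]}/{progress.Results?["total"]}"),
        out JobDto job);

    Console.WriteLine(added
        ? $"enqueued, state = {job.State}"
        : $"already queued, state = {job.State}");

Through the controller the same call presumably becomes a POST to api/admin/ReduceSampling?idTelemetry=42, with progress pushed to the SignalR group "ReduceSampling" via the "OnProgress" method, as wired up in ReduceSamplingController.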