From 8cbcd9a115eeba1f7f13be54d9288ff7ab67e585 Mon Sep 17 00:00:00 2001
From: ngfrolov
Date: Tue, 11 Oct 2022 09:02:53 +0500
Subject: [PATCH] first version of the Reduce service
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../Services/ReduceSamplingService.cs         | 317 ++++++++++++++++++
 .../Services/SAUB/TelemetryDataBaseService.cs |  11 +-
 .../Services/SAUB/TelemetryDataSaubService.cs |  39 ---
 .../Services/SAUB/TelemetryDataSpinService.cs |   5 -
 ConsoleApp1/Program.cs                        |  94 +-----
 5 files changed, 325 insertions(+), 141 deletions(-)
 create mode 100644 AsbCloudInfrastructure/Services/ReduceSamplingService.cs

diff --git a/AsbCloudInfrastructure/Services/ReduceSamplingService.cs b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs
new file mode 100644
index 00000000..46489fec
--- /dev/null
+++ b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs
@@ -0,0 +1,317 @@
+using AsbCloudDb;
+using AsbCloudDb.Model;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.Configuration;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace AsbCloudInfrastructure.Services
+{
+#nullable enable
+    public class JobDto
+    {
+        public enum JobState { Waiting, Working, Done, Fail };
+        public int Id { get; internal set; }
+        public JobState State { get; internal set; }
+        public object? Result { get; internal set; }
+        public Exception? Error { get; internal set; }
+    }
+
+    class JobWithProgress : JobDto
+    {
+        public ReduceSamplingService.OnProgressDelegate OnProgress { get; set; } = null!;
+    }
+
+    public class ReduceSamplingService
+    {
+        public delegate void OnProgressDelegate(JobDto job);
+
+        private static ReduceSamplingService? instance;
+        private readonly string connectionString;
+        private const int ratio = 5;
+        private readonly List<JobWithProgress> jobs = new(5);
+        private bool isHandling;
+        private CancellationTokenSource? cancellationTokenSource;
+        private Task? task;
+
+        private ReduceSamplingService(IConfiguration configuration)
+        {
+            connectionString = configuration.GetConnectionString("DefaultConnection");
+        }
+
+        ~ReduceSamplingService()
+        {
+            Stop();
+        }
+
+        public static ReduceSamplingService GetInstance(IConfiguration configuration)
+        {
+            if (instance is null)
+                instance = new(configuration);
+            return instance;
+        }
+
+        public int TryEnqueueReduceSamplingJob(int idTelemetry, OnProgressDelegate onProgress)
+        {
+            int result;
+            lock (jobs)
+            {
+                if (jobs.Any(j => j.Id == idTelemetry))
+                    return -1;
+
+                var job = new JobWithProgress
+                {
+                    Id = idTelemetry,
+                    State = JobDto.JobState.Waiting,
+                    OnProgress = onProgress,
+                };
+
+                jobs.Add(job);
+                result = jobs.Count;
+            }
+            EnsureHandleQueueStarted();
+            return result;
+        }
+
+        public JobDto? GetState(int idTelemetry)
+        {
+            JobWithProgress? job;
+            lock (jobs)
+            {
+                job = jobs.FirstOrDefault(j => j.Id == idTelemetry);
+            }
+            return job;
+        }
+
+        public IEnumerable<JobDto> GetJobs()
+        {
+            return jobs;
+        }
+
+        private bool TryTakeWaitingJob(out JobWithProgress? job)
+        {
+            lock (jobs)
+            {
+                job = jobs.FirstOrDefault(j => j.State == JobDto.JobState.Waiting);
+            }
+            return job is not null;
+        }
+
+        private void EnsureHandleQueueStarted()
+        {
+            if (isHandling)
+                return;
+            isHandling = true;
+            cancellationTokenSource = new CancellationTokenSource();
+            var token = cancellationTokenSource.Token;
+            task = Task.Run(async () => await HandleJobs(token))
+                .ContinueWith(_ => isHandling = false);
+        }
+
+        private async Task HandleJobs(CancellationToken token)
+        {
+            while (TryTakeWaitingJob(out JobWithProgress? job))
+            {
+                job!.State = JobDto.JobState.Working;
+                try
+                {
+                    await ReduceSamplingSaubAsync(job, token);
+                    await ReduceSamplingSpinAsync(job, token);
+                    job.State = JobDto.JobState.Done;
+                }
+                catch (Exception exception)
+                {
+                    job.State = JobDto.JobState.Fail;
+                    job.Error = exception;
+                    job.OnProgress.Invoke(job);
+                }
+                await Task.Delay(100, token);
+            }
+            await VacuumAsync(token);
+            await Task.Delay(1_000, token);
+            CleanJobs();
+        }
+
+        private void CleanJobs()
+        {
+            lock (jobs)
+            {
+                jobs.RemoveAll(j => j.State == JobDto.JobState.Done || j.State == JobDto.JobState.Fail);
+            }
+        }
+
+        private async Task VacuumAsync(CancellationToken token)
+        {
+            using var db = MakeContext();
+            var sqlVacuum = "vacuum (SKIP_LOCKED);";
+            await db.Database.ExecuteSqlRawAsync(sqlVacuum, token);
+        }
+
+        private void Stop()
+        {
+            cancellationTokenSource?.Cancel();
+            task?.Wait(1_000);
+            task = null;
+            cancellationTokenSource = null;
+        }
+
+        /// <summary>
+        /// Thin out SAUB telemetry data.
+        /// Every ratio-th record is kept, the rest are deleted: rows with (row_number % ratio) = 0 survive.
+        /// </summary>
+        /// <param name="job">job carrying the telemetry id, progress and result; the target ratio is the class-level constant</param>
+        /// <param name="token"></param>
+        /// <returns></returns>
+        private Task ReduceSamplingSaubAsync(JobWithProgress job, CancellationToken token)
+        {
+            const int ramLimit = 10 * 1024 * 1024;
+            const int dataItemSize = 345; // by profiler
+            const int chunkSize = ramLimit / dataItemSize; // ~ 30_000
+            const double maxWellDepthGap = 0.1;
+
+            var maxDateGapSec = ratio;
+
+            var sqlSelectTemplate =
+                "select " +
+                "  * " +
+                "from " +
+                "  (select " +
+                "    *, " +
+                "    rank() over win1 as row_num, " +
+                "    lag(\"date\", 1) over win1 as lag_date, " +
+                "    lag(\"mode\", 1) over win1 as lag_mode, " +
+                "    lag(mse_state, 1) over win1 as lag_mse_state, " +
+                "    lag(well_depth, 1) over win1 as lag_well_depth, " +
+                "    lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " +
+                "  from t_telemetry_data_saub " +
+                $"  where id_telemetry = {job.Id} and \"date\" > {{0}}" +
+                "  window win1 as (order by \"date\") " +
+                "  ) as t_1 " +
+                "where " +
+                $"  (row_num % {ratio}) = 0 " +
+                "  or \"mode\" != lag_mode " +
+                $"  or (\"date\" - lag_date) >= interval '{maxDateGapSec} second' " +
+                $"  or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " +
+                "  or mse_state != lag_mse_state " +
+                "  or id_feed_regulator != lag_id_feed_regulator " +
+                "order by \"date\" ";
+
+            var sqlDeleteTemplate = "delete " +
+                "from t_telemetry_data_saub " +
+                $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};";
+
+            return ReduceSamplingAsync<TelemetryDataSaub>(
+                job,
+                chunkSize,
+                sqlSelectTemplate,
+                sqlDeleteTemplate,
+                token);
+        }
+
+        private Task ReduceSamplingSpinAsync(JobWithProgress job, CancellationToken token)
+        {
+            const int ramLimit = 10 * 1024 * 1024;
+            const int dataItemSize = 345; // by profiler
+            const int chunkSize = ramLimit / dataItemSize; // ~ 30_000
+            var maxDateGapSec = ratio;
+
+            var sqlSelectTemplate =
+                "select " +
+                "  * " +
+                "from " +
+                "  (select " +
+                "    *, " +
+                "    rank() over win1 as row_num, " +
" lag(\"date\", 1) over win1 as lag_date, " + + " lag(\"mode\", 1) over win1 as lag_mode, " + + " lag(state, 1) over win1 as lag_state " + + " from t_telemetry_data_spin " + + $" where id_telemetry = {job.Id} and \"date\" > {{0}}" + + " window win1 as (order by \"date\") " + + " ) as t_1 " + + "where " + + $" (row_num % {ratio}) = 0 " + + " or \"mode\" != lag_mode " + + $" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " + + " or state != lag_state " + + "order by \"date\" "; + + var sqlDeleteTemplate = "delete " + + "from t_telemetry_data_spin " + + $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};"; + + return RediceSamplingAsync( + job, + chankSize, + sqlSelectTemplate, + sqlDeleteTemplate, + token); + } + + private async Task RediceSamplingAsync( + JobWithProgress job, + int chankSize, + string sqlSelectTemplate, + string sqlDeleteTemplate, + CancellationToken token) + where TEntity: class, ITelemetryData + { + using var db = MakeContext(); + var dbset = db.Set(); + + var deleted = 0; + var oldCount = await dbset.Where(t => t.IdTelemetry == job.Id).CountAsync(token); + + if (job.Result is Tuple tupleResult) + { + deleted += tupleResult.Item1; + oldCount += tupleResult.Item2; + } + job.Result = (deleted, oldCount); + job.OnProgress?.Invoke(job); + var startDate = DateTimeOffset.MinValue; + + do + { + var query = dbset + .FromSqlRaw(sqlSelectTemplate, startDate) + .AsNoTracking(); + + var data = await query + .Take(chankSize) + .ToArrayAsync(token); + + var currentDataCount = data.Length; + if (currentDataCount == 0) + break; + + var lastDate = data.Last().DateTime; + + var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDeleteTemplate, new object[] { startDate, lastDate }.AsEnumerable(), token); + if (currentDeleted == 0) + break; + + await db.Database.ExecInsertAsync(dbset, data, token); + + startDate = lastDate; + deleted += currentDeleted; + job.Result = (deleted, oldCount); + job.OnProgress?.Invoke(job); + } while (true); + + return; + } + + private AsbCloudDbContext MakeContext() + { + var options = new DbContextOptionsBuilder() + .UseNpgsql(connectionString) + .Options; + + return new AsbCloudDbContext(options); + } + } +#nullable disable +} diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs index 5231e573..285ac3d8 100644 --- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs @@ -171,15 +171,6 @@ namespace AsbCloudInfrastructure.Services.SAUB return offset; } - /// - /// Прореживание данных телеметрии. - /// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0. - /// Из-за возможности запуска повторного прореживания можно ограничить величину разрыва по времени параметром maxDateGapSec. 
diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs
index 5231e573..285ac3d8 100644
--- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs
+++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs
@@ -171,15 +171,6 @@ namespace AsbCloudInfrastructure.Services.SAUB
             return offset;
         }
 
-        /// <summary>
-        /// Thin out telemetry data.
-        /// Every ratio-th record is kept, the rest are deleted: rows with (Row_number % ratio) = 0 survive.
-        /// Because thinning can be run repeatedly, the time gap can be capped with the maxDateGapSec parameter.
-        /// </summary>
-        /// <param name="idTelemetry">telemetry id</param>
-        /// <param name="ratio">desired ratio of remaining records to the original ones</param>
-        /// <param name="maxDateGapSec">time gap limit</param>
-        /// <returns></returns>
-        public abstract Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token);
+
     }
 }
diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs
index a6330757..419c5831 100644
--- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs
+++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs
@@ -39,44 +39,5 @@ namespace AsbCloudInfrastructure.Services.SAUB
             dto.DateTime = src.DateTime.ToRemoteDateTime(timezoneOffset);
             return dto;
         }
-
-        public override async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token)
-        {
-            const int ramLimit = 50 * 1024 * 1024;
-            const int dataItemSize = 345; // by profiler
-            const int itemsCountLimit = ramLimit / dataItemSize; // ~ 150_000, iterations count ~ 46
-            const int maxWellDepthGap = 1;
-
-            var dbset = db.Set<TelemetryDataSaub>();
-
-            var sql =
-                "select" +
-                "  *" +
-                "from" +
-                "  (select" +
-                "    *," +
-                "    rank() over win1 as row_num," +
-                "    lag(\"date\", 1) over win1 as lag_date," +
-                "    lag(\"mode\", 1) over win1 as lag_mode," +
-                "    lag(mse_state, 1) over win1 as lag_mse_state," +
-                "    lag(well_depth, 1) over win1 as lag_well_depth," +
-                "    lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator" +
-                "  from t_telemetry_data_saub" +
-                $"  where id_telemetry = {idTelemetry}" +
-                "  window win1 as (order by \"date\")" +
-                "  ) as ttt" +
-                "where" +
-                $"  (row_num % {ratio}) = 0" +
-                "  or \"mode\" != lag_mode" +
-                $"  or(\"date\" - lag_date) > interval '{maxDateGapSec} second'" +
-                $"  or well_depth - lag_well_depth > {maxWellDepthGap}" +
-                "  or mse_state != lag_mse_state" +
-                "  or id_feed_regulator != lag_id_feed_regulator;";
-
-            var query = dbset.FromSqlRaw(sql);
-
-            await Task.Delay(0);
-            return (0, 0);
-        }
     }
 }
diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs
index b555286b..5cd51161 100644
--- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs
+++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs
@@ -30,10 +30,5 @@ namespace AsbCloudInfrastructure.Services.SAUB
             dto.DateTime = src.DateTime.ToRemoteDateTime(timezoneOffset);
             return dto;
         }
-
-        public override Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token)
-        {
-            throw new System.NotImplementedException();
-        }
     }
 }
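Note for integrators: with the per-service RediceSamplingAsync overrides removed, thinning now goes through ReduceSamplingService. The patch does not register the service anywhere, so the registration below is only a sketch of one possible wiring; `services` is assumed to be the host's IServiceCollection.

    // Hypothetical registration, not part of this patch.
    // ReduceSamplingService manages its own singleton via GetInstance(IConfiguration).
    services.AddSingleton(provider =>
        ReduceSamplingService.GetInstance(provider.GetRequiredService<IConfiguration>()));
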
diff --git a/ConsoleApp1/Program.cs b/ConsoleApp1/Program.cs
index 3bb591fc..11c0ca13 100644
--- a/ConsoleApp1/Program.cs
+++ b/ConsoleApp1/Program.cs
@@ -6,7 +6,9 @@ using AsbCloudInfrastructure.Services.DailyReport;
 using ClosedXML.Excel;
 using DocumentFormat.OpenXml.Wordprocessing;
 using Microsoft.EntityFrameworkCore;
+using Org.BouncyCastle.Utilities.Collections;
 using System;
+using System.Collections.Generic;
 using System.IO;
 using System.Linq;
 using System.Threading;
@@ -18,96 +20,14 @@ namespace ConsoleApp1
     class Program
     {
         private static AsbCloudDbContext db = ServiceFactory.Context;
-
-        public delegate void OnProgress(int handled, int total);
-
-        public static async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, OnProgress onProgress, CancellationToken token)
-        {
-            const int ramLimit = 10 * 1024 * 1024;
-            const int dataItemSize = 345; // by profiler
-            const int itemsCountLimit = ramLimit / dataItemSize; // ~ 90_000
-            const double maxWellDepthGap = 0.1;
-
-            ratio = ratio > 0 ? ratio : 5;
-            maxDateGapSec = maxDateGapSec > 0 ? maxDateGapSec : 9;
-
-            var dbset = db.Set<TelemetryDataSaub>();
-            var oldCount = await dbset.Where(t => t.IdTelemetry == idTelemetry).CountAsync(token);
-            onProgress?.Invoke(0, oldCount);
-
-            var sqlSelect =
-                "select " +
-                "  * " +
-                "from " +
-                "  (select " +
-                "    *, " +
-                "    rank() over win1 as row_num, " +
-                "    lag(\"date\", 1) over win1 as lag_date, " +
-                "    lag(\"mode\", 1) over win1 as lag_mode, " +
-                "    lag(mse_state, 1) over win1 as lag_mse_state, " +
-                "    lag(well_depth, 1) over win1 as lag_well_depth, " +
-                "    lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " +
-                "  from t_telemetry_data_saub " +
-                $"  where id_telemetry = {idTelemetry} and \"date\" > {{0}}" +
-                "  window win1 as (order by \"date\") " +
-                "  ) as ttt " +
-                "where " +
-                $"  (row_num % {ratio}) = 0 " +
-                "  or \"mode\" != lag_mode " +
-                $"  or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " +
-                $"  or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " +
-                "  or mse_state != lag_mse_state " +
-                "  or id_feed_regulator != lag_id_feed_regulator " +
-                "order by \"date\" ";
-
-            var sqlDelete = "delete " +
-                "from t_telemetry_data_saub " +
-                $"where id_telemetry = {idTelemetry} and \"date\" between {{0}} and {{1}};";
-
-            var startDate = DateTimeOffset.MinValue;
-            var deleted = 0;
-            var saved = 0;
-            do
-            {
-                var query = dbset
-                    .FromSqlRaw(sqlSelect, startDate)
-                    .AsNoTracking();
-
-                var data = await query
-                    .Take(itemsCountLimit)
-                    .ToArrayAsync(token);
-
-                var currentDataCount = data.Length;
-                if (currentDataCount == 0)
-                    break;
-
-                var lastDate = data.Last().DateTime;
-
-                var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDelete, new object[] { startDate, lastDate }.AsEnumerable(), token);
-                if (currentDeleted == 0)
-                    break;
-
-                await db.Database.ExecInsertAsync(dbset, data, token);
-
-                startDate = lastDate;
-                deleted += currentDeleted;
-                saved += currentDataCount;
-                onProgress?.Invoke(deleted, oldCount);
-            } while (true);
-
-            return (oldCount, saved);
-        }
-
+        // use ServiceFactory to make services
         static void Main(/*string[] args*/)
         {
-            var i = 0;
-            var sw = new System.Diagnostics.Stopwatch();
-            sw.Start();
-            var result = RediceSamplingAsync(94, 5, 5, (p, t) => { Console.WriteLine($"{i++:0000}\t{p:00_000_000}\t{t:00_000_000}\t{1d*p/t:00.00}"); }, CancellationToken.None).Result;
-            sw.Stop();
-            Console.WriteLine($"result: saved {result.newCount} old = {result.oldCount} ratio = {1d*result.oldCount/result.newCount}");
-            Console.WriteLine($"total time: {sw.ElapsedMilliseconds} ms");
+            Task.Run(() => { throw new Exception(); })
+                .ContinueWith(t => { Console.WriteLine("Dooom"); });
+            Console.WriteLine("total time: ms");
+            Console.ReadLine();
         }
     }
 }
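If the console smoke test is still wanted, a minimal sketch of the same experiment routed through the new service is below. It assumes the console app can obtain an IConfiguration; the ServiceFactory.Configuration property shown here is hypothetical (the patch only uses ServiceFactory.Context), and telemetry id 94 is reused from the deleted code.

    // Sketch only. ServiceFactory.Configuration is an assumed helper, not part of this patch.
    static void Main()
    {
        var sw = System.Diagnostics.Stopwatch.StartNew();

        var service = ReduceSamplingService.GetInstance(ServiceFactory.Configuration);
        service.TryEnqueueReduceSamplingJob(94, job => Console.WriteLine($"{job.State}: {job.Result}"));

        // Poll until the job leaves the Waiting/Working states (it is removed once cleaned up).
        while (service.GetState(94) is { State: JobDto.JobState.Waiting or JobDto.JobState.Working })
            Thread.Sleep(1_000);

        sw.Stop();
        Console.WriteLine($"total time: {sw.ElapsedMilliseconds} ms");
    }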