using AsbCloudApp.Data; using AsbCloudApp.Services; using AsbCloudDb; using AsbCloudDb.Model; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using System; using System.Collections.Generic; using System.Configuration; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace AsbCloudInfrastructure.Services; class JobHandle { public int Id => Job.Id; public JobDto Job { get; set; } = null!; public DateTime DateCreate { get; set; } = DateTime.Now; public OnJobProgressDelagate? OnProgress { get; set; } } public class ReduceSamplingService : IReduceSamplingService { private const string jobResultKeyDeleted = "deleted"; private const string jobResultKeyTotal = "total"; private static ReduceSamplingService? instance; private readonly string connectionString; private const int ratio = 5; private readonly List jobHandlers = new(5); private bool isHandling; private CancellationTokenSource? cancellationTokenSource; private Task? task; private ReduceSamplingService(IConfiguration configuration) { connectionString = configuration.GetConnectionString("DefaultConnection") ?? throw new ConfigurationErrorsException("DefaultConnection не определен"); } ~ReduceSamplingService() { Stop(); } /// /// Get singleton Instance /// /// /// public static ReduceSamplingService GetInstance(IConfiguration configuration) { instance ??= new(configuration); return instance; } /// public bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate? onProgress, out JobDto jobDto) { lock (jobHandlers) { var oldJob = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry); if (oldJob is not null) { jobDto = oldJob.Job; return false; } jobDto = new JobDto { Id = idTelemetry, State = JobState.Waiting, Results = new(), }; var jobHandler = new JobHandle { Job = jobDto, OnProgress = onProgress, }; jobHandlers.Add(jobHandler); } EnsureHandleQueueStarted(); return true; } /// public JobDto? GetOrDefaultState(int idTelemetry) { JobHandle? jobHandler; lock (jobHandlers) { jobHandler = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry); } return jobHandler?.Job; } /// public IEnumerable GetJobs() => jobHandlers.Select(j=>j.Job); private bool TryTakeWaitingJob(out JobHandle? job) { lock (jobHandlers) { job = jobHandlers.FirstOrDefault(j => j.Job.State == JobState.Waiting); } return job is not null; } private void EnsureHandleQueueStarted() { if (isHandling) return; isHandling = true; cancellationTokenSource = new CancellationTokenSource(); var token = cancellationTokenSource.Token; task = Task.Run(async () => await HandleJobs(token)) .ContinueWith(_ => isHandling = false); } private async Task HandleJobs(CancellationToken token) { while (TryTakeWaitingJob(out JobHandle? jobHandler)) { jobHandler!.Job.State = JobState.Working; try { await RediceSamplingSaubAsync(jobHandler, token); await RediceSamplingSpinAsync(jobHandler, token); jobHandler.Job.State = JobState.Done; } catch (Exception exception) { jobHandler.Job.State = JobState.Fail; jobHandler.Job.Results = null; jobHandler.Job.Error = exception.Message; jobHandler.OnProgress?.Invoke(jobHandler.Job); } if (!jobHandlers.Any(j => j.Job.State == JobState.Waiting)) { var sw = Stopwatch.StartNew(); await VacuumAsync(token); sw.Stop(); if (sw.ElapsedMilliseconds < 10_000) { var delayMs = 10_000 - (int)sw.ElapsedMilliseconds; await Task.Delay(delayMs, token); } CleanJobs(); } } } private void CleanJobs() { lock (jobHandlers) { jobHandlers.RemoveAll(j => j.Job.State == JobState.Done || j.Job.State == JobState.Fail); } } private async Task VacuumAsync(CancellationToken token) { using var db = MakeContext(); var sqlVacuum = "vacuum (SKIP_LOCKED);"; await db.Database.ExecuteSqlRawAsync(sqlVacuum, token); } private void Stop() { cancellationTokenSource?.Cancel(); task?.Wait(1_000); task = null; cancellationTokenSource?.Dispose(); cancellationTokenSource = null; } private Task RediceSamplingSaubAsync(JobHandle job, CancellationToken token) { const int ramLimit = 10 * 1024 * 1024; const int dataItemSize = 345; // by profiler const int chankSize = ramLimit / dataItemSize; // ~ 90_000 const double maxWellDepthGap = 0.1; var maxDateGapSec = ratio; var sqlSelectTemplate = "select " + " * " + "from " + " (select " + " *, " + " rank() over win1 as row_num, " + " lag(\"date\", 1) over win1 as lag_date, " + " lag(\"mode\", 1) over win1 as lag_mode, " + " lag(mse_state, 1) over win1 as lag_mse_state, " + " lag(well_depth, 1) over win1 as lag_well_depth, " + " lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " + " from t_telemetry_data_saub " + $" where id_telemetry = {job.Id} and \"date\" > {{0}}" + " window win1 as (order by \"date\") " + " ) as t_1 " + "where " + $" (row_num % {ratio}) = 0 " + " or \"mode\" != lag_mode " + $" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " + $" or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " + " or mse_state != lag_mse_state " + " or id_feed_regulator != lag_id_feed_regulator " + "order by \"date\" "; var sqlDeleteTemplate = "delete " + "from t_telemetry_data_saub " + $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};"; return RediceSamplingAsync( job, chankSize, sqlSelectTemplate, sqlDeleteTemplate, token); } private Task RediceSamplingSpinAsync(JobHandle job, CancellationToken token) { const int ramLimit = 10 * 1024 * 1024; const int dataItemSize = 345; // by profiler const int chankSize = ramLimit / dataItemSize; // ~ 90_000 var maxDateGapSec = ratio; var sqlSelectTemplate = "select " + " * " + "from " + " (select " + " *, " + " rank() over win1 as row_num, " + " lag(\"date\", 1) over win1 as lag_date, " + " lag(\"mode\", 1) over win1 as lag_mode, " + " lag(state, 1) over win1 as lag_state " + " from t_telemetry_data_spin " + $" where id_telemetry = {job.Id} and \"date\" > {{0}}" + " window win1 as (order by \"date\") " + " ) as t_1 " + "where " + $" (row_num % {ratio}) = 0 " + " or \"mode\" != lag_mode " + $" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " + " or state != lag_state " + "order by \"date\" "; var sqlDeleteTemplate = "delete " + "from t_telemetry_data_spin " + $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};"; return RediceSamplingAsync( job, chankSize, sqlSelectTemplate, sqlDeleteTemplate, token); } private async Task RediceSamplingAsync( JobHandle jobHandle, int chankSize, string sqlSelectTemplate, string sqlDeleteTemplate, CancellationToken token) where TEntity : class, AsbCloudDb.Model.ITelemetryData { using var db = MakeContext(); var dbset = db.Set(); var deleted = 0; var totalCount = await dbset.Where(t => t.IdTelemetry == jobHandle.Id).CountAsync(token); var result = jobHandle.Job.Results!; if (result[jobResultKeyDeleted] is int previousDeleted) deleted += previousDeleted; if (result[jobResultKeyTotal] is int previousCount) totalCount += previousCount; result[jobResultKeyDeleted] = deleted; result[jobResultKeyTotal] = totalCount; jobHandle.OnProgress?.Invoke(jobHandle.Job); var startDate = DateTimeOffset.MinValue; do { var query = dbset .FromSqlRaw(sqlSelectTemplate, startDate) .AsNoTracking(); var data = await query .Take(chankSize) .ToArrayAsync(token); var currentDataCount = data.Length; if (currentDataCount == 0) break; var lastDate = data.Last().DateTime; var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDeleteTemplate, new object[] { startDate, lastDate }.AsEnumerable(), token); if (currentDeleted == 0) break; await db.Database.ExecInsertAsync(dbset, data, token); startDate = lastDate; deleted += currentDeleted; result[jobResultKeyDeleted] = deleted; result[jobResultKeyTotal] = totalCount; jobHandle.OnProgress?.Invoke(jobHandle.Job); } while (true); return; } private AsbCloudDbContext MakeContext() { var options = new DbContextOptionsBuilder() .UseNpgsql(connectionString) .Options; return new AsbCloudDbContext(options); } }