первая версия Reduce service

This commit is contained in:
ngfrolov 2022-10-11 09:02:53 +05:00
parent 4d42c9e5ad
commit 8cbcd9a115
5 changed files with 325 additions and 141 deletions

View File

@ -0,0 +1,317 @@
using AsbCloudDb;
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services
{
#nullable enable
public class JobDto
{
public enum JobState { Waiting, Working, Done, Fail };
public int Id { get; internal set; }
public JobState State { get; internal set; }
public object? Result { get; internal set; }
public Exception? Error { get; internal set; }
}
class JobWithProgress: JobDto
{
public ReduceSamplingService.OnProgressDelagate OnProgress { get; set; } = null!;
}
public class ReduceSamplingService
{
public delegate void OnProgressDelagate(JobDto job);
private static ReduceSamplingService? instance;
private readonly string connectionString;
private const int ratio = 5;
private readonly List<JobWithProgress> jobs = new(5);
private bool isHandling;
private CancellationTokenSource? cancellationTokenSource;
private Task? task;
private ReduceSamplingService(IConfiguration configuration)
{
connectionString = configuration.GetConnectionString("DefaultConnection");
}
~ReduceSamplingService()
{
Stop();
}
public static ReduceSamplingService GetInstance(IConfiguration configuration)
{
if (instance is null)
instance = new(configuration);
return instance;
}
public int TryEnqueueRediceSamplingJob(int idTelemetry, OnProgressDelagate onProgress)
{
var result = 0;
lock (jobs)
{
if (jobs.Any(j => j.Id == idTelemetry))
result = - 1;
var job = new JobWithProgress
{
Id = idTelemetry,
State = JobDto.JobState.Waiting,
OnProgress = onProgress,
};
jobs.Add(job);
result = jobs.Count;
}
EnsureHandleQueueStarted();
return result;
}
public JobDto? GetState(int idTelemetry)
{
JobWithProgress? job;
lock (jobs)
{
job = jobs.FirstOrDefault(j=>j.Id == idTelemetry);
}
return job;
}
public IEnumerable<JobDto> GetJobs()
{
return jobs;
}
private bool TryTakeWaitingJob(out JobWithProgress? job)
{
lock (jobs)
{
job = jobs.FirstOrDefault(j => j.State == JobDto.JobState.Waiting);
}
return job is not null;
}
private void EnsureHandleQueueStarted()
{
if(isHandling)
return;
isHandling = true;
cancellationTokenSource = new CancellationTokenSource();
var token = cancellationTokenSource.Token;
task = Task.Run(async () => await HandleJobs(token))
.ContinueWith(_ => isHandling = false);
}
private async Task HandleJobs(CancellationToken token)
{
while (TryTakeWaitingJob(out JobWithProgress? job))
{
job!.State = JobDto.JobState.Working;
try
{
await RediceSamplingSaubAsync(job, token);
await RediceSamplingSpinAsync(job, token);
job.State = JobDto.JobState.Done;
}
catch (Exception exception)
{
job.State = JobDto.JobState.Fail;
job.Error = exception;
job.OnProgress.Invoke(job);
}
await Task.Delay(100, token);
}
await VacuumAsync(token);
await Task.Delay(1_000, token);
CleanJobs();
}
private void CleanJobs()
{
lock (jobs)
{
jobs.RemoveAll(j => j.State == JobDto.JobState.Done || j.State == JobDto.JobState.Fail);
}
}
private async Task VacuumAsync(CancellationToken token)
{
using var db = MakeContext();
var sqlVacuum = "vacuum (SKIP_LOCKED);";
await db.Database.ExecuteSqlRawAsync(sqlVacuum, token);
}
private void Stop()
{
cancellationTokenSource?.Cancel();
task?.Wait(1_000);
task = null;
cancellationTokenSource = null;
}
/// <summary>
/// Прореживание данных телеметрии САУБ.
/// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0.
/// </summary>
/// <param name="idTelemetry">Id телеметрии</param>
/// <param name="ratio">желаемое отношение оставшихся записей к исходным</param>
/// <returns></returns>
private Task RediceSamplingSaubAsync(JobWithProgress job, CancellationToken token)
{
const int ramLimit = 10 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
const int chankSize = ramLimit / dataItemSize; // ~ 90_000
const double maxWellDepthGap = 0.1;
var maxDateGapSec = ratio;
var sqlSelectTemplate =
"select " +
" * " +
"from " +
" (select " +
" *, " +
" rank() over win1 as row_num, " +
" lag(\"date\", 1) over win1 as lag_date, " +
" lag(\"mode\", 1) over win1 as lag_mode, " +
" lag(mse_state, 1) over win1 as lag_mse_state, " +
" lag(well_depth, 1) over win1 as lag_well_depth, " +
" lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " +
" from t_telemetry_data_saub " +
$" where id_telemetry = {job.Id} and \"date\" > {{0}}" +
" window win1 as (order by \"date\") " +
" ) as t_1 " +
"where " +
$" (row_num % {ratio}) = 0 " +
" or \"mode\" != lag_mode " +
$" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " +
$" or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " +
" or mse_state != lag_mse_state " +
" or id_feed_regulator != lag_id_feed_regulator " +
"order by \"date\" ";
var sqlDeleteTemplate = "delete " +
"from t_telemetry_data_saub " +
$"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};";
return RediceSamplingAsync<TelemetryDataSaub>(
job,
chankSize,
sqlSelectTemplate,
sqlDeleteTemplate,
token);
}
private Task RediceSamplingSpinAsync(JobWithProgress job, CancellationToken token)
{
const int ramLimit = 10 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
const int chankSize = ramLimit / dataItemSize; // ~ 90_000
var maxDateGapSec = ratio;
var sqlSelectTemplate =
"select " +
" * " +
"from " +
" (select " +
" *, " +
" rank() over win1 as row_num, " +
" lag(\"date\", 1) over win1 as lag_date, " +
" lag(\"mode\", 1) over win1 as lag_mode, " +
" lag(state, 1) over win1 as lag_state " +
" from t_telemetry_data_spin " +
$" where id_telemetry = {job.Id} and \"date\" > {{0}}" +
" window win1 as (order by \"date\") " +
" ) as t_1 " +
"where " +
$" (row_num % {ratio}) = 0 " +
" or \"mode\" != lag_mode " +
$" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " +
" or state != lag_state " +
"order by \"date\" ";
var sqlDeleteTemplate = "delete " +
"from t_telemetry_data_spin " +
$"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};";
return RediceSamplingAsync<TelemetryDataSpin>(
job,
chankSize,
sqlSelectTemplate,
sqlDeleteTemplate,
token);
}
private async Task RediceSamplingAsync<TEntity>(
JobWithProgress job,
int chankSize,
string sqlSelectTemplate,
string sqlDeleteTemplate,
CancellationToken token)
where TEntity: class, ITelemetryData
{
using var db = MakeContext();
var dbset = db.Set<TEntity>();
var deleted = 0;
var oldCount = await dbset.Where(t => t.IdTelemetry == job.Id).CountAsync(token);
if (job.Result is Tuple<int, int> tupleResult)
{
deleted += tupleResult.Item1;
oldCount += tupleResult.Item2;
}
job.Result = (deleted, oldCount);
job.OnProgress?.Invoke(job);
var startDate = DateTimeOffset.MinValue;
do
{
var query = dbset
.FromSqlRaw(sqlSelectTemplate, startDate)
.AsNoTracking();
var data = await query
.Take(chankSize)
.ToArrayAsync(token);
var currentDataCount = data.Length;
if (currentDataCount == 0)
break;
var lastDate = data.Last().DateTime;
var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDeleteTemplate, new object[] { startDate, lastDate }.AsEnumerable(), token);
if (currentDeleted == 0)
break;
await db.Database.ExecInsertAsync(dbset, data, token);
startDate = lastDate;
deleted += currentDeleted;
job.Result = (deleted, oldCount);
job.OnProgress?.Invoke(job);
} while (true);
return;
}
private AsbCloudDbContext MakeContext()
{
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql(connectionString)
.Options;
return new AsbCloudDbContext(options);
}
}
#nullable disable
}

View File

@ -171,15 +171,6 @@ namespace AsbCloudInfrastructure.Services.SAUB
return offset;
}
/// <summary>
/// Прореживание данных телеметрии.
/// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0.
/// Из-за возможности запуска повторного прореживания можно ограничить величину разрыва по времени параметром maxDateGapSec.
/// </summary>
/// <param name="idTelemetry">Id телеметрии</param>
/// <param name="ratio">желаемое отношение оставшихся записей к исходным</param>
/// <param name="maxDateGapSec">ограничение разрыва по времени</param>
/// <returns></returns>
public abstract Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token);
}
}

View File

@ -39,44 +39,5 @@ namespace AsbCloudInfrastructure.Services.SAUB
dto.DateTime = src.DateTime.ToRemoteDateTime(timezoneOffset);
return dto;
}
public override async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token)
{
const int ramLimit = 50 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
const int itemsCountLimit = ramLimit / dataItemSize; // ~ 150_000, iterations count ~ 46
const int maxWellDepthGap = 1;
var dbset = db.Set<TelemetryDataSaub>();
var sql =
"select" +
" *" +
"from" +
" (select" +
" *," +
" rank() over win1 as row_num," +
" lag(\"date\", 1) over win1 as lag_date," +
" lag(\"mode\", 1) over win1 as lag_mode," +
" lag(mse_state, 1) over win1 as lag_mse_state," +
" lag(well_depth, 1) over win1 as lag_well_depth," +
" lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator" +
" from t_telemetry_data_saub" +
$" where id_telemetry = {idTelemetry}" +
" window win1 as (order by \"date\")" +
" ) as ttt" +
"where" +
$" (row_num % {ratio}) = 0" +
" or \"mode\" != lag_mode" +
$" or(\"date\" - lag_date) > interval '{maxDateGapSec} second'" +
$" or well_depth - lag_well_depth > {maxWellDepthGap}" +
" or mse_state != lag_mse_state" +
" or id_feed_regulator != lag_id_feed_regulator;";
var query = dbset.FromSqlRaw(sql);
await Task.Delay(0);
return (0, 0);
}
}
}

View File

@ -30,10 +30,5 @@ namespace AsbCloudInfrastructure.Services.SAUB
dto.DateTime = src.DateTime.ToRemoteDateTime(timezoneOffset);
return dto;
}
public override Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, CancellationToken token)
{
throw new System.NotImplementedException();
}
}
}

View File

@ -6,7 +6,9 @@ using AsbCloudInfrastructure.Services.DailyReport;
using ClosedXML.Excel;
using DocumentFormat.OpenXml.Wordprocessing;
using Microsoft.EntityFrameworkCore;
using Org.BouncyCastle.Utilities.Collections;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
@ -18,96 +20,14 @@ namespace ConsoleApp1
class Program
{
private static AsbCloudDbContext db = ServiceFactory.Context;
public delegate void OnProgress(int handled, int total);
public static async Task<(int oldCount, int newCount)> RediceSamplingAsync(int idTelemetry, int ratio, int maxDateGapSec, OnProgress onProgress, CancellationToken token)
{
const int ramLimit = 10 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
const int itemsCountLimit = ramLimit / dataItemSize; // ~ 90_000
const double maxWellDepthGap = 0.1;
ratio = ratio > 0 ? ratio : 5;
maxDateGapSec = maxDateGapSec > 0 ? maxDateGapSec : 9;
var dbset = db.Set<TelemetryDataSaub>();
var oldCount = await dbset.Where(t => t.IdTelemetry == idTelemetry).CountAsync(token);
onProgress?.Invoke(0, oldCount);
var sqlSelect =
"select " +
" * " +
"from " +
" (select " +
" *, " +
" rank() over win1 as row_num, " +
" lag(\"date\", 1) over win1 as lag_date, " +
" lag(\"mode\", 1) over win1 as lag_mode, " +
" lag(mse_state, 1) over win1 as lag_mse_state, " +
" lag(well_depth, 1) over win1 as lag_well_depth, " +
" lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " +
" from t_telemetry_data_saub " +
$" where id_telemetry = {idTelemetry} and \"date\" > {{0}}" +
" window win1 as (order by \"date\") " +
" ) as ttt " +
"where " +
$" (row_num % {ratio}) = 0 " +
" or \"mode\" != lag_mode " +
$" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " +
$" or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " +
" or mse_state != lag_mse_state " +
" or id_feed_regulator != lag_id_feed_regulator " +
"order by \"date\" ";
var sqlDelete = "delete " +
"from t_telemetry_data_saub " +
$"where id_telemetry = {idTelemetry} and \"date\" between {{0}} and {{1}};";
var startDate = DateTimeOffset.MinValue;
var deleted = 0;
var saved = 0;
do
{
var query = dbset
.FromSqlRaw(sqlSelect, startDate)
.AsNoTracking();
var data = await query
.Take(itemsCountLimit)
.ToArrayAsync(token);
var currentDataCount = data.Length;
if (currentDataCount == 0)
break;
var lastDate = data.Last().DateTime;
var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDelete, new object[]{ startDate, lastDate}.AsEnumerable(), token);
if (currentDeleted == 0)
break;
await db.Database.ExecInsertAsync(dbset, data, token);
startDate = lastDate;
deleted += currentDeleted;
saved += currentDataCount;
onProgress?.Invoke(deleted, oldCount);
} while (true);
return (oldCount, saved);
}
// use ServiceFactory to make services
static void Main(/*string[] args*/)
{
var i = 0;
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
var result = RediceSamplingAsync(94, 5, 5, (p, t) => { Console.WriteLine($"{i++:0000}\t{p:00_000_000}\t{t:00_000_000}\t{1d*p/t:00.00}"); }, CancellationToken.None).Result;
sw.Stop();
Console.WriteLine($"result: saved {result.newCount} old = {result.oldCount} ratio = {1d*result.oldCount/result.newCount}");
Console.WriteLine($"total time: {sw.ElapsedMilliseconds} ms");
Task.Run(() => { throw new Exception(); })
.ContinueWith(t => { Console.WriteLine("Dooom"); });
Console.WriteLine($"total time: ms");
Console.ReadLine();
}
}
}