DD.WellWorkover.Cloud/AsbCloudInfrastructure/Services/ReduceSamplingService.cs

338 lines
12 KiB
C#
Raw Normal View History

using AsbCloudApp.Data;
2022-10-11 17:04:26 +05:00
using AsbCloudApp.Services;
using AsbCloudDb;
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Configuration;
2022-10-11 17:04:26 +05:00
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services
{
2022-10-11 17:04:26 +05:00
class JobHandle
{
2022-10-11 17:04:26 +05:00
public int Id => Job.Id;
public JobDto Job { get; set; } = null!;
public DateTime DateCreate { get; set; } = DateTime.Now;
public OnJobProgressDelagate? OnProgress { get; set; }
}
2022-10-11 17:04:26 +05:00
public class ReduceSamplingService : IReduceSamplingService
{
2022-10-11 17:04:26 +05:00
private const string jobResultKeyDeleted = "deleted";
private const string jobResultKeyTotal = "total";
private static ReduceSamplingService? instance;
private readonly string connectionString;
private const int ratio = 5;
2022-10-11 17:04:26 +05:00
private readonly List<JobHandle> jobHandlers = new(5);
private bool isHandling;
private CancellationTokenSource? cancellationTokenSource;
private Task? task;
private ReduceSamplingService(IConfiguration configuration)
{
connectionString = configuration.GetConnectionString("DefaultConnection")
?? throw new ConfigurationErrorsException("DefaultConnection не определен");
}
~ReduceSamplingService()
{
Stop();
}
2022-10-11 17:04:26 +05:00
/// <summary>
/// Get singleton Instance
/// </summary>
/// <param name="configuration"></param>
/// <returns></returns>
public static ReduceSamplingService GetInstance(IConfiguration configuration)
{
2022-10-11 17:04:26 +05:00
instance ??= new(configuration);
return instance;
}
2022-10-11 17:04:26 +05:00
/// <inheritdoc/>
public bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate? onProgress, out JobDto jobDto)
{
2022-10-11 17:04:26 +05:00
lock (jobHandlers)
{
2022-10-11 17:04:26 +05:00
var oldJob = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry);
if (oldJob is not null)
{
jobDto = oldJob.Job;
return false;
}
2022-10-11 17:04:26 +05:00
jobDto = new JobDto
{
Id = idTelemetry,
2022-10-11 17:04:26 +05:00
State = JobState.Waiting,
Results = new(),
};
var jobHandler = new JobHandle
{
Job = jobDto,
OnProgress = onProgress,
};
2022-10-11 17:04:26 +05:00
jobHandlers.Add(jobHandler);
}
EnsureHandleQueueStarted();
2022-10-11 17:04:26 +05:00
return true;
}
2022-10-11 17:04:26 +05:00
/// <inheritdoc/>
public JobDto? GetOrDefaultState(int idTelemetry)
{
2022-10-11 17:04:26 +05:00
JobHandle? jobHandler;
lock (jobHandlers)
{
2022-10-11 17:04:26 +05:00
jobHandler = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry);
}
2022-10-11 17:04:26 +05:00
return jobHandler?.Job;
}
2022-10-11 17:04:26 +05:00
/// <inheritdoc/>
public IEnumerable<JobDto> GetJobs() => jobHandlers.Select(j=>j.Job);
2022-10-11 17:04:26 +05:00
private bool TryTakeWaitingJob(out JobHandle? job)
{
2022-10-11 17:04:26 +05:00
lock (jobHandlers)
{
2022-10-11 17:04:26 +05:00
job = jobHandlers.FirstOrDefault(j => j.Job.State == JobState.Waiting);
}
return job is not null;
}
private void EnsureHandleQueueStarted()
{
2022-10-11 17:04:26 +05:00
if (isHandling)
return;
isHandling = true;
cancellationTokenSource = new CancellationTokenSource();
var token = cancellationTokenSource.Token;
task = Task.Run(async () => await HandleJobs(token))
.ContinueWith(_ => isHandling = false);
}
private async Task HandleJobs(CancellationToken token)
{
2022-10-11 17:04:26 +05:00
while (TryTakeWaitingJob(out JobHandle? jobHandler))
{
2022-10-11 17:04:26 +05:00
jobHandler!.Job.State = JobState.Working;
try
{
2022-10-11 17:04:26 +05:00
await RediceSamplingSaubAsync(jobHandler, token);
await RediceSamplingSpinAsync(jobHandler, token);
jobHandler.Job.State = JobState.Done;
}
catch (Exception exception)
{
2022-10-11 17:04:26 +05:00
jobHandler.Job.State = JobState.Fail;
jobHandler.Job.Results = null;
jobHandler.Job.Error = exception.Message;
jobHandler.OnProgress?.Invoke(jobHandler.Job);
}
if (!jobHandlers.Any(j => j.Job.State == JobState.Waiting))
{
var sw = Stopwatch.StartNew();
await VacuumAsync(token);
sw.Stop();
if (sw.ElapsedMilliseconds < 10_000)
{
var delayMs = 10_000 - (int)sw.ElapsedMilliseconds;
await Task.Delay(delayMs, token);
}
CleanJobs();
}
}
}
private void CleanJobs()
{
2022-10-11 17:04:26 +05:00
lock (jobHandlers)
{
2022-10-11 17:04:26 +05:00
jobHandlers.RemoveAll(j => j.Job.State == JobState.Done || j.Job.State == JobState.Fail);
}
}
private async Task VacuumAsync(CancellationToken token)
{
using var db = MakeContext();
var sqlVacuum = "vacuum (SKIP_LOCKED);";
await db.Database.ExecuteSqlRawAsync(sqlVacuum, token);
}
private void Stop()
{
cancellationTokenSource?.Cancel();
task?.Wait(1_000);
task = null;
2022-10-11 17:04:26 +05:00
cancellationTokenSource?.Dispose();
cancellationTokenSource = null;
}
2022-10-11 17:04:26 +05:00
private Task RediceSamplingSaubAsync(JobHandle job, CancellationToken token)
{
const int ramLimit = 10 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
const int chankSize = ramLimit / dataItemSize; // ~ 90_000
const double maxWellDepthGap = 0.1;
var maxDateGapSec = ratio;
var sqlSelectTemplate =
"select " +
" * " +
"from " +
" (select " +
" *, " +
" rank() over win1 as row_num, " +
" lag(\"date\", 1) over win1 as lag_date, " +
" lag(\"mode\", 1) over win1 as lag_mode, " +
" lag(mse_state, 1) over win1 as lag_mse_state, " +
" lag(well_depth, 1) over win1 as lag_well_depth, " +
" lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " +
" from t_telemetry_data_saub " +
$" where id_telemetry = {job.Id} and \"date\" > {{0}}" +
" window win1 as (order by \"date\") " +
" ) as t_1 " +
"where " +
$" (row_num % {ratio}) = 0 " +
" or \"mode\" != lag_mode " +
$" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " +
$" or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " +
" or mse_state != lag_mse_state " +
" or id_feed_regulator != lag_id_feed_regulator " +
"order by \"date\" ";
var sqlDeleteTemplate = "delete " +
"from t_telemetry_data_saub " +
$"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};";
return RediceSamplingAsync<TelemetryDataSaub>(
job,
2022-10-11 17:04:26 +05:00
chankSize,
sqlSelectTemplate,
sqlDeleteTemplate,
token);
}
2022-10-11 17:04:26 +05:00
private Task RediceSamplingSpinAsync(JobHandle job, CancellationToken token)
{
const int ramLimit = 10 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
const int chankSize = ramLimit / dataItemSize; // ~ 90_000
var maxDateGapSec = ratio;
var sqlSelectTemplate =
"select " +
" * " +
"from " +
" (select " +
" *, " +
" rank() over win1 as row_num, " +
" lag(\"date\", 1) over win1 as lag_date, " +
" lag(\"mode\", 1) over win1 as lag_mode, " +
" lag(state, 1) over win1 as lag_state " +
" from t_telemetry_data_spin " +
$" where id_telemetry = {job.Id} and \"date\" > {{0}}" +
" window win1 as (order by \"date\") " +
" ) as t_1 " +
"where " +
$" (row_num % {ratio}) = 0 " +
" or \"mode\" != lag_mode " +
$" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " +
" or state != lag_state " +
"order by \"date\" ";
var sqlDeleteTemplate = "delete " +
"from t_telemetry_data_spin " +
$"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};";
return RediceSamplingAsync<TelemetryDataSpin>(
job,
chankSize,
sqlSelectTemplate,
sqlDeleteTemplate,
token);
}
private async Task RediceSamplingAsync<TEntity>(
2022-10-11 17:04:26 +05:00
JobHandle jobHandle,
int chankSize,
string sqlSelectTemplate,
string sqlDeleteTemplate,
CancellationToken token)
2022-10-11 17:04:26 +05:00
where TEntity : class, AsbCloudDb.Model.ITelemetryData
{
using var db = MakeContext();
var dbset = db.Set<TEntity>();
var deleted = 0;
2022-10-11 17:04:26 +05:00
var totalCount = await dbset.Where(t => t.IdTelemetry == jobHandle.Id).CountAsync(token);
2022-10-11 17:04:26 +05:00
var result = jobHandle.Job.Results!;
if (result[jobResultKeyDeleted] is int previousDeleted)
deleted += previousDeleted;
if (result[jobResultKeyTotal] is int previousCount)
totalCount += previousCount;
result[jobResultKeyDeleted] = deleted;
result[jobResultKeyTotal] = totalCount;
jobHandle.OnProgress?.Invoke(jobHandle.Job);
var startDate = DateTimeOffset.MinValue;
do
{
var query = dbset
.FromSqlRaw(sqlSelectTemplate, startDate)
.AsNoTracking();
var data = await query
.Take(chankSize)
.ToArrayAsync(token);
var currentDataCount = data.Length;
if (currentDataCount == 0)
break;
var lastDate = data.Last().DateTime;
var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDeleteTemplate, new object[] { startDate, lastDate }.AsEnumerable(), token);
if (currentDeleted == 0)
break;
await db.Database.ExecInsertAsync(dbset, data, token);
startDate = lastDate;
deleted += currentDeleted;
2022-10-11 17:04:26 +05:00
result[jobResultKeyDeleted] = deleted;
result[jobResultKeyTotal] = totalCount;
jobHandle.OnProgress?.Invoke(jobHandle.Job);
} while (true);
return;
}
private AsbCloudDbContext MakeContext()
{
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql(connectionString)
.Options;
return new AsbCloudDbContext(options);
}
}
}