Merge branch 'Redice_telemetry_sampling' into dev

This commit is contained in:
ngfrolov 2022-10-11 17:05:27 +05:00
commit d86f3fc1de
13 changed files with 563 additions and 53 deletions

View File

@ -0,0 +1,56 @@
using System;
using System.Collections;
namespace AsbCloudApp.Data
{
#nullable enable
/// <summary>
/// State of a background job
/// </summary>
public enum JobState
{
/// <summary>
/// Waiting in the queue for execution
/// </summary>
Waiting,
/// <summary>
/// Currently executing
/// </summary>
Working,
/// <summary>
/// Completed successfully
/// </summary>
Done,
/// <summary>
/// Finished with an error
/// </summary>
Fail
};
/// <summary>
/// Background job descriptor
/// </summary>
public class JobDto
{
/// <summary>
/// Identifier (presumably the id of the telemetry being processed — confirm against IReduceSamplingService usage)
/// </summary>
public int Id { get; set; }
/// <summary>
/// Current state of the job
/// </summary>
public JobState State { get; set; }
/// <summary>
/// Execution results; null when the job failed
/// </summary>
public Hashtable? Results { get; set; }
/// <summary>
/// Exception message, if an error occurred
/// </summary>
public string? Error { get; set; }
}
#nullable disable
}

View File

@ -0,0 +1,43 @@
using AsbCloudApp.Data;
using System.Collections.Generic;
namespace AsbCloudApp.Services
{
#nullable enable
/// <summary>
/// Delegate invoked to report a job's state/progress changes.
/// NOTE(review): name contains a typo ("Delagate" -> "Delegate"); renaming would break existing consumers.
/// </summary>
/// <param name="job">current snapshot of the job</param>
public delegate void OnJobProgressDelagate(JobDto job);
/// <summary>
/// Service that thins out the DB telemetry archive.
/// Deletes part of the telemetry rows, lowering the record rate
/// from 1 record per second to 1 record per N seconds.
/// </summary>
public interface IReduceSamplingService
{
/// <summary>
/// Get all jobs. Jobs are removed no sooner than 10 seconds after completion, possibly later.
/// </summary>
/// <returns>Enumerable of JobDto or empty</returns>
IEnumerable<JobDto> GetJobs();
/// <summary>
/// Get the state of a specific job
/// </summary>
/// <param name="idTelemetry">id of the telemetry the job was enqueued for</param>
/// <returns>the job, or null if none exists for this telemetry</returns>
JobDto? GetOrDefaultState(int idTelemetry);
/// <summary>
/// Create an archive-thinning job and add it to the execution queue.
/// NOTE(review): method name contains a typo ("Redice" -> "Reduce"); renaming would break existing callers.
/// </summary>
/// <param name="idTelemetry">telemetry to thin out</param>
/// <param name="onProgress">progress callback</param>
/// <param name="jobDto">the created job, or the already-queued job for this telemetry</param>
/// <returns>true if a new job was enqueued; false if one already existed</returns>
bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate onProgress, out JobDto jobDto);
}
#nullable disable
}

View File

@ -69,6 +69,15 @@ namespace AsbCloudDb
return database.ExecuteSqlRawAsync(query, token);
}
/// <summary>
/// Builds a bulk INSERT statement for <paramref name="items"/> and executes it
/// against the database as raw SQL.
/// </summary>
/// <returns>number of rows affected</returns>
public static Task<int> ExecInsertAsync<T>(this Microsoft.EntityFrameworkCore.Infrastructure.DatabaseFacade database, DbSet<T> dbSet, IEnumerable<T> items, CancellationToken token)
    where T : class
{
    // Single expression: resolve the query factory for this set, render the
    // INSERT statement, and hand it straight to the raw-SQL executor.
    var insertStatement = GetQueryStringFactory(dbSet).MakeInsertSql(items);
    return database.ExecuteSqlRawAsync(insertStatement, token);
}
public static string GetTableName<T>(this DbSet<T> dbSet)
where T : class
{
@ -154,6 +163,14 @@ namespace AsbCloudDb
return builder.ToString();
}
/// <summary>
/// Renders a complete INSERT statement: the cached header, one VALUES row
/// per item, and a terminating semicolon.
/// </summary>
public string MakeInsertSql(IEnumerable<T> items)
{
    var sql = new StringBuilder(insertHeader, 7);
    return BuildRows(sql, items)
        .Append(';')
        .ToString();
}
private StringBuilder BuildRows(StringBuilder builder, IEnumerable<T> items)
{
var list = items.ToList();

View File

@ -103,6 +103,7 @@ namespace AsbCloudInfrastructure
services.AddSingleton<ITelemetryTracker, TelemetryTracker>();
services.AddSingleton<IRequerstTrackerService, RequestTrackerService>();
services.AddSingleton<IBackgroundWorkerService, BackgroundWorkerService>();
services.AddSingleton<IReduceSamplingService>(provider => ReduceSamplingService.GetInstance(configuration));
services.AddTransient<IAuthService, AuthService>();
services.AddTransient<IClusterService, ClusterService>();

View File

@ -0,0 +1,335 @@
using AsbCloudApp.Data;
using AsbCloudApp.Services;
using AsbCloudDb;
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services
{
#nullable enable
// Internal pairing of a job DTO with its progress callback and creation time.
class JobHandle
{
// Handle id mirrors the job id (which is the telemetry id).
public int Id => Job.Id;
// The job snapshot exposed to clients; always assigned right after construction.
public JobDto Job { get; set; } = null!;
// Local wall-clock creation time. NOTE(review): DateTime.Now is server-local; confirm UTC is not required.
public DateTime DateCreate { get; set; } = DateTime.Now;
// Optional callback invoked as the job progresses.
public OnJobProgressDelagate? OnProgress { get; set; }
}
/// <summary>
/// Service that thins out the telemetry archive in the DB: deletes part of the
/// telemetry rows, lowering the record density from 1 row/sec to ~1 row per
/// <see cref="ratio"/> sec (rows where monitored columns change are kept).
/// Process-wide singleton; jobs are queued and handled sequentially on a single
/// background task.
/// </summary>
public class ReduceSamplingService : IReduceSamplingService
{
    // Keys of the progress counters published via JobDto.Results.
    private const string jobResultKeyDeleted = "deleted";
    private const string jobResultKeyTotal = "total";

    // Singleton instance and its creation gate. GetInstance may be called
    // concurrently (DI factory + controllers), so creation is synchronized.
    private static readonly object instanceGate = new();
    private static volatile ReduceSamplingService? instance;

    private readonly string connectionString;

    // Keep roughly every ratio-th row; also used as the max time gap (seconds).
    private const int ratio = 5;

    // Job registry. Also serves as the lock object guarding all queue state,
    // including the isHandling flag.
    private readonly List<JobHandle> jobHandlers = new(5);
    private bool isHandling;
    private CancellationTokenSource? cancellationTokenSource;
    private Task? task;

    private ReduceSamplingService(IConfiguration configuration)
    {
        connectionString = configuration.GetConnectionString("DefaultConnection");
    }

    // NOTE(review): blocking on a Task from a finalizer (Stop waits up to 1s)
    // is fragile; kept for parity with the original design — consider
    // implementing IDisposable instead.
    ~ReduceSamplingService()
    {
        Stop();
    }

    /// <summary>
    /// Get the singleton instance (thread-safe; the original check was racy
    /// and could construct two instances under concurrent first calls).
    /// </summary>
    /// <param name="configuration">read only on first call, for the connection string</param>
    public static ReduceSamplingService GetInstance(IConfiguration configuration)
    {
        if (instance is null)
            lock (instanceGate)
                instance ??= new(configuration);
        return instance;
    }

    /// <inheritdoc/>
    public bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate? onProgress, out JobDto jobDto)
    {
        lock (jobHandlers)
        {
            // At most one job per telemetry: hand back the existing job
            // without enqueueing a duplicate.
            var oldJob = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry);
            if (oldJob is not null)
            {
                jobDto = oldJob.Job;
                return false;
            }
            jobDto = new JobDto
            {
                Id = idTelemetry,
                State = JobState.Waiting,
                Results = new(),
            };
            jobHandlers.Add(new JobHandle
            {
                Job = jobDto,
                OnProgress = onProgress,
            });
        }
        EnsureHandleQueueStarted();
        return true;
    }

    /// <inheritdoc/>
    public JobDto? GetOrDefaultState(int idTelemetry)
    {
        lock (jobHandlers)
            return jobHandlers.FirstOrDefault(j => j.Id == idTelemetry)?.Job;
    }

    /// <inheritdoc/>
    public IEnumerable<JobDto> GetJobs()
    {
        // Materialize under the lock: the original returned a deferred query
        // over a list the worker thread mutates concurrently (CleanJobs),
        // which could throw "collection was modified" in the caller.
        lock (jobHandlers)
            return jobHandlers.Select(j => j.Job).ToArray();
    }

    // Takes the first waiting job, if any, without changing its state.
    private bool TryTakeWaitingJob(out JobHandle? job)
    {
        lock (jobHandlers)
        {
            job = jobHandlers.FirstOrDefault(j => j.Job.State == JobState.Waiting);
        }
        return job is not null;
    }

    /// <summary>
    /// Starts the background queue handler if it is not already running.
    /// The check-and-set of isHandling happens under the lock: the original
    /// unsynchronized check could start two handlers from concurrent enqueues.
    /// </summary>
    private void EnsureHandleQueueStarted()
    {
        lock (jobHandlers)
        {
            if (isHandling)
                return;
            isHandling = true;
        }
        cancellationTokenSource = new CancellationTokenSource();
        var token = cancellationTokenSource.Token;
        task = Task.Run(async () => await HandleJobs(token))
            .ContinueWith(_ => isHandling = false);
    }

    // Sequentially processes queued jobs until the queue is drained.
    private async Task HandleJobs(CancellationToken token)
    {
        while (TryTakeWaitingJob(out JobHandle? jobHandler))
        {
            jobHandler!.Job.State = JobState.Working;
            try
            {
                await RediceSamplingSaubAsync(jobHandler, token);
                await RediceSamplingSpinAsync(jobHandler, token);
                jobHandler.Job.State = JobState.Done;
            }
            catch (Exception exception)
            {
                jobHandler.Job.State = JobState.Fail;
                jobHandler.Job.Results = null;
                jobHandler.Job.Error = exception.Message;
                jobHandler.OnProgress?.Invoke(jobHandler.Job);
            }

            // Snapshot the queue state under the lock (original scanned the
            // list unguarded here).
            bool queueDrained;
            lock (jobHandlers)
                queueDrained = !jobHandlers.Any(j => j.Job.State == JobState.Waiting);

            if (queueDrained)
            {
                // Reclaim dead tuples after the deletes, then keep finished
                // jobs visible for at least 10 s so clients can read them.
                var sw = Stopwatch.StartNew();
                await VacuumAsync(token);
                sw.Stop();
                if (sw.ElapsedMilliseconds < 10_000)
                {
                    var delayMs = 10_000 - (int)sw.ElapsedMilliseconds;
                    await Task.Delay(delayMs, token);
                }
                CleanJobs();
            }
        }
    }

    // Drops all finished (Done/Fail) jobs from the registry.
    private void CleanJobs()
    {
        lock (jobHandlers)
        {
            jobHandlers.RemoveAll(j => j.Job.State == JobState.Done || j.Job.State == JobState.Fail);
        }
    }

    // Runs a PostgreSQL VACUUM to reclaim space freed by the deletes.
    private async Task VacuumAsync(CancellationToken token)
    {
        using var db = MakeContext();
        var sqlVacuum = "vacuum (SKIP_LOCKED);";
        await db.Database.ExecuteSqlRawAsync(sqlVacuum, token);
    }

    // Cancels the worker, waits up to 1 s for it, and releases the CTS.
    private void Stop()
    {
        cancellationTokenSource?.Cancel();
        task?.Wait(1_000);
        task = null;
        cancellationTokenSource?.Dispose();
        cancellationTokenSource = null;
    }

    // Thins out t_telemetry_data_saub for the job's telemetry.
    private Task RediceSamplingSaubAsync(JobHandle job, CancellationToken token)
    {
        const int ramLimit = 10 * 1024 * 1024;
        const int dataItemSize = 345; // by profiler
        const int chankSize = ramLimit / dataItemSize; // ~ 90_000
        const double maxWellDepthGap = 0.1;
        var maxDateGapSec = ratio;
        // Render the depth-gap threshold with the invariant culture. The
        // original $"{maxWellDepthGap:#0,0#}" used ',' (a group separator in
        // .NET custom formats) and the current culture, which on a comma-
        // decimal locale corrupts the SQL literal.
        var maxWellDepthGapLiteral = maxWellDepthGap
            .ToString("0.0###", System.Globalization.CultureInfo.InvariantCulture);
        var sqlSelectTemplate =
            "select " +
            "  * " +
            "from " +
            "  (select " +
            "      *, " +
            "      rank() over win1 as row_num, " +
            "      lag(\"date\", 1) over win1 as lag_date, " +
            "      lag(\"mode\", 1) over win1 as lag_mode, " +
            "      lag(mse_state, 1) over win1 as lag_mse_state, " +
            "      lag(well_depth, 1) over win1 as lag_well_depth, " +
            "      lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " +
            "  from t_telemetry_data_saub " +
            $"  where id_telemetry = {job.Id} and \"date\" > {{0}}" +
            "  window win1 as (order by \"date\") " +
            "  ) as t_1 " +
            "where " +
            $"  (row_num % {ratio}) = 0 " +
            "  or \"mode\" != lag_mode " +
            $"  or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " +
            $"  or well_depth - lag_well_depth > {maxWellDepthGapLiteral} " +
            "  or mse_state != lag_mse_state " +
            "  or id_feed_regulator != lag_id_feed_regulator " +
            "order by \"date\" ";
        var sqlDeleteTemplate = "delete " +
            "from t_telemetry_data_saub " +
            $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};";
        return RediceSamplingAsync<TelemetryDataSaub>(
            job,
            chankSize,
            sqlSelectTemplate,
            sqlDeleteTemplate,
            token);
    }

    // Thins out t_telemetry_data_spin for the job's telemetry.
    private Task RediceSamplingSpinAsync(JobHandle job, CancellationToken token)
    {
        const int ramLimit = 10 * 1024 * 1024;
        const int dataItemSize = 345; // by profiler
        const int chankSize = ramLimit / dataItemSize; // ~ 90_000
        var maxDateGapSec = ratio;
        var sqlSelectTemplate =
            "select " +
            "  * " +
            "from " +
            "  (select " +
            "      *, " +
            "      rank() over win1 as row_num, " +
            "      lag(\"date\", 1) over win1 as lag_date, " +
            "      lag(\"mode\", 1) over win1 as lag_mode, " +
            "      lag(state, 1) over win1 as lag_state " +
            "  from t_telemetry_data_spin " +
            $"  where id_telemetry = {job.Id} and \"date\" > {{0}}" +
            "  window win1 as (order by \"date\") " +
            "  ) as t_1 " +
            "where " +
            $"  (row_num % {ratio}) = 0 " +
            "  or \"mode\" != lag_mode " +
            $"  or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " +
            "  or state != lag_state " +
            "order by \"date\" ";
        var sqlDeleteTemplate = "delete " +
            "from t_telemetry_data_spin " +
            $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};";
        return RediceSamplingAsync<TelemetryDataSpin>(
            job,
            chankSize,
            sqlSelectTemplate,
            sqlDeleteTemplate,
            token);
    }

    /// <summary>
    /// Chunked thin-out loop: selects the rows to KEEP for a date window,
    /// deletes the whole window, then re-inserts the kept rows; repeats from
    /// the last kept date until no more rows are selected or deleted.
    /// Progress counters are accumulated into the job's Results hashtable.
    /// </summary>
    private async Task RediceSamplingAsync<TEntity>(
        JobHandle jobHandle,
        int chankSize,
        string sqlSelectTemplate,
        string sqlDeleteTemplate,
        CancellationToken token)
        where TEntity : class, AsbCloudDb.Model.ITelemetryData
    {
        using var db = MakeContext();
        var dbset = db.Set<TEntity>();
        var deleted = 0;
        var totalCount = await dbset.Where(t => t.IdTelemetry == jobHandle.Id).CountAsync(token);
        var result = jobHandle.Job.Results!;
        // Accumulate on top of counters left by a previous pass (e.g. the
        // saub pass before the spin pass).
        if (result[jobResultKeyDeleted] is int previousDeleted)
            deleted += previousDeleted;
        if (result[jobResultKeyTotal] is int previousCount)
            totalCount += previousCount;
        result[jobResultKeyDeleted] = deleted;
        result[jobResultKeyTotal] = totalCount;
        jobHandle.OnProgress?.Invoke(jobHandle.Job);
        var startDate = DateTimeOffset.MinValue;
        do
        {
            var query = dbset
                .FromSqlRaw(sqlSelectTemplate, startDate)
                .AsNoTracking();
            var data = await query
                .Take(chankSize)
                .ToArrayAsync(token);
            var currentDataCount = data.Length;
            if (currentDataCount == 0)
                break;
            var lastDate = data.Last().DateTime;
            var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDeleteTemplate, new object[] { startDate, lastDate }.AsEnumerable(), token);
            if (currentDeleted == 0)
                break;
            // Re-insert the rows selected for keeping.
            await db.Database.ExecInsertAsync(dbset, data, token);
            startDate = lastDate;
            deleted += currentDeleted;
            result[jobResultKeyDeleted] = deleted;
            result[jobResultKeyTotal] = totalCount;
            jobHandle.OnProgress?.Invoke(jobHandle.Job);
        } while (true);
        return;
    }

    // Builds a fresh Npgsql-backed DbContext from the configured connection string.
    private AsbCloudDbContext MakeContext()
    {
        var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
            .UseNpgsql(connectionString)
            .Options;
        return new AsbCloudDbContext(options);
    }
}
#nullable disable
}

View File

@ -16,7 +16,7 @@ namespace AsbCloudInfrastructure.Services.SAUB
where TDto : AsbCloudApp.Data.ITelemetryData
where TModel : class, AsbCloudDb.Model.ITelemetryData
{
private readonly IAsbCloudDbContext db;
protected readonly IAsbCloudDbContext db;
private readonly ITelemetryService telemetryService;
protected readonly CacheTable<Telemetry> cacheTelemetry;
protected readonly CacheTable<TelemetryUser> cacheTelemetryUsers;
@ -170,5 +170,7 @@ namespace AsbCloudInfrastructure.Services.SAUB
return offset;
}
}
}

View File

@ -2,7 +2,12 @@
using AsbCloudApp.Services;
using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services.Cache;
using DocumentFormat.OpenXml.Drawing.Charts;
using Mapster;
using Microsoft.EntityFrameworkCore;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services.SAUB
{

View File

@ -3,6 +3,8 @@ using AsbCloudApp.Services;
using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services.Cache;
using Mapster;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services.SAUB
{

View File

@ -5,6 +5,7 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Linq;
namespace AsbCloudInfrastructure
{

View File

@ -4,6 +4,8 @@
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -0,0 +1,78 @@
using AsbCloudApp.Services;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.Collections.Generic;
using System.Threading.Tasks;
using AsbCloudApp.Data;
using System.Linq;
using AsbCloudWebApi.SignalR;
using Microsoft.AspNetCore.SignalR;
namespace AsbCloudWebApi.Controllers
{
/// <summary>
/// Admin endpoint for telemetry-archive down-sampling (reduce sampling) jobs.
/// (The original summary described a cluster editor — a copy-paste leftover.)
/// </summary>
[Route("api/admin/[controller]")]
[ApiController]
[Authorize]
public class ReduceSamplingController: ControllerBase
{
    private readonly IReduceSamplingService service;
    private readonly IHubContext<TelemetryHub> telemetryHubContext;
    // Fixed typo "sirnalR" -> "signalR"; these are private constants, so the
    // rename is safe. The runtime group/method names are unchanged.
    private const string signalRGroupName = "ReduceSampling";
    private const string signalRMethodOnProgress = "OnProgress";

    public ReduceSamplingController(
        IReduceSamplingService service,
        IHubContext<TelemetryHub> telemetryHubContext )
    {
        this.service = service;
        this.telemetryHubContext = telemetryHubContext;
    }

    /// <summary>
    /// Get all jobs. Jobs are removed no sooner than 10 s after completion, possibly later.
    /// </summary>
    /// <param name="idTelemetry">NOTE(review): currently unused — the service has no per-telemetry filter here; kept for signature compatibility</param>
    /// <returns>200 with the jobs, or 204 when there are none</returns>
    [HttpGet]
    public virtual ActionResult<IEnumerable<JobDto>> GetAll(int idTelemetry)
    {
        var result = service.GetJobs();
        if (result.Any())
            return Ok(result);
        else
            return NoContent();
    }

    /// <summary>
    /// Get the state of a specific job.
    /// </summary>
    /// <param name="idTelemetry">telemetry id the job was enqueued for</param>
    /// <returns>the job, or an empty body when no job exists for this telemetry</returns>
    [HttpGet("{idTelemetry}")]
    public virtual ActionResult<JobDto> GetOrDefault(int idTelemetry)
    {
        var result = service.GetOrDefaultState(idTelemetry);
        return Ok(result);
    }

    /// <summary>
    /// Create an archive-thinning job and add it to the execution queue.
    /// If a job for this telemetry is already queued, that job is returned
    /// and no new progress subscription is made.
    /// </summary>
    [HttpPost]
    [Permission]
    public virtual ActionResult<JobDto> Enqueue(int idTelemetry)
    {
        // Fire-and-forget push of progress updates to the SignalR group;
        // exceptions from SendAsync are intentionally unobserved here.
        void onProgress(JobDto job) =>
            Task.Run(async () =>
                await telemetryHubContext.Clients.Group(signalRGroupName)
                .SendAsync(signalRMethodOnProgress, job));
        service.TryEnqueueRediceSamplingJob(idTelemetry, onProgress, out JobDto job);
        return Ok(job);
    }
}
}

View File

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<StartupObject>ConsoleApp1.Program</StartupObject>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -1,72 +1,39 @@
using AsbCloudApp.Data;
using AsbCloudApp.Data.DailyReport;
using AsbCloudDb;
using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services.DailyReport;
using ClosedXML.Excel;
using DocumentFormat.OpenXml.Wordprocessing;
using Microsoft.EntityFrameworkCore;
using Org.BouncyCastle.Utilities.Collections;
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
// Scratch console harness for manually exercising the daily-report Excel maker.
class Program
{
    private static AsbCloudDbContext db = ServiceFactory.Context;

    // use ServiceFactory to make services
    static void Main(/*string[] args*/)
    {
        // Hashtable serialization smoke test; result intentionally unused.
        // (Removed: `var v = h["v"];` — the value was never read.)
        var h = new Hashtable();
        h.Add("name", 1);
        h.Add("name2", "66");
        var s = System.Text.Json.JsonSerializer.Serialize(h);

        var block = new HeadDto()
        {
            AzimuthAngle = 12,
            WellName = "WellName",
            ClusterName = "clusterName",
            Customer = "customer",
            Contractor = "Contractor",
            ReportDate = DateTime.Now,
            WellDepthIntervalFinishDate = 27.5,
            WellDepthIntervalStartDate = 26.5,
            BottomholeDepth = 66.6
        };
        // (Removed: a BhaDto `block2` that was constructed but never used.)
        var block3 = new SaubDto();
        var bloks = new DailyReportDto()
        {
            Head = block,
            Saub = block3
        };

        var service = new DailyReportMakerExcel();
        var stream = service.MakeReportFromBlocks(bloks);
        var filename = "____.xlsx";
        if (File.Exists(filename))
            File.Delete(filename);
        using var fileStream = File.OpenWrite(filename);
        stream.CopyTo(fileStream);
        // (Removed: statements after the original `return;` — commented-out
        // experiments plus Console.WriteLine/ReadLine — were unreachable.)
    }
}
}