Add ReduceSamplingController.

This commit is contained in:
ngfrolov 2022-10-11 17:04:26 +05:00
parent 8cbcd9a115
commit 707f2a638a
6 changed files with 282 additions and 80 deletions

View File

@ -0,0 +1,56 @@
using System;
using System.Collections;
namespace AsbCloudApp.Data
{
#nullable enable
/// <summary>
/// Состояние фоновой задачи
/// </summary>
public enum JobState
{
/// <summary>
/// Ожидает в очереди на выполнение
/// </summary>
Waiting,
/// <summary>
/// выполняется
/// </summary>
Working,
/// <summary>
/// успешно выполнена
/// </summary>
Done,
/// <summary>
/// завершена с ошибкой
/// </summary>
Fail
};
/// <summary>
/// работа фоновой задачи
/// </summary>
public class JobDto
{
/// <summary>
/// идентификатор
/// </summary>
public int Id { get; set; }
/// <summary>
/// Состояние
/// </summary>
public JobState State { get; set; }
/// <summary>
/// результат выполнения
/// </summary>
public Hashtable? Results { get; set; }
/// <summary>
/// Исключение, если возникла ошибка
/// </summary>
public string? Error { get; set; }
}
#nullable disable
}

View File

@ -0,0 +1,43 @@
using AsbCloudApp.Data;
using System.Collections.Generic;
namespace AsbCloudApp.Services
{
#nullable enable
/// <summary>
/// Делегат обновления состояния задачи
/// </summary>
/// <param name="job"></param>
public delegate void OnJobProgressDelagate(JobDto job);
/// <summary>
/// Сервис прореживания архива БД.
/// Удаляет часть телеметрии.
/// Понижает частоту записей в БД с 1 запись за 1 сек до 1 запись за N сек.
/// </summary>
public interface IReduceSamplingService
{
/// <summary>
/// Получить все задания. Задания удаляются минимум через 10 сек после выполнения, возможно позднее.
/// </summary>
/// <returns>Enumerable of JobDto or empty</returns>
IEnumerable<JobDto> GetJobs();
/// <summary>
/// Получить состояние определенной задачи
/// </summary>
/// <param name="idTelemetry"></param>
/// <returns></returns>
JobDto? GetOrDefaultState(int idTelemetry);
/// <summary>
/// Создать задачу прореживанию архива и добавить её в очередь на выполнение
/// </summary>
/// <param name="idTelemetry">телеметрия для прореживания</param>
/// <param name="onProgress">колбек процесса выполнения</param>
/// <param name="jobDto">созданная задача или задача из очереди</param>
/// <returns>задача добавлена == true</returns>
bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate onProgress, out JobDto jobDto);
}
#nullable disable
}

View File

@ -102,6 +102,7 @@ namespace AsbCloudInfrastructure
services.AddSingleton<ITelemetryTracker, TelemetryTracker>();
services.AddSingleton<IRequerstTrackerService, RequestTrackerService>();
services.AddSingleton<IBackgroundWorkerService, BackgroundWorkerService>();
services.AddSingleton<IReduceSamplingService>(provider => ReduceSamplingService.GetInstance(configuration));
services.AddTransient<IAuthService, AuthService>();
services.AddTransient<IClusterService, ClusterService>();

View File

@ -1,9 +1,12 @@
using AsbCloudDb;
using AsbCloudApp.Data;
using AsbCloudApp.Services;
using AsbCloudDb;
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -11,27 +14,22 @@ using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services
{
#nullable enable
public class JobDto
class JobHandle
{
public enum JobState { Waiting, Working, Done, Fail };
public int Id { get; internal set; }
public JobState State { get; internal set; }
public object? Result { get; internal set; }
public Exception? Error { get; internal set; }
public int Id => Job.Id;
public JobDto Job { get; set; } = null!;
public DateTime DateCreate { get; set; } = DateTime.Now;
public OnJobProgressDelagate? OnProgress { get; set; }
}
class JobWithProgress: JobDto
public class ReduceSamplingService : IReduceSamplingService
{
public ReduceSamplingService.OnProgressDelagate OnProgress { get; set; } = null!;
}
public class ReduceSamplingService
{
public delegate void OnProgressDelagate(JobDto job);
private const string jobResultKeyDeleted = "deleted";
private const string jobResultKeyTotal = "total";
private static ReduceSamplingService? instance;
private readonly string connectionString;
private const int ratio = 5;
private readonly List<JobWithProgress> jobs = new(5);
private readonly List<JobHandle> jobHandlers = new(5);
private bool isHandling;
private CancellationTokenSource? cancellationTokenSource;
private Task? task;
@ -46,62 +44,73 @@ namespace AsbCloudInfrastructure.Services
Stop();
}
/// <summary>
/// Get singleton Instance
/// </summary>
/// <param name="configuration"></param>
/// <returns></returns>
public static ReduceSamplingService GetInstance(IConfiguration configuration)
{
if (instance is null)
instance = new(configuration);
instance ??= new(configuration);
return instance;
}
public int TryEnqueueRediceSamplingJob(int idTelemetry, OnProgressDelagate onProgress)
/// <inheritdoc/>
public bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate? onProgress, out JobDto jobDto)
{
var result = 0;
lock (jobs)
lock (jobHandlers)
{
if (jobs.Any(j => j.Id == idTelemetry))
result = - 1;
var oldJob = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry);
if (oldJob is not null)
{
jobDto = oldJob.Job;
return false;
}
var job = new JobWithProgress
jobDto = new JobDto
{
Id = idTelemetry,
State = JobDto.JobState.Waiting,
State = JobState.Waiting,
Results = new(),
};
var jobHandler = new JobHandle
{
Job = jobDto,
OnProgress = onProgress,
};
jobs.Add(job);
result = jobs.Count;
jobHandlers.Add(jobHandler);
}
EnsureHandleQueueStarted();
return result;
return true;
}
public JobDto? GetState(int idTelemetry)
/// <inheritdoc/>
public JobDto? GetOrDefaultState(int idTelemetry)
{
JobWithProgress? job;
lock (jobs)
JobHandle? jobHandler;
lock (jobHandlers)
{
job = jobs.FirstOrDefault(j=>j.Id == idTelemetry);
jobHandler = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry);
}
return job;
return jobHandler?.Job;
}
public IEnumerable<JobDto> GetJobs()
{
return jobs;
}
/// <inheritdoc/>
public IEnumerable<JobDto> GetJobs() => jobHandlers.Select(j=>j.Job);
private bool TryTakeWaitingJob(out JobWithProgress? job)
private bool TryTakeWaitingJob(out JobHandle? job)
{
lock (jobs)
lock (jobHandlers)
{
job = jobs.FirstOrDefault(j => j.State == JobDto.JobState.Waiting);
job = jobHandlers.FirstOrDefault(j => j.Job.State == JobState.Waiting);
}
return job is not null;
}
private void EnsureHandleQueueStarted()
{
if(isHandling)
if (isHandling)
return;
isHandling = true;
cancellationTokenSource = new CancellationTokenSource();
@ -112,33 +121,43 @@ namespace AsbCloudInfrastructure.Services
private async Task HandleJobs(CancellationToken token)
{
while (TryTakeWaitingJob(out JobWithProgress? job))
while (TryTakeWaitingJob(out JobHandle? jobHandler))
{
job!.State = JobDto.JobState.Working;
jobHandler!.Job.State = JobState.Working;
try
{
await RediceSamplingSaubAsync(job, token);
await RediceSamplingSpinAsync(job, token);
job.State = JobDto.JobState.Done;
await RediceSamplingSaubAsync(jobHandler, token);
await RediceSamplingSpinAsync(jobHandler, token);
jobHandler.Job.State = JobState.Done;
}
catch (Exception exception)
{
job.State = JobDto.JobState.Fail;
job.Error = exception;
job.OnProgress.Invoke(job);
jobHandler.Job.State = JobState.Fail;
jobHandler.Job.Results = null;
jobHandler.Job.Error = exception.Message;
jobHandler.OnProgress?.Invoke(jobHandler.Job);
}
if (!jobHandlers.Any(j => j.Job.State == JobState.Waiting))
{
var sw = Stopwatch.StartNew();
await VacuumAsync(token);
sw.Stop();
if (sw.ElapsedMilliseconds < 10_000)
{
var delayMs = 10_000 - (int)sw.ElapsedMilliseconds;
await Task.Delay(delayMs, token);
}
CleanJobs();
}
await Task.Delay(100, token);
}
await VacuumAsync(token);
await Task.Delay(1_000, token);
CleanJobs();
}
private void CleanJobs()
{
lock (jobs)
lock (jobHandlers)
{
jobs.RemoveAll(j => j.State == JobDto.JobState.Done || j.State == JobDto.JobState.Fail);
jobHandlers.RemoveAll(j => j.Job.State == JobState.Done || j.Job.State == JobState.Fail);
}
}
@ -154,17 +173,11 @@ namespace AsbCloudInfrastructure.Services
cancellationTokenSource?.Cancel();
task?.Wait(1_000);
task = null;
cancellationTokenSource?.Dispose();
cancellationTokenSource = null;
}
/// <summary>
/// Прореживание данных телеметрии САУБ.
/// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0.
/// </summary>
/// <param name="idTelemetry">Id телеметрии</param>
/// <param name="ratio">желаемое отношение оставшихся записей к исходным</param>
/// <returns></returns>
private Task RediceSamplingSaubAsync(JobWithProgress job, CancellationToken token)
private Task RediceSamplingSaubAsync(JobHandle job, CancellationToken token)
{
const int ramLimit = 10 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
@ -204,13 +217,13 @@ namespace AsbCloudInfrastructure.Services
return RediceSamplingAsync<TelemetryDataSaub>(
job,
chankSize,
sqlSelectTemplate,
chankSize,
sqlSelectTemplate,
sqlDeleteTemplate,
token);
}
private Task RediceSamplingSpinAsync(JobWithProgress job, CancellationToken token)
private Task RediceSamplingSpinAsync(JobHandle job, CancellationToken token)
{
const int ramLimit = 10 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
@ -251,26 +264,30 @@ namespace AsbCloudInfrastructure.Services
}
private async Task RediceSamplingAsync<TEntity>(
JobWithProgress job,
int chankSize,
JobHandle jobHandle,
int chankSize,
string sqlSelectTemplate,
string sqlDeleteTemplate,
CancellationToken token)
where TEntity: class, ITelemetryData
where TEntity : class, AsbCloudDb.Model.ITelemetryData
{
using var db = MakeContext();
var dbset = db.Set<TEntity>();
var deleted = 0;
var oldCount = await dbset.Where(t => t.IdTelemetry == job.Id).CountAsync(token);
var totalCount = await dbset.Where(t => t.IdTelemetry == jobHandle.Id).CountAsync(token);
if (job.Result is Tuple<int, int> tupleResult)
{
deleted += tupleResult.Item1;
oldCount += tupleResult.Item2;
}
job.Result = (deleted, oldCount);
job.OnProgress?.Invoke(job);
var result = jobHandle.Job.Results!;
if (result[jobResultKeyDeleted] is int previousDeleted)
deleted += previousDeleted;
if (result[jobResultKeyTotal] is int previousCount)
totalCount += previousCount;
result[jobResultKeyDeleted] = deleted;
result[jobResultKeyTotal] = totalCount;
jobHandle.OnProgress?.Invoke(jobHandle.Job);
var startDate = DateTimeOffset.MinValue;
do
@ -297,8 +314,9 @@ namespace AsbCloudInfrastructure.Services
startDate = lastDate;
deleted += currentDeleted;
job.Result = (deleted, oldCount);
job.OnProgress?.Invoke(job);
result[jobResultKeyDeleted] = deleted;
result[jobResultKeyTotal] = totalCount;
jobHandle.OnProgress?.Invoke(jobHandle.Job);
} while (true);
return;

View File

@ -0,0 +1,78 @@
using AsbCloudApp.Services;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.Collections.Generic;
using System.Threading.Tasks;
using AsbCloudApp.Data;
using System.Linq;
using AsbCloudWebApi.SignalR;
using Microsoft.AspNetCore.SignalR;
namespace AsbCloudWebApi.Controllers
{
/// <summary>
/// Редактор кустов для админки
/// </summary>
[Route("api/admin/[controller]")]
[ApiController]
[Authorize]
public class ReduceSamplingController: ControllerBase
{
private readonly IReduceSamplingService service;
private readonly IHubContext<TelemetryHub> telemetryHubContext;
private const string sirnalRGroupName = "ReduceSampling";
private const string sirnalRMethodOnProgress = "OnProgress";
public ReduceSamplingController(
IReduceSamplingService service,
IHubContext<TelemetryHub> telemetryHubContext )
{
this.service = service;
this.telemetryHubContext = telemetryHubContext;
}
/// <summary>
/// Получить все задания. Задания удаляются минимум через 10 сек после выполнения, возможно позднее.
/// </summary>
/// <param name="idTelemetry"></param>
/// <returns></returns>
[HttpGet]
public virtual ActionResult<IEnumerable<JobDto>> GetAll(int idTelemetry)
{
var result = service.GetJobs();
if (result.Any())
return Ok(result);
else
return NoContent();
}
/// <summary>
/// Получить состояние определенной задачи
/// </summary>
/// <param name="idTelemetry"></param>
/// <returns></returns>
[HttpGet("{idTelemetry}")]
public virtual ActionResult<JobDto> GetOrDefault(int idTelemetry)
{
var result = service.GetOrDefaultState(idTelemetry);
return Ok(result);
}
/// <summary>
/// Создать задачу прореживанию архива и добавить её в очередь на выполнение.
/// Если задача есть в очереди, она же и возвращается, но подписка не происходит.
/// </summary>
[HttpPost]
[Permission]
public virtual ActionResult<JobDto> Enqueue(int idTelemetry)
{
void onProgress(JobDto job) =>
Task.Run(async () =>
await telemetryHubContext.Clients.Group(sirnalRGroupName)
.SendAsync(sirnalRMethodOnProgress, job));
service.TryEnqueueRediceSamplingJob(idTelemetry, onProgress, out JobDto job);
return Ok(job);
}
}
}

View File

@ -8,6 +8,7 @@ using DocumentFormat.OpenXml.Wordprocessing;
using Microsoft.EntityFrameworkCore;
using Org.BouncyCastle.Utilities.Collections;
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@ -24,8 +25,13 @@ namespace ConsoleApp1
// use ServiceFactory to make services
static void Main(/*string[] args*/)
{
Task.Run(() => { throw new Exception(); })
.ContinueWith(t => { Console.WriteLine("Dooom"); });
var h = new Hashtable();
h.Add("name", 1);
h.Add("name2", "66");
var v = h["v"];
var s = System.Text.Json.JsonSerializer.Serialize(h);
Console.WriteLine($"total time: ms");
Console.ReadLine();
}