diff --git a/AsbCloudApp/Data/JobDto.cs b/AsbCloudApp/Data/JobDto.cs
new file mode 100644
index 00000000..2e9b336a
--- /dev/null
+++ b/AsbCloudApp/Data/JobDto.cs
@@ -0,0 +1,56 @@
+using System;
+using System.Collections;
+
+namespace AsbCloudApp.Data
+{
+#nullable enable
+ ///
+ /// Состояние фоновой задачи
+ ///
+ public enum JobState
+ {
+ ///
+ /// Ожидает в очереди на выполнение
+ ///
+ Waiting,
+ ///
+ /// выполняется
+ ///
+ Working,
+ ///
+ /// успешно выполнена
+ ///
+ Done,
+ ///
+ /// завершена с ошибкой
+ ///
+ Fail
+ };
+
+ ///
+ /// работа фоновой задачи
+ ///
+ public class JobDto
+ {
+ ///
+ /// идентификатор
+ ///
+ public int Id { get; set; }
+
+ ///
+ /// Состояние
+ ///
+ public JobState State { get; set; }
+
+ ///
+ /// результат выполнения
+ ///
+ public Hashtable? Results { get; set; }
+
+ ///
+ /// Исключение, если возникла ошибка
+ ///
+ public string? Error { get; set; }
+ }
+#nullable disable
+}
diff --git a/AsbCloudApp/Services/IReduceSamplingService.cs b/AsbCloudApp/Services/IReduceSamplingService.cs
new file mode 100644
index 00000000..a0220c58
--- /dev/null
+++ b/AsbCloudApp/Services/IReduceSamplingService.cs
@@ -0,0 +1,43 @@
+using AsbCloudApp.Data;
+using System.Collections.Generic;
+
+namespace AsbCloudApp.Services
+{
+#nullable enable
+ ///
+ /// Делегат обновления состояния задачи
+ ///
+ ///
+ public delegate void OnJobProgressDelagate(JobDto job);
+
+ ///
+ /// Сервис прореживания архива БД.
+ /// Удаляет часть телеметрии.
+ /// Понижает частоту записей в БД с 1 запись за 1 сек до 1 запись за N сек.
+ ///
+ public interface IReduceSamplingService
+ {
+ ///
+ /// Получить все задания. Задания удаляются минимум через 10 сек после выполнения, возможно позднее.
+ ///
+ /// Enumerable of JobDto or empty
+ IEnumerable GetJobs();
+
+ ///
+ /// Получить состояние определенной задачи
+ ///
+ ///
+ ///
+ JobDto? GetOrDefaultState(int idTelemetry);
+
+ ///
+ /// Создать задачу прореживанию архива и добавить её в очередь на выполнение
+ ///
+ /// телеметрия для прореживания
+ /// колбек процесса выполнения
+ /// созданная задача или задача из очереди
+ /// задача добавлена == true
+ bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate onProgress, out JobDto jobDto);
+ }
+#nullable disable
+}
\ No newline at end of file
diff --git a/AsbCloudInfrastructure/DependencyInjection.cs b/AsbCloudInfrastructure/DependencyInjection.cs
index 1c69736d..421e550c 100644
--- a/AsbCloudInfrastructure/DependencyInjection.cs
+++ b/AsbCloudInfrastructure/DependencyInjection.cs
@@ -102,6 +102,7 @@ namespace AsbCloudInfrastructure
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
+ services.AddSingleton(provider => ReduceSamplingService.GetInstance(configuration));
services.AddTransient();
services.AddTransient();
diff --git a/AsbCloudInfrastructure/Services/ReduceSamplingService.cs b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs
index 46489fec..d9c11e0d 100644
--- a/AsbCloudInfrastructure/Services/ReduceSamplingService.cs
+++ b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs
@@ -1,9 +1,12 @@
-using AsbCloudDb;
+using AsbCloudApp.Data;
+using AsbCloudApp.Services;
+using AsbCloudDb;
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -11,27 +14,22 @@ using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services
{
#nullable enable
- public class JobDto
+ class JobHandle
{
- public enum JobState { Waiting, Working, Done, Fail };
- public int Id { get; internal set; }
- public JobState State { get; internal set; }
- public object? Result { get; internal set; }
- public Exception? Error { get; internal set; }
+ public int Id => Job.Id;
+ public JobDto Job { get; set; } = null!;
+ public DateTime DateCreate { get; set; } = DateTime.Now;
+ public OnJobProgressDelagate? OnProgress { get; set; }
}
- class JobWithProgress: JobDto
+ public class ReduceSamplingService : IReduceSamplingService
{
- public ReduceSamplingService.OnProgressDelagate OnProgress { get; set; } = null!;
- }
-
- public class ReduceSamplingService
- {
- public delegate void OnProgressDelagate(JobDto job);
+ private const string jobResultKeyDeleted = "deleted";
+ private const string jobResultKeyTotal = "total";
private static ReduceSamplingService? instance;
private readonly string connectionString;
private const int ratio = 5;
- private readonly List jobs = new(5);
+ private readonly List jobHandlers = new(5);
private bool isHandling;
private CancellationTokenSource? cancellationTokenSource;
private Task? task;
@@ -46,62 +44,73 @@ namespace AsbCloudInfrastructure.Services
Stop();
}
+ ///
+ /// Get singleton Instance
+ ///
+ ///
+ ///
public static ReduceSamplingService GetInstance(IConfiguration configuration)
{
- if (instance is null)
- instance = new(configuration);
+ instance ??= new(configuration);
return instance;
}
- public int TryEnqueueRediceSamplingJob(int idTelemetry, OnProgressDelagate onProgress)
+ ///
+ public bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate? onProgress, out JobDto jobDto)
{
- var result = 0;
- lock (jobs)
+ lock (jobHandlers)
{
- if (jobs.Any(j => j.Id == idTelemetry))
- result = - 1;
+ var oldJob = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry);
+ if (oldJob is not null)
+ {
+ jobDto = oldJob.Job;
+ return false;
+ }
- var job = new JobWithProgress
+ jobDto = new JobDto
{
Id = idTelemetry,
- State = JobDto.JobState.Waiting,
+ State = JobState.Waiting,
+ Results = new(),
+ };
+ var jobHandler = new JobHandle
+ {
+ Job = jobDto,
OnProgress = onProgress,
};
- jobs.Add(job);
- result = jobs.Count;
+ jobHandlers.Add(jobHandler);
}
EnsureHandleQueueStarted();
- return result;
+ return true;
}
- public JobDto? GetState(int idTelemetry)
+ ///
+ public JobDto? GetOrDefaultState(int idTelemetry)
{
- JobWithProgress? job;
- lock (jobs)
+ JobHandle? jobHandler;
+ lock (jobHandlers)
{
- job = jobs.FirstOrDefault(j=>j.Id == idTelemetry);
+ jobHandler = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry);
}
- return job;
+ return jobHandler?.Job;
}
- public IEnumerable GetJobs()
- {
- return jobs;
- }
+ ///
+ public IEnumerable GetJobs() => jobHandlers.Select(j=>j.Job);
- private bool TryTakeWaitingJob(out JobWithProgress? job)
+ private bool TryTakeWaitingJob(out JobHandle? job)
{
- lock (jobs)
+ lock (jobHandlers)
{
- job = jobs.FirstOrDefault(j => j.State == JobDto.JobState.Waiting);
+ job = jobHandlers.FirstOrDefault(j => j.Job.State == JobState.Waiting);
}
return job is not null;
}
private void EnsureHandleQueueStarted()
{
- if(isHandling)
+ if (isHandling)
return;
isHandling = true;
cancellationTokenSource = new CancellationTokenSource();
@@ -112,33 +121,43 @@ namespace AsbCloudInfrastructure.Services
private async Task HandleJobs(CancellationToken token)
{
- while (TryTakeWaitingJob(out JobWithProgress? job))
+ while (TryTakeWaitingJob(out JobHandle? jobHandler))
{
- job!.State = JobDto.JobState.Working;
+ jobHandler!.Job.State = JobState.Working;
try
{
- await RediceSamplingSaubAsync(job, token);
- await RediceSamplingSpinAsync(job, token);
- job.State = JobDto.JobState.Done;
+ await RediceSamplingSaubAsync(jobHandler, token);
+ await RediceSamplingSpinAsync(jobHandler, token);
+ jobHandler.Job.State = JobState.Done;
}
catch (Exception exception)
{
- job.State = JobDto.JobState.Fail;
- job.Error = exception;
- job.OnProgress.Invoke(job);
+ jobHandler.Job.State = JobState.Fail;
+ jobHandler.Job.Results = null;
+ jobHandler.Job.Error = exception.Message;
+ jobHandler.OnProgress?.Invoke(jobHandler.Job);
+ }
+
+ if (!jobHandlers.Any(j => j.Job.State == JobState.Waiting))
+ {
+ var sw = Stopwatch.StartNew();
+ await VacuumAsync(token);
+ sw.Stop();
+ if (sw.ElapsedMilliseconds < 10_000)
+ {
+ var delayMs = 10_000 - (int)sw.ElapsedMilliseconds;
+ await Task.Delay(delayMs, token);
+ }
+ CleanJobs();
}
- await Task.Delay(100, token);
}
- await VacuumAsync(token);
- await Task.Delay(1_000, token);
- CleanJobs();
}
private void CleanJobs()
{
- lock (jobs)
+ lock (jobHandlers)
{
- jobs.RemoveAll(j => j.State == JobDto.JobState.Done || j.State == JobDto.JobState.Fail);
+ jobHandlers.RemoveAll(j => j.Job.State == JobState.Done || j.Job.State == JobState.Fail);
}
}
@@ -154,17 +173,11 @@ namespace AsbCloudInfrastructure.Services
cancellationTokenSource?.Cancel();
task?.Wait(1_000);
task = null;
+ cancellationTokenSource?.Dispose();
cancellationTokenSource = null;
}
- ///
- /// Прореживание данных телеметрии САУБ.
- /// Каждая ratio запись будет сохранена, остальные удаляются. Остаются (Row_number % ratio) = 0.
- ///
- /// Id телеметрии
- /// желаемое отношение оставшихся записей к исходным
- ///
- private Task RediceSamplingSaubAsync(JobWithProgress job, CancellationToken token)
+ private Task RediceSamplingSaubAsync(JobHandle job, CancellationToken token)
{
const int ramLimit = 10 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
@@ -204,13 +217,13 @@ namespace AsbCloudInfrastructure.Services
return RediceSamplingAsync(
job,
- chankSize,
- sqlSelectTemplate,
+ chankSize,
+ sqlSelectTemplate,
sqlDeleteTemplate,
token);
}
- private Task RediceSamplingSpinAsync(JobWithProgress job, CancellationToken token)
+ private Task RediceSamplingSpinAsync(JobHandle job, CancellationToken token)
{
const int ramLimit = 10 * 1024 * 1024;
const int dataItemSize = 345; // by profiler
@@ -251,26 +264,30 @@ namespace AsbCloudInfrastructure.Services
}
private async Task RediceSamplingAsync(
- JobWithProgress job,
- int chankSize,
+ JobHandle jobHandle,
+ int chankSize,
string sqlSelectTemplate,
string sqlDeleteTemplate,
CancellationToken token)
- where TEntity: class, ITelemetryData
+ where TEntity : class, AsbCloudDb.Model.ITelemetryData
{
using var db = MakeContext();
var dbset = db.Set();
var deleted = 0;
- var oldCount = await dbset.Where(t => t.IdTelemetry == job.Id).CountAsync(token);
+ var totalCount = await dbset.Where(t => t.IdTelemetry == jobHandle.Id).CountAsync(token);
- if (job.Result is Tuple tupleResult)
- {
- deleted += tupleResult.Item1;
- oldCount += tupleResult.Item2;
- }
- job.Result = (deleted, oldCount);
- job.OnProgress?.Invoke(job);
+ var result = jobHandle.Job.Results!;
+ if (result[jobResultKeyDeleted] is int previousDeleted)
+ deleted += previousDeleted;
+
+ if (result[jobResultKeyTotal] is int previousCount)
+ totalCount += previousCount;
+
+ result[jobResultKeyDeleted] = deleted;
+ result[jobResultKeyTotal] = totalCount;
+
+ jobHandle.OnProgress?.Invoke(jobHandle.Job);
var startDate = DateTimeOffset.MinValue;
do
@@ -297,8 +314,9 @@ namespace AsbCloudInfrastructure.Services
startDate = lastDate;
deleted += currentDeleted;
- job.Result = (deleted, oldCount);
- job.OnProgress?.Invoke(job);
+ result[jobResultKeyDeleted] = deleted;
+ result[jobResultKeyTotal] = totalCount;
+ jobHandle.OnProgress?.Invoke(jobHandle.Job);
} while (true);
return;
diff --git a/AsbCloudWebApi/Controllers/ReduceSamplingController.cs b/AsbCloudWebApi/Controllers/ReduceSamplingController.cs
new file mode 100644
index 00000000..ddb99b2d
--- /dev/null
+++ b/AsbCloudWebApi/Controllers/ReduceSamplingController.cs
@@ -0,0 +1,78 @@
+using AsbCloudApp.Services;
+using Microsoft.AspNetCore.Authorization;
+using Microsoft.AspNetCore.Mvc;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using AsbCloudApp.Data;
+using System.Linq;
+using AsbCloudWebApi.SignalR;
+using Microsoft.AspNetCore.SignalR;
+
+namespace AsbCloudWebApi.Controllers
+{
+ ///
+ /// Редактор кустов для админки
+ ///
+ [Route("api/admin/[controller]")]
+ [ApiController]
+ [Authorize]
+ public class ReduceSamplingController: ControllerBase
+ {
+ private readonly IReduceSamplingService service;
+ private readonly IHubContext telemetryHubContext;
+ private const string sirnalRGroupName = "ReduceSampling";
+ private const string sirnalRMethodOnProgress = "OnProgress";
+
+ public ReduceSamplingController(
+ IReduceSamplingService service,
+ IHubContext telemetryHubContext )
+ {
+ this.service = service;
+ this.telemetryHubContext = telemetryHubContext;
+ }
+
+ ///
+ /// Получить все задания. Задания удаляются минимум через 10 сек после выполнения, возможно позднее.
+ ///
+ ///
+ ///
+ [HttpGet]
+ public virtual ActionResult> GetAll(int idTelemetry)
+ {
+ var result = service.GetJobs();
+ if (result.Any())
+ return Ok(result);
+ else
+ return NoContent();
+ }
+
+ ///
+ /// Получить состояние определенной задачи
+ ///
+ ///
+ ///
+ [HttpGet("{idTelemetry}")]
+ public virtual ActionResult GetOrDefault(int idTelemetry)
+ {
+ var result = service.GetOrDefaultState(idTelemetry);
+ return Ok(result);
+ }
+
+ ///
+ /// Создать задачу прореживанию архива и добавить её в очередь на выполнение.
+ /// Если задача есть в очереди, она же и возвращается, но подписка не происходит.
+ ///
+ [HttpPost]
+ [Permission]
+ public virtual ActionResult Enqueue(int idTelemetry)
+ {
+ void onProgress(JobDto job) =>
+ Task.Run(async () =>
+ await telemetryHubContext.Clients.Group(sirnalRGroupName)
+ .SendAsync(sirnalRMethodOnProgress, job));
+
+ service.TryEnqueueRediceSamplingJob(idTelemetry, onProgress, out JobDto job);
+ return Ok(job);
+ }
+ }
+}
diff --git a/ConsoleApp1/Program.cs b/ConsoleApp1/Program.cs
index 11c0ca13..b3d1d6fb 100644
--- a/ConsoleApp1/Program.cs
+++ b/ConsoleApp1/Program.cs
@@ -8,6 +8,7 @@ using DocumentFormat.OpenXml.Wordprocessing;
using Microsoft.EntityFrameworkCore;
using Org.BouncyCastle.Utilities.Collections;
using System;
+using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@@ -24,8 +25,13 @@ namespace ConsoleApp1
// use ServiceFactory to make services
static void Main(/*string[] args*/)
{
- Task.Run(() => { throw new Exception(); })
- .ContinueWith(t => { Console.WriteLine("Dooom"); });
+ var h = new Hashtable();
+ h.Add("name", 1);
+ h.Add("name2", "66");
+ var v = h["v"];
+
+ var s = System.Text.Json.JsonSerializer.Serialize(h);
+
Console.WriteLine($"total time: ms");
Console.ReadLine();
}