diff --git a/AsbCloudApp/Data/JobDto.cs b/AsbCloudApp/Data/JobDto.cs new file mode 100644 index 00000000..2e9b336a --- /dev/null +++ b/AsbCloudApp/Data/JobDto.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections; + +namespace AsbCloudApp.Data +{ +#nullable enable + /// + /// Состояние фоновой задачи + /// + public enum JobState + { + /// + /// Ожидает в очереди на выполнение + /// + Waiting, + /// + /// выполняется + /// + Working, + /// + /// успешно выполнена + /// + Done, + /// + /// завершена с ошибкой + /// + Fail + }; + + /// + /// работа фоновой задачи + /// + public class JobDto + { + /// + /// идентификатор + /// + public int Id { get; set; } + + /// + /// Состояние + /// + public JobState State { get; set; } + + /// + /// результат выполнения + /// + public Hashtable? Results { get; set; } + + /// + /// Исключение, если возникла ошибка + /// + public string? Error { get; set; } + } +#nullable disable +} diff --git a/AsbCloudApp/Services/IReduceSamplingService.cs b/AsbCloudApp/Services/IReduceSamplingService.cs new file mode 100644 index 00000000..a0220c58 --- /dev/null +++ b/AsbCloudApp/Services/IReduceSamplingService.cs @@ -0,0 +1,43 @@ +using AsbCloudApp.Data; +using System.Collections.Generic; + +namespace AsbCloudApp.Services +{ +#nullable enable + /// + /// Делегат обновления состояния задачи + /// + /// + public delegate void OnJobProgressDelagate(JobDto job); + + /// + /// Сервис прореживания архива БД. + /// Удаляет часть телеметрии. + /// Понижает частоту записей в БД с 1 запись за 1 сек до 1 запись за N сек. + /// + public interface IReduceSamplingService + { + /// + /// Получить все задания. Задания удаляются минимум через 10 сек после выполнения, возможно позднее. + /// + /// Enumerable of JobDto or empty + IEnumerable GetJobs(); + + /// + /// Получить состояние определенной задачи + /// + /// + /// + JobDto? GetOrDefaultState(int idTelemetry); + + /// + /// Создать задачу прореживанию архива и добавить её в очередь на выполнение + /// + /// телеметрия для прореживания + /// колбек процесса выполнения + /// созданная задача или задача из очереди + /// задача добавлена == true + bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate onProgress, out JobDto jobDto); + } +#nullable disable +} \ No newline at end of file diff --git a/AsbCloudDb/EFExtentions.cs b/AsbCloudDb/EFExtentions.cs index db66be04..f0523294 100644 --- a/AsbCloudDb/EFExtentions.cs +++ b/AsbCloudDb/EFExtentions.cs @@ -69,6 +69,15 @@ namespace AsbCloudDb return database.ExecuteSqlRawAsync(query, token); } + public static Task ExecInsertAsync(this Microsoft.EntityFrameworkCore.Infrastructure.DatabaseFacade database, DbSet dbSet, IEnumerable items, CancellationToken token) + where T : class + { + var factory = GetQueryStringFactory(dbSet); + var query = factory.MakeInsertSql(items); + + return database.ExecuteSqlRawAsync(query, token); + } + public static string GetTableName(this DbSet dbSet) where T : class { @@ -154,6 +163,14 @@ namespace AsbCloudDb return builder.ToString(); } + public string MakeInsertSql(IEnumerable items) + { + var builder = new StringBuilder(insertHeader, 7); + BuildRows(builder, items); + builder.Append(';'); + return builder.ToString(); + } + private StringBuilder BuildRows(StringBuilder builder, IEnumerable items) { var list = items.ToList(); diff --git a/AsbCloudInfrastructure/DependencyInjection.cs b/AsbCloudInfrastructure/DependencyInjection.cs index 53e2d792..3f089a0c 100644 --- a/AsbCloudInfrastructure/DependencyInjection.cs +++ b/AsbCloudInfrastructure/DependencyInjection.cs @@ -103,6 +103,7 @@ namespace AsbCloudInfrastructure services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(provider => ReduceSamplingService.GetInstance(configuration)); services.AddTransient(); services.AddTransient(); diff --git a/AsbCloudInfrastructure/Services/ReduceSamplingService.cs b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs new file mode 100644 index 00000000..d9c11e0d --- /dev/null +++ b/AsbCloudInfrastructure/Services/ReduceSamplingService.cs @@ -0,0 +1,335 @@ +using AsbCloudApp.Data; +using AsbCloudApp.Services; +using AsbCloudDb; +using AsbCloudDb.Model; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace AsbCloudInfrastructure.Services +{ +#nullable enable + class JobHandle + { + public int Id => Job.Id; + public JobDto Job { get; set; } = null!; + public DateTime DateCreate { get; set; } = DateTime.Now; + public OnJobProgressDelagate? OnProgress { get; set; } + } + + public class ReduceSamplingService : IReduceSamplingService + { + private const string jobResultKeyDeleted = "deleted"; + private const string jobResultKeyTotal = "total"; + private static ReduceSamplingService? instance; + private readonly string connectionString; + private const int ratio = 5; + private readonly List jobHandlers = new(5); + private bool isHandling; + private CancellationTokenSource? cancellationTokenSource; + private Task? task; + + private ReduceSamplingService(IConfiguration configuration) + { + connectionString = configuration.GetConnectionString("DefaultConnection"); + } + + ~ReduceSamplingService() + { + Stop(); + } + + /// + /// Get singleton Instance + /// + /// + /// + public static ReduceSamplingService GetInstance(IConfiguration configuration) + { + instance ??= new(configuration); + return instance; + } + + /// + public bool TryEnqueueRediceSamplingJob(int idTelemetry, OnJobProgressDelagate? onProgress, out JobDto jobDto) + { + lock (jobHandlers) + { + var oldJob = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry); + if (oldJob is not null) + { + jobDto = oldJob.Job; + return false; + } + + jobDto = new JobDto + { + Id = idTelemetry, + State = JobState.Waiting, + Results = new(), + }; + var jobHandler = new JobHandle + { + Job = jobDto, + OnProgress = onProgress, + }; + + jobHandlers.Add(jobHandler); + } + EnsureHandleQueueStarted(); + return true; + } + + /// + public JobDto? GetOrDefaultState(int idTelemetry) + { + JobHandle? jobHandler; + lock (jobHandlers) + { + jobHandler = jobHandlers.FirstOrDefault(j => j.Id == idTelemetry); + } + return jobHandler?.Job; + } + + /// + public IEnumerable GetJobs() => jobHandlers.Select(j=>j.Job); + + private bool TryTakeWaitingJob(out JobHandle? job) + { + lock (jobHandlers) + { + job = jobHandlers.FirstOrDefault(j => j.Job.State == JobState.Waiting); + } + return job is not null; + } + + private void EnsureHandleQueueStarted() + { + if (isHandling) + return; + isHandling = true; + cancellationTokenSource = new CancellationTokenSource(); + var token = cancellationTokenSource.Token; + task = Task.Run(async () => await HandleJobs(token)) + .ContinueWith(_ => isHandling = false); + } + + private async Task HandleJobs(CancellationToken token) + { + while (TryTakeWaitingJob(out JobHandle? jobHandler)) + { + jobHandler!.Job.State = JobState.Working; + try + { + await RediceSamplingSaubAsync(jobHandler, token); + await RediceSamplingSpinAsync(jobHandler, token); + jobHandler.Job.State = JobState.Done; + } + catch (Exception exception) + { + jobHandler.Job.State = JobState.Fail; + jobHandler.Job.Results = null; + jobHandler.Job.Error = exception.Message; + jobHandler.OnProgress?.Invoke(jobHandler.Job); + } + + if (!jobHandlers.Any(j => j.Job.State == JobState.Waiting)) + { + var sw = Stopwatch.StartNew(); + await VacuumAsync(token); + sw.Stop(); + if (sw.ElapsedMilliseconds < 10_000) + { + var delayMs = 10_000 - (int)sw.ElapsedMilliseconds; + await Task.Delay(delayMs, token); + } + CleanJobs(); + } + } + } + + private void CleanJobs() + { + lock (jobHandlers) + { + jobHandlers.RemoveAll(j => j.Job.State == JobState.Done || j.Job.State == JobState.Fail); + } + } + + private async Task VacuumAsync(CancellationToken token) + { + using var db = MakeContext(); + var sqlVacuum = "vacuum (SKIP_LOCKED);"; + await db.Database.ExecuteSqlRawAsync(sqlVacuum, token); + } + + private void Stop() + { + cancellationTokenSource?.Cancel(); + task?.Wait(1_000); + task = null; + cancellationTokenSource?.Dispose(); + cancellationTokenSource = null; + } + + private Task RediceSamplingSaubAsync(JobHandle job, CancellationToken token) + { + const int ramLimit = 10 * 1024 * 1024; + const int dataItemSize = 345; // by profiler + const int chankSize = ramLimit / dataItemSize; // ~ 90_000 + const double maxWellDepthGap = 0.1; + + var maxDateGapSec = ratio; + + var sqlSelectTemplate = + "select " + + " * " + + "from " + + " (select " + + " *, " + + " rank() over win1 as row_num, " + + " lag(\"date\", 1) over win1 as lag_date, " + + " lag(\"mode\", 1) over win1 as lag_mode, " + + " lag(mse_state, 1) over win1 as lag_mse_state, " + + " lag(well_depth, 1) over win1 as lag_well_depth, " + + " lag(id_feed_regulator, 1) over win1 as lag_id_feed_regulator " + + " from t_telemetry_data_saub " + + $" where id_telemetry = {job.Id} and \"date\" > {{0}}" + + " window win1 as (order by \"date\") " + + " ) as t_1 " + + "where " + + $" (row_num % {ratio}) = 0 " + + " or \"mode\" != lag_mode " + + $" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " + + $" or well_depth - lag_well_depth > {maxWellDepthGap:#0,0#} " + + " or mse_state != lag_mse_state " + + " or id_feed_regulator != lag_id_feed_regulator " + + "order by \"date\" "; + + var sqlDeleteTemplate = "delete " + + "from t_telemetry_data_saub " + + $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};"; + + return RediceSamplingAsync( + job, + chankSize, + sqlSelectTemplate, + sqlDeleteTemplate, + token); + } + + private Task RediceSamplingSpinAsync(JobHandle job, CancellationToken token) + { + const int ramLimit = 10 * 1024 * 1024; + const int dataItemSize = 345; // by profiler + const int chankSize = ramLimit / dataItemSize; // ~ 90_000 + var maxDateGapSec = ratio; + + var sqlSelectTemplate = + "select " + + " * " + + "from " + + " (select " + + " *, " + + " rank() over win1 as row_num, " + + " lag(\"date\", 1) over win1 as lag_date, " + + " lag(\"mode\", 1) over win1 as lag_mode, " + + " lag(state, 1) over win1 as lag_state " + + " from t_telemetry_data_spin " + + $" where id_telemetry = {job.Id} and \"date\" > {{0}}" + + " window win1 as (order by \"date\") " + + " ) as t_1 " + + "where " + + $" (row_num % {ratio}) = 0 " + + " or \"mode\" != lag_mode " + + $" or(\"date\" - lag_date) >= interval '{maxDateGapSec} second' " + + " or state != lag_state " + + "order by \"date\" "; + + var sqlDeleteTemplate = "delete " + + "from t_telemetry_data_spin " + + $"where id_telemetry = {job.Id} and \"date\" between {{0}} and {{1}};"; + + return RediceSamplingAsync( + job, + chankSize, + sqlSelectTemplate, + sqlDeleteTemplate, + token); + } + + private async Task RediceSamplingAsync( + JobHandle jobHandle, + int chankSize, + string sqlSelectTemplate, + string sqlDeleteTemplate, + CancellationToken token) + where TEntity : class, AsbCloudDb.Model.ITelemetryData + { + using var db = MakeContext(); + var dbset = db.Set(); + + var deleted = 0; + var totalCount = await dbset.Where(t => t.IdTelemetry == jobHandle.Id).CountAsync(token); + + var result = jobHandle.Job.Results!; + if (result[jobResultKeyDeleted] is int previousDeleted) + deleted += previousDeleted; + + if (result[jobResultKeyTotal] is int previousCount) + totalCount += previousCount; + + result[jobResultKeyDeleted] = deleted; + result[jobResultKeyTotal] = totalCount; + + jobHandle.OnProgress?.Invoke(jobHandle.Job); + var startDate = DateTimeOffset.MinValue; + + do + { + var query = dbset + .FromSqlRaw(sqlSelectTemplate, startDate) + .AsNoTracking(); + + var data = await query + .Take(chankSize) + .ToArrayAsync(token); + + var currentDataCount = data.Length; + if (currentDataCount == 0) + break; + + var lastDate = data.Last().DateTime; + + var currentDeleted = await db.Database.ExecuteSqlRawAsync(sqlDeleteTemplate, new object[] { startDate, lastDate }.AsEnumerable(), token); + if (currentDeleted == 0) + break; + + await db.Database.ExecInsertAsync(dbset, data, token); + + startDate = lastDate; + deleted += currentDeleted; + result[jobResultKeyDeleted] = deleted; + result[jobResultKeyTotal] = totalCount; + jobHandle.OnProgress?.Invoke(jobHandle.Job); + } while (true); + + return; + } + + private AsbCloudDbContext MakeContext() + { + var options = new DbContextOptionsBuilder() + .UseNpgsql(connectionString) + .Options; + + return new AsbCloudDbContext(options); + } + } +#nullable disable +} diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs index ebfe6822..285ac3d8 100644 --- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataBaseService.cs @@ -16,7 +16,7 @@ namespace AsbCloudInfrastructure.Services.SAUB where TDto : AsbCloudApp.Data.ITelemetryData where TModel : class, AsbCloudDb.Model.ITelemetryData { - private readonly IAsbCloudDbContext db; + protected readonly IAsbCloudDbContext db; private readonly ITelemetryService telemetryService; protected readonly CacheTable cacheTelemetry; protected readonly CacheTable cacheTelemetryUsers; @@ -170,5 +170,7 @@ namespace AsbCloudInfrastructure.Services.SAUB return offset; } + + } } diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs index 2941b1ca..419c5831 100644 --- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSaubService.cs @@ -2,7 +2,12 @@ using AsbCloudApp.Services; using AsbCloudDb.Model; using AsbCloudInfrastructure.Services.Cache; +using DocumentFormat.OpenXml.Drawing.Charts; using Mapster; +using Microsoft.EntityFrameworkCore; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace AsbCloudInfrastructure.Services.SAUB { diff --git a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs index c33b7cab..5cd51161 100644 --- a/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs +++ b/AsbCloudInfrastructure/Services/SAUB/TelemetryDataSpinService.cs @@ -3,6 +3,8 @@ using AsbCloudApp.Services; using AsbCloudDb.Model; using AsbCloudInfrastructure.Services.Cache; using Mapster; +using System.Threading; +using System.Threading.Tasks; namespace AsbCloudInfrastructure.Services.SAUB { diff --git a/AsbCloudInfrastructure/Startup.cs b/AsbCloudInfrastructure/Startup.cs index 884a3383..5ccb0df7 100644 --- a/AsbCloudInfrastructure/Startup.cs +++ b/AsbCloudInfrastructure/Startup.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using System; +using System.Linq; namespace AsbCloudInfrastructure { diff --git a/AsbCloudWebApi.Tests/AsbCloudWebApi.Tests.csproj b/AsbCloudWebApi.Tests/AsbCloudWebApi.Tests.csproj index f8b181c5..0f182ff4 100644 --- a/AsbCloudWebApi.Tests/AsbCloudWebApi.Tests.csproj +++ b/AsbCloudWebApi.Tests/AsbCloudWebApi.Tests.csproj @@ -4,6 +4,8 @@ net6.0 false + + enable diff --git a/AsbCloudWebApi/Controllers/ReduceSamplingController.cs b/AsbCloudWebApi/Controllers/ReduceSamplingController.cs new file mode 100644 index 00000000..ddb99b2d --- /dev/null +++ b/AsbCloudWebApi/Controllers/ReduceSamplingController.cs @@ -0,0 +1,78 @@ +using AsbCloudApp.Services; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using System.Collections.Generic; +using System.Threading.Tasks; +using AsbCloudApp.Data; +using System.Linq; +using AsbCloudWebApi.SignalR; +using Microsoft.AspNetCore.SignalR; + +namespace AsbCloudWebApi.Controllers +{ + /// + /// Редактор кустов для админки + /// + [Route("api/admin/[controller]")] + [ApiController] + [Authorize] + public class ReduceSamplingController: ControllerBase + { + private readonly IReduceSamplingService service; + private readonly IHubContext telemetryHubContext; + private const string sirnalRGroupName = "ReduceSampling"; + private const string sirnalRMethodOnProgress = "OnProgress"; + + public ReduceSamplingController( + IReduceSamplingService service, + IHubContext telemetryHubContext ) + { + this.service = service; + this.telemetryHubContext = telemetryHubContext; + } + + /// + /// Получить все задания. Задания удаляются минимум через 10 сек после выполнения, возможно позднее. + /// + /// + /// + [HttpGet] + public virtual ActionResult> GetAll(int idTelemetry) + { + var result = service.GetJobs(); + if (result.Any()) + return Ok(result); + else + return NoContent(); + } + + /// + /// Получить состояние определенной задачи + /// + /// + /// + [HttpGet("{idTelemetry}")] + public virtual ActionResult GetOrDefault(int idTelemetry) + { + var result = service.GetOrDefaultState(idTelemetry); + return Ok(result); + } + + /// + /// Создать задачу прореживанию архива и добавить её в очередь на выполнение. + /// Если задача есть в очереди, она же и возвращается, но подписка не происходит. + /// + [HttpPost] + [Permission] + public virtual ActionResult Enqueue(int idTelemetry) + { + void onProgress(JobDto job) => + Task.Run(async () => + await telemetryHubContext.Clients.Group(sirnalRGroupName) + .SendAsync(sirnalRMethodOnProgress, job)); + + service.TryEnqueueRediceSamplingJob(idTelemetry, onProgress, out JobDto job); + return Ok(job); + } + } +} diff --git a/ConsoleApp1/ConsoleApp1.csproj b/ConsoleApp1/ConsoleApp1.csproj index da0e1e97..c0c5ae85 100644 --- a/ConsoleApp1/ConsoleApp1.csproj +++ b/ConsoleApp1/ConsoleApp1.csproj @@ -4,6 +4,7 @@ Exe net6.0 ConsoleApp1.Program + enable diff --git a/ConsoleApp1/Program.cs b/ConsoleApp1/Program.cs index 5b345220..b3d1d6fb 100644 --- a/ConsoleApp1/Program.cs +++ b/ConsoleApp1/Program.cs @@ -1,72 +1,39 @@ using AsbCloudApp.Data; using AsbCloudApp.Data.DailyReport; +using AsbCloudDb; +using AsbCloudDb.Model; using AsbCloudInfrastructure.Services.DailyReport; using ClosedXML.Excel; +using DocumentFormat.OpenXml.Wordprocessing; +using Microsoft.EntityFrameworkCore; +using Org.BouncyCastle.Utilities.Collections; using System; +using System.Collections; +using System.Collections.Generic; using System.IO; - +using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace ConsoleApp1 { class Program { + private static AsbCloudDbContext db = ServiceFactory.Context; + // use ServiceFactory to make services static void Main(/*string[] args*/) { - + var h = new Hashtable(); + h.Add("name", 1); + h.Add("name2", "66"); + var v = h["v"]; - var block = new HeadDto() - { - AzimuthAngle = 12, - WellName = "WellName", - ClusterName = "clusterName", - Customer = "customer", - Contractor = "Contractor", - ReportDate = DateTime.Now, - WellDepthIntervalFinishDate = 27.5, - WellDepthIntervalStartDate = 26.5, - BottomholeDepth = 66.6 - }; - var block2 = new BhaDto() - { - BHADescription = "sadasdasdasdasdasdjlaskjdaksjdlasdlalskdklj" - }; - var block3 = new SaubDto(); - var bloks = new DailyReportDto() - { - Head = block, - Saub = block3 - }; + var s = System.Text.Json.JsonSerializer.Serialize(h); - - - var service = new DailyReportMakerExcel(); - var stream = service.MakeReportFromBlocks(bloks); - var filename = "____.xlsx"; - if (File.Exists(filename)) - File.Delete(filename); - using var fileStream = File.OpenWrite(filename); - stream.CopyTo(fileStream); - - return; - - - - - - - //var ms = MakeReportFromBlocks(block,block3); - ////File.Create("", MakeReportFromBlocks(block)); - //using var file = new FileStream("file.xlsx", FileMode.Create, System.IO.FileAccess.Write); - //byte[] bytes = new byte[ms.Length]; - //ms.Read(bytes, 0, (int)ms.Length); - //file.Write(bytes, 0, bytes.Length); - //ms.Close(); + Console.WriteLine($"total time: ms"); + Console.ReadLine(); } - - - - } }