DD.WellWorkover.Cloud/AsbCloudInfrastructure/Services/SAUB/TelemetryDataCache.cs
2023-10-08 21:20:28 +05:00

296 lines
11 KiB
C#

using AsbCloudDb.Model;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using Mapster;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using AsbCloudInfrastructure.Background;
using System.Threading;
using AsbCloudApp.Data;
using AsbCloudApp.Requests;
namespace AsbCloudInfrastructure.Services.SAUB
{
public class TelemetryDataCache<TDto>
where TDto : AsbCloudApp.Data.ITelemetryData
{
class TelemetryDataCacheItem
{
public TDto? FirstByDate { get; init; }
public CyclycArray<TDto> LastData { get; init; } = null!;
public double TimezoneHours { get; init; } = 5;
}
private IServiceProvider provider = null!;
private const int activeWellCapacity = 12 * 60 * 60;
private const int doneWellCapacity = 65 * 60;
// key == idTelemetry
private readonly ConcurrentDictionary<int, TelemetryDataCacheItem> caches;
private bool isLoading = false;
private TelemetryDataCache()
{
caches = new();
}
private static TelemetryDataCache<TDto>? instance;
public static TelemetryDataCache<TDto> GetInstance<TEntity>(IServiceProvider provider)
where TEntity : class, AsbCloudDb.Model.ITelemetryData
{
if (instance is null)
{
instance = new TelemetryDataCache<TDto>();
var worker = provider.GetRequiredService<BackgroundWorker>();
var workId = $"Telemetry cache loading from DB {typeof(TEntity).Name}";
var work = new Work(workId, async (workId, provider, onProgress, token) => {
var db = provider.GetRequiredService<IAsbCloudDbContext>();
await instance.InitializeCacheFromDBAsync<TEntity>(db, onProgress, token);
});
worker.WorkStore.RunOnceQueue.Enqueue(work);
}
instance.provider = provider;
return instance;
}
/// <summary>
/// Добавить новые элементы в кеш
/// </summary>
/// <param name="idTelemetry"></param>
/// <param name="range"></param>
public void AddRange(int idTelemetry, IEnumerable<TDto> range)
{
if (!range.Any())
return;
var newItems = range
.OrderBy(i => i.DateTime);
foreach (var item in newItems)
item.IdTelemetry = idTelemetry;
TelemetryDataCacheItem cacheItem;
if (isLoading)
{
if (caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? localCacheItem))
cacheItem = localCacheItem;
else
return;
}
else
{
cacheItem = caches.GetOrAdd(idTelemetry, _ => new TelemetryDataCacheItem()
{
FirstByDate = newItems.ElementAt(0),
LastData = new CyclycArray<TDto>(activeWellCapacity)
});
}
cacheItem.LastData.AddRange(newItems);
}
/// <summary>
/// Получить данные из кеша. <br/>
/// Если dateBegin меньше минимального элемента в кеше, то вернется null.
/// Даже если intervalSec частично перекрыт данными из кеша.
/// </summary>
/// <param name="idTelemetry"></param>
/// <param name="dateBegin"></param>
/// <param name="intervalSec"></param>
/// <param name="approxPointsCount">кол-во элементов до которых эти данные прореживаются</param>
/// <returns></returns>
public IEnumerable<TDto>? GetOrDefault(int idTelemetry, DateTime dateBegin, double intervalSec = 600d, int approxPointsCount = 1024)
{
if(!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem))
return null;
var cacheLastData = cacheItem.LastData;
if (!cacheLastData.Any() || cacheLastData[0].DateTime > dateBegin)
return null;
var dateEnd = dateBegin.AddSeconds(intervalSec);
var items = cacheLastData
.Where(i => i.DateTime >= dateBegin && i.DateTime <= dateEnd);
var ratio = items.Count() / approxPointsCount;
if (ratio > 1)
items = items
.Where((_, index) => index % ratio == 0);
return items;
}
public TDto? GetLastOrDefault(int idTelemetry)
{
if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem))
return default;
return cacheItem.LastData.LastOrDefault();
}
public DatesRangeDto? GetOrDefaultDataDateRange(int idTelemetry)
{
if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem))
return null;
var from = cacheItem.FirstByDate?.DateTime;
if(!cacheItem.LastData.Any())
return null;
var to = cacheItem.LastData[^1].DateTime;
from = from ?? cacheItem.LastData[0].DateTime;
return new DatesRangeDto { From = from.Value, To = to };
}
private async Task InitializeCacheFromDBAsync<TEntity>(IAsbCloudDbContext db, Action<string, double?> onProgress, CancellationToken token)
where TEntity : class, AsbCloudDb.Model.ITelemetryData
{
if (isLoading)
throw new Exception("Multiple cache loading detected.");
isLoading = true;
var defaultTimeout = db.Database.GetCommandTimeout();
System.Diagnostics.Trace.TraceInformation($"cache loading starting. Setting CommandTimeout 90s ({defaultTimeout})");
db.Database.SetCommandTimeout(TimeSpan.FromSeconds(90));
Well[] wells = await db.Set<Well>()
.Include(well => well.Telemetry)
.Include(well => well.Cluster)
.Where(well => well.IdTelemetry != null)
.ToArrayAsync(token);
var count = wells.Length;
var i = 0;
foreach (Well well in wells)
{
var capacity = well.IdState == 1
? activeWellCapacity
: doneWellCapacity;
var idTelemetry = well.IdTelemetry!.Value;
var hoursOffset = well.Timezone.Hours;
// TODO: remove traces
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}>: Loading for well: {well.Cluster?.Caption}/{well.Caption} (capacity:{capacity}) idTelemetry:{idTelemetry}");
var cacheItem = await GetOrDefaultCacheDataFromDbAsync<TEntity>(db, idTelemetry, capacity, hoursOffset, token);
if(cacheItem is not null)
{
caches.TryAdd(idTelemetry, cacheItem);
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> for well: {well.Cluster?.Caption}/{well.Caption} loaded");
}
else
{
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> for well: {well.Cluster?.Caption}/{well.Caption} has no data");
}
}
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> load complete");
isLoading = false;
db.Database.SetCommandTimeout(defaultTimeout);
}
private static async Task<TelemetryDataCacheItem?> GetOrDefaultCacheDataFromDbAsync<TEntity>(IAsbCloudDbContext db, int idTelemetry, int capacity, double hoursOffset, CancellationToken token)
where TEntity : class, AsbCloudDb.Model.ITelemetryData
{
var query = db.Set<TEntity>()
.Where(i => i.IdTelemetry == idTelemetry);
var firstDbEntity = await query
.OrderBy(i => i.DateTime)
.FirstOrDefaultAsync(token);
if (firstDbEntity is null)
return default;
var first = firstDbEntity.Adapt<TDto>();
first.DateTime = firstDbEntity.DateTime.ToRemoteDateTime(hoursOffset);
var entities = await query
.OrderByDescending(i => i.DateTime)
.Take(capacity)
.ToArrayAsync(token);
var dtos = entities
.AsEnumerable()
.Reverse()
.Select(entity => {
var dto = entity.Adapt<TDto>();
dto.DateTime = entity.DateTime.ToRemoteDateTime(hoursOffset);
return dto;
});
var cacheItem = new CyclycArray<TDto>(capacity);
cacheItem.AddRange(dtos);
var item = new TelemetryDataCacheItem
{
FirstByDate = first,
LastData = cacheItem,
TimezoneHours = hoursOffset,
};
return item;
}
public IEnumerable<TDto>? GetOrDefault(int idTelemetry, TelemetryDataRequest request)
{
if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem))
return null;
IEnumerable<TDto> data = cacheItem.LastData;
if (!data.Any())
return null;
if (request.GeDate.HasValue)
{
var geDate = request.GeDate.Value.ToRemoteDateTime(cacheItem.TimezoneHours);
if (data.First().DateTime > geDate)
return null;
data = data.Where(d => d.DateTime >= geDate);
}
else
{
if (request.Order == 0)
return null;
}
if (request.LeDate.HasValue)
{
var leDate = request.LeDate.Value.ToRemoteDateTime(cacheItem.TimezoneHours);
data = data.Where(d => d.DateTime >= request.LeDate);
}
if (request.Divider > 1)
data = data.Where((d) => (((d.DateTime.DayOfYear * 24 + d.DateTime.Hour) * 60 + d.DateTime.Minute) * 60 + d.DateTime.Second) % request.Divider == 0);
switch (request.Order)
{
case 1: // Поздние вперед
data = data
.OrderByDescending(d => d.DateTime)
.Skip(request.Skip)
.Take(request.Take)
.OrderBy(d => d.DateTime);
break;
default: // Ранние вперед
data = data
.OrderBy(d => d.DateTime)
.Skip(request.Skip)
.Take(request.Take);
break;
}
return data;
}
}
}