DD.WellWorkover.Cloud/AsbCloudInfrastructure/Services/SAUB/TelemetryDataCache.cs

185 lines
6.8 KiB
C#
Raw Normal View History

2022-11-03 16:57:41 +05:00
using AsbCloudDb.Model;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using Mapster;
2022-11-15 17:44:48 +05:00
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
2022-11-03 16:57:41 +05:00
2022-11-03 16:57:41 +05:00
namespace AsbCloudInfrastructure.Services.SAUB
{
2022-11-15 17:44:48 +05:00
public class TelemetryDataCache<TDto>
where TDto : AsbCloudApp.Data.ITelemetryData
2022-11-03 16:57:41 +05:00
{
2022-12-05 10:53:24 +05:00
/// <summary>
/// Buffer capacity (items) used for wells with IdState == 1 and for telemetries first seen in AddRange.
/// 12 * 60 * 60 = 43200; presumably 12 hours of one-per-second samples (TODO confirm sampling rate).
/// </summary>
private const int activeWellCapacity = 12 * 60 * 60;

/// <summary>
/// Buffer capacity (items) used for every other well during the initial load: 65 * 60 = 3900.
/// </summary>
private const int doneWellCapacity = 65 * 60;

/// <summary>Cyclic buffers of cached items, keyed by telemetry id.</summary>
private readonly ConcurrentDictionary<int, CyclycArray<TDto>> caches = new();

/// <summary>
/// Set while InitializeCacheFromDB is running. During that time AddRange does not create
/// buffers for telemetries that have not been loaded yet.
/// </summary>
private bool isLoading;

/// <summary>Private: instances are obtained through the static GetInstance methods only.</summary>
private TelemetryDataCache()
{
}

/// <summary>The single shared instance per closed generic type; null until the first GetInstance call.</summary>
private static TelemetryDataCache<TDto>? instance;
2022-11-15 17:44:48 +05:00
//TODO: Move initialize fromDB to background service task
/// <summary>
/// Returns the shared cache for <typeparamref name="TDto"/>. On the first call the instance is created
/// and a background task is started that fills it from the database, so the returned
/// instance may still be loading.
/// </summary>
/// <typeparam name="TEntity">Database entity type the cached items are loaded from.</typeparam>
/// <param name="configuration">Configuration that provides the "DefaultConnection" connection string.</param>
/// <returns>The shared cache instance.</returns>
public static TelemetryDataCache<TDto> GetInstance<TEntity>(IConfiguration configuration)
    where TEntity : class, ITelemetryData
{
    if (instance is null)
    {
        // Keep a local reference so the background task fills exactly the object created here,
        // not whatever the mutable static field points to when the task eventually runs.
        var created = new TelemetryDataCache<TDto>();
        instance = created;
        _ = Task.Run(() =>
        {
            try
            {
                // The using declaration disposes the context when the lambda exits;
                // an additional explicit Dispose() would dispose it twice.
                using var db = MakeContext(configuration);
                created.InitializeCacheFromDB<TEntity>(db);
            }
            catch (Exception ex)
            {
                // The task is fire-and-forget: without this the failure would be lost
                // as an unobserved task exception.
                System.Diagnostics.Trace.TraceError($"cache<{typeof(TDto).Name}> load failed: {ex}");
            }
        });
    }
    return instance;
}
2022-11-15 17:44:48 +05:00
/// <summary>
/// Returns the shared cache for <typeparamref name="TDto"/>, loading it from the supplied context on first use.
/// </summary>
/// <typeparam name="TEntity">Database entity type the cached items are loaded from.</typeparam>
/// <param name="db">Context used by the initial load. It is not disposed here.</param>
/// <param name="initializationTask">
/// The task performing the initial load when this call created the instance;
/// otherwise an already completed task.
/// </param>
/// <returns>The shared cache instance.</returns>
public static TelemetryDataCache<TDto> GetInstance<TEntity>(IAsbCloudDbContext db, out Task initializationTask)
    where TEntity : class, ITelemetryData
{
    if (instance is not null)
    {
        // Already created by an earlier call: nothing to wait for.
        initializationTask = Task.CompletedTask;
        return instance;
    }

    instance = new TelemetryDataCache<TDto>();
    initializationTask = Task.Run(() => instance.InitializeCacheFromDB<TEntity>(db));
    return instance;
}
/// <summary>
/// Adds new items to the cache of the given telemetry.
/// </summary>
/// <remarks>
/// Items are sorted by DateTime and stamped with <paramref name="idTelemetry"/> before being appended.
/// While the initial load is running, items for a telemetry that has no buffer yet are dropped;
/// otherwise a buffer with the active-well capacity is created on demand.
/// </remarks>
/// <param name="idTelemetry">Telemetry id the items belong to.</param>
/// <param name="range">Items to append.</param>
public void AddRange(int idTelemetry, IEnumerable<TDto> range)
{
    CyclycArray<TDto> cacheItem;
    if (isLoading)
    {
        if (!caches.TryGetValue(idTelemetry, out CyclycArray<TDto>? loadedItem))
            return;
        cacheItem = loadedItem;
    }
    else
    {
        cacheItem = caches.GetOrAdd(idTelemetry, _ => new CyclycArray<TDto>(activeWellCapacity));
    }

    // Materialize once: OrderBy is deferred, so enumerating it in the loop below and again
    // in cacheItem.AddRange would sort twice, and for a lazily projected source the
    // IdTelemetry assignment would be applied to objects that are then thrown away.
    var newItems = range
        .OrderBy(i => i.DateTime)
        .ToArray();

    foreach (var item in newItems)
        item.IdTelemetry = idTelemetry;

    cacheItem.AddRange(newItems);
}
/// <summary>
/// Gets data from the cache. <br/>
/// If <paramref name="dateBegin"/> is earlier than the first (oldest) item in the cache, null is returned,
/// even when the requested interval is partially covered by cached data.
/// </summary>
/// <param name="idTelemetry">Telemetry id whose buffer is queried.</param>
/// <param name="dateBegin">Inclusive start of the requested interval.</param>
/// <param name="intervalSec">Length of the requested interval in seconds; the end is inclusive.</param>
/// <param name="approxPointsCount">
/// Approximate number of items the result is thinned down to. Thinning keeps every
/// (count / approxPointsCount)-th item and is applied only when that integer ratio exceeds 1.
/// Non-positive values disable thinning.
/// </param>
/// <returns>
/// A snapshot of the matching items, or null when there is no buffer for the telemetry,
/// the buffer is empty, or it starts after <paramref name="dateBegin"/>.
/// </returns>
public IEnumerable<TDto>? GetOrDefault(int idTelemetry, DateTime dateBegin, double intervalSec = 600d, int approxPointsCount = 1024)
{
    if (!caches.TryGetValue(idTelemetry, out CyclycArray<TDto>? cacheItem))
        return null;

    if (cacheItem is null || !cacheItem.Any() || cacheItem[0].DateTime > dateBegin)
        return null;

    var dateEnd = dateBegin.AddSeconds(intervalSec);

    // Snapshot the matching items once: the filter would otherwise be re-run for the count
    // and again by the caller, against a buffer that may be modified in between.
    var items = cacheItem
        .Where(i => i.DateTime >= dateBegin && i.DateTime <= dateEnd)
        .ToArray();

    // Guard the division below; negative values already meant "no thinning", zero used to throw.
    if (approxPointsCount <= 0)
        return items;

    var ratio = items.Length / approxPointsCount;
    if (ratio > 1)
        return items
            .Where((_, index) => index % ratio == 0)
            .ToArray();

    return items;
}
2022-11-15 17:44:48 +05:00
/// <summary>
/// Fills the cache from the database: for every well that has a telemetry, loads the most recent
/// items (up to the capacity chosen by the well state) into a new cyclic buffer.
/// </summary>
/// <typeparam name="TEntity">Database entity type the cached items are loaded from.</typeparam>
/// <param name="db">Context to read wells and telemetry data from.</param>
/// <exception cref="InvalidOperationException">A load is already in progress on this instance.</exception>
private void InitializeCacheFromDB<TEntity>(IAsbCloudDbContext db)
    where TEntity : class, ITelemetryData
{
    // Checked outside the try block: a rejected second load must not clear the flag
    // that belongs to the load still in progress.
    if (isLoading)
        throw new InvalidOperationException("Multiple cache loading detected.");

    isLoading = true;
    try
    {
        Well[] wells = db.Set<Well>()
            .Include(well => well.Telemetry)
            .Include(well => well.Cluster)
            .Where(well => well.IdTelemetry != null)
            .ToArray();

        foreach (Well well in wells)
        {
            // IdState == 1 gets the large buffer; NOTE(review): presumably "active well" — confirm.
            var capacity = well.IdState == 1
                ? activeWellCapacity
                : doneWellCapacity;

            var idTelemetry = well.IdTelemetry!.Value;
            var hoursOffset = well.Timezone.Hours;

            IEnumerable<TDto> cacheItemData = GetCacheDataFromDb<TEntity>(db, idTelemetry, capacity, hoursOffset);
            var cacheItem = new CyclycArray<TDto>(capacity);
            cacheItem.AddRange(cacheItemData);

            // TryAdd keeps a buffer that is already registered for this telemetry.
            caches.TryAdd(idTelemetry, cacheItem);
            System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> for well: {well.Cluster?.Caption}/{well.Caption} loaded");
        }

        System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> load complete");
    }
    finally
    {
        // Always leave loading mode, even when the database read fails; otherwise AddRange
        // would keep dropping data for unknown telemetries and every later load would throw.
        isLoading = false;
    }
}
/// <summary>
/// Creates a new database context configured for Npgsql with the "DefaultConnection" connection string.
/// </summary>
/// <param name="configuration">Configuration to read the connection string from.</param>
/// <returns>A new context; the caller is responsible for disposing it.</returns>
private static IAsbCloudDbContext MakeContext(IConfiguration configuration)
{
    var builder = new DbContextOptionsBuilder<AsbCloudDbContext>();
    builder.UseNpgsql(configuration.GetConnectionString("DefaultConnection"));
    return new AsbCloudDbContext(builder.Options);
}
/// <summary>
/// Reads the latest <paramref name="capacity"/> rows of the given telemetry and exposes them,
/// oldest first, as DTOs.
/// </summary>
/// <typeparam name="TEntity">Database entity type to read.</typeparam>
/// <param name="db">Context to query.</param>
/// <param name="idTelemetry">Telemetry id to filter by.</param>
/// <param name="capacity">Maximum number of rows to read.</param>
/// <param name="hoursOffset">Offset in hours passed to ToRemoteDateTime when setting the DTO DateTime.</param>
/// <returns>
/// A lazily evaluated sequence: the rows are fetched immediately, the mapping to
/// <typeparamref name="TDto"/> happens on enumeration.
/// </returns>
private static IEnumerable<TDto> GetCacheDataFromDb<TEntity>(IAsbCloudDbContext db, int idTelemetry, int capacity, double hoursOffset)
    where TEntity : class, ITelemetryData
{
    // Newest rows first so that Take keeps the most recent ones.
    TEntity[] newestFirst = db.Set<TEntity>()
        .Where(row => row.IdTelemetry == idTelemetry)
        .OrderByDescending(row => row.DateTime)
        .Take(capacity)
        .ToArray();

    TDto ToDto(TEntity entity)
    {
        var dto = entity.Adapt<TDto>();
        dto.DateTime = entity.DateTime.ToRemoteDateTime(hoursOffset);
        return dto;
    }

    // Back to chronological order before mapping.
    return Enumerable.Reverse(newestFirst).Select(ToDto);
}
2022-11-03 16:57:41 +05:00
}
}