forked from ddrilling/AsbCloudServer
184 lines
6.8 KiB
C#
184 lines
6.8 KiB
C#
using AsbCloudDb.Model;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System;
|
|
using System.Linq;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Mapster;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Configuration;
|
|
|
|
#nullable enable
|
|
namespace AsbCloudInfrastructure.Services.SAUB
|
|
{
|
|
/// <summary>
/// In-memory cache of the most recent telemetry records, keyed by telemetry id.
/// A single shared instance exists per closed <typeparamref name="TDto"/> and is
/// filled from the database on a background task the first time it is requested.
/// </summary>
/// <typeparam name="TDto">Telemetry DTO type stored in the cache.</typeparam>
public class TelemetryDataCache<TDto>
    where TDto : AsbCloudApp.Data.ITelemetryData
{
    // Capacity for wells with IdState == 1 and for caches created on demand by AddRange.
    private const int activeWellCapacity = 24 * 60 * 60;

    // Capacity for every other well state.
    private const int doneWellCapacity = 65 * 60;

    private readonly ConcurrentDictionary<int, CyclycArray<TDto>> caches;

    // Written by the loader running on a pool thread and read by AddRange on
    // other threads, hence volatile.
    private volatile bool isLoading = false;

    private TelemetryDataCache()
    {
        caches = new();
    }

    private static TelemetryDataCache<TDto>? instance;

    //TODO: Move initialization from DB to a background service task
    /// <summary>
    /// Returns the shared cache instance. The first call creates it and starts
    /// loading it from the database in the background, using a context built
    /// from the "DefaultConnection" connection string.
    /// </summary>
    /// <typeparam name="TEntity">Database entity type the cached DTOs are loaded from.</typeparam>
    /// <param name="configuration">Configuration that provides the connection string.</param>
    /// <returns>The shared cache; it may still be loading when returned.</returns>
    public static TelemetryDataCache<TDto> GetInstance<TEntity>(IConfiguration configuration)
        where TEntity : class, ITelemetryData
    {
        if (TryPublishNewInstance(out var cache))
        {
            // Fire-and-forget: the returned cache is usable while it is being filled.
            _ = Task.Run(() =>
            {
                using var db = MakeContext(configuration);
                cache.InitializeCacheFromDB<TEntity>(db);
            });
        }

        return cache;
    }

    /// <summary>
    /// Returns the shared cache instance. The first call creates it and starts
    /// loading it from <paramref name="db"/> in the background.
    /// </summary>
    /// <typeparam name="TEntity">Database entity type the cached DTOs are loaded from.</typeparam>
    /// <param name="db">Context used by the background load; it must stay usable until
    /// <paramref name="initializationTask"/> completes.</param>
    /// <param name="initializationTask">The background load started by this call, or
    /// <see cref="Task.CompletedTask"/> when the instance already existed.</param>
    /// <returns>The shared cache; it may still be loading when returned.</returns>
    public static TelemetryDataCache<TDto> GetInstance<TEntity>(IAsbCloudDbContext db, out Task initializationTask)
        where TEntity : class, ITelemetryData
    {
        if (TryPublishNewInstance(out var cache))
            initializationTask = Task.Run(() => cache.InitializeCacheFromDB<TEntity>(db));
        else
            initializationTask = Task.CompletedTask;

        return cache;
    }

    /// <summary>
    /// Returns the shared instance, creating and publishing it atomically when none exists.
    /// </summary>
    /// <param name="current">The shared instance after the call.</param>
    /// <returns>true only for the single caller whose instance was published;
    /// that caller is responsible for starting the initial load.</returns>
    private static bool TryPublishNewInstance(out TelemetryDataCache<TDto> current)
    {
        var existing = instance;
        if (existing is not null)
        {
            current = existing;
            return false;
        }

        var created = new TelemetryDataCache<TDto>();
        // Only one of several racing callers wins; the others adopt the winner's instance.
        existing = System.Threading.Interlocked.CompareExchange(ref instance, created, null);
        current = existing ?? created;
        return existing is null;
    }

    /// <summary>
    /// Adds new items to the cache.
    /// </summary>
    /// <remarks>
    /// While the initial load is running, items for a telemetry id that has no cache
    /// entry yet are dropped. Every added item gets its IdTelemetry set to
    /// <paramref name="idTelemetry"/>.
    /// </remarks>
    /// <param name="idTelemetry">Telemetry id the items belong to.</param>
    /// <param name="range">Items to add; they are stored ordered by DateTime.</param>
    public void AddRange(int idTelemetry, IEnumerable<TDto> range)
    {
        CyclycArray<TDto> cacheItem;
        if (isLoading)
        {
            if (!caches.TryGetValue(idTelemetry, out CyclycArray<TDto>? loadedCacheItem))
                return;

            cacheItem = loadedCacheItem;
        }
        else
        {
            cacheItem = caches.GetOrAdd(idTelemetry, _ => new CyclycArray<TDto>(activeWellCapacity));
        }

        // Materialize once: the sorted sequence is walked twice below, and a lazily
        // produced range could otherwise yield different objects on each pass.
        var newItems = range
            .OrderBy(i => i.DateTime)
            .ToArray();

        foreach (var item in newItems)
            item.IdTelemetry = idTelemetry;

        cacheItem.AddRange(newItems);
    }

    /// <summary>
    /// Gets data from the cache. <br/>
    /// If dateBegin is earlier than the first item in the cache, null is returned,
    /// even when the requested interval is partially covered by cached data.
    /// </summary>
    /// <param name="idTelemetry">Telemetry id to read.</param>
    /// <param name="dateBegin">Start of the requested interval (inclusive).</param>
    /// <param name="intervalSec">Length of the requested interval in seconds.</param>
    /// <param name="approxPointsCount">Approximate number of items the data is thinned down to;
    /// values of zero or less disable thinning.</param>
    /// <returns>A snapshot of the matching items, or null when there is no cache for the
    /// telemetry id, the cache is empty, or it does not reach back to dateBegin.</returns>
    public IEnumerable<TDto>? GetOrDefault(int idTelemetry, DateTime dateBegin, double intervalSec = 600d, int approxPointsCount = 1024)
    {
        if (!caches.TryGetValue(idTelemetry, out CyclycArray<TDto>? cacheItem))
            return null;

        if (cacheItem is null || !cacheItem.Any() || cacheItem[0].DateTime > dateBegin)
            return null;

        var dateEnd = dateBegin.AddSeconds(intervalSec);

        // Snapshot the window so the thinning ratio and the returned items come from
        // the same enumeration of the cache.
        var items = cacheItem
            .Where(i => i.DateTime >= dateBegin && i.DateTime <= dateEnd)
            .ToArray();

        // Guards the integer division below against division by zero.
        if (approxPointsCount <= 0)
            return items;

        var ratio = items.Length / approxPointsCount;
        if (ratio > 1)
            return items
                .Where((_, index) => index % ratio == 0)
                .ToArray();

        return items;
    }

    /// <summary>
    /// Fills the cache with the latest records of every well that has a telemetry id.
    /// </summary>
    /// <typeparam name="TEntity">Database entity type to read.</typeparam>
    /// <param name="db">Context to read wells and telemetry records from.</param>
    /// <exception cref="InvalidOperationException">A load is already in progress.</exception>
    private void InitializeCacheFromDB<TEntity>(IAsbCloudDbContext db)
        where TEntity : class, ITelemetryData
    {
        if (isLoading)
            throw new InvalidOperationException("Multiple cache loading detected.");

        isLoading = true;
        try
        {
            Well[] wells = db.Set<Well>()
                .Include(well => well.Telemetry)
                .Include(well => well.Cluster)
                .Where(well => well.IdTelemetry != null)
                .ToArray();

            foreach (Well well in wells)
            {
                var capacity = well.IdState == 1
                    ? activeWellCapacity
                    : doneWellCapacity;

                var idTelemetry = well.IdTelemetry!.Value;
                var hoursOffset = well.Timezone.Hours;

                IEnumerable<TDto> cacheItemData = GetCacheDataFromDb<TEntity>(db, idTelemetry, capacity, hoursOffset);
                var cacheItem = new CyclycArray<TDto>(capacity);
                cacheItem.AddRange(cacheItemData);

                // TryAdd keeps an entry that already exists for this id, e.g. one
                // created by AddRange before loading started.
                caches.TryAdd(idTelemetry, cacheItem);

                System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> for well: {well.Cluster?.Caption}/{well.Caption} loaded");
            }

            System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> load complete");
        }
        finally
        {
            // Always leave the loading state; otherwise a failed load would make
            // AddRange drop data for uncached telemetry ids indefinitely.
            isLoading = false;
        }
    }

    /// <summary>
    /// Creates a database context for the "DefaultConnection" connection string.
    /// </summary>
    /// <param name="configuration">Configuration that provides the connection string.</param>
    /// <returns>A new context; the caller owns and disposes it.</returns>
    private static IAsbCloudDbContext MakeContext(IConfiguration configuration)
    {
        var connectionString = configuration.GetConnectionString("DefaultConnection");
        var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
            .UseNpgsql(connectionString)
            .Options;
        var db = new AsbCloudDbContext(options);
        return db;
    }

    /// <summary>
    /// Reads the newest <paramref name="capacity"/> records of one telemetry and maps them to DTOs.
    /// </summary>
    /// <typeparam name="TEntity">Database entity type to read.</typeparam>
    /// <param name="db">Context to query.</param>
    /// <param name="idTelemetry">Telemetry id to filter by.</param>
    /// <param name="capacity">Maximum number of records to read.</param>
    /// <param name="hoursOffset">Offset passed to ToRemoteDateTime when converting each record's DateTime.</param>
    /// <returns>DTOs in ascending DateTime order, mapped lazily on enumeration.</returns>
    private static IEnumerable<TDto> GetCacheDataFromDb<TEntity>(IAsbCloudDbContext db, int idTelemetry, int capacity, double hoursOffset)
        where TEntity : class, ITelemetryData
    {
        // Take the newest records first, then flip them back to chronological order.
        var entities = db.Set<TEntity>()
            .Where(i => i.IdTelemetry == idTelemetry)
            .OrderByDescending(i => i.DateTime)
            .Take(capacity)
            .ToArray()
            .AsEnumerable()
            .Reverse();

        var dtos = entities.Select(entity =>
        {
            var dto = entity.Adapt<TDto>();
            dto.DateTime = entity.DateTime.ToRemoteDateTime(hoursOffset);
            return dto;
        });

        return dtos;
    }
}
|
|
}
|
|
#nullable disable |