forked from ddrilling/AsbCloudServer
TelemetryDataCache loads cache by BackgroundWorker
This commit is contained in:
parent
9106658ebf
commit
36556bd6e3
@ -7,17 +7,28 @@ using Microsoft.EntityFrameworkCore;
|
||||
using Mapster;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using AsbCloudInfrastructure.Background;
|
||||
using System.Threading;
|
||||
using AsbCloudApp.Data;
|
||||
|
||||
namespace AsbCloudInfrastructure.Services.SAUB
|
||||
{
|
||||
public class TelemetryDataCache<TDto>
|
||||
where TDto : AsbCloudApp.Data.ITelemetryData
|
||||
{
|
||||
class TelemetryDataCacheItem
|
||||
{
|
||||
public TDto? FirstByDate { get; init; }
|
||||
public CyclycArray<TDto> LastData { get; init; } = null!;
|
||||
}
|
||||
|
||||
private IServiceProvider provider = null!;
|
||||
private const int activeWellCapacity = 12 * 60 * 60;
|
||||
private const int doneWellCapacity = 65 * 60;
|
||||
|
||||
private readonly ConcurrentDictionary<int, CyclycArray<TDto>> caches;
|
||||
// key == idTelemetry
|
||||
private readonly ConcurrentDictionary<int, TelemetryDataCacheItem> caches;
|
||||
private bool isLoading = false;
|
||||
|
||||
private TelemetryDataCache()
|
||||
@ -27,35 +38,22 @@ namespace AsbCloudInfrastructure.Services.SAUB
|
||||
|
||||
private static TelemetryDataCache<TDto>? instance;
|
||||
|
||||
//TODO: Move initialize fromDB to bacground service task
|
||||
public static TelemetryDataCache<TDto> GetInstance<TEntity>(IConfiguration configuration)
|
||||
where TEntity : class, ITelemetryData
|
||||
public static TelemetryDataCache<TDto> GetInstance<TEntity>(IServiceProvider provider)
|
||||
where TEntity : class, AsbCloudDb.Model.ITelemetryData
|
||||
{
|
||||
if (instance is null)
|
||||
{
|
||||
instance = new TelemetryDataCache<TDto>();
|
||||
_ = Task.Run(() =>
|
||||
{
|
||||
using var db = MakeContext(configuration);
|
||||
instance.InitializeCacheFromDB<TEntity>(db);
|
||||
db.Dispose();
|
||||
var worker = provider.GetRequiredService<BackgroundWorker>();
|
||||
var workId = $"Telemetry cache loading from DB {typeof(TEntity).Name}";
|
||||
var work = new WorkBase(workId, async (workId, provider, token) => {
|
||||
var db = provider.GetRequiredService<IAsbCloudDbContext>();
|
||||
await instance.InitializeCacheFromDBAsync<TEntity>(db, token);
|
||||
});
|
||||
|
||||
worker.Push(work);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
public static TelemetryDataCache<TDto> GetInstance<TEntity>(IAsbCloudDbContext db, out Task initializationTask)
|
||||
where TEntity : class, ITelemetryData
|
||||
{
|
||||
if (instance is null)
|
||||
{
|
||||
instance = new TelemetryDataCache<TDto>();
|
||||
initializationTask = Task.Run(() =>
|
||||
{
|
||||
instance.InitializeCacheFromDB<TEntity>(db);
|
||||
});
|
||||
}
|
||||
else
|
||||
initializationTask = Task.CompletedTask;
|
||||
instance.provider = provider;
|
||||
return instance;
|
||||
}
|
||||
|
||||
@ -66,24 +64,33 @@ namespace AsbCloudInfrastructure.Services.SAUB
|
||||
/// <param name="range"></param>
|
||||
public void AddRange(int idTelemetry, IEnumerable<TDto> range)
|
||||
{
|
||||
CyclycArray<TDto> cacheItem;
|
||||
if (!range.Any())
|
||||
return;
|
||||
|
||||
var newItems = range
|
||||
.OrderBy(i => i.DateTime);
|
||||
|
||||
foreach (var item in newItems)
|
||||
item.IdTelemetry = idTelemetry;
|
||||
|
||||
TelemetryDataCacheItem cacheItem;
|
||||
if (isLoading)
|
||||
{
|
||||
if (caches.TryGetValue(idTelemetry, out CyclycArray<TDto>? localCacheItem))
|
||||
if (caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? localCacheItem))
|
||||
cacheItem = localCacheItem;
|
||||
else
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
cacheItem = caches.GetOrAdd(idTelemetry, _ => new CyclycArray<TDto>(activeWellCapacity));
|
||||
cacheItem = caches.GetOrAdd(idTelemetry, _ => new TelemetryDataCacheItem()
|
||||
{
|
||||
FirstByDate = newItems.ElementAt(0),
|
||||
LastData = new CyclycArray<TDto>(activeWellCapacity)
|
||||
});
|
||||
}
|
||||
|
||||
var newItems = range
|
||||
.OrderBy(i => i.DateTime);
|
||||
foreach (var item in newItems)
|
||||
item.IdTelemetry = idTelemetry;
|
||||
cacheItem.AddRange(newItems);
|
||||
cacheItem.LastData.AddRange(newItems);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -98,14 +105,16 @@ namespace AsbCloudInfrastructure.Services.SAUB
|
||||
/// <returns></returns>
|
||||
public IEnumerable<TDto>? GetOrDefault(int idTelemetry, DateTime dateBegin, double intervalSec = 600d, int approxPointsCount = 1024)
|
||||
{
|
||||
if(!caches.TryGetValue(idTelemetry, out CyclycArray<TDto>? cacheItem))
|
||||
if(!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem))
|
||||
return null;
|
||||
|
||||
if (cacheItem is null || !cacheItem.Any() || cacheItem[0].DateTime > dateBegin)
|
||||
var cacheLastData = cacheItem.LastData;
|
||||
|
||||
if (!cacheLastData.Any() || cacheLastData[0].DateTime > dateBegin)
|
||||
return null;
|
||||
|
||||
var dateEnd = dateBegin.AddSeconds(intervalSec);
|
||||
var items = cacheItem
|
||||
var items = cacheLastData
|
||||
.Where(i => i.DateTime >= dateBegin && i.DateTime <= dateEnd);
|
||||
|
||||
var ratio = items.Count() / approxPointsCount;
|
||||
@ -116,19 +125,35 @@ namespace AsbCloudInfrastructure.Services.SAUB
|
||||
return items;
|
||||
}
|
||||
|
||||
private void InitializeCacheFromDB<TEntity>(IAsbCloudDbContext db)
|
||||
where TEntity : class, ITelemetryData
|
||||
public DatesRangeDto? GetOrDefaultDataDateRange(int idTelemetry)
|
||||
{
|
||||
if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem))
|
||||
return null;
|
||||
|
||||
var from = cacheItem.FirstByDate?.DateTime;
|
||||
if(!cacheItem.LastData.Any())
|
||||
return null;
|
||||
|
||||
var to = cacheItem.LastData[^1].DateTime;
|
||||
from = from ?? cacheItem.LastData[0].DateTime;
|
||||
|
||||
return new DatesRangeDto { From = from.Value, To = to };
|
||||
}
|
||||
|
||||
private async Task InitializeCacheFromDBAsync<TEntity>(IAsbCloudDbContext db, CancellationToken token)
|
||||
where TEntity : class, AsbCloudDb.Model.ITelemetryData
|
||||
{
|
||||
if (isLoading)
|
||||
throw new Exception("Multiple cache loading detected.");
|
||||
|
||||
isLoading = true;
|
||||
Well[] wells = Array.Empty<Well>();
|
||||
wells = db.Set<Well>()
|
||||
|
||||
wells = await db.Set<Well>()
|
||||
.Include(well => well.Telemetry)
|
||||
.Include(well => well.Cluster)
|
||||
.Where(well => well.IdTelemetry != null)
|
||||
.ToArray();
|
||||
.ToArrayAsync(token);
|
||||
|
||||
foreach (Well well in wells)
|
||||
{
|
||||
@ -139,46 +164,61 @@ namespace AsbCloudInfrastructure.Services.SAUB
|
||||
var idTelemetry = well.IdTelemetry!.Value;
|
||||
var hoursOffset = well.Timezone.Hours;
|
||||
|
||||
IEnumerable<TDto> cacheItemData = GetCacheDataFromDb<TEntity>(db, idTelemetry, capacity, hoursOffset);
|
||||
var cacheItem = new CyclycArray<TDto>(capacity);
|
||||
cacheItem.AddRange(cacheItemData);
|
||||
var cacheItem = await GetOrDefaultCacheDataFromDbAsync<TEntity>(db, idTelemetry, capacity, hoursOffset, token);
|
||||
if(cacheItem is not null)
|
||||
{
|
||||
caches.TryAdd(idTelemetry, cacheItem);
|
||||
|
||||
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> for well: {well.Cluster?.Caption}/{well.Caption} loaded");
|
||||
}
|
||||
else
|
||||
{
|
||||
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> for well: {well.Cluster?.Caption}/{well.Caption} has no data");
|
||||
}
|
||||
}
|
||||
|
||||
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> load complete");
|
||||
isLoading = false;
|
||||
}
|
||||
|
||||
private static IAsbCloudDbContext MakeContext(IConfiguration configuration)
|
||||
private static async Task<TelemetryDataCacheItem?> GetOrDefaultCacheDataFromDbAsync<TEntity>(IAsbCloudDbContext db, int idTelemetry, int capacity, double hoursOffset, CancellationToken token)
|
||||
where TEntity : class, AsbCloudDb.Model.ITelemetryData
|
||||
{
|
||||
var connectionString = configuration.GetConnectionString("DefaultConnection");
|
||||
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
|
||||
.UseNpgsql(connectionString)
|
||||
.Options;
|
||||
var db = new AsbCloudDbContext(options);
|
||||
return db;
|
||||
}
|
||||
var query = db.Set<TEntity>()
|
||||
.Where(i => i.IdTelemetry == idTelemetry);
|
||||
|
||||
private static IEnumerable<TDto> GetCacheDataFromDb<TEntity>(IAsbCloudDbContext db, int idTelemetry, int capacity, double hoursOffset)
|
||||
where TEntity : class, ITelemetryData
|
||||
{
|
||||
var entities = db.Set<TEntity>()
|
||||
.Where(i => i.IdTelemetry == idTelemetry)
|
||||
var firstDbEntity = await query
|
||||
.OrderBy(i => i.DateTime)
|
||||
.FirstOrDefaultAsync(token);
|
||||
|
||||
if (firstDbEntity is null)
|
||||
return default;
|
||||
|
||||
var first = firstDbEntity.Adapt<TDto>();
|
||||
first.DateTime = firstDbEntity.DateTime.ToRemoteDateTime(hoursOffset);
|
||||
|
||||
var entities = await query
|
||||
.OrderByDescending(i => i.DateTime)
|
||||
.Take(capacity)
|
||||
.ToArray()
|
||||
.AsEnumerable()
|
||||
.Reverse();
|
||||
.ToArrayAsync(token);
|
||||
|
||||
var dtos = entities.Select(entity => {
|
||||
var dtos = entities
|
||||
.AsEnumerable()
|
||||
.Reverse()
|
||||
.Select(entity => {
|
||||
var dto = entity.Adapt<TDto>();
|
||||
dto.DateTime = entity.DateTime.ToRemoteDateTime(hoursOffset);
|
||||
return dto;
|
||||
});
|
||||
|
||||
return dtos;
|
||||
var cacheItem = new CyclycArray<TDto>(capacity);
|
||||
cacheItem.AddRange(dtos);
|
||||
|
||||
var item = new TelemetryDataCacheItem
|
||||
{
|
||||
FirstByDate = first,
|
||||
LastData = cacheItem,
|
||||
};
|
||||
return item;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user