using AsbCloudDb.Model;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using Mapster;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;

#nullable enable
namespace AsbCloudInfrastructure.Services.SAUB
{
    /// <summary>
    /// Process-wide in-memory cache of the most recent telemetry items,
    /// keyed by telemetry id. One singleton exists per closed
    /// <typeparamref name="TDto"/> type.
    /// </summary>
    /// <typeparam name="TDto">DTO type stored in the cache.</typeparam>
    public class TelemetryDataCache<TDto>
        where TDto : AsbCloudApp.Data.ITelemetryData
    {
        // Per-telemetry capacities (item counts). IdState == 1 wells get the larger one.
        // NOTE(review): presumably 12 h / 65 min of one-per-second samples - confirm.
        private const int activeWellCapacity = 12 * 60 * 60;
        private const int doneWellCapacity = 65 * 60;

        // Guards creation of the singleton so that only one instance (and one DB load) is started.
        private static readonly object instanceLock = new();
        private static TelemetryDataCache<TDto>? instance;

        private readonly ConcurrentDictionary<int, CyclycArray<TDto>> caches;

        // Written by the loading task and read by AddRange callers on other threads.
        private volatile bool isLoading = false;

        private TelemetryDataCache()
        {
            caches = new();
        }

        // TODO: Move initialization from DB to a background service task
        /// <summary>
        /// Returns the singleton cache. On the first call the instance is created and
        /// a background task is started that fills it from the database using a context
        /// built from the "DefaultConnection" connection string.
        /// </summary>
        /// <typeparam name="TEntity">Database entity type the cached DTOs are loaded from.</typeparam>
        /// <param name="configuration">Configuration providing the connection string.</param>
        /// <returns>The singleton cache (possibly still loading).</returns>
        public static TelemetryDataCache<TDto> GetInstance<TEntity>(IConfiguration configuration)
            where TEntity : class, ITelemetryData
        {
            lock (instanceLock)
            {
                if (instance is null)
                {
                    // Capture a non-null local so the background task does not depend on the static field.
                    var created = new TelemetryDataCache<TDto>();
                    instance = created;
                    _ = Task.Run(() =>
                    {
                        // The using declaration disposes the context when the load finishes or fails.
                        using var db = MakeContext(configuration);
                        created.InitializeCacheFromDB<TEntity>(db);
                    });
                }

                return instance;
            }
        }

        /// <summary>
        /// Returns the singleton cache. On the first call the instance is created and
        /// a background task is started that fills it from <paramref name="db"/>.
        /// </summary>
        /// <typeparam name="TEntity">Database entity type the cached DTOs are loaded from.</typeparam>
        /// <param name="db">Context used for the initial load; it is not disposed here.</param>
        /// <param name="initializationTask">
        /// The load task when this call created the instance; otherwise <see cref="Task.CompletedTask"/>.
        /// </param>
        /// <returns>The singleton cache (possibly still loading).</returns>
        public static TelemetryDataCache<TDto> GetInstance<TEntity>(IAsbCloudDbContext db, out Task initializationTask)
            where TEntity : class, ITelemetryData
        {
            lock (instanceLock)
            {
                if (instance is null)
                {
                    var created = new TelemetryDataCache<TDto>();
                    instance = created;
                    initializationTask = Task.Run(() =>
                    {
                        created.InitializeCacheFromDB<TEntity>(db);
                    });
                }
                else
                {
                    initializationTask = Task.CompletedTask;
                }

                return instance;
            }
        }

        /// <summary>
        /// Adds new items to the cache.
        /// </summary>
        /// <remarks>
        /// While the initial load is running, items for a telemetry that has no cache entry yet are dropped.
        /// Every added item gets its IdTelemetry set to <paramref name="idTelemetry"/>.
        /// </remarks>
        /// <param name="idTelemetry">Telemetry id the items belong to.</param>
        /// <param name="range">Items to add; they are stored ordered by DateTime.</param>
        public void AddRange(int idTelemetry, IEnumerable<TDto> range)
        {
            CyclycArray<TDto> cacheItem;
            if (isLoading)
            {
                if (!caches.TryGetValue(idTelemetry, out CyclycArray<TDto>? loadedCacheItem))
                {
                    return;
                }

                cacheItem = loadedCacheItem;
            }
            else
            {
                cacheItem = caches.GetOrAdd(idTelemetry, _ => new CyclycArray<TDto>(activeWellCapacity));
            }

            // Materialize once: the sorted sequence is walked twice below, and a lazy
            // source must yield the same objects to both passes.
            var newItems = range
                .OrderBy(i => i.DateTime)
                .ToArray();

            foreach (var item in newItems)
            {
                item.IdTelemetry = idTelemetry;
            }

            cacheItem.AddRange(newItems);
        }

        /// <summary>
        /// Gets data from the cache. <br/>
        /// If <paramref name="dateBegin"/> is earlier than the first item in the cache, null is returned,
        /// even when the requested interval is partially covered by cached data.
        /// </summary>
        /// <param name="idTelemetry">Telemetry id.</param>
        /// <param name="dateBegin">Start of the requested interval (inclusive).</param>
        /// <param name="intervalSec">Length of the requested interval in seconds.</param>
        /// <param name="approxPointsCount">
        /// Approximate number of items the result is thinned down to.
        /// Values less than or equal to zero disable thinning.
        /// </param>
        /// <returns>
        /// A snapshot of the matching items, or null when the telemetry is not cached,
        /// its cache is empty, or the cache starts after <paramref name="dateBegin"/>.
        /// </returns>
        public IEnumerable<TDto>? GetOrDefault(int idTelemetry, DateTime dateBegin, double intervalSec = 600d, int approxPointsCount = 1024)
        {
            if (!caches.TryGetValue(idTelemetry, out CyclycArray<TDto>? cacheItem))
            {
                return null;
            }

            if (cacheItem is null || !cacheItem.Any() || cacheItem[0].DateTime > dateBegin)
            {
                return null;
            }

            var dateEnd = dateBegin.AddSeconds(intervalSec);

            // Materialize once so the count and the returned items come from the same pass.
            var items = cacheItem
                .Where(i => i.DateTime >= dateBegin && i.DateTime <= dateEnd)
                .ToArray();

            // Avoid division by zero; a non-positive target means "do not thin".
            if (approxPointsCount <= 0)
            {
                return items;
            }

            // Integer division: thinning starts only when there are at least twice as many items as requested.
            var ratio = items.Length / approxPointsCount;
            if (ratio > 1)
            {
                return items
                    .Where((_, index) => index % ratio == 0)
                    .ToArray();
            }

            return items;
        }

        /// <summary>
        /// Fills the cache from the database: for every well that has a telemetry id,
        /// loads its most recent items into a new cyclic array and registers it.
        /// </summary>
        /// <typeparam name="TEntity">Database entity type to load.</typeparam>
        /// <param name="db">Database context to read from.</param>
        /// <exception cref="InvalidOperationException">A load is already in progress.</exception>
        private void InitializeCacheFromDB<TEntity>(IAsbCloudDbContext db)
            where TEntity : class, ITelemetryData
        {
            if (isLoading)
            {
                throw new InvalidOperationException("Multiple cache loading detected.");
            }

            isLoading = true;
            try
            {
                var wells = db.Set<Well>()
                    .Include(well => well.Telemetry)
                    .Include(well => well.Cluster)
                    .Where(well => well.IdTelemetry != null)
                    .ToArray();

                foreach (Well well in wells)
                {
                    var capacity = well.IdState == 1
                        ? activeWellCapacity
                        : doneWellCapacity;

                    var idTelemetry = well.IdTelemetry!.Value;
                    var hoursOffset = well.Timezone.Hours;

                    IEnumerable<TDto> cacheItemData = GetCacheDataFromDb<TEntity>(db, idTelemetry, capacity, hoursOffset);
                    var cacheItem = new CyclycArray<TDto>(capacity);
                    cacheItem.AddRange(cacheItemData);

                    // TryAdd: an entry that already exists for this telemetry is left untouched.
                    caches.TryAdd(idTelemetry, cacheItem);
                    System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> for well: {well.Cluster?.Caption}/{well.Caption} loaded");
                }

                System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> load complete");
            }
            finally
            {
                // Always leave loading mode, otherwise a failed load would make AddRange
                // drop data for every telemetry that was not cached yet.
                isLoading = false;
            }
        }

        /// <summary>
        /// Creates a database context for the "DefaultConnection" connection string (Npgsql provider).
        /// </summary>
        /// <param name="configuration">Configuration providing the connection string.</param>
        /// <returns>A new context; the caller is responsible for disposing it.</returns>
        private static IAsbCloudDbContext MakeContext(IConfiguration configuration)
        {
            var connectionString = configuration.GetConnectionString("DefaultConnection");
            var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
                .UseNpgsql(connectionString)
                .Options;
            var db = new AsbCloudDbContext(options);
            return db;
        }

        /// <summary>
        /// Loads the latest <paramref name="capacity"/> entities of one telemetry and maps them to DTOs.
        /// </summary>
        /// <typeparam name="TEntity">Database entity type to load.</typeparam>
        /// <param name="db">Database context to read from.</param>
        /// <param name="idTelemetry">Telemetry id to load.</param>
        /// <param name="capacity">Maximum number of entities to load.</param>
        /// <param name="hoursOffset">Offset in hours passed to ToRemoteDateTime for each item's DateTime.</param>
        /// <returns>DTOs in ascending DateTime order (lazily mapped from the loaded entities).</returns>
        private static IEnumerable<TDto> GetCacheDataFromDb<TEntity>(IAsbCloudDbContext db, int idTelemetry, int capacity, double hoursOffset)
            where TEntity : class, ITelemetryData
        {
            // Take the newest items first, then reverse in memory to restore chronological order.
            var entities = db.Set<TEntity>()
                .Where(i => i.IdTelemetry == idTelemetry)
                .OrderByDescending(i => i.DateTime)
                .Take(capacity)
                .ToArray()
                .AsEnumerable()
                .Reverse();

            var dtos = entities.Select(entity =>
            {
                var dto = entity.Adapt<TDto>();
                dto.DateTime = entity.DateTime.ToRemoteDateTime(hoursOffset);
                return dto;
            });

            return dtos;
        }
    }
}
#nullable disable