using AsbCloudDb.Model; using System.Collections.Concurrent; using System.Collections.Generic; using System; using System.Linq; using Microsoft.EntityFrameworkCore; using Mapster; using System.Threading.Tasks; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using AsbCloudInfrastructure.Background; using System.Threading; using AsbCloudApp.Data; using AsbCloudApp.Requests; namespace AsbCloudInfrastructure.Services.SAUB { public class TelemetryDataCache where TDto : AsbCloudApp.Data.ITelemetryData { class TelemetryDataCacheItem { public TDto FirstByDate { get; init; } = default!; public CyclycArray LastData { get; init; } = null!; public double TimezoneHours { get; init; } = 5; } private IServiceProvider provider = null!; private const int activeWellCapacity = 12 * 60 * 60; private const int doneWellCapacity = 65 * 60; // key == idTelemetry private readonly ConcurrentDictionary caches; private bool isLoading = false; private TelemetryDataCache() { caches = new(); } private static TelemetryDataCache? instance; public static TelemetryDataCache GetInstance(IServiceProvider provider) where TEntity : class, AsbCloudDb.Model.ITelemetryData { if (instance is null) { instance = new TelemetryDataCache(); var worker = provider.GetRequiredService(); var workId = $"Telemetry cache loading from DB {typeof(TEntity).Name}"; var work = Work.CreateByDelegate(workId, async (workId, provider, onProgress, token) => { var db = provider.GetRequiredService(); await instance.InitializeCacheFromDBAsync(db, onProgress, token); }); work.Timeout = TimeSpan.FromMinutes(15); worker.WorkStore.RunOnceQueue.Enqueue(work); } instance.provider = provider; return instance; } /// /// Добавить новые элементы в кеш /// /// /// public void AddRange(int idTelemetry, IEnumerable range) { if (!range.Any()) return; var newItems = range .OrderBy(i => i.DateTime); foreach (var item in newItems) item.IdTelemetry = idTelemetry; TelemetryDataCacheItem cacheItem; if (isLoading) { if (caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? localCacheItem)) cacheItem = localCacheItem; else return; } else { cacheItem = caches.GetOrAdd(idTelemetry, _ => new TelemetryDataCacheItem() { FirstByDate = newItems.ElementAt(0), LastData = new CyclycArray(activeWellCapacity) }); } cacheItem.LastData.AddRange(newItems); } /// /// Получить данные из кеша.
/// Если dateBegin меньше минимального элемента в кеше, то вернется null. /// Даже если intervalSec частично перекрыт данными из кеша. ///
/// /// /// /// кол-во элементов до которых эти данные прореживаются /// public IEnumerable? GetOrDefault(int idTelemetry, DateTime dateBegin, double intervalSec = 600d, int approxPointsCount = 1024) { if(!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return null; var cacheLastData = cacheItem.LastData; if (!cacheLastData.Any() || cacheLastData[0].DateTime > dateBegin) return null; var dateEnd = dateBegin.AddSeconds(intervalSec); var items = cacheLastData .Where(i => i.DateTime >= dateBegin && i.DateTime <= dateEnd); var ratio = items.Count() / approxPointsCount; if (ratio > 1) items = items .Where((_, index) => index % ratio == 0); return items; } public virtual TDto? GetLastOrDefault(int idTelemetry) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return default; return cacheItem.LastData.LastOrDefault(); } public DatesRangeDto? GetOrDefaultDataDateRange(int idTelemetry) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return null; var from = cacheItem.FirstByDate?.DateTime; if(!cacheItem.LastData.Any()) return null; var to = cacheItem.LastData[^1].DateTime; from = from ?? cacheItem.LastData[0].DateTime; return new DatesRangeDto { From = from.Value, To = to }; } public (TDto First, TDto Last)? GetOrDefaultFirstLast(int idTelemetry) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return null; if (!cacheItem.LastData.Any()) return null; var last = cacheItem.LastData[^1]; var first = cacheItem.FirstByDate; return (first, last); } private async Task InitializeCacheFromDBAsync(IAsbCloudDbContext db, Action onProgress, CancellationToken token) where TEntity : class, AsbCloudDb.Model.ITelemetryData { if (isLoading) throw new Exception("Multiple cache loading detected."); isLoading = true; var defaultTimeout = db.Database.GetCommandTimeout(); db.Database.SetCommandTimeout(TimeSpan.FromMinutes(5)); Well[] wells = await db.Set() .Include(well => well.Telemetry) .Include(well => well.Cluster) .Where(well => well.IdTelemetry != null) .ToArrayAsync(token); var count = wells.Length; var i = 0d; foreach (Well well in wells) { var capacity = well.IdState == 1 ? activeWellCapacity : doneWellCapacity; var idTelemetry = well.IdTelemetry!.Value; var hoursOffset = well.Timezone.Hours; onProgress($"Loading for well: {well.Cluster?.Caption}/{well.Caption} (capacity:{capacity}) idTelemetry:{idTelemetry}", i++/count); var cacheItem = await GetOrDefaultCacheDataFromDbAsync(db, idTelemetry, capacity, hoursOffset, token); if(cacheItem is not null) caches.TryAdd(idTelemetry, cacheItem); } isLoading = false; db.Database.SetCommandTimeout(defaultTimeout); } private static async Task GetOrDefaultCacheDataFromDbAsync(IAsbCloudDbContext db, int idTelemetry, int capacity, double hoursOffset, CancellationToken token) where TEntity : class, AsbCloudDb.Model.ITelemetryData { var query = db.Set() .Where(i => i.IdTelemetry == idTelemetry); var firstDbEntity = await query .OrderBy(i => i.DateTime) .FirstOrDefaultAsync(token); if (firstDbEntity is null) return default; var first = firstDbEntity.Adapt(); first.DateTime = firstDbEntity.DateTime.ToRemoteDateTime(hoursOffset); var entities = await query .OrderByDescending(i => i.DateTime) .Take(capacity) .ToArrayAsync(token); var dtos = entities .AsEnumerable() .Reverse() .Select(entity => { var dto = entity.Adapt(); dto.DateTime = entity.DateTime.ToRemoteDateTime(hoursOffset); return dto; }); var cacheItem = new CyclycArray(capacity); cacheItem.AddRange(dtos); var item = new TelemetryDataCacheItem { FirstByDate = first, LastData = cacheItem, TimezoneHours = hoursOffset, }; return item; } public IEnumerable? GetOrDefault(int idTelemetry, TelemetryDataRequest request) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return null; IEnumerable data = cacheItem.LastData; if (!data.Any()) return null; if (request.GeDate.HasValue) { var geDate = request.GeDate.Value.ToRemoteDateTime(cacheItem.TimezoneHours); if (data.First().DateTime > geDate) return null; data = data.Where(d => d.DateTime >= geDate); } else { if (request.Order == 0) return null; } if (request.LeDate.HasValue) { var leDate = request.LeDate.Value.ToRemoteDateTime(cacheItem.TimezoneHours); data = data.Where(d => d.DateTime >= request.LeDate); } if (request.Divider > 1) data = data.Where((d) => (((d.DateTime.DayOfYear * 24 + d.DateTime.Hour) * 60 + d.DateTime.Minute) * 60 + d.DateTime.Second) % request.Divider == 0); switch (request.Order) { case 1: // Поздние вперед data = data .OrderByDescending(d => d.DateTime) .Skip(request.Skip) .Take(request.Take) .OrderBy(d => d.DateTime); break; default: // Ранние вперед data = data .OrderBy(d => d.DateTime) .Skip(request.Skip) .Take(request.Take); break; } return data; } } }