using AsbCloudDb.Model; using System.Collections.Concurrent; using System.Collections.Generic; using System; using System.Linq; using Microsoft.EntityFrameworkCore; using Mapster; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using AsbCloudInfrastructure.Background; using System.Threading; using AsbCloudApp.Data; using AsbCloudApp.Requests; using AsbCloudApp.Repositories; namespace AsbCloudInfrastructure.Services.SAUB { public class TelemetryDataCache : ITelemetryDataCache where TDto : AsbCloudApp.Data.ITelemetryData { class TelemetryDataCacheItem { public TDto FirstByDate { get; init; } = default!; public CyclicArray LastData { get; init; } = null!; public double TimezoneHours { get; init; } = 5; } private const int activeWellCapacity = 12 * 60 * 60; private const int doneWellCapacity = 65 * 60; // key == idTelemetry private readonly ConcurrentDictionary caches; private bool isLoading = false; private TelemetryDataCache() { caches = new(); } private static TelemetryDataCache? instance; public static TelemetryDataCache GetInstance(IServiceProvider provider) where TEntity : class, AsbCloudDb.Model.ITelemetryData { if (instance is null) { instance = new TelemetryDataCache(); var worker = provider.GetRequiredService(); var workId = $"Telemetry cache loading from DB {typeof(TEntity).Name}"; var work = Work.CreateByDelegate(workId, async (workId, provider, onProgress, token) => { var db = provider.GetRequiredService(); await instance.InitializeCacheFromDBAsync(db, onProgress, token); }); work.Timeout = TimeSpan.FromMinutes(15); worker.Enqueue(work); } return instance; } /// /// Добавить новые элементы в кеш /// /// /// public void AddRange(int idTelemetry, IEnumerable range) { if (!range.Any()) return; range = range.OrderBy(x => x.DateTime); foreach (var item in range) item.IdTelemetry = idTelemetry; TelemetryDataCacheItem cacheItem; if (isLoading) { if (caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? localCacheItem)) cacheItem = localCacheItem; else return; } else { cacheItem = caches.GetOrAdd(idTelemetry, _ => new TelemetryDataCacheItem() { FirstByDate = range.ElementAt(0), LastData = new CyclicArray(activeWellCapacity) }); } cacheItem.LastData.AddRange(range); } /// /// Получить данные из кеша.
/// Если dateBegin меньше минимального элемента в кеше, то вернется null. /// Даже если intervalSec частично перекрыт данными из кеша. ///
/// /// /// /// кол-во элементов до которых эти данные прореживаются /// public IEnumerable? GetOrDefault(int idTelemetry, DateTime dateBegin, double intervalSec = 600d, int approxPointsCount = 1024) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return null; var cacheLastData = cacheItem.LastData; if (!cacheLastData.Any() || cacheLastData[0].DateTime > dateBegin) return null; var dateEnd = dateBegin.AddSeconds(intervalSec); var items = cacheLastData .Where(i => i.DateTime >= dateBegin && i.DateTime <= dateEnd); var ratio = items.Count() / approxPointsCount; if (ratio > 1) items = items .Where((_, index) => index % ratio == 0); return items; } public IEnumerable GetStat() { var result = caches.Select(cacheItem => new TelemetryDataStatDto { IdTelemetry = cacheItem.Key, DateFirst = cacheItem.Value.FirstByDate.DateTime, DateLast = cacheItem.Value.LastData[^1].DateTime, TimezoneOffsetHours = cacheItem.Value.TimezoneHours, }); return result; } public virtual TDto? GetLastOrDefault(int idTelemetry) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return default; return cacheItem.LastData.LastOrDefault(); } public DatesRangeDto? GetOrDefaultDataDateRange(int idTelemetry) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return null; if (!cacheItem.LastData.Any()) return null; var from = DateTime.SpecifyKind(cacheItem.FirstByDate.DateTime, DateTimeKind.Unspecified); var to = DateTime.SpecifyKind(cacheItem.LastData[^1].DateTime, DateTimeKind.Unspecified); return new DatesRangeDto { From = new DateTimeOffset(from, TimeSpan.FromHours(cacheItem.TimezoneHours)), To = new DateTimeOffset(to, TimeSpan.FromHours(cacheItem.TimezoneHours)) }; } public DatesRangeDto? GetOrDefaultCachedDateRange(int idTelemetry) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return null; if (cacheItem.LastData.Count < 2) return null; var to = cacheItem.LastData[^1].DateTime; var from = cacheItem.LastData[0].DateTime; return new DatesRangeDto { From = from, To = to }; } public (TDto First, TDto Last)? GetOrDefaultFirstLast(int idTelemetry) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return null; if (!cacheItem.LastData.Any()) return null; var last = cacheItem.LastData[^1]; var first = cacheItem.FirstByDate; return (first, last); } private async Task InitializeCacheFromDBAsync(IAsbCloudDbContext db, Action onProgress, CancellationToken token) where TEntity : class, AsbCloudDb.Model.ITelemetryData { var defaultTimeout = db.Database.GetCommandTimeout(); db.Database.SetCommandTimeout(TimeSpan.FromMinutes(5)); if (isLoading) throw new Exception("Multiple cache loading detected."); try { isLoading = true; Well[] wells = await db.Set() .Include(well => well.Telemetry) .Include(well => well.Cluster) .Where(well => well.IdTelemetry != null) .ToArrayAsync(token); var count = wells.Length; var i = 0d; foreach (Well well in wells) { var capacity = well.IdState == 1 ? activeWellCapacity : doneWellCapacity; var idTelemetry = well.IdTelemetry!.Value; var hoursOffset = well.Timezone.Hours; onProgress($"Loading for well: {well.Cluster?.Caption}/{well.Caption} (capacity:{capacity}) idTelemetry:{idTelemetry}", i++ / count); var cacheItem = await GetOrDefaultCacheDataFromDbAsync(db, idTelemetry, capacity, hoursOffset, token); if (cacheItem is not null) caches.TryAdd(idTelemetry, cacheItem); } } finally { isLoading = false; db.Database.SetCommandTimeout(defaultTimeout); } } private static async Task GetOrDefaultCacheDataFromDbAsync(IAsbCloudDbContext db, int idTelemetry, int capacity, double hoursOffset, CancellationToken token) where TEntity : class, AsbCloudDb.Model.ITelemetryData { var query = db.Set() .Where(i => i.IdTelemetry == idTelemetry); var firstDbEntity = await query .OrderBy(i => i.DateTime) .FirstOrDefaultAsync(token); if (firstDbEntity is null) return default; var first = firstDbEntity.Adapt(); first.DateTime = firstDbEntity.DateTime.ToRemoteDateTime(hoursOffset); var entities = await query .OrderByDescending(i => i.DateTime) .Take(capacity) .ToArrayAsync(token); var dtos = entities .AsEnumerable() .Reverse() .Select(entity => { var dto = entity.Adapt(); dto.DateTime = entity.DateTime.ToRemoteDateTime(hoursOffset); return dto; }); var cacheItem = new CyclicArray(capacity); cacheItem.AddRange(dtos); var item = new TelemetryDataCacheItem { FirstByDate = first, LastData = cacheItem, TimezoneHours = hoursOffset, }; return item; } public IEnumerable? GetOrDefault(int idTelemetry, TelemetryDataRequest request) { if (!caches.TryGetValue(idTelemetry, out TelemetryDataCacheItem? cacheItem)) return null; IEnumerable data = cacheItem.LastData; if (!data.Any()) return null; if (request.GeDate.HasValue) { var geDate = request.GeDate.Value.ToRemoteDateTime(cacheItem.TimezoneHours); if (data.First().DateTime > geDate) return null; data = data.Where(d => d.DateTime >= geDate); } else { if (request.Order == 0) return null; } if (request.LeDate.HasValue) { var leDate = request.LeDate.Value.ToRemoteDateTime(cacheItem.TimezoneHours); data = data.Where(d => d.DateTime <= request.LeDate); } if (request.Divider > 1) data = data.Where((d) => (((d.DateTime.DayOfYear * 24 + d.DateTime.Hour) * 60 + d.DateTime.Minute) * 60 + d.DateTime.Second) % request.Divider == 0); switch (request.Order) { case 1: // Поздние вперед data = data .OrderByDescending(d => d.DateTime) .Skip(request.Skip) .Take(request.Take) .OrderBy(d => d.DateTime); break; default: // Ранние вперед data = data .OrderBy(d => d.DateTime) .Skip(request.Skip) .Take(request.Take); break; } return data; } public IEnumerable GetIds(TelemetryDataRequest request) { var data = caches.Where(i => i.Value.LastData.Count > 0); if (request.GeDate.HasValue) { data = data .Where(item => { var lastItem = item.Value.LastData.Last(); var geDate = request.GeDate.Value.ToOffset(TimeSpan.FromHours(item.Value.TimezoneHours)); return lastItem.DateTime >= geDate; }); } if (request.LeDate.HasValue) { data = data .Where(item => { var firstItem = item.Value.LastData.First(); var leDate = request.LeDate.Value.ToOffset(TimeSpan.FromHours(item.Value.TimezoneHours)); return firstItem.DateTime <= leDate; }); } var telemetryIds = data.Select(item => item.Key); return telemetryIds; } } }