using AsbCloudApp.Data.GTR; using AsbCloudApp.Repositories; using AsbCloudApp.Services; using AsbCloudDb; using AsbCloudDb.Model; using AsbCloudDb.Model.GTR; using Microsoft.EntityFrameworkCore; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace AsbCloudInfrastructure.Repository { public class GtrWitsRepository : IGtrRepository { private readonly IAsbCloudDbContext db; private readonly ITelemetryService telemetryService; private static ConcurrentDictionary> cache = new(); public GtrWitsRepository( IAsbCloudDbContext db, ITelemetryService telemetryService) { this.db = db; this.telemetryService = telemetryService; } public async Task> GetAsync(int idWell, DateTime? dateBegin, double intervalSec = 600, int approxPointsCount = 1024, CancellationToken token = default) { var telemetry = telemetryService.GetOrDefaultTelemetryByIdWell(idWell); if (telemetry is null) return Enumerable.Empty(); var timezone = telemetryService.GetTimezone(telemetry.Id); DateTimeOffset? dateBeginUtc = dateBegin?.ToUtcDateTimeOffset(timezone.Hours); var dateEnd = dateBeginUtc?.AddSeconds(intervalSec); var witsRequest = new WitsRequest() { IdTelemetry = telemetry.Id, DateBeginUtc = dateBeginUtc, DateEnd = dateEnd, ApproxPointsCount = approxPointsCount, TimezoneHours = timezone.Hours }; var recordAllInt = await GetItemsOrDefaultAsync(witsRequest, token); var recordAllFloat = await GetItemsOrDefaultAsync(witsRequest, token); var recordAllString = await GetItemsOrDefaultAsync(witsRequest, token); var dtos = (recordAllFloat.Union(recordAllInt)).Union(recordAllString) .GroupBy(g => new { g.IdRecord, g.Date }) .Select(g => new WitsRecordDto { Id = g.Key.IdRecord, Date = g.Key.Date, Items = g.Select(r => new { Key = r.IdItem, r.Value }).ToDictionary(x => x.Key, x => x.Value) }); return dtos; } public IEnumerable GetLastDataByRecordId(int idWell, int idRecord) { var result = GetLastData(idWell) .Where(item => item.IdRecord == idRecord); return result; } public IEnumerable GetLastData(int idWell) { var telemetry = telemetryService.GetOrDefaultTelemetryByIdWell(idWell); if (telemetry is null) return Enumerable.Empty(); var lastData = cache.GetValueOrDefault(telemetry.Id); return lastData?.Values ?? Enumerable.Empty(); } private async Task> GetItemsOrDefaultAsync( WitsRequest request, CancellationToken token) where TEntity : WitsItemBase where TValue : notnull { var query = BuildQuery(request); var fullDataCount = await query.CountAsync(token); if (fullDataCount == 0) return Enumerable.Empty(); if (request.ApproxPointsCount is not null && fullDataCount > 1.75 * request.ApproxPointsCount) { var m = (int)Math.Round(1d * fullDataCount / request.ApproxPointsCount!.Value); if (m > 1) query = query.Where((d) => (((d.DateTime.DayOfYear * 24 + d.DateTime.Hour) * 60 + d.DateTime.Minute) * 60 + d.DateTime.Second) % m == 0); } var entities = await query .OrderBy(d => d.DateTime) .AsNoTracking() .ToListAsync(token) .ConfigureAwait(false); var items = entities.Select(e => new WitsItemRecordDto { IdRecord = e.IdRecord, Date = e.DateTime.ToRemoteDateTime(request.TimezoneHours), IdItem = e.IdItem, Value = new JsonValue(e.Value) }); return items; } private IQueryable BuildQuery(WitsRequest request) where TEntity : WitsItemBase where TValue : notnull { var query = db.Set().Where(i => i.IdTelemetry == request.IdTelemetry); if (request.IdRecord is not null) query = query .Where(d => d.IdRecord == request.IdRecord); if (request.DateBeginUtc is not null) query = query .Where(d => d.DateTime >= request.DateBeginUtc); if (request.DateEnd is not null) query = query .Where(d => d.DateTime <= request.DateEnd); return query; } public async Task SaveDataAsync(int idTelemetry, IEnumerable dtos, CancellationToken token) { var timezoneHours = telemetryService.GetTimezone(idTelemetry).Hours; var cacheTelemetryItems = cache.GetValueOrDefault(idTelemetry); var strings = new List(4); var floats = new List(4); var ints = new List(4); foreach (var record in dtos) { var dateTime = record.Date.ToUtcDateTimeOffset(timezoneHours); foreach (var item in record.Items) { if (cacheTelemetryItems?.TryGetValue((record.Id, item.Key), out var cacheItem) == true) if (Math.Abs((dateTime - cacheItem.Date).TotalSeconds) < 1) continue; if (item.Value.Value is string valueString) { var entity = MakeEntity(record.Id, item.Key, idTelemetry, dateTime, valueString); strings.Add(entity); } if (item.Value.Value is float valueFloat) { var entity = MakeEntity(record.Id, item.Key, idTelemetry, dateTime, valueFloat); floats.Add(entity); } if (item.Value.Value is int valueInt) { var entity = MakeEntity(record.Id, item.Key, idTelemetry, dateTime, valueInt); ints.Add(entity); } } } try { if (strings.Any()) await db.Database.ExecInsertOrIgnoreAsync(db.Set(), strings, token); if (floats.Any()) await db.Database.ExecInsertOrIgnoreAsync(db.Set(), floats, token); if (ints.Any()) await db.Database.ExecInsertOrIgnoreAsync(db.Set(), ints, token); } catch(Exception ex) { Trace.TraceError("Exception while saving GTR Wits data", ex); } cache.AddOrUpdate(idTelemetry, (_) => MakeNewCache(dtos), (_, oldItemsDictionary) => { foreach (var record in dtos) foreach (var item in record.Items) { oldItemsDictionary.AddOrUpdate( (record.Id, item.Key), (_) => new WitsItemRecordDto { IdRecord = record.Id, IdItem = item.Key, Date = record.Date, Value = item.Value }, (_, _) => new WitsItemRecordDto { IdRecord = record.Id, IdItem = item.Key, Date = record.Date, Value = item.Value }); } return oldItemsDictionary; }); } private static ConcurrentDictionary<(int, int), WitsItemRecordDto> MakeNewCache(IEnumerable dtos) { var items = dtos.SelectMany(record => record.Items.Select( item => new WitsItemRecordDto { IdItem = item.Key, IdRecord = record.Id, Date = record.Date, Value = item.Value, })); var groups = items .GroupBy(item => (item.IdRecord, item.IdItem)); var pairs = groups.Select(group => new KeyValuePair<(int, int), WitsItemRecordDto>( group.Key, group.OrderByDescending(item => item.Date).First())); return new ConcurrentDictionary<(int, int), WitsItemRecordDto>(pairs); } private static TEntity MakeEntity(int idRecord, int idItem, int idTelemetry, DateTimeOffset dateTime, TValue value) where TEntity : WitsItemBase, new() where TValue : notnull => new TEntity() { IdRecord = idRecord, IdItem = idItem, IdTelemetry = idTelemetry, DateTime = dateTime, Value = value, }; private static TEntity MakeEntity(WitsItemRecordDto dto, int idTelemetry, DateTimeOffset dateTime) where TEntity : WitsItemBase, new() where TValue : notnull => new TEntity() { IdRecord = dto.IdRecord, IdItem = dto.IdItem, IdTelemetry = idTelemetry, DateTime = dateTime, Value = (TValue)dto.Value.Value, }; private class WitsRequest { public int IdTelemetry { get; set; } public DateTimeOffset? DateBeginUtc { get; set; } public DateTimeOffset? DateEnd { get; set; } public int? ApproxPointsCount { get; set; } public double TimezoneHours { get; set; } public int? IdRecord { get; set; } } } }