using AsbCloudApp.Data; using AsbCloudApp.Data.GTR; using AsbCloudApp.Exceptions; using AsbCloudApp.Repositories; using AsbCloudApp.Requests; using AsbCloudApp.Services; using AsbCloudDb; using AsbCloudDb.Model; using AsbCloudDb.Model.GTR; using Mapster; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Caching.Memory; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace AsbCloudInfrastructure.Repository; public class GtrWitsRepository : IGtrRepository { private static IDictionary<(int IdRecord, int IdItem), string> WitsParameters = new Dictionary<(int, int), string> { { (1, 8), nameof(GtrWitsDto.DEPTBITM) }, { (1, 10), nameof(GtrWitsDto.DEPTMEAS) }, { (1, 14), nameof(GtrWitsDto.HKLA) }, { (1, 12), nameof(GtrWitsDto.BLKPOS) }, { (1, 16), nameof(GtrWitsDto.WOBA) }, { (1, 18), nameof(GtrWitsDto.TORQA) }, { (1, 21), nameof(GtrWitsDto.SPPA) }, { (2, 15), nameof(GtrWitsDto.RPMA) }, { (1, 13), nameof(GtrWitsDto.ROPA) }, { (3, 16), nameof(GtrWitsDto.RSUX) }, { (3, 17), nameof(GtrWitsDto.RSDX) }, { (1, 30), nameof(GtrWitsDto.MFIA) }, { (1, 29), nameof(GtrWitsDto.MFOA)}, { (1, 34), nameof(GtrWitsDto.MTIA) }, { (1, 33), nameof(GtrWitsDto.MTOA) }, { (1, 23), nameof(GtrWitsDto.SPM1) }, { (1, 24), nameof(GtrWitsDto.SPM2) }, { (1, 25), nameof(GtrWitsDto.SPM3) }, { (1, 26), nameof(GtrWitsDto.TVOLACT) }, { (11, 29), nameof(GtrWitsDto.TTVOL1) }, { (11, 30), nameof(GtrWitsDto.TTVOL2) }, { (11, 15), nameof(GtrWitsDto.TVOL01) }, { (11, 16), nameof(GtrWitsDto.TVOL02) }, { (11, 17), nameof(GtrWitsDto.TVOL03) }, { (11, 18), nameof(GtrWitsDto.TVOL04) }, { (11, 19), nameof(GtrWitsDto.TVOL05) }, { (11, 20), nameof(GtrWitsDto.TVOL06) }, { (11, 21), nameof(GtrWitsDto.TVOL07) }, { (11, 22), nameof(GtrWitsDto.TVOL08) }, { (11, 23), nameof(GtrWitsDto.TVOL09) }, { (11, 24), nameof(GtrWitsDto.TVOL10) }, { (11, 25), nameof(GtrWitsDto.TVOL11) }, { (11, 26), nameof(GtrWitsDto.TVOL12) }, { (11, 27), nameof(GtrWitsDto.TVOL13) }, { (11, 28), nameof(GtrWitsDto.TVOL14) }, { (1, 31), nameof(GtrWitsDto.MDOA) }, { (1, 32), nameof(GtrWitsDto.MDIA) }, { (12, 12), nameof(GtrWitsDto.METHANE) }, { (12, 13), nameof(GtrWitsDto.ETHANE) }, { (12, 14), nameof(GtrWitsDto.PROPANE) }, { (12, 15), nameof(GtrWitsDto.IBUTANE) }, { (12, 16), nameof(GtrWitsDto.NBUTANE) }, { (1, 40), nameof(GtrWitsDto.GASA) }, }; private readonly IAsbCloudDbContext db; private readonly ITelemetryService telemetryService; private static ConcurrentDictionary> cache = new(); public GtrWitsRepository( IAsbCloudDbContext db, ITelemetryService telemetryService) { this.db = db; this.telemetryService = telemetryService; } public async Task> GetAsync(int idWell, GtrRequest request, CancellationToken token) => await GetAsync(idWell, request, token); public async Task GetRangeAsync(int idWell, DateTimeOffset? geDate, DateTimeOffset? leDate, CancellationToken token) { var telemetry = telemetryService.GetOrDefaultTelemetryByIdWell(idWell); if (telemetry is null) return null; var rangeQuery = db .Set() .Where(e => e.IdTelemetry == telemetry.Id); if (geDate is not null) rangeQuery = rangeQuery.Where(e => e.DateTime >= geDate); if (leDate is not null) rangeQuery = rangeQuery.Where(e => e.DateTime <= leDate); var groupedQuery = rangeQuery.GroupBy(e => e.IdTelemetry) .Select(group => new { Min = group.Min(e => e.DateTime), Max = group.Max(e => e.DateTime) }); db.Database.SetCommandTimeout(5*60); var range = await groupedQuery.FirstOrDefaultAsync(token); if (range is null) return null; var result = new DatesRangeDto { From = range.Min.ToOffset(telemetry.TimeZone!.Offset), To = range.Max.ToOffset(telemetry.TimeZone!.Offset), }; return result; } private async Task> GetAsync(int idWell, GtrRequest request, CancellationToken token) where TEntity : WitsItemBase where TType : notnull { var telemetry = telemetryService.GetOrDefaultTelemetryByIdWell(idWell); if (telemetry is null) return Enumerable.Empty(); if (telemetry.TimeZone is null) throw new ArgumentInvalidException(nameof(idWell), $"Telemetry id: {telemetry.Id} can't find timezone"); var query = BuildQuery(telemetry.Id, request); var idsRecord = WitsParameters.Select(p => p.Key.IdRecord); var entities = await query .Where(e => idsRecord.Contains(e.IdRecord)) .OrderBy(e => e.DateTime) .AsNoTracking() .ToArrayAsync(token); if (!entities.Any()) return Enumerable.Empty(); var interval = TimeSpan.FromSeconds(10); var timezoneOffset = TimeSpan.FromHours(telemetry.TimeZone.Hours); var dtos = entities .GroupBy(e => e.DateTime.Ticks / interval.Ticks) .Select(groupByInterval => { var items = groupByInterval.Select(e => e); var values = items.GroupBy(e => (e.IdRecord, e.IdItem)) .Where(group => WitsParameters.ContainsKey(group.Key)) .ToDictionary(group => WitsParameters[group.Key], g => (object)g.Last().Value); var dto = values.Adapt(); dto.DateTime = items.Last().DateTime.ToOffset(timezoneOffset); return dto; }); return dtos; } private IQueryable BuildQuery(int idTelemetry, GtrRequest request) where TEntity : WitsItemBase where TType : notnull { var query = db.Set() .Where(e => e.IdTelemetry == idTelemetry); if (request.Begin.HasValue) { var dateBegin = request.Begin.Value.ToUniversalTime(); var dateEnd = dateBegin.AddSeconds(request.IntervalSec); query = query .Where(e => e.DateTime >= dateBegin) .Where(e => e.DateTime <= dateEnd); } else { var lastDate = query .OrderBy(e => e.DateTime) .LastOrDefault() ?.DateTime ?? DateTimeOffset.UtcNow; var dateBegin = lastDate.AddSeconds(-request.IntervalSec); var dateEnd = lastDate; query = query .Where(e => e.DateTime >= dateBegin) .Where(e => e.DateTime <= dateEnd); } return query; } [Obsolete] public async Task> GetAsync(int idWell, DateTime? dateBegin, double intervalSec = 600, int approxPointsCount = 1024, CancellationToken token = default) { var telemetry = telemetryService.GetOrDefaultTelemetryByIdWell(idWell); if (telemetry is null) return Enumerable.Empty(); var timezone = telemetryService.GetTimezone(telemetry.Id); DateTimeOffset? dateBeginUtc = dateBegin?.ToUtcDateTimeOffset(timezone.Hours); var dateEnd = dateBeginUtc?.AddSeconds(intervalSec); var witsRequest = new WitsRequest() { IdTelemetry = telemetry.Id, DateBeginUtc = dateBeginUtc, DateEnd = dateEnd, ApproxPointsCount = approxPointsCount, TimezoneHours = timezone.Hours }; var recordAllInt = await GetItemsOrDefaultAsync(witsRequest, token); var recordAllFloat = await GetItemsOrDefaultAsync(witsRequest, token); var recordAllString = await GetItemsOrDefaultAsync(witsRequest, token); var dtos = (recordAllFloat.Union(recordAllInt)).Union(recordAllString) .GroupBy(g => new { g.IdRecord, g.Date }) .Select(g => new WitsRecordDto { Id = g.Key.IdRecord, Date = g.Key.Date, Items = g.Select(r => new { Key = r.IdItem, r.Value }).ToDictionary(x => x.Key, x => x.Value) }); return dtos; } public IEnumerable GetLastDataByRecordId(int idWell, int idRecord) { var result = GetLastData(idWell) .Where(item => item.IdRecord == idRecord); return result; } public IEnumerable GetLastData(int idWell) { var telemetry = telemetryService.GetOrDefaultTelemetryByIdWell(idWell); if (telemetry is null) return Enumerable.Empty(); var lastData = cache.GetValueOrDefault(telemetry.Id); return lastData?.Values ?? Enumerable.Empty(); } private async Task> GetItemsOrDefaultAsync( WitsRequest request, CancellationToken token) where TEntity : WitsItemBase where TValue : notnull { var query = BuildQuery(request); var fullDataCount = await query.CountAsync(token); if (fullDataCount == 0) return Enumerable.Empty(); if (request.ApproxPointsCount is not null && fullDataCount > 1.75 * request.ApproxPointsCount) { var m = (int)Math.Round(1d * fullDataCount / request.ApproxPointsCount!.Value); if (m > 1) query = query.Where((d) => (((d.DateTime.DayOfYear * 24 + d.DateTime.Hour) * 60 + d.DateTime.Minute) * 60 + d.DateTime.Second) % m == 0); } var entities = await query .OrderBy(d => d.DateTime) .AsNoTracking() .ToListAsync(token) .ConfigureAwait(false); var items = entities.Select(e => new WitsItemRecordDto { IdRecord = e.IdRecord, Date = e.DateTime.ToRemoteDateTime(request.TimezoneHours), IdItem = e.IdItem, Value = new JsonValue(e.Value) }); return items; } private IQueryable BuildQuery(WitsRequest request) where TEntity : WitsItemBase where TValue : notnull { var query = db.Set().Where(i => i.IdTelemetry == request.IdTelemetry); if (request.IdRecord is not null) query = query .Where(d => d.IdRecord == request.IdRecord); if (request.DateBeginUtc is not null) query = query .Where(d => d.DateTime >= request.DateBeginUtc); if (request.DateEnd is not null) query = query .Where(d => d.DateTime <= request.DateEnd); return query; } public async Task SaveDataAsync(int idTelemetry, IEnumerable dtos, CancellationToken token) { var timezoneHours = telemetryService.GetTimezone(idTelemetry).Hours; var cacheTelemetryItems = cache.GetValueOrDefault(idTelemetry); var strings = new List(4); var floats = new List(4); var ints = new List(4); foreach (var record in dtos) { var dateTime = record.Date.ToUtcDateTimeOffset(timezoneHours); foreach (var item in record.Items) { if (cacheTelemetryItems?.TryGetValue((record.Id, item.Key), out var cacheItem) == true) if (Math.Abs((dateTime - cacheItem.Date).TotalSeconds) < 1) continue; if (item.Value.Value is string valueString) { var entity = MakeEntity(record.Id, item.Key, idTelemetry, dateTime, valueString); strings.Add(entity); } if (item.Value.Value is float valueFloat) { var entity = MakeEntity(record.Id, item.Key, idTelemetry, dateTime, valueFloat); floats.Add(entity); } if (item.Value.Value is int valueInt) { var entity = MakeEntity(record.Id, item.Key, idTelemetry, dateTime, valueInt); ints.Add(entity); } } } try { if (strings.Any()) await db.Database.ExecInsertOrIgnoreAsync(db.Set(), strings, token); if (floats.Any()) await db.Database.ExecInsertOrIgnoreAsync(db.Set(), floats, token); if (ints.Any()) await db.Database.ExecInsertOrIgnoreAsync(db.Set(), ints, token); } catch (Exception ex) { Trace.TraceError("Exception while saving GTR Wits data", ex); } cache.AddOrUpdate(idTelemetry, (_) => MakeNewCache(dtos), (_, oldItemsDictionary) => { foreach (var record in dtos) foreach (var item in record.Items) { oldItemsDictionary.AddOrUpdate( (record.Id, item.Key), (_) => new WitsItemRecordDto { IdRecord = record.Id, IdItem = item.Key, Date = record.Date, Value = item.Value }, (_, _) => new WitsItemRecordDto { IdRecord = record.Id, IdItem = item.Key, Date = record.Date, Value = item.Value }); } return oldItemsDictionary; }); } private static ConcurrentDictionary<(int, int), WitsItemRecordDto> MakeNewCache(IEnumerable dtos) { var items = dtos.SelectMany(record => record.Items.Select( item => new WitsItemRecordDto { IdItem = item.Key, IdRecord = record.Id, Date = record.Date, Value = item.Value, })); var groups = items .GroupBy(item => (item.IdRecord, item.IdItem)); var pairs = groups.Select(group => new KeyValuePair<(int, int), WitsItemRecordDto>( group.Key, group.OrderByDescending(item => item.Date).First())); return new ConcurrentDictionary<(int, int), WitsItemRecordDto>(pairs); } private static TEntity MakeEntity(int idRecord, int idItem, int idTelemetry, DateTimeOffset dateTime, TValue value) where TEntity : WitsItemBase, new() where TValue : notnull => new TEntity() { IdRecord = idRecord, IdItem = idItem, IdTelemetry = idTelemetry, DateTime = dateTime, Value = value, }; private static TEntity MakeEntity(WitsItemRecordDto dto, int idTelemetry, DateTimeOffset dateTime) where TEntity : WitsItemBase, new() where TValue : notnull => new TEntity() { IdRecord = dto.IdRecord, IdItem = dto.IdItem, IdTelemetry = idTelemetry, DateTime = dateTime, Value = (TValue)dto.Value.Value, }; private IQueryable BuildQuery(TelemetryPartDeleteRequest request) where TEntity : WitsItemBase where TValue : notnull { var query = db.Set().Where(i => i.IdTelemetry == request.IdTelemetry); if (request.LeDate is not null) { var leDate = request.LeDate.Value.ToUniversalTime(); query = query.Where(o => o.DateTime <= leDate); } if (request.GeDate is not null) { var geDate = request.GeDate.Value.ToUniversalTime(); query = query.Where(o => o.DateTime >= geDate); } return query; } public async Task DeleteAsync(TelemetryPartDeleteRequest request, CancellationToken token) { var result = 0; result += await DeleteAsync(request, token); result += await DeleteAsync(request, token); result += await DeleteAsync(request, token); return result; } private async Task DeleteAsync(TelemetryPartDeleteRequest request, CancellationToken token) where TEntity : WitsItemBase where TType : notnull { var query = BuildQuery(request); db.Set().RemoveRange(query); return await db.SaveChangesAsync(token); } private class WitsRequest { public int IdTelemetry { get; set; } public DateTimeOffset? DateBeginUtc { get; set; } public DateTimeOffset? DateEnd { get; set; } public int? ApproxPointsCount { get; set; } public double TimezoneHours { get; set; } public int? IdRecord { get; set; } } }