using DD.Persistence.Database.Entity; using DD.Persistence.Models; using DD.Persistence.Models.Common; using DD.Persistence.Repositories; using DD.Persistence.Repository.Extensions; using Microsoft.EntityFrameworkCore; namespace DD.Persistence.Repository.Repositories; public class TimestampedValuesRepository : ITimestampedValuesRepository { private readonly DbContext db; private readonly IRelatedDataRepository relatedDataRepository; public TimestampedValuesRepository(DbContext db, IRelatedDataRepository relatedDataRepository) { this.db = db; this.relatedDataRepository = relatedDataRepository; } protected virtual IQueryable GetQueryReadOnly() => this.db.Set() .Include(e => e.ValuesIdentity); public async virtual Task AddRange(Guid discriminatorId, IEnumerable dtos, CancellationToken token) { var timestampedValuesEntities = new List(); foreach (var dto in dtos) { var keys = dto.Values.Keys.ToArray(); await CreateValuesIdentityIfNotExist(discriminatorId, keys, token); var timestampedValuesEntity = new TimestampedValues() { DiscriminatorId = discriminatorId, Timestamp = dto.Timestamp.ToUniversalTime(), Values = dto.Values.Values.ToArray() }; timestampedValuesEntities.Add(timestampedValuesEntity); } await db.Set().AddRangeAsync(timestampedValuesEntities, token); var result = await db.SaveChangesAsync(token); return result; } public async virtual Task> Get(Guid discriminatorId, DateTimeOffset? timestampBegin, IEnumerable? columnNames, int skip, int take, CancellationToken token) { var query = GetQueryReadOnly() .Where(entity => entity.DiscriminatorId == discriminatorId); // Фильтрация по дате if (timestampBegin.HasValue) { query = ApplyGeTimestamp(query, timestampBegin.Value); } query = query .OrderBy(item => item.Timestamp) .Skip(skip) .Take(take); var data = await Materialize(discriminatorId, query, token); // Фильтрация по запрашиваемым полям if (!columnNames.IsNullOrEmpty()) { data = ReduceSetColumnsByNames(data, columnNames!); } return data; } public async virtual Task> GetFirst(Guid discriminatorId, int takeCount, CancellationToken token) { var query = GetQueryReadOnly() .OrderBy(e => e.Timestamp) .Take(takeCount); var dtos = await Materialize(discriminatorId, query, token); return dtos; } public async virtual Task> GetLast(Guid discriminatorId, int takeCount, CancellationToken token) { var query = GetQueryReadOnly() .OrderByDescending(e => e.Timestamp) .Take(takeCount); var dtos = await Materialize(discriminatorId, query, token); return dtos; } public async virtual Task> GetResampledData( Guid discriminatorId, DateTimeOffset dateBegin, double intervalSec = 600d, int approxPointsCount = 1024, CancellationToken token = default) { var dtos = await GetGtDate(discriminatorId, dateBegin, token); var dateEnd = dateBegin.AddSeconds(intervalSec); dtos = dtos .Where(i => i.Timestamp <= dateEnd); var ratio = dtos.Count() / approxPointsCount; if (ratio > 1) dtos = dtos .Where((_, index) => index % ratio == 0); return dtos; } public async virtual Task> GetGtDate(Guid discriminatorId, DateTimeOffset timestampBegin, CancellationToken token) { var query = GetQueryReadOnly() .Where(e => e.Timestamp > timestampBegin); var dtos = await Materialize(discriminatorId, query, token); return dtos; } public async virtual Task GetDatesRange(Guid discriminatorId, CancellationToken token) { var query = GetQueryReadOnly() .GroupBy(entity => entity.DiscriminatorId) .Select(group => new { Min = group.Min(entity => entity.Timestamp), Max = group.Max(entity => entity.Timestamp), }); var item = await query.FirstOrDefaultAsync(token); if (item is null) { return null; } var dto = new DatesRangeDto { From = item.Min, To = item.Max, }; return dto; } public virtual Task Count(Guid discriminatorId, CancellationToken token) { var dbSet = db.Set(); var query = dbSet.Where(entity => entity.DiscriminatorId == discriminatorId); return query.CountAsync(token); } private async Task> Materialize(Guid discriminatorId, IQueryable query, CancellationToken token) { var valuesIdentities = await relatedDataRepository.Get(token); var valuesIdentity = valuesIdentities? .FirstOrDefault(e => e.DiscriminatorId == discriminatorId); if (valuesIdentity == null) return []; var entities = await query.ToArrayAsync(token); var dtos = entities.Select(entity => { var dto = new TimestampedValuesDto() { Timestamp = entity.Timestamp.ToUniversalTime() }; for (var i = 0; i < valuesIdentity.Identity.Count(); i++) { var key = valuesIdentity.Identity[i]; var value = entity.Values[i]; dto.Values.Add(key, value); } return dto; }); return dtos; } private IQueryable ApplyGeTimestamp(IQueryable query, DateTimeOffset timestampBegin) { var geTimestampUtc = timestampBegin.ToUniversalTime(); var result = query .Where(entity => entity.Timestamp >= geTimestampUtc); return result; } private IEnumerable ReduceSetColumnsByNames(IEnumerable dtos, IEnumerable columnNames) { var result = dtos.Select(dto => { var reducedValues = dto.Values .Where(v => columnNames.Contains(v.Key)) .ToDictionary(); dto.Values = reducedValues; return dto; }); return result; } private async Task CreateValuesIdentityIfNotExist(Guid discriminatorId, string[] keys, CancellationToken token) { var valuesIdentities = await relatedDataRepository.Get(token); var valuesIdentity = valuesIdentities? .FirstOrDefault(e => e.DiscriminatorId == discriminatorId); if (valuesIdentity is null) { valuesIdentity = new ValuesIdentityDto() { DiscriminatorId = discriminatorId, Identity = keys }; await relatedDataRepository.Add(valuesIdentity, token); return; } if (!valuesIdentity.Identity.SequenceEqual(keys)) { var expectedIdentity = string.Join(", ", valuesIdentity.Identity); var actualIdentity = string.Join(", ", keys); throw new InvalidOperationException($"Для системы {discriminatorId.ToString()} " + $"характерен набор данных: [{expectedIdentity}], однако был передан набор: [{actualIdentity}]"); } } }