using DD.Persistence.Database.Entity;
using DD.Persistence.Database.Postgres.Helpers;
using DD.Persistence.Filter.Models.Abstractions;
using DD.Persistence.Models;
using DD.Persistence.Models.Common;
using DD.Persistence.Repositories;
using Microsoft.EntityFrameworkCore;

namespace DD.Persistence.Database.Postgres.Repositories;

/// <summary>
/// Repository for <see cref="TimestampedValues"/> rows, which are keyed by a discriminator id and a timestamp.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments in this file (DTO type, tuple element types, <c>Task&lt;...&gt;</c> results)
/// were reconstructed from how the values are used in the method bodies - confirm them against
/// <see cref="ITimestampedValuesRepository"/>.
/// </remarks>
public class TimestampedValuesRepository : ITimestampedValuesRepository
{
    private readonly DbContext db;
    private readonly ISchemePropertyRepository schemePropertyRepository;

    /// <summary>
    /// Creates the repository.
    /// </summary>
    /// <param name="db">Context used for every query and for saving new rows.</param>
    /// <param name="schemePropertyRepository">Used by <see cref="Get"/> to look up the data scheme of a discriminator.</param>
    public TimestampedValuesRepository(DbContext db, ISchemePropertyRepository schemePropertyRepository)
    {
        this.db = db;
        this.schemePropertyRepository = schemePropertyRepository;
    }

    /// <summary>
    /// Base query over the <see cref="TimestampedValues"/> set; every read in this class starts from it.
    /// </summary>
    protected IQueryable<TimestampedValues> GetQueryReadOnly() => db.Set<TimestampedValues>();

    /// <summary>
    /// Adds one row per DTO under the given discriminator and saves the changes.
    /// </summary>
    /// <param name="discriminatorId">Discriminator assigned to every added row.</param>
    /// <param name="dtos">Source records; timestamps are converted to UTC before being stored.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>The value returned by <c>SaveChangesAsync</c>.</returns>
    public async Task<int> AddRange(Guid discriminatorId, IEnumerable<TimestampedValuesDto> dtos, CancellationToken token)
    {
        var timestampedValuesEntities = dtos.Select(dto => new TimestampedValues()
        {
            DiscriminatorId = discriminatorId,
            Timestamp = dto.Timestamp.ToUniversalTime(),
            Values = dto.Values.Values.ToArray()
        });

        await db.AddRangeAsync(timestampedValuesEntities, token);

        var result = await db.SaveChangesAsync(token);

        return result;
    }

    /// <summary>
    /// Reads rows for several discriminators and returns them grouped by discriminator id.
    /// </summary>
    /// <param name="discriminatorIds">Discriminators to read. When empty, an empty dictionary is returned.</param>
    /// <param name="geTimestamp">Optional inclusive lower bound for the timestamp; when null no lower bound is applied.</param>
    /// <param name="filterTree">Optional filter applied to each per-discriminator query through <c>ApplyFilter</c>.</param>
    /// <param name="columnNames">Not used by this implementation.</param>
    /// <param name="skip">Number of rows to skip inside each group, after ordering by timestamp.</param>
    /// <param name="take">Maximum number of rows to take inside each group.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>Dictionary from discriminator id to its (timestamp, values) pairs.</returns>
    /// <exception cref="NotSupportedException">No data scheme was found for one of the discriminators.</exception>
    public async Task<IDictionary<Guid, IEnumerable<(DateTimeOffset Timestamp, object[] Values)>>> Get(
        IEnumerable<Guid> discriminatorIds,
        DateTimeOffset? geTimestamp,
        TNode? filterTree,
        IEnumerable<string>? columnNames,
        int skip,
        int take,
        CancellationToken token)
    {
        // The bound is the same for every discriminator, so convert it once.
        // The parameter is nullable: a missing value means "no lower bound" instead of a failed dereference.
        var geTimestampUtc = geTimestamp?.ToUniversalTime();

        // Stays null until the first per-discriminator query is built. This avoids seeding the union with an
        // in-memory queryable and probing it with Any(), which ran a synchronous database query per iteration.
        IQueryable<TimestampedValues>? resultQuery = null;

        foreach (var discriminatorId in discriminatorIds)
        {
            var scheme = await schemePropertyRepository.Get(discriminatorId, token);
            if (scheme is null)
            {
                throw new NotSupportedException($"Для переданного дискриминатора {discriminatorId} не была обнаружена схема данных");
            }

            var query = GetQueryReadOnly()
                .Where(e => e.DiscriminatorId == discriminatorId);

            if (geTimestampUtc.HasValue)
            {
                var lowerBound = geTimestampUtc.Value;
                query = query.Where(entity => entity.Timestamp >= lowerBound);
            }

            if (filterTree is not null)
            {
                query = query.ApplyFilter(scheme, filterTree);
            }

            resultQuery = resultQuery is null
                ? query
                : resultQuery.Union(query);
        }

        if (resultQuery is null)
        {
            // No discriminators were requested, so there is nothing to send to the database.
            return new Dictionary<Guid, IEnumerable<(DateTimeOffset Timestamp, object[] Values)>>();
        }

        var groupedQuery = resultQuery
            .GroupBy(e => e.DiscriminatorId)
            .Select(g => KeyValuePair.Create(
                    g.Key,
                    g.OrderBy(i => i.Timestamp).Skip(skip).Take(take))
            );

        var entities = await groupedQuery.ToArrayAsync(token);

        var result = entities.ToDictionary(
            k => k.Key,
            v => v.Value.Select(e => (e.Timestamp, e.Values)));

        return result;
    }

    /// <summary>
    /// Returns the rows of one discriminator with the smallest timestamps.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are read.</param>
    /// <param name="takeCount">Maximum number of rows to return.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>(timestamp, values) pairs in ascending timestamp order.</returns>
    public async Task<IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> GetFirst(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        var query = GetQueryReadOnly()
            .Where(e => e.DiscriminatorId == discriminatorId)
            .OrderBy(e => e.Timestamp)
            .Take(takeCount);

        var entities = await query.ToArrayAsync(token);

        var result = entities.Select(e => (e.Timestamp, e.Values));

        return result;
    }

    /// <summary>
    /// Returns the rows of one discriminator with the largest timestamps.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are read.</param>
    /// <param name="takeCount">Maximum number of rows to return.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>(timestamp, values) pairs in descending timestamp order.</returns>
    public async Task<IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> GetLast(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        var query = GetQueryReadOnly()
            .Where(e => e.DiscriminatorId == discriminatorId)
            .OrderByDescending(e => e.Timestamp)
            .Take(takeCount);

        var entities = await query.ToArrayAsync(token);

        var result = entities.Select(e => (e.Timestamp, e.Values));

        return result;
    }

    // ToDo: thinning should be performed before materialization
    /// <summary>
    /// Returns the rows of one discriminator inside a time window, thinned to roughly the requested number of points.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are read.</param>
    /// <param name="dateBegin">Exclusive start of the window (rows are read through <see cref="GetGtDate"/>).</param>
    /// <param name="intervalSec">Window length in seconds; the end of the window is inclusive.</param>
    /// <param name="approxPointsCount">Approximate number of points wanted; a non-positive value disables thinning.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>Every <c>ratio</c>-th row of the window, where <c>ratio</c> is the row count divided by <paramref name="approxPointsCount"/> using integer division; all rows when that ratio is not greater than 1.</returns>
    public async Task<IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> GetResampledData(
        Guid discriminatorId,
        DateTimeOffset dateBegin,
        double intervalSec = 600d,
        int approxPointsCount = 1024,
        CancellationToken token = default)
    {
        var dateEnd = dateBegin.AddSeconds(intervalSec);

        // Materialize once: the sequence is counted and then filtered by index, so a deferred
        // query would otherwise be enumerated several times.
        var points = (await GetGtDate(discriminatorId, dateBegin, token))
            .Where(i => i.Item1 <= dateEnd)
            .ToArray();

        if (approxPointsCount <= 0)
        {
            // Avoids a division by zero; there is no meaningful ratio for a non-positive target.
            return points;
        }

        var ratio = points.Length / approxPointsCount;
        if (ratio > 1)
        {
            return points
                .Where((_, index) => index % ratio == 0)
                .ToArray();
        }

        return points;
    }

    /// <summary>
    /// Returns the rows of one discriminator whose timestamp is strictly greater than the given one.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are read.</param>
    /// <param name="gtTimestamp">Exclusive lower bound; converted to UTC before comparison.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>(timestamp, values) pairs; no ordering is requested from the database.</returns>
    public async Task<IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> GetGtDate(Guid discriminatorId, DateTimeOffset gtTimestamp, CancellationToken token)
    {
        var gtTimestampUtc = gtTimestamp.ToUniversalTime();

        var query = GetQueryReadOnly()
            .Where(entity => entity.DiscriminatorId == discriminatorId)
            .Where(entity => entity.Timestamp > gtTimestampUtc);

        var entities = await query.ToArrayAsync(token);

        var result = entities.Select(e => (e.Timestamp, e.Values));

        return result;
    }

    /// <summary>
    /// Returns the smallest and largest timestamps stored for one discriminator.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are inspected.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>The range, or null when the discriminator has no rows.</returns>
    public async Task<DatesRangeDto?> GetDatesRange(Guid discriminatorId, CancellationToken token)
    {
        // Filter first so the single remaining group belongs to the requested discriminator;
        // grouping keeps the "no rows -> null" outcome instead of failing on Min/Max of an empty set.
        var query = GetQueryReadOnly()
            .Where(entity => entity.DiscriminatorId == discriminatorId)
            .GroupBy(entity => entity.DiscriminatorId)
            .Select(group => new
            {
                Min = group.Min(entity => entity.Timestamp),
                Max = group.Max(entity => entity.Timestamp),
            });

        var item = await query.FirstOrDefaultAsync(token);
        if (item is null)
        {
            return null;
        }

        var dto = new DatesRangeDto
        {
            From = item.Min,
            To = item.Max,
        };

        return dto;
    }

    /// <summary>
    /// Counts the rows stored for one discriminator.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are counted.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>Number of matching rows.</returns>
    public async Task<int> Count(Guid discriminatorId, CancellationToken token)
    {
        var query = GetQueryReadOnly()
            .Where(e => e.DiscriminatorId == discriminatorId);

        var result = await query.CountAsync(token);

        return result;
    }
}