// persistence/DD.Persistence.Database/Repositories/TimestampedValuesRepository.cs
//
// 181 lines
// 6.2 KiB
// C#

using DD.Persistence.Database.Entity;
using DD.Persistence.Database.Postgres.Helpers;
using DD.Persistence.Filter.Models.Abstractions;
using DD.Persistence.Models;
using DD.Persistence.Models.Common;
using DD.Persistence.Repositories;
using Microsoft.EntityFrameworkCore;
namespace DD.Persistence.Database.Postgres.Repositories;
/// <summary>
/// Implementation of <see cref="ITimestampedValuesRepository"/> that reads and writes
/// <see cref="TimestampedValues"/> entities through the injected <see cref="DbContext"/>.
/// </summary>
public class TimestampedValuesRepository : ITimestampedValuesRepository
{
    private readonly DbContext db;
    private readonly ISchemePropertyRepository schemePropertyRepository;

    /// <summary>
    /// Creates the repository.
    /// </summary>
    /// <param name="db">Context whose <see cref="TimestampedValues"/> set is queried and modified.</param>
    /// <param name="schemePropertyRepository">Source of the data scheme looked up for every discriminator in <see cref="Get"/>.</param>
    public TimestampedValuesRepository(DbContext db, ISchemePropertyRepository schemePropertyRepository)
    {
        this.db = db;
        this.schemePropertyRepository = schemePropertyRepository;
    }

    /// <summary>
    /// Base query used by every read method; derived classes may override it to customise the source set.
    /// </summary>
    protected virtual IQueryable<TimestampedValues> GetQueryReadOnly() => db.Set<TimestampedValues>();

    /// <summary>
    /// Adds one entity per DTO for the given discriminator and saves the changes.
    /// </summary>
    /// <param name="discriminatorId">Discriminator assigned to every created entity.</param>
    /// <param name="dtos">Values to store; each timestamp is converted to UTC before saving.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>The number returned by <see cref="DbContext.SaveChangesAsync(CancellationToken)"/>.</returns>
    public virtual async Task<int> AddRange(Guid discriminatorId, IEnumerable<TimestampedValuesDto> dtos, CancellationToken token)
    {
        var timestampedValuesEntities = dtos.Select(dto => new TimestampedValues()
        {
            DiscriminatorId = discriminatorId,
            Timestamp = dto.Timestamp.ToUniversalTime(),
            // Only the values are stored, in the enumeration order of dto.Values.
            Values = dto.Values.Values.ToArray()
        });

        await db.AddRangeAsync(timestampedValuesEntities, token);
        return await db.SaveChangesAsync(token);
    }

    /// <summary>
    /// Reads values of several discriminators, grouped by discriminator.
    /// </summary>
    /// <param name="discriminatorIds">Discriminators to read. An empty sequence yields an empty dictionary.</param>
    /// <param name="geTimestamp">Inclusive lower bound for the timestamp; <c>null</c> means no lower bound.</param>
    /// <param name="filterTree">Optional filter applied per discriminator together with its scheme.</param>
    /// <param name="columnNames">Currently not used by this implementation.</param>
    /// <param name="skip">Number of rows skipped inside every group (rows are ordered by timestamp).</param>
    /// <param name="take">Maximum number of rows returned for every group.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>Dictionary keyed by the discriminators that produced a group in the query result.</returns>
    /// <exception cref="NotSupportedException">No data scheme was found for one of the discriminators.</exception>
    public virtual async Task<IDictionary<Guid, IEnumerable<(DateTimeOffset Timestamp, object[] Values)>>> Get(IEnumerable<Guid> discriminatorIds,
        DateTimeOffset? geTimestamp,
        TNode? filterTree,
        IEnumerable<string>? columnNames,
        int skip,
        int take,
        CancellationToken token)
    {
        // Loop invariant: converted once, and left null when the caller gave no lower bound.
        var geTimestampUtc = geTimestamp?.ToUniversalTime();

        IQueryable<TimestampedValues>? resultQuery = null;
        foreach (var discriminatorId in discriminatorIds)
        {
            var scheme = await schemePropertyRepository.Get(discriminatorId, token);
            if (scheme == null)
                throw new NotSupportedException($"Для переданного дискриминатора {discriminatorId} не была обнаружена схема данных");

            var query = QueryByDiscriminator(discriminatorId);
            if (geTimestampUtc.HasValue)
            {
                var lowerBound = geTimestampUtc.Value;
                query = query.Where(entity => entity.Timestamp >= lowerBound);
            }

            if (filterTree != null)
                query = query.ApplyFilter(scheme, filterTree);

            // Compose the union without executing anything: the first query seeds the result.
            resultQuery = resultQuery is null ? query : resultQuery.Union(query);
        }

        // No discriminators were passed, so there is nothing to query.
        if (resultQuery is null)
            return new Dictionary<Guid, IEnumerable<(DateTimeOffset Timestamp, object[] Values)>>();

        var groupedQuery = resultQuery
            .GroupBy(e => e.DiscriminatorId)
            .Select(g => KeyValuePair.Create(
                g.Key,
                g.OrderBy(i => i.Timestamp).Skip(skip).Take(take))
            );

        var entities = await groupedQuery.ToArrayAsync(token);
        return entities.ToDictionary(k => k.Key, v => ToTuples(v.Value));
    }

    /// <summary>
    /// Returns up to <paramref name="takeCount"/> rows of the discriminator with the smallest timestamps.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are read.</param>
    /// <param name="takeCount">Maximum number of rows to return.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>Rows in ascending timestamp order.</returns>
    public virtual async Task<IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> GetFirst(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        var entities = await QueryByDiscriminator(discriminatorId)
            .OrderBy(e => e.Timestamp)
            .Take(takeCount)
            .ToArrayAsync(token);

        return ToTuples(entities);
    }

    /// <summary>
    /// Returns up to <paramref name="takeCount"/> rows of the discriminator with the largest timestamps.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are read.</param>
    /// <param name="takeCount">Maximum number of rows to return.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>Rows in descending timestamp order.</returns>
    public virtual async Task<IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> GetLast(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        var entities = await QueryByDiscriminator(discriminatorId)
            .OrderByDescending(e => e.Timestamp)
            .Take(takeCount)
            .ToArrayAsync(token);

        return ToTuples(entities);
    }

    // TODO: thinning should be performed before materialization.
    /// <summary>
    /// Returns the rows of the discriminator inside the window
    /// (<paramref name="dateBegin"/>, <paramref name="dateBegin"/> + <paramref name="intervalSec"/>],
    /// thinned so that roughly <paramref name="approxPointsCount"/> points remain.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are read.</param>
    /// <param name="dateBegin">Exclusive start of the window.</param>
    /// <param name="intervalSec">Window length in seconds.</param>
    /// <param name="approxPointsCount">Desired number of points; a non-positive value disables thinning.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>Rows in ascending timestamp order; every ratio-th row when thinning applies.</returns>
    public virtual async Task<IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> GetResampledData(
        Guid discriminatorId,
        DateTimeOffset dateBegin,
        double intervalSec = 600d,
        int approxPointsCount = 1024,
        CancellationToken token = default)
    {
        var afterBegin = await GetGtDate(discriminatorId, dateBegin, token);
        var dateEnd = dateBegin.AddSeconds(intervalSec);

        // Materialize once: the window is counted and then enumerated again by the caller.
        // Ordering makes the index-based thinning below deterministic.
        var points = afterBegin
            .Where(i => i.Timestamp <= dateEnd)
            .OrderBy(i => i.Timestamp)
            .ToArray();

        // A non-positive target cannot produce a meaningful ratio (and zero would divide by zero).
        if (approxPointsCount <= 0)
            return points;

        var ratio = points.Length / approxPointsCount;
        if (ratio <= 1)
            return points;

        return points.Where((_, index) => index % ratio == 0);
    }

    /// <summary>
    /// Returns all rows of the discriminator with a timestamp strictly greater than <paramref name="gtTimestamp"/>.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are read.</param>
    /// <param name="gtTimestamp">Exclusive lower bound; converted to UTC before it is used in the query.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>Matching rows; no ordering is applied by this method.</returns>
    public virtual async Task<IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> GetGtDate(Guid discriminatorId, DateTimeOffset gtTimestamp, CancellationToken token)
    {
        var gtTimestampUtc = gtTimestamp.ToUniversalTime();

        var entities = await QueryByDiscriminator(discriminatorId)
            .Where(entity => entity.Timestamp > gtTimestampUtc)
            .ToArrayAsync(token);

        return ToTuples(entities);
    }

    /// <summary>
    /// Returns the smallest and largest timestamp stored for the discriminator.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are inspected.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>The range, or <c>null</c> when the discriminator has no rows.</returns>
    public virtual async Task<DatesRangeDto?> GetDatesRange(Guid discriminatorId, CancellationToken token)
    {
        // Filtering first guarantees that the single resulting group belongs to the requested discriminator.
        var item = await QueryByDiscriminator(discriminatorId)
            .GroupBy(entity => entity.DiscriminatorId)
            .Select(group => new
            {
                Min = group.Min(entity => entity.Timestamp),
                Max = group.Max(entity => entity.Timestamp),
            })
            .FirstOrDefaultAsync(token);

        if (item is null)
        {
            return null;
        }

        return new DatesRangeDto
        {
            From = item.Min,
            To = item.Max,
        };
    }

    /// <summary>
    /// Counts the rows stored for the discriminator.
    /// </summary>
    /// <param name="discriminatorId">Discriminator whose rows are counted.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>Number of rows.</returns>
    public virtual async Task<int> Count(Guid discriminatorId, CancellationToken token)
    {
        return await QueryByDiscriminator(discriminatorId).CountAsync(token);
    }

    // Base query restricted to the rows of a single discriminator.
    private IQueryable<TimestampedValues> QueryByDiscriminator(Guid discriminatorId) =>
        GetQueryReadOnly().Where(e => e.DiscriminatorId == discriminatorId);

    // Lazily projects entities to the (Timestamp, Values) tuples exposed by the repository interface.
    private static IEnumerable<(DateTimeOffset Timestamp, object[] Values)> ToTuples(IEnumerable<TimestampedValues> entities) =>
        entities.Select(e => (e.Timestamp, e.Values));
}