persistence/DD.Persistence.Repository/Repositories/TimestampedValuesRepository.cs
Roman Efremov fd276f5a43
Some checks failed
Unit tests / test (push) Failing after 55s
Доработки
2025-01-17 17:21:54 +05:00

242 lines
7.9 KiB
C#

using DD.Persistence.Database.Entity;
using DD.Persistence.Models;
using DD.Persistence.Models.Common;
using DD.Persistence.Repositories;
using DD.Persistence.Repository.Extensions;
using Microsoft.EntityFrameworkCore;
namespace DD.Persistence.Repository.Repositories;
/// <summary>
/// Repository over <see cref="TimestampedValues"/> rows. Every row stores a timestamp and a
/// positional array of values; the names of the array positions are kept separately as a
/// <see cref="ValuesIdentityDto"/> (one per discriminator) in the related-data repository.
/// </summary>
public class TimestampedValuesRepository : ITimestampedValuesRepository
{
    private readonly DbContext db;
    private readonly IRelatedDataRepository<ValuesIdentityDto> relatedDataRepository;

    /// <summary>
    /// Creates the repository.
    /// </summary>
    /// <param name="db">Context used to read and write <see cref="TimestampedValues"/>.</param>
    /// <param name="relatedDataRepository">Storage of the values identities (column names per discriminator).</param>
    public TimestampedValuesRepository(DbContext db, IRelatedDataRepository<ValuesIdentityDto> relatedDataRepository)
    {
        this.db = db;
        this.relatedDataRepository = relatedDataRepository;
    }

    /// <summary>
    /// Base query over <see cref="TimestampedValues"/> with the ValuesIdentity navigation included.
    /// </summary>
    /// <remarks>
    /// NOTE(review): despite the name, no <c>AsNoTracking()</c> is applied here — confirm whether
    /// change tracking is intended for read queries.
    /// </remarks>
    protected virtual IQueryable<TimestampedValues> GetQueryReadOnly() => this.db.Set<TimestampedValues>()
        .Include(e => e.ValuesIdentity);

    /// <summary>
    /// Adds a batch of timestamped values for one discriminator and saves the changes.
    /// </summary>
    /// <param name="discriminatorId">Identifier of the data set the values belong to.</param>
    /// <param name="dtos">Values to store; the key order of every item must match the stored identity.</param>
    /// <param name="token">Cancellation token.</param>
    /// <returns>The value returned by <c>SaveChangesAsync</c>.</returns>
    /// <exception cref="InvalidOperationException">
    /// Keys of an item differ (by name or by order) from the identity registered for the discriminator.
    /// </exception>
    public virtual async Task<int> AddRange(Guid discriminatorId, IEnumerable<TimestampedValuesDto> dtos, CancellationToken token)
    {
        var timestampedValuesEntities = new List<TimestampedValues>();

        // The identity is looked up (or created from the first item) only once per batch
        // instead of re-reading every identity from the related-data repository per item.
        ValuesIdentityDto? valuesIdentity = null;

        foreach (var dto in dtos)
        {
            var keys = dto.Values.Keys.ToArray();

            valuesIdentity ??= await GetOrCreateValuesIdentity(discriminatorId, keys, token);
            EnsureIdentityMatches(discriminatorId, valuesIdentity, keys);

            var timestampedValuesEntity = new TimestampedValues()
            {
                DiscriminatorId = discriminatorId,
                Timestamp = dto.Timestamp.ToUniversalTime(),
                Values = dto.Values.Values.ToArray()
            };
            timestampedValuesEntities.Add(timestampedValuesEntity);
        }

        await db.Set<TimestampedValues>().AddRangeAsync(timestampedValuesEntities, token);

        var result = await db.SaveChangesAsync(token);
        return result;
    }

    /// <summary>
    /// Returns a page of values of the discriminator ordered by timestamp ascending.
    /// </summary>
    /// <param name="discriminatorId">Identifier of the data set.</param>
    /// <param name="timestampBegin">Optional inclusive lower bound of the timestamp.</param>
    /// <param name="columnNames">Optional set of value names to keep; null or empty keeps all of them.</param>
    /// <param name="skip">Number of rows to skip.</param>
    /// <param name="take">Maximum number of rows to return.</param>
    /// <param name="token">Cancellation token.</param>
    public virtual async Task<IEnumerable<TimestampedValuesDto>> Get(Guid discriminatorId, DateTimeOffset? timestampBegin, IEnumerable<string>? columnNames, int skip, int take, CancellationToken token)
    {
        var query = GetQueryByDiscriminator(discriminatorId);

        // Filter by date
        if (timestampBegin.HasValue)
        {
            query = ApplyGeTimestamp(query, timestampBegin.Value);
        }

        query = query
            .OrderBy(item => item.Timestamp)
            .Skip(skip)
            .Take(take);

        var data = await Materialize(discriminatorId, query, token);

        // Filter by requested columns
        if (!columnNames.IsNullOrEmpty())
        {
            data = ReduceSetColumnsByNames(data, columnNames!);
        }

        return data;
    }

    /// <summary>
    /// Returns the <paramref name="takeCount"/> earliest rows of the discriminator, oldest first.
    /// </summary>
    public virtual async Task<IEnumerable<TimestampedValuesDto>> GetFirst(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        // Restricted to the requested discriminator: rows of other data sets
        // must not be mapped with this discriminator's column names.
        var query = GetQueryByDiscriminator(discriminatorId)
            .OrderBy(e => e.Timestamp)
            .Take(takeCount);

        var dtos = await Materialize(discriminatorId, query, token);
        return dtos;
    }

    /// <summary>
    /// Returns the <paramref name="takeCount"/> latest rows of the discriminator, newest first.
    /// </summary>
    public virtual async Task<IEnumerable<TimestampedValuesDto>> GetLast(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        var query = GetQueryByDiscriminator(discriminatorId)
            .OrderByDescending(e => e.Timestamp)
            .Take(takeCount);

        var dtos = await Materialize(discriminatorId, query, token);
        return dtos;
    }

    /// <summary>
    /// Returns the rows in the window (<paramref name="dateBegin"/>, <paramref name="dateBegin"/> + <paramref name="intervalSec"/>],
    /// thinned out by keeping every N-th row when the window holds at least twice
    /// <paramref name="approxPointsCount"/> rows.
    /// </summary>
    /// <param name="discriminatorId">Identifier of the data set.</param>
    /// <param name="dateBegin">Exclusive lower bound of the window.</param>
    /// <param name="intervalSec">Window length in seconds.</param>
    /// <param name="approxPointsCount">Approximate number of rows wanted; non-positive values disable thinning.</param>
    /// <param name="token">Cancellation token.</param>
    public virtual async Task<IEnumerable<TimestampedValuesDto>> GetResampledData(
        Guid discriminatorId,
        DateTimeOffset dateBegin,
        double intervalSec = 600d,
        int approxPointsCount = 1024,
        CancellationToken token = default)
    {
        var dtos = await GetGtDate(discriminatorId, dateBegin, token);

        var dateEnd = dateBegin.AddSeconds(intervalSec);

        // Materialized once: the window is counted and then indexed below.
        var window = dtos
            .Where(i => i.Timestamp <= dateEnd)
            .ToArray();

        // Guard against division by zero; a non-positive target means "no thinning".
        if (approxPointsCount <= 0)
        {
            return window;
        }

        var ratio = window.Length / approxPointsCount;
        if (ratio > 1)
        {
            return window
                .Where((_, index) => index % ratio == 0)
                .ToArray();
        }

        return window;
    }

    /// <summary>
    /// Returns rows of the discriminator whose timestamp is strictly greater than
    /// <paramref name="timestampBegin"/>, ordered by timestamp ascending.
    /// </summary>
    public virtual async Task<IEnumerable<TimestampedValuesDto>> GetGtDate(Guid discriminatorId, DateTimeOffset timestampBegin, CancellationToken token)
    {
        // Normalized to UTC so the bound is built the same way as in ApplyGeTimestamp.
        var gtTimestampUtc = timestampBegin.ToUniversalTime();

        var query = GetQueryByDiscriminator(discriminatorId)
            .Where(e => e.Timestamp > gtTimestampUtc)
            .OrderBy(e => e.Timestamp);

        var dtos = await Materialize(discriminatorId, query, token);
        return dtos;
    }

    /// <summary>
    /// Returns the minimal and maximal timestamps stored for the discriminator,
    /// or null when it has no rows.
    /// </summary>
    public virtual async Task<DatesRangeDto?> GetDatesRange(Guid discriminatorId, CancellationToken token)
    {
        // After the filter there is at most one group, so FirstOrDefaultAsync
        // yields either that discriminator's range or null.
        var query = GetQueryByDiscriminator(discriminatorId)
            .GroupBy(entity => entity.DiscriminatorId)
            .Select(group => new
            {
                Min = group.Min(entity => entity.Timestamp),
                Max = group.Max(entity => entity.Timestamp),
            });

        var item = await query.FirstOrDefaultAsync(token);
        if (item is null)
        {
            return null;
        }

        var dto = new DatesRangeDto
        {
            From = item.Min,
            To = item.Max,
        };
        return dto;
    }

    /// <summary>
    /// Counts the rows stored for the discriminator.
    /// </summary>
    public virtual Task<int> Count(Guid discriminatorId, CancellationToken token)
    {
        var dbSet = db.Set<TimestampedValues>();
        var query = dbSet.Where(entity => entity.DiscriminatorId == discriminatorId);
        return query.CountAsync(token);
    }

    /// <summary>
    /// Base read query restricted to one discriminator.
    /// </summary>
    private IQueryable<TimestampedValues> GetQueryByDiscriminator(Guid discriminatorId) => GetQueryReadOnly()
        .Where(entity => entity.DiscriminatorId == discriminatorId);

    /// <summary>
    /// Executes the query and converts the rows to DTOs, pairing every positional value
    /// with the name at the same position of the discriminator's identity.
    /// Returns an empty collection when no identity is registered for the discriminator.
    /// </summary>
    private async Task<IEnumerable<TimestampedValuesDto>> Materialize(Guid discriminatorId, IQueryable<TimestampedValues> query, CancellationToken token)
    {
        var valuesIdentity = await FindValuesIdentity(discriminatorId, token);
        if (valuesIdentity is null)
        {
            return [];
        }

        var identity = valuesIdentity.Identity;
        var identityLength = identity.Count();

        var entities = await query.ToArrayAsync(token);

        // Built eagerly so the DTOs are created exactly once, however often the result is enumerated.
        var dtos = entities
            .Select(entity =>
            {
                var dto = new TimestampedValuesDto()
                {
                    Timestamp = entity.Timestamp.ToUniversalTime()
                };

                for (var i = 0; i < identityLength; i++)
                {
                    var key = identity[i];
                    var value = entity.Values[i];
                    dto.Values.Add(key, value);
                }

                return dto;
            })
            .ToArray();

        return dtos;
    }

    /// <summary>
    /// Adds an inclusive lower timestamp bound (converted to UTC) to the query.
    /// </summary>
    private IQueryable<TimestampedValues> ApplyGeTimestamp(IQueryable<TimestampedValues> query, DateTimeOffset timestampBegin)
    {
        var geTimestampUtc = timestampBegin.ToUniversalTime();

        var result = query
            .Where(entity => entity.Timestamp >= geTimestampUtc);
        return result;
    }

    /// <summary>
    /// Keeps only the values whose names are listed in <paramref name="columnNames"/>.
    /// The passed DTOs are modified in place and returned.
    /// </summary>
    private IEnumerable<TimestampedValuesDto> ReduceSetColumnsByNames(IEnumerable<TimestampedValuesDto> dtos, IEnumerable<string> columnNames)
    {
        // Set lookup instead of scanning (and re-enumerating) columnNames for every key.
        var requestedNames = new HashSet<string>(columnNames);

        var result = dtos
            .Select(dto =>
            {
                var reducedValues = dto.Values
                    .Where(v => requestedNames.Contains(v.Key))
                    .ToDictionary();
                dto.Values = reducedValues;

                return dto;
            })
            .ToArray();

        return result;
    }

    /// <summary>
    /// Looks up the values identity registered for the discriminator; null when there is none.
    /// </summary>
    private async Task<ValuesIdentityDto?> FindValuesIdentity(Guid discriminatorId, CancellationToken token)
    {
        var valuesIdentities = await relatedDataRepository.Get(token);

        return valuesIdentities?
            .FirstOrDefault(e => e.DiscriminatorId == discriminatorId);
    }

    /// <summary>
    /// Returns the identity registered for the discriminator, registering
    /// <paramref name="keys"/> as its identity first when none exists yet.
    /// </summary>
    private async Task<ValuesIdentityDto> GetOrCreateValuesIdentity(Guid discriminatorId, string[] keys, CancellationToken token)
    {
        var valuesIdentity = await FindValuesIdentity(discriminatorId, token);
        if (valuesIdentity is not null)
        {
            return valuesIdentity;
        }

        valuesIdentity = new ValuesIdentityDto()
        {
            DiscriminatorId = discriminatorId,
            Identity = keys
        };
        await relatedDataRepository.Add(valuesIdentity, token);

        return valuesIdentity;
    }

    /// <summary>
    /// Throws when <paramref name="keys"/> differ (by name or by order) from the registered identity.
    /// </summary>
    /// <exception cref="InvalidOperationException">The key sequences are not equal.</exception>
    private static void EnsureIdentityMatches(Guid discriminatorId, ValuesIdentityDto valuesIdentity, string[] keys)
    {
        if (valuesIdentity.Identity.SequenceEqual(keys))
        {
            return;
        }

        var expectedIdentity = string.Join(", ", valuesIdentity.Identity);
        var actualIdentity = string.Join(", ", keys);
        throw new InvalidOperationException($"Для системы {discriminatorId.ToString()} " +
            $"характерен набор данных: [{expectedIdentity}], однако был передан набор: [{actualIdentity}]");
    }
}