persistence/DD.Persistence.Repository/Repositories/TimestampedValuesRepository.cs

188 lines
6.2 KiB
C#

using DD.Persistence.Database.Entity;
using DD.Persistence.Models;
using DD.Persistence.Models.Common;
using DD.Persistence.ModelsAbstractions;
using DD.Persistence.Repositories;
using Mapster;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
namespace DD.Persistence.Repository.Repositories;
public class TimestampedValuesRepository<TDto> : ITimestampedValuesRepository<TDto>
where TDto : class, ITimestampAbstractDto, new()
{
private readonly DbContext db;
public TimestampedValuesRepository(DbContext db)
{
this.db = db;
}
protected virtual IQueryable<TimestampedValues> GetQueryReadOnly() => this.db.Set<TimestampedValues>();
public virtual async Task<DatesRangeDto?> GetDatesRange(Guid discriminatorId, CancellationToken token)
{
var query = GetQueryReadOnly()
.GroupBy(entity => entity.DiscriminatorId)
.Select(group => new
{
Min = group.Min(entity => entity.Timestamp),
Max = group.Max(entity => entity.Timestamp),
});
var item = await query.FirstOrDefaultAsync(token);
if (item is null)
return null;
return new DatesRangeDto
{
From = item.Min,
To = item.Max,
};
}
public virtual async Task<IEnumerable<TDto>> GetGtDate(Guid discriminatorId, DateTimeOffset date, CancellationToken token)
{
var query = GetQueryReadOnly().Where(e => e.Timestamp > date);
var entities = await query.ToArrayAsync(token);
var dtos = entities.Select(e => e.Adapt<TDto>());
return dtos;
}
public virtual async Task<int> AddRange(Guid discriminatorId, IEnumerable<TDto> dtos, CancellationToken token)
{
var entities = dtos
.Select(d => d.Adapt<TimestampedValues>())
.ToList();
entities.ForEach(d => d.DiscriminatorId = discriminatorId);
await db.Set<TimestampedValues>().AddRangeAsync(entities, token);
var result = await db.SaveChangesAsync(token);
return result;
}
protected async Task<IEnumerable<TDto>> GetLastAsync(int takeCount, CancellationToken token)
{
var query = GetQueryReadOnly()
.OrderByDescending(e => e.Timestamp)
.Take(takeCount);
var entities = await query.ToArrayAsync(token);
var dtos = entities.Select(e => e.Adapt<TDto>());
return dtos;
}
protected async Task<TDto?> GetFirstAsync(CancellationToken token)
{
var query = GetQueryReadOnly()
.OrderBy(e => e.Timestamp);
var entity = await query.FirstOrDefaultAsync(token);
if (entity == null)
return null;
var dto = entity.Adapt<TDto>();
return dto;
}
public async virtual Task<IEnumerable<TDto>> GetResampledData(
Guid discriminatorId,
DateTimeOffset dateBegin,
double intervalSec = 600d,
int approxPointsCount = 1024,
CancellationToken token = default)
{
var dtos = await GetGtDate(discriminatorId, dateBegin, token);
var dateEnd = dateBegin.AddSeconds(intervalSec);
dtos = dtos
.Where(i => i.Timestamp <= dateEnd);
var ratio = dtos.Count() / approxPointsCount;
if (ratio > 1)
dtos = dtos
.Where((_, index) => index % ratio == 0);
return dtos;
}
public async Task<IEnumerable<TimestampedValuesDto>> Get(Guid idDiscriminator, DateTimeOffset? geTimestamp, IEnumerable<string>? columnNames, int skip, int take, CancellationToken token)
{
var dbSet = db.Set<TimestampedValues>();
var query = dbSet.Where(entity => entity.DiscriminatorId == idDiscriminator);
if (geTimestamp.HasValue)
query = ApplyGeTimestamp(query, geTimestamp.Value);
query = query
.OrderBy(item => item.Timestamp)
.Skip(skip)
.Take(take);
var data = await Materialize(query, token);
if (columnNames is not null && columnNames.Any())
data = ReduceSetColumnsByNames(data, columnNames);
return data;
}
public async Task<IEnumerable<TimestampedValuesDto>> GetLast(Guid idDiscriminator, IEnumerable<string>? columnNames, int take, CancellationToken token)
{
var dbSet = db.Set<TimestampedValues>();
var query = dbSet.Where(entity => entity.DiscriminatorId == idDiscriminator);
query = query.OrderByDescending(entity => entity.Timestamp)
.Take(take)
.OrderBy(entity => entity.Timestamp);
var data = await Materialize(query, token);
if (columnNames is not null && columnNames.Any())
data = ReduceSetColumnsByNames(data, columnNames);
return data;
}
public Task<int> Count(Guid idDiscriminator, CancellationToken token)
{
var dbSet = db.Set<TimestampedValues>();
var query = dbSet.Where(entity => entity.DiscriminatorId == idDiscriminator);
return query.CountAsync(token);
}
private static async Task<IEnumerable<TimestampedValuesDto>> Materialize(IQueryable<TimestampedValues> query, CancellationToken token)
{
var dtoQuery = query.Select(entity => new TimestampedValuesDto() { Timestamp = entity.Timestamp, Values = entity.Values });
var dtos = await dtoQuery.ToArrayAsync(token);
return dtos;
}
private static IQueryable<TimestampedValues> ApplyGeTimestamp(IQueryable<TimestampedValues> query, DateTimeOffset geTimestamp)
{
var geTimestampUtc = geTimestamp.ToUniversalTime();
return query.Where(entity => entity.Timestamp >= geTimestampUtc);
}
private static IEnumerable<TimestampedValuesDto> ReduceSetColumnsByNames(IEnumerable<TimestampedValuesDto> query, IEnumerable<string> columnNames)
{
var newQuery = query
.Select(entity => new TimestampedValuesDto()
{
Timestamp = entity.Timestamp,
Values = entity.Values?
.Where(prop => columnNames.Contains(
JsonSerializer.Deserialize<Dictionary<string, object>>(prop.ToString()!)?
.FirstOrDefault().Key
)).ToArray()
});
return newQuery;
}
}