using DD.Persistence.Extensions;
using DD.Persistence.Models;
using DD.Persistence.Models.Common;
using DD.Persistence.Repositories;
using DD.Persistence.Services.Interfaces;

namespace DD.Persistence.Services;

/// <inheritdoc/>
public class TimestampedValuesService : ITimestampedValuesService
{
    private readonly ITimestampedValuesRepository timestampedValuesRepository;
    private readonly IDataSchemeRepository dataSchemeRepository;

    /// <inheritdoc/>
    public TimestampedValuesService(ITimestampedValuesRepository timestampedValuesRepository, IDataSchemeRepository relatedDataRepository)
    {
        this.timestampedValuesRepository = timestampedValuesRepository;
        this.dataSchemeRepository = relatedDataRepository;
    }

    /// <inheritdoc/>
    public async Task<int> AddRange(Guid discriminatorId, IEnumerable<TimestampedValuesDto> dtos, CancellationToken token)
    {
        // The sequence is read twice (validation below, then by the repository),
        // so a lazily produced input is snapshotted once to avoid re-evaluating it.
        IEnumerable<TimestampedValuesDto> items = dtos is ICollection<TimestampedValuesDto>
            ? dtos
            : dtos.ToArray();

        await EnsureDataScheme(discriminatorId, items, token);

        return await timestampedValuesRepository.AddRange(discriminatorId, items, token);
    }

    /// <inheritdoc/>
    public async Task<IEnumerable<TimestampedValuesDto>> Get(IEnumerable<Guid> discriminatorIds, DateTimeOffset? geTimestamp, IEnumerable<string>? columnNames, int skip, int take, CancellationToken token)
    {
        var result = await timestampedValuesRepository.Get(discriminatorIds, geTimestamp, columnNames, skip, take, token);

        var dtos = await Materialize(result, token);

        // The materialized dtos carry every column of the scheme; trim them down
        // to the requested columns when a filter was supplied.
        if (!columnNames.IsNullOrEmpty())
        {
            dtos = ReduceSetColumnsByNames(dtos, columnNames!);
        }

        return dtos;
    }

    /// <inheritdoc/>
    public async Task<IEnumerable<TimestampedValuesDto>> GetFirst(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        var result = await timestampedValuesRepository.GetFirst(discriminatorId, takeCount, token);

        return await MaterializeSingle(discriminatorId, result, token);
    }

    /// <inheritdoc/>
    public async Task<IEnumerable<TimestampedValuesDto>> GetLast(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        var result = await timestampedValuesRepository.GetLast(discriminatorId, takeCount, token);

        return await MaterializeSingle(discriminatorId, result, token);
    }

    /// <inheritdoc/>
    public async Task<IEnumerable<TimestampedValuesDto>> GetResampledData(
        Guid discriminatorId,
        DateTimeOffset beginTimestamp,
        double intervalSec = 600d,
        int approxPointsCount = 1024,
        CancellationToken token = default)
    {
        var result = await timestampedValuesRepository.GetResampledData(discriminatorId, beginTimestamp, intervalSec, approxPointsCount, token);

        return await MaterializeSingle(discriminatorId, result, token);
    }

    /// <inheritdoc/>
    public async Task<IEnumerable<TimestampedValuesDto>> GetGtDate(Guid discriminatorId, DateTimeOffset beginTimestamp, CancellationToken token)
    {
        var result = await timestampedValuesRepository.GetGtDate(discriminatorId, beginTimestamp, token);

        return await MaterializeSingle(discriminatorId, result, token);
    }

    /// <summary>
    /// Convert the rows of a single discriminator into a set of dtos.
    /// </summary>
    /// <param name="discriminatorId">Discriminator the rows belong to</param>
    /// <param name="rows">Raw rows returned by the repository</param>
    /// <param name="token"></param>
    /// <returns>Dtos built by <see cref="Materialize"/> for the given discriminator</returns>
    private async Task<IEnumerable<TimestampedValuesDto>> MaterializeSingle(
        Guid discriminatorId,
        IEnumerable<(DateTimeOffset Timestamp, object[] Values)> rows,
        CancellationToken token)
    {
        var queryResult = new Dictionary<Guid, IEnumerable<(DateTimeOffset Timestamp, object[] Values)>>
        {
            [discriminatorId] = rows
        };

        return await Materialize(queryResult, token);
    }

    /// <summary>
    /// Convert a query result into a set of dtos.
    /// </summary>
    /// <param name="queryResult">Raw rows grouped by discriminator</param>
    /// <param name="token"></param>
    /// <returns>
    /// One dto per row; rows of a discriminator that has no stored data scheme are skipped.
    /// </returns>
    private async Task<IEnumerable<TimestampedValuesDto>> Materialize(IDictionary<Guid, IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> queryResult, CancellationToken token)
    {
        var result = new List<TimestampedValuesDto>();

        foreach (var entry in queryResult)
        {
            var dataScheme = await dataSchemeRepository.Get(entry.Key, token);
            if (dataScheme is null)
            {
                continue;
            }

            // Property names are the same for every row of this discriminator,
            // so they are resolved once instead of once per row.
            var propNames = dataScheme.PropNames.ToArray();

            foreach (var row in entry.Value)
            {
                var dto = new TimestampedValuesDto()
                {
                    Timestamp = row.Timestamp.ToUniversalTime(),
                    // Values are matched to property names by position.
                    Values = Enumerable.Range(0, propNames.Length)
                        .ToDictionary(i => propNames[i], i => row.Values[i])
                };

                result.Add(dto);
            }
        }

        return result;
    }

    /// <summary>
    /// Make sure a data scheme exists for the discriminator and that every dto matches it.
    /// </summary>
    /// <param name="discriminatorId">System discriminator</param>
    /// <param name="dtos">Records that are about to be stored</param>
    /// <param name="token"></param>
    /// <returns></returns>
    /// <exception cref="InvalidOperationException">A dto carries a set of field names that differs from the scheme</exception>
    private async Task EnsureDataScheme(Guid discriminatorId, IEnumerable<TimestampedValuesDto> dtos, CancellationToken token)
    {
        // Resolved on the first dto only, so an empty input performs no repository calls
        // and a non-empty input performs a single scheme lookup.
        string[]? expectedFieldNames = null;

        foreach (var dto in dtos)
        {
            var fieldNames = dto.Values.Keys.ToArray();

            expectedFieldNames ??= await GetOrCreatePropNames(discriminatorId, fieldNames, token);

            if (!expectedFieldNames.SequenceEqual(fieldNames))
            {
                var expected = string.Join(", ", expectedFieldNames);
                var actual = string.Join(", ", fieldNames);
                throw new InvalidOperationException($"Для системы {discriminatorId.ToString()} " +
                    $"характерен набор данных: [{expected}], однако был передан набор: [{actual}]");
            }
        }
    }

    /// <summary>
    /// Get the field names of the stored data scheme, creating the scheme if it does not exist.
    /// </summary>
    /// <param name="discriminatorId">System discriminator</param>
    /// <param name="fieldNames">Field names used to create the scheme when none is stored</param>
    /// <param name="token"></param>
    /// <returns>Field names of the stored scheme, or <paramref name="fieldNames"/> when the scheme was just created</returns>
    private async Task<string[]> GetOrCreatePropNames(Guid discriminatorId, string[] fieldNames, CancellationToken token)
    {
        var dataScheme = await dataSchemeRepository.Get(discriminatorId, token);
        if (dataScheme is not null)
        {
            return dataScheme.PropNames.ToArray();
        }

        dataScheme = new DataSchemeDto()
        {
            DiscriminatorId = discriminatorId,
            PropNames = fieldNames
        };
        await dataSchemeRepository.Add(dataScheme, token);

        // NOTE(review): assumes the repository stores the names unchanged, i.e. a later
        // Get would return this same set — confirm against the repository implementation.
        return fieldNames;
    }

    /// <summary>
    /// Drop the fields that are not listed in the given filter.
    /// </summary>
    /// <param name="dtos">Dtos to filter; their <c>Values</c> are replaced in place</param>
    /// <param name="fieldNames">Fields that must be kept</param>
    /// <returns>The same dto instances, each holding only the requested fields</returns>
    private static IEnumerable<TimestampedValuesDto> ReduceSetColumnsByNames(IEnumerable<TimestampedValuesDto> dtos, IEnumerable<string> fieldNames)
    {
        // A set gives constant-time lookups and enumerates the filter only once.
        // A set supplied by the caller is reused as is, so its own comparer keeps applying.
        var allowed = fieldNames as ISet<string> ?? new HashSet<string>(fieldNames);

        var result = new List<TimestampedValuesDto>();
        foreach (var dto in dtos)
        {
            dto.Values = dto.Values
                .Where(v => allowed.Contains(v.Key))
                .ToDictionary();

            result.Add(dto);
        }

        return result;
    }
}