using DD.Persistence.Extensions;
using DD.Persistence.Models;
using DD.Persistence.Models.Common;
using DD.Persistence.Repositories;
using DD.Persistence.Services.Interfaces;

namespace DD.Persistence.Services;

/// <inheritdoc/>
/// <remarks>
/// Combines the positional rows returned by <see cref="ITimestampedValuesRepository"/> with the
/// property names returned by <see cref="IDataSchemeRepository"/> to build
/// <see cref="TimestampedValuesDto"/> instances.
/// </remarks>
public class TimestampedValuesService : ITimestampedValuesService
{
    private readonly ITimestampedValuesRepository timestampedValuesRepository;
    private readonly IDataSchemeRepository dataSchemeRepository;

    /// <summary>
    /// Creates the service on top of the two repositories it delegates to.
    /// </summary>
    /// <param name="timestampedValuesRepository">Repository used to write and read timestamped rows.</param>
    /// <param name="relatedDataRepository">Repository used to read and create data schemes.</param>
    public TimestampedValuesService(ITimestampedValuesRepository timestampedValuesRepository, IDataSchemeRepository relatedDataRepository)
    {
        this.timestampedValuesRepository = timestampedValuesRepository;
        this.dataSchemeRepository = relatedDataRepository;
    }

    /// <inheritdoc/>
    /// <remarks>
    /// Before the rows are handed to the repository, the key sequence of each DTO is checked against
    /// the data scheme of <paramref name="discriminatorId"/>. When no scheme exists yet, it is created
    /// from the keys of the first DTO.
    /// </remarks>
    /// <exception cref="InvalidOperationException">
    /// The key sequence of a DTO differs from the property names of the stored data scheme.
    /// </exception>
    public async Task<int> AddRange(Guid discriminatorId, IEnumerable<TimestampedValuesDto> dtos, CancellationToken token)
    {
        // Enumerate the caller's sequence only once: it is validated here and then passed on.
        var items = dtos as IReadOnlyCollection<TimestampedValuesDto> ?? dtos.ToArray();

        string[]? validatedKeys = null;
        foreach (var dto in items)
        {
            var keys = dto.Values.Keys.ToArray();

            // After a successful check the stored scheme equals the checked keys, so a DTO with
            // the very same key sequence cannot fail: skip the repository round-trip for it.
            if (validatedKeys is not null && validatedKeys.SequenceEqual(keys))
            {
                continue;
            }

            await CreateSystemSpecificationIfNotExist(discriminatorId, keys, token);
            validatedKeys = keys;
        }

        return await timestampedValuesRepository.AddRange(discriminatorId, items, token);
    }

    /// <inheritdoc/>
    /// <remarks>
    /// Rows are read from the repository and converted to DTOs. When <paramref name="columnNames"/>
    /// is neither null nor empty, only the listed keys are kept in each DTO.
    /// </remarks>
    public async Task<IEnumerable<TimestampedValuesDto>> Get(
        IEnumerable<Guid> discriminatorIds,
        DateTimeOffset? geTimestamp,
        IEnumerable<string>? columnNames,
        int skip,
        int take,
        CancellationToken token)
    {
        var queryResult = await timestampedValuesRepository.Get(discriminatorIds, geTimestamp, columnNames, skip, take, token);
        var dtos = await Materialize(queryResult, token);

        if (columnNames is not null && !columnNames.IsNullOrEmpty())
        {
            dtos = ReduceSetColumnsByNames(dtos, columnNames).ToList();
        }

        return dtos;
    }

    /// <inheritdoc/>
    public async Task<IEnumerable<TimestampedValuesDto>> GetFirst(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        var rows = await timestampedValuesRepository.GetFirst(discriminatorId, takeCount, token);

        return await MaterializeSingle(discriminatorId, rows, token);
    }

    /// <inheritdoc/>
    public async Task<IEnumerable<TimestampedValuesDto>> GetLast(Guid discriminatorId, int takeCount, CancellationToken token)
    {
        var rows = await timestampedValuesRepository.GetLast(discriminatorId, takeCount, token);

        return await MaterializeSingle(discriminatorId, rows, token);
    }

    /// <inheritdoc/>
    public async Task<IEnumerable<TimestampedValuesDto>> GetResampledData(
        Guid discriminatorId,
        DateTimeOffset beginTimestamp,
        double intervalSec = 600d,
        int approxPointsCount = 1024,
        CancellationToken token = default)
    {
        var rows = await timestampedValuesRepository.GetResampledData(discriminatorId, beginTimestamp, intervalSec, approxPointsCount, token);

        return await MaterializeSingle(discriminatorId, rows, token);
    }

    /// <inheritdoc/>
    public async Task<IEnumerable<TimestampedValuesDto>> GetGtDate(Guid discriminatorId, DateTimeOffset beginTimestamp, CancellationToken token)
    {
        var rows = await timestampedValuesRepository.GetGtDate(discriminatorId, beginTimestamp, token);

        return await MaterializeSingle(discriminatorId, rows, token);
    }

    /// <inheritdoc/>
    public async Task<int> Count(Guid discriminatorId, CancellationToken token)
    {
        return await timestampedValuesRepository.Count(discriminatorId, token);
    }

    /// <inheritdoc/>
    public virtual async Task<DatesRangeDto?> GetDatesRange(Guid discriminatorId, CancellationToken token)
    {
        return await timestampedValuesRepository.GetDatesRange(discriminatorId, token);
    }

    /// <summary>
    /// Converts the rows of a single discriminator into DTOs.
    /// </summary>
    /// <param name="discriminatorId">Discriminator the rows belong to.</param>
    /// <param name="rows">Rows returned by the repository for that discriminator.</param>
    /// <param name="token">Cancellation token passed on to the data scheme lookup.</param>
    /// <returns>The DTOs built by <see cref="Materialize"/>.</returns>
    private Task<IEnumerable<TimestampedValuesDto>> MaterializeSingle(
        Guid discriminatorId,
        IEnumerable<(DateTimeOffset Timestamp, object[] Values)> rows,
        CancellationToken token)
    {
        var queryResult = new Dictionary<Guid, IEnumerable<(DateTimeOffset Timestamp, object[] Values)>>
        {
            [discriminatorId] = rows
        };

        return Materialize(queryResult, token);
    }

    /// <summary>
    /// Converts a query result into a set of DTOs.
    /// </summary>
    /// <param name="queryResult">Rows grouped by discriminator id.</param>
    /// <param name="token">Cancellation token passed on to the data scheme lookup.</param>
    /// <returns>
    /// One DTO per row, with the timestamp converted to UTC and the values keyed by the property
    /// names of the discriminator's data scheme. Rows of a discriminator that has no data scheme
    /// are skipped.
    /// </returns>
    private async Task<IEnumerable<TimestampedValuesDto>> Materialize(
        IDictionary<Guid, IEnumerable<(DateTimeOffset Timestamp, object[] Values)>> queryResult,
        CancellationToken token)
    {
        var result = new List<TimestampedValuesDto>();

        foreach (var (discriminatorId, rows) in queryResult)
        {
            var dataScheme = await dataSchemeRepository.Get(discriminatorId, token);
            if (dataScheme is null)
            {
                // Without a scheme the positional values cannot be given names.
                continue;
            }

            // The scheme is the same for every row of this discriminator: take the names once.
            var propNames = dataScheme.PropNames.ToArray();

            foreach (var row in rows)
            {
                result.Add(new TimestampedValuesDto()
                {
                    Timestamp = row.Timestamp.ToUniversalTime(),
                    // The value at position i belongs to the property name at position i.
                    Values = propNames
                        .Select((name, index) => (name, index))
                        .ToDictionary(p => p.name, p => row.Values[p.index])
                });
            }
        }

        return result;
    }

    /// <summary>
    /// Creates the data scheme if it does not exist yet; otherwise checks the field names against it.
    /// </summary>
    /// <param name="discriminatorId">System discriminator.</param>
    /// <param name="fieldNames">Set of field names.</param>
    /// <param name="token">Cancellation token passed on to the repository calls.</param>
    /// <exception cref="InvalidOperationException">Invalid set of field names.</exception>
    private async Task CreateSystemSpecificationIfNotExist(Guid discriminatorId, string[] fieldNames, CancellationToken token)
    {
        var systemSpecification = await dataSchemeRepository.Get(discriminatorId, token);
        if (systemSpecification is null)
        {
            systemSpecification = new DataSchemeDto()
            {
                DiscriminatorId = discriminatorId,
                PropNames = fieldNames
            };
            await dataSchemeRepository.Add(systemSpecification, token);

            return;
        }

        // The comparison is order-sensitive: values are matched to names by position on read.
        if (!systemSpecification.PropNames.SequenceEqual(fieldNames))
        {
            var expectedFieldNames = string.Join(", ", systemSpecification.PropNames);
            var actualFieldNames = string.Join(", ", fieldNames);
            throw new InvalidOperationException($"Для системы {discriminatorId} " +
                $"характерен набор данных: [{expectedFieldNames}], однако был передан набор: [{actualFieldNames}]");
        }
    }

    /// <summary>
    /// Filters out the extra fields according to the given filter.
    /// </summary>
    /// <param name="dtos">DTOs to reduce; the <c>Values</c> of each DTO is replaced in place.</param>
    /// <param name="fieldNames">Fields that must be kept.</param>
    /// <returns>
    /// A deferred sequence of the same DTO instances; the reduction of each DTO happens when the
    /// sequence is enumerated.
    /// </returns>
    private static IEnumerable<TimestampedValuesDto> ReduceSetColumnsByNames(IEnumerable<TimestampedValuesDto> dtos, IEnumerable<string> fieldNames)
    {
        // Built once so the caller's sequence is not re-enumerated for every key of every DTO.
        var allowedNames = fieldNames.ToHashSet();

        return dtos.Select(dto =>
        {
            dto.Values = dto.Values
                .Where(v => allowedNames.Contains(v.Key))
                .ToDictionary();

            return dto;
        });
    }
}