using DD.Persistence.Extensions;
using DD.Persistence.Models;
using DD.Persistence.Repositories;
using DD.Persistence.Services.Interfaces;
namespace DD.Persistence.Services;
///
public class TimestampedValuesService : ITimestampedValuesService
{
private readonly ITimestampedValuesRepository timestampedValuesRepository;
private readonly IDataSchemeRepository dataSchemeRepository;
///
public TimestampedValuesService(ITimestampedValuesRepository timestampedValuesRepository, IDataSchemeRepository relatedDataRepository)
{
this.timestampedValuesRepository = timestampedValuesRepository;
this.dataSchemeRepository = relatedDataRepository;
}
///
public async Task AddRange(Guid discriminatorId, IEnumerable dtos, CancellationToken token)
{
// ToDo: реализовать без foreach
foreach (var dto in dtos)
{
await CreateDataSchemeIfNotExist(discriminatorId, dto, token);
}
var result = await timestampedValuesRepository.AddRange(discriminatorId, dtos, token);
return result;
}
///
public async Task> Get(IEnumerable discriminatorIds, DateTimeOffset? geTimestamp, IEnumerable? columnNames, int skip, int take, CancellationToken token)
{
var result = await timestampedValuesRepository.Get(discriminatorIds, geTimestamp, columnNames, skip, take, token);
var dtos = await Materialize(result, token);
if (!columnNames.IsNullOrEmpty())
{
dtos = ReduceSetColumnsByNames(dtos, columnNames!).ToList();
}
return dtos;
}
///
public async Task> GetFirst(Guid discriminatorId, int takeCount, CancellationToken token)
{
var result = await timestampedValuesRepository.GetFirst(discriminatorId, takeCount, token);
var resultToMaterialize = new[] { KeyValuePair.Create(discriminatorId, result) }
.ToDictionary();
var dtos = await Materialize(resultToMaterialize, token);
return dtos;
}
///
public async Task> GetLast(Guid discriminatorId, int takeCount, CancellationToken token)
{
var result = await timestampedValuesRepository.GetLast(discriminatorId, takeCount, token);
var resultToMaterialize = new[] { KeyValuePair.Create(discriminatorId, result) }
.ToDictionary();
var dtos = await Materialize(resultToMaterialize, token);
return dtos;
}
///
public async Task> GetResampledData(
Guid discriminatorId,
DateTimeOffset beginTimestamp,
double intervalSec = 600d,
int approxPointsCount = 1024,
CancellationToken token = default)
{
var result = await timestampedValuesRepository.GetResampledData(discriminatorId, beginTimestamp, intervalSec, approxPointsCount, token);
var resultToMaterialize = new[] { KeyValuePair.Create(discriminatorId, result) }
.ToDictionary();
var dtos = await Materialize(resultToMaterialize, token);
return dtos;
}
///
public async Task> GetGtDate(Guid discriminatorId, DateTimeOffset beginTimestamp, CancellationToken token)
{
var result = await timestampedValuesRepository.GetGtDate(discriminatorId, beginTimestamp, token);
var resultToMaterialize = new[] { KeyValuePair.Create(discriminatorId, result) }
.ToDictionary();
var dtos = await Materialize(resultToMaterialize, token);
return dtos;
}
///
/// Преобразовать результат запроса в набор dto
///
///
///
///
private async Task> Materialize(IDictionary> queryResult, CancellationToken token)
{
IEnumerable result = [];
foreach (var keyValuePair in queryResult)
{
var dataScheme = await dataSchemeRepository.Get(keyValuePair.Key, token);
if (dataScheme is null)
{
continue;
}
foreach (var tuple in keyValuePair.Value)
{
var identity = dataScheme!.PropNames;
var indexedIdentity = identity
.Select((value, index) => new { index, value });
var dto = new TimestampedValuesDto()
{
Timestamp = tuple.Timestamp.ToUniversalTime(),
Values = indexedIdentity
.ToDictionary(x => x.value, x => tuple.Values[x.index])
};
result = result.Append(dto);
}
}
return result;
}
///
/// Создать схему данных, при отсутствии таковой
///
/// Дискриминатор схемы
/// Набор данных, по образу которого будет создана соответствующая схема
///
///
/// Некорректный набор наименований полей
private async Task CreateDataSchemeIfNotExist(Guid discriminatorId, TimestampedValuesDto dto, CancellationToken token)
{
var propNames = dto.Values.Keys.ToArray();
var propTypes = GetPropTypes(dto);
var dataScheme = await dataSchemeRepository.Get(discriminatorId, token);
if (dataScheme is null)
{
dataScheme = new DataSchemeDto()
{
DiscriminatorId = discriminatorId,
PropNames = propNames,
PropTypes = propTypes
};
await dataSchemeRepository.Add(dataScheme, token);
return;
}
if (!dataScheme.PropNames.SequenceEqual(propNames))
{
var expectedFieldNames = string.Join(", ", dataScheme.PropNames);
var actualFieldNames = string.Join(", ", propNames);
throw new InvalidOperationException($"Для системы {discriminatorId.ToString()} " +
$"характерен набор данных: [{expectedFieldNames}], однако был передан набор: [{actualFieldNames}]");
}
}
///
/// Получить типы для набора данных в соответствии с индексацией
///
///
///
///
private PropTypeEnum[] GetPropTypes(TimestampedValuesDto dto)
{
var types = dto.Values.Select(e =>
{
var valueString = e.Value.ToString();
if (valueString is null)
throw new ArgumentNullException("Переданный набор данных содержит null, в следствии чего не удаётся определить типы полей");
if (DateTimeOffset.TryParse(valueString, out _))
return PropTypeEnum.DateTime;
var doubleString = valueString.Replace('.', ',');
if (double.TryParse(doubleString, out _))
return PropTypeEnum.Double;
return PropTypeEnum.String;
});
return types.ToArray();
}
///
/// Отсеить лишние поля в соответствии с заданным фильтром
///
///
/// Поля, которые необходимо оставить
///
private IEnumerable ReduceSetColumnsByNames(IEnumerable dtos, IEnumerable fieldNames)
{
var result = dtos.Select(dto =>
{
var reducedValues = dto.Values
.Where(v => fieldNames.Contains(v.Key))
.ToDictionary();
dto.Values = reducedValues;
return dto;
});
return result;
}
}