DD.WellWorkover.Cloud/AsbCloudInfrastructure/Services/SAUB/TelemetryService.cs

420 lines
15 KiB
C#

using AsbCloudApp.Data;
using AsbCloudApp.Data.SAUB;
using AsbCloudApp.Repositories;
using AsbCloudApp.Services;
using AsbCloudDb;
using AsbCloudDb.Model;
using Mapster;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Caching.Memory;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Csv;
using System.Threading;
using System.Threading.Tasks;
using AsbCloudApp.Requests;
namespace AsbCloudInfrastructure.Services.SAUB;
public class TelemetryService : ITelemetryService
{
private readonly IAsbCloudDbContext db;
private readonly IMemoryCache memoryCache;
//TODO: методы использующие ITelemetryDataCache, скорее всего, тут не нужны
private readonly ITelemetryDataCache<TelemetryDataSaubDto> dataSaubCache;
private readonly ITimezoneService timezoneService;
public ITimezoneService TimeZoneService => timezoneService;
public TelemetryService(
IAsbCloudDbContext db,
IMemoryCache memoryCache,
ITelemetryDataCache<TelemetryDataSaubDto> dataSaubCache,
ITimezoneService timezoneService)
{
this.db = db;
this.memoryCache = memoryCache;
this.dataSaubCache = dataSaubCache;
this.timezoneService = timezoneService;
}
private IEnumerable<Telemetry> GetTelemetryCache()
=> memoryCache.GetOrCreateBasic(
db.Set<Telemetry>()
.Include(t => t.Well));
private void DropTelemetryCache()
{
memoryCache.DropBasic<Telemetry>();
}
public DatesRangeDto GetDatesRange(int idTelemetry)
{
var cacheDataRange = dataSaubCache.GetOrDefaultWellDataDateRange(idTelemetry)
?? new ();
return cacheDataRange;
}
public int? GetIdWellByTelemetryUid(string uid)
=> GetWellByTelemetryUid(uid)?.Id;
public async Task UpdateInfoAsync(string uid, TelemetryInfoDto info,
CancellationToken token)
{
var telemetry = GetOrCreateTelemetry(uid);
telemetry.Info = info.Adapt<TelemetryInfo>();
if (!string.IsNullOrEmpty(info.TimeZoneId) &&
telemetry.TimeZone?.IsOverride != true)
telemetry.TimeZone = new SimpleTimezone()
{
Hours = info.TimeZoneOffsetTotalHours,
TimezoneId = info.TimeZoneId
};
db.Telemetries.Upsert(telemetry);
await db.SaveChangesAsync(token);
DropTelemetryCache();
}
public async Task<IEnumerable<VersionDto>> GetVersionsAsync(VersionRequest request, CancellationToken token)
{
var query = db.Set<Well>()
.Include(x => x.RelationCompaniesWells)
.ThenInclude(x => x.Company)
.Include(x => x.Cluster)
.ThenInclude(c => c.Deposit)
.Where(x => x.RelationCompaniesWells.Any(y => y.IdCompany == request.IdCompany));
if (request.IdWellState.HasValue)
query = query.Where(x => x.IdState == request.IdWellState);
if (request.IdsWell?.Any() == true)
query = query.Where(x => request.IdsWell.Contains(x.Id));
var wells = await query.Where(x => x.IdTelemetry.HasValue)
.ToDictionaryAsync(x => x.Id, x => x, token);
var idTelemetries = wells.Select(x => x.Value.IdTelemetry!.Value);
var telemetries = GetTelemetryCache()
.Where(x => idTelemetries.Contains(x.Id))
.OrderBy(x => x.Info.DrillingStartDate);
var dtos = telemetries.Select(x =>
{
var well = wells[x.Well!.Id];
var dto = new VersionDto
{
IdWell = well.Id,
Well = well.Caption,
Cluster = well.Cluster.Caption,
Deposit = well.Cluster.Deposit.Caption,
Customer = well.RelationCompaniesWells.Select(r => r.Company).FirstOrDefault(c => c.IdCompanyType == 1)?.Caption,
HmiVersion = x.Info.HmiVersion,
SaubPlcVersion = x.Info.SaubPlcVersion,
SpinPlcVersion = x.Info.SpinPlcVersion,
};
return dto;
});
return dtos;
}
[Obsolete("This method will be private. Use TelemetryDto.TimeZone prop.")]
public SimpleTimezoneDto GetTimezone(int idTelemetry)
{
var telemetry = GetTelemetryCache().FirstOrDefault(t => t.Id == idTelemetry);
if (telemetry is null)
throw new Exception($"Telemetry id: {idTelemetry} does not exist.");
if (telemetry.Well?.Timezone is not null)
return telemetry.Well.Timezone.Adapt<SimpleTimezoneDto>();
if (telemetry.TimeZone is not null)
return telemetry.TimeZone.Adapt<SimpleTimezoneDto>();
throw new Exception($"Telemetry id: {idTelemetry} can't find timezone.");
}
public TelemetryBaseDto? GetOrDefaultTelemetryByIdWell(int idWell)
{
var dto = GetOrDefaultTelemetriesByIdsWells([idWell])
.FirstOrDefault();
return dto;
}
public IEnumerable<TelemetryDto> GetOrDefaultTelemetriesByIdsWells(IEnumerable<int> idsWells)
{
var entities = GetTelemetryCache()
.Where(t => t.Well != null)
.Where(t => idsWells.Contains(t.Well!.Id))
.Select(t => {
t.TimeZone = t.TimeZone.Hours != t.Well!.Timezone.Hours ? t.Well.Timezone : t.TimeZone;
return t;
});
var dtos = entities.Select(t => {
var dto = t.Adapt<TelemetryDto>();
dto.IdWell = t.Well?.Id;
return dto;
});
return dtos;
}
public TelemetryDto GetOrCreateTelemetryByUid(string uid)
{
var entity = GetOrCreateTelemetry(uid);
if(entity.Well?.Timezone is not null && entity.TimeZone.Hours != entity.Well.Timezone.Hours)
{
entity.TimeZone = entity.Well.Timezone;
//TODO: выдаем предупреждение!
}
var dto = entity.Adapt<TelemetryDto>();
return dto;
}
private Well? GetWellByTelemetryUid(string uid)
{
var telemetry = GetOrDefaultTelemetryByUid(uid);
return telemetry?.Well;
}
private Telemetry? GetOrDefaultTelemetryByUid(string uid)
{
var telemetry = GetTelemetryCache().FirstOrDefault(t => t.RemoteUid == uid);
return telemetry;
}
private Telemetry GetOrCreateTelemetry(string uid)
{
var telemetry = GetOrDefaultTelemetryByUid(uid);
if (telemetry is null)
{
var newTelemetry = new Telemetry
{
RemoteUid = uid,
TimeZone = new SimpleTimezone
{
Hours = 5,
IsOverride = false,
TimezoneId = "default",
}
};
var entry = db.Telemetries.Add(newTelemetry);
db.SaveChanges();
DropTelemetryCache();
return entry.Entity;
}
return telemetry;
}
public async Task<Stream> GetTelemetriesInfoByLastData(DateTimeOffset from, CancellationToken token)
{
var fromUtc = from.UtcDateTime;
var query = db.TelemetryDataSaub
.Where(i => i.DateTime >= fromUtc)
.GroupBy(i => i.IdTelemetry)
.Select(i => new
{
i.First().Telemetry,
LastDate = i.Max(i => i.DateTime)
});
var data = await query.ToArrayAsync(token);
var infos = data.Select(i => new TelemetryWithSoftwareVersionsDto(
i.Telemetry.Id,
i.Telemetry.RemoteUid,
i.LastDate,
i.Telemetry.Info.Deposit,
i.Telemetry.Info.Cluster,
i.Telemetry.Info.Well,
i.Telemetry.Info.HmiVersion,
i.Telemetry.Info.SaubPlcVersion,
i.Telemetry.Info.SpinPlcVersion)
);
var stream = new MemoryStream();
if (!data.Any())
return stream;
var serializer = new CsvSerializer<TelemetryWithSoftwareVersionsDto>();
serializer.Serialize(infos, stream);
stream.Seek(0, SeekOrigin.Begin);
return stream;
}
public async Task<int> MergeAsync(int from, int to, CancellationToken token)
{
if (from == to)
return -2;
var stopwath = new System.Diagnostics.Stopwatch();
stopwath.Start();
var transaction = await db.Database.BeginTransactionAsync(token).ConfigureAwait(false);
try
{
var affected = 0;
var wellFrom = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == from, token)
.ConfigureAwait(false);
var wellTo = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == to, token)
.ConfigureAwait(false);
if (wellTo is not null && wellFrom is not null)
return -2;
if (wellTo is null && wellFrom is not null)
{
wellFrom.IdTelemetry = to;
affected += await db.SaveChangesAsync(token);
}
affected += await MergeEventsAndMessagesAndUsersAsync(from, to, token);
affected += await MergeDataAsync<TelemetryDataSaub>(from, to, token);
affected += await MergeDataAsync<TelemetryDataSpin>(from, to, token);
affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry_analysis WHERE id_telemetry = {from} OR id_telemetry = {to};", token)
.ConfigureAwait(false);
affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry WHERE id = {from};", token)
.ConfigureAwait(false);
await transaction.CommitAsync(token).ConfigureAwait(false);
stopwath.Stop();
Console.WriteLine($"Successfully committed in {1d * stopwath.ElapsedMilliseconds / 1000d: #0.00} sec. Affected {affected} rows.");
DropTelemetryCache();
return affected;
}
catch (Exception ex)
{
System.Diagnostics.Trace.WriteLine($"Merge() Fail. Rollback. Reason is:{ex.Message}");
await transaction.RollbackAsync(CancellationToken.None);
return -1;
}
}
private async Task<int> MergeEventsAndMessagesAndUsersAsync(int from, int to, CancellationToken token)
{
var messagesFrom = await db.TelemetryMessages
.Where(d => d.IdTelemetry == from)
.ToListAsync(token)
.ConfigureAwait(false);
var usersFromQuery = db.TelemetryUsers
.Where(d => d.IdTelemetry == from);
var usersFrom = await usersFromQuery
.ToListAsync(token)
.ConfigureAwait(false);
var usersTo = await db.TelemetryUsers
.Where(d => d.IdTelemetry == to)
.AsNoTracking()
.ToListAsync(token)
.ConfigureAwait(false);
var usersToNextId = usersTo.Max(u => u.IdUser) + 100;
messagesFrom.ForEach(m => m.IdTelemetry = to);
foreach (var userFrom in usersFrom)
{
var userTo = usersTo
.FirstOrDefault(u => u.IdUser == userFrom.IdUser);
if (userTo is null ||
userTo.Name != userFrom.Name ||
userTo.Surname != userFrom.Surname ||
userTo.Patronymic != userFrom.Patronymic)
{
messagesFrom
.Where(m => m.IdTelemetryUser == userFrom.IdUser)
.ToList()
.ForEach(m => m.IdTelemetryUser = usersToNextId);
userFrom.IdUser = usersToNextId;
userFrom.IdTelemetry = to;
usersToNextId++;
}
}
var eventsFromQuery = db.TelemetryEvents
.Where(d => d.IdTelemetry == from);
var eventsFrom = await eventsFromQuery
.ToListAsync(token)
.ConfigureAwait(false);
var eventsTo = await db.TelemetryEvents
.Where(d => d.IdTelemetry == to)
.AsNoTracking()
.ToListAsync(token)
.ConfigureAwait(false);
var eventsToNextId = eventsTo.Max(e => e.IdEvent) + 1;
foreach (var eventFrom in eventsFrom)
{
var eventTo = eventsTo
.FirstOrDefault(e => e.IdEvent == eventFrom.IdEvent);
if (eventTo is null ||
eventTo.IdCategory != eventFrom.IdCategory ||
eventTo.MessageTemplate != eventFrom.MessageTemplate)
{
messagesFrom
.Where(m => m.IdEvent == eventFrom.IdEvent)
.ToList()
.ForEach(m => m.IdEvent = eventsToNextId);
eventFrom.IdEvent = eventsToNextId;
eventFrom.IdTelemetry = to;
eventsToNextId++;
}
}
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user DISABLE TRIGGER ALL;", token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event DISABLE TRIGGER ALL;", token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message DISABLE TRIGGER ALL;", token).ConfigureAwait(false);
var affected = await db.SaveChangesAsync(token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user ENABLE TRIGGER ALL;", token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event ENABLE TRIGGER ALL;", token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message ENABLE TRIGGER ALL;", token).ConfigureAwait(false);
db.TelemetryUsers.RemoveRange(usersFromQuery);
db.TelemetryEvents.RemoveRange(eventsFromQuery);
affected += await db.SaveChangesAsync(token).ConfigureAwait(false);
return affected;
}
private async Task<int> MergeDataAsync<TEntity>(int from, int to, CancellationToken token)
where TEntity : class, AsbCloudDb.Model.ITelemetryData
{
const string IdTelemetryColumnName = "\"id_telemetry\"";
var dbSet = db.Set<TEntity>();
var tableName = dbSet.GetTableName();
var columns = dbSet.GetColumnsNames().ToList();
var index = columns.FindIndex(c => c.ToLower() == IdTelemetryColumnName.ToLower());
if (index < 0)
return -5;
columns[index] = $"{to} as {IdTelemetryColumnName}";
var columnsString = string.Join(',', columns);
var sql = $"INSERT INTO {tableName} " +
$"select {columnsString} " +
$"from {tableName} " +
$"where {IdTelemetryColumnName} = {from};" +
$"delete from {tableName} where {IdTelemetryColumnName} = {from};";
var affected = await db.Database.ExecuteSqlRawAsync(sql, token)
.ConfigureAwait(false);
return affected;
}
}