add telemetry merge

This commit is contained in:
Фролов 2021-12-17 12:48:58 +05:00
parent e05bdd2993
commit e0834e4720
6 changed files with 218 additions and 151 deletions

View File

@ -1,4 +0,0 @@
namespace AsbCloudApp.Data
{
}

View File

@ -14,19 +14,27 @@ namespace AsbCloudApp.Services
int? GetIdWellByTelemetryUid(string uid);
int GetOrCreateTelemetryIdByUid(string uid);
double GetTimezoneOffsetByTelemetryId(int idTelemetry);
Task UpdateInfoAsync(string uid, TelemetryInfoDto info, CancellationToken token);
Task<double?> GetTelemetryTimeZoneOffsetAsync(int idTelemetry, CancellationToken token);
IEnumerable<(string Key, int[] Ids)> GetDuplicateRemoteUids();
IEnumerable<TelemetryDto> GetTransmittingTelemetries();
DateTime GetLastTelemetryDate(string telemetryUid);
DateTime GetLastTelemetryDate(int telemetryId);
int? GetIdTelemetryByIdWell(int idWell);
Task UpdateInfoAsync(string uid, TelemetryInfoDto info, CancellationToken token);
Task<DatesRangeDto> FixDatesRangeByTimeZoneAsync(int telemetryId, DatesRangeDto result,
CancellationToken token);
Task UpdateTimeZoneAsync(string uid, TelemetryTimeZoneDto telemetryTimeZoneInfo, CancellationToken token);
int? GetIdTelemetryByIdWell(int idWell);
int Merge(IEnumerable<int> telemetryIds);
IEnumerable<(string Key, int[] Ids)> GetRedundantRemoteUids();
IEnumerable<TelemetryDto> GetTransmittingTelemetries();
DateTime GetLastTelemetryDate(string telemetryUid);
DateTime GetLastTelemetryDate(int telemetryId);
/// <summary>
/// Слить данные телеметрии в одну
/// </summary>
/// <param name="from">старая (исходная)</param>
/// <param name="to">новая</param>
/// <returns></returns>
Task<int> MergeAsync(int from, int to, CancellationToken token);
void SaveRequestDate(string uid, DateTime remoteDate);
}
}

View File

@ -35,6 +35,20 @@ namespace AsbCloudDb
return database.ExecuteSqlRawAsync(query, token);
}
public static string GetTableName<T>(this DbSet<T> dbSet)
where T : class
{
var factory = GetQueryStringFactory(dbSet);
return factory.TableName;
}
public static IEnumerable<string> GetColumnsNames<T>(this DbSet<T> dbSet)
where T : class
{
var factory = GetQueryStringFactory(dbSet);
return factory.Columns;
}
}
interface IQueryStringFactory{}
@ -43,24 +57,26 @@ namespace AsbCloudDb
where T : class
{
private readonly string insertHeader;
private readonly string pk;
private readonly string conflictBody;
private readonly IEnumerable<IClrPropertyGetter> getters;
public string TableName { get; }
public IEnumerable<string> Columns { get; }
public QueryStringFactory(DbSet<T> dbset)
{
var properties = dbset.EntityType.GetProperties();
var pkColsNames = dbset.EntityType.FindPrimaryKey()?.Properties.Select(p => p.GetColumnBaseName());
pk = pkColsNames is null ? string.Empty : $"({string.Join(", ", pkColsNames)})";
var tableName = dbset.EntityType.GetTableName();
TableName = dbset.EntityType.GetTableName();
getters = properties.Select(p => p.GetGetter());
var colNames = properties.Select(p => $"\"{p.GetColumnBaseName()}\"");
var colunmsString = $"({string.Join(", ", colNames)})";
Columns = properties.Select(p => $"\"{p.GetColumnBaseName()}\"");
var colunmsString = $"({string.Join(", ", Columns)})";
insertHeader = $"INSERT INTO {tableName} {colunmsString} VALUES ";
var excludedUpdateSet = string.Join(", ", colNames.Select(n => $"{n} = excluded.{n}"));
insertHeader = $"INSERT INTO {TableName} {colunmsString} VALUES ";
var excludedUpdateSet = string.Join(", ", Columns.Select(n => $"{n} = excluded.{n}"));
conflictBody = $" ON CONFLICT {pk} DO UPDATE SET {excludedUpdateSet};";
}

View File

@ -9,6 +9,7 @@ using System.Threading.Tasks;
using System;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using AsbCloudDb;
namespace AsbCloudInfrastructure.Services
{
@ -215,7 +216,7 @@ namespace AsbCloudInfrastructure.Services
private Telemetry GetOrCreateTelemetryByUid(string uid)
=> cacheTelemetry.GetOrCreate(t => t.RemoteUid == uid, () => new Telemetry { RemoteUid = uid });
public IEnumerable<(string Key, int[] Ids)> GetRedundantRemoteUids()
public IEnumerable<(string Key, int[] Ids)> GetDuplicateRemoteUids()
{
return db.Telemetries
.ToList()
@ -224,148 +225,192 @@ namespace AsbCloudInfrastructure.Services
.Select(g => (g.Key, g.Select(t=>t.Id).ToArray()));
}
public int Merge(IEnumerable<int> telemetryIds)
public async Task<int> MergeAsync(int from, int to, CancellationToken token)
{
if (telemetryIds.Count() < 2)
throw new ArgumentException($"telemetryIds {telemetryIds} < 2. nothing to merge.", nameof(telemetryIds));
if (from == to)
return -2;
// найти телеметрию с наиболее полными справочниками и принять её за основную
// отделить основную от остальных
var stopwath = new System.Diagnostics.Stopwatch();
stopwath.Start();
// Оценка трудоебкости
var telemetriesGrade = db.Telemetries
.Include(t => t.Messages)
.Include(t => t.DataSaub)
.Include(t => t.DataSpin)
.Include(t => t.Well)
.Where(t => telemetryIds.Contains(t.Id))
.Select(t => new {
t.Id,
t.RemoteUid,
t.Info,
IdWell = t.Well != null ? t.Well.Id : int.MinValue,
Records = t.Messages.Count + t.DataSaub.Count + t.DataSpin.Count,
EventsAny = t.Events.Any(),
UsersAny = t.Users.Any(),
})
.OrderByDescending(t=>t.Records)
.ToList();
var telemetryDestId = telemetriesGrade.FirstOrDefault().Id;
if (telemetryDestId == default)
return 0;
var telemetriesSrcIds = telemetryIds.Where(t => t != telemetryDestId).ToList();
if(!telemetriesSrcIds.Any())
return 0;
var telemetriesSrcIdsSql = $"({string.Join(',', telemetriesSrcIds)})";
var (RemoteUid, Info) = telemetriesGrade
.Where(t => t.Info != null)
.OrderByDescending(t => t.Id)
.Select(t => (t.RemoteUid, t.Info))
.FirstOrDefault();
var wellId = telemetriesGrade
.Where(t => t.IdWell > 0)
.OrderByDescending(t => t.Id)
.Select(t => t.IdWell)
.FirstOrDefault();
// начало изменений
Console.WriteLine($"Start merge telemetries ids: [{string.Join(',', telemetriesSrcIds)}] to {telemetryDestId}");
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
var transaction = db.Database.BeginTransaction();
int rows = 0;
var transaction = await db.Database.BeginTransactionAsync(token).ConfigureAwait(false);
try
{
var telemetryDst = db.Telemetries.FirstOrDefault(t => t.Id == telemetryDestId);
telemetryDst.RemoteUid = RemoteUid;
telemetryDst.Info = Info;
var affected = 0;
var wellFrom = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == from, token)
.ConfigureAwait(false);
var wellTo = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == to, token)
.ConfigureAwait(false);
if (wellId != default)
if (wellTo is not null && wellFrom is not null)
return -2;
if (wellTo is null && wellFrom is not null)
{
var well = db.Wells.FirstOrDefault(w => w.Id == wellId);
well.IdTelemetry = telemetryDestId;
wellFrom.IdTelemetry = to;
affected += await db.SaveChangesAsync(token);
}
// events merge
var telemetryDstEventsIds = db.TelemetryEvents.Where(t => t.IdTelemetry == telemetryDestId).Select(t => t.IdEvent).ToList();
var telemetrySrcEvents = db.TelemetryEvents
.Where(t => telemetriesSrcIds.Contains(t.IdTelemetry) && !telemetryDstEventsIds.Contains(t.IdEvent))
.Select(t => new TelemetryEvent
{
IdTelemetry = telemetryDestId,
IdEvent = t.IdEvent,
IdCategory = t.IdCategory,
MessageTemplate = t.MessageTemplate,
})
.ToList();
var telemetryEventNewUniq = new Dictionary<int, TelemetryEvent>();
foreach (var telemetryEvent in telemetrySrcEvents)
telemetryEventNewUniq[telemetryEvent.IdEvent] = telemetryEvent;
if (telemetrySrcEvents.Any())
db.TelemetryEvents.AddRange(telemetryEventNewUniq.Values);
affected += await MergeEventsAndMessagesAndUsersAsync(from, to, token);
affected += await MergeDataAsync<TelemetryDataSaub>(from, to, token);
affected += await MergeDataAsync<TelemetryDataSpin>(from, to, token);
affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry_analysis WHERE id_telemetry = {from} OR id_telemetry = {to};", token)
.ConfigureAwait(false);
affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry WHERE id = {from};", token)
.ConfigureAwait(false);
await transaction.CommitAsync(token).ConfigureAwait(false);
// users merge
var telemetryDstUsersIds = db.TelemetryUsers.Where(t => t.IdTelemetry == telemetryDestId).Select(t => t.IdUser).ToList();
var telemetrySrcUsers = db.TelemetryUsers
.Where(t => telemetriesSrcIds.Contains(t.IdTelemetry) && !telemetryDstUsersIds.Contains(t.IdUser))
.Select(t => new TelemetryUser
{
IdTelemetry = telemetryDestId,
IdUser = t.IdUser,
Level = t.Level,
Name = t.Name,
Patronymic = t.Patronymic,
Surname = t.Surname,
}).ToList();
var telemetryUserNewUniq = new Dictionary<int, TelemetryUser>();
foreach (var telemetryUser in telemetrySrcUsers)
telemetryUserNewUniq[telemetryUser.IdUser] = telemetryUser;
if (telemetrySrcUsers.Any())
db.TelemetryUsers.AddRange(telemetryUserNewUniq.Values);
db.SaveChanges();
db.Database.SetCommandTimeout(3_000); // 5 мин
db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_saub DISABLE TRIGGER ALL;");
rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_data_saub SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};");
db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_saub ENABLE TRIGGER ALL;");
db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_spin DISABLE TRIGGER ALL;");
rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_data_spin SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};");
db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_spin ENABLE TRIGGER ALL;");
db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_analysis DISABLE TRIGGER ALL;");
rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_analysis SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};");
db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_analysis ENABLE TRIGGER ALL;");
db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_message DISABLE TRIGGER ALL;");
rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_message SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};");
db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_message ENABLE TRIGGER ALL;");
rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry_event WHERE id_telemetry IN {telemetriesSrcIdsSql};");
rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry_user WHERE id_telemetry IN {telemetriesSrcIdsSql};");
rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry WHERE id IN {telemetriesSrcIdsSql};");
transaction.Commit();
sw.Stop();
Console.WriteLine($"Successfully commited in {1d*sw.ElapsedMilliseconds/1000d: #0.00} sec. Affected {rows} rows.");
stopwath.Stop();
Console.WriteLine($"Successfully commited in {1d * stopwath.ElapsedMilliseconds / 1000d: #0.00} sec. Affected {affected} rows.");
return affected;
}
catch(Exception ex)
{
Console.WriteLine($"Fail. Rollback. Reason is:{ex.Message}");
transaction.Rollback();
return 0;
System.Diagnostics.Trace.WriteLine($"Merge() Fail. Rollback. Reason is:{ex.Message}");
await transaction.RollbackAsync(token).ConfigureAwait(false);
return -1;
}
return rows;
}
private async Task<int> MergeEventsAndMessagesAndUsersAsync(int from, int to, CancellationToken token)
{
var messagesFrom = await db.TelemetryMessages
.Where(d => d.IdTelemetry == from)
.ToListAsync(token)
.ConfigureAwait(false);
var usersFromQuery = db.TelemetryUsers
.Where(d => d.IdTelemetry == from);
var usersFrom = await usersFromQuery
.ToListAsync(token)
.ConfigureAwait(false);
var usersTo = await db.TelemetryUsers
.Where(d => d.IdTelemetry == to)
.AsNoTracking()
.ToListAsync(token)
.ConfigureAwait(false);
var usersToNextId = usersTo.Max(u => u.IdUser) + 100;
messagesFrom.ForEach(m => m.IdTelemetry = to);
foreach (var userFrom in usersFrom)
{
var userTo = usersTo
.FirstOrDefault(u=>u.IdUser == userFrom.IdUser);
if (userTo is null ||
userTo.Name != userFrom.Name ||
userTo.Surname != userFrom.Surname ||
userTo.Patronymic != userFrom.Patronymic)
{
messagesFrom
.Where(m => m.IdTelemetryUser == userFrom.IdUser)
.ToList()
.ForEach(m => m.IdTelemetryUser = usersToNextId);
userFrom.IdUser = usersToNextId;
userFrom.IdTelemetry = to;
usersToNextId++;
}
}
var eventsFromQuery = db.TelemetryEvents
.Where(d => d.IdTelemetry == from);
var eventsFrom = await eventsFromQuery
.ToListAsync(token)
.ConfigureAwait(false);
var eventsTo = await db.TelemetryEvents
.Where(d => d.IdTelemetry == to)
.AsNoTracking()
.ToListAsync(token)
.ConfigureAwait(false);
var eventsToNextId = eventsTo.Max(e => e.IdEvent) + 1;
foreach (var eventFrom in eventsFrom)
{
var eventTo = eventsTo
.FirstOrDefault(e => e.IdEvent == eventFrom.IdEvent);
if (eventTo is null ||
eventTo.IdCategory != eventFrom.IdCategory ||
eventTo.MessageTemplate != eventFrom.MessageTemplate)
{
messagesFrom
.Where(m => m.IdEvent == eventFrom.IdEvent)
.ToList()
.ForEach(m => m.IdEvent = eventsToNextId);
eventFrom.IdEvent = eventsToNextId;
eventFrom.IdTelemetry = to;
eventsToNextId++;
}
}
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user DISABLE TRIGGER ALL;", token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event DISABLE TRIGGER ALL;", token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message DISABLE TRIGGER ALL;", token).ConfigureAwait(false);
var affected = await db.SaveChangesAsync(token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user ENABLE TRIGGER ALL;", token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event ENABLE TRIGGER ALL;", token).ConfigureAwait(false);
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message ENABLE TRIGGER ALL;", token).ConfigureAwait(false);
db.TelemetryUsers.RemoveRange(usersFromQuery);
db.TelemetryEvents.RemoveRange(eventsFromQuery);
affected += await db.SaveChangesAsync(token).ConfigureAwait(false);
return affected;
}
private async Task<int> MergeDataAsync<TEntity>(int from, int to, CancellationToken token)
where TEntity: class, AsbCloudDb.Model.ITelemetryData
{
const string IdTelemetryColumnName = "\"id_telemetry\"";
var dbSet = db.Set<TEntity>();
var tableName = dbSet.GetTableName();
var columns = dbSet.GetColumnsNames().ToList();
var index = columns.FindIndex(c => c.ToLower() == IdTelemetryColumnName.ToLower());
if (index < 0)
return -5;
columns[index] = $"{to} as {IdTelemetryColumnName}";
var columnsString = string.Join(',', columns);
var sql = $"INSERT INTO {tableName} " +
$"select {columnsString} " +
$"from {tableName} " +
$"where {IdTelemetryColumnName} = {from};" +
$"delete from {tableName} where {IdTelemetryColumnName} = {from};";
var affected = await db.Database.ExecuteSqlRawAsync(sql, token)
.ConfigureAwait(false);
return affected;
}
//todo: delete this
private async Task<int> MergeDataAsync_old<TEntity>(int from, int to, CancellationToken token)
where TEntity : class, AsbCloudDb.Model.ITelemetryData
{
var dbSet = db.Set<TEntity>();
var entitiesFromQuery = dbSet.Where(d => d.IdTelemetry == from);
var entitiesFrom = await entitiesFromQuery
.AsNoTracking()
.ToListAsync(token)
.ConfigureAwait(false);
dbSet.RemoveRange(entitiesFromQuery);
var affected = await db.SaveChangesAsync(token)
.ConfigureAwait(false);
entitiesFrom.ForEach(d => d.IdTelemetry = to);
dbSet.AddRange(entitiesFrom);
affected += await db.SaveChangesAsync(token)
.ConfigureAwait(false);
return affected;
}
}
}

View File

@ -4,6 +4,8 @@ using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudWebApi.Controllers
{
@ -27,20 +29,20 @@ namespace AsbCloudWebApi.Controllers
[Route("/reduntentUids")]
public IActionResult GetRedundentRemoteUids()
{
var result = telemetryService.GetRedundantRemoteUids().Select(i => new { i.Key, ids = i.Ids });
var result = telemetryService.GetDuplicateRemoteUids().Select(i => new { i.Key, ids = i.Ids });
return Ok(result);
}
/// <summary>
/// merge telemetries
/// </summary>
/// <param name="telemetriesIds">array of ids</param>
/// <returns></returns>
[HttpPost]
[Route("/merge")]
public IActionResult MergeTelemetries([FromBody] List<int> telemetriesIds)
[Route("/merge/{idFrom}/{idTo}")]
public async Task<IActionResult> MergeTelemetriesAsync(int idFrom, int idTo, CancellationToken token = default)
{
var count = telemetryService.Merge(telemetriesIds);
var count = await telemetryService.MergeAsync(idFrom, idTo, token)
.ConfigureAwait(false);
return Ok(count);
}
}

View File

@ -48,7 +48,7 @@ namespace ConsoleApp1
Deposit = "1",
Customer = "1",
HmiVersion = "1",
PlcVersion = "1",
//PlcVersion = "1",
TimeZoneId = "1",
DrillingStartDate = DateTime.Parse("2021-06-29T12:01:19.000000"),
TimeZoneOffsetTotalHours = 5.0