From e0834e4720671adba2cf028da16d6de9b4722667 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A4=D1=80=D0=BE=D0=BB=D0=BE=D0=B2?= Date: Fri, 17 Dec 2021 12:48:58 +0500 Subject: [PATCH] add telemetry merge --- AsbCloudApp/Data/TimeZoneInfo.cs | 4 - AsbCloudApp/Services/ITelemetryService.cs | 22 +- AsbCloudDb/EFExtentions.cs | 28 +- .../Services/TelemetryService.cs | 301 ++++++++++-------- .../Controllers/AdminTelemetryController.cs | 12 +- ConsoleApp1/DbDemoDataService.cs | 2 +- 6 files changed, 218 insertions(+), 151 deletions(-) delete mode 100644 AsbCloudApp/Data/TimeZoneInfo.cs diff --git a/AsbCloudApp/Data/TimeZoneInfo.cs b/AsbCloudApp/Data/TimeZoneInfo.cs deleted file mode 100644 index d8763ecb..00000000 --- a/AsbCloudApp/Data/TimeZoneInfo.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace AsbCloudApp.Data -{ - -} \ No newline at end of file diff --git a/AsbCloudApp/Services/ITelemetryService.cs b/AsbCloudApp/Services/ITelemetryService.cs index 3402e1ca..842d78b3 100644 --- a/AsbCloudApp/Services/ITelemetryService.cs +++ b/AsbCloudApp/Services/ITelemetryService.cs @@ -14,19 +14,27 @@ namespace AsbCloudApp.Services int? GetIdWellByTelemetryUid(string uid); int GetOrCreateTelemetryIdByUid(string uid); double GetTimezoneOffsetByTelemetryId(int idTelemetry); - Task UpdateInfoAsync(string uid, TelemetryInfoDto info, CancellationToken token); Task GetTelemetryTimeZoneOffsetAsync(int idTelemetry, CancellationToken token); + IEnumerable<(string Key, int[] Ids)> GetDuplicateRemoteUids(); + IEnumerable GetTransmittingTelemetries(); + DateTime GetLastTelemetryDate(string telemetryUid); + DateTime GetLastTelemetryDate(int telemetryId); + int? GetIdTelemetryByIdWell(int idWell); + Task UpdateInfoAsync(string uid, TelemetryInfoDto info, CancellationToken token); Task FixDatesRangeByTimeZoneAsync(int telemetryId, DatesRangeDto result, CancellationToken token); Task UpdateTimeZoneAsync(string uid, TelemetryTimeZoneDto telemetryTimeZoneInfo, CancellationToken token); - int? GetIdTelemetryByIdWell(int idWell); - int Merge(IEnumerable telemetryIds); - IEnumerable<(string Key, int[] Ids)> GetRedundantRemoteUids(); - IEnumerable GetTransmittingTelemetries(); - DateTime GetLastTelemetryDate(string telemetryUid); - DateTime GetLastTelemetryDate(int telemetryId); + + /// + /// Слить данные телеметрии в одну + /// + /// старая (исходная) + /// новая + /// + Task MergeAsync(int from, int to, CancellationToken token); + void SaveRequestDate(string uid, DateTime remoteDate); } } \ No newline at end of file diff --git a/AsbCloudDb/EFExtentions.cs b/AsbCloudDb/EFExtentions.cs index 3162eb0d..9a0b88c3 100644 --- a/AsbCloudDb/EFExtentions.cs +++ b/AsbCloudDb/EFExtentions.cs @@ -35,6 +35,20 @@ namespace AsbCloudDb return database.ExecuteSqlRawAsync(query, token); } + + public static string GetTableName(this DbSet dbSet) + where T : class + { + var factory = GetQueryStringFactory(dbSet); + return factory.TableName; + } + + public static IEnumerable GetColumnsNames(this DbSet dbSet) + where T : class + { + var factory = GetQueryStringFactory(dbSet); + return factory.Columns; + } } interface IQueryStringFactory{} @@ -43,24 +57,26 @@ namespace AsbCloudDb where T : class { private readonly string insertHeader; - private readonly string pk; private readonly string conflictBody; private readonly IEnumerable getters; + public string TableName { get; } + public IEnumerable Columns { get; } + public QueryStringFactory(DbSet dbset) { var properties = dbset.EntityType.GetProperties(); var pkColsNames = dbset.EntityType.FindPrimaryKey()?.Properties.Select(p => p.GetColumnBaseName()); pk = pkColsNames is null ? string.Empty : $"({string.Join(", ", pkColsNames)})"; - var tableName = dbset.EntityType.GetTableName(); + TableName = dbset.EntityType.GetTableName(); getters = properties.Select(p => p.GetGetter()); - var colNames = properties.Select(p => $"\"{p.GetColumnBaseName()}\""); - var colunmsString = $"({string.Join(", ", colNames)})"; + Columns = properties.Select(p => $"\"{p.GetColumnBaseName()}\""); + var colunmsString = $"({string.Join(", ", Columns)})"; - insertHeader = $"INSERT INTO {tableName} {colunmsString} VALUES "; - var excludedUpdateSet = string.Join(", ", colNames.Select(n => $"{n} = excluded.{n}")); + insertHeader = $"INSERT INTO {TableName} {colunmsString} VALUES "; + var excludedUpdateSet = string.Join(", ", Columns.Select(n => $"{n} = excluded.{n}")); conflictBody = $" ON CONFLICT {pk} DO UPDATE SET {excludedUpdateSet};"; } diff --git a/AsbCloudInfrastructure/Services/TelemetryService.cs b/AsbCloudInfrastructure/Services/TelemetryService.cs index 15a646ca..a5b79598 100644 --- a/AsbCloudInfrastructure/Services/TelemetryService.cs +++ b/AsbCloudInfrastructure/Services/TelemetryService.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using System; using System.Threading; using Microsoft.EntityFrameworkCore; +using AsbCloudDb; namespace AsbCloudInfrastructure.Services { @@ -215,7 +216,7 @@ namespace AsbCloudInfrastructure.Services private Telemetry GetOrCreateTelemetryByUid(string uid) => cacheTelemetry.GetOrCreate(t => t.RemoteUid == uid, () => new Telemetry { RemoteUid = uid }); - public IEnumerable<(string Key, int[] Ids)> GetRedundantRemoteUids() + public IEnumerable<(string Key, int[] Ids)> GetDuplicateRemoteUids() { return db.Telemetries .ToList() @@ -224,148 +225,192 @@ namespace AsbCloudInfrastructure.Services .Select(g => (g.Key, g.Select(t=>t.Id).ToArray())); } - public int Merge(IEnumerable telemetryIds) + + public async Task MergeAsync(int from, int to, CancellationToken token) { - if (telemetryIds.Count() < 2) - throw new ArgumentException($"telemetryIds {telemetryIds} < 2. nothing to merge.", nameof(telemetryIds)); + if (from == to) + return -2; - // найти телеметрию с наиболее полными справочниками и принять её за основную - // отделить основную от остальных + var stopwath = new System.Diagnostics.Stopwatch(); + stopwath.Start(); - // Оценка трудоебкости - var telemetriesGrade = db.Telemetries - .Include(t => t.Messages) - .Include(t => t.DataSaub) - .Include(t => t.DataSpin) - .Include(t => t.Well) - .Where(t => telemetryIds.Contains(t.Id)) - .Select(t => new { - t.Id, - t.RemoteUid, - t.Info, - IdWell = t.Well != null ? t.Well.Id : int.MinValue, - Records = t.Messages.Count + t.DataSaub.Count + t.DataSpin.Count, - EventsAny = t.Events.Any(), - UsersAny = t.Users.Any(), - }) - .OrderByDescending(t=>t.Records) - .ToList(); - - var telemetryDestId = telemetriesGrade.FirstOrDefault().Id; - if (telemetryDestId == default) - return 0; - - var telemetriesSrcIds = telemetryIds.Where(t => t != telemetryDestId).ToList(); - if(!telemetriesSrcIds.Any()) - return 0; - - var telemetriesSrcIdsSql = $"({string.Join(',', telemetriesSrcIds)})"; - - var (RemoteUid, Info) = telemetriesGrade - .Where(t => t.Info != null) - .OrderByDescending(t => t.Id) - .Select(t => (t.RemoteUid, t.Info)) - .FirstOrDefault(); - - var wellId = telemetriesGrade - .Where(t => t.IdWell > 0) - .OrderByDescending(t => t.Id) - .Select(t => t.IdWell) - .FirstOrDefault(); - - // начало изменений - Console.WriteLine($"Start merge telemetries ids: [{string.Join(',', telemetriesSrcIds)}] to {telemetryDestId}"); - var sw = new System.Diagnostics.Stopwatch(); - sw.Start(); - var transaction = db.Database.BeginTransaction(); - int rows = 0; + var transaction = await db.Database.BeginTransactionAsync(token).ConfigureAwait(false); try { - var telemetryDst = db.Telemetries.FirstOrDefault(t => t.Id == telemetryDestId); - telemetryDst.RemoteUid = RemoteUid; - telemetryDst.Info = Info; + var affected = 0; + + var wellFrom = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == from, token) + .ConfigureAwait(false); + var wellTo = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == to, token) + .ConfigureAwait(false); - if (wellId != default) + if (wellTo is not null && wellFrom is not null) + return -2; + + if (wellTo is null && wellFrom is not null) { - var well = db.Wells.FirstOrDefault(w => w.Id == wellId); - well.IdTelemetry = telemetryDestId; + wellFrom.IdTelemetry = to; + affected += await db.SaveChangesAsync(token); } - // events merge - var telemetryDstEventsIds = db.TelemetryEvents.Where(t => t.IdTelemetry == telemetryDestId).Select(t => t.IdEvent).ToList(); - var telemetrySrcEvents = db.TelemetryEvents - .Where(t => telemetriesSrcIds.Contains(t.IdTelemetry) && !telemetryDstEventsIds.Contains(t.IdEvent)) - .Select(t => new TelemetryEvent - { - IdTelemetry = telemetryDestId, - IdEvent = t.IdEvent, - IdCategory = t.IdCategory, - MessageTemplate = t.MessageTemplate, - }) - .ToList(); - var telemetryEventNewUniq = new Dictionary(); - foreach (var telemetryEvent in telemetrySrcEvents) - telemetryEventNewUniq[telemetryEvent.IdEvent] = telemetryEvent; - if (telemetrySrcEvents.Any()) - db.TelemetryEvents.AddRange(telemetryEventNewUniq.Values); + affected += await MergeEventsAndMessagesAndUsersAsync(from, to, token); + affected += await MergeDataAsync(from, to, token); + affected += await MergeDataAsync(from, to, token); + + affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry_analysis WHERE id_telemetry = {from} OR id_telemetry = {to};", token) + .ConfigureAwait(false); + affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry WHERE id = {from};", token) + .ConfigureAwait(false); + await transaction.CommitAsync(token).ConfigureAwait(false); - // users merge - var telemetryDstUsersIds = db.TelemetryUsers.Where(t => t.IdTelemetry == telemetryDestId).Select(t => t.IdUser).ToList(); - var telemetrySrcUsers = db.TelemetryUsers - .Where(t => telemetriesSrcIds.Contains(t.IdTelemetry) && !telemetryDstUsersIds.Contains(t.IdUser)) - .Select(t => new TelemetryUser - { - IdTelemetry = telemetryDestId, - IdUser = t.IdUser, - Level = t.Level, - Name = t.Name, - Patronymic = t.Patronymic, - Surname = t.Surname, - }).ToList(); - var telemetryUserNewUniq = new Dictionary(); - foreach (var telemetryUser in telemetrySrcUsers) - telemetryUserNewUniq[telemetryUser.IdUser] = telemetryUser; - if (telemetrySrcUsers.Any()) - db.TelemetryUsers.AddRange(telemetryUserNewUniq.Values); - - db.SaveChanges(); - - db.Database.SetCommandTimeout(3_000); // 5 мин - - db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_saub DISABLE TRIGGER ALL;"); - rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_data_saub SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};"); - db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_saub ENABLE TRIGGER ALL;"); - - db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_spin DISABLE TRIGGER ALL;"); - rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_data_spin SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};"); - db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_spin ENABLE TRIGGER ALL;"); - - db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_analysis DISABLE TRIGGER ALL;"); - rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_analysis SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};"); - db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_analysis ENABLE TRIGGER ALL;"); - - db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_message DISABLE TRIGGER ALL;"); - rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_message SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};"); - db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_message ENABLE TRIGGER ALL;"); - - rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry_event WHERE id_telemetry IN {telemetriesSrcIdsSql};"); - rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry_user WHERE id_telemetry IN {telemetriesSrcIdsSql};"); - rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry WHERE id IN {telemetriesSrcIdsSql};"); - - transaction.Commit(); - sw.Stop(); - Console.WriteLine($"Successfully commited in {1d*sw.ElapsedMilliseconds/1000d: #0.00} sec. Affected {rows} rows."); + stopwath.Stop(); + Console.WriteLine($"Successfully commited in {1d * stopwath.ElapsedMilliseconds / 1000d: #0.00} sec. Affected {affected} rows."); + return affected; } catch(Exception ex) { - Console.WriteLine($"Fail. Rollback. Reason is:{ex.Message}"); - transaction.Rollback(); - return 0; + System.Diagnostics.Trace.WriteLine($"Merge() Fail. Rollback. Reason is:{ex.Message}"); + await transaction.RollbackAsync(token).ConfigureAwait(false); + return -1; } - - return rows; } - + private async Task MergeEventsAndMessagesAndUsersAsync(int from, int to, CancellationToken token) + { + var messagesFrom = await db.TelemetryMessages + .Where(d => d.IdTelemetry == from) + .ToListAsync(token) + .ConfigureAwait(false); + + var usersFromQuery = db.TelemetryUsers + .Where(d => d.IdTelemetry == from); + var usersFrom = await usersFromQuery + .ToListAsync(token) + .ConfigureAwait(false); + + var usersTo = await db.TelemetryUsers + .Where(d => d.IdTelemetry == to) + .AsNoTracking() + .ToListAsync(token) + .ConfigureAwait(false); + + var usersToNextId = usersTo.Max(u => u.IdUser) + 100; + + messagesFrom.ForEach(m => m.IdTelemetry = to); + + foreach (var userFrom in usersFrom) + { + var userTo = usersTo + .FirstOrDefault(u=>u.IdUser == userFrom.IdUser); + + if (userTo is null || + userTo.Name != userFrom.Name || + userTo.Surname != userFrom.Surname || + userTo.Patronymic != userFrom.Patronymic) + { + messagesFrom + .Where(m => m.IdTelemetryUser == userFrom.IdUser) + .ToList() + .ForEach(m => m.IdTelemetryUser = usersToNextId); + userFrom.IdUser = usersToNextId; + userFrom.IdTelemetry = to; + usersToNextId++; + } + } + + var eventsFromQuery = db.TelemetryEvents + .Where(d => d.IdTelemetry == from); + var eventsFrom = await eventsFromQuery + .ToListAsync(token) + .ConfigureAwait(false); + + var eventsTo = await db.TelemetryEvents + .Where(d => d.IdTelemetry == to) + .AsNoTracking() + .ToListAsync(token) + .ConfigureAwait(false); + + var eventsToNextId = eventsTo.Max(e => e.IdEvent) + 1; + + foreach (var eventFrom in eventsFrom) + { + var eventTo = eventsTo + .FirstOrDefault(e => e.IdEvent == eventFrom.IdEvent); + + if (eventTo is null || + eventTo.IdCategory != eventFrom.IdCategory || + eventTo.MessageTemplate != eventFrom.MessageTemplate) + { + messagesFrom + .Where(m => m.IdEvent == eventFrom.IdEvent) + .ToList() + .ForEach(m => m.IdEvent = eventsToNextId); + eventFrom.IdEvent = eventsToNextId; + eventFrom.IdTelemetry = to; + eventsToNextId++; + } + } + + await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user DISABLE TRIGGER ALL;", token).ConfigureAwait(false); + await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event DISABLE TRIGGER ALL;", token).ConfigureAwait(false); + await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message DISABLE TRIGGER ALL;", token).ConfigureAwait(false); + var affected = await db.SaveChangesAsync(token).ConfigureAwait(false); + await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user ENABLE TRIGGER ALL;", token).ConfigureAwait(false); + await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event ENABLE TRIGGER ALL;", token).ConfigureAwait(false); + await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message ENABLE TRIGGER ALL;", token).ConfigureAwait(false); + db.TelemetryUsers.RemoveRange(usersFromQuery); + db.TelemetryEvents.RemoveRange(eventsFromQuery); + affected += await db.SaveChangesAsync(token).ConfigureAwait(false); + return affected; + } + + private async Task MergeDataAsync(int from, int to, CancellationToken token) + where TEntity: class, AsbCloudDb.Model.ITelemetryData + { + const string IdTelemetryColumnName = "\"id_telemetry\""; + var dbSet = db.Set(); + var tableName = dbSet.GetTableName(); + var columns = dbSet.GetColumnsNames().ToList(); + var index = columns.FindIndex(c => c.ToLower() == IdTelemetryColumnName.ToLower()); + if (index < 0) + return -5; + columns[index] = $"{to} as {IdTelemetryColumnName}"; + var columnsString = string.Join(',', columns); + var sql = $"INSERT INTO {tableName} " + + $"select {columnsString} " + + $"from {tableName} " + + $"where {IdTelemetryColumnName} = {from};" + + $"delete from {tableName} where {IdTelemetryColumnName} = {from};"; + + var affected = await db.Database.ExecuteSqlRawAsync(sql, token) + .ConfigureAwait(false); + + return affected; + } + + //todo: delete this + private async Task MergeDataAsync_old(int from, int to, CancellationToken token) + where TEntity : class, AsbCloudDb.Model.ITelemetryData + { + var dbSet = db.Set(); + var entitiesFromQuery = dbSet.Where(d => d.IdTelemetry == from); + var entitiesFrom = await entitiesFromQuery + .AsNoTracking() + .ToListAsync(token) + .ConfigureAwait(false); + + dbSet.RemoveRange(entitiesFromQuery); + + var affected = await db.SaveChangesAsync(token) + .ConfigureAwait(false); + + entitiesFrom.ForEach(d => d.IdTelemetry = to); + dbSet.AddRange(entitiesFrom); + + affected += await db.SaveChangesAsync(token) + .ConfigureAwait(false); + + return affected; + } } } diff --git a/AsbCloudWebApi/Controllers/AdminTelemetryController.cs b/AsbCloudWebApi/Controllers/AdminTelemetryController.cs index fc2386b4..aaa8ce7b 100644 --- a/AsbCloudWebApi/Controllers/AdminTelemetryController.cs +++ b/AsbCloudWebApi/Controllers/AdminTelemetryController.cs @@ -4,6 +4,8 @@ using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using System.Collections.Generic; using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace AsbCloudWebApi.Controllers { @@ -27,20 +29,20 @@ namespace AsbCloudWebApi.Controllers [Route("/reduntentUids")] public IActionResult GetRedundentRemoteUids() { - var result = telemetryService.GetRedundantRemoteUids().Select(i => new { i.Key, ids = i.Ids }); + var result = telemetryService.GetDuplicateRemoteUids().Select(i => new { i.Key, ids = i.Ids }); return Ok(result); } /// /// merge telemetries /// - /// array of ids /// [HttpPost] - [Route("/merge")] - public IActionResult MergeTelemetries([FromBody] List telemetriesIds) + [Route("/merge/{idFrom}/{idTo}")] + public async Task MergeTelemetriesAsync(int idFrom, int idTo, CancellationToken token = default) { - var count = telemetryService.Merge(telemetriesIds); + var count = await telemetryService.MergeAsync(idFrom, idTo, token) + .ConfigureAwait(false); return Ok(count); } } diff --git a/ConsoleApp1/DbDemoDataService.cs b/ConsoleApp1/DbDemoDataService.cs index 46fb1a5d..fa15e292 100644 --- a/ConsoleApp1/DbDemoDataService.cs +++ b/ConsoleApp1/DbDemoDataService.cs @@ -48,7 +48,7 @@ namespace ConsoleApp1 Deposit = "1", Customer = "1", HmiVersion = "1", - PlcVersion = "1", + //PlcVersion = "1", TimeZoneId = "1", DrillingStartDate = DateTime.Parse("2021-06-29T12:01:19.000000"), TimeZoneOffsetTotalHours = 5.0