using AsbCloudApp.Data; using AsbCloudApp.Services; using AsbCloudDb.Model; using AsbCloudInfrastructure.Services.Cache; using Mapster; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using System; using System.Threading; using Microsoft.EntityFrameworkCore; namespace AsbCloudInfrastructure.Services { public class TelemetryService : ITelemetryService { private readonly CacheTable cacheTelemetry; private readonly CacheTable cacheWells; private readonly CacheTable cacheClusters; private readonly CacheTable cacheDeposits; private readonly IAsbCloudDbContext db; private readonly ITelemetryTracker telemetryTracker; private readonly ITimeZoneService timeZoneService; public ITimeZoneService TimeZoneService => timeZoneService; public ITelemetryTracker TelemetryTracker => telemetryTracker; public TelemetryService( IAsbCloudDbContext db, ITelemetryTracker telemetryTracker, ITimeZoneService timeZoneService, CacheDb cacheDb) { cacheTelemetry = cacheDb.GetCachedTable((AsbCloudDbContext)db); cacheWells = cacheDb.GetCachedTable((AsbCloudDbContext)db); cacheClusters = cacheDb.GetCachedTable((AsbCloudDbContext)db); cacheDeposits = cacheDb.GetCachedTable((AsbCloudDbContext)db); this.db = db; this.telemetryTracker = telemetryTracker; this.timeZoneService = timeZoneService; } public IEnumerable GetTransmittingTelemetries() { var telemetryDtos = new List(); var activeTelemetriesUids = telemetryTracker.GetTransmittingTelemetriesUids(); if (!activeTelemetriesUids.Any()) return telemetryDtos; var telemetries = cacheTelemetry .Where(t => activeTelemetriesUids.Contains(t.RemoteUid)); telemetryDtos = telemetries.Adapt().ToList(); return telemetryDtos; } public void SaveRequestDate(string uid, DateTime remoteDate) => telemetryTracker.SaveRequestDate(uid, remoteDate); public DateTime GetLastTelemetryDate(string telemetryUid) => telemetryTracker.GetLastTelemetryDateByUid(telemetryUid); public DateTime GetLastTelemetryDate(int telemetryId) { var lastTelemetryDate = DateTime.MinValue; var telemetry = cacheTelemetry.FirstOrDefault(t => t.Id == telemetryId); if (telemetry is null) return lastTelemetryDate; var uid = telemetry.RemoteUid; lastTelemetryDate = GetLastTelemetryDate(uid); return lastTelemetryDate; } public int GetOrCreateTelemetryIdByUid(string uid) => GetOrCreateTelemetryByUid(uid).Id; public int? GetIdWellByTelemetryUid(string uid) => GetWellByTelemetryUid(uid)?.Id; public double GetTimezoneOffsetByTelemetryId(int idTelemetry) => cacheTelemetry.FirstOrDefault(t => t.Id == idTelemetry).Info?.TimeZoneOffsetTotalHours ?? 0d; public async Task UpdateInfoAsync(string uid, TelemetryInfoDto info, CancellationToken token) { var telemetry = GetOrCreateTelemetryByUid(uid); telemetry.Info = info.Adapt(); if (!string.IsNullOrEmpty(info.TimeZoneId) && telemetry.TelemetryTimeZone?.IsOverride != true) telemetry.TelemetryTimeZone = new TelemetryTimeZone() { Hours = info.TimeZoneOffsetTotalHours, TimeZoneId = info.TimeZoneId }; await cacheTelemetry.UpsertAsync(telemetry, token) .ConfigureAwait(false); } public async Task GetTelemetryTimeZoneOffsetAsync(int idTelemetry, CancellationToken token) { var telemetry = await cacheTelemetry.FirstOrDefaultAsync(t => t.Id == idTelemetry, token); if (!string.IsNullOrEmpty(telemetry.TelemetryTimeZone?.TimeZoneId)) return telemetry.TelemetryTimeZone.Hours; if (!string.IsNullOrEmpty(telemetry.Info?.TimeZoneId)) { telemetry.TelemetryTimeZone = new TelemetryTimeZone { Hours = telemetry.Info.TimeZoneOffsetTotalHours, IsOverride = false, TimeZoneId = telemetry.Info.TimeZoneId, }; } else { var well = await cacheWells.FirstOrDefaultAsync(t => t.IdTelemetry == telemetry.Id, token) .ConfigureAwait(false); if (well is null) return null; var coordinates = await GetWellCoordinatesAsync(well.Id, token); if (coordinates is null) return null; var requestedTimeZone = await timeZoneService.GetByCoordinatesAsync(coordinates.Value.latitude, coordinates.Value.longitude, token) .ConfigureAwait(false); if (requestedTimeZone is null) return null; telemetry.TelemetryTimeZone = requestedTimeZone.Adapt(); } await cacheTelemetry.UpsertAsync(telemetry, token).ConfigureAwait(false); return telemetry.TelemetryTimeZone.Hours; } public async Task FixDatesRangeByTimeZoneAsync(int idTelemetry, DatesRangeDto range, CancellationToken token) { var offset = await GetTelemetryTimeZoneOffsetAsync(idTelemetry, token); if (offset is null) return range; return new DatesRangeDto() { From = timeZoneService.DateToTimeZone(range.From, offset ?? default), To = timeZoneService.DateToTimeZone(range.To, offset ?? default), }; } public async Task UpdateTimeZoneAsync(string uid, TelemetryTimeZoneDto timeZoneInfo, CancellationToken token) { var telemetry = GetOrCreateTelemetryByUid(uid); var newTelemetryTimeZone = timeZoneInfo.Adapt(); if (newTelemetryTimeZone?.Equals(telemetry.TelemetryTimeZone) == true) return; await cacheTelemetry.UpsertAsync(telemetry, token) .ConfigureAwait(false); } public int? GetIdTelemetryByIdWell(int idWell) { var well = cacheWells.FirstOrDefault(w => w.Id == idWell); return well?.IdTelemetry; } private async Task<(double latitude, double longitude)?> GetWellCoordinatesAsync(int idWell, CancellationToken token) { var well = await cacheWells.FirstOrDefaultAsync(w => w.Id == idWell, token) .ConfigureAwait(false); if (well is null) return null; if (well.Latitude is not null && well.Longitude is not null) return (well.Latitude ?? default, well.Longitude??default); var cluster = await cacheClusters.FirstOrDefaultAsync(c => c.Id == well.IdCluster, token) .ConfigureAwait(false); if (cluster.Latitude is not null && cluster.Longitude is not null) return (cluster.Latitude ?? default, cluster.Longitude ?? default); var deposit = await cacheDeposits.FirstOrDefaultAsync(d => d.Id == cluster.IdDeposit, token) .ConfigureAwait(false); if (deposit.Latitude is not null && deposit.Longitude is not null) return (deposit.Latitude ?? default, deposit.Longitude ?? default); return null; } private Well GetWellByTelemetryUid(string uid) { var tele = cacheTelemetry.FirstOrDefault(t => t.RemoteUid == uid); if (tele is null) return null; return cacheWells.FirstOrDefault(w => w?.IdTelemetry == tele.Id); } private Telemetry GetOrCreateTelemetryByUid(string uid) => cacheTelemetry.GetOrCreate(t => t.RemoteUid == uid, () => new Telemetry { RemoteUid = uid }); public IEnumerable<(string Key, int[] Ids)> GetRedundantRemoteUids() { return db.Telemetries .ToList() .GroupBy(t => t.RemoteUid) .Where(g => g.Count() > 1) .Select(g => (g.Key, g.Select(t=>t.Id).ToArray())); } public int Merge(IEnumerable telemetryIds) { if (telemetryIds.Count() < 2) throw new ArgumentException($"telemetryIds {telemetryIds} < 2. nothing to merge.", nameof(telemetryIds)); // найти телеметрию с наиболее полными справочниками и принять её за основную // отделить основную от остальных // Оценка трудоебкости var telemetriesGrade = db.Telemetries .Include(t => t.Messages) .Include(t => t.DataSaub) .Include(t => t.DataSpin) .Include(t => t.Well) .Where(t => telemetryIds.Contains(t.Id)) .Select(t => new { t.Id, t.RemoteUid, t.Info, IdWell = t.Well != null ? t.Well.Id : int.MinValue, Records = t.Messages.Count + t.DataSaub.Count + t.DataSpin.Count, EventsAny = t.Events.Any(), UsersAny = t.Users.Any(), }) .OrderByDescending(t=>t.Records) .ToList(); var telemetryDestId = telemetriesGrade.FirstOrDefault().Id; if (telemetryDestId == default) return 0; var telemetriesSrcIds = telemetryIds.Where(t => t != telemetryDestId).ToList(); if(!telemetriesSrcIds.Any()) return 0; var telemetriesSrcIdsSql = $"({string.Join(',', telemetriesSrcIds)})"; var (RemoteUid, Info) = telemetriesGrade .Where(t => t.Info != null) .OrderByDescending(t => t.Id) .Select(t => (t.RemoteUid, t.Info)) .FirstOrDefault(); var wellId = telemetriesGrade .Where(t => t.IdWell > 0) .OrderByDescending(t => t.Id) .Select(t => t.IdWell) .FirstOrDefault(); // начало изменений Console.WriteLine($"Start merge telemetries ids: [{string.Join(',', telemetriesSrcIds)}] to {telemetryDestId}"); var sw = new System.Diagnostics.Stopwatch(); sw.Start(); var transaction = db.Database.BeginTransaction(); int rows = 0; try { var telemetryDst = db.Telemetries.FirstOrDefault(t => t.Id == telemetryDestId); telemetryDst.RemoteUid = RemoteUid; telemetryDst.Info = Info; if (wellId != default) { var well = db.Wells.FirstOrDefault(w => w.Id == wellId); well.IdTelemetry = telemetryDestId; } // events merge var telemetryDstEventsIds = db.TelemetryEvents.Where(t => t.IdTelemetry == telemetryDestId).Select(t => t.IdEvent).ToList(); var telemetrySrcEvents = db.TelemetryEvents .Where(t => telemetriesSrcIds.Contains(t.IdTelemetry) && !telemetryDstEventsIds.Contains(t.IdEvent)) .Select(t => new TelemetryEvent { IdTelemetry = telemetryDestId, IdEvent = t.IdEvent, IdCategory = t.IdCategory, MessageTemplate = t.MessageTemplate, }) .ToList(); var telemetryEventNewUniq = new Dictionary(); foreach (var telemetryEvent in telemetrySrcEvents) telemetryEventNewUniq[telemetryEvent.IdEvent] = telemetryEvent; if (telemetrySrcEvents.Any()) db.TelemetryEvents.AddRange(telemetryEventNewUniq.Values); // users merge var telemetryDstUsersIds = db.TelemetryUsers.Where(t => t.IdTelemetry == telemetryDestId).Select(t => t.IdUser).ToList(); var telemetrySrcUsers = db.TelemetryUsers .Where(t => telemetriesSrcIds.Contains(t.IdTelemetry) && !telemetryDstUsersIds.Contains(t.IdUser)) .Select(t => new TelemetryUser { IdTelemetry = telemetryDestId, IdUser = t.IdUser, Level = t.Level, Name = t.Name, Patronymic = t.Patronymic, Surname = t.Surname, }).ToList(); var telemetryUserNewUniq = new Dictionary(); foreach (var telemetryUser in telemetrySrcUsers) telemetryUserNewUniq[telemetryUser.IdUser] = telemetryUser; if (telemetrySrcUsers.Any()) db.TelemetryUsers.AddRange(telemetryUserNewUniq.Values); db.SaveChanges(); db.Database.SetCommandTimeout(3_000); // 5 мин db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_saub DISABLE TRIGGER ALL;"); rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_data_saub SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};"); db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_saub ENABLE TRIGGER ALL;"); db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_spin DISABLE TRIGGER ALL;"); rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_data_spin SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};"); db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_data_spin ENABLE TRIGGER ALL;"); db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_analysis DISABLE TRIGGER ALL;"); rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_analysis SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};"); db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_analysis ENABLE TRIGGER ALL;"); db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_message DISABLE TRIGGER ALL;"); rows += db.Database.ExecuteSqlRaw($"UPDATE t_telemetry_message SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};"); db.Database.ExecuteSqlRaw($"ALTER TABLE t_telemetry_message ENABLE TRIGGER ALL;"); rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry_event WHERE id_telemetry IN {telemetriesSrcIdsSql};"); rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry_user WHERE id_telemetry IN {telemetriesSrcIdsSql};"); rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry WHERE id IN {telemetriesSrcIdsSql};"); transaction.Commit(); sw.Stop(); Console.WriteLine($"Successfully commited in {1d*sw.ElapsedMilliseconds/1000d: #0.00} sec. Affected {rows} rows."); } catch(Exception ex) { Console.WriteLine($"Fail. Rollback. Reason is:{ex.Message}"); transaction.Rollback(); return 0; } return rows; } } }