using AsbCloudApp.Data; using AsbCloudApp.Services; using AsbCloudDb.Model; using AsbCloudInfrastructure.Services.Cache; using Mapster; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using System; using System.Threading; using Microsoft.EntityFrameworkCore; using AsbCloudDb; namespace AsbCloudInfrastructure.Services { public class TelemetryService : ITelemetryService { private readonly CacheTable cacheTelemetry; private readonly CacheTable cacheWells;//TODO: use wellService insad of this private readonly IAsbCloudDbContext db; private readonly ITelemetryTracker telemetryTracker; private readonly ITimeZoneService timeZoneService; public ITimeZoneService TimeZoneService => timeZoneService; public ITelemetryTracker TelemetryTracker => telemetryTracker; public TelemetryService( IAsbCloudDbContext db, ITelemetryTracker telemetryTracker, ITimeZoneService timeZoneService, CacheDb cacheDb) { cacheTelemetry = cacheDb.GetCachedTable((AsbCloudDbContext)db); cacheWells = cacheDb.GetCachedTable( (AsbCloudDbContext)db, $"{nameof(Well.Cluster)}.{nameof(Cluster.Deposit)}", nameof(Well.Telemetry), $"{nameof(Well.RelationCompaniesWells)}.{nameof(RelationCompanyWell.Company)}", nameof(Well.WellType)); this.db = db; this.telemetryTracker = telemetryTracker; this.timeZoneService = timeZoneService; } public IEnumerable GetTransmittingTelemetries() { var telemetryDtos = new List(); var activeTelemetriesUids = telemetryTracker.GetTransmittingTelemetriesUids(); if (!activeTelemetriesUids.Any()) return telemetryDtos; var telemetries = cacheTelemetry .Where(t => activeTelemetriesUids.Contains(t.RemoteUid)); telemetryDtos = telemetries.Adapt().ToList(); return telemetryDtos; } public void SaveRequestDate(string uid, DateTime remoteDate) => telemetryTracker.SaveRequestDate(uid, remoteDate); public DateTime GetLastTelemetryDate(string telemetryUid) => telemetryTracker.GetLastTelemetryDateByUid(telemetryUid); public DateTime GetLastTelemetryDate(int telemetryId) { var lastTelemetryDate = DateTime.MinValue; var telemetry = cacheTelemetry.FirstOrDefault(t => t.Id == telemetryId); if (telemetry is null) return lastTelemetryDate; var uid = telemetry.RemoteUid; lastTelemetryDate = GetLastTelemetryDate(uid); return lastTelemetryDate; } public int GetOrCreateTelemetryIdByUid(string uid) => GetOrCreateTelemetryByUid(uid).Id; public int? GetIdWellByTelemetryUid(string uid) => GetWellByTelemetryUid(uid)?.Id; public double GetTimezoneOffsetByTelemetryId(int idTelemetry) => cacheTelemetry.FirstOrDefault(t => t.Id == idTelemetry).Info?.TimeZoneOffsetTotalHours ?? 0d; public async Task UpdateInfoAsync(string uid, TelemetryInfoDto info, CancellationToken token) { var telemetry = GetOrCreateTelemetryByUid(uid); telemetry.Info = info.Adapt(); if (!string.IsNullOrEmpty(info.TimeZoneId) && telemetry.TelemetryTimeZone?.IsOverride != true) telemetry.TelemetryTimeZone = new TelemetryTimeZone() { Hours = info.TimeZoneOffsetTotalHours, TimeZoneId = info.TimeZoneId }; await cacheTelemetry.UpsertAsync(telemetry, token) .ConfigureAwait(false); } public async Task GetTelemetryTimeZoneOffsetAsync(int idTelemetry, CancellationToken token) { var telemetry = await cacheTelemetry.FirstOrDefaultAsync(t => t.Id == idTelemetry, token); if (!string.IsNullOrEmpty(telemetry.TelemetryTimeZone?.TimeZoneId)) return telemetry.TelemetryTimeZone.Hours; if (!string.IsNullOrEmpty(telemetry.Info?.TimeZoneId)) { telemetry.TelemetryTimeZone = new TelemetryTimeZone { Hours = telemetry.Info.TimeZoneOffsetTotalHours, IsOverride = false, TimeZoneId = telemetry.Info.TimeZoneId, }; } else { var well = await cacheWells.FirstOrDefaultAsync(t => t.IdTelemetry == telemetry.Id, token) .ConfigureAwait(false); if (well is null) return null; var coordinates = await GetWellCoordinatesAsync(well.Id, token); if (coordinates is null) return null; var requestedTimeZone = await timeZoneService.GetByCoordinatesAsync(coordinates.Value.latitude, coordinates.Value.longitude, token) .ConfigureAwait(false); if (requestedTimeZone is null) return null; telemetry.TelemetryTimeZone = requestedTimeZone.Adapt(); } await cacheTelemetry.UpsertAsync(telemetry, token).ConfigureAwait(false); return telemetry.TelemetryTimeZone.Hours; } public async Task FixDatesRangeByTimeZoneAsync(int idTelemetry, DatesRangeDto range, CancellationToken token) { var offset = await GetTelemetryTimeZoneOffsetAsync(idTelemetry, token); if (offset is null) return range; return new DatesRangeDto() { From = timeZoneService.DateToTimeZone(range.From, offset ?? default), To = timeZoneService.DateToTimeZone(range.To, offset ?? default), }; } public async Task UpdateTimeZoneAsync(string uid, TelemetryTimeZoneDto timeZoneInfo, CancellationToken token) { var telemetry = GetOrCreateTelemetryByUid(uid); var newTelemetryTimeZone = timeZoneInfo.Adapt(); if (newTelemetryTimeZone?.Equals(telemetry.TelemetryTimeZone) == true) return; await cacheTelemetry.UpsertAsync(telemetry, token) .ConfigureAwait(false); } public int? GetIdTelemetryByIdWell(int idWell) { var well = cacheWells.FirstOrDefault(w => w.Id == idWell); return well?.IdTelemetry; } private async Task<(double latitude, double longitude)?> GetWellCoordinatesAsync(int idWell, CancellationToken token) { var well = await cacheWells.FirstOrDefaultAsync(w => w.Id == idWell, token) .ConfigureAwait(false); if (well is null) return null; var latitude = well.Latitude ?? well.Cluster?.Latitude ?? well.Cluster?.Deposit?.Latitude; var longitude = well.Longitude ?? well.Cluster?.Longitude ?? well.Cluster?.Deposit?.Longitude; if (latitude is not null && longitude is not null) return ((double)latitude, (double)longitude); return null; } private Well GetWellByTelemetryUid(string uid) { var tele = cacheTelemetry.FirstOrDefault(t => t.RemoteUid == uid); if (tele is null) return null; return cacheWells.FirstOrDefault(w => w?.IdTelemetry == tele.Id); } private Telemetry GetOrCreateTelemetryByUid(string uid) => cacheTelemetry.GetOrCreate(t => t.RemoteUid == uid, () => new Telemetry { RemoteUid = uid }); public async Task MergeAsync(int from, int to, CancellationToken token) { if (from == to) return -2; var stopwath = new System.Diagnostics.Stopwatch(); stopwath.Start(); var transaction = await db.Database.BeginTransactionAsync(token).ConfigureAwait(false); try { var affected = 0; var wellFrom = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == from, token) .ConfigureAwait(false); var wellTo = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == to, token) .ConfigureAwait(false); if (wellTo is not null && wellFrom is not null) return -2; if (wellTo is null && wellFrom is not null) { wellFrom.IdTelemetry = to; affected += await db.SaveChangesAsync(token); } affected += await MergeEventsAndMessagesAndUsersAsync(from, to, token); affected += await MergeDataAsync(from, to, token); affected += await MergeDataAsync(from, to, token); affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry_analysis WHERE id_telemetry = {from} OR id_telemetry = {to};", token) .ConfigureAwait(false); affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry WHERE id = {from};", token) .ConfigureAwait(false); await transaction.CommitAsync(token).ConfigureAwait(false); stopwath.Stop(); Console.WriteLine($"Successfully commited in {1d * stopwath.ElapsedMilliseconds / 1000d: #0.00} sec. Affected {affected} rows."); return affected; } catch(Exception ex) { System.Diagnostics.Trace.WriteLine($"Merge() Fail. Rollback. Reason is:{ex.Message}"); await transaction.RollbackAsync(token).ConfigureAwait(false); return -1; } } private async Task MergeEventsAndMessagesAndUsersAsync(int from, int to, CancellationToken token) { var messagesFrom = await db.TelemetryMessages .Where(d => d.IdTelemetry == from) .ToListAsync(token) .ConfigureAwait(false); var usersFromQuery = db.TelemetryUsers .Where(d => d.IdTelemetry == from); var usersFrom = await usersFromQuery .ToListAsync(token) .ConfigureAwait(false); var usersTo = await db.TelemetryUsers .Where(d => d.IdTelemetry == to) .AsNoTracking() .ToListAsync(token) .ConfigureAwait(false); var usersToNextId = usersTo.Max(u => u.IdUser) + 100; messagesFrom.ForEach(m => m.IdTelemetry = to); foreach (var userFrom in usersFrom) { var userTo = usersTo .FirstOrDefault(u=>u.IdUser == userFrom.IdUser); if (userTo is null || userTo.Name != userFrom.Name || userTo.Surname != userFrom.Surname || userTo.Patronymic != userFrom.Patronymic) { messagesFrom .Where(m => m.IdTelemetryUser == userFrom.IdUser) .ToList() .ForEach(m => m.IdTelemetryUser = usersToNextId); userFrom.IdUser = usersToNextId; userFrom.IdTelemetry = to; usersToNextId++; } } var eventsFromQuery = db.TelemetryEvents .Where(d => d.IdTelemetry == from); var eventsFrom = await eventsFromQuery .ToListAsync(token) .ConfigureAwait(false); var eventsTo = await db.TelemetryEvents .Where(d => d.IdTelemetry == to) .AsNoTracking() .ToListAsync(token) .ConfigureAwait(false); var eventsToNextId = eventsTo.Max(e => e.IdEvent) + 1; foreach (var eventFrom in eventsFrom) { var eventTo = eventsTo .FirstOrDefault(e => e.IdEvent == eventFrom.IdEvent); if (eventTo is null || eventTo.IdCategory != eventFrom.IdCategory || eventTo.MessageTemplate != eventFrom.MessageTemplate) { messagesFrom .Where(m => m.IdEvent == eventFrom.IdEvent) .ToList() .ForEach(m => m.IdEvent = eventsToNextId); eventFrom.IdEvent = eventsToNextId; eventFrom.IdTelemetry = to; eventsToNextId++; } } await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user DISABLE TRIGGER ALL;", token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event DISABLE TRIGGER ALL;", token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message DISABLE TRIGGER ALL;", token).ConfigureAwait(false); var affected = await db.SaveChangesAsync(token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user ENABLE TRIGGER ALL;", token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event ENABLE TRIGGER ALL;", token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message ENABLE TRIGGER ALL;", token).ConfigureAwait(false); db.TelemetryUsers.RemoveRange(usersFromQuery); db.TelemetryEvents.RemoveRange(eventsFromQuery); affected += await db.SaveChangesAsync(token).ConfigureAwait(false); return affected; } private async Task MergeDataAsync(int from, int to, CancellationToken token) where TEntity: class, AsbCloudDb.Model.ITelemetryData { const string IdTelemetryColumnName = "\"id_telemetry\""; var dbSet = db.Set(); var tableName = dbSet.GetTableName(); var columns = dbSet.GetColumnsNames().ToList(); var index = columns.FindIndex(c => c.ToLower() == IdTelemetryColumnName.ToLower()); if (index < 0) return -5; columns[index] = $"{to} as {IdTelemetryColumnName}"; var columnsString = string.Join(',', columns); var sql = $"INSERT INTO {tableName} " + $"select {columnsString} " + $"from {tableName} " + $"where {IdTelemetryColumnName} = {from};" + $"delete from {tableName} where {IdTelemetryColumnName} = {from};"; var affected = await db.Database.ExecuteSqlRawAsync(sql, token) .ConfigureAwait(false); return affected; } } }