using AsbCloudApp.Data; using AsbCloudApp.Data.SAUB; using AsbCloudApp.Repositories; using AsbCloudApp.Services; using AsbCloudDb; using AsbCloudDb.Model; using Mapster; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Caching.Memory; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text.Csv; using System.Threading; using System.Threading.Tasks; namespace AsbCloudInfrastructure.Services.SAUB { public class TelemetryService : ITelemetryService { private readonly IAsbCloudDbContext db; private readonly IMemoryCache memoryCache; //TODO: методы использующие ITelemetryDataCache, скорее всего, тут не нужны private readonly ITelemetryDataCache dataSaubCache; private readonly ITimezoneService timezoneService; public ITimezoneService TimeZoneService => timezoneService; public TelemetryService( IAsbCloudDbContext db, IMemoryCache memoryCache, ITelemetryDataCache dataSaubCache, ITimezoneService timezoneService) { this.db = db; this.memoryCache = memoryCache; this.dataSaubCache = dataSaubCache; this.timezoneService = timezoneService; } private IEnumerable GetTelemetryCache() => memoryCache.GetOrCreateBasic( db.Set() .Include(t => t.Well)); private void DropTelemetryCache() { memoryCache.DropBasic(); } public DatesRangeDto GetDatesRange(int idTelemetry) { var cacheDataRange = dataSaubCache.GetOrDefaultWellDataDateRange(idTelemetry) ?? new (); return cacheDataRange; } public int? GetIdWellByTelemetryUid(string uid) => GetWellByTelemetryUid(uid)?.Id; public async Task UpdateInfoAsync(string uid, TelemetryInfoDto info, CancellationToken token) { var telemetry = GetOrCreateTelemetry(uid); telemetry.Info = info.Adapt(); if (!string.IsNullOrEmpty(info.TimeZoneId) && telemetry.TimeZone?.IsOverride != true) telemetry.TimeZone = new SimpleTimezone() { Hours = info.TimeZoneOffsetTotalHours, TimezoneId = info.TimeZoneId }; db.Telemetries.Upsert(telemetry); await db.SaveChangesAsync(token); DropTelemetryCache(); } [Obsolete("This method will be private. Use TelemetryDto.TimeZone prop.")] public SimpleTimezoneDto GetTimezone(int idTelemetry) { var telemetry = GetTelemetryCache().FirstOrDefault(t => t.Id == idTelemetry); if (telemetry is null) throw new Exception($"Telemetry id: {idTelemetry} does not exist."); if (telemetry.Well?.Timezone is not null) return telemetry.Well.Timezone.Adapt(); if (telemetry.TimeZone is not null) return telemetry.TimeZone.Adapt(); throw new Exception($"Telemetry id: {idTelemetry} can't find timezone."); } public TelemetryBaseDto? GetOrDefaultTelemetryByIdWell(int idWell) { var entity = GetTelemetryCache() .FirstOrDefault(t => t.Well?.Id == idWell); if (entity?.Well?.Timezone is not null && entity.TimeZone.Hours != entity.Well.Timezone.Hours) { entity.TimeZone = entity.Well.Timezone; //TODO: выдаем предупреждение! } return entity?.Adapt(); } public TelemetryDto GetOrCreateTelemetryByUid(string uid) { var entity = GetOrCreateTelemetry(uid); if(entity.Well?.Timezone is not null && entity.TimeZone.Hours != entity.Well.Timezone.Hours) { entity.TimeZone = entity.Well.Timezone; //TODO: выдаем предупреждение! } var dto = entity.Adapt(); return dto; } private Well? GetWellByTelemetryUid(string uid) { var telemetry = GetOrDefaultTelemetryByUid(uid); return telemetry?.Well; } private Telemetry? GetOrDefaultTelemetryByUid(string uid) { var telemetry = GetTelemetryCache().FirstOrDefault(t => t.RemoteUid == uid); return telemetry; } private Telemetry GetOrCreateTelemetry(string uid) { var telemetry = GetOrDefaultTelemetryByUid(uid); if (telemetry is null) { var newTelemetry = new Telemetry { RemoteUid = uid, TimeZone = new SimpleTimezone { Hours = 5, IsOverride = false, TimezoneId = "default", } }; var entry = db.Telemetries.Add(newTelemetry); db.SaveChanges(); DropTelemetryCache(); return entry.Entity; } return telemetry; } public async Task GetTelemetriesInfoByLastData(DateTimeOffset from, CancellationToken token) { var fromUtc = from.UtcDateTime; var query = db.TelemetryDataSaub .Where(i => i.DateTime >= fromUtc) .GroupBy(i => i.IdTelemetry) .Select(i => new { i.First().Telemetry, LastDate = i.Max(i => i.DateTime) }); var data = await query.ToArrayAsync(token); var infos = data.Select(i => new TelemetryWithSoftwareVersionsDto( i.Telemetry.Id, i.Telemetry.RemoteUid, i.LastDate, i.Telemetry.Info.Deposit, i.Telemetry.Info.Cluster, i.Telemetry.Info.Well, i.Telemetry.Info.HmiVersion, i.Telemetry.Info.SaubPlcVersion, i.Telemetry.Info.SpinPlcVersion) ); var stream = new MemoryStream(); if (!data.Any()) return stream; var serializer = new CsvSerializer(); serializer.Serialize(infos, stream); stream.Seek(0, SeekOrigin.Begin); return stream; } public async Task MergeAsync(int from, int to, CancellationToken token) { if (from == to) return -2; var stopwath = new System.Diagnostics.Stopwatch(); stopwath.Start(); var transaction = await db.Database.BeginTransactionAsync(token).ConfigureAwait(false); try { var affected = 0; var wellFrom = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == from, token) .ConfigureAwait(false); var wellTo = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == to, token) .ConfigureAwait(false); if (wellTo is not null && wellFrom is not null) return -2; if (wellTo is null && wellFrom is not null) { wellFrom.IdTelemetry = to; affected += await db.SaveChangesAsync(token); } affected += await MergeEventsAndMessagesAndUsersAsync(from, to, token); affected += await MergeDataAsync(from, to, token); affected += await MergeDataAsync(from, to, token); affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry_analysis WHERE id_telemetry = {from} OR id_telemetry = {to};", token) .ConfigureAwait(false); affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry WHERE id = {from};", token) .ConfigureAwait(false); await transaction.CommitAsync(token).ConfigureAwait(false); stopwath.Stop(); Console.WriteLine($"Successfully committed in {1d * stopwath.ElapsedMilliseconds / 1000d: #0.00} sec. Affected {affected} rows."); DropTelemetryCache(); return affected; } catch (Exception ex) { System.Diagnostics.Trace.WriteLine($"Merge() Fail. Rollback. Reason is:{ex.Message}"); await transaction.RollbackAsync(CancellationToken.None); return -1; } } private async Task MergeEventsAndMessagesAndUsersAsync(int from, int to, CancellationToken token) { var messagesFrom = await db.TelemetryMessages .Where(d => d.IdTelemetry == from) .ToListAsync(token) .ConfigureAwait(false); var usersFromQuery = db.TelemetryUsers .Where(d => d.IdTelemetry == from); var usersFrom = await usersFromQuery .ToListAsync(token) .ConfigureAwait(false); var usersTo = await db.TelemetryUsers .Where(d => d.IdTelemetry == to) .AsNoTracking() .ToListAsync(token) .ConfigureAwait(false); var usersToNextId = usersTo.Max(u => u.IdUser) + 100; messagesFrom.ForEach(m => m.IdTelemetry = to); foreach (var userFrom in usersFrom) { var userTo = usersTo .FirstOrDefault(u => u.IdUser == userFrom.IdUser); if (userTo is null || userTo.Name != userFrom.Name || userTo.Surname != userFrom.Surname || userTo.Patronymic != userFrom.Patronymic) { messagesFrom .Where(m => m.IdTelemetryUser == userFrom.IdUser) .ToList() .ForEach(m => m.IdTelemetryUser = usersToNextId); userFrom.IdUser = usersToNextId; userFrom.IdTelemetry = to; usersToNextId++; } } var eventsFromQuery = db.TelemetryEvents .Where(d => d.IdTelemetry == from); var eventsFrom = await eventsFromQuery .ToListAsync(token) .ConfigureAwait(false); var eventsTo = await db.TelemetryEvents .Where(d => d.IdTelemetry == to) .AsNoTracking() .ToListAsync(token) .ConfigureAwait(false); var eventsToNextId = eventsTo.Max(e => e.IdEvent) + 1; foreach (var eventFrom in eventsFrom) { var eventTo = eventsTo .FirstOrDefault(e => e.IdEvent == eventFrom.IdEvent); if (eventTo is null || eventTo.IdCategory != eventFrom.IdCategory || eventTo.MessageTemplate != eventFrom.MessageTemplate) { messagesFrom .Where(m => m.IdEvent == eventFrom.IdEvent) .ToList() .ForEach(m => m.IdEvent = eventsToNextId); eventFrom.IdEvent = eventsToNextId; eventFrom.IdTelemetry = to; eventsToNextId++; } } await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user DISABLE TRIGGER ALL;", token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event DISABLE TRIGGER ALL;", token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message DISABLE TRIGGER ALL;", token).ConfigureAwait(false); var affected = await db.SaveChangesAsync(token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user ENABLE TRIGGER ALL;", token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event ENABLE TRIGGER ALL;", token).ConfigureAwait(false); await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message ENABLE TRIGGER ALL;", token).ConfigureAwait(false); db.TelemetryUsers.RemoveRange(usersFromQuery); db.TelemetryEvents.RemoveRange(eventsFromQuery); affected += await db.SaveChangesAsync(token).ConfigureAwait(false); return affected; } private async Task MergeDataAsync(int from, int to, CancellationToken token) where TEntity : class, AsbCloudDb.Model.ITelemetryData { const string IdTelemetryColumnName = "\"id_telemetry\""; var dbSet = db.Set(); var tableName = dbSet.GetTableName(); var columns = dbSet.GetColumnsNames().ToList(); var index = columns.FindIndex(c => c.ToLower() == IdTelemetryColumnName.ToLower()); if (index < 0) return -5; columns[index] = $"{to} as {IdTelemetryColumnName}"; var columnsString = string.Join(',', columns); var sql = $"INSERT INTO {tableName} " + $"select {columnsString} " + $"from {tableName} " + $"where {IdTelemetryColumnName} = {from};" + $"delete from {tableName} where {IdTelemetryColumnName} = {from};"; var affected = await db.Database.ExecuteSqlRawAsync(sql, token) .ConfigureAwait(false); return affected; } } }