DD.WellWorkover.Cloud/AsbCloudInfrastructure/Services/TelemetryService.cs
2021-12-07 18:27:52 +05:00

372 lines
16 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using AsbCloudApp.Data;
using AsbCloudApp.Services;
using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services.Cache;
using Mapster;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System;
using System.Threading;
using Microsoft.EntityFrameworkCore;
namespace AsbCloudInfrastructure.Services
{
public class TelemetryService : ITelemetryService
{
// Cached table of telemetry records (resolved from CacheDb in the constructor).
private readonly CacheTable<Telemetry> cacheTelemetry;
// Cached table of wells (resolved from CacheDb in the constructor).
private readonly CacheTable<Well> cacheWells;
// Cached table of clusters (resolved from CacheDb in the constructor).
private readonly CacheTable<Cluster> cacheClusters;
// Cached table of deposits (resolved from CacheDb in the constructor).
private readonly CacheTable<Deposit> cacheDeposits;
// Database context used for direct queries, transactions and raw SQL (see Merge).
private readonly IAsbCloudDbContext db;
// Tracker queried for transmitting telemetry uids and last request dates.
private readonly ITelemetryTracker telemetryTracker;
// Service used to resolve time zones by coordinates and to shift dates.
private readonly ITimeZoneService timeZoneService;
// Exposes the injected time zone service to callers.
public ITimeZoneService TimeZoneService => timeZoneService;
// Exposes the injected telemetry tracker to callers.
public ITelemetryTracker TelemetryTracker => telemetryTracker;
/// <summary>
/// Stores the injected dependencies and resolves the cached tables
/// (telemetry, wells, clusters, deposits) from <paramref name="cacheDb"/>.
/// </summary>
/// <param name="db">Database context; it is cast to AsbCloudDbContext to obtain the cached tables.</param>
/// <param name="telemetryTracker">Tracker of telemetry request dates.</param>
/// <param name="timeZoneService">Time zone lookup/conversion service.</param>
/// <param name="cacheDb">Provider of cached tables.</param>
public TelemetryService(
    IAsbCloudDbContext db,
    ITelemetryTracker telemetryTracker,
    ITimeZoneService timeZoneService,
    CacheDb cacheDb)
{
    this.db = db;
    this.telemetryTracker = telemetryTracker;
    this.timeZoneService = timeZoneService;

    // The cache provider works with the concrete context type, so cast once and reuse.
    var concreteContext = (AsbCloudDbContext)db;
    cacheTelemetry = cacheDb.GetCachedTable<Telemetry>(concreteContext);
    cacheWells = cacheDb.GetCachedTable<Well>(concreteContext);
    cacheClusters = cacheDb.GetCachedTable<Cluster>(concreteContext);
    cacheDeposits = cacheDb.GetCachedTable<Deposit>(concreteContext);
}
/// <summary>
/// Returns DTOs of the cached telemetries whose remote uid is reported
/// by the tracker as currently transmitting.
/// </summary>
/// <returns>A list of telemetry DTOs; empty when the tracker reports no uids.</returns>
public IEnumerable<TelemetryDto> GetTransmittingTelemetries()
{
    var transmittingUids = telemetryTracker.GetTransmittingTelemetriesUids();

    // Nothing is transmitting: skip the cache lookup entirely.
    if (!transmittingUids.Any())
        return new List<TelemetryDto>();

    return cacheTelemetry
        .Where(t => transmittingUids.Contains(t.RemoteUid))
        .Adapt<TelemetryDto>()
        .ToList();
}
/// <summary>
/// Forwards the request date of the telemetry with the given uid to the tracker.
/// </summary>
/// <param name="uid">Remote uid of the telemetry.</param>
/// <param name="remoteDate">Date passed through to the tracker.</param>
public void SaveRequestDate(string uid, DateTime remoteDate)
{
    telemetryTracker.SaveRequestDate(uid, remoteDate);
}
/// <summary>
/// Asks the tracker for the last telemetry date of the given remote uid.
/// </summary>
/// <param name="telemetryUid">Remote uid of the telemetry.</param>
/// <returns>The date reported by the tracker.</returns>
public DateTime GetLastTelemetryDate(string telemetryUid)
{
    return telemetryTracker.GetLastTelemetryDateByUid(telemetryUid);
}
/// <summary>
/// Looks up the telemetry by id in the cache and returns its last telemetry date.
/// </summary>
/// <param name="telemetryId">Id of the telemetry.</param>
/// <returns>
/// The date reported for the telemetry's remote uid, or <see cref="DateTime.MinValue"/>
/// when no telemetry with this id is cached.
/// </returns>
public DateTime GetLastTelemetryDate(int telemetryId)
{
    var cached = cacheTelemetry.FirstOrDefault(t => t.Id == telemetryId);
    return cached is null
        ? DateTime.MinValue
        : GetLastTelemetryDate(cached.RemoteUid);
}
/// <summary>
/// Returns the id of the telemetry with the given remote uid,
/// obtained through GetOrCreateTelemetryByUid.
/// </summary>
/// <param name="uid">Remote uid of the telemetry.</param>
/// <returns>Id of the found or newly created telemetry.</returns>
public int GetOrCreateTelemetryIdByUid(string uid)
{
    var telemetry = GetOrCreateTelemetryByUid(uid);
    return telemetry.Id;
}
/// <summary>
/// Returns the id of the well linked to the telemetry with the given remote uid.
/// </summary>
/// <param name="uid">Remote uid of the telemetry.</param>
/// <returns>The well id, or null when no telemetry or no linked well is found.</returns>
public int? GetIdWellByTelemetryUid(string uid)
{
    var linkedWell = GetWellByTelemetryUid(uid);
    return linkedWell?.Id;
}
/// <summary>
/// Returns the time zone offset (total hours) stored in the telemetry's Info.
/// </summary>
/// <param name="idTelemetry">Id of the telemetry.</param>
/// <returns>
/// The offset from the telemetry's Info, or 0 when the telemetry is not cached
/// or has no Info.
/// </returns>
public double GetTimezoneOffsetByTelemetryId(int idTelemetry)
{
    // FirstOrDefault yields null for an unknown id; treat it like missing Info
    // instead of throwing NullReferenceException.
    var telemetry = cacheTelemetry.FirstOrDefault(t => t.Id == idTelemetry);
    return telemetry?.Info?.TimeZoneOffsetTotalHours ?? 0d;
}
/// <summary>
/// Replaces the Info of the telemetry with the given uid and, when the new info
/// carries a time zone id and the current time zone is not marked as an override,
/// replaces the telemetry time zone as well. The telemetry is then upserted into the cache.
/// </summary>
/// <param name="uid">Remote uid of the telemetry (created if missing).</param>
/// <param name="info">New telemetry info.</param>
/// <param name="token">Cancellation token.</param>
public async Task UpdateInfoAsync(string uid, TelemetryInfoDto info,
    CancellationToken token)
{
    var telemetry = GetOrCreateTelemetryByUid(uid);
    telemetry.Info = info.Adapt<TelemetryInfo>();

    if (!string.IsNullOrEmpty(info.TimeZoneId))
    {
        // An overridden time zone must not be replaced by what the device reports.
        var isOverridden = telemetry.TelemetryTimeZone?.IsOverride == true;
        if (!isOverridden)
        {
            telemetry.TelemetryTimeZone = new TelemetryTimeZone()
            {
                Hours = info.TimeZoneOffsetTotalHours,
                TimeZoneId = info.TimeZoneId
            };
        }
    }

    await cacheTelemetry.UpsertAsync(telemetry, token)
        .ConfigureAwait(false);
}
/// <summary>
/// Resolves the time zone offset (hours) of a telemetry.
/// Resolution order: the stored telemetry time zone; the time zone from the
/// telemetry Info; a time zone requested from the time zone service by the
/// coordinates of the linked well. A newly resolved time zone is stored on the
/// telemetry and upserted into the cache.
/// </summary>
/// <param name="idTelemetry">Id of the telemetry.</param>
/// <param name="token">Cancellation token.</param>
/// <returns>
/// The offset in hours, or null when the telemetry is unknown or no time zone
/// could be resolved.
/// </returns>
public async Task<double?> GetTelemetryTimeZoneOffsetAsync(int idTelemetry, CancellationToken token)
{
    var telemetry = await cacheTelemetry.FirstOrDefaultAsync(t => t.Id == idTelemetry, token)
        .ConfigureAwait(false);

    // Unknown telemetry id: nothing to resolve (previously a NullReferenceException).
    if (telemetry is null)
        return null;

    if (!string.IsNullOrEmpty(telemetry.TelemetryTimeZone?.TimeZoneId))
        return telemetry.TelemetryTimeZone.Hours;

    if (!string.IsNullOrEmpty(telemetry.Info?.TimeZoneId))
    {
        telemetry.TelemetryTimeZone = new TelemetryTimeZone
        {
            Hours = telemetry.Info.TimeZoneOffsetTotalHours,
            IsOverride = false,
            TimeZoneId = telemetry.Info.TimeZoneId,
        };
    }
    else
    {
        // Fall back to a lookup by the coordinates of the well using this telemetry.
        var well = await cacheWells.FirstOrDefaultAsync(t => t.IdTelemetry == telemetry.Id, token)
            .ConfigureAwait(false);
        if (well is null)
            return null;

        var coordinates = await GetWellCoordinatesAsync(well.Id, token)
            .ConfigureAwait(false);
        if (coordinates is null)
            return null;

        var requestedTimeZone = await timeZoneService.GetByCoordinatesAsync(coordinates.Value.latitude, coordinates.Value.longitude, token)
            .ConfigureAwait(false);
        if (requestedTimeZone is null)
            return null;

        telemetry.TelemetryTimeZone = requestedTimeZone.Adapt<TelemetryTimeZone>();
    }

    await cacheTelemetry.UpsertAsync(telemetry, token).ConfigureAwait(false);
    return telemetry.TelemetryTimeZone.Hours;
}
/// <summary>
/// Converts both ends of a date range with the time zone service, using the
/// offset resolved for the given telemetry.
/// </summary>
/// <param name="idTelemetry">Id of the telemetry whose offset is used.</param>
/// <param name="range">Range to convert.</param>
/// <param name="token">Cancellation token.</param>
/// <returns>
/// A new range with converted dates, or the original <paramref name="range"/>
/// instance when no offset could be resolved.
/// </returns>
public async Task<DatesRangeDto> FixDatesRangeByTimeZoneAsync(int idTelemetry, DatesRangeDto range,
    CancellationToken token)
{
    var resolvedOffset = await GetTelemetryTimeZoneOffsetAsync(idTelemetry, token);
    if (!resolvedOffset.HasValue)
        return range;

    var hours = resolvedOffset.Value;
    return new DatesRangeDto()
    {
        From = timeZoneService.DateToTimeZone(range.From, hours),
        To = timeZoneService.DateToTimeZone(range.To, hours),
    };
}
/// <summary>
/// Sets the time zone of the telemetry with the given uid and upserts the
/// telemetry into the cache. Does nothing when the new time zone equals the
/// current one.
/// </summary>
/// <param name="uid">Remote uid of the telemetry (created if missing).</param>
/// <param name="timeZoneInfo">New time zone data.</param>
/// <param name="token">Cancellation token.</param>
public async Task UpdateTimeZoneAsync(string uid, TelemetryTimeZoneDto timeZoneInfo,
    CancellationToken token)
{
    var telemetry = GetOrCreateTelemetryByUid(uid);
    var newTelemetryTimeZone = timeZoneInfo.Adapt<TelemetryTimeZone>();

    if (newTelemetryTimeZone is not null)
    {
        if (newTelemetryTimeZone.Equals(telemetry.TelemetryTimeZone))
            return;

        // The original code computed the new time zone but never applied it,
        // so the upsert below persisted an unchanged telemetry.
        telemetry.TelemetryTimeZone = newTelemetryTimeZone;
    }

    await cacheTelemetry.UpsertAsync(telemetry, token)
        .ConfigureAwait(false);
}
/// <summary>
/// Returns the telemetry id linked to the cached well with the given id.
/// </summary>
/// <param name="idWell">Id of the well.</param>
/// <returns>The well's telemetry id, or null when the well is not cached or has none.</returns>
public int? GetIdTelemetryByIdWell(int idWell)
    => cacheWells.FirstOrDefault(w => w.Id == idWell)?.IdTelemetry;
/// <summary>
/// Finds coordinates for a well, falling back from the well itself to its
/// cluster and then to the cluster's deposit.
/// </summary>
/// <param name="idWell">Id of the well.</param>
/// <param name="token">Cancellation token.</param>
/// <returns>
/// The first complete latitude/longitude pair found, or null when the well,
/// cluster or deposit is missing or none of them has both coordinates.
/// </returns>
private async Task<(double latitude, double longitude)?> GetWellCoordinatesAsync(int idWell,
    CancellationToken token)
{
    var well = await cacheWells.FirstOrDefaultAsync(w => w.Id == idWell, token)
        .ConfigureAwait(false);
    if (well is null)
        return null;
    if (well.Latitude is not null && well.Longitude is not null)
        return (well.Latitude.Value, well.Longitude.Value);

    var cluster = await cacheClusters.FirstOrDefaultAsync(c => c.Id == well.IdCluster, token)
        .ConfigureAwait(false);
    // A well without a cached cluster previously caused a NullReferenceException.
    if (cluster is null)
        return null;
    if (cluster.Latitude is not null && cluster.Longitude is not null)
        return (cluster.Latitude.Value, cluster.Longitude.Value);

    var deposit = await cacheDeposits.FirstOrDefaultAsync(d => d.Id == cluster.IdDeposit, token)
        .ConfigureAwait(false);
    // Same guard for a cluster without a cached deposit.
    if (deposit is null)
        return null;
    if (deposit.Latitude is not null && deposit.Longitude is not null)
        return (deposit.Latitude.Value, deposit.Longitude.Value);

    return null;
}
/// <summary>
/// Finds the cached well linked to the telemetry with the given remote uid.
/// </summary>
/// <param name="uid">Remote uid of the telemetry.</param>
/// <returns>The well, or null when the telemetry or a linked well is not found.</returns>
private Well GetWellByTelemetryUid(string uid)
{
    var telemetry = cacheTelemetry.FirstOrDefault(t => t.RemoteUid == uid);
    return telemetry is null
        ? null
        : cacheWells.FirstOrDefault(w => w?.IdTelemetry == telemetry.Id);
}
/// <summary>
/// Gets the cached telemetry with the given remote uid via the cache's GetOrCreate,
/// supplying a factory that builds a new telemetry with only RemoteUid set.
/// </summary>
/// <param name="uid">Remote uid of the telemetry.</param>
/// <returns>The telemetry returned by the cache.</returns>
private Telemetry GetOrCreateTelemetryByUid(string uid)
{
    return cacheTelemetry.GetOrCreate(
        t => t.RemoteUid == uid,
        () => new Telemetry { RemoteUid = uid });
}
/// <summary>
/// Finds remote uids that are used by more than one telemetry record.
/// </summary>
/// <returns>
/// For every duplicated remote uid: the uid and the ids of all telemetries
/// that share it. Grouping is done in memory and is evaluated lazily.
/// </returns>
public IEnumerable<(string Key, int[] Ids)> GetRedundantRemoteUids()
{
    return db.Telemetries
        // Only Id and RemoteUid are needed; do not load whole telemetry entities.
        .Select(t => new { t.Id, t.RemoteUid })
        .ToList()
        .GroupBy(t => t.RemoteUid)
        .Where(g => g.Count() > 1)
        .Select(g => (g.Key, g.Select(t => t.Id).ToArray()));
}
/// <summary>
/// Merges several telemetry records into one. The telemetry with the most
/// records (messages + saub data + spin data) becomes the destination; events
/// and users missing on the destination are copied, data rows are re-pointed
/// to the destination with raw SQL, and the source telemetries are deleted.
/// Everything runs in a single transaction.
/// </summary>
/// <param name="telemetryIds">Ids of the telemetries to merge; at least two.</param>
/// <returns>
/// Number of rows reported by the raw SQL statements, or 0 when there is
/// nothing to merge or the merge failed and was rolled back.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="telemetryIds"/> is null.</exception>
/// <exception cref="ArgumentException">Fewer than two ids were supplied.</exception>
public int Merge(IEnumerable<int> telemetryIds)
{
    if (telemetryIds is null)
        throw new ArgumentNullException(nameof(telemetryIds));

    // Materialize once: the sequence is counted, filtered and sent to the database below.
    var ids = telemetryIds.ToList();
    if (ids.Count < 2)
        throw new ArgumentException($"telemetryIds count {ids.Count} < 2. nothing to merge.", nameof(telemetryIds));

    // Find the telemetry with the most complete data and take it as the primary one;
    // separate the primary one from the rest.
    // Grading of candidates (amount of work each one carries).
    var telemetriesGrade = db.Telemetries
        .Include(t => t.Messages)
        .Include(t => t.DataSaub)
        .Include(t => t.DataSpin)
        .Include(t => t.Well)
        .Where(t => ids.Contains(t.Id))
        .Select(t => new
        {
            t.Id,
            t.RemoteUid,
            t.Info,
            IdWell = t.Well != null ? t.Well.Id : int.MinValue,
            Records = t.Messages.Count + t.DataSaub.Count + t.DataSpin.Count,
            EventsAny = t.Events.Any(),
            UsersAny = t.Users.Any(),
        })
        .OrderByDescending(t => t.Records)
        .ToList();

    // None of the requested ids exist: nothing to merge (previously a NullReferenceException).
    var telemetryDest = telemetriesGrade.FirstOrDefault();
    if (telemetryDest is null || telemetryDest.Id == default)
        return 0;
    var telemetryDestId = telemetryDest.Id;

    var telemetriesSrcIds = ids.Where(t => t != telemetryDestId).ToList();
    if (!telemetriesSrcIds.Any())
        return 0;

    // Only integers are interpolated into this fragment, so it is safe to embed in raw SQL.
    var telemetriesSrcIdsSql = $"({string.Join(',', telemetriesSrcIds)})";

    // Newest telemetry that has Info; null when none of the candidates has it.
    var infoSource = telemetriesGrade
        .Where(t => t.Info != null)
        .OrderByDescending(t => t.Id)
        .FirstOrDefault();

    var wellId = telemetriesGrade
        .Where(t => t.IdWell > 0)
        .OrderByDescending(t => t.Id)
        .Select(t => t.IdWell)
        .FirstOrDefault();

    // Start of changes.
    Console.WriteLine($"Start merge telemetries ids: [{string.Join(',', telemetriesSrcIds)}] to {telemetryDestId}");
    var sw = System.Diagnostics.Stopwatch.StartNew();
    using var transaction = db.Database.BeginTransaction();
    int rows = 0;
    try
    {
        var telemetryDst = db.Telemetries.FirstOrDefault(t => t.Id == telemetryDestId);

        // Take uid/info from the newest telemetry that has Info. When no candidate has
        // Info, keep the destination's own values instead of overwriting them with null.
        if (infoSource is not null)
        {
            telemetryDst.RemoteUid = infoSource.RemoteUid;
            telemetryDst.Info = infoSource.Info;
        }

        if (wellId != default)
        {
            var well = db.Wells.FirstOrDefault(w => w.Id == wellId);
            well.IdTelemetry = telemetryDestId;
        }

        // events merge
        var telemetryDstEventsIds = db.TelemetryEvents
            .Where(t => t.IdTelemetry == telemetryDestId)
            .Select(t => t.IdEvent)
            .ToList();
        var telemetrySrcEvents = db.TelemetryEvents
            .Where(t => telemetriesSrcIds.Contains(t.IdTelemetry) && !telemetryDstEventsIds.Contains(t.IdEvent))
            .Select(t => new TelemetryEvent
            {
                IdTelemetry = telemetryDestId,
                IdEvent = t.IdEvent,
                IdCategory = t.IdCategory,
                MessageTemplate = t.MessageTemplate,
            })
            .ToList();
        // Several sources may define the same event id; keep one entry per id (last wins).
        var telemetryEventNewUniq = new Dictionary<int, TelemetryEvent>();
        foreach (var telemetryEvent in telemetrySrcEvents)
            telemetryEventNewUniq[telemetryEvent.IdEvent] = telemetryEvent;
        if (telemetrySrcEvents.Any())
            db.TelemetryEvents.AddRange(telemetryEventNewUniq.Values);

        // users merge
        var telemetryDstUsersIds = db.TelemetryUsers
            .Where(t => t.IdTelemetry == telemetryDestId)
            .Select(t => t.IdUser)
            .ToList();
        var telemetrySrcUsers = db.TelemetryUsers
            .Where(t => telemetriesSrcIds.Contains(t.IdTelemetry) && !telemetryDstUsersIds.Contains(t.IdUser))
            .Select(t => new TelemetryUser
            {
                IdTelemetry = telemetryDestId,
                IdUser = t.IdUser,
                Level = t.Level,
                Name = t.Name,
                Patronymic = t.Patronymic,
                Surname = t.Surname,
            })
            .ToList();
        // Same de-duplication for users (last wins).
        var telemetryUserNewUniq = new Dictionary<int, TelemetryUser>();
        foreach (var telemetryUser in telemetrySrcUsers)
            telemetryUserNewUniq[telemetryUser.IdUser] = telemetryUser;
        if (telemetrySrcUsers.Any())
            db.TelemetryUsers.AddRange(telemetryUserNewUniq.Values);

        db.SaveChanges();

        // Timeout is in seconds: 3000 s = 50 min.
        // NOTE(review): the original comment said "5 min" — confirm which value is intended.
        db.Database.SetCommandTimeout(3_000);

        rows += MoveRowsToTelemetry("t_telemetry_data_saub", telemetryDestId, telemetriesSrcIdsSql);
        rows += MoveRowsToTelemetry("t_telemetry_data_spin", telemetryDestId, telemetriesSrcIdsSql);
        rows += MoveRowsToTelemetry("t_telemetry_analysis", telemetryDestId, telemetriesSrcIdsSql);
        rows += MoveRowsToTelemetry("t_telemetry_message", telemetryDestId, telemetriesSrcIdsSql);

        rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry_event WHERE id_telemetry IN {telemetriesSrcIdsSql};");
        rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry_user WHERE id_telemetry IN {telemetriesSrcIdsSql};");
        rows += db.Database.ExecuteSqlRaw($"DELETE FROM t_telemetry WHERE id IN {telemetriesSrcIdsSql};");

        transaction.Commit();
        sw.Stop();
        Console.WriteLine($"Successfully commited in {1d*sw.ElapsedMilliseconds/1000d: #0.00} sec. Affected {rows} rows.");
    }
    catch(Exception ex)
    {
        Console.WriteLine($"Fail. Rollback. Reason is:{ex.Message}");
        transaction.Rollback();
        return 0;
    }
    return rows;
}

/// <summary>
/// Re-points rows of one table from the source telemetries to the destination
/// telemetry, with the table's triggers disabled for the duration of the update.
/// </summary>
/// <param name="tableName">Table name; callers pass hard-coded names only.</param>
/// <param name="telemetryDestId">Id of the destination telemetry.</param>
/// <param name="telemetriesSrcIdsSql">Parenthesized, comma-separated list of source ids.</param>
/// <returns>Number of rows reported by the UPDATE statement.</returns>
private int MoveRowsToTelemetry(string tableName, int telemetryDestId, string telemetriesSrcIdsSql)
{
    db.Database.ExecuteSqlRaw($"ALTER TABLE {tableName} DISABLE TRIGGER ALL;");
    var affected = db.Database.ExecuteSqlRaw($"UPDATE {tableName} SET id_telemetry = {telemetryDestId} WHERE id_telemetry IN {telemetriesSrcIdsSql};");
    db.Database.ExecuteSqlRaw($"ALTER TABLE {tableName} ENABLE TRIGGER ALL;");
    return affected;
}
}
}