forked from ddrilling/AsbCloudServer
ca969c99ad
Replace source for DatesRange from db to telemetry tracker.
403 lines
16 KiB
C#
403 lines
16 KiB
C#
using AsbCloudApp.Data;
|
|
using AsbCloudApp.Services;
|
|
using AsbCloudDb.Model;
|
|
using AsbCloudInfrastructure.Services.Cache;
|
|
using Mapster;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
using System;
|
|
using System.Threading;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using AsbCloudDb;
|
|
|
|
namespace AsbCloudInfrastructure.Services
|
|
{
|
|
public class TelemetryService : ITelemetryService
|
|
{
|
|
private readonly CacheTable<Telemetry> cacheTelemetry;
|
|
private readonly CacheTable<Well> cacheWells;//TODO: use wellService insad of this
|
|
private readonly IAsbCloudDbContext db;
|
|
private readonly ITelemetryTracker telemetryTracker;
|
|
private readonly ITimeZoneService timeZoneService;
|
|
|
|
public ITimeZoneService TimeZoneService => timeZoneService;
|
|
public ITelemetryTracker TelemetryTracker => telemetryTracker;
|
|
|
|
public TelemetryService(
|
|
IAsbCloudDbContext db,
|
|
ITelemetryTracker telemetryTracker,
|
|
ITimeZoneService timeZoneService,
|
|
CacheDb cacheDb)
|
|
{
|
|
cacheTelemetry = cacheDb.GetCachedTable<Telemetry>((AsbCloudDbContext)db);
|
|
cacheWells = cacheDb.GetCachedTable<Well>(
|
|
(AsbCloudDbContext)db,
|
|
$"{nameof(Well.Cluster)}.{nameof(Cluster.Deposit)}",
|
|
nameof(Well.Telemetry),
|
|
$"{nameof(Well.RelationCompaniesWells)}.{nameof(RelationCompanyWell.Company)}",
|
|
nameof(Well.WellType));
|
|
this.db = db;
|
|
this.telemetryTracker = telemetryTracker;
|
|
this.timeZoneService = timeZoneService;
|
|
}
|
|
|
|
public IEnumerable<TelemetryDto> GetTransmittingTelemetries()
|
|
{
|
|
var telemetryDtos = new List<TelemetryDto>();
|
|
var activeTelemetriesUids = telemetryTracker.GetTransmittingTelemetriesUids();
|
|
if (!activeTelemetriesUids.Any())
|
|
return telemetryDtos;
|
|
var telemetries = cacheTelemetry
|
|
.Where(t => activeTelemetriesUids.Contains(t.RemoteUid));
|
|
telemetryDtos = telemetries.Adapt<TelemetryDto>().ToList();
|
|
|
|
return telemetryDtos;
|
|
}
|
|
|
|
public void SaveRequestDate(string uid, DateTime remoteDate) =>
|
|
telemetryTracker.SaveRequestDate(uid, remoteDate);
|
|
|
|
public DateTime GetLastTelemetryDate(string telemetryUid) =>
|
|
telemetryTracker.GetLastTelemetryDateByUid(telemetryUid);
|
|
|
|
public DateTime GetLastTelemetryDate(int telemetryId)
|
|
{
|
|
var lastTelemetryDate = DateTime.MinValue;
|
|
var telemetry = cacheTelemetry.FirstOrDefault(t => t.Id == telemetryId);
|
|
|
|
if (telemetry is null)
|
|
return lastTelemetryDate;
|
|
|
|
var uid = telemetry.RemoteUid;
|
|
|
|
lastTelemetryDate = GetLastTelemetryDate(uid);
|
|
return lastTelemetryDate;
|
|
}
|
|
|
|
public virtual async Task<DatesRangeDto> GetDatesRangeAsync(
|
|
int idWell,
|
|
bool isUtc,
|
|
CancellationToken token = default)
|
|
{
|
|
var telemetryId = GetIdTelemetryByIdWell(idWell);
|
|
if (telemetryId is null)
|
|
return null;
|
|
|
|
var telemetry = await cacheTelemetry.FirstOrDefaultAsync(t => t.Id == telemetryId, token)
|
|
.ConfigureAwait(false);
|
|
|
|
var dto = TelemetryTracker.GetTelemetryDateRangeByUid(telemetry.RemoteUid);
|
|
|
|
if (isUtc)
|
|
return dto;
|
|
|
|
dto = await DatesRangeToTelemetryTimeZoneAsync((int)telemetryId, dto, token)
|
|
.ConfigureAwait(false);
|
|
|
|
return dto;
|
|
|
|
}
|
|
|
|
public int GetOrCreateTelemetryIdByUid(string uid)
|
|
=> GetOrCreateTelemetryByUid(uid).Id;
|
|
|
|
public int? GetIdWellByTelemetryUid(string uid)
|
|
=> GetWellByTelemetryUid(uid)?.Id;
|
|
|
|
public double GetTimezoneOffsetByTelemetryId(int idTelemetry) =>
|
|
cacheTelemetry.FirstOrDefault(t => t.Id == idTelemetry).Info?.TimeZoneOffsetTotalHours ?? 0d;
|
|
|
|
public async Task UpdateInfoAsync(string uid, TelemetryInfoDto info,
|
|
CancellationToken token)
|
|
{
|
|
var telemetry = GetOrCreateTelemetryByUid(uid);
|
|
telemetry.Info = info.Adapt<TelemetryInfo>();
|
|
|
|
if (!string.IsNullOrEmpty(info.TimeZoneId) &&
|
|
telemetry.TelemetryTimeZone?.IsOverride != true)
|
|
telemetry.TelemetryTimeZone = new TelemetryTimeZone()
|
|
{
|
|
Hours = info.TimeZoneOffsetTotalHours,
|
|
TimeZoneId = info.TimeZoneId
|
|
};
|
|
|
|
await cacheTelemetry.UpsertAsync(telemetry, token)
|
|
.ConfigureAwait(false);
|
|
}
|
|
|
|
public async Task<double?> GetTelemetryTimeZoneOffsetAsync(int idTelemetry, CancellationToken token)
|
|
{
|
|
var telemetry =
|
|
await cacheTelemetry.FirstOrDefaultAsync(t => t.Id == idTelemetry, token);
|
|
|
|
if (!string.IsNullOrEmpty(telemetry.TelemetryTimeZone?.TimeZoneId))
|
|
return telemetry.TelemetryTimeZone.Hours;
|
|
|
|
if (!string.IsNullOrEmpty(telemetry.Info?.TimeZoneId))
|
|
{
|
|
telemetry.TelemetryTimeZone = new TelemetryTimeZone
|
|
{
|
|
Hours = telemetry.Info.TimeZoneOffsetTotalHours,
|
|
IsOverride = false,
|
|
TimeZoneId = telemetry.Info.TimeZoneId,
|
|
};
|
|
}
|
|
else
|
|
{
|
|
var well = await cacheWells.FirstOrDefaultAsync(t => t.IdTelemetry == telemetry.Id, token)
|
|
.ConfigureAwait(false);
|
|
|
|
if (well is null)
|
|
return null;
|
|
|
|
var coordinates = await GetWellCoordinatesAsync(well.Id, token);
|
|
|
|
if (coordinates is null)
|
|
return null;
|
|
|
|
var requestedTimeZone = await timeZoneService.GetByCoordinatesAsync(coordinates.Value.latitude, coordinates.Value.longitude, token)
|
|
.ConfigureAwait(false);
|
|
|
|
if (requestedTimeZone is null)
|
|
return null;
|
|
|
|
telemetry.TelemetryTimeZone = requestedTimeZone.Adapt<TelemetryTimeZone>();
|
|
}
|
|
|
|
await cacheTelemetry.UpsertAsync(telemetry, token).ConfigureAwait(false);
|
|
|
|
return telemetry.TelemetryTimeZone.Hours;
|
|
}
|
|
|
|
public async Task<DatesRangeDto> DatesRangeToTelemetryTimeZoneAsync(int idTelemetry, DatesRangeDto range,
|
|
CancellationToken token)
|
|
{
|
|
var offset = await GetTelemetryTimeZoneOffsetAsync(idTelemetry, token);
|
|
|
|
if (offset is null)
|
|
return range;
|
|
|
|
return new DatesRangeDto()
|
|
{
|
|
From = timeZoneService.DateToTimeZone(range.From, offset ?? default),
|
|
To = timeZoneService.DateToTimeZone(range.To, offset ?? default),
|
|
};
|
|
}
|
|
|
|
public async Task UpdateTimeZoneAsync(string uid, TelemetryTimeZoneDto timeZoneInfo,
|
|
CancellationToken token)
|
|
{
|
|
var telemetry = GetOrCreateTelemetryByUid(uid);
|
|
var newTelemetryTimeZone = timeZoneInfo.Adapt<TelemetryTimeZone>();
|
|
if (newTelemetryTimeZone?.Equals(telemetry.TelemetryTimeZone) == true)
|
|
return;
|
|
await cacheTelemetry.UpsertAsync(telemetry, token)
|
|
.ConfigureAwait(false);
|
|
}
|
|
|
|
public int? GetIdTelemetryByIdWell(int idWell)
|
|
{
|
|
var well = cacheWells.FirstOrDefault(w => w.Id == idWell);
|
|
return well?.IdTelemetry;
|
|
}
|
|
|
|
private async Task<(double latitude, double longitude)?> GetWellCoordinatesAsync(int idWell,
|
|
CancellationToken token)
|
|
{
|
|
var well = await cacheWells.FirstOrDefaultAsync(w => w.Id == idWell, token)
|
|
.ConfigureAwait(false);
|
|
|
|
if (well is null)
|
|
return null;
|
|
|
|
var latitude = well.Latitude ??
|
|
well.Cluster?.Latitude ??
|
|
well.Cluster?.Deposit?.Latitude;
|
|
|
|
var longitude = well.Longitude ??
|
|
well.Cluster?.Longitude ??
|
|
well.Cluster?.Deposit?.Longitude;
|
|
|
|
if (latitude is not null && longitude is not null)
|
|
return ((double)latitude, (double)longitude);
|
|
|
|
return null;
|
|
}
|
|
|
|
private Well GetWellByTelemetryUid(string uid)
|
|
{
|
|
var tele = cacheTelemetry.FirstOrDefault(t => t.RemoteUid == uid);
|
|
if (tele is null)
|
|
return null;
|
|
|
|
return cacheWells.FirstOrDefault(w => w?.IdTelemetry == tele.Id);
|
|
}
|
|
|
|
private Telemetry GetOrCreateTelemetryByUid(string uid)
|
|
=> cacheTelemetry.GetOrCreate(t => t.RemoteUid == uid, () => new Telemetry { RemoteUid = uid });
|
|
|
|
public async Task<int> MergeAsync(int from, int to, CancellationToken token)
|
|
{
|
|
if (from == to)
|
|
return -2;
|
|
|
|
var stopwath = new System.Diagnostics.Stopwatch();
|
|
stopwath.Start();
|
|
|
|
var transaction = await db.Database.BeginTransactionAsync(token).ConfigureAwait(false);
|
|
try
|
|
{
|
|
var affected = 0;
|
|
|
|
var wellFrom = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == from, token)
|
|
.ConfigureAwait(false);
|
|
var wellTo = await db.Wells.FirstOrDefaultAsync(w => w.IdTelemetry == to, token)
|
|
.ConfigureAwait(false);
|
|
|
|
if (wellTo is not null && wellFrom is not null)
|
|
return -2;
|
|
|
|
if (wellTo is null && wellFrom is not null)
|
|
{
|
|
wellFrom.IdTelemetry = to;
|
|
affected += await db.SaveChangesAsync(token);
|
|
}
|
|
|
|
affected += await MergeEventsAndMessagesAndUsersAsync(from, to, token);
|
|
affected += await MergeDataAsync<TelemetryDataSaub>(from, to, token);
|
|
affected += await MergeDataAsync<TelemetryDataSpin>(from, to, token);
|
|
|
|
affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry_analysis WHERE id_telemetry = {from} OR id_telemetry = {to};", token)
|
|
.ConfigureAwait(false);
|
|
affected += await db.Database.ExecuteSqlRawAsync($"DELETE FROM t_telemetry WHERE id = {from};", token)
|
|
.ConfigureAwait(false);
|
|
await transaction.CommitAsync(token).ConfigureAwait(false);
|
|
|
|
stopwath.Stop();
|
|
Console.WriteLine($"Successfully commited in {1d * stopwath.ElapsedMilliseconds / 1000d: #0.00} sec. Affected {affected} rows.");
|
|
return affected;
|
|
}
|
|
catch(Exception ex)
|
|
{
|
|
System.Diagnostics.Trace.WriteLine($"Merge() Fail. Rollback. Reason is:{ex.Message}");
|
|
await transaction.RollbackAsync(token).ConfigureAwait(false);
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
private async Task<int> MergeEventsAndMessagesAndUsersAsync(int from, int to, CancellationToken token)
|
|
{
|
|
var messagesFrom = await db.TelemetryMessages
|
|
.Where(d => d.IdTelemetry == from)
|
|
.ToListAsync(token)
|
|
.ConfigureAwait(false);
|
|
|
|
var usersFromQuery = db.TelemetryUsers
|
|
.Where(d => d.IdTelemetry == from);
|
|
var usersFrom = await usersFromQuery
|
|
.ToListAsync(token)
|
|
.ConfigureAwait(false);
|
|
|
|
var usersTo = await db.TelemetryUsers
|
|
.Where(d => d.IdTelemetry == to)
|
|
.AsNoTracking()
|
|
.ToListAsync(token)
|
|
.ConfigureAwait(false);
|
|
|
|
var usersToNextId = usersTo.Max(u => u.IdUser) + 100;
|
|
|
|
messagesFrom.ForEach(m => m.IdTelemetry = to);
|
|
|
|
foreach (var userFrom in usersFrom)
|
|
{
|
|
var userTo = usersTo
|
|
.FirstOrDefault(u=>u.IdUser == userFrom.IdUser);
|
|
|
|
if (userTo is null ||
|
|
userTo.Name != userFrom.Name ||
|
|
userTo.Surname != userFrom.Surname ||
|
|
userTo.Patronymic != userFrom.Patronymic)
|
|
{
|
|
messagesFrom
|
|
.Where(m => m.IdTelemetryUser == userFrom.IdUser)
|
|
.ToList()
|
|
.ForEach(m => m.IdTelemetryUser = usersToNextId);
|
|
userFrom.IdUser = usersToNextId;
|
|
userFrom.IdTelemetry = to;
|
|
usersToNextId++;
|
|
}
|
|
}
|
|
|
|
var eventsFromQuery = db.TelemetryEvents
|
|
.Where(d => d.IdTelemetry == from);
|
|
var eventsFrom = await eventsFromQuery
|
|
.ToListAsync(token)
|
|
.ConfigureAwait(false);
|
|
|
|
var eventsTo = await db.TelemetryEvents
|
|
.Where(d => d.IdTelemetry == to)
|
|
.AsNoTracking()
|
|
.ToListAsync(token)
|
|
.ConfigureAwait(false);
|
|
|
|
var eventsToNextId = eventsTo.Max(e => e.IdEvent) + 1;
|
|
|
|
foreach (var eventFrom in eventsFrom)
|
|
{
|
|
var eventTo = eventsTo
|
|
.FirstOrDefault(e => e.IdEvent == eventFrom.IdEvent);
|
|
|
|
if (eventTo is null ||
|
|
eventTo.IdCategory != eventFrom.IdCategory ||
|
|
eventTo.MessageTemplate != eventFrom.MessageTemplate)
|
|
{
|
|
messagesFrom
|
|
.Where(m => m.IdEvent == eventFrom.IdEvent)
|
|
.ToList()
|
|
.ForEach(m => m.IdEvent = eventsToNextId);
|
|
eventFrom.IdEvent = eventsToNextId;
|
|
eventFrom.IdTelemetry = to;
|
|
eventsToNextId++;
|
|
}
|
|
}
|
|
|
|
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user DISABLE TRIGGER ALL;", token).ConfigureAwait(false);
|
|
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event DISABLE TRIGGER ALL;", token).ConfigureAwait(false);
|
|
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message DISABLE TRIGGER ALL;", token).ConfigureAwait(false);
|
|
var affected = await db.SaveChangesAsync(token).ConfigureAwait(false);
|
|
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_user ENABLE TRIGGER ALL;", token).ConfigureAwait(false);
|
|
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_event ENABLE TRIGGER ALL;", token).ConfigureAwait(false);
|
|
await db.Database.ExecuteSqlRawAsync($"ALTER TABLE t_telemetry_message ENABLE TRIGGER ALL;", token).ConfigureAwait(false);
|
|
db.TelemetryUsers.RemoveRange(usersFromQuery);
|
|
db.TelemetryEvents.RemoveRange(eventsFromQuery);
|
|
affected += await db.SaveChangesAsync(token).ConfigureAwait(false);
|
|
return affected;
|
|
}
|
|
|
|
private async Task<int> MergeDataAsync<TEntity>(int from, int to, CancellationToken token)
|
|
where TEntity: class, AsbCloudDb.Model.ITelemetryData
|
|
{
|
|
const string IdTelemetryColumnName = "\"id_telemetry\"";
|
|
var dbSet = db.Set<TEntity>();
|
|
var tableName = dbSet.GetTableName();
|
|
var columns = dbSet.GetColumnsNames().ToList();
|
|
var index = columns.FindIndex(c => c.ToLower() == IdTelemetryColumnName.ToLower());
|
|
if (index < 0)
|
|
return -5;
|
|
columns[index] = $"{to} as {IdTelemetryColumnName}";
|
|
var columnsString = string.Join(',', columns);
|
|
var sql = $"INSERT INTO {tableName} " +
|
|
$"select {columnsString} " +
|
|
$"from {tableName} " +
|
|
$"where {IdTelemetryColumnName} = {from};" +
|
|
$"delete from {tableName} where {IdTelemetryColumnName} = {from};";
|
|
|
|
var affected = await db.Database.ExecuteSqlRawAsync(sql, token)
|
|
.ConfigureAwait(false);
|
|
|
|
return affected;
|
|
}
|
|
}
|
|
}
|