Refactor TelemetryAnalysis.

This commit is contained in:
Фролов 2021-10-01 15:44:56 +05:00
parent 28b60250be
commit ae9b23cc00
11 changed files with 191 additions and 184 deletions

View File

@ -23,7 +23,7 @@ namespace AsbCloudApp.Services
Task<IEnumerable<TelemetryOperationInfoDto>> GetOperationsToIntervalAsync(int idWell,
int intervalHoursTimestamp, int workBeginTimestamp,
CancellationToken token = default);
void CalculateAnalytics();
Task AnalyzeAndSaveTelemetriesAsync(CancellationToken token = default);
Task<DatesRangeDto> GetOperationsDateRangeAsync(int idWell,
CancellationToken token = default);
}

View File

@ -19,8 +19,6 @@ namespace AsbCloudDb.Model
[Column("id_operation")]
public int IdOperation { get; set; }
[JsonIgnore]
[ForeignKey(nameof(IdTelemetry))]
[InverseProperty(nameof(Model.Telemetry.Analysis))]
@ -31,7 +29,6 @@ namespace AsbCloudDb.Model
[InverseProperty(nameof(Model.WellOperationCategory.Analysis))]
public virtual WellOperationCategory Operation { get; set; }
[Column("unix_date", TypeName = "bigint"), Comment("Unix timestamp для Linq запросов с вычислением дат")]
public long UnixDate { get; set; }

View File

@ -2,6 +2,7 @@
using AsbCloudApp.Services;
using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services;
using AsbCloudInfrastructure.Services.Analysis;
using AsbCloudInfrastructure.Services.Cache;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;

View File

@ -0,0 +1,16 @@
using System;
namespace AsbCloudInfrastructure.Services.Analysis
{
class DataSaubAnalyse
{
public int IdTelemetry { get; internal set; }
public DateTime Date { get; internal set; }
public double WellDepth { get; internal set; }
public double BitDepth { get; internal set; }
public double BlockPosition { get; internal set; }
public double RotorSpeed { get; internal set; }
public double Pressure { get; internal set; }
public double HookWeight { get; internal set; }
}
}

View File

@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Linq;
namespace AsbCloudInfrastructure.Services
namespace AsbCloudInfrastructure.Services.Analysis
{
public class InterpolationLine
{
@ -14,11 +14,15 @@ namespace AsbCloudInfrastructure.Services
public InterpolationLine(IEnumerable<(double Y, double X)> rawData)
{
xSum = rawData.Sum(d => d.X);
ySum = rawData.Sum(d => d.Y);
xySum = rawData.Sum(d => d.X * d.Y);
x2Sum = rawData.Sum(d => d.X * d.X);
count = rawData.Count();
var iterator = rawData.GetEnumerator();
while (iterator.MoveNext())
{
xSum += iterator.Current.X;
ySum += iterator.Current.Y;
xySum += iterator.Current.X * iterator.Current.Y;
x2Sum += iterator.Current.X * iterator.Current.X;
count++;
}
}
public double A =>
@ -29,7 +33,6 @@ namespace AsbCloudInfrastructure.Services
(xSum * xySum - x2Sum * ySum) /
(xSum * xSum - count * x2Sum);
public bool IsYNotChanges(double upperBound = 0d, double lowerBound = 0d) =>
A < upperBound && A > lowerBound;

View File

@ -4,15 +4,15 @@ using System.Threading.Tasks;
using System.Diagnostics;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using AsbCloudApp.Services;
using AsbCloudInfrastructure.Services.Cache;
using AsbCloudDb.Model;
namespace AsbCloudInfrastructure.Services
namespace AsbCloudInfrastructure.Services.Analysis
{
public class TelemetryAnalyticsBackgroundService : BackgroundService
{
private readonly CacheDb cacheDb;
private readonly TimeSpan period = TimeSpan.FromHours(1);
public TelemetryAnalyticsBackgroundService(CacheDb cacheDb)
{
@ -22,32 +22,32 @@ namespace AsbCloudInfrastructure.Services
protected override async Task ExecuteAsync(CancellationToken token = default)
{
var timeToStartAnalysis = DateTime.Now;
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True")
.Options;
while (!token.IsCancellationRequested)
{
if(DateTime.Now > timeToStartAnalysis)
{
timeToStartAnalysis = DateTime.Now + period;
try
{
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True")
.Options;
using var context = new AsbCloudDbContext(options);
var telemetryService = new TelemetryService(context, cacheDb);
var analyticsService = new TelemetryAnalyticsService(context,
telemetryService, cacheDb);
timeToStartAnalysis = DateTime.Now.AddHours(1);
await Task.Run(() => analyticsService.CalculateAnalytics(), token)
.ConfigureAwait(false);
await analyticsService.AnalyzeAndSaveTelemetriesAsync(token).ConfigureAwait(false);
context.ChangeTracker.Clear();
context.Dispose();
}
catch (Exception ex)
{
Trace.TraceError(ex.Message);
Console.WriteLine(ex.Message);
}
GC.Collect();
}
var ms = (int)(timeToStartAnalysis - DateTime.Now).TotalMilliseconds;

View File

@ -9,7 +9,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services
namespace AsbCloudInfrastructure.Services.Analysis
{
public class TelemetryAnalyticsService : ITelemetryAnalyticsService
{
@ -19,7 +19,7 @@ namespace AsbCloudInfrastructure.Services
private readonly TelemetryOperationDetectorService operationDetectorService;
private readonly IEnumerable<WellOperationCategory> operations;
private const int intervalHours = 12;
private const int countOfRecordsForInterpolation = 12 * 60 * 60;
public TelemetryAnalyticsService(IAsbCloudDbContext db, ITelemetryService telemetryService,
CacheDb cacheDb)
@ -230,70 +230,68 @@ namespace AsbCloudInfrastructure.Services
return groupedOperationsList;
}
public void CalculateAnalytics()
public async Task AnalyzeAndSaveTelemetriesAsync(CancellationToken token = default)
{
var allTelemetryIds = (from telemetry in db.TelemetryDataSaub
select telemetry.IdTelemetry)
.Distinct()
.ToList();
var allTelemetryIds = await db.TelemetryDataSaub
.Select(t => t.IdTelemetry)
.Distinct()
.ToListAsync(token)
.ConfigureAwait(false);
foreach(var idTelemetry in allTelemetryIds)
{
var analyzeStartDate = GetAnalyzeStartDate(idTelemetry);
var analyzeStartDate = await GetLastAnalysisDateAsync(idTelemetry, token).ConfigureAwait(false);
await AnalyseAndSaveTelemetryAsync(idTelemetry, analyzeStartDate, token).ConfigureAwait(false);
GC.Collect();
}
}
if (analyzeStartDate == default)
continue;
private async Task AnalyseAndSaveTelemetryAsync(int idTelemetry, DateTime analyzeStartDate, CancellationToken token = default)
{
const int step = 10;
const int take = step * 2;
TelemetryAnalysis currentAnalysis = null;
TelemetryAnalysis currentAnalysis = null;
while (true)
{
var dataSaubPart = await GetDataSaubPartOrDefaultAsync(idTelemetry, analyzeStartDate, token).ConfigureAwait(false);
if (dataSaubPart is null)
break;
while (TryGetDataSaubPart(idTelemetry, analyzeStartDate,
out var dataSaubPart))
var count = dataSaubPart.Count();
var skip = 0;
if (step > count)
break;
analyzeStartDate = dataSaubPart.Last().Date;
for (; (skip + step) < count; skip += step)
{
analyzeStartDate = dataSaubPart.Last().Date;
var dataSaubPartOfPart = dataSaubPart.Skip(skip).Take(take);
var telemetryAnalysis = GetDrillingAnalysis(dataSaubPartOfPart);
var count = dataSaubPart.Count();
var skip = 0;
var step = 10;
var take = step * 2;
for(;skip < count; skip += step)
if (currentAnalysis is not null)
{
var analyzingSaubs = dataSaubPart.Skip(skip).Take(take);
if (analyzingSaubs.Count() <= 1)
{
continue;
}
var dataSaub = analyzingSaubs.First();
var telemetryAnalysis = new TelemetryAnalysis();
telemetryAnalysis = GetDrillingAnalysis(analyzingSaubs);
if (currentAnalysis is null)
{
currentAnalysis = telemetryAnalysis;
currentAnalysis.OperationStartDepth = dataSaub.WellDepth;
}
if (currentAnalysis.IdOperation == telemetryAnalysis.IdOperation)
{
currentAnalysis.DurationSec += telemetryAnalysis.DurationSec;
currentAnalysis.OperationEndDepth = dataSaub.WellDepth;
}
else
{
currentAnalysis.OperationEndDepth = dataSaubPartOfPart.LastOrDefault()?.WellDepth;
db.TelemetryAnalysis.Add(currentAnalysis);
currentAnalysis = telemetryAnalysis;
currentAnalysis.OperationStartDepth = dataSaub.WellDepth;
currentAnalysis = null;
}
}
db.SaveChanges();
GC.Collect();
if (currentAnalysis is null)
{
currentAnalysis = telemetryAnalysis;
currentAnalysis.OperationStartDepth = dataSaubPartOfPart.FirstOrDefault()?.WellDepth;
}
}
await db.SaveChangesAsync(token).ConfigureAwait(false);
GC.Collect();
}
}
@ -327,52 +325,48 @@ namespace AsbCloudInfrastructure.Services
};
}
private DateTime GetAnalyzeStartDate(int idTelemetry)
private async Task<DateTime> GetLastAnalysisDateAsync(int idTelemetry, CancellationToken token = default)
{
var lastAnalysisInDb = (from analysis in db.TelemetryAnalysis
var lastAnalysisInDb = await (from analysis in db.TelemetryAnalysis
where analysis.IdTelemetry == idTelemetry
orderby analysis.UnixDate
select analysis)
.DefaultIfEmpty()
.Last();
.LastOrDefaultAsync(token)
.ConfigureAwait(false);
var lastAnalysisUnixDate = lastAnalysisInDb?.UnixDate ?? default;
DateTime lastAnalysisDate = default;
var analyzeStartDate = lastAnalysisUnixDate == default
? DateTimeOffset.MinValue
: DateTimeOffset.FromUnixTimeSeconds(lastAnalysisUnixDate);
var firstDataSaub = GetFirstSaub(analyzeStartDate.DateTime, idTelemetry);
var firstSaubUtcTime = firstDataSaub?.Date.ToUniversalTime() ?? default;
return firstSaubUtcTime;
if(lastAnalysisInDb is not null)
lastAnalysisDate = DateTime.UnixEpoch.AddSeconds(lastAnalysisInDb.DurationSec + lastAnalysisInDb.UnixDate);
return lastAnalysisDate;
}
private bool TryGetDataSaubPart(int idTelemetry, DateTime analyzeStartDate,
out IEnumerable<TelemetryDataSaub> dataSaubPart)
{
var query = (from ds in db.TelemetryDataSaub
where ds.IdTelemetry == idTelemetry
&& ds.Date > analyzeStartDate
orderby ds.Date
select ds)
.Take(12 * 60* 60)
.AsNoTracking();
dataSaubPart = query.ToList();
return dataSaubPart.Any();
}
private TelemetryDataSaub GetFirstSaub(DateTime analyzeStartDate, int idTelemetry)
{
return (from ds in db.TelemetryDataSaub
where ds.IdTelemetry == idTelemetry
&& ds.Date > analyzeStartDate.AddHours(intervalHours)
orderby ds.Date
select ds).FirstOrDefault();
}
private Task<List<DataSaubAnalyse>> GetDataSaubPartOrDefaultAsync(int idTelemetry, DateTime analyzeStartDate, CancellationToken token) =>
db.TelemetryDataSaub
.Where(d =>
d.IdTelemetry == idTelemetry &&
d.Date > analyzeStartDate &&
d.BitDepth != null &&
d.BlockPosition != null &&
d.HookWeight != null &&
d.Pressure != null &&
d.RotorSpeed != null &&
d.WellDepth != null
)
.OrderBy(d => d.Date)
.Take(countOfRecordsForInterpolation)
.Select(d => new DataSaubAnalyse {
IdTelemetry = d.IdTelemetry,
Date = d.Date,
BitDepth = d.BitDepth ?? 0,
BlockPosition = d.BlockPosition ?? 0,
HookWeight = d.HookWeight ?? 0,
Pressure = d.Pressure ?? 0,
RotorSpeed = d.RotorSpeed ?? 0,
WellDepth = d.WellDepth ?? 0,
})
.ToListAsync(token);
private static IEnumerable<(long IntervalStart, string OperationName, int OperationDuration)> DivideOperationsByIntervalLength(
IEnumerable<(long IntervalStart, string OperationName, int OperationDuration)> operations, int intervalSeconds)
@ -471,28 +465,23 @@ namespace AsbCloudInfrastructure.Services
return groupedOperationsList;
}
private TelemetryAnalysis GetDrillingAnalysis(IEnumerable<TelemetryDataSaub> dataSaubBases)
private TelemetryAnalysis GetDrillingAnalysis(IEnumerable<DataSaubAnalyse> dataSaubPartOfPart)
{
var lastSaubDate = dataSaubBases.Last().Date;
var dataSaubFirst = dataSaubPartOfPart.First();
var dataSaubLast = dataSaubPartOfPart.Last();
var saubWellDepths = dataSaubBases.Where(sw => sw.WellDepth is not null)
.Select(s => (Y: (double)s.WellDepth,
X: (s.Date - dataSaubBases.First().Date).TotalSeconds));
var saubBitDepths = dataSaubBases.Where(sw => sw.BitDepth is not null)
.Select(s => (Y: (double)s.BitDepth,
X: (s.Date - dataSaubBases.First().Date).TotalSeconds));
var saubBlockPositions = dataSaubBases.Where(sw => sw.BlockPosition is not null)
.Select(s => (Y: (double)s.BlockPosition,
X: (s.Date - dataSaubBases.First().Date).TotalSeconds));
var saubRotorSpeeds = dataSaubBases.Where(sw => sw.RotorSpeed is not null)
.Select(s => (Y: (double)s.RotorSpeed,
X: (s.Date - dataSaubBases.First().Date).TotalSeconds));
var saubPressures = dataSaubBases.Where(sw => sw.Pressure is not null)
.Select(s => (Y: (double)s.Pressure,
X: (s.Date - dataSaubBases.First().Date).TotalSeconds));
var saubHookWeights = dataSaubBases.Where(sw => sw.HookWeight is not null)
.Select(s => (Y: (double)s.HookWeight,
X: (s.Date - dataSaubBases.First().Date).TotalSeconds));
var saubWellDepths = dataSaubPartOfPart.Select(s => (Y: (double)s.WellDepth,
X: (s.Date - dataSaubFirst.Date).TotalSeconds));
var saubBitDepths = dataSaubPartOfPart.Select(s => (Y: (double)s.BitDepth,
X: (s.Date - dataSaubFirst.Date).TotalSeconds));
var saubBlockPositions = dataSaubPartOfPart.Select(s => (Y: (double)s.BlockPosition,
X: (s.Date - dataSaubFirst.Date).TotalSeconds));
var saubRotorSpeeds = dataSaubPartOfPart.Select(s => (Y: (double)s.RotorSpeed,
X: (s.Date - dataSaubFirst.Date).TotalSeconds));
var saubPressures = dataSaubPartOfPart.Select(s => (Y: (double)s.Pressure,
X: (s.Date - dataSaubFirst.Date).TotalSeconds));
var saubHookWeights = dataSaubPartOfPart.Select(s => (Y: (double)s.HookWeight,
X: (s.Date - dataSaubFirst.Date).TotalSeconds));
var wellDepthLine = new InterpolationLine(saubWellDepths);
var bitPositionLine = new InterpolationLine(saubBitDepths);
@ -501,16 +490,11 @@ namespace AsbCloudInfrastructure.Services
var pressureLine = new InterpolationLine(saubPressures);
var hookWeightLine = new InterpolationLine(saubHookWeights);
var IsBlockRising = blockPositionLine.IsYDecreases(-0.0001);
var IsBlockGoesDown = blockPositionLine.IsYIncreases(0.0001);
var IsBlockStandsStill = blockPositionLine.IsYNotChanges(0.0001, -0.0001);
var drillingAnalysis = new TelemetryAnalysis
{
IdTelemetry = dataSaubBases.First().IdTelemetry,
UnixDate = (long)(lastSaubDate - new DateTime(1970, 1, 1, 0, 0, 0)).TotalSeconds,
DurationSec = (int)(dataSaubBases.Last().Date -
dataSaubBases.ElementAt(dataSaubBases.Count() - 2).Date).TotalSeconds,
IdTelemetry = dataSaubFirst.IdTelemetry,
UnixDate = (long)(dataSaubFirst.Date - DateTime.UnixEpoch).TotalSeconds,
DurationSec = (int)(dataSaubLast.Date - dataSaubFirst.Date).TotalSeconds,
OperationStartDepth = null,
OperationEndDepth = null,
IsWellDepthDecreasing = wellDepthLine.IsYDecreases(-0.0001),
@ -526,7 +510,7 @@ namespace AsbCloudInfrastructure.Services
IsPressureGt20 = pressureLine.IsAverageYMoreThanBound(20),
IsHookWeightNotChanges = hookWeightLine.IsYNotChanges(0.0001, -0.0001),
IsHookWeightLt3 = hookWeightLine.IsAverageYLessThanBound(3),
IdOperation = 1
IdOperation = default,
};
drillingAnalysis.IdOperation =

View File

@ -1,9 +1,9 @@
using AsbCloudDb.Model;
using System;
namespace AsbCloudInfrastructure.Services
namespace AsbCloudInfrastructure.Services.Analysis
{
public class TelemetryOperationDetector
class TelemetryOperationDetector
{
public int Order { get; set; }
public WellOperationCategory Operation { get; set; }

View File

@ -1,11 +1,10 @@
using AsbCloudApp.Data;
using AsbCloudDb.Model;
using AsbCloudDb.Model;
using System.Collections.Generic;
using System.Linq;
namespace AsbCloudInfrastructure.Services
namespace AsbCloudInfrastructure.Services.Analysis
{
public class TelemetryOperationDetectorService
class TelemetryOperationDetectorService
{
private readonly IEnumerable<TelemetryOperationDetector> detectors;

View File

@ -1,48 +1,48 @@
using AsbCloudApp.Data;
using AsbCloudApp.Services;
using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services.Cache;
using Mapster;
using Microsoft.EntityFrameworkCore;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
//using AsbCloudApp.Data;
//using AsbCloudApp.Services;
//using AsbCloudDb.Model;
//using AsbCloudInfrastructure.Services.Cache;
//using Mapster;
//using Microsoft.EntityFrameworkCore;
//using System;
//using System.Linq;
//using System.Threading;
//using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services
{
public class _CachedCrudService<Tdto, TModel> : ICachedCrudService<Tdto>
where TModel : class, AsbCloudDb.Model.IId
where Tdto : AsbCloudApp.Data.IId
{
private readonly CacheTable<TModel> cache;
//namespace AsbCloudInfrastructure.Services
//{
// public class _CachedCrudService<Tdto, TModel> : ICachedCrudService<Tdto>
// where TModel : class, AsbCloudDb.Model.IId
// where Tdto : AsbCloudApp.Data.IId
// {
// private readonly CacheTable<TModel> cache;
public _CachedCrudService(IAsbCloudDbContext db, Cache.CacheDb cacheDb)
{
cache = cacheDb.GetCachedTable<TModel>((DbContext)db);
}
// public _CachedCrudService(IAsbCloudDbContext db, Cache.CacheDb cacheDb)
// {
// cache = cacheDb.GetCachedTable<TModel>((DbContext)db);
// }
public virtual async Task<PaginationContainer<Tdto>> GetAsync(int skip = 0, int take = 32, CancellationToken token = default)
{
var count = cache.Count();
var result = new PaginationContainer<Tdto> { Skip = skip, Take = take, Count = count };
if (count <= skip)
return result;
// public virtual async Task<PaginationContainer<Tdto>> GetAsync(int skip = 0, int take = 32, CancellationToken token = default)
// {
// var count = cache.Count();
// var result = new PaginationContainer<Tdto> { Skip = skip, Take = take, Count = count };
// if (count <= skip)
// return result;
var items = await cache.WhereAsync(token).ConfigureAwait(false);
// var items = await cache.WhereAsync(token).ConfigureAwait(false);
result.Items.AddRange(items.OrderBy(i => i.Id).Skip(skip).Take(take).Select(i => Convert(i)));
// result.Items.AddRange(items.OrderBy(i => i.Id).Skip(skip).Take(take).Select(i => Convert(i)));
return result;
}
// return result;
// }
public virtual Task<Tdto> GetAsync(int id, CancellationToken token = default)
{
throw new NotImplementedException();
}
// public virtual Task<Tdto> GetAsync(int id, CancellationToken token = default)
// {
// throw new NotImplementedException();
// }
public virtual Tdto Convert(TModel src) => src.Adapt<Tdto>();
// public virtual Tdto Convert(TModel src) => src.Adapt<Tdto>();
public virtual TModel Convert(Tdto src) => src.Adapt<TModel>();
}
}
// public virtual TModel Convert(Tdto src) => src.Adapt<TModel>();
// }
//}

View File

@ -25,15 +25,22 @@ namespace ConsoleApp1
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True")
.Options;
var db = new AsbCloudDbContext(options);
using var db = new AsbCloudDbContext(options);
var cacheDb = new CacheDb();
var telemetryService = new TelemetryService(db, cacheDb);
var analyticsService = new TelemetryAnalyticsService(db,
telemetryService, cacheDb);
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
analyticsService.AnalyzeAndSaveTelemetriesAsync().Wait();
sw.Stop();
Console.WriteLine(sw.ElapsedMilliseconds);
return;
var ts = new TelemetryService(db, new CacheDb());
var groups = ts.GetRedundentRemoteUids();
foreach(var g in groups)
ts.Merge(g.Ids);
//var sql = "UPDATE t_telemetry SET info = '{{\"a\":6}}' WHERE id = 2;\n" +
// "UPDATE t_telemetry SET info = '{{\"a\":1}}' WHERE id = 1;\n";
//db.Database.ExecuteSqlRaw(sql, 1);
Console.WriteLine("Done. Press any key to quit.");
Console.ReadKey();