forked from ddrilling/AsbCloudServer
Автоопределение операций
This commit is contained in:
parent
e29b2be423
commit
d7e9c4f893
@ -1,11 +1,9 @@
|
||||
using System;
|
||||
using AsbCloudApp.Data.DetectedOperation;
|
||||
using AsbCloudApp.Requests;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using System.Threading;
|
||||
using AsbCloudApp.Data;
|
||||
using AsbCloudApp.Data.WellOperation;
|
||||
|
||||
namespace AsbCloudApp.Repositories;
|
||||
|
||||
@ -55,11 +53,11 @@ public interface IDetectedOperationRepository
|
||||
Task<PaginationContainer<DetectedOperationDto>> GetPageAsync(DetectedOperationByTelemetryRequest request, CancellationToken token);
|
||||
|
||||
/// <summary>
|
||||
/// Получение дат последних определённых операций
|
||||
/// Получение последних автоопределённых операций
|
||||
/// </summary>
|
||||
/// <param name="token"></param>
|
||||
/// <returns></returns>
|
||||
Task<IDictionary<int, DateTimeOffset>> GetLastDetectedDatesAsync(CancellationToken token);
|
||||
Task<IDictionary<int, DetectedOperationDto>> GetLastDetectedOperationsAsync(CancellationToken token);
|
||||
|
||||
/// <summary>
|
||||
/// Удалить операции
|
||||
|
@ -79,9 +79,13 @@ namespace AsbCloudApp.Services
|
||||
/// Определение операций
|
||||
/// </summary>
|
||||
/// <param name="idTelemetry"></param>
|
||||
/// <param name="beginDate"></param>
|
||||
/// <param name="request"></param>
|
||||
/// <param name="lastDetectedOperation"></param>
|
||||
/// <param name="token"></param>
|
||||
/// <returns></returns>
|
||||
Task<IEnumerable<DetectedOperationDto>> DetectOperationsAsync(int idTelemetry, DateTimeOffset? beginDate, CancellationToken token);
|
||||
Task<(DateTimeOffset LastDate, IEnumerable<DetectedOperationDto> Items)> DetectOperationsAsync(int idTelemetry,
|
||||
TelemetryDataRequest request,
|
||||
DetectedOperationDto? lastDetectedOperation,
|
||||
CancellationToken token);
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using AsbCloudApp.Data;
|
||||
|
||||
namespace AsbCloudApp.Services
|
||||
{
|
||||
@ -44,5 +45,12 @@ namespace AsbCloudApp.Services
|
||||
/// <param name="token"></param>
|
||||
/// <returns></returns>
|
||||
Task<Stream> GetZippedCsv(int idWell, DateTime beginDate, DateTime endDate, CancellationToken token);
|
||||
|
||||
/// <summary>
|
||||
/// Получение диапозона дат телеметрий
|
||||
/// </summary>
|
||||
/// <param name="token"></param>
|
||||
/// <returns></returns>
|
||||
Task<IDictionary<int, DatesRangeDto>> GetDateRangesAsync(CancellationToken token);
|
||||
}
|
||||
}
|
@ -26,6 +26,28 @@ public class DetectedOperationRepository : CrudRepositoryBase<DetectedOperationD
|
||||
this.telemetryService = telemetryService;
|
||||
}
|
||||
|
||||
public async Task<IDictionary<int, DetectedOperationDto>> GetLastDetectedOperationsAsync(CancellationToken token)
|
||||
{
|
||||
var entities = await dbContext.Set<DetectedOperation>()
|
||||
.GroupBy(o => o.IdTelemetry)
|
||||
.Select(g => new
|
||||
{
|
||||
IdTelemetry = g.Key,
|
||||
LastDetectedOperation = g.OrderBy(o => o.DateEnd).Last()
|
||||
})
|
||||
.ToDictionaryAsync(x => x.IdTelemetry, x => x.LastDetectedOperation, token);
|
||||
|
||||
var dtos = entities.ToDictionary(x => x.Key, x =>
|
||||
{
|
||||
if (x.Value == null)
|
||||
throw new ArgumentNullException(nameof(x.Value), "The value of the last detected operation cannot be null");
|
||||
|
||||
return Convert(x.Value);
|
||||
});
|
||||
|
||||
return dtos;
|
||||
}
|
||||
|
||||
public async Task<int> DeleteAsync(DetectedOperationByTelemetryRequest request, CancellationToken token)
|
||||
{
|
||||
var query = BuildQuery(request);
|
||||
@ -58,16 +80,6 @@ public class DetectedOperationRepository : CrudRepositoryBase<DetectedOperationD
|
||||
return paginationContainer;
|
||||
}
|
||||
|
||||
public async Task<IDictionary<int, DateTimeOffset>> GetLastDetectedDatesAsync(CancellationToken token) =>
|
||||
await dbContext.Set<DetectedOperation>()
|
||||
.GroupBy(o => o.IdTelemetry)
|
||||
.Select(g => new
|
||||
{
|
||||
IdTelemetry = g.Key,
|
||||
LastDate = g.Max(o => o.DateEnd)
|
||||
})
|
||||
.ToDictionaryAsync(x => x.IdTelemetry, x => x.LastDate, token);
|
||||
|
||||
public async Task<IEnumerable<DetectedOperationDto>> Get(DetectedOperationByTelemetryRequest request, CancellationToken token)
|
||||
{
|
||||
var query = BuildQuery(request)
|
||||
|
@ -177,72 +177,70 @@ public class DetectedOperationService : IDetectedOperationService
|
||||
return dtos;
|
||||
}
|
||||
|
||||
public async Task<IEnumerable<DetectedOperationDto>> DetectOperationsAsync(int idTelemetry, DateTimeOffset? beginDate, CancellationToken token)
|
||||
public async Task<(DateTimeOffset LastDate, IEnumerable<DetectedOperationDto> Items)> DetectOperationsAsync(int idTelemetry,
|
||||
TelemetryDataRequest request,
|
||||
DetectedOperationDto? lastDetectedOperation,
|
||||
CancellationToken token)
|
||||
{
|
||||
const int take = 4 * 86_400;
|
||||
|
||||
var detectedOperations = new List<DetectedOperationDto>();
|
||||
DetectedOperationDto? lastDetectedOperation = null;
|
||||
const int minOperationLength = 5;
|
||||
const int maxDetectorsInterpolationFrameLength = 30;
|
||||
const int gap = maxDetectorsInterpolationFrameLength + minOperationLength;
|
||||
|
||||
var telemetries = await telemetryDataSaubService.GetByTelemetryAsync(idTelemetry, request, token);
|
||||
|
||||
var count = telemetries.Count();
|
||||
|
||||
if (count == 0)
|
||||
throw new InvalidOperationException("InvalidOperation_EmptyTelemetries");
|
||||
|
||||
var timezone = telemetryService.GetTimezone(idTelemetry);
|
||||
|
||||
while (true)
|
||||
if (telemetries.Count() <= gap)
|
||||
{
|
||||
var request = new TelemetryDataRequest
|
||||
{
|
||||
GeDate = beginDate,
|
||||
Take = take,
|
||||
Order = 0
|
||||
};
|
||||
|
||||
var dtos = await telemetryDataSaubService.GetByTelemetryAsync(idTelemetry, request, token);
|
||||
var detectableTelemetries = dtos
|
||||
.Where(t => t.BlockPosition >= 0)
|
||||
.Select(t => new DetectableTelemetry
|
||||
{
|
||||
DateTime = new DateTimeOffset(t.DateTime, timezone.Offset),
|
||||
IdUser = t.IdUser,
|
||||
Mode = t.Mode,
|
||||
WellDepth = t.WellDepth,
|
||||
Pressure = t.Pressure,
|
||||
HookWeight = t.HookWeight,
|
||||
BlockPosition = t.BlockPosition,
|
||||
BitDepth = t.BitDepth,
|
||||
RotorSpeed = t.RotorSpeed,
|
||||
AxialLoad = t.AxialLoad,
|
||||
}).ToArray();
|
||||
|
||||
if (detectableTelemetries.Length <= gap)
|
||||
break;
|
||||
|
||||
var isDetected = false;
|
||||
var positionBegin = 0;
|
||||
var positionEnd = detectableTelemetries.Length - gap;
|
||||
while (positionEnd > positionBegin)
|
||||
{
|
||||
foreach (var detector in detectors)
|
||||
{
|
||||
if (!detector.TryDetect(idTelemetry, detectableTelemetries, positionBegin, positionEnd, lastDetectedOperation, out var result))
|
||||
continue;
|
||||
|
||||
detectedOperations.Add(result!.Operation);
|
||||
lastDetectedOperation = result.Operation;
|
||||
isDetected = true;
|
||||
positionBegin = result.TelemetryEnd;
|
||||
break;
|
||||
}
|
||||
|
||||
positionBegin += 1;
|
||||
}
|
||||
|
||||
beginDate = isDetected
|
||||
? lastDetectedOperation!.DateEnd
|
||||
: detectableTelemetries[positionEnd].DateTime;
|
||||
var lastTelemetry = telemetries.Last();
|
||||
var lastDateTelemetry = new DateTimeOffset(lastTelemetry.DateTime, timezone.Offset);
|
||||
return (lastDateTelemetry, Enumerable.Empty<DetectedOperationDto>());
|
||||
}
|
||||
|
||||
return detectedOperations;
|
||||
var detectedOperations = new List<DetectedOperationDto>();
|
||||
|
||||
var detectableTelemetries = telemetries.Select(t => new DetectableTelemetry
|
||||
{
|
||||
DateTime = new DateTimeOffset(t.DateTime, timezone.Offset),
|
||||
IdUser = t.IdUser,
|
||||
Mode = t.Mode,
|
||||
WellDepth = t.WellDepth,
|
||||
Pressure = t.Pressure,
|
||||
HookWeight = t.HookWeight,
|
||||
BlockPosition = t.BlockPosition,
|
||||
BitDepth = t.BitDepth,
|
||||
RotorSpeed = t.RotorSpeed,
|
||||
AxialLoad = t.AxialLoad,
|
||||
}).ToArray();
|
||||
|
||||
var positionBegin = 0;
|
||||
var positionEnd = detectableTelemetries.Length - gap;
|
||||
|
||||
while (positionEnd > positionBegin)
|
||||
{
|
||||
foreach (var detector in detectors)
|
||||
{
|
||||
if (!detector.TryDetect(idTelemetry, detectableTelemetries, positionBegin, positionEnd, lastDetectedOperation,
|
||||
out var result))
|
||||
continue;
|
||||
|
||||
detectedOperations.Add(result!.Operation);
|
||||
lastDetectedOperation = result.Operation;
|
||||
positionBegin = result.TelemetryEnd;
|
||||
break;
|
||||
}
|
||||
|
||||
positionBegin += 1;
|
||||
}
|
||||
|
||||
var lastDate = lastDetectedOperation?.DateEnd ?? detectableTelemetries[positionEnd].DateTime;
|
||||
|
||||
return (lastDate, detectedOperations);
|
||||
}
|
||||
|
||||
public async Task<int> DeleteAsync(DetectedOperationByWellRequest request, CancellationToken token)
|
||||
|
@ -48,6 +48,8 @@ namespace AsbCloudInfrastructure.Services.DetectOperations.Detectors
|
||||
|
||||
while (positionEnd < end)
|
||||
{
|
||||
//TODO: поиск провалов телеметрии
|
||||
|
||||
positionEnd += 1;
|
||||
if (positionEnd > end)
|
||||
break;
|
||||
|
@ -1,54 +1,93 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using AsbCloudApp.Data;
|
||||
using AsbCloudApp.Repositories;
|
||||
using AsbCloudApp.Requests;
|
||||
using AsbCloudApp.Services;
|
||||
using AsbCloudInfrastructure.Background;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace AsbCloudInfrastructure.Services.DetectOperations;
|
||||
|
||||
public class WorkOperationDetection: Work
|
||||
public class WorkOperationDetection : Work
|
||||
{
|
||||
private static readonly IDictionary<int, DateTime> CacheOfStartDatesByTelemetryId = new Dictionary<int, DateTime>();
|
||||
|
||||
public WorkOperationDetection()
|
||||
:base("Operation detection")
|
||||
{
|
||||
Timeout = TimeSpan.FromMinutes(20);
|
||||
OnErrorAsync = (id, exception, token) =>
|
||||
{
|
||||
var text = $"work {id}, when {CurrentState?.State}, throw error:{exception.Message}";
|
||||
Trace.TraceWarning(text);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
}
|
||||
public WorkOperationDetection()
|
||||
: base("Operation detection")
|
||||
{
|
||||
Timeout = TimeSpan.FromMinutes(20);
|
||||
OnErrorAsync = (id, exception, _) =>
|
||||
{
|
||||
var text = $"work {id}, when {CurrentState?.State}, throw error:{exception.Message}";
|
||||
Trace.TraceWarning(text);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
}
|
||||
|
||||
protected override async Task Action(string id, IServiceProvider services, Action<string, double?> onProgressCallback, CancellationToken token)
|
||||
{
|
||||
var telemetryRepository = services.GetRequiredService<ICrudRepository<TelemetryDto>>();
|
||||
var detectedOperationRepository = services.GetRequiredService<IDetectedOperationRepository>();
|
||||
var detectedOperationService = services.GetRequiredService<IDetectedOperationService>();
|
||||
protected override async Task Action(string id,
|
||||
IServiceProvider services,
|
||||
Action<string, double?> onProgressCallback,
|
||||
CancellationToken token)
|
||||
{
|
||||
var telemetryRepository = services.GetRequiredService<ICrudRepository<TelemetryDto>>();
|
||||
var detectedOperationRepository = services.GetRequiredService<IDetectedOperationRepository>();
|
||||
var detectedOperationService = services.GetRequiredService<IDetectedOperationService>();
|
||||
var telemetryDataSaubService = services.GetRequiredService<ITelemetryDataSaubService>();
|
||||
|
||||
var telemetryIds = (await telemetryRepository.GetAllAsync(token))
|
||||
.Select(t => t.Id)
|
||||
.ToArray();
|
||||
var idsTelemetry = (await telemetryRepository.GetAllAsync(token))
|
||||
.Select(t => t.Id)
|
||||
.ToArray();
|
||||
|
||||
var lastDetectedDates = await detectedOperationRepository.GetLastDetectedDatesAsync(token);
|
||||
var telemetriesDateRanges = await telemetryDataSaubService.GetDateRangesAsync(token);
|
||||
|
||||
for (var i = 0; i < telemetryIds.Length; i++)
|
||||
{
|
||||
var telemetryId = telemetryIds[i];
|
||||
var lastDetectedOperations = await detectedOperationRepository.GetLastDetectedOperationsAsync(token);
|
||||
|
||||
var beginDate = lastDetectedDates.TryGetValue(telemetryId, out var date) ? date : (DateTimeOffset?)null;
|
||||
for (int i = 0; i < idsTelemetry.Length; i++)
|
||||
{
|
||||
var idTelemetry = idsTelemetry[i];
|
||||
|
||||
onProgressCallback($"Start detecting telemetry: {telemetryId} from {beginDate}", i / telemetryIds.Length);
|
||||
var detectedOperations = await detectedOperationService.DetectOperationsAsync(telemetryId, beginDate, token);
|
||||
if (!telemetriesDateRanges.TryGetValue(idTelemetry, out var telemetryDateRange))
|
||||
continue;
|
||||
|
||||
if (detectedOperations.Any())
|
||||
await detectedOperationRepository.InsertRangeAsync(detectedOperations, token);
|
||||
}
|
||||
}
|
||||
var dateBegin = telemetryDateRange.From.DateTime;
|
||||
var dateEnd = telemetryDateRange.To.DateTime;
|
||||
|
||||
if (lastDetectedOperations.TryGetValue(idTelemetry, out var lastDetectedOperation))
|
||||
dateBegin = lastDetectedOperation.DateEnd.UtcDateTime;
|
||||
|
||||
if (CacheOfStartDatesByTelemetryId.TryGetValue(idTelemetry, out var dateBeginFromCahce))
|
||||
dateBegin = dateBeginFromCahce;
|
||||
|
||||
onProgressCallback.Invoke($"Start detecting telemetry: {idTelemetry} from {dateBegin}", i / idsTelemetry.Length);
|
||||
|
||||
const int pointsCount = 4 * 86_400;
|
||||
|
||||
while (dateBegin < dateEnd)
|
||||
{
|
||||
var request = new TelemetryDataRequest
|
||||
{
|
||||
GeDate = dateBegin,
|
||||
Take = pointsCount,
|
||||
Order = 0,
|
||||
GeBlockPosition = 0
|
||||
};
|
||||
|
||||
var detectedOperations =
|
||||
await detectedOperationService.DetectOperationsAsync(idTelemetry, request, lastDetectedOperation, token);
|
||||
|
||||
await detectedOperationRepository.InsertRangeAsync(detectedOperations.Items, token);
|
||||
|
||||
dateBegin = detectedOperations.LastDate.UtcDateTime;
|
||||
|
||||
CacheOfStartDatesByTelemetryId[idTelemetry] = dateBegin;
|
||||
|
||||
await detectedOperationRepository.InsertRangeAsync(detectedOperations.Items, token);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -177,7 +177,7 @@ namespace AsbCloudInfrastructure.Services.SAUB
|
||||
return dtos;
|
||||
}
|
||||
|
||||
private IQueryable<TEntity> BuildQuery(int idTelemetry, TelemetryDataRequest request)
|
||||
protected virtual IQueryable<TEntity> BuildQuery(int idTelemetry, TelemetryDataRequest request)
|
||||
{
|
||||
var dbSet = db.Set<TEntity>();
|
||||
|
||||
|
@ -13,6 +13,8 @@ using System.Linq;
|
||||
using System.Text.Csv;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using AsbCloudApp.Data;
|
||||
using AsbCloudApp.Requests;
|
||||
|
||||
namespace AsbCloudInfrastructure.Services.SAUB;
|
||||
|
||||
@ -174,4 +176,33 @@ public class TelemetryDataSaubService : TelemetryDataBaseService<TelemetryDataSa
|
||||
outStream.Seek(0, SeekOrigin.Begin);
|
||||
return outStream;
|
||||
}
|
||||
|
||||
public async Task<IDictionary<int, DatesRangeDto>> GetDateRangesAsync(CancellationToken token)
|
||||
{
|
||||
return await db.Set<TelemetryDataSaub>().GroupBy(x => x.IdTelemetry)
|
||||
.Select(g => new
|
||||
{
|
||||
IdTelemetry = g.Key,
|
||||
From = g.Min(x => x.DateTime),
|
||||
To = g.Max(x => x.DateTime)
|
||||
})
|
||||
.ToDictionaryAsync(x => x.IdTelemetry, x => new DatesRangeDto
|
||||
{
|
||||
From = x.From,
|
||||
To = x.To
|
||||
}, token);
|
||||
}
|
||||
|
||||
protected override IQueryable<TelemetryDataSaub> BuildQuery(int idTelemetry, TelemetryDataRequest request)
|
||||
{
|
||||
var query = base.BuildQuery(idTelemetry, request);
|
||||
|
||||
if (request.GeBlockPosition.HasValue)
|
||||
query = query.Where(e => e.BlockPosition >= request.GeBlockPosition);
|
||||
|
||||
if (request.LeBlockPosition.HasValue)
|
||||
query = query.Where(e => e.BlockPosition <= request.LeBlockPosition);
|
||||
|
||||
return query;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user