forked from ddrilling/AsbCloudServer
91 lines
2.9 KiB
C#
91 lines
2.9 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using AsbCloudApp.Data;
|
|
using AsbCloudApp.Data.SAUB;
|
|
using AsbCloudApp.Repositories;
|
|
using AsbCloudApp.Requests;
|
|
using AsbCloudApp.Services;
|
|
using AsbCloudInfrastructure.Background;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
|
|
namespace AsbCloudInfrastructure.Services.DetectOperations;
|
|
|
|
public class WorkOperationDetection : Work
|
|
{
|
|
private static readonly IDictionary<int, DateTimeOffset> CacheOfStartDatesByTelemetryId = new Dictionary<int, DateTimeOffset>();
|
|
|
|
public WorkOperationDetection()
|
|
: base("Operation detection")
|
|
{
|
|
Timeout = TimeSpan.FromMinutes(20);
|
|
OnErrorAsync = (id, exception, _) =>
|
|
{
|
|
var text = $"work {id}, when {CurrentState?.State}, throw error:{exception.Message}";
|
|
Trace.TraceWarning(text);
|
|
return Task.CompletedTask;
|
|
};
|
|
}
|
|
|
|
protected override async Task Action(string id,
|
|
IServiceProvider services,
|
|
Action<string, double?> onProgressCallback,
|
|
CancellationToken token)
|
|
{
|
|
var telemetryRepository = services.GetRequiredService<ICrudRepository<TelemetryDto>>();
|
|
var detectedOperationRepository = services.GetRequiredService<IDetectedOperationRepository>();
|
|
var detectedOperationService = services.GetRequiredService<IDetectedOperationService>();
|
|
var telemetryDataCache = services.GetRequiredService<ITelemetryDataCache<TelemetryDataSaubDto>>();
|
|
|
|
var idsTelemetry = (await telemetryRepository.GetAllAsync(token))
|
|
.Select(t => t.Id)
|
|
.ToArray();
|
|
|
|
var lastDetectedOperations = await detectedOperationRepository.GetLastDetectedOperationsAsync(token);
|
|
|
|
for (int i = 0; i < idsTelemetry.Length; i++)
|
|
{
|
|
var idTelemetry = idsTelemetry[i];
|
|
|
|
var telemetryDateRange = telemetryDataCache.GetOrDefaultWellDataDateRange(idTelemetry);
|
|
|
|
if(telemetryDateRange == null)
|
|
continue;
|
|
|
|
var dateBegin = telemetryDateRange.From;
|
|
var dateEnd = telemetryDateRange.To;
|
|
|
|
if (lastDetectedOperations.TryGetValue(idTelemetry, out var lastDetectedOperation))
|
|
dateBegin = lastDetectedOperation.DateEnd;
|
|
|
|
if (CacheOfStartDatesByTelemetryId.TryGetValue(idTelemetry, out var dateBeginFromCahce))
|
|
dateBegin = dateBeginFromCahce;
|
|
|
|
onProgressCallback.Invoke($"Start detecting telemetry: {idTelemetry} from {dateBegin}", i / idsTelemetry.Length);
|
|
|
|
const int pointsCount = 4 * 86_400;
|
|
|
|
while (dateBegin < dateEnd)
|
|
{
|
|
var request = new TelemetryDataRequest
|
|
{
|
|
GeDate = dateBegin,
|
|
Take = pointsCount,
|
|
Order = 0
|
|
};
|
|
|
|
var detectedOperations =
|
|
await detectedOperationService.DetectOperationsAsync(idTelemetry, request, lastDetectedOperation, token);
|
|
|
|
dateBegin = detectedOperations.LastDate;
|
|
|
|
CacheOfStartDatesByTelemetryId[idTelemetry] = dateBegin;
|
|
|
|
await detectedOperationRepository.InsertRangeAsync(detectedOperations.Items, token);
|
|
}
|
|
}
|
|
}
|
|
} |