Merge branch 'dev' into feature/20214792-contacts

This commit is contained in:
on.nemtina 2023-10-13 10:46:20 +05:00
commit 9405ce90a6
32 changed files with 1423 additions and 1111 deletions

View File

@ -0,0 +1,206 @@
using System;
using System.Diagnostics;
namespace AsbCloudApp.Data
{
/// <summary>
/// Информация о фоновой работе
/// </summary>
public class BackgroundWorkDto
{
/// <summary>
/// Идентификатор работы. Должен быть уникальным. Используется в логах и передается в колбэки.
/// </summary>
public string Id { get; init; } = null!;
/// <summary>
/// Класс описания состояния
/// </summary>
public class CurrentStateInfo
{
private string state = "start";
/// <summary>
/// Время последнего запуска
/// </summary>
public DateTime Start { get; } = DateTime.Now;
/// <summary>
/// Текущее время выполнения
/// </summary>
public TimeSpan ExecutionTime => DateTime.Now - Start;
/// <summary>
/// Текстовое описание того, что происходит в задаче.
/// </summary>
public string State
{
get => state;
internal set
{
state = value;
StateUpdate = DateTime.Now;
}
}
/// <summary>
/// Прогресс
/// </summary>
public double Progress { get; internal set; } = 0;
/// <summary>
/// Время последнего запуска
/// </summary>
public DateTime StateUpdate { get; private set; } = DateTime.Now;
}
/// <summary>
/// Инфо о последней ошибке
/// </summary>
public class LastErrorInfo : LastCompleteInfo
{
/// <summary>
///
/// </summary>
/// <param name="state"></param>
/// <param name="errorText"></param>
public LastErrorInfo(CurrentStateInfo state, string errorText)
: base(state)
{
ErrorText = errorText;
}
/// <summary>
/// Последняя ошибка
/// </summary>
public string ErrorText { get; init; } = null!;
}
/// <summary>
/// Инфо о последнем завершении
/// </summary>
public class LastCompleteInfo
{
/// <summary>
/// Дата запуска
/// </summary>
public DateTime Start { get; init; }
/// <summary>
/// Дата завершения
/// </summary>
public DateTime End { get; init; }
/// <summary>
/// Продолжительность последнего выполнения
/// </summary>
public TimeSpan ExecutionTime => End - Start;
/// <summary>
/// Состояние на момент завершения
/// </summary>
public string State { get; init; }
/// <summary>
/// ctor
/// </summary>
/// <param name="state"></param>
public LastCompleteInfo(CurrentStateInfo state)
{
Start = state.Start;
End = DateTime.Now;
State = state.State;
}
}
/// <summary>
/// Текущее состояние
/// </summary>
public CurrentStateInfo? CurrentState { get; private set; }
/// <summary>
/// Последняя ошибка
/// </summary>
public LastErrorInfo? LastError { get; private set; }
/// <summary>
/// Последняя завершенная
/// </summary>
public LastCompleteInfo? LastComplete { get; private set; }
/// <summary>
/// Кол-во запусков
/// </summary>
public int CountStart { get; private set; }
/// <summary>
/// Кол-во завершений
/// </summary>
public int CountComplete { get; private set; }
/// <summary>
/// Кол-во ошибок
/// </summary>
public int CountErrors { get; private set; }
/// <summary>
/// Максимально допустимое время выполнения работы
/// </summary>
public TimeSpan Timeout { get; set; } = TimeSpan.FromMinutes(1);
private string WorkNameForTrace => $"Backgroud work:\"{Id}\"";
/// <summary>
/// Обновления состояния при запуске работы
/// </summary>
protected void SetStatusStart()
{
CurrentState = new();
CountStart++;
Trace.TraceInformation($"{WorkNameForTrace} state: starting");
}
/// <summary>
/// Обновления состояния в процессе работы
/// </summary>
protected void UpdateStatus(string newState, double? progress)
{
if (CurrentState is null)
return;
CurrentState.State = newState;
if (progress.HasValue)
CurrentState.Progress = progress.Value;
Trace.TraceInformation($"{WorkNameForTrace} state: {newState}");
}
/// <summary>
/// Обновления состояния при успешном завершении работы
/// </summary>
protected void SetStatusComplete()
{
if (CurrentState is null)
return;
LastComplete = new(CurrentState);
CurrentState = null;
CountComplete++;
Trace.TraceInformation($"{WorkNameForTrace} state: completed");
}
/// <summary>
/// Обновления состояния при ошибке в работе
/// </summary>
protected void SetLastError(string errorMessage)
{
if (CurrentState is null)
return;
LastError = new LastErrorInfo(CurrentState, errorMessage);
CurrentState = null;
CountErrors++;
Trace.TraceError($"{WorkNameForTrace} throw exception[{CountErrors}]: {errorMessage}");
}
}
}

View File

@ -7,7 +7,10 @@ namespace AsbCloudApp.Data;
/// </summary>
public class WellboreDto
{
public WellWithTimezoneDto Well { get; set; }
/// <summary>
/// Скважина
/// </summary>
public WellWithTimezoneDto Well { get; set; } = null!;
/// <summary>
/// Идентификатор

View File

@ -90,7 +90,6 @@ public class NotificationService
/// Отправка уведомлений, которые не были отправлены
/// </summary>
/// <param name="idUser"></param>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task RenotifyAsync(int idUser, CancellationToken cancellationToken)

View File

@ -1,96 +1,49 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Background
namespace AsbCloudInfrastructure.Background;
/// <summary>
/// Сервис для фонового выполнения работы
/// </summary>
public class BackgroundWorker : BackgroundService
{
/// <summary>
/// Сервис для фонового выполнения работы
/// </summary>
public class BackgroundWorker : BackgroundService
{
private static readonly TimeSpan executePeriod = TimeSpan.FromSeconds(10);
private static readonly TimeSpan minDelay = TimeSpan.FromSeconds(2);
private static readonly TimeSpan exceptionHandleTimeout = TimeSpan.FromSeconds(2);
private readonly IServiceProvider serviceProvider;
private readonly WorkQueue workQueue = new WorkQueue();
public string? CurrentWorkId;
public WorkStore WorkStore { get; } = new WorkStore();
public Work? CurrentWork;
public BackgroundWorker(IServiceProvider serviceProvider)
{
this.serviceProvider = serviceProvider;
}
/// <summary>
/// Добавление задачи в очередь.
/// Не периодические задачи будут выполняться вперед.
/// </summary>
/// <param name="work"></param>
/// <exception cref="ArgumentException">Id mast be unique</exception>
public void Push(WorkBase work)
{
workQueue.Push(work);
}
/// <summary>
/// Проверяет наличие работы с указанным Id
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public bool Contains(string id)
{
return workQueue.Contains(id);
}
/// <summary>
/// Удаление работы по ID
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public bool Delete(string id)
{
return workQueue.Delete(id);
}
protected override async Task ExecuteAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var dateStart = DateTime.Now;
var work = workQueue.Pop();
var work = WorkStore.GetNext();
if (work is null)
{
await Task.Delay(executePeriod, token);
continue;
}
CurrentWorkId = work.Id;
CurrentWork = work;
using var scope = serviceProvider.CreateScope();
try
{
Trace.TraceInformation($"Backgroud work:\"{work.Id}\" start.");
var task = work.ActionAsync(work.Id, scope.ServiceProvider, token);
await task.WaitAsync(work.Timeout, token);
var result = await work.Start(scope.ServiceProvider, token);
work.ExecutionTime = DateTime.Now - dateStart;
Trace.TraceInformation($"Backgroud work:\"{work.Id}\" done. ExecutionTime: {work.ExecutionTime:hh\\:mm\\:ss\\.fff}");
}
catch (Exception exception)
{
Trace.TraceError($"Backgroud work:\"{work.Id}\" throw exception: {exception.Message}");
if (work.OnErrorAsync is not null)
{
using var task = Task.Run(
async () => await work.OnErrorAsync(work.Id, exception, token),
token);
await task.WaitAsync(exceptionHandleTimeout, token);
}
}
CurrentWorkId = null;
if (!result)
WorkStore.Felled.Add(work);
CurrentWork = null;
await Task.Delay(minDelay, token);
}
}
}
}

View File

@ -0,0 +1,41 @@
using System.Linq;
namespace System.Collections.Generic
{
public class OrderedList<T>: IEnumerable<T>, ICollection<T>
where T : notnull
{
private readonly List<T> list = new List<T>();
private readonly Func<T, object> keySelector;
private readonly bool isDescending = false;
private IOrderedEnumerable<T> OrdredList => isDescending
? list.OrderByDescending(keySelector)
: list.OrderBy(keySelector);
public int Count => list.Count;
public bool IsReadOnly => false;
public OrderedList(Func<T, object> keySelector, bool isDescending = false)
{
this.keySelector = keySelector;
this.isDescending = isDescending;
}
public void Add(T item) => list.Add(item);
public void Clear()=> list.Clear();
public bool Contains(T item)=> list.Contains(item);
public void CopyTo(T[] array, int arrayIndex)=> list.CopyTo(array, arrayIndex);
public bool Remove(T item)=> list.Remove(item);
public IEnumerator<T> GetEnumerator() => OrdredList.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
}

View File

@ -0,0 +1,125 @@
using AsbCloudApp.Data;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Background;
/// <summary>
/// Класс разовой работы.
/// Разовая работа приоритетнее периодической.
/// </summary>
public abstract class Work : BackgroundWorkDto
{
private sealed class WorkBase : Work
{
private Func<string, IServiceProvider, Action<string, double?>, CancellationToken, Task> ActionAsync { get; }
public WorkBase(string id, Func<string, IServiceProvider, Action<string, double?>, CancellationToken, Task> actionAsync)
: base(id)
{
ActionAsync = actionAsync;
}
protected override Task Action(string id, IServiceProvider services, Action<string, double?> onProgressCallback, CancellationToken token)
=> ActionAsync(id, services, onProgressCallback, token);
}
/// <summary>
/// Делегат обработки ошибки.
/// Не должен выполняться долго.
/// </summary>
public Func<string, Exception, CancellationToken, Task>? OnErrorAsync { get; set; }
/// <summary>
/// макс продолжительность обработки исключения
/// </summary>
public TimeSpan OnErrorHandlerTimeout { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Базовая работа
/// </summary>
/// <param name="id"></param>
public Work(string id)
{
Id = id;
}
/// <summary>
/// Создать работу на основе делегата
/// </summary>
/// <param name="id"></param>
/// <param name="actionAsync"></param>
/// <returns></returns>
[Obsolete("Use implement Work class")]
public static Work CreateByDelegate(string id, Func<string, IServiceProvider, Action<string, double?>, CancellationToken, Task> actionAsync)
{
return new WorkBase(id, actionAsync);
}
/// <summary>
/// Запустить работу
/// </summary>
/// <param name="services"></param>
/// <param name="token"></param>
/// <returns>True - success, False = fail</returns>
public async Task<bool> Start(IServiceProvider services, CancellationToken token)
{
SetStatusStart();
try
{
var task = Action(Id, services, UpdateStatus, token);
await task.WaitAsync(Timeout, token);
SetStatusComplete();
return true;
}
catch (Exception exception)
{
var message = FormatExceptionMessage(exception);
SetLastError(message);
if (OnErrorAsync is not null)
{
var task = Task.Run(
async () => await OnErrorAsync(Id, exception, token),
token);
await task.WaitAsync(OnErrorHandlerTimeout, token);
}
}
return false;
}
private static string FormatExceptionMessage(Exception exception)
{
var firstException = FirstException(exception);
var message = new StringBuilder();
if (firstException != exception)
{
message.Append("top exception:");
message.AppendLine(exception.Message);
message.Append("inner exception:");
message.AppendLine(firstException.Message);
}
else
message.AppendLine(firstException.Message);
message.AppendLine(exception.StackTrace);
return message.ToString();
}
private static Exception FirstException(Exception exception)
{
if (exception.InnerException is not null)
return FirstException(exception.InnerException);
return exception;
}
/// <summary>
/// делегат фоновой работы
/// </summary>
/// <param name="id">Идентификатор работы</param>
/// <param name="services">Поставщик сервисов</param>
/// <param name="onProgressCallback">on progress callback. String - new state text. double? - optional progress 0-100%</param>
/// <param name="token"></param>
/// <returns></returns>
protected abstract Task Action(string id, IServiceProvider services, Action<string, double?> onProgressCallback, CancellationToken token);
}

View File

@ -1,69 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Background
{
/// <summary>
/// Класс разовой работы.
/// Разовая работа приоритетнее периодической.
/// </summary>
public class WorkBase
{
/// <summary>
/// Идентификатор работы. Должен быть уникальным. Используется в логах и передается в колбэки.
/// </summary>
public string Id { get; private set; }
/// <summary>
/// Делегат работы.
/// <para>
/// Параметры:
/// <list type="number">
/// <item>
/// <term>string</term>
/// <description>Id Идентификатор работы</description>
/// </item>
/// <item>
/// <term>IServiceProvider</term>
/// <description>Поставщик сервисов</description>
/// </item>
/// <item>
/// <term>CancellationToken</term>
/// <description>Токен отмены задачи</description>
/// </item>
/// </list>
/// </para>
/// </summary>
internal Func<string, IServiceProvider, CancellationToken, Task> ActionAsync { get; set; }
/// <summary>
/// Делегат обработки ошибки.
/// Не должен выполняться долго.
/// </summary>
public Func<string, Exception, CancellationToken, Task>? OnErrorAsync { get; set; }
/// <summary>
/// максимально допустимое время выполнения работы
/// </summary>
public TimeSpan Timeout { get; set; } = TimeSpan.FromMinutes(1);
/// <summary>
/// Фактическое время успешного выполнения работы
/// </summary>
public TimeSpan? ExecutionTime { get; internal set; }
/// <summary>
/// Время последнего запуска
/// </summary>
public DateTime LastStart { get; set; }
public WorkBase(string id, Func<string, IServiceProvider, CancellationToken, Task> actionAsync)
{
Id = id;
ActionAsync = actionAsync;
}
}
}

View File

@ -0,0 +1,145 @@
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using System;
using System.Data.Common;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;
namespace AsbCloudInfrastructure.Background;
public class WorkLimitingParameterCalc : Work
{
public WorkLimitingParameterCalc()
: base("Limiting parameter calc")
{
Timeout = TimeSpan.FromMinutes(30);
}
protected override async Task Action(string id, IServiceProvider services, Action<string, double?> onProgressCallback, CancellationToken token)
{
using var db = services.GetRequiredService<IAsbCloudDbContext>();
db.Database.SetCommandTimeout(TimeSpan.FromMinutes(5));
var lastDetectedDates = await db.LimitingParameter
.GroupBy(o => o.IdTelemetry)
.Select(g => new
{
IdTelemetry = g.Key,
LastDate = g.Max(o => o.DateEnd)
})
.ToListAsync(token);
var telemetryIds = await db.Telemetries
.Where(t => t.Info != null && t.TimeZone != null)
.Select(t => t.Id)
.ToListAsync(token);
var telemetryLastDetectedDates = telemetryIds
.GroupJoin(lastDetectedDates,
t => t,
o => o.IdTelemetry,
(outer, inner) => new
{
IdTelemetry = outer,
inner.SingleOrDefault()?.LastDate,
});
var count = telemetryLastDetectedDates.Count();
var i = 0d;
foreach (var item in telemetryLastDetectedDates)
{
onProgressCallback($"Start hanling telemetry: {item.IdTelemetry} from {item.LastDate}", i++ / count);
var newLimitingParameters = await GetLimitingParameterAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
if (newLimitingParameters?.Any() == true)
{
db.LimitingParameter.AddRange(newLimitingParameters);
await db.SaveChangesAsync(token);
}
}
}
private static async Task<IEnumerable<LimitingParameter>> GetLimitingParameterAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)
{
var query =
$"select " +
$"limiting_parameters.date, limiting_parameters.id_feed_regulator, limiting_parameters.well_depth " +
$"from ( " +
$"select " +
$"date, id_feed_regulator, well_depth, " +
$"lag(id_feed_regulator, 1) over (order by date) as id_feed_regulator_lag, " +
$"lead(id_feed_regulator, 1) over (order by date) as id_feed_regulator_lead " +
$"from t_telemetry_data_saub " +
$"where id_feed_regulator is not null " +
$"and id_telemetry = {idTelemetry} " +
$"and date >= '{begin:u}' " +
$"order by date) as limiting_parameters " +
$"where id_feed_regulator_lag is null " +
$"or (id_feed_regulator != id_feed_regulator_lag and id_feed_regulator_lead != id_feed_regulator_lag) " +
$"order by date;";
var limitingParameters = new List<LimitingParameter>(32);
using (var result = await ExecuteReaderAsync(db, query, token))
{
LimitingParameter? limitingLast = null;
while (result.Read())
{
var date = result.GetFieldValue<DateTimeOffset>(0);
var idLimiting = result.GetFieldValue<short>(1);
var wellDepth = result.GetFieldValue<float>(2);
if (limitingLast is null)
{
limitingLast = new LimitingParameter
{
DateStart = date,
DepthStart = wellDepth,
IdFeedRegulator = idLimiting
};
}
if (limitingLast.IdFeedRegulator != idLimiting || limitingLast.DepthStart < wellDepth)
{
limitingParameters.Add(new LimitingParameter
{
IdTelemetry = idTelemetry,
IdFeedRegulator = limitingLast.IdFeedRegulator,
DateStart = limitingLast.DateStart,
DateEnd = date,
DepthStart = limitingLast.DepthStart,
DepthEnd = wellDepth
});
limitingLast = new LimitingParameter
{
DateStart = date,
DepthStart = wellDepth,
IdFeedRegulator = idLimiting
};
}
}
}
return limitingParameters;
}
private static async Task<DbDataReader> ExecuteReaderAsync(IAsbCloudDbContext db, string query, CancellationToken token)
{
var connection = db.Database.GetDbConnection();
if (
connection?.State is null ||
connection.State == ConnectionState.Broken ||
connection.State == ConnectionState.Closed)
{
await db.Database.OpenConnectionAsync(token);
connection = db.Database.GetDbConnection();
}
using var command = connection.CreateCommand();
command.CommandText = query;
var result = await command.ExecuteReaderAsync(token);
return result;
}
}

View File

@ -1,15 +1,14 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Background
namespace AsbCloudInfrastructure.Background;
/// <summary>
/// Класс периодической работы.
/// </summary>
public class WorkPeriodic
{
public Work Work { get; }
/// <summary>
/// Класс периодической работы.
/// </summary>
public class WorkPeriodic : WorkBase
{
/// <summary>
/// Период выполнения задачи
/// </summary>
@ -18,7 +17,16 @@ namespace AsbCloudInfrastructure.Background
/// <summary>
/// Время следующего запуска
/// </summary>
public DateTime NextStart => LastStart + Period;
public DateTime NextStart
{
get
{
var lastStart = Work.LastComplete?.Start ?? DateTime.MinValue;
if (Work.LastError?.Start > lastStart)
lastStart = Work.LastError.Start;
return lastStart + Period;
}
}
/// <summary>
/// Класс периодической работы
@ -26,11 +34,9 @@ namespace AsbCloudInfrastructure.Background
/// <param name="id">Идентификатор работы. Должен быть уникальным. Используется в логах и передается в колбэки</param>
/// <param name="actionAsync">Делегат работы</param>
/// <param name="period">Период выполнения задачи</param>
public WorkPeriodic(string id, Func<string, IServiceProvider, CancellationToken, Task> actionAsync, TimeSpan period)
: base(id, actionAsync)
public WorkPeriodic(Work work, TimeSpan period)
{
Work = work;
Period = period;
}
}
}

View File

@ -1,107 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace AsbCloudInfrastructure.Background
{
/// <summary>
/// <para>
/// Очередь работ
/// </para>
/// Не периодические задачи будут возвращаться первыми, как самые приоритетные.
/// </summary>
class WorkQueue
{
private Queue<WorkBase> Primary = new(8);
private readonly List<WorkPeriodic> Periodic = new(8);
/// <summary>
/// Добавление работы.
/// </summary>
/// <param name="work"></param>
/// <exception cref="ArgumentException">Id mast be unique</exception>
public void Push(WorkBase work)
{
if (Periodic.Any(w => w.Id == work.Id))
throw new ArgumentException("work.Id is not unique", nameof(work));
if (Primary.Any(w => w.Id == work.Id))
throw new ArgumentException("work.Id is not unique", nameof(work));
if (work is WorkPeriodic workPeriodic)
{
Periodic.Add(workPeriodic);
return;
}
Primary.Enqueue(work);
}
/// <summary>
/// Удаление работы по ID
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public bool Delete(string id)
{
var workPeriodic = Periodic.FirstOrDefault(w => w.Id == id);
if (workPeriodic is not null)
{
Periodic.Remove(workPeriodic);
return true;
}
var work = Primary.FirstOrDefault(w => w.Id == id);
if (work is not null)
{
Primary = new Queue<WorkBase>(Primary.Where(w => w.Id != id));
return true;
}
return false;
}
public bool Contains(string id)
{
var result = Periodic.Any(w => w.Id == id) || Primary.Any(w => w.Id == id);
return result;
}
/// <summary>
/// <para>
/// Возвращает приоритетную задачу.
/// </para>
/// <para>
/// Если приоритетные закончились, то ищет ближайшую периодическую.
/// Если до старта ближайшей периодической работы меньше 20 сек,
/// то этой задаче устанавливается время последнего запуска в now и она возвращается.
/// Если больше 20 сек, то возвращается null.
/// </para>
/// </summary>
/// <param name="maxTimeToNextWork"></param>
/// <returns></returns>
public WorkBase? Pop()
{
if (Primary.Any())
return Primary.Dequeue();
var work = GetNextPeriodic();
if (work is null || work.NextStart > DateTime.Now)
return null;
work.LastStart = DateTime.Now;
return work;
}
private WorkPeriodic? GetNextPeriodic()
{
var work = Periodic
.OrderBy(w => w.NextStart)
.ThenByDescending(w => w.Period)
.FirstOrDefault();
return work;
}
}
}

View File

@ -0,0 +1,105 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace AsbCloudInfrastructure.Background;
/// <summary>
/// <para>
/// Очередь работ
/// </para>
/// Не периодические задачи будут возвращаться первыми, как самые приоритетные.
/// </summary>
public class WorkStore
{
private readonly List<WorkPeriodic> periodics = new(8);
/// <summary>
/// Список периодических задач
/// </summary>
public IEnumerable<WorkPeriodic> Periodics => periodics;
/// <summary>
/// Работы выполняемые один раз
/// </summary>
public Queue<Work> RunOnceQueue { get; private set; } = new(8);
/// <summary>
/// Завершившиеся с ошибкой
/// </summary>
public CyclycArray<Work> Felled { get; } = new(16);
/// <summary>
/// Добавить фоновую работу выполняющуюся с заданным периодом
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="period"></param>
public void AddPeriodic<T>(TimeSpan period)
where T : Work, new()
{
var work = new T();
var periodic = new WorkPeriodic(work, period);
periodics.Add(periodic);
}
/// <summary>
/// Добавить фоновую работу выполняющуюся с заданным периодом
/// </summary>
/// <param name="work"></param>
/// <param name="period"></param>
public void AddPeriodic(Work work, TimeSpan period)
{
var periodic = new WorkPeriodic(work, period);
periodics.Add(periodic);
}
/// <summary>
/// Удаление работы по ID из одноразовой очереди
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public bool TryRemoveFromRunOnceQueue(string id)
{
var work = RunOnceQueue.FirstOrDefault(w => w.Id == id);
if (work is not null)
{
RunOnceQueue = new Queue<Work>(RunOnceQueue.Where(w => w.Id != id));
return true;
}
return false;
}
/// <summary>
/// <para>
/// Возвращает приоритетную задачу.
/// </para>
/// <para>
/// Если приоритетные закончились, то ищет ближайшую периодическую.
/// Если до старта ближайшей периодической работы меньше 20 сек,
/// то этой задаче устанавливается время последнего запуска в now и она возвращается.
/// Если больше 20 сек, то возвращается null.
/// </para>
/// </summary>
/// <param name="maxTimeToNextWork"></param>
/// <returns></returns>
public Work? GetNext()
{
if (RunOnceQueue.Any())
return RunOnceQueue.Dequeue();
var work = GetNextPeriodic();
if (work is null || work.NextStart > DateTime.Now)
return null;
return work.Work;
}
private WorkPeriodic? GetNextPeriodic()
{
var work = Periodics
.OrderBy(w => w.NextStart)
.FirstOrDefault();
return work;
}
}

View File

@ -1,5 +1,12 @@
# Проблемы фонового сервиса
- нет понимания по загрузке очереди работ
- Нужно состояние по загрузки сервиса и очереди работ.
- Все ли задачи укладываются в таймаут,
- Сколько свободного времени остается,
- Что делает текущая задача,
- нет управления сервисом. Для исключения его влияния на другие процессы сервера.
- отключать/включать целиком
- отключать/включать отдельную периодическую задачу
# Сделать
- Разработать dto статуса задачи
- Отказаться от периодической задачи, при добавлении в хранилище задач период будет параметром метода добавления.

View File

@ -226,7 +226,7 @@ public class AutoGeneratedDailyReportService : IAutoGeneratedDailyReportService
.OrderBy(w => w.DateStart);
}
private Task<IEnumerable<SubsystemStatDto>?> GetSubsystemStatsAsync(int idWell, DateTime startDate,
private Task<IEnumerable<SubsystemStatDto>> GetSubsystemStatsAsync(int idWell, DateTime startDate,
DateTime finishDate, CancellationToken cancellationToken)
{
var request = new SubsystemOperationTimeRequest

View File

@ -1,163 +0,0 @@
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AsbCloudInfrastructure.Services.DetectOperations.Detectors;
using AsbCloudInfrastructure.Background;
using Microsoft.Extensions.DependencyInjection;
namespace AsbCloudInfrastructure.Services.DetectOperations
{
public static class OperationDetectionWorkFactory
{
private const string workId = "Operation detection";
private static readonly TimeSpan workPeriod = TimeSpan.FromMinutes(30);
private static string progress = "no progress";
private static readonly DetectorAbstract[] detectors = new DetectorAbstract[]
{
new DetectorRotor(),
new DetectorSlide(),
//new DetectorDevelopment(),
//new DetectorTemplating(),
new DetectorSlipsTime(),
//new DetectorStaticSurveying(),
//new DetectorFlashingBeforeConnection(),
//new DetectorFlashing(),
//new DetectorTemplatingWhileDrilling(),
};
public static WorkPeriodic MakeWork()
{
var workPeriodic = new WorkPeriodic(workId, WorkAction, workPeriod);
workPeriodic.Timeout = TimeSpan.FromSeconds(15 * 60);
workPeriodic.OnErrorAsync = (id, exception, token) =>
{
var text = $"work {id}, when {progress}, throw error:{exception.Message}";
Trace.TraceWarning(text);
return Task.CompletedTask;
};
return workPeriodic;
}
// TODO: Разделить этот акшн на более мелкие части И использовать telemetryServiceData<..> вместо прямого обращения к БД.
private static async Task WorkAction(string _, IServiceProvider serviceProvider, CancellationToken token)
{
using var db = serviceProvider.GetRequiredService<IAsbCloudDbContext>();
var lastDetectedDates = await db.DetectedOperations
.GroupBy(o => o.IdTelemetry)
.Select(g => new
{
IdTelemetry = g.Key,
LastDate = g.Max(o => o.DateEnd)
})
.ToListAsync(token);
var telemetryIds = await db.Telemetries
.Where(t => t.Info != null && t.TimeZone != null)
.Select(t => t.Id)
.ToListAsync(token);
var joinedlastDetectedDates = telemetryIds
.GroupJoin(lastDetectedDates,
t => t,
o => o.IdTelemetry,
(outer, inner) => new
{
IdTelemetry = outer,
inner.SingleOrDefault()?.LastDate,
});
var affected = 0;
foreach (var item in joinedlastDetectedDates)
{
var stopwatch = Stopwatch.StartNew();
var newOperations = await DetectOperationsAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
stopwatch.Stop();
if (newOperations.Any())
{
db.DetectedOperations.AddRange(newOperations);
affected += await db.SaveChangesAsync(token);
}
}
}
private static async Task<IEnumerable<DetectedOperation>> DetectOperationsAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)
{
var query = db.TelemetryDataSaub
.AsNoTracking()
.Where(d => d.IdTelemetry == idTelemetry)
.Where(d => d.BlockPosition >= 0)
.Select(d => new DetectableTelemetry
{
DateTime = d.DateTime,
IdUser = d.IdUser,
WellDepth = d.WellDepth ?? float.NaN,
Pressure = d.Pressure ?? float.NaN,
HookWeight = d.HookWeight ?? float.NaN,
BlockPosition = d.BlockPosition ?? float.NaN,
BitDepth = d.BitDepth ?? float.NaN,
RotorSpeed = d.RotorSpeed ?? float.NaN,
})
.OrderBy(d => d.DateTime);
var take = 4 * 86_400; // 4 дня
var startDate = begin;
var detectedOperations = new List<DetectedOperation>(8);
DetectedOperation? lastDetectedOperation = null;
const int minOperationLength = 5;
const int maxDetectorsInterpolationFrameLength = 30;
const int gap = maxDetectorsInterpolationFrameLength + minOperationLength;
while (true)
{
var data = await query
.Where(d => d.DateTime > startDate)
.Take(take)
.ToArrayAsync(token);
if (data.Length < gap)
break;
var isDetected = false;
var positionBegin = 0;
var positionEnd = data.Length - gap;
var step = 10;
while (positionEnd > positionBegin)
{
step ++;
for (int i = 0; i < detectors.Length; i++)
{
progress = $"telemetry:{idTelemetry}, date:{startDate}, pos:{positionBegin}, detector:{detectors[i]}";
if (detectors[i].TryDetect(idTelemetry, data, positionBegin, positionEnd, lastDetectedOperation, out OperationDetectorResult? result))
{
detectedOperations.Add(result!.Operation);
lastDetectedOperation = result.Operation;
isDetected = true;
step = 1;
positionBegin = result.TelemetryEnd;
break;
}
}
if (step > 20)
step = 10;
positionBegin += step;
}
if (isDetected)
startDate = lastDetectedOperation!.DateEnd;
else
startDate = data[positionEnd].DateTime;
}
return detectedOperations;
}
}
}

View File

@ -0,0 +1,157 @@
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AsbCloudInfrastructure.Services.DetectOperations.Detectors;
using AsbCloudInfrastructure.Background;
using Microsoft.Extensions.DependencyInjection;
namespace AsbCloudInfrastructure.Services.DetectOperations;
public class WorkOperationDetection: Work
{
private static readonly DetectorAbstract[] detectors = new DetectorAbstract[]
{
new DetectorRotor(),
new DetectorSlide(),
//new DetectorDevelopment(),
//new DetectorTemplating(),
new DetectorSlipsTime(),
//new DetectorStaticSurveying(),
//new DetectorFlashingBeforeConnection(),
//new DetectorFlashing(),
//new DetectorTemplatingWhileDrilling(),
};
public WorkOperationDetection()
:base("Operation detection")
{
Timeout = TimeSpan.FromMinutes(20);
OnErrorAsync = (id, exception, token) =>
{
var text = $"work {id}, when {CurrentState?.State}, throw error:{exception.Message}";
Trace.TraceWarning(text);
return Task.CompletedTask;
};
}
protected override async Task Action(string id, IServiceProvider services, Action<string, double?> onProgressCallback, CancellationToken token)
{
using var db = services.GetRequiredService<IAsbCloudDbContext>();
var lastDetectedDates = await db.DetectedOperations
.GroupBy(o => o.IdTelemetry)
.Select(g => new
{
IdTelemetry = g.Key,
LastDate = g.Max(o => o.DateEnd)
})
.ToListAsync(token);
var telemetryIds = await db.Telemetries
.Where(t => t.Info != null && t.TimeZone != null)
.Select(t => t.Id)
.ToListAsync(token);
var joinedlastDetectedDates = telemetryIds
.GroupJoin(lastDetectedDates,
t => t,
o => o.IdTelemetry,
(outer, inner) => new
{
IdTelemetry = outer,
inner.SingleOrDefault()?.LastDate,
});
var affected = 0;
var count = joinedlastDetectedDates.Count();
var i = 0d;
foreach (var item in joinedlastDetectedDates)
{
var stopwatch = Stopwatch.StartNew();
var startDate = item.LastDate ?? DateTimeOffset.MinValue;
onProgressCallback($"start detecting telemetry: {item.IdTelemetry} from {startDate}", i++ / count);
var newOperations = await DetectOperationsAsync(item.IdTelemetry, startDate, db, token);
stopwatch.Stop();
if (newOperations.Any())
{
db.DetectedOperations.AddRange(newOperations);
affected += await db.SaveChangesAsync(token);
}
}
}
private static async Task<IEnumerable<DetectedOperation>> DetectOperationsAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)
{
var query = db.TelemetryDataSaub
.AsNoTracking()
.Where(d => d.IdTelemetry == idTelemetry)
.Where(d => d.BlockPosition >= 0)
.Select(d => new DetectableTelemetry
{
DateTime = d.DateTime,
IdUser = d.IdUser,
WellDepth = d.WellDepth ?? float.NaN,
Pressure = d.Pressure ?? float.NaN,
HookWeight = d.HookWeight ?? float.NaN,
BlockPosition = d.BlockPosition ?? float.NaN,
BitDepth = d.BitDepth ?? float.NaN,
RotorSpeed = d.RotorSpeed ?? float.NaN,
})
.OrderBy(d => d.DateTime);
var take = 4 * 86_400; // 4 дня
var startDate = begin;
var detectedOperations = new List<DetectedOperation>(8);
DetectedOperation? lastDetectedOperation = null;
const int minOperationLength = 5;
const int maxDetectorsInterpolationFrameLength = 30;
const int gap = maxDetectorsInterpolationFrameLength + minOperationLength;
while (true)
{
var data = await query
.Where(d => d.DateTime > startDate)
.Take(take)
.ToArrayAsync(token);
if (data.Length < gap)
break;
var isDetected = false;
var positionBegin = 0;
var positionEnd = data.Length - gap;
var step = 10;
while (positionEnd > positionBegin)
{
step ++;
for (int i = 0; i < detectors.Length; i++)
{
if (detectors[i].TryDetect(idTelemetry, data, positionBegin, positionEnd, lastDetectedOperation, out OperationDetectorResult? result))
{
detectedOperations.Add(result!.Operation);
lastDetectedOperation = result.Operation;
isDetected = true;
step = 1;
positionBegin = result.TelemetryEnd;
break;
}
}
if (step > 20)
step = 10;
positionBegin += step;
}
if (isDetected)
startDate = lastDetectedOperation!.DateEnd;
else
startDate = data[positionEnd].DateTime;
}
return detectedOperations;
}
}

View File

@ -513,17 +513,18 @@ namespace AsbCloudInfrastructure.Services.DrillingProgram
if (state.IdState == idStateCreating)
{
var workId = MakeWorkId(idWell);
if (!backgroundWorker.Contains(workId))
if (!backgroundWorker.WorkStore.RunOnceQueue.Any(w => w.Id == workId))
{
var well = (await wellService.GetOrDefaultAsync(idWell, token))!;
var resultFileName = $"Программа бурения {well.Cluster} {well.Caption}.pdf";
var convertedFilesDir = Path.Combine(Path.GetTempPath(), "drillingProgram", $"{well.Cluster}_{well.Caption}");
var tempResultFilePath = Path.Combine(convertedFilesDir, resultFileName);
var workAction = async (string workId, IServiceProvider serviceProvider, CancellationToken token) =>
var workAction = async (string workId, IServiceProvider serviceProvider, Action<string, double?> onProgress, CancellationToken token) =>
{
var context = serviceProvider.GetRequiredService<IAsbCloudDbContext>();
var fileService = serviceProvider.GetRequiredService<FileService>();
var files = state.Parts.Select(p => fileService.GetUrl(p.File!));
onProgress($"Start converting {files.Count()} files to PDF.", null);
await ConvertToPdf.GetConverteAndMergedFileAsync(files, tempResultFilePath, convertedFilesDir, token);
await fileService.MoveAsync(idWell, null, idFileCategoryDrillingProgram, resultFileName, tempResultFilePath, token);
};
@ -539,13 +540,9 @@ namespace AsbCloudInfrastructure.Services.DrillingProgram
return Task.CompletedTask;
};
var work = new WorkBase(workId, workAction)
{
ExecutionTime = TimeSpan.FromMinutes(1),
OnErrorAsync = onErrorAction
};
backgroundWorker.Push(work);
var work = Work.CreateByDelegate(workId, workAction);
work.OnErrorAsync = onErrorAction;
backgroundWorker.WorkStore.RunOnceQueue.Enqueue(work);
}
}
}
@ -559,7 +556,7 @@ namespace AsbCloudInfrastructure.Services.DrillingProgram
private async Task<int> RemoveDrillingProgramAsync(int idWell, CancellationToken token)
{
var workId = MakeWorkId(idWell);
backgroundWorker.Delete(workId);
backgroundWorker.WorkStore.TryRemoveFromRunOnceQueue(workId);
var filesIds = await context.Files
.Where(f => f.IdWell == idWell &&

View File

@ -10,6 +10,7 @@ using AsbCloudApp.Exceptions;
using AsbCloudApp.Repositories;
using AsbCloudApp.Services.Notifications;
using AsbCloudInfrastructure.Background;
using DocumentFormat.OpenXml.Presentation;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@ -51,12 +52,12 @@ namespace AsbCloudInfrastructure.Services.Email
}
var workId = MakeWorkId(notification.IdUser, notification.Title, notification.Message);
if (!backgroundWorker.Contains(workId))
if (!backgroundWorker.WorkStore.RunOnceQueue.Any(w=>w.Id==workId))
{
var workAction = MakeEmailSendWorkAction(notification);
var work = new WorkBase(workId, workAction);
backgroundWorker.Push(work);
var work = Work.CreateByDelegate(workId, workAction);
backgroundWorker.WorkStore.RunOnceQueue.Enqueue(work);
}
return Task.CompletedTask;
@ -70,9 +71,9 @@ namespace AsbCloudInfrastructure.Services.Email
return Task.WhenAll(tasks);
}
private Func<string, IServiceProvider, CancellationToken, Task> MakeEmailSendWorkAction(NotificationDto notification)
private Func<string, IServiceProvider, Action<string, double?>, CancellationToken, Task> MakeEmailSendWorkAction(NotificationDto notification)
{
return async (_, serviceProvider, token) =>
return async (_, serviceProvider, onProgress, token) =>
{
var notificationRepository = serviceProvider.GetRequiredService<INotificationRepository>();
var userRepository = serviceProvider.GetRequiredService<IUserRepository>();

View File

@ -1,151 +0,0 @@
using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore;
using System;
using System.Data.Common;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using AsbCloudInfrastructure.Background;
using Microsoft.Extensions.DependencyInjection;
namespace AsbCloudInfrastructure.Services
{
internal static class LimitingParameterCalcWorkFactory
{
private const string workId = "Limiting parameter calc";
private static readonly TimeSpan workPeriod = TimeSpan.FromMinutes(30);
public static WorkPeriodic MakeWork()
{
var workPeriodic = new WorkPeriodic(workId, WorkAction, workPeriod)
{
Timeout = TimeSpan.FromMinutes(30)
};
return workPeriodic;
}
// TODO: Разделить этот акшн на более мелкие части И использовать telemetryServiceData<..> вместо прямого обращения к БД.
private static async Task WorkAction(string _, IServiceProvider serviceProvider, CancellationToken token)
{
using var db = serviceProvider.GetRequiredService<IAsbCloudDbContext>();
var lastDetectedDates = await db.LimitingParameter
.GroupBy(o => o.IdTelemetry)
.Select(g => new
{
IdTelemetry = g.Key,
LastDate = g.Max(o => o.DateEnd)
})
.ToListAsync(token);
var telemetryIds = await db.Telemetries
.Where(t => t.Info != null && t.TimeZone != null)
.Select(t => t.Id)
.ToListAsync(token);
var telemetryLastDetectedDates = telemetryIds
.GroupJoin(lastDetectedDates,
t => t,
o => o.IdTelemetry,
(outer, inner) => new
{
IdTelemetry = outer,
inner.SingleOrDefault()?.LastDate,
});
foreach (var item in telemetryLastDetectedDates)
{
var newLimitingParameters = await GetLimitingParameterAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
if (newLimitingParameters?.Any() == true)
{
db.LimitingParameter.AddRange(newLimitingParameters);
await db.SaveChangesAsync(token);
}
}
}
private static async Task<IEnumerable<LimitingParameter>> GetLimitingParameterAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)
{
var query =
$"select " +
$"limiting_parameters.date, limiting_parameters.id_feed_regulator, limiting_parameters.well_depth " +
$"from ( " +
$"select " +
$"date, id_feed_regulator, well_depth, " +
$"lag(id_feed_regulator, 1) over (order by date) as id_feed_regulator_lag, " +
$"lead(id_feed_regulator, 1) over (order by date) as id_feed_regulator_lead " +
$"from t_telemetry_data_saub " +
$"where id_feed_regulator is not null " +
$"and id_telemetry = {idTelemetry}" +
$"and date >= '{begin:u}'" +
$"order by date) as limiting_parameters " +
$"where id_feed_regulator_lag is null " +
$"or (id_feed_regulator != id_feed_regulator_lag and id_feed_regulator_lead != id_feed_regulator_lag) " +
$"order by date;";
var limitingParameters = new List<LimitingParameter>(32);
using (var result = await ExecuteReaderAsync(db, query, token))
{
LimitingParameter? limitingLast = null;
while (result.Read())
{
var date = result.GetFieldValue<DateTimeOffset>(0);
var idLimiting = result.GetFieldValue<short>(1);
var wellDepth = result.GetFieldValue<float>(2);
if (limitingLast is null)
{
limitingLast = new LimitingParameter
{
DateStart = date,
DepthStart = wellDepth,
IdFeedRegulator = idLimiting
};
}
if (limitingLast.IdFeedRegulator != idLimiting || limitingLast.DepthStart < wellDepth)
{
limitingParameters.Add(new LimitingParameter {
IdTelemetry = idTelemetry,
IdFeedRegulator = limitingLast.IdFeedRegulator,
DateStart = limitingLast.DateStart,
DateEnd = date,
DepthStart = limitingLast.DepthStart,
DepthEnd = wellDepth
});
limitingLast = new LimitingParameter
{
DateStart = date,
DepthStart = wellDepth,
IdFeedRegulator = idLimiting
};
}
}
}
return limitingParameters;
}
private static async Task<DbDataReader> ExecuteReaderAsync(IAsbCloudDbContext db, string query, CancellationToken token)
{
var connection = db.Database.GetDbConnection();
if (
connection?.State is null ||
connection.State == ConnectionState.Broken ||
connection.State == ConnectionState.Closed)
{
await db.Database.OpenConnectionAsync(token);
connection = db.Database.GetDbConnection();
}
using var command = connection.CreateCommand();
command.CommandText = query;
var result = await command.ExecuteReaderAsync(token);
return result;
}
}
}

View File

@ -51,7 +51,7 @@ namespace AsbCloudInfrastructure.Services
var workId = $"create report by wellid:{idWell} for userid:{idUser} requested at {DateTime.Now}";
var workAction = async (string id, IServiceProvider serviceProvider, CancellationToken token) =>
var workAction = async (string id, IServiceProvider serviceProvider, Action<string, double?> onProgress, CancellationToken token) =>
{
using var context = serviceProvider.GetRequiredService<IAsbCloudDbContext>();
var fileService = serviceProvider.GetRequiredService<FileService>();
@ -64,7 +64,9 @@ namespace AsbCloudInfrastructure.Services
generator.OnProgress += (s, e) =>
{
progressHandler.Invoke(e.Adapt<ReportProgressDto>(), id);
var arg = e.Adapt<ReportProgressDto>();
onProgress(arg.Operation?? string.Empty, arg.Progress);
progressHandler.Invoke(arg, id);
};
generator.Make(reportFileName);
@ -92,8 +94,8 @@ namespace AsbCloudInfrastructure.Services
context.SaveChanges();
};
var work = new WorkBase(workId, workAction);
backgroundWorkerService.Push(work);
var work = Work.CreateByDelegate(workId, workAction);
backgroundWorkerService.WorkStore.RunOnceQueue.Enqueue(work);
progressHandler.Invoke(new ReportProgressDto
{

View File

@ -48,12 +48,12 @@ namespace AsbCloudInfrastructure.Services.SAUB
instance = new TelemetryDataCache<TDto>();
var worker = provider.GetRequiredService<BackgroundWorker>();
var workId = $"Telemetry cache loading from DB {typeof(TEntity).Name}";
var work = new WorkBase(workId, async (workId, provider, token) => {
var work = Work.CreateByDelegate(workId, async (workId, provider, onProgress, token) => {
var db = provider.GetRequiredService<IAsbCloudDbContext>();
await instance.InitializeCacheFromDBAsync<TEntity>(db, token);
await instance.InitializeCacheFromDBAsync<TEntity>(db, onProgress, token);
});
worker.Push(work);
worker.WorkStore.RunOnceQueue.Enqueue(work);
}
instance.provider = provider;
return instance;
@ -150,7 +150,7 @@ namespace AsbCloudInfrastructure.Services.SAUB
return new DatesRangeDto { From = from.Value, To = to };
}
private async Task InitializeCacheFromDBAsync<TEntity>(IAsbCloudDbContext db, CancellationToken token)
private async Task InitializeCacheFromDBAsync<TEntity>(IAsbCloudDbContext db, Action<string, double?> onProgress, CancellationToken token)
where TEntity : class, AsbCloudDb.Model.ITelemetryData
{
if (isLoading)
@ -159,7 +159,6 @@ namespace AsbCloudInfrastructure.Services.SAUB
isLoading = true;
var defaultTimeout = db.Database.GetCommandTimeout();
System.Diagnostics.Trace.TraceInformation($"cache loading starting. Setting CommandTimeout 90s ({defaultTimeout})");
db.Database.SetCommandTimeout(TimeSpan.FromSeconds(90));
Well[] wells = await db.Set<Well>()
@ -168,6 +167,8 @@ namespace AsbCloudInfrastructure.Services.SAUB
.Where(well => well.IdTelemetry != null)
.ToArrayAsync(token);
var count = wells.Length;
var i = 0d;
foreach (Well well in wells)
{
var capacity = well.IdState == 1
@ -177,20 +178,12 @@ namespace AsbCloudInfrastructure.Services.SAUB
var idTelemetry = well.IdTelemetry!.Value;
var hoursOffset = well.Timezone.Hours;
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}>: Loading for well: {well.Cluster?.Caption}/{well.Caption} (capacity:{capacity}) idTelemetry:{idTelemetry}");
onProgress($"Loading for well: {well.Cluster?.Caption}/{well.Caption} (capacity:{capacity}) idTelemetry:{idTelemetry}", i++/count);
var cacheItem = await GetOrDefaultCacheDataFromDbAsync<TEntity>(db, idTelemetry, capacity, hoursOffset, token);
if(cacheItem is not null)
{
caches.TryAdd(idTelemetry, cacheItem);
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> for well: {well.Cluster?.Caption}/{well.Caption} loaded");
}
else
{
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> for well: {well.Cluster?.Caption}/{well.Caption} has no data");
}
}
System.Diagnostics.Trace.TraceInformation($"cache<{typeof(TDto).Name}> load complete");
isLoading = false;
db.Database.SetCommandTimeout(defaultTimeout);
}

View File

@ -1,300 +0,0 @@
using AsbCloudDb.Model;
using AsbCloudDb.Model.Subsystems;
using AsbCloudInfrastructure.Background;
using AsbCloudInfrastructure.Services.Subsystems.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services.Subsystems
{
internal static class SubsystemOperationTimeCalcWorkFactory
{
private const string workId = "Subsystem operation time calc";
private static readonly TimeSpan workPeriod = TimeSpan.FromMinutes(30);
private const int idSubsytemTorqueMaster = 65537;
private const int idSubsytemSpinMaster = 65536;
private const int idSubsystemAPDRotor = 11;
private const int idSubsystemAPDSlide = 12;
private const int idSubsytemMse = 2;
public static WorkPeriodic MakeWork()
{
var workPeriodic = new WorkPeriodic(workId, WorkAction, workPeriod)
{
Timeout = TimeSpan.FromMinutes(30)
};
return workPeriodic;
}
// TODO: Разделить этот акшн на более мелкие части И использовать telemetryServiceData<..> вместо прямого обращения к БД.
private static async Task WorkAction(string _, IServiceProvider serviceProvider, CancellationToken token)
{
using var db = serviceProvider.GetRequiredService<IAsbCloudDbContext>();
var lastDetectedDates = await db.SubsystemOperationTimes
.GroupBy(o => o.IdTelemetry)
.Select(g => new
{
IdTelemetry = g.Key,
LastDate = g.Max(o => o.DateEnd)
})
.ToListAsync(token);
var telemetryIds = await db.Telemetries
.Where(t => t.Info != null && t.TimeZone != null)
.Select(t => t.Id)
.ToListAsync(token);
var telemetryLastDetectedDates = telemetryIds
.GroupJoin(lastDetectedDates,
t => t,
o => o.IdTelemetry,
(outer, inner) => new
{
IdTelemetry = outer,
inner.SingleOrDefault()?.LastDate,
});
foreach (var item in telemetryLastDetectedDates)
{
var newOperationsSaub = await OperationTimeSaubAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
if (newOperationsSaub?.Any() == true)
{
db.SubsystemOperationTimes.AddRange(newOperationsSaub);
await db.SaveChangesAsync(token);
}
var newOperationsSpin = await OperationTimeSpinAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
if (newOperationsSpin?.Any() == true)
{
db.SubsystemOperationTimes.AddRange(newOperationsSpin);
await db.SaveChangesAsync(token);
}
}
}
private static async Task<DbDataReader> ExecuteReaderAsync(IAsbCloudDbContext db, string query, CancellationToken token)
{
var connection = db.Database.GetDbConnection();
if (
connection?.State is null ||
connection.State == ConnectionState.Broken ||
connection.State == ConnectionState.Closed)
{
await db.Database.OpenConnectionAsync(token);
connection = db.Database.GetDbConnection();
}
using var command = connection.CreateCommand();
command.CommandText = query;
var result = await command.ExecuteReaderAsync(token);
return result;
}
private static async Task<IEnumerable<SubsystemOperationTime>> OperationTimeSaubAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)
{
static bool isSubsytemAkbRotor(short? mode) => mode == 1;
static bool isSubsytemAkbSlide(short? mode) => mode == 3;
static bool IsSubsystemMse(short? state) => (state & 1) > 0;
var query =
$"select tt.date, tt.mode, tt.well_depth, tt.mse_state " +
$"from ( " +
$" select " +
$" date, " +
$" mode, " +
$" mse_state, " +
$" well_depth, " +
$" lag(mode,1) over (order by date) as mode_lag, " +
$" lead(mode,1) over (order by date) as mode_lead " +
$" from t_telemetry_data_saub " +
$" where id_telemetry = {idTelemetry} and well_depth is not null and well_depth > 0" +
$" order by date ) as tt " +
$"where (tt.mode_lag is null or (tt.mode != tt.mode_lag and tt.mode_lead != tt.mode_lag)) and tt.date >= '{begin:u}' " +
$"order by tt.date;";
using var result = await ExecuteReaderAsync(db, query, token);
var subsystemsOperationTimes = new List<SubsystemOperationTime>();
var detectorRotor = new SubsystemDetector(idTelemetry, idSubsystemAPDRotor, isSubsytemAkbRotor, IsValid);
var detectorSlide = new SubsystemDetector(idTelemetry, idSubsystemAPDSlide, isSubsytemAkbSlide, IsValid);
var detectorMse = new SubsystemDetector(idTelemetry, idSubsytemMse, IsSubsystemMse, IsValid);
while (result.Read())
{
var mode = result.GetFieldValue<short?>(1);
var state = result.GetFieldValue<short?>(3);
var isAkbRotorEnable = isSubsytemAkbRotor(mode);
var isAkbSlideEnable = isSubsytemAkbSlide(mode);
var isMseEnable = IsSubsystemMse(state);
var date = result.GetFieldValue<DateTimeOffset>(0);
var depth = result.GetFieldValue<float>(2);
if (detectorRotor.TryDetect(mode, date, depth, out var detectedRotor))
subsystemsOperationTimes.Add(detectedRotor!);
if (detectorSlide.TryDetect(mode, date, depth, out var detectedSlide))
subsystemsOperationTimes.Add(detectedSlide!);
if (detectorMse.TryDetect(mode, date, depth, out var detectedMse))
subsystemsOperationTimes.Add(detectedMse!);
}
return subsystemsOperationTimes;
}
private static async Task<IEnumerable<SubsystemOperationTime>> OperationTimeSpinAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)
{
static int? GetSubsytemId(short? mode, int? state)
{
// При изменении следующего кода сообщи в Vladimir.Sobolev@nedra.digital
if (state == 7 && (mode & 2) > 0)
return idSubsytemTorqueMaster;// демпфер
if (state != 0 && state != 5 && state != 6 && state != 7)
return idSubsytemSpinMaster;// осцилляция
return null;
}
var querySpin =
$"select " +
$" tspin.date, " +
$" tspin.mode, " +
$" tspin.state " +
$"from ( " +
$" select " +
$" date, " +
$" mode, " +
$" lag(mode, 1) over (order by date) as mode_lag, " +
$" lead(mode, 1) over (order by date) as mode_lead, " +
$" state, " +
$" lag(state, 1) over (order by date) as state_lag " +
$" from t_telemetry_data_spin " +
$" where id_telemetry = {idTelemetry} and date >= '{begin:u}'" +
$" order by date ) as tspin " +
$"where mode_lag is null or state_lag is null or (mode != mode_lag and mode_lead != mode_lag) or state != state_lag " +
$"order by date;";
var rows = new List<(int? IdSubsystem, DateTimeOffset Date)>(32);
using var resultSpin = await ExecuteReaderAsync(db, querySpin, token);
int? idSubsystemLast = null;
while (resultSpin.Read())
{
var mode = resultSpin.GetFieldValue<short?>(1);
var state = resultSpin.GetFieldValue<short?>(2);
var idSubsystem = GetSubsytemId(mode, state);
if (idSubsystemLast != idSubsystem)
{
idSubsystemLast = idSubsystem;
var date = resultSpin.GetFieldValue<DateTimeOffset>(0);
rows.Add((idSubsystem, date));
}
}
await resultSpin.DisposeAsync();
if (rows.Count < 2)
return Enumerable.Empty<SubsystemOperationTime>();
var minSpinDate = rows.Min(i => i.Date);
var maxSpinDate = rows.Max(i => i.Date);
var depthInterpolation = await GetInterpolation(db, idTelemetry, minSpinDate, maxSpinDate, token);
if (depthInterpolation is null)
return Enumerable.Empty<SubsystemOperationTime>();
var subsystemsOperationTimes = new List<SubsystemOperationTime>(32);
for (int i = 1; i < rows.Count; i++)
{
var r0 = rows[i - 1];
var r1 = rows[i];
if (r0.IdSubsystem is not null && r0.IdSubsystem != r1.IdSubsystem)
{
var subsystemOperationTime = new SubsystemOperationTime()
{
IdTelemetry = idTelemetry,
IdSubsystem = r0.IdSubsystem.Value,
DateStart = r0.Date,
DateEnd = r1.Date,
DepthStart = depthInterpolation.GetDepth(r0.Date),
DepthEnd = depthInterpolation.GetDepth(r1.Date),
};
if (IsValid(subsystemOperationTime))
subsystemsOperationTimes.Add(subsystemOperationTime);
}
}
return subsystemsOperationTimes;
}
private static bool IsValid(SubsystemOperationTime item)
{
var validateCode = GetValidateErrorCode(item);
if (validateCode != 0)
{
var str = System.Text.Json.JsonSerializer.Serialize(item);
Trace.TraceWarning($"Wrong({validateCode}) SubsystemOperationTime: {str}");
}
return validateCode == 0;
}
private static int GetValidateErrorCode(SubsystemOperationTime item)
{
if (item.DateStart > item.DateEnd)
return -1;
if ((item.DateEnd - item.DateStart).TotalHours > 48)
return -2;
if (item.DepthEnd < item.DepthStart)
return -3;
if (item.DepthEnd - item.DepthStart > 2000d)
return -4;
if (item.DepthEnd < 0d)
return -5;
if (item.DepthStart < 0d)
return -6;
if (item.DepthEnd > 24_0000d)
return -7;
if (item.DepthStart > 24_0000d)
return -8;
return 0;
}
private static async Task<DepthInterpolation?> GetInterpolation(IAsbCloudDbContext db, int idTelemetry, DateTimeOffset dateBegin, DateTimeOffset dateEnd, CancellationToken token)
{
var dataDepthFromSaub = await db.TelemetryDataSaub
.Where(d => d.IdTelemetry == idTelemetry)
.Where(d => d.DateTime >= dateBegin)
.Where(d => d.DateTime <= dateEnd)
.Where(d => d.WellDepth != null)
.Where(d => d.WellDepth > 0)
.GroupBy(d => Math.Ceiling(d.WellDepth ?? 0 * 10))
.Select(g => new {
DateMin = g.Min(d => d.DateTime),
DepthMin = g.Min(d => d.WellDepth) ?? 0,
})
.OrderBy(i => i.DateMin)
.ToArrayAsync(token);
if (!dataDepthFromSaub.Any())
return null;
var depthInterpolation = new DepthInterpolation(dataDepthFromSaub.Select(i => (i.DateMin, i.DepthMin)));
return depthInterpolation;
}
}
}

View File

@ -0,0 +1,295 @@
using AsbCloudDb.Model;
using AsbCloudDb.Model.Subsystems;
using AsbCloudInfrastructure.Background;
using AsbCloudInfrastructure.Services.Subsystems.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsbCloudInfrastructure.Services.Subsystems;
public class WorkSubsystemOperationTimeCalc : Work
{
private const int idSubsytemTorqueMaster = 65537;
private const int idSubsytemSpinMaster = 65536;
private const int idSubsystemAPDRotor = 11;
private const int idSubsystemAPDSlide = 12;
private const int idSubsytemMse = 2;
public WorkSubsystemOperationTimeCalc()
: base("Subsystem operation time calc")
{
Timeout = TimeSpan.FromMinutes(20);
}
protected override async Task Action(string id, IServiceProvider services, Action<string, double?> onProgressCallback, CancellationToken token)
{
using var db = services.GetRequiredService<IAsbCloudDbContext>();
db.Database.SetCommandTimeout(TimeSpan.FromMinutes(5));
var lastDetectedDates = await db.SubsystemOperationTimes
.GroupBy(o => o.IdTelemetry)
.Select(g => new
{
IdTelemetry = g.Key,
LastDate = g.Max(o => o.DateEnd)
})
.ToListAsync(token);
var telemetryIds = await db.Telemetries
.Where(t => t.Info != null && t.TimeZone != null)
.Select(t => t.Id)
.ToListAsync(token);
var telemetryLastDetectedDates = telemetryIds
.GroupJoin(lastDetectedDates,
t => t,
o => o.IdTelemetry,
(outer, inner) => new
{
IdTelemetry = outer,
inner.SingleOrDefault()?.LastDate,
});
var count = telemetryLastDetectedDates.Count();
var i = 0d;
foreach (var item in telemetryLastDetectedDates)
{
onProgressCallback($"Start handling telemetry: {item.IdTelemetry} from {item.LastDate}", i++ / count);
var newOperationsSaub = await OperationTimeSaubAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
if (newOperationsSaub?.Any() == true)
{
db.SubsystemOperationTimes.AddRange(newOperationsSaub);
await db.SaveChangesAsync(token);
}
var newOperationsSpin = await OperationTimeSpinAsync(item.IdTelemetry, item.LastDate ?? DateTimeOffset.MinValue, db, token);
if (newOperationsSpin?.Any() == true)
{
db.SubsystemOperationTimes.AddRange(newOperationsSpin);
await db.SaveChangesAsync(token);
}
}
}
private static async Task<DbDataReader> ExecuteReaderAsync(IAsbCloudDbContext db, string query, CancellationToken token)
{
var connection = db.Database.GetDbConnection();
if (
connection?.State is null ||
connection.State == ConnectionState.Broken ||
connection.State == ConnectionState.Closed)
{
await db.Database.OpenConnectionAsync(token);
connection = db.Database.GetDbConnection();
}
using var command = connection.CreateCommand();
command.CommandText = query;
var result = await command.ExecuteReaderAsync(token);
return result;
}
private static async Task<IEnumerable<SubsystemOperationTime>> OperationTimeSaubAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)
{
static bool isSubsytemAkbRotor(short? mode) => mode == 1;
static bool isSubsytemAkbSlide(short? mode) => mode == 3;
static bool IsSubsystemMse(short? state) => (state & 1) > 0;
var query =
$"select tt.date, tt.mode, tt.well_depth, tt.mse_state " +
$"from ( " +
$" select " +
$" date, " +
$" mode, " +
$" mse_state, " +
$" well_depth, " +
$" lag(mode,1) over (order by date) as mode_lag, " +
$" lead(mode,1) over (order by date) as mode_lead " +
$" from t_telemetry_data_saub " +
$" where id_telemetry = {idTelemetry} and well_depth is not null and well_depth > 0 " +
$" order by date ) as tt " +
$"where (tt.mode_lag is null or (tt.mode != tt.mode_lag and tt.mode_lead != tt.mode_lag)) and tt.date >= '{begin:u}' " +
$"order by tt.date;";
using var result = await ExecuteReaderAsync(db, query, token);
var subsystemsOperationTimes = new List<SubsystemOperationTime>();
var detectorRotor = new SubsystemDetector(idTelemetry, idSubsystemAPDRotor, isSubsytemAkbRotor, IsValid);
var detectorSlide = new SubsystemDetector(idTelemetry, idSubsystemAPDSlide, isSubsytemAkbSlide, IsValid);
var detectorMse = new SubsystemDetector(idTelemetry, idSubsytemMse, IsSubsystemMse, IsValid);
while (result.Read())
{
var mode = result.GetFieldValue<short?>(1);
var state = result.GetFieldValue<short?>(3);
var isAkbRotorEnable = isSubsytemAkbRotor(mode);
var isAkbSlideEnable = isSubsytemAkbSlide(mode);
var isMseEnable = IsSubsystemMse(state);
var date = result.GetFieldValue<DateTimeOffset>(0);
var depth = result.GetFieldValue<float>(2);
if (detectorRotor.TryDetect(mode, date, depth, out var detectedRotor))
subsystemsOperationTimes.Add(detectedRotor!);
if (detectorSlide.TryDetect(mode, date, depth, out var detectedSlide))
subsystemsOperationTimes.Add(detectedSlide!);
if (detectorMse.TryDetect(mode, date, depth, out var detectedMse))
subsystemsOperationTimes.Add(detectedMse!);
}
return subsystemsOperationTimes;
}
private static async Task<IEnumerable<SubsystemOperationTime>> OperationTimeSpinAsync(int idTelemetry, DateTimeOffset begin, IAsbCloudDbContext db, CancellationToken token)
{
static int? GetSubsytemId(short? mode, int? state)
{
// При изменении следующего кода сообщи в Vladimir.Sobolev@nedra.digital
if (state == 7 && (mode & 2) > 0)
return idSubsytemTorqueMaster;// демпфер
if (state != 0 && state != 5 && state != 6 && state != 7)
return idSubsytemSpinMaster;// осцилляция
return null;
}
var querySpin =
$"select " +
$" tspin.date, " +
$" tspin.mode, " +
$" tspin.state " +
$"from ( " +
$" select " +
$" date, " +
$" mode, " +
$" lag(mode, 1) over (order by date) as mode_lag, " +
$" lead(mode, 1) over (order by date) as mode_lead, " +
$" state, " +
$" lag(state, 1) over (order by date) as state_lag " +
$" from t_telemetry_data_spin " +
$" where id_telemetry = {idTelemetry} and date >= '{begin:u}'" +
$" order by date ) as tspin " +
$"where mode_lag is null or state_lag is null or (mode != mode_lag and mode_lead != mode_lag) or state != state_lag " +
$"order by date;";
var rows = new List<(int? IdSubsystem, DateTimeOffset Date)>(32);
using var resultSpin = await ExecuteReaderAsync(db, querySpin, token);
int? idSubsystemLast = null;
while (resultSpin.Read())
{
var mode = resultSpin.GetFieldValue<short?>(1);
var state = resultSpin.GetFieldValue<short?>(2);
var idSubsystem = GetSubsytemId(mode, state);
if (idSubsystemLast != idSubsystem)
{
idSubsystemLast = idSubsystem;
var date = resultSpin.GetFieldValue<DateTimeOffset>(0);
rows.Add((idSubsystem, date));
}
}
await resultSpin.DisposeAsync();
if (rows.Count < 2)
return Enumerable.Empty<SubsystemOperationTime>();
var minSpinDate = rows.Min(i => i.Date);
var maxSpinDate = rows.Max(i => i.Date);
var depthInterpolation = await GetInterpolation(db, idTelemetry, minSpinDate, maxSpinDate, token);
if (depthInterpolation is null)
return Enumerable.Empty<SubsystemOperationTime>();
var subsystemsOperationTimes = new List<SubsystemOperationTime>(32);
for (int i = 1; i < rows.Count; i++)
{
var r0 = rows[i - 1];
var r1 = rows[i];
if (r0.IdSubsystem is not null && r0.IdSubsystem != r1.IdSubsystem)
{
var subsystemOperationTime = new SubsystemOperationTime()
{
IdTelemetry = idTelemetry,
IdSubsystem = r0.IdSubsystem.Value,
DateStart = r0.Date,
DateEnd = r1.Date,
DepthStart = depthInterpolation.GetDepth(r0.Date),
DepthEnd = depthInterpolation.GetDepth(r1.Date),
};
if (IsValid(subsystemOperationTime))
subsystemsOperationTimes.Add(subsystemOperationTime);
}
}
return subsystemsOperationTimes;
}
private static bool IsValid(SubsystemOperationTime item)
{
var validateCode = GetValidateErrorCode(item);
if (validateCode != 0)
{
var str = System.Text.Json.JsonSerializer.Serialize(item);
Trace.TraceWarning($"Wrong({validateCode}) SubsystemOperationTime: {str}");
}
return validateCode == 0;
}
private static int GetValidateErrorCode(SubsystemOperationTime item)
{
if (item.DateStart > item.DateEnd)
return -1;
if ((item.DateEnd - item.DateStart).TotalHours > 48)
return -2;
if (item.DepthEnd < item.DepthStart)
return -3;
if (item.DepthEnd - item.DepthStart > 2000d)
return -4;
if (item.DepthEnd < 0d)
return -5;
if (item.DepthStart < 0d)
return -6;
if (item.DepthEnd > 24_0000d)
return -7;
if (item.DepthStart > 24_0000d)
return -8;
return 0;
}
private static async Task<DepthInterpolation?> GetInterpolation(IAsbCloudDbContext db, int idTelemetry, DateTimeOffset dateBegin, DateTimeOffset dateEnd, CancellationToken token)
{
var dataDepthFromSaub = await db.TelemetryDataSaub
.Where(d => d.IdTelemetry == idTelemetry)
.Where(d => d.DateTime >= dateBegin)
.Where(d => d.DateTime <= dateEnd)
.Where(d => d.WellDepth != null)
.Where(d => d.WellDepth > 0)
.GroupBy(d => Math.Ceiling(d.WellDepth ?? 0 * 10))
.Select(g => new
{
DateMin = g.Min(d => d.DateTime),
DepthMin = g.Min(d => d.WellDepth) ?? 0,
})
.OrderBy(i => i.DateMin)
.ToArrayAsync(token);
if (!dataDepthFromSaub.Any())
return null;
var depthInterpolation = new DepthInterpolation(dataDepthFromSaub.Select(i => (i.DateMin, i.DepthMin)));
return depthInterpolation;
}
}

View File

@ -18,62 +18,31 @@ using System.Threading.Tasks;
using AsbCloudApp.IntegrationEvents;
using AsbCloudApp.IntegrationEvents.Interfaces;
namespace AsbCloudInfrastructure.Services
namespace AsbCloudInfrastructure.Services;
public class WellInfoService
{
public class WellInfoService
public class WorkWellInfoUpdate : Work
{
class WellMapInfoWithComanies : WellMapInfoDto
public WorkWellInfoUpdate()
: base("Well statistics update")
{
public int? IdTelemetry { get; set; }
public IEnumerable<int> IdsCompanies { get; set; } = null!;
Timeout = TimeSpan.FromMinutes(30);
}
private const string workId = "Well statistics update";
private static readonly TimeSpan workPeriod = TimeSpan.FromMinutes(30);
private readonly TelemetryDataCache<TelemetryDataSaubDto> telemetryDataSaubCache;
private readonly TelemetryDataCache<TelemetryDataSpinDto> telemetryDataSpinCache;
private readonly IWitsRecordRepository<Record7Dto> witsRecord7Repository;
private readonly IWitsRecordRepository<Record1Dto> witsRecord1Repository;
private readonly IGtrRepository gtrRepository;
private static IEnumerable<WellMapInfoWithComanies> WellMapInfo = Enumerable.Empty<WellMapInfoWithComanies>();
public WellInfoService(
TelemetryDataCache<TelemetryDataSaubDto> telemetryDataSaubCache,
TelemetryDataCache<TelemetryDataSpinDto> telemetryDataSpinCache,
IWitsRecordRepository<Record7Dto> witsRecord7Repository,
IWitsRecordRepository<Record1Dto> witsRecord1Repository,
IGtrRepository gtrRepository)
protected override async Task Action(string id, IServiceProvider services, Action<string, double?> onProgressCallback, CancellationToken token)
{
this.telemetryDataSaubCache = telemetryDataSaubCache;
this.telemetryDataSpinCache = telemetryDataSpinCache;
this.witsRecord7Repository = witsRecord7Repository;
this.witsRecord1Repository = witsRecord1Repository;
this.gtrRepository = gtrRepository;
}
public static WorkPeriodic MakeWork()
{
var workPeriodic = new WorkPeriodic(workId, WorkAction, workPeriod)
{
Timeout = TimeSpan.FromMinutes(30)
};
return workPeriodic;
}
private static async Task WorkAction(string workName, IServiceProvider serviceProvider, CancellationToken token)
{
var wellService = serviceProvider.GetRequiredService<IWellService>();
var operationsStatService = serviceProvider.GetRequiredService<IOperationsStatService>();
var processMapRepository = serviceProvider.GetRequiredService<IProcessMapPlanRepository>();
var subsystemOperationTimeService = serviceProvider.GetRequiredService<ISubsystemOperationTimeService>();
var telemetryDataSaubCache = serviceProvider.GetRequiredService<TelemetryDataCache<TelemetryDataSaubDto>>();
var messageHub = serviceProvider.GetRequiredService<IIntegrationEventHandler<UpdateWellInfoEvent>>();
var wellService = services.GetRequiredService<IWellService>();
var operationsStatService = services.GetRequiredService<IOperationsStatService>();
var processMapRepository = services.GetRequiredService<IProcessMapPlanRepository>();
var subsystemOperationTimeService = services.GetRequiredService<ISubsystemOperationTimeService>();
var telemetryDataSaubCache = services.GetRequiredService<TelemetryDataCache<TelemetryDataSaubDto>>();
var messageHub = services.GetRequiredService<IIntegrationEventHandler<UpdateWellInfoEvent>>();
var wells = await wellService.GetAllAsync(token);
var activeWells = wells.Where(well => well.IdState == 1);
var wellsIds = wells.Select(w => w.Id);
var wellsIds = activeWells.Select(w => w.Id);
var processMapRequests = wellsIds.Select(id => new ProcessMapRequest { IdWell = id });
var processMaps = await processMapRepository.GetProcessMapAsync(processMapRequests, token);
@ -90,11 +59,14 @@ namespace AsbCloudInfrastructure.Services
var subsystemStat = await subsystemOperationTimeService
.GetStatByActiveWells(wellsIds, token);
subsystemStat = subsystemStat.ToArray();
WellMapInfo = wells.Select(well => {
var count = activeWells.Count();
var i = 0d;
WellMapInfo = activeWells.Select(well => {
var wellMapInfo = well.Adapt<WellMapInfoWithComanies>();
wellMapInfo.IdState = well.IdState;
onProgressCallback($"Start updating info by well({well.Id}): {well.Caption}", i++ / count);
double? currentDepth = null;
TelemetryDataSaubDto? lastSaubTelemetry = null;
@ -103,7 +75,7 @@ namespace AsbCloudInfrastructure.Services
{
wellMapInfo.IdTelemetry = well.IdTelemetry.Value;
lastSaubTelemetry = telemetryDataSaubCache.GetLastOrDefault(well.IdTelemetry.Value);
if(lastSaubTelemetry is not null)
if (lastSaubTelemetry is not null)
{
currentDepth = lastSaubTelemetry.WellDepth;
}
@ -124,7 +96,7 @@ namespace AsbCloudInfrastructure.Services
{
wellProcessMap = wellProcessMaps.FirstOrDefault(p => p.IdWellSectionType == idSection);
}
else if(currentDepth.HasValue)
else if (currentDepth.HasValue)
{
wellProcessMap = wellProcessMaps.FirstOrDefault(p => p.DepthStart <= currentDepth.Value && p.DepthEnd >= currentDepth.Value);
}
@ -200,6 +172,35 @@ namespace AsbCloudInfrastructure.Services
await Task.WhenAll(updateWellInfoEventTasks);
}
}
class WellMapInfoWithComanies : WellMapInfoDto
{
public int? IdTelemetry { get; set; }
public IEnumerable<int> IdsCompanies { get; set; } = null!;
}
private readonly TelemetryDataCache<TelemetryDataSaubDto> telemetryDataSaubCache;
private readonly TelemetryDataCache<TelemetryDataSpinDto> telemetryDataSpinCache;
private readonly IWitsRecordRepository<Record7Dto> witsRecord7Repository;
private readonly IWitsRecordRepository<Record1Dto> witsRecord1Repository;
private readonly IGtrRepository gtrRepository;
private static IEnumerable<WellMapInfoWithComanies> WellMapInfo = Enumerable.Empty<WellMapInfoWithComanies>();
public WellInfoService(
TelemetryDataCache<TelemetryDataSaubDto> telemetryDataSaubCache,
TelemetryDataCache<TelemetryDataSpinDto> telemetryDataSpinCache,
IWitsRecordRepository<Record7Dto> witsRecord7Repository,
IWitsRecordRepository<Record1Dto> witsRecord1Repository,
IGtrRepository gtrRepository)
{
this.telemetryDataSaubCache = telemetryDataSaubCache;
this.telemetryDataSpinCache = telemetryDataSpinCache;
this.witsRecord7Repository = witsRecord7Repository;
this.witsRecord1Repository = witsRecord1Repository;
this.gtrRepository = gtrRepository;
}
private WellMapInfoWithTelemetryStat Convert(WellMapInfoWithComanies wellInfo)
{
@ -241,5 +242,4 @@ namespace AsbCloudInfrastructure.Services
return null;
}
}
}

View File

@ -1,7 +1,6 @@
using AsbCloudApp.Services;
using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services.DetectOperations;
using AsbCloudInfrastructure.Services.Subsystems;
using AsbCloudInfrastructure.Services;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
@ -12,10 +11,10 @@ using System.Threading;
using AsbCloudInfrastructure.Background;
using AsbCloudApp.Data.SAUB;
using AsbCloudInfrastructure.Services.SAUB;
using AsbCloudInfrastructure.Services.Subsystems;
namespace AsbCloudInfrastructure
{
public class Startup
{
public static void BeforeRunHandler(IHost host)
@ -24,7 +23,7 @@ namespace AsbCloudInfrastructure
var provider = scope.ServiceProvider;
var context = provider.GetRequiredService<IAsbCloudDbContext>();
context.Database.SetCommandTimeout(TimeSpan.FromSeconds(2 * 60));
context.Database.SetCommandTimeout(TimeSpan.FromMinutes(5));
context.Database.Migrate();
// TODO: Сделать инициализацию кеша телеметрии более явной.
@ -32,11 +31,11 @@ namespace AsbCloudInfrastructure
_ = provider.GetRequiredService<TelemetryDataCache<TelemetryDataSpinDto>>();
var backgroundWorker = provider.GetRequiredService<BackgroundWorker>();
backgroundWorker.Push(WellInfoService.MakeWork());
backgroundWorker.Push(OperationDetectionWorkFactory.MakeWork());
backgroundWorker.Push(SubsystemOperationTimeCalcWorkFactory.MakeWork());
backgroundWorker.Push(LimitingParameterCalcWorkFactory.MakeWork());
backgroundWorker.Push(MakeMemoryMonitoringWork());
backgroundWorker.WorkStore.AddPeriodic<WellInfoService.WorkWellInfoUpdate>(TimeSpan.FromMinutes(30));
backgroundWorker.WorkStore.AddPeriodic<WorkOperationDetection>(TimeSpan.FromMinutes(15));
backgroundWorker.WorkStore.AddPeriodic<WorkSubsystemOperationTimeCalc>(TimeSpan.FromMinutes(30));
backgroundWorker.WorkStore.AddPeriodic<WorkLimitingParameterCalc>(TimeSpan.FromMinutes(30));
backgroundWorker.WorkStore.AddPeriodic(MakeMemoryMonitoringWork(), TimeSpan.FromMinutes(1));
var notificationBackgroundWorker = provider.GetRequiredService<NotificationBackgroundWorker>();
@ -48,17 +47,15 @@ namespace AsbCloudInfrastructure
});
}
static WorkPeriodic MakeMemoryMonitoringWork()
static Work MakeMemoryMonitoringWork()
{
var workId = "Memory monitoring";
var workAction = (string _, IServiceProvider _, CancellationToken _) => {
var workAction = (string _, IServiceProvider _, Action<string, double?> _, CancellationToken _) => {
var bytes = GC.GetTotalMemory(false);
var bytesString = FromatBytes(bytes);
System.Diagnostics.Trace.TraceInformation($"Total memory allocated is {bytesString} bytes. DbContext count is:{AsbCloudDbContext.ReferenceCount}");
return Task.CompletedTask;
};
var workPeriod = TimeSpan.FromMinutes(1);
var work = new WorkPeriodic(workId, workAction, workPeriod);
var work = Work.CreateByDelegate("Memory monitoring", workAction);
return work;
}

View File

@ -1,4 +1,6 @@
using AsbCloudApp.Data.SAUB;
using AsbCloudApp.Data;
using AsbCloudApp.Data.SAUB;
using AsbCloudApp.Requests;
using AsbCloudApp.Services;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
@ -35,12 +37,22 @@ namespace AsbCloudWebApi.Tests.Middlware
public class TelemetryDataSaubService : ITelemetryDataSaubService
{
public async Task<IEnumerable<TelemetryDataSaubDto>?> GetAsync(int idWell, DateTime dateBegin = default, double intervalSec = 600, int approxPointsCount = 1024, CancellationToken token = default)
public async Task<IEnumerable<TelemetryDataSaubDto>> GetAsync(int idWell, DateTime dateBegin = default, double intervalSec = 600, int approxPointsCount = 1024, CancellationToken token = default)
{
await Task.Delay(1000, token);
return Enumerable.Empty<TelemetryDataSaubDto>();
}
public Task<IEnumerable<TelemetryDataSaubDto>> GetAsync(int idWell, TelemetryDataRequest request, CancellationToken token)
{
throw new NotImplementedException();
}
public Task<DatesRangeDto?> GetRangeAsync(int idWell, DateTimeOffset start, DateTimeOffset end, CancellationToken token)
{
throw new NotImplementedException();
}
public Task<IEnumerable<TelemetryDataSaubStatDto>> GetTelemetryDataStatAsync(int idTelemetry, CancellationToken token) => throw new NotImplementedException();
public Task<Stream> GetZippedCsv(int idWell, DateTime beginDate, DateTime endDate, CancellationToken token)

View File

@ -88,7 +88,7 @@ namespace AsbCloudWebApi.Tests.ServicesTests
await BackgroundWorker.StartAsync(CancellationToken.None);
await Task.Delay(10);
Assert.True(work.ExecutionTime > TimeSpan.Zero);
Assert.True(work.LastExecutionTime > TimeSpan.Zero);
}
[Fact]

View File

@ -366,7 +366,7 @@ namespace AsbCloudWebApi.Tests.ServicesTests
var state = await service.GetStateAsync(idWell, publisher1.Id, CancellationToken.None);
Assert.Equal(2, state.IdState);
backgroundWorkerMock.Verify(s => s.Push(It.IsAny<WorkBase>()));
backgroundWorkerMock.Verify(s => s.Push(It.IsAny<Work>()));
}
[Fact]

View File

@ -0,0 +1,50 @@
using AsbCloudApp.Data;
using AsbCloudInfrastructure.Background;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.Linq;
namespace AsbCloudWebApi.Controllers
{
[Route("api/[controller]")]
[Authorize]
[ApiController]
public class BackgroundWorkController : ControllerBase
{
private readonly BackgroundWorker backgroundWorker;
public BackgroundWorkController(BackgroundWorker backgroundWorker)
{
this.backgroundWorker = backgroundWorker;
}
[HttpGet]
public IActionResult GetAll()
{
var result = new {
CurrentWork = (BackgroundWorkDto?)backgroundWorker.CurrentWork,
RunOnceQueue = backgroundWorker.WorkStore.RunOnceQueue.Select(work => (BackgroundWorkDto)work),
Periodics = backgroundWorker.WorkStore.Periodics.Select(work => (BackgroundWorkDto)work.Work),
Felled = backgroundWorker.WorkStore.Felled.Select(work => (BackgroundWorkDto)work),
};
return Ok(result);
}
[HttpGet("Current")]
public IActionResult GetCurrent()
{
var work = backgroundWorker.CurrentWork;
if (work == null)
return NoContent();
return Ok(work);
}
[HttpGet("Failed")]
public IActionResult GetFelled()
{
var result = backgroundWorker.WorkStore.Felled.Select(work => (BackgroundWorkDto)work);
return Ok(result);
}
}
}

View File

@ -0,0 +1,72 @@
using Microsoft.AspNetCore.Mvc;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;
namespace AsbCloudWebApi.Controllers
{
/// <summary>
/// Имитирует разные типы ответа сервера
/// </summary>
[Route("api/[controller]")]
[ApiController]
public class MockController : ControllerBase
{
/// <summary>
/// имитирует http-400
/// </summary>
[HttpGet("400")]
[ProducesResponseType(typeof(ValidationProblemDetails), (int)System.Net.HttpStatusCode.BadRequest)]
public IActionResult Get400([FromQuery, Required]IDictionary<string, string> args)
{
var errors = new Dictionary<string, string[]>();
foreach (var arg in args)
{
var countOfErrors = ((arg.Key + arg.Value).Length % 3) + 1;
var errorsText = Enumerable.Range(0, countOfErrors)
.Select(i => $"{arg.Value} не соответствует критериям проверки № {i}");
errors.Add(arg.Key, errorsText.ToArray());
}
if (errors.Any())
{
var problem = new ValidationProblemDetails(errors);
return BadRequest(problem);
}
else
{
var problem = new ValidationProblemDetails { Detail = "at least one argument must be provided" };
return BadRequest(problem);
}
}
/// <summary>
/// имитирует http-403
/// </summary>
[HttpGet("403")]
public IActionResult Get403()
{
return Forbid();
}
/// <summary>
/// имитирует http-401
/// </summary>
[HttpGet("401")]
public IActionResult Get401()
{
return Unauthorized();
}
/// <summary>
/// имитирует http-500
/// </summary>
[HttpGet("500")]
public IActionResult Get500()
{
throw new System.Exception("Это тестовое исключение");
}
}
}

View File

@ -1,11 +1,9 @@
using AsbCloudApp.Data;
using AsbCloudApp.Data.Subsystems;
using AsbCloudApp.Exceptions;
using AsbCloudApp.Requests;
using AsbCloudApp.Services;
using AsbCloudApp.Services.Subsystems;
using AsbCloudDb.Model;
using AsbCloudInfrastructure;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System;
@ -56,7 +54,6 @@ namespace AsbCloudWebApi.Controllers.Subsystems
{
if (!await UserHasAccesToWellAsync(request.IdWell, token))
return Forbid();
await CustomValidate(request, token);
var subsystemResult = await subsystemOperationTimeService.GetStatAsync(request, token);
return Ok(subsystemResult);
}
@ -133,7 +130,6 @@ namespace AsbCloudWebApi.Controllers.Subsystems
{
if (!await UserHasAccesToWellAsync(request.IdWell, token))
return Forbid();
await CustomValidate(request, token);
var result = await subsystemOperationTimeService.GetOperationTimeAsync(request, token);
return Ok(result);
@ -155,7 +151,6 @@ namespace AsbCloudWebApi.Controllers.Subsystems
{
if (!await UserHasAccesToWellAsync(request.IdWell, token))
return Forbid();
await CustomValidate(request, token);
var result = await subsystemOperationTimeService.DeleteAsync(request, token);
return Ok(result);
}
@ -180,24 +175,5 @@ namespace AsbCloudWebApi.Controllers.Subsystems
return true;
return false;
}
/// <summary>
/// Валидирует запрос и бросает исключение ArgumentInvalidException
/// </summary>
/// <param name="request"></param>
/// <param name="token"></param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
private async Task CustomValidate(SubsystemOperationTimeRequest request, CancellationToken token)
{
var well = await wellService.GetOrDefaultAsync(request.IdWell, token);
if (well is not null && request.LtDate.HasValue)
{
var ltDate = request.LtDate.Value;
var utcDateRequest = ltDate.ToUtcDateTimeOffset(well.Timezone.Hours);
if (utcDateRequest.AddHours(2) > DateTime.UtcNow)
throw new ArgumentInvalidException(nameof(request.LtDate), "Запрашиваемый диапазон должен заканчиваться за 2 часа до текущего времени");
}
}
}
}

View File

@ -29,19 +29,19 @@ public class SignalRNotificationTransportService : INotificationTransportService
{
var workId = HashCode.Combine(notifications.Select(n => n.Id)).ToString("x");
if (backgroundWorker.Contains(workId))
if (backgroundWorker.WorkStore.RunOnceQueue.Any(w => w.Id == workId))
return Task.CompletedTask;
var workAction = MakeSignalRSendWorkAction(notifications);
var work = new WorkBase(workId, workAction);
backgroundWorker.Push(work);
var work = Work.CreateByDelegate(workId, workAction);
backgroundWorker.WorkStore.RunOnceQueue.Enqueue(work);
return Task.CompletedTask;
}
private Func<string, IServiceProvider, CancellationToken, Task> MakeSignalRSendWorkAction(IEnumerable<NotificationDto> notifications)
private Func<string, IServiceProvider, Action<string, double?>, CancellationToken, Task> MakeSignalRSendWorkAction(IEnumerable<NotificationDto> notifications)
{
return async (_, serviceProvider, cancellationToken) =>
return async (_, serviceProvider, onProgress, cancellationToken) =>
{
var notificationPublisher = serviceProvider.GetRequiredService<NotificationPublisher>();

View File

@ -1,40 +0,0 @@
using AsbCloudApp.Data;
using System;
using System.Collections.Generic;
namespace ConsoleApp1
{
public static class DebugWellOperationsStatService
{
public static void Main(/*string[] args*/)
{
//var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
// .UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True")
// .Options;
//using var db = new AsbCloudDbContext(options);
//var cacheDb = new CacheDb();
//var telemetryService = new TelemetryService(db, new TelemetryTracker(cacheDb), cacheDb);
//var wellService = new WellService(db, telemetryService, cacheDb);
//var wellOptsStat = new OperationsStatService(db, cacheDb, wellService);
//var tvd = wellOptsStat.GetTvdAsync(1, default).Result;
//Print(tvd);
}
private static void Print(IEnumerable<PlanFactPredictBase<WellOperationDto>> tvd)
{
Console.WriteLine("|\tplan\t|\tfact\t|\tprog\t|");
Console.WriteLine("|:-------------:|:-------------:|:-------------:|");
foreach (var item in tvd)
Print(item);
}
private static void Print(PlanFactPredictBase<WellOperationDto> item)
{
static string GetText(WellOperationDto item)
=> (item is null)
? " --------- "
: $"{item.IdCategory} d:{item.DepthStart} ";
Console.WriteLine($"|\t{GetText(item.Plan)}\t|\t{GetText(item.Fact)}\t|\t{GetText(item.Predict)}\t|");
}
}
}