Отображение данных в реальном времени

1. Добавил инфраструктуру для доменных событий.
2. Сделал Hub отправки для информации о скважине.
This commit is contained in:
parent 80a07d0b4e
commit 47bd9cb56b
8 changed files with 104 additions and 5 deletions

View File

@ -0,0 +1,6 @@
namespace AsbCloudApp.IntegrationEvents.Events;
/// <summary>
/// Интерфейс маркер для доменных событий
/// </summary>
public interface IIntegrationEvent { }

View File

@ -0,0 +1,7 @@
namespace AsbCloudApp.IntegrationEvents.Events;
/// <summary>
/// Обновление информации о скважине
/// </summary>
/// <param name="IdWell"></param>
public record WellInfoUpdaterEvent(int IdWell) : IIntegrationEvent;

View File

@ -0,0 +1,20 @@
using System.Threading;
using System.Threading.Tasks;
using AsbCloudApp.IntegrationEvents.Events;
namespace AsbCloudApp.IntegrationEvents;
/// <summary>
/// Обработчик событий
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IIntegrationEventHandler<in T> where T: IIntegrationEvent
{
/// <summary>
/// Метод обработки события
/// </summary>
/// <param name="integrationEvent"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task HandleAsync(T integrationEvent, CancellationToken cancellationToken);
}

View File

@ -16,6 +16,8 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using AsbCloudApp.IntegrationEvents;
using AsbCloudApp.IntegrationEvents.Events;
namespace AsbCloudInfrastructure.Services namespace AsbCloudInfrastructure.Services
{ {
@ -36,7 +38,7 @@ namespace AsbCloudInfrastructure.Services
private readonly IWitsRecordRepository<Record1Dto> witsRecord1Repository; private readonly IWitsRecordRepository<Record1Dto> witsRecord1Repository;
private readonly IGtrRepository gtrRepository; private readonly IGtrRepository gtrRepository;
private static IEnumerable<WellMapInfoWithComanies> WellMapInfo = Enumerable.Empty<WellMapInfoWithComanies>(); private static IEnumerable<WellMapInfoWithComanies> WellMapInfo = Enumerable.Empty<WellMapInfoWithComanies>();
public WellInfoService( public WellInfoService(
TelemetryDataCache<TelemetryDataSaubDto> telemetryDataSaubCache, TelemetryDataCache<TelemetryDataSaubDto> telemetryDataSaubCache,
TelemetryDataCache<TelemetryDataSpinDto> telemetryDataSpinCache, TelemetryDataCache<TelemetryDataSpinDto> telemetryDataSpinCache,
@ -69,6 +71,7 @@ namespace AsbCloudInfrastructure.Services
var processMapRepository = serviceProvider.GetRequiredService<IProcessMapPlanRepository>(); var processMapRepository = serviceProvider.GetRequiredService<IProcessMapPlanRepository>();
var subsystemOperationTimeService = serviceProvider.GetRequiredService<ISubsystemOperationTimeService>(); var subsystemOperationTimeService = serviceProvider.GetRequiredService<ISubsystemOperationTimeService>();
var telemetryDataSaubCache = serviceProvider.GetRequiredService<TelemetryDataCache<TelemetryDataSaubDto>>(); var telemetryDataSaubCache = serviceProvider.GetRequiredService<TelemetryDataCache<TelemetryDataSaubDto>>();
var messageHub = serviceProvider.GetRequiredService<IIntegrationEventHandler<WellInfoUpdaterEvent>>();
var activeWells = await wellService.GetAsync(new() {IdState = 1}, token); var activeWells = await wellService.GetAsync(new() {IdState = 1}, token);
@ -167,6 +170,11 @@ namespace AsbCloudInfrastructure.Services
return wellMapInfo; return wellMapInfo;
}).ToArray(); }).ToArray();
foreach (var idWell in activeWellsIds)
{
await messageHub.HandleAsync(new WellInfoUpdaterEvent(idWell), token);
}
} }
private WellMapInfoWithTelemetryStat Convert(WellMapInfoWithComanies wellInfo) private WellMapInfoWithTelemetryStat Convert(WellMapInfoWithComanies wellInfo)

View File

@ -3,7 +3,6 @@ using AsbCloudApp.Repositories;
using AsbCloudDb.Model; using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services; using AsbCloudInfrastructure.Services;
using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.IdentityModel.Tokens; using Microsoft.IdentityModel.Tokens;
using Microsoft.OpenApi.Models; using Microsoft.OpenApi.Models;
@ -12,8 +11,12 @@ using System.Collections.Generic;
using System.IO; using System.IO;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using AsbCloudApp.IntegrationEvents;
using AsbCloudApp.IntegrationEvents.Events;
using AsbCloudApp.Services.Notifications; using AsbCloudApp.Services.Notifications;
using AsbCloudWebApi.SignalR;
using AsbCloudWebApi.SignalR.Services; using AsbCloudWebApi.SignalR.Services;
using Microsoft.AspNetCore.Mvc;
using Microsoft.OpenApi.Any; using Microsoft.OpenApi.Any;
namespace AsbCloudWebApi namespace AsbCloudWebApi
@ -140,5 +143,8 @@ namespace AsbCloudWebApi
{ {
services.AddTransient<INotificationTransportService, SignalRNotificationTransportService>(); services.AddTransient<INotificationTransportService, SignalRNotificationTransportService>();
} }
public static void AddIntegrationEvents(this IServiceCollection services) => services
.AddTransient<IIntegrationEventHandler<WellInfoUpdaterEvent>, WellInfoUpdaterHub>();
} }
} }

View File

@ -0,0 +1,49 @@
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using AsbCloudApp.IntegrationEvents;
using AsbCloudApp.IntegrationEvents.Events;
using AsbCloudInfrastructure.Services;
using Microsoft.AspNetCore.SignalR;
namespace AsbCloudWebApi.SignalR;
public class WellInfoUpdaterHub : Hub, IIntegrationEventHandler<WellInfoUpdaterEvent>
{
private const string groupTemplate = "system_operation_updater_well_{0}";
private readonly IHubContext<WellInfoUpdaterHub> hubContext;
private readonly WellInfoService wellInfoService;
public WellInfoUpdaterHub(IHubContext<WellInfoUpdaterHub> hubContext,
WellInfoService wellInfoService)
{
this.hubContext = hubContext;
this.wellInfoService = wellInfoService;
}
public async Task OnConnectedAsync(int idWell)
{
await base.OnConnectedAsync();
await Groups.AddToGroupAsync(Context.ConnectionId, string.Format(groupTemplate, idWell));
await HandleAsync(new WellInfoUpdaterEvent(idWell), CancellationToken.None);
}
public async Task HandleAsync(WellInfoUpdaterEvent integrationEvent, CancellationToken cancellationToken)
{
const string method = "well_info_update";
var wellInfo = wellInfoService.FirstOrDefault(w => w.Id == integrationEvent.IdWell);
if (wellInfo != null)
{
var serializedObject = JsonSerializer.Serialize(wellInfo);
await hubContext.Clients.Group(string.Format(groupTemplate, integrationEvent.IdWell))
.SendCoreAsync(method, new object[] { serializedObject }, cancellationToken);
}
}
}

View File

@ -45,6 +45,8 @@ namespace AsbCloudWebApi
services.AddNotificationTransportServices(); services.AddNotificationTransportServices();
services.AddIntegrationEvents();
services.AddJWTAuthentication(); services.AddJWTAuthentication();
services.AddSignalR() services.AddSignalR()
@ -151,6 +153,7 @@ namespace AsbCloudWebApi
app.UseEndpoints(endpoints => app.UseEndpoints(endpoints =>
{ {
endpoints.MapControllers(); endpoints.MapControllers();
endpoints.MapHub<WellInfoUpdaterHub>("/hubs/limitingParameters");
endpoints.MapHub<NotificationHub>("/hubs/notifications"); endpoints.MapHub<NotificationHub>("/hubs/notifications");
endpoints.MapHub<TelemetryHub>("/hubs/telemetry"); endpoints.MapHub<TelemetryHub>("/hubs/telemetry");
endpoints.MapHub<ReportsHub>("/hubs/reports"); endpoints.MapHub<ReportsHub>("/hubs/reports");

View File

@ -10,7 +10,7 @@ internal class Program
{ {
var connectionBuilder = new HubConnectionBuilder(); var connectionBuilder = new HubConnectionBuilder();
var connection = connectionBuilder var connection = connectionBuilder
.WithUrl("http://localhost:5000/hubs/notifications", connectionOptions => { .WithUrl("http://localhost:5000/hubs/limitingParameters", connectionOptions => {
connectionOptions.AccessTokenProvider = AccessTokenProvider; connectionOptions.AccessTokenProvider = AccessTokenProvider;
}) })
.WithAutomaticReconnect() .WithAutomaticReconnect()
@ -26,9 +26,9 @@ internal class Program
connection.StartAsync().Wait(); connection.StartAsync().Wait();
//Console.WriteLine("OnConnected"); //Console.WriteLine("OnConnected");
//connection.SendCoreAsync("OnConnected", new object[] { }, CancellationToken.None).Wait(); connection.SendCoreAsync("OnConnectedAsync", new object[] { 1 }, CancellationToken.None).Wait();
var subsction = connection.On<object>("receiveNotifications", (str1) => { var subsction = connection.On<object>("well_info_update", (str1) => {
Console.WriteLine(str1); Console.WriteLine(str1);
} ); } );