fix dictionaries sync

This commit is contained in:
Фролов 2021-07-16 09:15:10 +05:00
parent 5259a1c730
commit 0c645ed960
6 changed files with 155 additions and 357 deletions

View File

@ -15,7 +15,7 @@ namespace AsbCloudInfrastructure
public static IServiceCollection AddInfrastructure(this IServiceCollection services, IConfiguration configuration)
{
services.AddDbContext<AsbCloudDbContext>(options =>
options.UseNpgsql(configuration.GetConnectionString("DefaultConnection")));
options.UseNpgsql(configuration.GetConnectionString("DefaultConnection")), ServiceLifetime.Scoped);
services.AddScoped<IAsbCloudDbContext>(provider => provider.GetService<AsbCloudDbContext>());

View File

@ -275,7 +275,7 @@ namespace AsbCloudInfrastructure.Services
return drillingAnalysis;
}
private static double GetAForLinearFormula(IEnumerable<(double?, double)> rawData)
private static double GetAForLinearFormula(IEnumerable<(double? x, double y)> rawData)
{
var (xSum, ySum, xySum, x2Sum) = GetFormulaVariables(rawData);

View File

@ -12,7 +12,6 @@ namespace AsbCloudInfrastructure.Services.Cache
private readonly DbContext context;
private (DateTime refreshDate, IEnumerable<object> entities) data;
private readonly List<TEntity> cached;
private readonly ReaderWriterLockSlim cacheLocker = new ReaderWriterLockSlim();
internal CacheTable(DbContext context, (DateTime refreshDate, IEnumerable<object> entities) data)
{
@ -24,9 +23,6 @@ namespace AsbCloudInfrastructure.Services.Cache
public TEntity this[int index] { get => cached.ElementAt(index); }
public int Refresh()
{
cacheLocker.EnterWriteLock();
try
{
cached.Clear();
var dbEntities = context.Set<TEntity>().ToList();
@ -34,16 +30,8 @@ namespace AsbCloudInfrastructure.Services.Cache
data.refreshDate = DateTime.Now;
return cached.Count;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
public async Task<int> RefreshAsync(CancellationToken token = default)
{
cacheLocker.EnterWriteLock();
try
{
cached.Clear();
var dbEntities = await context.Set<TEntity>().ToListAsync(token).ConfigureAwait(false);
@ -51,11 +39,6 @@ namespace AsbCloudInfrastructure.Services.Cache
data.refreshDate = DateTime.Now;
return cached.Count;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
private bool CheckRefresh(RefreshMode refreshMode)
{
@ -106,30 +89,24 @@ namespace AsbCloudInfrastructure.Services.Cache
public TEntity FirstOrDefault(RefreshMode refreshMode = RefreshMode.IfResultEmpty)
{
bool isUpdated = CheckRefresh(refreshMode);
var result = cached.FirstOrDefault();
if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated)
{
Refresh();
return cached.FirstOrDefault();
}
return result;
}
public async Task<TEntity> FirstOrDefaultAsync(RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default)
{
bool isUpdated = await CheckRefreshAsync(refreshMode, token);
var result = cached.FirstOrDefault();
if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated)
{
await RefreshAsync(token);
return cached.FirstOrDefault();
}
return result;
}
@ -139,30 +116,24 @@ namespace AsbCloudInfrastructure.Services.Cache
public TEntity FirstOrDefault(Func<TEntity, bool> predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty)
{
bool isUpdated = CheckRefresh(refreshMode);
var result = cached.FirstOrDefault(predicate);
if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated)
{
Refresh();
return cached.FirstOrDefault(predicate);
}
return result;
}
public async Task<TEntity> FirstOrDefaultAsync(Func<TEntity, bool> predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default)
{
bool isUpdated = await CheckRefreshAsync(refreshMode, token);
var result = cached.FirstOrDefault(predicate);
if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated)
{
await RefreshAsync(token);
return cached.FirstOrDefault(predicate);
}
return result;
}
@ -172,38 +143,28 @@ namespace AsbCloudInfrastructure.Services.Cache
public IEnumerable<TEntity> Select(Func<TEntity, bool> predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty)
{
bool isUpdated = CheckRefresh(refreshMode);
var result = cached.Where(predicate);
if (!result.Any() && refreshMode == RefreshMode.IfResultEmpty && !isUpdated)
{
Refresh();
return cached.Where(predicate);
}
return result;
}
public async Task<IEnumerable<TEntity>> SelectAsync(Func<TEntity, bool> predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default)
{
bool isUpdated = await CheckRefreshAsync(refreshMode, token);
var result = cached.Where(predicate);
if (!result.Any() && refreshMode == RefreshMode.IfResultEmpty && !isUpdated)
{
await RefreshAsync(token);
return cached.Where(predicate);
}
return result;
}
public IEnumerable<TEntity> Update(Func<TEntity, bool> predicate, Action<TEntity> mutation)
{
cacheLocker.EnterWriteLock();
try
public IEnumerable<TEntity> Mutate(Func<TEntity, bool> predicate, Action<TEntity> mutation)
{
var dbSet = context.Set<TEntity>();
var dbEntities = dbSet.Where(predicate);
@ -217,17 +178,8 @@ namespace AsbCloudInfrastructure.Services.Cache
cached.AddRange(dbEntities);
return dbEntities;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
public async Task<IEnumerable<TEntity>> UpdateAsync(Func<TEntity, bool> predicate, Action<TEntity> mutation, CancellationToken token = default)
{
cacheLocker.EnterWriteLock();
try
public async Task<IEnumerable<TEntity>> MutateAsync(Func<TEntity, bool> predicate, Action<TEntity> mutation, CancellationToken token = default)
{
var dbSet = context.Set<TEntity>();
var dbEntities = dbSet.Where(predicate);
@ -241,155 +193,48 @@ namespace AsbCloudInfrastructure.Services.Cache
cached.AddRange(dbEntities);
return dbEntities;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
public TEntity Upsert(TEntity entity)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity> entityEntry;
if (cached.Contains(entity))
{
entityEntry = dbSet.Update(entity);
cached.Remove(entity);
}
else
{
entityEntry = dbSet.Add(entity);
}
var updated = dbSet.Update(entity);
context.SaveChanges();
cached.Add(entityEntry.Entity);
return entityEntry.Entity;
Refresh();
return updated.Entity;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
public async Task<TEntity> UpsertAsync(TEntity entity, CancellationToken token = default)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity> entityEntry;
if (cached.Contains(entity))
{
entityEntry = dbSet.Update(entity);
cached.Remove(entity);
}
else
{
entityEntry = dbSet.Add(entity);
}
var updated = dbSet.Update(entity);
await context.SaveChangesAsync(token).ConfigureAwait(false);
cached.Add(entityEntry.Entity);
return entityEntry.Entity;
}
finally
{
cacheLocker.ExitWriteLock();
}
await RefreshAsync(token).ConfigureAwait(false);
return updated.Entity;
}
public IEnumerable<TEntity> Upsert(IEnumerable<TEntity> entities)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
var upsertedEntries = new List<Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity>>(entities.Count());
var upsertedEntries = new List<TEntity>(entities.Count());
foreach (var entity in entities)
{
Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity> entityEntry;
if (cached.Contains(entity))
{
entityEntry = dbSet.Update(entity);
cached.Remove(entity);
}
else
{
entityEntry = dbSet.Add(entity);
}
upsertedEntries.Add(entityEntry);
}
upsertedEntries.Add(dbSet.Update(entity).Entity);
context.SaveChanges();
var upserted = upsertedEntries.Select(e => e.Entity);
cached.AddRange(upserted);
return upserted;
}
finally
{
cacheLocker.ExitWriteLock();
}
Refresh();
return upsertedEntries;
}
public async Task<IEnumerable<TEntity>> UpsertAsync(IEnumerable<TEntity> entities, CancellationToken token = default)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
var upsertedEntries = new List<Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity>>(entities.Count());
var upsertedEntries = new List<TEntity>(entities.Count());
foreach (var entity in entities)
{
Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity> entityEntry;
if (cached.Contains(entity))
{
entityEntry = dbSet.Update(entity);
cached.Remove(entity);
}
else
{
entityEntry = dbSet.Add(entity);
}
upsertedEntries.Add(entityEntry);
}
upsertedEntries.Add(dbSet.Update(entity).Entity);
await context.SaveChangesAsync(token).ConfigureAwait(false);
var upserted = upsertedEntries.Select(e => e.Entity);
cached.AddRange(upserted);
return upserted;
}
finally
{
cacheLocker.ExitWriteLock();
}
await RefreshAsync(token).ConfigureAwait(false);
return upsertedEntries;
}
public void Remove(Func<TEntity, bool> predicate)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
cached.RemoveAll(e => predicate(e));
@ -397,17 +242,8 @@ namespace AsbCloudInfrastructure.Services.Cache
context.SaveChanges();
return;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
public async Task RemoveAsync(Func<TEntity, bool> predicate, CancellationToken token = default)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
cached.RemoveAll(e => predicate(e));
@ -415,17 +251,8 @@ namespace AsbCloudInfrastructure.Services.Cache
await context.SaveChangesAsync(token).ConfigureAwait(false);
return;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
public TEntity Insert(TEntity entity)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
var dbEntity = dbSet.Add(entity).Entity;
@ -433,17 +260,8 @@ namespace AsbCloudInfrastructure.Services.Cache
cached.Add(dbEntity);
return dbEntity;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
public async Task<TEntity> InsertAsync(TEntity entity, CancellationToken token = default)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
var dbEntity = dbSet.Add(entity).Entity;
@ -451,17 +269,8 @@ namespace AsbCloudInfrastructure.Services.Cache
cached.Add(dbEntity);
return dbEntity;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
public IEnumerable<TEntity> Insert(IEnumerable<TEntity> newEntities)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
var dbEntities = new List<TEntity>(newEntities.Count());
@ -471,17 +280,8 @@ namespace AsbCloudInfrastructure.Services.Cache
cached.AddRange(dbEntities);
return dbEntities;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
public async Task<IEnumerable<TEntity>> InsertAsync(IEnumerable<TEntity> newEntities, CancellationToken token = default)
{
cacheLocker.EnterWriteLock();
try
{
var dbSet = context.Set<TEntity>();
var dbEntities = new List<TEntity>(newEntities.Count());
@ -491,10 +291,5 @@ namespace AsbCloudInfrastructure.Services.Cache
cached.AddRange(dbEntities);
return dbEntities;
}
finally
{
cacheLocker.ExitWriteLock();
}
}
}
}

View File

@ -9,13 +9,11 @@ namespace AsbCloudInfrastructure.Services
{
public class EventService : IEventService
{
private readonly IAsbCloudDbContext db;
private readonly ITelemetryService telemetryService;
private readonly CacheTable<Event> cacheEvents;
public EventService(IAsbCloudDbContext db, CacheDb cacheDb, ITelemetryService telemetryService)
{
this.db = db;
this.telemetryService = telemetryService;
cacheEvents = cacheDb.GetCachedTable<Event>((AsbCloudDbContext)db);
}
@ -27,30 +25,14 @@ namespace AsbCloudInfrastructure.Services
var telemetryId = telemetryService.GetOrCreateTemetryIdByUid(uid);
var ids = dtos.Select(e => e.Id).ToList();
var dbIds = (from e in db.Events
where e.IdTelemetry == telemetryId && ids.Contains(e.IdEvent)
select e.IdEvent).ToList();
foreach (var dto in dtos)
{
var entity = new Event
var entities = dtos.Select(dto => new Event
{
IdEvent = dto.Id,
IdTelemetry = telemetryId,
IdCategory = dto.IdCategory,
MessageTemplate = dto.Message
};
if (dbIds.Contains(dto.Id))
db.Events.Update(entity);
else
db.Events.Add(entity);
}
db.SaveChanges();
cacheEvents.Refresh();
});
cacheEvents.Upsert(entities);
}
}
}

View File

@ -10,15 +10,13 @@ namespace AsbCloudInfrastructure.Services
{
public class TelemetryUserService : ITelemetryUserService
{
private readonly IAsbCloudDbContext db;
private readonly ITelemetryService telemetryService;
private readonly CacheTable<TelemetryUser> cacheTUsers;
private readonly CacheTable<TelemetryUser> cacheTelemetryUsers;
public TelemetryUserService(IAsbCloudDbContext db, CacheDb cacheDb, ITelemetryService telemetryService)
{
this.db = db;
this.telemetryService = telemetryService;
cacheTUsers = cacheDb.GetCachedTable<TelemetryUser>((AsbCloudDbContext)db);
cacheTelemetryUsers = cacheDb.GetCachedTable<TelemetryUser>((AsbCloudDbContext)db);
}
public void Upsert(string uid, IEnumerable<TelemetryUserDto> dtos)
@ -26,19 +24,9 @@ namespace AsbCloudInfrastructure.Services
if (!dtos.Any())
return;
dtos = dtos.Distinct(new TelemetryUserDtoComparer());
var telemetryId = telemetryService.GetOrCreateTemetryIdByUid(uid);
var ids = dtos.Select(e => e.Id).ToList();
var dbIds = (from e in db.TelemetryUsers
where e.IdTelemetry == telemetryId && ids.Contains(e.IdUser)
select e.IdUser).ToList();
foreach (var dto in dtos)
{
var entity = new TelemetryUser
var entities = dtos.Distinct(new TelemetryUserDtoComparer()).Select(dto => new TelemetryUser
{
IdUser = dto.Id,
IdTelemetry = telemetryId,
@ -46,16 +34,8 @@ namespace AsbCloudInfrastructure.Services
Name = dto.Name,
Patronymic = dto.Patronymic,
Surname = dto.Surname,
};
if (dbIds.Contains(dto.Id))
db.TelemetryUsers.Update(entity);
else
db.TelemetryUsers.Add(entity);
}
db.SaveChanges();
cacheTUsers.Refresh();
});
cacheTelemetryUsers.Upsert(entities);
}
}
}

View File

@ -1,6 +1,11 @@
using AsbCloudDb.Model;
using AsbCloudApp.Data;
using AsbCloudDb.Model;
using AsbCloudInfrastructure.Services;
using AsbCloudInfrastructure.Services.Cache;
using AutoMapper;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Generic;
namespace ConsoleApp1
{
@ -20,10 +25,46 @@ namespace ConsoleApp1
{
static void Main(string[] args)
{
//var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
// .UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True")
// .Options;
//var context = new AsbCloudDbContext(options);
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True")
.Options;
var acfg = new MapperConfiguration(cfg => {
cfg.CreateMap<DataSaubBase, DataSaubBaseDto>();
cfg.CreateMap<DataSaubBaseDto, DataSaubBase>();
cfg.CreateMap<Message, TelemetryMessageDto>();
cfg.CreateMap<TelemetryMessageDto, Message>();
cfg.CreateMap<TelemetryInfo, TelemetryInfoDto>();
cfg.CreateMap<TelemetryInfoDto, TelemetryInfo>();
});
var context = new AsbCloudDbContext(options);
var cachedDb = new CacheDb();
var telemetryService = new TelemetryService(context, cachedDb, acfg);
var telemetryUserService = new TelemetryUserService(context, cachedDb, telemetryService);
var tusers = new List<TelemetryUserDto> {
new TelemetryUserDto{
Id=1,
Level=0,
Name="Adminum",
},
new TelemetryUserDto{
Id=2,
Level=0,
Name="Adminum2",
},
};
telemetryUserService.Upsert("123", tusers);
tusers[0].Patronymic = "Trump";
telemetryUserService.Upsert("123", tusers);
//var e = new Event
//{