From 8e6384e46cbb481a676833cac0e4c32f91d44418 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A4=D1=80=D0=BE=D0=BB=D0=BE=D0=B2?= Date: Mon, 4 Oct 2021 15:52:22 +0500 Subject: [PATCH] make cache sinchronized. --- .../Services/Cache/CacheTable.cs | 315 +++++++++++------- 1 file changed, 196 insertions(+), 119 deletions(-) diff --git a/AsbCloudInfrastructure/Services/Cache/CacheTable.cs b/AsbCloudInfrastructure/Services/Cache/CacheTable.cs index c70f7929..f656c10d 100644 --- a/AsbCloudInfrastructure/Services/Cache/CacheTable.cs +++ b/AsbCloudInfrastructure/Services/Cache/CacheTable.cs @@ -31,62 +31,96 @@ namespace AsbCloudInfrastructure.Services.Cache public TEntity this[int index] { get => cached.ElementAt(index); } - //public static void Sync(Action) - - public int Refresh() + /// + /// Runs action like atomic operation. + /// wasFree is action argument indicates that semaphore was free. + /// It may be needed to avoid multiple operations like Refresh(). + /// + /// (wasFree) => {...} + /// false - semaphore.Wait returned by timeout + private static bool Sync(Action action) { var wasFree = semaphore.CurrentCount > 0; - if(!semaphore.Wait(semaphoreTimeout)) - return 0; + if (!semaphore.Wait(semaphoreTimeout)) + return false; try { - if (wasFree) - { - cached.Clear(); - var entities = dbSet.AsNoTracking().ToList(); - cached.AddRange(entities); - data.refreshDate = DateTime.Now; - } - //else - nothing, it was just updated in another thread + action?.Invoke(wasFree); } catch (Exception ex) { - Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Refresh()"); + Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Sync()"); Trace.WriteLine(ex.Message); + Trace.WriteLine(ex.StackTrace); } finally { semaphore.Release(); } + return true; + } + + /// + /// Runs action like atomic operation. + /// wasFree is action argument indicates that semaphore was free. + /// It may be needed to avoid multiple operations like Refresh(). + /// + /// (wasFree) => {...} + /// false - semaphore.Wait returned by timeout + private static async Task SyncAsync(Func task, CancellationToken token = default) + { + var wasFree = semaphore.CurrentCount > 0; + if (!await semaphore.WaitAsync(semaphoreTimeout, token).ConfigureAwait(false)) + return false; + try + { + await task?.Invoke(wasFree, token); + } + catch (Exception ex) + { + Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.SyncAsync()"); + Trace.WriteLine(ex.Message); + Trace.WriteLine(ex.StackTrace); + } + finally + { + semaphore.Release(); + } + return true; + } + + private void InternalRefresh() + { + cached.Clear(); + var entities = dbSet.AsNoTracking().ToList(); + cached.AddRange(entities); + data.refreshDate = DateTime.Now; + } + + private async Task InternalRefreshAsync(CancellationToken token = default) + { + cached.Clear(); + var entities = await context.Set().AsNoTracking() + .ToListAsync(token).ConfigureAwait(false); + cached.AddRange(entities); + data.refreshDate = DateTime.Now; + } + + public int Refresh() + { + Sync((wasFree) => { + if (wasFree) + InternalRefresh(); + }); return cached.Count; } public async Task RefreshAsync(CancellationToken token = default) { - var wasFree = semaphore.CurrentCount > 0; - if (!await semaphore.WaitAsync(semaphoreTimeout, token).ConfigureAwait(false)) - return 0; - try - { + await SyncAsync(async (wasFree, token) => { if (wasFree) - { - cached.Clear(); - var entities = await context.Set().AsNoTracking() - .ToListAsync(token).ConfigureAwait(false); - cached.AddRange(entities); - data.refreshDate = DateTime.Now; - } - //else - nothing, it was just updated in another thread - } - catch (Exception ex) - { - Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Refresh()"); - Trace.WriteLine(ex.Message); - } - finally - { - semaphore.Release(); - } + await InternalRefreshAsync(token).ConfigureAwait(false); + }, token).ConfigureAwait(false); return cached.Count; } @@ -96,6 +130,27 @@ namespace AsbCloudInfrastructure.Services.Cache public async Task ContainsAsync(Func predicate, CancellationToken token = default) => await FirstOrDefaultAsync(predicate, token) != default; + public TEntity GetOrCreate(Func predicate, Func makeNew) + { + TEntity result = default; + Sync(wasFree => { + result = cached.FirstOrDefault(predicate); + if (result != default) + return; + + InternalRefresh(); + result = cached.FirstOrDefault(predicate); + if (result != default) + return; + + var entry = dbSet.Add(makeNew()); + context.SaveChanges(); + InternalRefresh(); + result = entry.Entity; + }); + return result; + } + public TEntity FirstOrDefault() { var result = cached.FirstOrDefault(); @@ -113,8 +168,7 @@ namespace AsbCloudInfrastructure.Services.Cache return result; await RefreshAsync(token); - return cached.FirstOrDefault(); - + return cached.FirstOrDefault(); } public TEntity FirstOrDefault(Func predicate) @@ -152,6 +206,9 @@ namespace AsbCloudInfrastructure.Services.Cache return result; } + public Task> WhereAsync(CancellationToken token = default) => + WhereAsync(default, token); + public async Task> WhereAsync(Func predicate = default, CancellationToken token = default) { @@ -168,109 +225,129 @@ namespace AsbCloudInfrastructure.Services.Cache return result; } - public TEntity Upsert(TEntity entity) + public void Upsert(TEntity entity) { - Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry updated; - if (dbSet.Contains(entity)) - updated = dbSet.Update(entity); - else - updated = dbSet.Add(entity); - context.SaveChanges(); - Refresh(); - return updated.Entity; - } - - public async Task UpsertAsync(TEntity entity, CancellationToken token = default) - { - Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry updated; - if (dbSet.Contains(entity)) - updated = dbSet.Update(entity); - else - updated = dbSet.Add(entity); - await context.SaveChangesAsync(token).ConfigureAwait(false); - await RefreshAsync(token).ConfigureAwait(false); - return updated.Entity; - } - - public IEnumerable Upsert(IEnumerable entities) - { - var upsertedEntries = new List(entities.Count()); - foreach (var entity in entities) + if (entity == default) + return; + Sync((wasFree) => { - Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry updated; - if (dbSet.Contains(entity)) // TODO: это очень ммедленно - updated = dbSet.Update(entity); - else - updated = dbSet.Add(entity); - upsertedEntries.Add(updated.Entity); - } - context.SaveChanges(); - Refresh(); - return upsertedEntries; - } - - public async Task> UpsertAsync(IEnumerable entities, CancellationToken token = default) - { - var upsertedEntries = new List(entities.Count()); - foreach (var entity in entities) - { - Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry updated; if (dbSet.Contains(entity)) - updated = dbSet.Update(entity); + dbSet.Update(entity); else - updated = dbSet.Add(entity); - upsertedEntries.Add(updated.Entity); - } - await context.SaveChangesAsync(token).ConfigureAwait(false); - await RefreshAsync(token).ConfigureAwait(false); - return upsertedEntries; + dbSet.Add(entity); + context.SaveChanges(); + InternalRefresh(); + }); + } + + public Task UpsertAsync(TEntity entity, CancellationToken token = default) + => SyncAsync(async (wasFree, token) => { + if (dbSet.Contains(entity)) + dbSet.Update(entity); + else + dbSet.Add(entity); + await context.SaveChangesAsync(token).ConfigureAwait(false); + await InternalRefreshAsync(token).ConfigureAwait(false); + }, token); + + public void Upsert(IEnumerable entities) + { + if (!entities.Any()) + return; + + Sync((wasFree) => + { + foreach (var entity in entities) + { + if (dbSet.Contains(entity)) // TODO: это очень ммедленно + dbSet.Update(entity); + else + dbSet.Add(entity); + } + context.SaveChanges(); + InternalRefresh(); + }); + } + + public async Task UpsertAsync(IEnumerable entities, CancellationToken token = default) + { + if (!entities.Any()) + return; + + await SyncAsync(async (wasFree, token) => { + var upsertedEntries = new List(entities.Count()); + foreach (var entity in entities) + { + if (dbSet.Contains(entity)) + dbSet.Update(entity); + else + dbSet.Add(entity); + } + await context.SaveChangesAsync(token).ConfigureAwait(false); + await InternalRefreshAsync(token).ConfigureAwait(false); + }, token); } public void Remove(Func predicate) - { - dbSet.RemoveRange(dbSet.Where(predicate)); - context.SaveChanges(); - Refresh(); - return; - } - - public async Task RemoveAsync(Func predicate, CancellationToken token = default) - { - dbSet.RemoveRange(dbSet.Where(predicate)); - await context.SaveChangesAsync(token).ConfigureAwait(false); - await RefreshAsync(token).ConfigureAwait(false); - return; - } + => Sync(_ => + { + dbSet.RemoveRange(dbSet.Where(predicate)); + context.SaveChanges(); + InternalRefresh(); + }); + + public Task RemoveAsync(Func predicate, CancellationToken token = default) + => SyncAsync(async (wasFree, token) => { + dbSet.RemoveRange(dbSet.Where(predicate)); + await context.SaveChangesAsync(token).ConfigureAwait(false); + await InternalRefreshAsync(token).ConfigureAwait(false); + }, token); public TEntity Insert(TEntity entity) { - var entry = dbSet.Add(entity); - context.SaveChanges(); - Refresh(); - return entry.Entity; + TEntity result = default; + Sync(_ => + { + var entry = dbSet.Add(entity); + context.SaveChanges(); + InternalRefresh(); + result = entry.Entity; + }); + return result; } public async Task InsertAsync(TEntity entity, CancellationToken token = default) { - var entry = dbSet.Add(entity); - await context.SaveChangesAsync(token).ConfigureAwait(false); - await RefreshAsync(token).ConfigureAwait(false); - return entry.Entity; + TEntity result = default; + await SyncAsync(async (wasFree, token) => + { + var entry = dbSet.Add(entity); + await context.SaveChangesAsync(token).ConfigureAwait(false); + await InternalRefreshAsync(token).ConfigureAwait(false); + result = entry.Entity; + }, token); + return result; } public int Insert(IEnumerable newEntities) { - dbSet.AddRange(newEntities); - var result = context.SaveChanges(); - Refresh(); + int result = 0; + Sync(_ => { + dbSet.AddRange(newEntities); + result = context.SaveChanges(); + InternalRefresh(); + }); return result; } public async Task InsertAsync(IEnumerable newEntities, CancellationToken token = default) { - dbSet.AddRange(newEntities); - var result = await context.SaveChangesAsync(token).ConfigureAwait(false); - await RefreshAsync(token).ConfigureAwait(false); + int result = 0; + await SyncAsync(async (wasFree, token) => { + dbSet.AddRange(newEntities); + result = await context.SaveChangesAsync(token).ConfigureAwait(false); + await RefreshAsync(token).ConfigureAwait(false); + }, token); return result; }