make cache sinchronized.

This commit is contained in:
Фролов 2021-10-04 15:52:22 +05:00
parent cdfcb0b2f7
commit 8e6384e46c

View File

@ -31,62 +31,96 @@ namespace AsbCloudInfrastructure.Services.Cache
public TEntity this[int index] { get => cached.ElementAt(index); } public TEntity this[int index] { get => cached.ElementAt(index); }
//public static void Sync(Action) /// <summary>
/// Runs action like atomic operation.
public int Refresh() /// wasFree is action argument indicates that semaphore was free.
/// It may be needed to avoid multiple operations like Refresh().
/// </summary>
/// <param name="action">(wasFree) => {...}</param>
/// <returns>false - semaphore.Wait returned by timeout</returns>
private static bool Sync(Action<bool> action)
{ {
var wasFree = semaphore.CurrentCount > 0; var wasFree = semaphore.CurrentCount > 0;
if(!semaphore.Wait(semaphoreTimeout)) if (!semaphore.Wait(semaphoreTimeout))
return 0; return false;
try try
{ {
if (wasFree) action?.Invoke(wasFree);
{
cached.Clear();
var entities = dbSet.AsNoTracking().ToList();
cached.AddRange(entities);
data.refreshDate = DateTime.Now;
}
//else - nothing, it was just updated in another thread
} }
catch (Exception ex) catch (Exception ex)
{ {
Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Refresh()"); Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Sync()");
Trace.WriteLine(ex.Message); Trace.WriteLine(ex.Message);
Trace.WriteLine(ex.StackTrace);
} }
finally finally
{ {
semaphore.Release(); semaphore.Release();
} }
return true;
}
/// <summary>
/// Runs action like atomic operation.
/// wasFree is action argument indicates that semaphore was free.
/// It may be needed to avoid multiple operations like Refresh().
/// </summary>
/// <param name="action">(wasFree) => {...}</param>
/// <returns>false - semaphore.Wait returned by timeout</returns>
private static async Task<bool> SyncAsync(Func<bool, CancellationToken, Task> task, CancellationToken token = default)
{
var wasFree = semaphore.CurrentCount > 0;
if (!await semaphore.WaitAsync(semaphoreTimeout, token).ConfigureAwait(false))
return false;
try
{
await task?.Invoke(wasFree, token);
}
catch (Exception ex)
{
Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.SyncAsync()");
Trace.WriteLine(ex.Message);
Trace.WriteLine(ex.StackTrace);
}
finally
{
semaphore.Release();
}
return true;
}
private void InternalRefresh()
{
cached.Clear();
var entities = dbSet.AsNoTracking().ToList();
cached.AddRange(entities);
data.refreshDate = DateTime.Now;
}
private async Task InternalRefreshAsync(CancellationToken token = default)
{
cached.Clear();
var entities = await context.Set<TEntity>().AsNoTracking()
.ToListAsync(token).ConfigureAwait(false);
cached.AddRange(entities);
data.refreshDate = DateTime.Now;
}
public int Refresh()
{
Sync((wasFree) => {
if (wasFree)
InternalRefresh();
});
return cached.Count; return cached.Count;
} }
public async Task<int> RefreshAsync(CancellationToken token = default) public async Task<int> RefreshAsync(CancellationToken token = default)
{ {
var wasFree = semaphore.CurrentCount > 0; await SyncAsync(async (wasFree, token) => {
if (!await semaphore.WaitAsync(semaphoreTimeout, token).ConfigureAwait(false))
return 0;
try
{
if (wasFree) if (wasFree)
{ await InternalRefreshAsync(token).ConfigureAwait(false);
cached.Clear(); }, token).ConfigureAwait(false);
var entities = await context.Set<TEntity>().AsNoTracking()
.ToListAsync(token).ConfigureAwait(false);
cached.AddRange(entities);
data.refreshDate = DateTime.Now;
}
//else - nothing, it was just updated in another thread
}
catch (Exception ex)
{
Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Refresh()");
Trace.WriteLine(ex.Message);
}
finally
{
semaphore.Release();
}
return cached.Count; return cached.Count;
} }
@ -96,6 +130,27 @@ namespace AsbCloudInfrastructure.Services.Cache
public async Task<bool> ContainsAsync(Func<TEntity, bool> predicate, CancellationToken token = default) public async Task<bool> ContainsAsync(Func<TEntity, bool> predicate, CancellationToken token = default)
=> await FirstOrDefaultAsync(predicate, token) != default; => await FirstOrDefaultAsync(predicate, token) != default;
public TEntity GetOrCreate(Func<TEntity, bool> predicate, Func<TEntity> makeNew)
{
TEntity result = default;
Sync(wasFree => {
result = cached.FirstOrDefault(predicate);
if (result != default)
return;
InternalRefresh();
result = cached.FirstOrDefault(predicate);
if (result != default)
return;
var entry = dbSet.Add(makeNew());
context.SaveChanges();
InternalRefresh();
result = entry.Entity;
});
return result;
}
public TEntity FirstOrDefault() public TEntity FirstOrDefault()
{ {
var result = cached.FirstOrDefault(); var result = cached.FirstOrDefault();
@ -113,8 +168,7 @@ namespace AsbCloudInfrastructure.Services.Cache
return result; return result;
await RefreshAsync(token); await RefreshAsync(token);
return cached.FirstOrDefault(); return cached.FirstOrDefault();
} }
public TEntity FirstOrDefault(Func<TEntity, bool> predicate) public TEntity FirstOrDefault(Func<TEntity, bool> predicate)
@ -152,6 +206,9 @@ namespace AsbCloudInfrastructure.Services.Cache
return result; return result;
} }
public Task<IEnumerable<TEntity>> WhereAsync(CancellationToken token = default) =>
WhereAsync(default, token);
public async Task<IEnumerable<TEntity>> WhereAsync(Func<TEntity, bool> predicate = default, public async Task<IEnumerable<TEntity>> WhereAsync(Func<TEntity, bool> predicate = default,
CancellationToken token = default) CancellationToken token = default)
{ {
@ -168,109 +225,129 @@ namespace AsbCloudInfrastructure.Services.Cache
return result; return result;
} }
public TEntity Upsert(TEntity entity) public void Upsert(TEntity entity)
{ {
Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity> updated; if (entity == default)
if (dbSet.Contains(entity)) return;
updated = dbSet.Update(entity); Sync((wasFree) =>
else
updated = dbSet.Add(entity);
context.SaveChanges();
Refresh();
return updated.Entity;
}
public async Task<TEntity> UpsertAsync(TEntity entity, CancellationToken token = default)
{
Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity> updated;
if (dbSet.Contains(entity))
updated = dbSet.Update(entity);
else
updated = dbSet.Add(entity);
await context.SaveChangesAsync(token).ConfigureAwait(false);
await RefreshAsync(token).ConfigureAwait(false);
return updated.Entity;
}
public IEnumerable<TEntity> Upsert(IEnumerable<TEntity> entities)
{
var upsertedEntries = new List<TEntity>(entities.Count());
foreach (var entity in entities)
{ {
Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity> updated;
if (dbSet.Contains(entity)) // TODO: это очень ммедленно
updated = dbSet.Update(entity);
else
updated = dbSet.Add(entity);
upsertedEntries.Add(updated.Entity);
}
context.SaveChanges();
Refresh();
return upsertedEntries;
}
public async Task<IEnumerable<TEntity>> UpsertAsync(IEnumerable<TEntity> entities, CancellationToken token = default)
{
var upsertedEntries = new List<TEntity>(entities.Count());
foreach (var entity in entities)
{
Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity> updated;
if (dbSet.Contains(entity)) if (dbSet.Contains(entity))
updated = dbSet.Update(entity); dbSet.Update(entity);
else else
updated = dbSet.Add(entity); dbSet.Add(entity);
upsertedEntries.Add(updated.Entity); context.SaveChanges();
} InternalRefresh();
await context.SaveChangesAsync(token).ConfigureAwait(false); });
await RefreshAsync(token).ConfigureAwait(false); }
return upsertedEntries;
public Task UpsertAsync(TEntity entity, CancellationToken token = default)
=> SyncAsync(async (wasFree, token) => {
if (dbSet.Contains(entity))
dbSet.Update(entity);
else
dbSet.Add(entity);
await context.SaveChangesAsync(token).ConfigureAwait(false);
await InternalRefreshAsync(token).ConfigureAwait(false);
}, token);
public void Upsert(IEnumerable<TEntity> entities)
{
if (!entities.Any())
return;
Sync((wasFree) =>
{
foreach (var entity in entities)
{
if (dbSet.Contains(entity)) // TODO: это очень ммедленно
dbSet.Update(entity);
else
dbSet.Add(entity);
}
context.SaveChanges();
InternalRefresh();
});
}
public async Task UpsertAsync(IEnumerable<TEntity> entities, CancellationToken token = default)
{
if (!entities.Any())
return;
await SyncAsync(async (wasFree, token) => {
var upsertedEntries = new List<TEntity>(entities.Count());
foreach (var entity in entities)
{
if (dbSet.Contains(entity))
dbSet.Update(entity);
else
dbSet.Add(entity);
}
await context.SaveChangesAsync(token).ConfigureAwait(false);
await InternalRefreshAsync(token).ConfigureAwait(false);
}, token);
} }
public void Remove(Func<TEntity, bool> predicate) public void Remove(Func<TEntity, bool> predicate)
{ => Sync(_ =>
dbSet.RemoveRange(dbSet.Where(predicate)); {
context.SaveChanges(); dbSet.RemoveRange(dbSet.Where(predicate));
Refresh(); context.SaveChanges();
return; InternalRefresh();
} });
public async Task RemoveAsync(Func<TEntity, bool> predicate, CancellationToken token = default) public Task RemoveAsync(Func<TEntity, bool> predicate, CancellationToken token = default)
{ => SyncAsync(async (wasFree, token) => {
dbSet.RemoveRange(dbSet.Where(predicate)); dbSet.RemoveRange(dbSet.Where(predicate));
await context.SaveChangesAsync(token).ConfigureAwait(false); await context.SaveChangesAsync(token).ConfigureAwait(false);
await RefreshAsync(token).ConfigureAwait(false); await InternalRefreshAsync(token).ConfigureAwait(false);
return; }, token);
}
public TEntity Insert(TEntity entity) public TEntity Insert(TEntity entity)
{ {
var entry = dbSet.Add(entity); TEntity result = default;
context.SaveChanges(); Sync(_ =>
Refresh(); {
return entry.Entity; var entry = dbSet.Add(entity);
context.SaveChanges();
InternalRefresh();
result = entry.Entity;
});
return result;
} }
public async Task<TEntity> InsertAsync(TEntity entity, CancellationToken token = default) public async Task<TEntity> InsertAsync(TEntity entity, CancellationToken token = default)
{ {
var entry = dbSet.Add(entity); TEntity result = default;
await context.SaveChangesAsync(token).ConfigureAwait(false); await SyncAsync(async (wasFree, token) =>
await RefreshAsync(token).ConfigureAwait(false); {
return entry.Entity; var entry = dbSet.Add(entity);
await context.SaveChangesAsync(token).ConfigureAwait(false);
await InternalRefreshAsync(token).ConfigureAwait(false);
result = entry.Entity;
}, token);
return result;
} }
public int Insert(IEnumerable<TEntity> newEntities) public int Insert(IEnumerable<TEntity> newEntities)
{ {
dbSet.AddRange(newEntities); int result = 0;
var result = context.SaveChanges(); Sync(_ => {
Refresh(); dbSet.AddRange(newEntities);
result = context.SaveChanges();
InternalRefresh();
});
return result; return result;
} }
public async Task<int> InsertAsync(IEnumerable<TEntity> newEntities, CancellationToken token = default) public async Task<int> InsertAsync(IEnumerable<TEntity> newEntities, CancellationToken token = default)
{ {
dbSet.AddRange(newEntities); int result = 0;
var result = await context.SaveChangesAsync(token).ConfigureAwait(false); await SyncAsync(async (wasFree, token) => {
await RefreshAsync(token).ConfigureAwait(false); dbSet.AddRange(newEntities);
result = await context.SaveChangesAsync(token).ConfigureAwait(false);
await RefreshAsync(token).ConfigureAwait(false);
}, token);
return result; return result;
} }