semaphore

This commit is contained in:
Фролов 2021-10-03 20:08:17 +05:00
parent 031ff8c747
commit cdfcb0b2f7
4 changed files with 220 additions and 188 deletions

View File

@ -1,5 +1,6 @@
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using System; using System;
using System.Collections;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
@ -8,8 +9,8 @@ namespace AsbCloudInfrastructure.Services.Cache
public class CacheDb public class CacheDb
{ {
private readonly ConcurrentDictionary<string, (DateTime, object)> cache = private readonly Dictionary<string, (DateTime, IEnumerable)> cache =
new ConcurrentDictionary<string, (DateTime, object)>(); new Dictionary<string, (DateTime, IEnumerable)>();
private readonly TimeSpan obsolesenceTime = TimeSpan.FromMinutes(15); private readonly TimeSpan obsolesenceTime = TimeSpan.FromMinutes(15);
@ -19,7 +20,7 @@ namespace AsbCloudInfrastructure.Services.Cache
var entityTypeName = typeof(TEntity).FullName; var entityTypeName = typeof(TEntity).FullName;
if (!cache.ContainsKey(entityTypeName)) if (!cache.ContainsKey(entityTypeName))
cache[entityTypeName] = (DateTime.Now, new ConcurrentBag<TEntity>()); cache[entityTypeName] = (DateTime.Now, new List<TEntity>());
bool isCachedDataObsolete = DateTime.Now - cache[entityTypeName].Item1 > obsolesenceTime; bool isCachedDataObsolete = DateTime.Now - cache[entityTypeName].Item1 > obsolesenceTime;

View File

@ -1,229 +1,180 @@
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Collections;
using System.Diagnostics;
namespace AsbCloudInfrastructure.Services.Cache namespace AsbCloudInfrastructure.Services.Cache
{ {
public class CacheTable<TEntity> : IEnumerable<TEntity> public class CacheTable<TEntity> : IEnumerable<TEntity>
where TEntity : class where TEntity : class
{ {
private const int semaphoreTimeout = 5_000;
private static readonly SemaphoreSlim semaphore = new(1);
private readonly DbContext context; private readonly DbContext context;
private (DateTime refreshDate, object entities) data; private (DateTime refreshDate, IEnumerable entities) data;
private readonly ConcurrentBag<TEntity> cached; private readonly List<TEntity> cached;
private readonly DbSet<TEntity> dbSet; private readonly DbSet<TEntity> dbSet;
internal CacheTable(DbContext context, (DateTime refreshDate, object entities) data) internal CacheTable(DbContext context, (DateTime refreshDate, IEnumerable entities) data)
{ {
this.context = context; this.context = context;
this.data = data; this.data = data;
this.cached = (ConcurrentBag<TEntity>)data.entities;
dbSet = context.Set<TEntity>(); dbSet = context.Set<TEntity>();
cached = (List<TEntity>)data.entities;
if (cached.Count == 0)
Refresh();
} }
public TEntity this[int index] { get => cached.ElementAt(index); } public TEntity this[int index] { get => cached.ElementAt(index); }
//public static void Sync(Action)
public int Refresh() public int Refresh()
{
var wasFree = semaphore.CurrentCount > 0;
if(!semaphore.Wait(semaphoreTimeout))
return 0;
try
{
if (wasFree)
{ {
cached.Clear(); cached.Clear();
var entities = dbSet.AsNoTracking().ToList();
var dbEntities = context.Set<TEntity>().AsNoTracking().ToList(); cached.AddRange(entities);
foreach(var e in dbEntities)
cached.Add(e);
data.refreshDate = DateTime.Now; data.refreshDate = DateTime.Now;
}
//else - nothing, it was just updated in another thread
}
catch (Exception ex)
{
Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Refresh()");
Trace.WriteLine(ex.Message);
}
finally
{
semaphore.Release();
}
return cached.Count; return cached.Count;
} }
public async Task<int> RefreshAsync(CancellationToken token = default) public async Task<int> RefreshAsync(CancellationToken token = default)
{
var wasFree = semaphore.CurrentCount > 0;
if (!await semaphore.WaitAsync(semaphoreTimeout, token).ConfigureAwait(false))
return 0;
try
{
if (wasFree)
{ {
cached.Clear(); cached.Clear();
var entities = await context.Set<TEntity>().AsNoTracking()
var dbEntities = await context.Set<TEntity>().AsNoTracking()
.ToListAsync(token).ConfigureAwait(false); .ToListAsync(token).ConfigureAwait(false);
cached.AddRange(entities);
foreach (var e in dbEntities)
cached.Add(e);
data.refreshDate = DateTime.Now; data.refreshDate = DateTime.Now;
}
//else - nothing, it was just updated in another thread
}
catch (Exception ex)
{
Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Refresh()");
Trace.WriteLine(ex.Message);
}
finally
{
semaphore.Release();
}
return cached.Count; return cached.Count;
} }
private bool CheckRefresh(RefreshMode refreshMode) public bool Contains(Func<TEntity, bool> predicate)
=> FirstOrDefault(predicate) != default;
public async Task<bool> ContainsAsync(Func<TEntity, bool> predicate, CancellationToken token = default)
=> await FirstOrDefaultAsync(predicate, token) != default;
public TEntity FirstOrDefault()
{ {
if (refreshMode == RefreshMode.Force)
{
Refresh();
return true;
}
if ((refreshMode == RefreshMode.IfResultEmpty) && !cached.Any())
{
Refresh();
return true;
}
return false;
}
private async Task<bool> CheckRefreshAsync(RefreshMode refreshMode, CancellationToken token = default)
{
if (refreshMode == RefreshMode.Force)
{
await RefreshAsync(token);
return true;
}
if (refreshMode == RefreshMode.IfResultEmpty && !cached.Any())
{
await RefreshAsync(token);
return true;
}
return false;
}
public bool Contains(Func<TEntity, bool> predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty)
=> FirstOrDefault(predicate, refreshMode) != default;
public Task<bool> ContainsAsync(Func<TEntity, bool> predicate, CancellationToken token = default)
=> ContainsAsync(predicate, RefreshMode.IfResultEmpty, token);
public async Task<bool> ContainsAsync(Func<TEntity, bool> predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default)
=> await FirstOrDefaultAsync(predicate, refreshMode, token) != default;
public Task<TEntity> FirstOrDefaultAsync(CancellationToken token = default)
=> FirstOrDefaultAsync(RefreshMode.IfResultEmpty, token);
public TEntity FirstOrDefault(RefreshMode refreshMode = RefreshMode.IfResultEmpty)
{
bool isUpdated = CheckRefresh(refreshMode);
var result = cached.FirstOrDefault(); var result = cached.FirstOrDefault();
if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) if (result != default)
{ return result;
Refresh(); Refresh();
return cached.FirstOrDefault(); return cached.FirstOrDefault();
} }
return result;
}
public async Task<TEntity> FirstOrDefaultAsync(RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default) public async Task<TEntity> FirstOrDefaultAsync(CancellationToken token = default)
{ {
bool isUpdated = await CheckRefreshAsync(refreshMode, token);
var result = cached.FirstOrDefault(); var result = cached.FirstOrDefault();
if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) if (result != default)
{ return result;
await RefreshAsync(token); await RefreshAsync(token);
return cached.FirstOrDefault(); return cached.FirstOrDefault();
}
return result;
} }
public Task<TEntity> FirstOrDefaultAsync(Func<TEntity, bool> predicate, CancellationToken token = default) public TEntity FirstOrDefault(Func<TEntity, bool> predicate)
=> FirstOrDefaultAsync(predicate, RefreshMode.IfResultEmpty, token);
public TEntity FirstOrDefault(Func<TEntity, bool> predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty)
{ {
bool isUpdated = CheckRefresh(refreshMode);
var result = cached.FirstOrDefault(predicate); var result = cached.FirstOrDefault(predicate);
if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) if (result != default)
{ return result;
Refresh(); Refresh();
return cached.FirstOrDefault(predicate); return cached.FirstOrDefault(predicate);
} }
return result;
}
public async Task<TEntity> FirstOrDefaultAsync(Func<TEntity, bool> predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default) public async Task<TEntity> FirstOrDefaultAsync(Func<TEntity, bool> predicate, CancellationToken token = default)
{ {
bool isUpdated = await CheckRefreshAsync(refreshMode, token);
var result = cached.FirstOrDefault(predicate); var result = cached.FirstOrDefault(predicate);
if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) if (result != default)
{ return result;
await RefreshAsync(token); await RefreshAsync(token);
return cached.FirstOrDefault(predicate); return cached.FirstOrDefault(predicate);
} }
return result;
}
public Task<IEnumerable<TEntity>> WhereAsync(CancellationToken token = default) public IEnumerable<TEntity> Where(Func<TEntity, bool> predicate = default)
=> WhereAsync(default, RefreshMode.IfResultEmpty, token);
public Task<IEnumerable<TEntity>> WhereAsync(Func<TEntity, bool> predicate, CancellationToken token = default)
=> WhereAsync(predicate, RefreshMode.IfResultEmpty, token);
public IEnumerable<TEntity> Where(Func<TEntity, bool> predicate = default, RefreshMode refreshMode = RefreshMode.IfResultEmpty)
{ {
bool isUpdated = CheckRefresh(refreshMode);
var result = (predicate != default) var result = (predicate != default)
? cached.Where(predicate) ? cached.Where(predicate)
: cached; : cached;
if (!result.Any() && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) if (result.Any())
{ return result;
Refresh(); Refresh();
result = (predicate != default) result = (predicate != default)
? cached.Where(predicate) ? cached.Where(predicate)
: cached; : cached;
}
return result; return result;
} }
public async Task<IEnumerable<TEntity>> WhereAsync(Func<TEntity, bool> predicate = default, public async Task<IEnumerable<TEntity>> WhereAsync(Func<TEntity, bool> predicate = default,
RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default) CancellationToken token = default)
{ {
bool isUpdated = await CheckRefreshAsync(refreshMode, token);
var result = (predicate != default) var result = (predicate != default)
? cached.Where(predicate) ? cached.Where(predicate)
: cached; : cached;
if (!result.Any() && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) if (result.Any())
{ return result;
await RefreshAsync(token); await RefreshAsync(token);
result = (predicate != default) result = (predicate != default)
? cached.Where(predicate) ? cached.Where(predicate)
: cached; : cached;
}
return result; return result;
} }
//public IEnumerable<TEntity> Mutate(Func<TEntity, bool> predicate,
// Action<TEntity> mutation)
//{
// var dbEntities = dbSet.Where(predicate);
// if (dbEntities.Any())
// {
// foreach (var dbEntity in dbEntities)
// mutation(dbEntity);
// context.SaveChanges();
// }
// var matchedByPredicate = cached.Select(el => predicate(el));
// foreach (var item in matchedByPredicate)
// cached.TryTake(out var t);
// cached = cached.RemoveAll(e => predicate(e));
// foreach (var e in dbEntities)
// cached.Add(e);
// return dbEntities;
//}
//public async Task<IEnumerable<TEntity>> MutateAsync(Func<TEntity,
// bool> predicate, Action<TEntity> mutation, CancellationToken token = default)
//{
// var dbEntities = dbSet.Where(predicate);
// if (dbEntities.Any())
// {
// foreach (var dbEntity in dbEntities)
// mutation(dbEntity);
// await context.SaveChangesAsync(token).ConfigureAwait(false);
// }
// cached.RemoveAll(e => predicate(e));
// foreach (var e in dbEntities)
// cached.Add(e);
// return dbEntities;
//}
public TEntity Upsert(TEntity entity) public TEntity Upsert(TEntity entity)
{ {
var updated = dbSet.Update(entity); Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry<TEntity> updated;
if (dbSet.Contains(entity))
updated = dbSet.Update(entity);
else
updated = dbSet.Add(entity);
context.SaveChanges(); context.SaveChanges();
Refresh(); Refresh();
return updated.Entity; return updated.Entity;
@ -295,7 +246,7 @@ namespace AsbCloudInfrastructure.Services.Cache
{ {
var entry = dbSet.Add(entity); var entry = dbSet.Add(entity);
context.SaveChanges(); context.SaveChanges();
cached.Add(entry.Entity); Refresh();
return entry.Entity; return entry.Entity;
} }
@ -303,34 +254,28 @@ namespace AsbCloudInfrastructure.Services.Cache
{ {
var entry = dbSet.Add(entity); var entry = dbSet.Add(entity);
await context.SaveChangesAsync(token).ConfigureAwait(false); await context.SaveChangesAsync(token).ConfigureAwait(false);
cached.Add(entry.Entity); await RefreshAsync(token).ConfigureAwait(false);
return entry.Entity; return entry.Entity;
} }
public IEnumerable<TEntity> Insert(IEnumerable<TEntity> newEntities) public int Insert(IEnumerable<TEntity> newEntities)
{ {
var dbEntities = new List<TEntity>(newEntities.Count()); dbSet.AddRange(newEntities);
foreach (var item in newEntities) var result = context.SaveChanges();
dbEntities.Add(dbSet.Add(item).Entity); Refresh();
context.SaveChanges(); return result;
foreach (var e in dbEntities)
cached.Add(e);
return dbEntities;
} }
public async Task<IEnumerable<TEntity>> InsertAsync(IEnumerable<TEntity> newEntities, CancellationToken token = default) public async Task<int> InsertAsync(IEnumerable<TEntity> newEntities, CancellationToken token = default)
{ {
var dbEntities = new List<TEntity>(newEntities.Count()); dbSet.AddRange(newEntities);
foreach (var item in newEntities) var result = await context.SaveChangesAsync(token).ConfigureAwait(false);
dbEntities.Add(dbSet.Add(item).Entity); await RefreshAsync(token).ConfigureAwait(false);
await context.SaveChangesAsync(token).ConfigureAwait(false); return result;
foreach (var e in dbEntities)
cached.Add(e);
return dbEntities;
} }
public IEnumerator<TEntity> GetEnumerator() => Where().GetEnumerator(); public IEnumerator<TEntity> GetEnumerator() => Where().GetEnumerator();
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator(); IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
} }
} }

View File

@ -1,4 +0,0 @@
namespace AsbCloudInfrastructure.Services.Cache
{
public enum RefreshMode { None, IfResultEmpty, Force, }
}

View File

@ -9,6 +9,8 @@ using AsbCloudDb.Model;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using AsbCloudInfrastructure.Services.Cache; using AsbCloudInfrastructure.Services.Cache;
using AsbCloudInfrastructure.Services; using AsbCloudInfrastructure.Services;
using System.Threading.Tasks;
using System.Threading;
namespace ConsoleApp1 namespace ConsoleApp1
{ {
@ -17,11 +19,99 @@ namespace ConsoleApp1
// .Options; // .Options;
//var context = new AsbCloudDbContext(options); //var context = new AsbCloudDbContext(options);
class Program class Program
{ {
static object lockObject = new();
static int inc = 0;
static void RefreshMonitor()
{
if (Monitor.TryEnter(lockObject))
{
Task.Delay(2_000);
Interlocked.Increment(ref inc);
}
else
Monitor.Wait(lockObject);
}
//static Mutex mutex = new Mutex();
//static void RefreshMutex()
//{
// if(Mutex.TryOpenExisting()
// {
// }
// Interlocked.Increment(ref inc);
//}
static bool isUnLocked = true;
static void RefreshBool() {
if(isUnLocked)
{
isUnLocked = false;
Console.WriteLine(".");
Task.Delay(50).Wait();
inc++;
isUnLocked = true;
}
while(!isUnLocked)
Task.Delay(10).Wait();
}
static readonly SemaphoreSlim semaphore = new(1);
static void Refresh()
{
var wasFree = semaphore.CurrentCount > 0;
semaphore.Wait();
if (wasFree)
{
Console.WriteLine(".");
Task.Delay(500).Wait();
inc++;
}
semaphore.Release();
}
static async Task RefreshAsync()
{
var wasFree = semaphore.CurrentCount == 1;
await semaphore.WaitAsync();
if (wasFree)
{
Console.WriteLine(".");
await Task.Delay(500);
inc++;
}else
Console.Write(",");
semaphore.Release();
}
static void Main(/*string[] args*/) static void Main(/*string[] args*/)
{ {
Console.WriteLine(DateTime.Now.ToString("mm:ss.fff"));
//semaphore.Release();
Refresh();
for (int i= 0; i < 4; i++)
{
var ts = new List<Task>(100);
for (int j = 0; j < 20; j++)
{
ts.Add(
Task.Run( RefreshAsync));
}
Task.WaitAll(ts.ToArray());
Console.WriteLine("*");
}
Console.WriteLine("_");
Console.ReadKey();
Console.WriteLine(inc);
return;
var options = new DbContextOptionsBuilder<AsbCloudDbContext>() var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
.UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True") .UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True")
.Options; .Options;