From cdfcb0b2f7e51a35cfe816585bcadfec5b8b6034 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A4=D1=80=D0=BE=D0=BB=D0=BE=D0=B2?= Date: Sun, 3 Oct 2021 20:08:17 +0500 Subject: [PATCH] semaphore --- .../Services/Cache/CacheDb.cs | 7 +- .../Services/Cache/CacheTable.cs | 303 +++++++----------- .../Services/Cache/RefreshMode.cs | 4 - ConsoleApp1/Program.cs | 94 +++++- 4 files changed, 220 insertions(+), 188 deletions(-) delete mode 100644 AsbCloudInfrastructure/Services/Cache/RefreshMode.cs diff --git a/AsbCloudInfrastructure/Services/Cache/CacheDb.cs b/AsbCloudInfrastructure/Services/Cache/CacheDb.cs index a6c5baec..dcd27a58 100644 --- a/AsbCloudInfrastructure/Services/Cache/CacheDb.cs +++ b/AsbCloudInfrastructure/Services/Cache/CacheDb.cs @@ -1,5 +1,6 @@ using Microsoft.EntityFrameworkCore; using System; +using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; @@ -8,8 +9,8 @@ namespace AsbCloudInfrastructure.Services.Cache public class CacheDb { - private readonly ConcurrentDictionary cache = - new ConcurrentDictionary(); + private readonly Dictionary cache = + new Dictionary(); private readonly TimeSpan obsolesenceTime = TimeSpan.FromMinutes(15); @@ -19,7 +20,7 @@ namespace AsbCloudInfrastructure.Services.Cache var entityTypeName = typeof(TEntity).FullName; if (!cache.ContainsKey(entityTypeName)) - cache[entityTypeName] = (DateTime.Now, new ConcurrentBag()); + cache[entityTypeName] = (DateTime.Now, new List()); bool isCachedDataObsolete = DateTime.Now - cache[entityTypeName].Item1 > obsolesenceTime; diff --git a/AsbCloudInfrastructure/Services/Cache/CacheTable.cs b/AsbCloudInfrastructure/Services/Cache/CacheTable.cs index e3155e70..c70f7929 100644 --- a/AsbCloudInfrastructure/Services/Cache/CacheTable.cs +++ b/AsbCloudInfrastructure/Services/Cache/CacheTable.cs @@ -1,229 +1,180 @@ using Microsoft.EntityFrameworkCore; using System; using System.Collections.Generic; -using System.Collections.Concurrent; using System.Linq; using System.Threading; using System.Threading.Tasks; +using System.Collections; +using System.Diagnostics; namespace AsbCloudInfrastructure.Services.Cache { public class CacheTable : IEnumerable where TEntity : class { + private const int semaphoreTimeout = 5_000; + private static readonly SemaphoreSlim semaphore = new(1); private readonly DbContext context; - private (DateTime refreshDate, object entities) data; - private readonly ConcurrentBag cached; + private (DateTime refreshDate, IEnumerable entities) data; + private readonly List cached; private readonly DbSet dbSet; - internal CacheTable(DbContext context, (DateTime refreshDate, object entities) data) + internal CacheTable(DbContext context, (DateTime refreshDate, IEnumerable entities) data) { this.context = context; this.data = data; - this.cached = (ConcurrentBag)data.entities; dbSet = context.Set(); + cached = (List)data.entities; + if (cached.Count == 0) + Refresh(); } public TEntity this[int index] { get => cached.ElementAt(index); } + //public static void Sync(Action) + public int Refresh() { - cached.Clear(); - - var dbEntities = context.Set().AsNoTracking().ToList(); - foreach(var e in dbEntities) - cached.Add(e); - data.refreshDate = DateTime.Now; + var wasFree = semaphore.CurrentCount > 0; + if(!semaphore.Wait(semaphoreTimeout)) + return 0; + try + { + if (wasFree) + { + cached.Clear(); + var entities = dbSet.AsNoTracking().ToList(); + cached.AddRange(entities); + data.refreshDate = DateTime.Now; + } + //else - nothing, it was just updated in another thread + } + catch (Exception ex) + { + Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Refresh()"); + Trace.WriteLine(ex.Message); + } + finally + { + semaphore.Release(); + } return cached.Count; } public async Task RefreshAsync(CancellationToken token = default) { - cached.Clear(); - - var dbEntities = await context.Set().AsNoTracking() - .ToListAsync(token).ConfigureAwait(false); - - foreach (var e in dbEntities) - cached.Add(e); - data.refreshDate = DateTime.Now; + var wasFree = semaphore.CurrentCount > 0; + if (!await semaphore.WaitAsync(semaphoreTimeout, token).ConfigureAwait(false)) + return 0; + try + { + if (wasFree) + { + cached.Clear(); + var entities = await context.Set().AsNoTracking() + .ToListAsync(token).ConfigureAwait(false); + cached.AddRange(entities); + data.refreshDate = DateTime.Now; + } + //else - nothing, it was just updated in another thread + } + catch (Exception ex) + { + Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{typeof(TEntity).Name}>.Refresh()"); + Trace.WriteLine(ex.Message); + } + finally + { + semaphore.Release(); + } return cached.Count; } - private bool CheckRefresh(RefreshMode refreshMode) + public bool Contains(Func predicate) + => FirstOrDefault(predicate) != default; + + public async Task ContainsAsync(Func predicate, CancellationToken token = default) + => await FirstOrDefaultAsync(predicate, token) != default; + + public TEntity FirstOrDefault() { - if (refreshMode == RefreshMode.Force) - { - Refresh(); - return true; - } - - if ((refreshMode == RefreshMode.IfResultEmpty) && !cached.Any()) - { - Refresh(); - return true; - } - - return false; - } - - private async Task CheckRefreshAsync(RefreshMode refreshMode, CancellationToken token = default) - { - if (refreshMode == RefreshMode.Force) - { - await RefreshAsync(token); - return true; - } - - if (refreshMode == RefreshMode.IfResultEmpty && !cached.Any()) - { - await RefreshAsync(token); - return true; - } - - return false; - } - - public bool Contains(Func predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty) - => FirstOrDefault(predicate, refreshMode) != default; - - public Task ContainsAsync(Func predicate, CancellationToken token = default) - => ContainsAsync(predicate, RefreshMode.IfResultEmpty, token); - - public async Task ContainsAsync(Func predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default) - => await FirstOrDefaultAsync(predicate, refreshMode, token) != default; - - public Task FirstOrDefaultAsync(CancellationToken token = default) - => FirstOrDefaultAsync(RefreshMode.IfResultEmpty, token); - - public TEntity FirstOrDefault(RefreshMode refreshMode = RefreshMode.IfResultEmpty) - { - bool isUpdated = CheckRefresh(refreshMode); var result = cached.FirstOrDefault(); - if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) - { - Refresh(); - return cached.FirstOrDefault(); - } - return result; + if (result != default) + return result; + + Refresh(); + return cached.FirstOrDefault(); } - public async Task FirstOrDefaultAsync(RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default) + public async Task FirstOrDefaultAsync(CancellationToken token = default) { - bool isUpdated = await CheckRefreshAsync(refreshMode, token); var result = cached.FirstOrDefault(); - if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) - { - await RefreshAsync(token); - return cached.FirstOrDefault(); - } - return result; + if (result != default) + return result; + + await RefreshAsync(token); + return cached.FirstOrDefault(); + } - public Task FirstOrDefaultAsync(Func predicate, CancellationToken token = default) - => FirstOrDefaultAsync(predicate, RefreshMode.IfResultEmpty, token); - - public TEntity FirstOrDefault(Func predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty) + public TEntity FirstOrDefault(Func predicate) { - bool isUpdated = CheckRefresh(refreshMode); var result = cached.FirstOrDefault(predicate); - if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) - { - Refresh(); - return cached.FirstOrDefault(predicate); - } - return result; + if (result != default) + return result; + + Refresh(); + return cached.FirstOrDefault(predicate); } - public async Task FirstOrDefaultAsync(Func predicate, RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default) + public async Task FirstOrDefaultAsync(Func predicate, CancellationToken token = default) { - bool isUpdated = await CheckRefreshAsync(refreshMode, token); var result = cached.FirstOrDefault(predicate); - if (result == default && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) - { - await RefreshAsync(token); - return cached.FirstOrDefault(predicate); - } - return result; + if (result != default) + return result; + + await RefreshAsync(token); + return cached.FirstOrDefault(predicate); } - public Task> WhereAsync(CancellationToken token = default) - => WhereAsync(default, RefreshMode.IfResultEmpty, token); - - public Task> WhereAsync(Func predicate, CancellationToken token = default) - => WhereAsync(predicate, RefreshMode.IfResultEmpty, token); - - public IEnumerable Where(Func predicate = default, RefreshMode refreshMode = RefreshMode.IfResultEmpty) + public IEnumerable Where(Func predicate = default) { - bool isUpdated = CheckRefresh(refreshMode); var result = (predicate != default) ? cached.Where(predicate) : cached; - if (!result.Any() && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) - { - Refresh(); - result = (predicate != default) - ? cached.Where(predicate) - : cached; - } + if (result.Any()) + return result; + + Refresh(); + result = (predicate != default) + ? cached.Where(predicate) + : cached; return result; } public async Task> WhereAsync(Func predicate = default, - RefreshMode refreshMode = RefreshMode.IfResultEmpty, CancellationToken token = default) + CancellationToken token = default) { - bool isUpdated = await CheckRefreshAsync(refreshMode, token); var result = (predicate != default) ? cached.Where(predicate) : cached; - if (!result.Any() && refreshMode == RefreshMode.IfResultEmpty && !isUpdated) - { - await RefreshAsync(token); - result = (predicate != default) - ? cached.Where(predicate) - : cached; - } + if (result.Any()) + return result; + + await RefreshAsync(token); + result = (predicate != default) + ? cached.Where(predicate) + : cached; return result; } - //public IEnumerable Mutate(Func predicate, - // Action mutation) - //{ - // var dbEntities = dbSet.Where(predicate); - // if (dbEntities.Any()) - // { - // foreach (var dbEntity in dbEntities) - // mutation(dbEntity); - // context.SaveChanges(); - // } - // var matchedByPredicate = cached.Select(el => predicate(el)); - // foreach (var item in matchedByPredicate) - // cached.TryTake(out var t); - // cached = cached.RemoveAll(e => predicate(e)); - // foreach (var e in dbEntities) - // cached.Add(e); - // return dbEntities; - //} - - //public async Task> MutateAsync(Func predicate, Action mutation, CancellationToken token = default) - //{ - // var dbEntities = dbSet.Where(predicate); - // if (dbEntities.Any()) - // { - // foreach (var dbEntity in dbEntities) - // mutation(dbEntity); - // await context.SaveChangesAsync(token).ConfigureAwait(false); - // } - // cached.RemoveAll(e => predicate(e)); - // foreach (var e in dbEntities) - // cached.Add(e); - // return dbEntities; - //} - public TEntity Upsert(TEntity entity) { - var updated = dbSet.Update(entity); + Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry updated; + if (dbSet.Contains(entity)) + updated = dbSet.Update(entity); + else + updated = dbSet.Add(entity); context.SaveChanges(); Refresh(); return updated.Entity; @@ -295,7 +246,7 @@ namespace AsbCloudInfrastructure.Services.Cache { var entry = dbSet.Add(entity); context.SaveChanges(); - cached.Add(entry.Entity); + Refresh(); return entry.Entity; } @@ -303,34 +254,28 @@ namespace AsbCloudInfrastructure.Services.Cache { var entry = dbSet.Add(entity); await context.SaveChangesAsync(token).ConfigureAwait(false); - cached.Add(entry.Entity); + await RefreshAsync(token).ConfigureAwait(false); return entry.Entity; } - public IEnumerable Insert(IEnumerable newEntities) + public int Insert(IEnumerable newEntities) { - var dbEntities = new List(newEntities.Count()); - foreach (var item in newEntities) - dbEntities.Add(dbSet.Add(item).Entity); - context.SaveChanges(); - foreach (var e in dbEntities) - cached.Add(e); - return dbEntities; + dbSet.AddRange(newEntities); + var result = context.SaveChanges(); + Refresh(); + return result; } - public async Task> InsertAsync(IEnumerable newEntities, CancellationToken token = default) + public async Task InsertAsync(IEnumerable newEntities, CancellationToken token = default) { - var dbEntities = new List(newEntities.Count()); - foreach (var item in newEntities) - dbEntities.Add(dbSet.Add(item).Entity); - await context.SaveChangesAsync(token).ConfigureAwait(false); - foreach (var e in dbEntities) - cached.Add(e); - return dbEntities; + dbSet.AddRange(newEntities); + var result = await context.SaveChangesAsync(token).ConfigureAwait(false); + await RefreshAsync(token).ConfigureAwait(false); + return result; } public IEnumerator GetEnumerator() => Where().GetEnumerator(); - System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } } diff --git a/AsbCloudInfrastructure/Services/Cache/RefreshMode.cs b/AsbCloudInfrastructure/Services/Cache/RefreshMode.cs deleted file mode 100644 index bd60b4ee..00000000 --- a/AsbCloudInfrastructure/Services/Cache/RefreshMode.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace AsbCloudInfrastructure.Services.Cache -{ - public enum RefreshMode { None, IfResultEmpty, Force, } -} \ No newline at end of file diff --git a/ConsoleApp1/Program.cs b/ConsoleApp1/Program.cs index efc26bc4..aef3eb20 100644 --- a/ConsoleApp1/Program.cs +++ b/ConsoleApp1/Program.cs @@ -9,6 +9,8 @@ using AsbCloudDb.Model; using Microsoft.EntityFrameworkCore; using AsbCloudInfrastructure.Services.Cache; using AsbCloudInfrastructure.Services; +using System.Threading.Tasks; +using System.Threading; namespace ConsoleApp1 { @@ -17,11 +19,99 @@ namespace ConsoleApp1 // .Options; //var context = new AsbCloudDbContext(options); - class Program - { + { + static object lockObject = new(); + static int inc = 0; + static void RefreshMonitor() + { + if (Monitor.TryEnter(lockObject)) + { + Task.Delay(2_000); + Interlocked.Increment(ref inc); + } + else + Monitor.Wait(lockObject); + + } + + //static Mutex mutex = new Mutex(); + //static void RefreshMutex() + //{ + // if(Mutex.TryOpenExisting() + // { + + // } + + // Interlocked.Increment(ref inc); + //} + + static bool isUnLocked = true; + static void RefreshBool() { + if(isUnLocked) + { + isUnLocked = false; + Console.WriteLine("."); + Task.Delay(50).Wait(); + inc++; + isUnLocked = true; + } + while(!isUnLocked) + Task.Delay(10).Wait(); + } + + static readonly SemaphoreSlim semaphore = new(1); + static void Refresh() + { + var wasFree = semaphore.CurrentCount > 0; + semaphore.Wait(); + if (wasFree) + { + Console.WriteLine("."); + Task.Delay(500).Wait(); + inc++; + } + semaphore.Release(); + } + + static async Task RefreshAsync() + { + var wasFree = semaphore.CurrentCount == 1; + await semaphore.WaitAsync(); + if (wasFree) + { + Console.WriteLine("."); + await Task.Delay(500); + inc++; + }else + Console.Write(","); + semaphore.Release(); + } + static void Main(/*string[] args*/) { + + Console.WriteLine(DateTime.Now.ToString("mm:ss.fff")); + //semaphore.Release(); + Refresh(); + for (int i= 0; i < 4; i++) + { + var ts = new List(100); + for (int j = 0; j < 20; j++) + { + ts.Add( + Task.Run( RefreshAsync)); + } + Task.WaitAll(ts.ToArray()); + Console.WriteLine("*"); + } + + Console.WriteLine("_"); + Console.ReadKey(); + Console.WriteLine(inc); + return; + + var options = new DbContextOptionsBuilder() .UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True") .Options;