using Microsoft.EntityFrameworkCore; using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Collections; using System.Diagnostics; namespace AsbCloudInfrastructure.Services.Cache { public class CacheTable : IEnumerable where TEntity : class { private const int semaphoreTimeout = 5_000; private static readonly SemaphoreSlim semaphore = new(1); private static readonly TimeSpan minPeriodRefresh = TimeSpan.FromSeconds(5); private static readonly string nameOfTEntity = typeof(TEntity).Name; private readonly CacheTableDataStore data; private readonly Func, IQueryable> configureDbSet; private readonly List cached; private readonly DbContext context; private readonly DbSet dbSet; internal CacheTable(DbContext context, CacheTableDataStore data, ISet includes = null) { this.context = context; this.data = data; dbSet = context.Set(); if (includes?.Any() == true) configureDbSet = (DbSet dbSet) => { IQueryable result = dbSet; foreach (var include in includes) result = result.Include(include); return result; }; cached = (List)data.Entities; if ((cached.Count == 0) || data.IsObsolete) Refresh(false); } internal CacheTable(DbContext context, CacheTableDataStore data, Func, IQueryable> configureDbSet = null) { this.context = context; this.data = data; this.configureDbSet = configureDbSet; dbSet = context.Set(); cached = (List)data.Entities; if ((cached.Count == 0) || data.IsObsolete) Refresh(false); } public TEntity this[int index] { get => cached.ElementAt(index); } /// /// Runs action like atomic operation. /// wasFree is action argument indicates that semaphore was free. /// It may be needed to avoid multiple operations like Refresh(). /// /// (wasFree) => {...} /// default if semaphoreTimeout. Or result of func(..) private static T Sync(Func func) { var wasFree = semaphore.CurrentCount > 0; T result = default; if (func is null || !semaphore.Wait(semaphoreTimeout)) return result; try { result = func.Invoke(wasFree); } catch (Exception ex) { Trace.WriteLine($"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{nameOfTEntity}>.Sync()"); Trace.WriteLine(ex.Message); Trace.WriteLine(ex.StackTrace); } finally { semaphore.Release(); } return result; } /// /// Runs action like atomic operation. /// wasFree is action argument indicates that semaphore was free. /// It may be needed to avoid multiple operations like Refresh(). /// /// (wasFree) => {...} /// default if semaphoreTimeout. Or result of func(..) private static async Task SyncAsync(Func> funcAsync, CancellationToken token = default) { var wasFree = semaphore.CurrentCount > 0; T result = default; if (funcAsync is null || !await semaphore.WaitAsync(semaphoreTimeout, token).ConfigureAwait(false)) return result; try { result = await funcAsync.Invoke(wasFree, token); } catch (Exception ex) { Trace.WriteLine( $"{DateTime.Now:yyyy.MM.dd HH:mm:ss:fff} error in CacheTable<{nameOfTEntity}>.SyncAsync()"); Trace.WriteLine(ex.Message); Trace.WriteLine(ex.StackTrace); } finally { semaphore.Release(); } return result; } private int InternalRefresh(bool force) { if (force || data.LastResreshDate + minPeriodRefresh < DateTime.Now) { cached.Clear(); IQueryable query = configureDbSet is null ? dbSet : configureDbSet(dbSet); var entities = query.AsNoTracking().ToList(); //Trace.WriteLine($"CacheTable<{nameOfTEntity}> refresh"); cached.AddRange(entities); data.LastResreshDate = DateTime.Now; } return cached.Count; } private async Task InternalRefreshAsync(bool force, CancellationToken token = default) { if (force || data.LastResreshDate + minPeriodRefresh < DateTime.Now) { cached.Clear(); IQueryable query = configureDbSet is null ? dbSet : configureDbSet(dbSet); var entities = await query.AsNoTracking() .ToListAsync(token).ConfigureAwait(false); //Trace.WriteLine($"CacheTable<{nameOfTEntity}> refreshAsync"); cached.AddRange(entities); data.LastResreshDate = DateTime.Now; } return cached.Count; } public int Refresh(bool force) => Sync((wasFree) => wasFree ? InternalRefresh(force) : 0); public Task RefreshAsync(bool force, CancellationToken token = default) { return SyncAsync( async (wasFree, token) => { return wasFree ? await InternalRefreshAsync(force, token) : 0; }, token); } public bool Contains(Func predicate) => FirstOrDefault(predicate) != default; public async Task ContainsAsync(Func predicate, CancellationToken token = default) => await FirstOrDefaultAsync(predicate, token) != default; public TEntity GetOrCreate(Func predicate, Func makeNew) => Sync(wasFree => { var result = cached.FirstOrDefault(predicate); if (result != default) return result; InternalRefresh(true); result = cached.FirstOrDefault(predicate); if (result != default) return result; var entry = dbSet.Add(makeNew()); context.SaveChanges(); InternalRefresh(true); return entry.Entity; }); public TEntity FirstOrDefault() { var result = cached.FirstOrDefault(); if (result != default) return result; Refresh(false); return cached.FirstOrDefault(); } public async Task FirstOrDefaultAsync(CancellationToken token = default) { var result = cached.FirstOrDefault(); if (result != default) return result; await RefreshAsync(false, token); return cached.FirstOrDefault(); } public TEntity FirstOrDefault(Func predicate) { var result = cached.FirstOrDefault(predicate); if (result != default) return result; Refresh(false); return cached.FirstOrDefault(predicate); } public async Task FirstOrDefaultAsync(Func predicate, CancellationToken token = default) { var result = cached.FirstOrDefault(predicate); if (result != default) return result; await RefreshAsync(false, token); return cached.FirstOrDefault(predicate); } public IEnumerable Where(Func predicate = default) { var result = (predicate != default) ? cached.Where(predicate) : cached; if (result.Any()) return result; Refresh(false); result = (predicate != default) ? cached.Where(predicate) : cached; return result; } public Task> WhereAsync(CancellationToken token = default) => WhereAsync(default, token); public async Task> WhereAsync(Func predicate = default, CancellationToken token = default) { var result = (predicate != default) ? cached.Where(predicate) : cached; if (result.Any()) return result; await RefreshAsync(false, token); result = (predicate != default) ? cached.Where(predicate) : cached; return result; } public int Upsert(TEntity entity) { if (entity == default) return 0; return Sync((wasFree) => { if (dbSet.Contains(entity)) dbSet.Update(entity); else dbSet.Add(entity); var affected = context.SaveChanges(); if (affected > 0) InternalRefresh(true); return affected; }); } public Task UpsertAsync(TEntity entity, CancellationToken token = default) => SyncAsync(async (wasFree, token) => { if (dbSet.Contains(entity)) dbSet.Update(entity); else dbSet.Add(entity); var affected = await context.SaveChangesAsync(token).ConfigureAwait(false); if (affected > 0) await InternalRefreshAsync(true, token); return affected; }, token); public int Upsert(IEnumerable entities) { if (!entities.Any()) return 0; return Sync((wasFree) => { foreach (var entity in entities) { if (dbSet.Contains(entity)) // TODO: это очень медленно dbSet.Update(entity); else dbSet.Add(entity); } var affected = context.SaveChanges(); if (affected > 0) InternalRefresh(true); return affected; }); } public Task UpsertAsync(IEnumerable entities, CancellationToken token = default) { if (!entities.Any()) return Task.FromResult(0); return SyncAsync(async (wasFree, token) => { var upsertedEntries = new List(entities.Count()); foreach (var entity in entities) { if (dbSet.Contains(entity)) dbSet.Update(entity); else dbSet.Add(entity); } var affected = await context.SaveChangesAsync(token).ConfigureAwait(false); if (affected > 0) await InternalRefreshAsync(true, token); return affected; }, token); } public int Remove(Func predicate) => Sync(_ => { dbSet.RemoveRange(dbSet.Where(predicate)); var affected = context.SaveChanges(); if (affected > 0) InternalRefresh(true); return affected; }); public Task RemoveAsync(Func predicate, CancellationToken token = default) => SyncAsync(async (wasFree, token) => { dbSet.RemoveRange(dbSet.Where(predicate)); var affected = await context.SaveChangesAsync(token).ConfigureAwait(false); if (affected > 0) await InternalRefreshAsync(true, token); return affected; }, token); public TEntity Insert(TEntity entity) { return Sync(_ => { var entry = dbSet.Add(entity); var affected = context.SaveChanges(); if (affected > 0) InternalRefresh(true); return entry.Entity; }); } public Task InsertAsync(TEntity entity, CancellationToken token = default) { return SyncAsync(async (wasFree, token) => { var entry = dbSet.Add(entity); var affected = await context.SaveChangesAsync(token).ConfigureAwait(false); if (affected > 0) await InternalRefreshAsync(true, token); return entry.Entity; }, token); } public IEnumerable Insert(IEnumerable newEntities) { if (newEntities is null) return null; var count = newEntities.Count(); if (count == 0) return null; return Sync(_ => { var entries = new List>(count); foreach (var newEntity in newEntities) { var entry = dbSet.Add(newEntity); entries.Add(entry); } var affected = context.SaveChanges(); if (affected > 0) InternalRefresh(true); else return null; return entries.Select(e => e.Entity); }); } public Task> InsertAsync(IEnumerable newEntities, CancellationToken token = default) { if(newEntities is null) return null; var count = newEntities.Count(); if (count == 0) return null; return SyncAsync(async (wasFree, token) => { var entries = new List>(count); foreach (var newEntity in newEntities) { var entry = dbSet.Add(newEntity); entries.Add(entry); } var affected = await context.SaveChangesAsync(token).ConfigureAwait(false); if (affected > 0) await InternalRefreshAsync(true, token); else return null; return entries.Select(e => e.Entity); }, token); } public IEnumerator GetEnumerator() => Where().GetEnumerator(); IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } }