forked from ddrilling/AsbCloudServer
Add new ef layer 2 cache
This commit is contained in:
parent
213675c5a9
commit
6fb82e7333
247
AsbCloudInfrastructure/Services/Cache/EfCacheL2.cs
Normal file
247
AsbCloudInfrastructure/Services/Cache/EfCacheL2.cs
Normal file
@ -0,0 +1,247 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace AsbCloudInfrastructure.Services.Cache
|
||||||
|
{
|
||||||
|
#nullable enable
|
||||||
|
public static class EfCacheL2
|
||||||
|
{
|
||||||
|
public static int RequestsToDb = 0;
|
||||||
|
|
||||||
|
static readonly ConcurrentDictionary<string, Lazy<CacheItem>> caches = new(1, 16);
|
||||||
|
|
||||||
|
private const int semaphoreTimeout = 25_000;
|
||||||
|
private static readonly SemaphoreSlim semaphore = new(1);
|
||||||
|
|
||||||
|
private struct CacheItem
|
||||||
|
{
|
||||||
|
public readonly IEnumerable Data;
|
||||||
|
public readonly DateTime DateObsolete;
|
||||||
|
|
||||||
|
public CacheItem(IEnumerable data, TimeSpan obsolescence)
|
||||||
|
{
|
||||||
|
DateObsolete = DateTime.Now + obsolescence;
|
||||||
|
Data = data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static CacheItem GetOrAddCache(string tag, Func<CacheItem> valueFactory)
|
||||||
|
{
|
||||||
|
Lazy<CacheItem>? lazyCache;
|
||||||
|
while (!caches.TryGetValue(tag, out lazyCache))
|
||||||
|
{
|
||||||
|
if (semaphore.Wait(0))
|
||||||
|
{
|
||||||
|
lazyCache = new Lazy<CacheItem>(valueFactory);
|
||||||
|
caches.TryAdd(tag, lazyCache);
|
||||||
|
_ = lazyCache.Value;
|
||||||
|
semaphore.Release();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if(semaphore.Wait(semaphoreTimeout))
|
||||||
|
semaphore.Release();
|
||||||
|
else
|
||||||
|
{
|
||||||
|
semaphore.Release();
|
||||||
|
throw new TimeoutException("EfCacheL2.GetOrAddCache. Can't wait too long while getting cache");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lazyCache.Value.DateObsolete < DateTime.Now)
|
||||||
|
{
|
||||||
|
var isUpdated = false;
|
||||||
|
if (semaphore.Wait(0))
|
||||||
|
{
|
||||||
|
lazyCache = new Lazy<CacheItem>(valueFactory);
|
||||||
|
caches.Remove(tag, out _);
|
||||||
|
caches.TryAdd(tag, lazyCache);
|
||||||
|
_ = lazyCache.Value;
|
||||||
|
isUpdated = true;
|
||||||
|
semaphore.Release();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (semaphore.Wait(semaphoreTimeout))
|
||||||
|
semaphore.Release();
|
||||||
|
else
|
||||||
|
{
|
||||||
|
semaphore.Release();
|
||||||
|
throw new TimeoutException("EfCacheL2.GetOrAddCache. Can't wait too long while getting cache");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isUpdated || caches.TryGetValue(tag, out lazyCache))
|
||||||
|
return lazyCache.Value;
|
||||||
|
|
||||||
|
throw new Exception("EfCacheL2.GetOrAddCache it should never happens");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
return lazyCache.Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo: делает лишние запросы
|
||||||
|
private static async Task<CacheItem> GetOrAddCacheAsync(string tag, Func<CacheItem> valueFactory, CancellationToken token)
|
||||||
|
{
|
||||||
|
Lazy<CacheItem>? lazyCache;
|
||||||
|
while (!caches.TryGetValue(tag, out lazyCache))
|
||||||
|
{
|
||||||
|
if (semaphore.Wait(0, CancellationToken.None))
|
||||||
|
{
|
||||||
|
lazyCache = new Lazy<CacheItem>(valueFactory);
|
||||||
|
caches.TryAdd(tag, lazyCache);
|
||||||
|
_ = lazyCache.Value;
|
||||||
|
semaphore.Release();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (await semaphore.WaitAsync(semaphoreTimeout, token))
|
||||||
|
semaphore.Release();
|
||||||
|
else
|
||||||
|
{
|
||||||
|
semaphore.Release();
|
||||||
|
throw new TimeoutException("EfCacheL2.GetOrAddCache. Can't wait too long while getting cache");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lazyCache.Value.DateObsolete < DateTime.Now)
|
||||||
|
{
|
||||||
|
var isUpdated = false;
|
||||||
|
if (semaphore.Wait(0, CancellationToken.None))
|
||||||
|
{
|
||||||
|
lazyCache = new Lazy<CacheItem>(valueFactory);
|
||||||
|
caches.Remove(tag, out _);
|
||||||
|
caches.TryAdd(tag, lazyCache);
|
||||||
|
_ = lazyCache.Value;
|
||||||
|
isUpdated = true;
|
||||||
|
semaphore.Release();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (await semaphore.WaitAsync(semaphoreTimeout, token))
|
||||||
|
semaphore.Release();
|
||||||
|
else
|
||||||
|
{
|
||||||
|
semaphore.Release();
|
||||||
|
throw new TimeoutException("EfCacheL2.GetOrAddCache. Can't wait too long while getting cache");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isUpdated || caches.TryGetValue(tag, out lazyCache))
|
||||||
|
return lazyCache.Value;
|
||||||
|
|
||||||
|
throw new Exception("EfCacheL2.GetOrAddCache it should never happens");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
return lazyCache.Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static IEnumerable<T> ConvertToIEnumerable<T>(IEnumerable data)
|
||||||
|
{
|
||||||
|
if (data is IEnumerable<T> list)
|
||||||
|
return list;
|
||||||
|
else if (data is IDictionary dictionary)
|
||||||
|
{
|
||||||
|
System.Diagnostics.Trace.TraceWarning($"ConvertToIEnumerable. Use keyless method on keyed cache. Type: {typeof(T).Name};");
|
||||||
|
return (IEnumerable<T>)dictionary.Values;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw new NotSupportedException("cache.Data has wrong type.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Dictionary<TKey, T> ConvertToDictionary<TKey, T>(IEnumerable data, Func<T, TKey> keySelector)
|
||||||
|
where TKey : notnull
|
||||||
|
{
|
||||||
|
if (data is Dictionary<TKey, T> dictionary)
|
||||||
|
return dictionary;
|
||||||
|
else
|
||||||
|
{
|
||||||
|
System.Diagnostics.Trace.TraceWarning($"ConvertToDictionary. Use keyed method on keyless cache. Type: {typeof(T).Name};");
|
||||||
|
return ((IEnumerable<T>)data).ToDictionary(keySelector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Func<CacheItem> MakeValueListFactory<T>(IQueryable<T> query, TimeSpan obsolescence)
|
||||||
|
{
|
||||||
|
CacheItem ValueFactory()
|
||||||
|
{
|
||||||
|
var list = query.ToList();
|
||||||
|
RequestsToDb++;
|
||||||
|
return new CacheItem(list, obsolescence);
|
||||||
|
}
|
||||||
|
return ValueFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Func<CacheItem> MakeValueDictionaryFactory<TKey, T>(IQueryable<T> query, TimeSpan obsolescence, Func<T, TKey> keySelector)
|
||||||
|
where TKey : notnull
|
||||||
|
{
|
||||||
|
CacheItem ValueFactory()
|
||||||
|
{
|
||||||
|
var dictionary = query.ToDictionary(keySelector);
|
||||||
|
RequestsToDb++;
|
||||||
|
return new CacheItem(dictionary, obsolescence);
|
||||||
|
};
|
||||||
|
return ValueFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static IEnumerable<T> FromCache<T>(this IQueryable<T> query, string tag, TimeSpan obsolescence)
|
||||||
|
{
|
||||||
|
var factory = MakeValueListFactory(query, obsolescence);
|
||||||
|
var cache = GetOrAddCache(tag, factory);
|
||||||
|
return ConvertToIEnumerable<T>(cache.Data);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static async Task<IEnumerable<T>> FromCacheAsync<T>(this IQueryable<T> query, string tag, TimeSpan obsolescence, CancellationToken token = default)
|
||||||
|
{
|
||||||
|
var factory = MakeValueListFactory(query, obsolescence);
|
||||||
|
var cache = await GetOrAddCacheAsync(tag, factory, token);
|
||||||
|
return ConvertToIEnumerable<T>(cache.Data);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Dictionary<TKey, T> FromCache<TKey, T>(this IQueryable<T> query, string tag, TimeSpan obsolescence, Func<T, TKey> keySelector)
|
||||||
|
where TKey: notnull
|
||||||
|
{
|
||||||
|
var factory = MakeValueDictionaryFactory(query, obsolescence, keySelector);
|
||||||
|
var cache = GetOrAddCache(tag, factory);
|
||||||
|
return ConvertToDictionary(cache.Data, keySelector);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static async Task<Dictionary<TKey, T>> FromCacheAsync<TKey, T>(this IQueryable<T> query, string tag, TimeSpan obsolescence, Func<T, TKey> keySelector, CancellationToken token = default)
|
||||||
|
where TKey : notnull
|
||||||
|
{
|
||||||
|
var factory = MakeValueDictionaryFactory(query, obsolescence, keySelector);
|
||||||
|
var cache = await GetOrAddCacheAsync(tag, factory, token);
|
||||||
|
return ConvertToDictionary(cache.Data, keySelector);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static T? FromCacheGetValueOrDefault<TKey, T>(this IQueryable<T> query, string tag, TimeSpan obsolescence, Func<T, TKey> keySelector, TKey key)
|
||||||
|
where TKey : notnull
|
||||||
|
{
|
||||||
|
var factory = MakeValueDictionaryFactory(query, obsolescence, keySelector);
|
||||||
|
var cache = GetOrAddCache(tag, factory);
|
||||||
|
|
||||||
|
if (cache.Data is Dictionary<TKey, T> dictionary)
|
||||||
|
return dictionary.GetValueOrDefault(key);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
System.Diagnostics.Trace.TraceWarning($"Use keyed method on keyless cache. Tag: {tag}, type: {typeof(T).Name};");
|
||||||
|
return ((IEnumerable<T>)cache.Data).FirstOrDefault(v => keySelector(v).Equals(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void DropCache<T>(this IQueryable<T> query, string tag)
|
||||||
|
{
|
||||||
|
caches.Remove(tag, out var _);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#nullable disable
|
||||||
|
}
|
17
ConsoleApp1/Cron.cs
Normal file
17
ConsoleApp1/Cron.cs
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace ConsoleApp1
|
||||||
|
{
|
||||||
|
class Cron
|
||||||
|
{
|
||||||
|
public DateTimeOffset Origin { get; set; }
|
||||||
|
public TimeSpan Period { get; set; }
|
||||||
|
|
||||||
|
public DateTimeOffset Next()
|
||||||
|
{
|
||||||
|
var delta = DateTimeOffset.Now - Origin;
|
||||||
|
var n = Math.Ceiling(delta / Period);
|
||||||
|
return Origin + n * Period;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,43 +1,76 @@
|
|||||||
using AsbCloudDb.Model;
|
using AsbCloudDb.Model;
|
||||||
using AsbCloudInfrastructure.Services.DailyReport;
|
using AsbCloudInfrastructure.Services.Cache;
|
||||||
|
//using AsbCloudInfrastructure.Services.Cache;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace ConsoleApp1
|
namespace ConsoleApp1
|
||||||
{
|
{
|
||||||
class Cron
|
|
||||||
{
|
|
||||||
public DateTimeOffset Origin { get; set; }
|
|
||||||
public TimeSpan Period { get; set; }
|
|
||||||
|
|
||||||
public DateTimeOffset Next()
|
|
||||||
{
|
|
||||||
var delta = DateTimeOffset.Now - Origin;
|
|
||||||
var n = Math.Ceiling(delta / Period);
|
|
||||||
return Origin + n * Period;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class Program
|
class Program
|
||||||
{
|
{
|
||||||
static void Main(/*string[] args*/)
|
static void Main(/*string[] args*/)
|
||||||
{
|
{
|
||||||
// use ServiceFactory to make services
|
// use ServiceFactory to make services
|
||||||
var op = ServiceFactory.MakeWellOperationsService();
|
Console.WriteLine("hit keyboard to start");
|
||||||
var d = op.FirstOperationDate(90);
|
Console.ReadLine();
|
||||||
|
|
||||||
var period = TimeSpan.FromHours(5);
|
for (int i = 0; i < 24; i++)
|
||||||
var c = new Cron
|
|
||||||
{
|
{
|
||||||
Period = period,
|
//Thread.Sleep(3000);
|
||||||
Origin = new DateTimeOffset(2022, 5, 8, 0, 0, 7, TimeSpan.FromHours(5)),
|
var t = new Thread(_ => {
|
||||||
};
|
for (int j = 0; j < 64; j++)
|
||||||
|
//Task.Run(GetClastersAsync).Wait();
|
||||||
Console.WriteLine($"origin: {c.Origin} next: {c.Next()}");
|
GetClasters();
|
||||||
|
});
|
||||||
|
t.Start();
|
||||||
|
}
|
||||||
|
|
||||||
Console.WriteLine("End of Test");
|
Console.WriteLine("End of Test");
|
||||||
Console.ReadLine();
|
Console.ReadLine();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static TimeSpan obso = TimeSpan.FromSeconds(5);
|
||||||
|
static (long, long) GetClasters()
|
||||||
|
{
|
||||||
|
using var db = ServiceFactory.MakeContext();
|
||||||
|
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||||
|
var cs = db.TelemetryDataSaub
|
||||||
|
.Where(t => t.IdTelemetry == 135)
|
||||||
|
.OrderBy(t => t.DateTime)
|
||||||
|
.Take(100_000)
|
||||||
|
.FromCache("tds", obso, r=>(r.IdTelemetry, r.DateTime))
|
||||||
|
.ToList();
|
||||||
|
sw.Stop();
|
||||||
|
Console.WriteLine($"{DateTime.Now}\tth: {Thread.CurrentThread.ManagedThreadId}\trequests {EfCacheL2.RequestsToDb}\ttime {sw.ElapsedMilliseconds}\tcount {cs.Count}");
|
||||||
|
//Console.WriteLine($"{DateTime.Now}\tth: {Thread.CurrentThread.ManagedThreadId}\ttime {sw.ElapsedMilliseconds}\tcount {cs.Count}");
|
||||||
|
|
||||||
|
GC.Collect();
|
||||||
|
Thread.Sleep(100);
|
||||||
|
return (cs.Count, sw.ElapsedMilliseconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
static async Task<(long, long)> GetClastersAsync()
|
||||||
|
{
|
||||||
|
using var db = ServiceFactory.MakeContext();
|
||||||
|
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||||
|
var cs = ( await db.TelemetryDataSaub
|
||||||
|
.Where(t => t.IdTelemetry == 135)
|
||||||
|
.OrderBy(t => t.DateTime)
|
||||||
|
.Take(100_000)
|
||||||
|
.FromCacheAsync("tds", obso, r => (r.IdTelemetry, r.DateTime)))
|
||||||
|
.ToList();
|
||||||
|
sw.Stop();
|
||||||
|
Console.WriteLine($"{DateTime.Now}\tth: {Thread.CurrentThread.ManagedThreadId}\trequests {EfCacheL2.RequestsToDb}\ttime {sw.ElapsedMilliseconds}\tcount {cs.Count}");
|
||||||
|
//Console.WriteLine($"{DateTime.Now}\tth: {Thread.CurrentThread.ManagedThreadId}\ttime {sw.ElapsedMilliseconds}\tcount {cs.Count}");
|
||||||
|
|
||||||
|
GC.Collect();
|
||||||
|
Thread.Sleep(100);
|
||||||
|
return (cs.Count, sw.ElapsedMilliseconds);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user