forked from ddrilling/AsbCloudServer
Implement UPSERT extention method to fix TelemetryDataBaseService.UpdateDataAsync().
This commit is contained in:
parent
a5eb4e5212
commit
496c24099d
135
AsbCloudDb/EFExtentions.cs
Normal file
135
AsbCloudDb/EFExtentions.cs
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Metadata;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace AsbCloudDb
|
||||||
|
{
|
||||||
|
public static class EFExtentions
|
||||||
|
{
|
||||||
|
static Dictionary<Type, IQueryStringFactory> QueryFactories { get; set; } = new Dictionary<Type, IQueryStringFactory>();
|
||||||
|
|
||||||
|
static IQueryStringFactory GetQueryStringFactory<T>(DbSet<T> dbset, IEnumerable<T> items)
|
||||||
|
where T : class
|
||||||
|
{
|
||||||
|
var t = typeof(T);
|
||||||
|
QueryStringFactory<T> factory = (QueryStringFactory<T>)QueryFactories.GetValueOrDefault(t);
|
||||||
|
if (factory is null)
|
||||||
|
{
|
||||||
|
factory = new QueryStringFactory<T>(dbset);
|
||||||
|
QueryFactories.Add(t, factory);
|
||||||
|
}
|
||||||
|
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Task<int> ExecInsertOrUpdateAsync<T>(this Microsoft.EntityFrameworkCore.Infrastructure.DatabaseFacade database, DbSet<T> dbset, IEnumerable<T> items, CancellationToken token)
|
||||||
|
where T : class
|
||||||
|
{
|
||||||
|
var factory = (QueryStringFactory<T>)GetQueryStringFactory(dbset, items);
|
||||||
|
var query = factory.MakeInsertOrUpdateSql(items);
|
||||||
|
return database.ExecuteSqlRawAsync(query, token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface IQueryStringFactory{}
|
||||||
|
|
||||||
|
class QueryStringFactory<T> : IQueryStringFactory
|
||||||
|
where T : class
|
||||||
|
{
|
||||||
|
private readonly string pk;
|
||||||
|
private readonly string tableName;
|
||||||
|
private string colunmsString;
|
||||||
|
private string conflictUpdateSet;
|
||||||
|
private IEnumerable<IClrPropertyGetter> getters;
|
||||||
|
|
||||||
|
public QueryStringFactory(DbSet<T> dbset)
|
||||||
|
{
|
||||||
|
var ps = dbset.EntityType.GetProperties();
|
||||||
|
var pkColsNames = dbset.EntityType.FindPrimaryKey()?.Properties.Select(p => p.GetColumnBaseName());
|
||||||
|
pk = pkColsNames is null ? string.Empty : $"({string.Join(", ", pkColsNames)})";
|
||||||
|
|
||||||
|
tableName = dbset.EntityType.GetTableName();
|
||||||
|
getters = ps.Select(p => p.GetGetter());
|
||||||
|
var colNames = ps.Select(p => $"\"{p.GetColumnBaseName()}\"");
|
||||||
|
colunmsString = $"({string.Join(", ", colNames)})";
|
||||||
|
conflictUpdateSet = string.Join(", ", colNames.Select(n => $"{n} = excluded.{n}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public string MakeInsertOrUpdateSql(IEnumerable<T> items)
|
||||||
|
{
|
||||||
|
/* EXAMPLE:
|
||||||
|
INSERT INTO the_table (id, column_1, column_2)
|
||||||
|
VALUES (1, 'A', 'X'), (2, 'B', 'Y'), (3, 'C', 'Z')
|
||||||
|
ON CONFLICT (id) DO UPDATE
|
||||||
|
SET column_1 = excluded.column_1,
|
||||||
|
column_2 = excluded.column_2;
|
||||||
|
*/
|
||||||
|
var sqlBuilder = new StringBuilder("INSERT INTO ", 7);
|
||||||
|
sqlBuilder.Append(tableName);
|
||||||
|
sqlBuilder.Append(colunmsString);
|
||||||
|
sqlBuilder.AppendLine(" VALUES ");
|
||||||
|
sqlBuilder.Append(MakeQueryValues(items));
|
||||||
|
sqlBuilder.AppendLine(" ON CONFLICT ");
|
||||||
|
if (string.IsNullOrEmpty(pk))
|
||||||
|
{
|
||||||
|
sqlBuilder.Append("DO NOTHING;");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
sqlBuilder.Append(pk);
|
||||||
|
sqlBuilder.Append(" DO UPDATE SET ");
|
||||||
|
sqlBuilder.AppendLine(conflictUpdateSet);
|
||||||
|
sqlBuilder.Append(';');
|
||||||
|
}
|
||||||
|
|
||||||
|
return sqlBuilder.ToString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private string MakeQueryValues(IEnumerable<T> items)
|
||||||
|
{
|
||||||
|
var rows = items.Select(item => MakeRow(item));
|
||||||
|
return string.Join(",", rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
private string MakeRow(T item)
|
||||||
|
{
|
||||||
|
var values = getters.Select(getter => FormatValue(getter.GetClrValue(item)));
|
||||||
|
return $"({string.Join(",", values)})";
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string FormatValue(object v)
|
||||||
|
=> v switch
|
||||||
|
{
|
||||||
|
string vStr => $"'{vStr}'",
|
||||||
|
DateTime vDate => $"'{FormatDateValue(vDate)}'",
|
||||||
|
IFormattable vFormattable=> FormatFormattableValue(vFormattable),
|
||||||
|
_ => System.Text.Json.JsonSerializer.Serialize(v),
|
||||||
|
};
|
||||||
|
|
||||||
|
private static string FormatFormattableValue(IFormattable v)
|
||||||
|
=> v switch
|
||||||
|
{
|
||||||
|
double vt => vt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
float vt => vt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
decimal vt => vt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
int vt => vt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
short vt => vt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
uint vt => vt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
ushort vt => vt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
_ => v.ToString(null, System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
};
|
||||||
|
|
||||||
|
private static string FormatDateValue(DateTime vDate)
|
||||||
|
{
|
||||||
|
if (vDate.Kind == DateTimeKind.Unspecified)
|
||||||
|
vDate = DateTime.SpecifyKind(vDate, DateTimeKind.Utc);
|
||||||
|
return vDate.ToUniversalTime().ToString("yyyy-MM-dd HH:mm:ss.ffffff zzz");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -493,20 +493,5 @@ namespace AsbCloudDb.Model
|
|||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<int> CreatePartitionAsync<TEntity>(string propertyName, int id, CancellationToken token = default)
|
|
||||||
where TEntity : class
|
|
||||||
{
|
|
||||||
var dbSet = Set<TEntity>();
|
|
||||||
var baseTableName = dbSet.EntityType.GetTableName();
|
|
||||||
var schema = dbSet.EntityType.GetSchema();
|
|
||||||
var tableObject = Microsoft.EntityFrameworkCore.Metadata.StoreObjectIdentifier.Table(baseTableName, schema);
|
|
||||||
var tableName = baseTableName.Replace("_base", "");
|
|
||||||
var property = dbSet.EntityType.GetProperty(propertyName).GetColumnName(tableObject);
|
|
||||||
|
|
||||||
var query = $"CREATE TABLE {tableName}_{id} (like {baseTableName} including all, constraint partitioning_check check ({property} = 1)) INHERITS ({baseTableName});";
|
|
||||||
|
|
||||||
return await Database.ExecuteSqlRawAsync(query, token).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,6 +52,5 @@ namespace AsbCloudDb.Model
|
|||||||
Task<(DateTime From, DateTime To)> GetDatesRangeAsync<T>(int idTelemetry, CancellationToken token) where T : class, ITelemetryData;
|
Task<(DateTime From, DateTime To)> GetDatesRangeAsync<T>(int idTelemetry, CancellationToken token) where T : class, ITelemetryData;
|
||||||
Task<IEnumerable<(double? MinDepth, double? MaxDepth, DateTime BeginPeriodDate)>> GetDepthToIntervalAsync(int telemetryId,
|
Task<IEnumerable<(double? MinDepth, double? MaxDepth, DateTime BeginPeriodDate)>> GetDepthToIntervalAsync(int telemetryId,
|
||||||
int intervalHoursTimestamp, int workStartTimestamp, double timezoneOffset, CancellationToken token);
|
int intervalHoursTimestamp, int workStartTimestamp, double timezoneOffset, CancellationToken token);
|
||||||
Task<int> CreatePartitionAsync<TEntity>(string propertyName, int id, CancellationToken token = default) where TEntity : class;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,6 +1,7 @@
|
|||||||
using AsbCloudApp.Data;
|
using AsbCloudApp.Data;
|
||||||
using AsbCloudApp.Services;
|
using AsbCloudApp.Services;
|
||||||
using AsbCloudDb.Model;
|
using AsbCloudDb.Model;
|
||||||
|
using AsbCloudDb;
|
||||||
using AsbCloudInfrastructure.Services.Cache;
|
using AsbCloudInfrastructure.Services.Cache;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.EntityFrameworkCore.ChangeTracking;
|
using Microsoft.EntityFrameworkCore.ChangeTracking;
|
||||||
@ -38,12 +39,11 @@ namespace AsbCloudInfrastructure.Services
|
|||||||
cacheWells = cacheDb.GetCachedTable<Well>((AsbCloudDbContext)db);
|
cacheWells = cacheDb.GetCachedTable<Well>((AsbCloudDbContext)db);
|
||||||
}
|
}
|
||||||
|
|
||||||
public virtual async Task<int> UpdateDataAsync(string uid, IEnumerable<TDto> dtos,
|
public virtual async Task<int> UpdateDataAsync(string uid, IEnumerable<TDto> dtos, CancellationToken token = default)
|
||||||
CancellationToken token = default)
|
|
||||||
{
|
{
|
||||||
if (dtos == default || !dtos.Any())
|
if (dtos == default || !dtos.Any())
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
var idTelemetry = telemetryService.GetOrCreateTemetryIdByUid(uid);
|
var idTelemetry = telemetryService.GetOrCreateTemetryIdByUid(uid);
|
||||||
var lastTelemetryDate = telemetryService.GetLastTelemetryDate(uid);
|
var lastTelemetryDate = telemetryService.GetLastTelemetryDate(uid);
|
||||||
var dtosList = dtos.OrderBy(d => d.Date).ToList();
|
var dtosList = dtos.OrderBy(d => d.Date).ToList();
|
||||||
@ -55,84 +55,29 @@ namespace AsbCloudInfrastructure.Services
|
|||||||
{
|
{
|
||||||
var duplicates = new List<TDto>(8);
|
var duplicates = new List<TDto>(8);
|
||||||
for (int i = 1; i < dtosList.Count; i++)
|
for (int i = 1; i < dtosList.Count; i++)
|
||||||
if (dtosList[i - 1].Date == dtosList[i].Date || lastTelemetryDate == dtosList[i - 1].Date)
|
if (dtosList[i].Date - dtosList[i-1].Date < TimeSpan.FromMilliseconds(100))
|
||||||
duplicates.Add(dtosList[i - 1]);
|
duplicates.Add(dtosList[i - 1]);
|
||||||
foreach (var duplicate in duplicates)
|
foreach (var duplicate in duplicates)
|
||||||
dtosList.Remove(duplicate);
|
dtosList.Remove(duplicate);
|
||||||
}
|
}
|
||||||
|
|
||||||
var dataSet = db.Set<TModel>();
|
var enitties = dtosList.Select(d => {
|
||||||
|
var e = Convert(d);
|
||||||
|
e.IdTelemetry = idTelemetry;
|
||||||
|
return e;
|
||||||
|
});
|
||||||
|
|
||||||
if(lastTelemetryDate > dtoMinDate)
|
var dbset = db.Set<TModel>();
|
||||||
{
|
|
||||||
var oldData = dataSet.Where(d => d.IdTelemetry == idTelemetry
|
|
||||||
&& d.Date > dtoMinDate
|
|
||||||
&& d.Date < dtoMaxDate);
|
|
||||||
dataSet.RemoveRange(oldData);
|
|
||||||
await db.SaveChangesAsync(token).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
telemetryService.SaveRequestDate(uid, dtoMaxDate);
|
|
||||||
|
|
||||||
var entities = new List<EntityEntry<TModel>>(dtosList.Count);
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
foreach (var dto in dtosList)
|
return await db.Database.ExecInsertOrUpdateAsync(dbset, enitties, token).ConfigureAwait(false);
|
||||||
{
|
|
||||||
dto.IdTelemetry = idTelemetry;
|
|
||||||
dto.Date = dto.Date.AddMilliseconds((disorderId++) % 99);
|
|
||||||
var data = Convert(dto);
|
|
||||||
var entry = dataSet.Add(data);
|
|
||||||
entities.Add(entry);
|
|
||||||
}
|
|
||||||
return await db.SaveChangesAsync(token).ConfigureAwait(false);
|
|
||||||
}
|
}
|
||||||
catch (DbUpdateException ex)
|
catch(Exception ex)
|
||||||
{
|
{
|
||||||
Trace.WriteLine(ex.Message);
|
Trace.WriteLine($"Fail to save data telemerty uid: {uid}, idTelemetry {idTelemetry} count: {enitties.Count()} dataDate: {enitties.FirstOrDefault()?.Date}. Message: {ex.Message}");
|
||||||
foreach (var entity in entities)
|
return 0;
|
||||||
entity.State = EntityState.Detached;
|
|
||||||
|
|
||||||
foreach (var dto in dtosList)
|
|
||||||
{
|
|
||||||
dto.IdTelemetry = idTelemetry;
|
|
||||||
dto.Date = dto.Date.AddMilliseconds((disorderId++) % 100);
|
|
||||||
var data = Convert(dto);
|
|
||||||
var entry = dataSet.Add(data);
|
|
||||||
entities.Add(entry);
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
return await db.SaveChangesAsync(token).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
catch (DbUpdateException ex2)
|
|
||||||
{
|
|
||||||
Trace.WriteLine("2-nd :" + ex2.Message);
|
|
||||||
foreach (var entity in entities)
|
|
||||||
entity.State = EntityState.Detached;
|
|
||||||
|
|
||||||
entities.Clear();
|
|
||||||
int i = 0;
|
|
||||||
foreach (var dto in dtosList)
|
|
||||||
{
|
|
||||||
dto.IdTelemetry = idTelemetry;
|
|
||||||
dto.Date = dto.Date.AddMilliseconds((disorderId++) % 100);
|
|
||||||
var data = Convert(dto);
|
|
||||||
dataSet.Add(data);
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
i += await db.SaveChangesAsync(token).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
catch
|
|
||||||
{
|
|
||||||
Trace.WriteLine($"Fail to save data telemerty uid: {uid}, idTelemetry {idTelemetry} count: {data.Date}.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public virtual async Task<IEnumerable<TDto>> GetAsync(int idWell,
|
public virtual async Task<IEnumerable<TDto>> GetAsync(int idWell,
|
||||||
|
@ -4,6 +4,7 @@ using System.Globalization;
|
|||||||
using System.IO;
|
using System.IO;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using AsbCloudDb.Model;
|
using AsbCloudDb.Model;
|
||||||
|
using AsbCloudDb;
|
||||||
using Google.Apis.Drive.v3.Data;
|
using Google.Apis.Drive.v3.Data;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
@ -18,10 +19,17 @@ namespace ConsoleApp1
|
|||||||
{
|
{
|
||||||
static void Main(/*string[] args*/)
|
static void Main(/*string[] args*/)
|
||||||
{
|
{
|
||||||
var d = "20211102_173407926";
|
var options = new DbContextOptionsBuilder<AsbCloudDbContext>()
|
||||||
var dt = DateTime.ParseExact("20211102_173407926",
|
.UseNpgsql("Host=localhost;Database=postgres;Username=postgres;Password=q;Persist Security Info=True")
|
||||||
"yyyyMMdd_HHmmssfff",
|
.Options;
|
||||||
CultureInfo.InvariantCulture);
|
var context = new AsbCloudDbContext(options);
|
||||||
|
|
||||||
|
//var query = context.Clusters.GetInsertOrUpdateSql(
|
||||||
|
// new List<Cluster> {
|
||||||
|
// new Cluster{Caption = "cluster 1", },
|
||||||
|
// new Cluster{Caption = "cluster 2"},
|
||||||
|
// });
|
||||||
|
|
||||||
Console.ReadKey();
|
Console.ReadKey();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user