Skip to content

Commit

Permalink
Merge pull request #76 from github/caol-ila-mv-backfill-async
Browse files Browse the repository at this point in the history
Add support for MV backfills
  • Loading branch information
caol-ila authored Jul 24, 2024
2 parents 36b8834 + 0f1e79a commit 2b2b886
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 18 deletions.
2 changes: 2 additions & 0 deletions KustoSchemaTools/Changes/BaseChange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ protected BaseChange(string entityType, string entity, T from, T to)

public string Markdown { get; protected set; }

public bool IsAsync { get; set; }

}

}
9 changes: 5 additions & 4 deletions KustoSchemaTools/Changes/DatabaseScriptContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,25 @@ public DatabaseScriptContainer()

}

public DatabaseScriptContainer(DatabaseScript script, string kind, bool? isValid = null)
public DatabaseScriptContainer(DatabaseScript script, string kind, bool isAsync = false)
{
Script = script;
Kind = kind;
IsValid = isValid;
IsAsync = isAsync;
}

public DatabaseScriptContainer(string kind, int order, string script, bool? isValid = null)
public DatabaseScriptContainer(string kind, int order, string script, bool isAsync = false)
{
Script = new DatabaseScript(script, order);
Kind = kind;
IsValid = isValid;
IsAsync = isAsync;
}

public DatabaseScript Script { get; set; }
public string Kind{ get; set; }
public bool? IsValid { get; set; }
public string Text => Script.Text;
public int Order => Script.Order;
public bool IsAsync { get;set; }
}
}
2 changes: 1 addition & 1 deletion KustoSchemaTools/Changes/GenericBaseEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public GenericBaseEntity(List<DatabaseScriptContainer> scripts)

public List<DatabaseScriptContainer> Scripts { get; }

public List<DatabaseScriptContainer> CreateScripts(string name)
public List<DatabaseScriptContainer> CreateScripts(string name, bool isNew)
{
return Scripts;
}
Expand Down
4 changes: 2 additions & 2 deletions KustoSchemaTools/Changes/ScriptCompareChange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ public ScriptCompareChange(string entity, IKustoBaseEntity? from, IKustoBaseEnti

private void Init()
{
var from = From?.CreateScripts(Entity).ToDictionary(itm => itm.Kind) ?? new Dictionary<string, DatabaseScriptContainer>();
var to = To.CreateScripts(Entity);
var from = From?.CreateScripts(Entity, false).ToDictionary(itm => itm.Kind) ?? new Dictionary<string, DatabaseScriptContainer>();
var to = To.CreateScripts(Entity, from != null);
Markdown = string.Empty;

if (to.Any() == false) return;
Expand Down
2 changes: 1 addition & 1 deletion KustoSchemaTools/Model/ContinuousExport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class ContinuousExport : IKustoBaseEntity
[YamlMember(ScalarStyle = YamlDotNet.Core.ScalarStyle.Literal)]
public string Query { get; set; }

public List<DatabaseScriptContainer> CreateScripts(string name)
public List<DatabaseScriptContainer> CreateScripts(string name, bool isNew)
{
return new List<DatabaseScriptContainer>
{
Expand Down
2 changes: 1 addition & 1 deletion KustoSchemaTools/Model/ExternalTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ExternalTable : IKustoBaseEntity

#endregion

public List<DatabaseScriptContainer> CreateScripts(string name)
public List<DatabaseScriptContainer> CreateScripts(string name, bool isNew)
{
var container = new DatabaseScriptContainer
{
Expand Down
2 changes: 1 addition & 1 deletion KustoSchemaTools/Model/Function.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class Function : IKustoBaseEntity

public string Body { get; set; }

public List<DatabaseScriptContainer> CreateScripts(string name)
public List<DatabaseScriptContainer> CreateScripts(string name, bool isNew)
{
var properties = GetType().GetProperties()
.Where(p => p.GetValue(this) != null && p.Name != "Body" && p.Name != "Parameters")
Expand Down
2 changes: 1 addition & 1 deletion KustoSchemaTools/Model/IKustoBaseEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace KustoSchemaTools.Model
{
public interface IKustoBaseEntity
{
List<DatabaseScriptContainer> CreateScripts(string name);
List<DatabaseScriptContainer> CreateScripts(string name, bool isNew);
}

}
25 changes: 22 additions & 3 deletions KustoSchemaTools/Model/MaterializedView.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,46 @@ public class MaterializedView : IKustoBaseEntity
public string? EffectiveDateTime { get; set; }
public string Lookback { get; set; }
public bool? UpdateExtentsCreationTime { get; set; }
public bool? Backfill { get; set; }
public bool AutoUpdateSchema { get; set; } = false;
public List<string> DimensionTables { get; set; }
public RetentionAndCachePolicy RetentionAndCachePolicy { get; set; } = new RetentionAndCachePolicy();
[YamlMember(ScalarStyle = ScalarStyle.Literal)]
public string Query { get; set; }
public string? RowLevelSecurity { get; set; }

public List<DatabaseScriptContainer> CreateScripts(string name)
public List<DatabaseScriptContainer> CreateScripts(string name, bool isNew)
{
var excludedProperies = new HashSet<string>( new[] { "Query", "Source", "Kind", "RetentionAndCachePolicy", "RowLevelSecurity" });
var asyncSetup = isNew && Backfill == true && !string.IsNullOrWhiteSpace(EffectiveDateTime);


var excludedProperies = new HashSet<string>(["Query", "Source", "Kind", "RetentionAndCachePolicy", "RowLevelSecurity"]);
if (!asyncSetup)
{
excludedProperies.Add("EffectiveDateTime");
excludedProperies.Add("Backfill");
}

var scripts = new List<DatabaseScriptContainer>();
var properties = string.Join(", ", GetType().GetProperties()
.Where(p => p.GetValue(this) != null && excludedProperies.Contains(p.Name) == false)
.Select(p => new {Name = p.Name, Value = p.GetValue(this) })
.Where(p => !string.IsNullOrWhiteSpace(p.Value?.ToString()))
.Select(p => $"{p.Name}=\"{p.Value}\""));
scripts.Add(new DatabaseScriptContainer("CreateOrAlterMaterializedView", 40, $".create-or-alter materialized-view with ({properties}) {name} on {Kind} {Source} {{ {Query} }}"));

if (asyncSetup)
{
scripts.Add(new DatabaseScriptContainer("CreateMaterializedView", Kind == "table" ? 40 : 41, $".create async ifnotexists materialized-view with ({properties}) {name} on {Kind} {Source} {{ {Query} }}", true));
}
else
{
scripts.Add(new DatabaseScriptContainer("CreateMaterializedView", Kind == "table" ? 40 : 41, $".create-or-alter materialized-view with ({properties}) {name} on {Kind} {Source} {{ {Query} }}"));
}

if (RetentionAndCachePolicy != null)
{
scripts.AddRange(RetentionAndCachePolicy.CreateScripts(name, "materialized-view"));
}


if (!string.IsNullOrEmpty(RowLevelSecurity))
Expand Down
2 changes: 1 addition & 1 deletion KustoSchemaTools/Model/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class Table : IKustoBaseEntity
public bool RestrictedViewAccess { get; set; } = false;
public string? RowLevelSecurity { get; set; }

public List<DatabaseScriptContainer> CreateScripts(string name)
public List<DatabaseScriptContainer> CreateScripts(string name, bool isNew)
{
var scripts = new List<DatabaseScriptContainer>();
if (Columns != null)
Expand Down
6 changes: 5 additions & 1 deletion KustoSchemaTools/Parser/KustoExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Data;
using Kusto.Language.Editor;
using Kusto.Language;
using Kusto.Data;

namespace KustoSchemaTools.Parser
{
Expand Down Expand Up @@ -68,7 +69,10 @@ public static string ToKustoClusterUrl(this string cluster, bool ingest = false)
return cluster.StartsWith("https") ? cluster : $"https://{ingestPrefix}{cluster}.kusto.windows.net";
}


public static bool IsFinal(this ScriptExecuteCommandResult command)
{
return command.Result == "Completed" || command.Result == "Failed" || command.Result == "Succeeded";
}
public static string PrettifyKql(this string query)
{
return new KustoCodeService(KustoCode.Parse(query)).GetFormattedText().Text;
Expand Down
64 changes: 62 additions & 2 deletions KustoSchemaTools/Parser/KustoWriter/DefaultDatabaseWriter.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Kusto.Data;
using Kusto.Toolkit;
using KustoSchemaTools.Changes;
using KustoSchemaTools.Model;
using KustoSchemaTools.Plugins;
Expand Down Expand Up @@ -38,9 +37,70 @@ private async Task<List<ScriptExecuteCommandResult>> ApplyChangesToDatabase(stri
.SelectMany(itm => itm.Scripts)
.Where(itm => itm.Order >= 0)
.Where(itm => itm.IsValid == true)
.OrderBy(itm => itm.Order)
.OrderBy(itm => itm.Order)
.ToList();

var results = new List<ScriptExecuteCommandResult>();
var batch = new List<DatabaseScriptContainer>();
foreach (var sc in scripts)
{
if (sc.IsAsync == false)
{
batch.Add(sc);
continue;
}
else
{
var batchResults = await ExecutePendingSync(databaseName, client, logger, batch);
results.AddRange(batchResults);
var asyncResult = await ExecuteAsyncCommand(databaseName, client, logger, sc);
results.Add(asyncResult);
}
}
var finalBatchResults = await ExecutePendingSync(databaseName, client, logger, batch);
results.AddRange(finalBatchResults);
return results;

}

private async Task<ScriptExecuteCommandResult> ExecuteAsyncCommand(string databaseName, KustoClient client, ILogger logger, DatabaseScriptContainer sc)
{
var interval = TimeSpan.FromSeconds(5);
var iterations = (int)(TimeSpan.FromHours(1) / interval);
var result = await client.AdminClient.ExecuteControlCommandAsync(databaseName, sc.Text);
var operationId = result.ToScalar<Guid>();
var finalState = false;
string monitoringCommand = $".show operations | where OperationId == '{operationId}' " +
"| summarize arg_max(LastUpdatedOn, *) by OperationId " +
"| project OperationId, CommandType = Operation, Result = State, Reason = Status";
int cnt = 0;
while (!finalState)
{
if(cnt++ >= iterations)
{
finalState = true;
}

logger.LogInformation($"Waiting for operation {operationId} to complete... current iteration: {cnt}/{iterations}");
var monitoringResult = client.Client.ExecuteQuery(databaseName, monitoringCommand, new Kusto.Data.Common.ClientRequestProperties());
var operationState = monitoringResult.As<ScriptExecuteCommandResult>().FirstOrDefault();

if (operationState != null && operationState?.IsFinal() == true)
{
operationState.CommandText = sc.Text;
return operationState;
}
await Task.Delay(interval);
}
throw new Exception("Operation did not complete in a reasonable time");
}

private static async Task<List<ScriptExecuteCommandResult>> ExecutePendingSync(string databaseName, KustoClient client, ILogger logger, List<DatabaseScriptContainer> scripts)
{
if(scripts.Any() == false)
{
return new List<ScriptExecuteCommandResult>();
}
var sb = new StringBuilder();
sb.AppendLine(".execute script with(ContinueOnErrors = true) <|");
foreach (var sc in scripts)
Expand Down

0 comments on commit 2b2b886

Please sign in to comment.