Skip to content

Commit

Permalink
Add support for partitioning expressions (#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
george-zubrienko authored Oct 21, 2024
1 parent 05ba0c4 commit 4a20ebc
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 49 deletions.
12 changes: 12 additions & 0 deletions src/Arcane.Framework.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
<None Update="Sources\SqlServer\SqlSnippets\GetSelectDeltaQuery.sql">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Sources\SqlServer\SqlSnippets\GetSelectAllQuery_date_partitioned.sql">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Sources\SqlServer\SqlSnippets\GetSelectDeltaQuery_date_partitioned.sql">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

<ItemGroup>
Expand All @@ -40,6 +46,12 @@
<Content Include="Sources\SqlServer\SqlSnippets\GetSelectDeltaQuery.sql">
<PackageCopyToOutput>true</PackageCopyToOutput>
</Content>
<Content Update="Sources\SqlServer\SqlSnippets\GetSelectAllQuery_date_partitioned.sql">
<PackageCopyToOutput>true</PackageCopyToOutput>
</Content>
<Content Update="Sources\SqlServer\SqlSnippets\GetSelectDeltaQuery_date_partitioned.sql">
<PackageCopyToOutput>true</PackageCopyToOutput>
</Content>
</ItemGroup>

<ItemGroup>
Expand Down
17 changes: 17 additions & 0 deletions src/Services/Models/DatePartitionMetadataDefinition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Text.Json.Serialization;

namespace Arcane.Framework.Services.Models;

/// <summary>
/// Date-based partition definition, deserialized from the stream metadata in the Kubernetes object definition.
/// Extends <see cref="PartitionMetadataDefinition"/> with an expression that can be used instead of a plain field.
/// </summary>
public class DatePartitionMetadataDefinition : PartitionMetadataDefinition
{
/// <summary>
/// Expression used to compute the partition value for each row. Overrides FieldName and FieldFormat.
/// The expression is executed on the Source side, so its syntax depends on the backend engine the Source streams from.
/// A common use case is a SQL-based source, where the partition value is derived from several columns.
/// </summary>
[JsonPropertyName("fieldExpression")]
public string FieldExpression { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ namespace Arcane.Framework.Services.Models;
/// <summary>
/// Used to deserialize the stream metadata from the Kubernetes object definition.
/// </summary>
public class PartitionsMetadataDefinition
public class PartitionMetadataDefinition
{
/// <summary>
/// Partition name
/// </summary>
[JsonPropertyName("name")]
public string Name { get; init; }
[JsonPropertyName("description")]
public string Description { get; init; }

/// <summary>
/// Partition field name
/// </summary>
[JsonPropertyName("fieldName")]
public string FieldName { get; init; }
public virtual string FieldName { get; init; }

/// <summary>
/// Partition field format
/// </summary>
[JsonPropertyName("fieldFormat")]
public string FieldFormat { get; init; }
public virtual string FieldFormat { get; init; }
}
12 changes: 10 additions & 2 deletions src/Services/Models/StreamMetadataDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@ namespace Arcane.Framework.Services.Models;
public class StreamMetadataDefinition
{
/// <summary>
/// Partitioning information about the stream.
/// Datetime-based partitioning information about the stream.
/// Can be either a field or an expression to be executed on the engine.
/// </summary>
[JsonPropertyName("datePartition")]
public DatePartitionMetadataDefinition DatePartition { get; init; }

/// <summary>
/// Partitioning information about the stream (non-datetime based)
/// Only fields that are present in the data can be used here
/// </summary>
[JsonPropertyName("partitions")]
public PartitionsMetadataDefinition[] Partitions { get; init; }
public PartitionMetadataDefinition[] Partitions { get; init; }
}
8 changes: 5 additions & 3 deletions src/Sinks/Extensions/StreamPartitionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ public static class StreamPartitionExtensions
/// </summary>
/// <param name="partition"></param>
/// <returns></returns>
public static StreamPartition ToStreamPartition(this PartitionsMetadataDefinition partition)
public static StreamPartition ToStreamPartition(this PartitionMetadataDefinition partition)
{
return new StreamPartition
{
Name = partition.Name,
Description = partition.Description,
FieldName = partition.FieldName,
FieldFormat = partition.FieldFormat
FieldFormat = partition.FieldFormat,
FieldExpression = (partition as DatePartitionMetadataDefinition)?.FieldExpression,
IsDatePartition = (partition as DatePartitionMetadataDefinition) is not null
};
}
}
17 changes: 15 additions & 2 deletions src/Sinks/Models/StreamMetadata.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Text.Json.Serialization;
using Akka.Util;
using Arcane.Framework.Services.Models;

namespace Arcane.Framework.Sinks.Models;

Expand All @@ -11,8 +12,8 @@ public class StreamPartition
/// <summary>
/// Partition name
/// </summary>
[JsonPropertyName("name")]
public string Name { get; init; }
[JsonPropertyName("description")]
public string Description { get; init; }

/// <summary>
/// Partition field name
Expand All @@ -25,6 +26,18 @@ public class StreamPartition
/// </summary>
[JsonPropertyName("field_format")]
public string FieldFormat { get; init; }

/// <summary>
/// see <see cref="DatePartitionMetadataDefinition.FieldExpression"/>
/// </summary>
[JsonPropertyName("field_expression")]
public string FieldExpression { get; init; }

/// <summary>
/// True if the partition class this instance was constructed from is <see cref="DatePartitionMetadataDefinition"/>.
/// </summary>
[JsonPropertyName("is_date_partition")]
public bool IsDatePartition { get; init; }
}

/// <summary>
Expand Down
5 changes: 5 additions & 0 deletions src/Sinks/Parquet/Models/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ public static class Constants
/// Name for a merge key column attached to sources that support upserts.
/// </summary>
public const string UPSERT_MERGE_KEY = "ARCANE_MERGE_KEY";

/// <summary>
/// Name for a partition key column based on a datetime expression.
/// </summary>
public const string DATE_PARTITION_KEY = "DATE_PARTITION_KEY";
}
66 changes: 47 additions & 19 deletions src/Sources/SqlServer/SqlServerChangeTrackingSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ public class SqlServerChangeTrackingSource : GraphStage<SourceShape<List<DataCel
private readonly string schemaName;
private readonly bool stopAfterFullLoad;
private readonly string tableName;
private readonly string datePartitionExpression;


private SqlServerChangeTrackingSource(string connectionString, string schemaName, string tableName,
TimeSpan changeCaptureInterval, int commandTimeout, int lookBackRange, bool fullLoadOnstart,
bool stopAfterFullLoad)
bool stopAfterFullLoad, string datePartitionExpression = null)
{
this.connectionString = connectionString;
this.schemaName = schemaName;
Expand All @@ -52,6 +53,7 @@ private SqlServerChangeTrackingSource(string connectionString, string schemaName
this.lookBackRange = lookBackRange;
this.fullLoadOnstart = fullLoadOnstart;
this.stopAfterFullLoad = stopAfterFullLoad;
this.datePartitionExpression = datePartitionExpression;
this.Shape = new SourceShape<List<DataCell>>(this.Out);
}

Expand Down Expand Up @@ -83,7 +85,7 @@ public Schema GetParquetSchema()
mergeExpression,
columnExpression,
matchExpression,
long.MaxValue), sqlCon);
long.MaxValue, this.datePartitionExpression), sqlCon);
command.CommandTimeout = this.commandTimeout;

using var schemaReader = command.ExecuteReader(CommandBehavior.SchemaOnly);
Expand All @@ -108,19 +110,19 @@ public SourceTags GetDefaultTags()
/// <param name="connectionString">Connection string, including database name.</param>
/// <param name="schemaName">Schema name for the target table.</param>
/// <param name="tableName">Table name.</param>
/// <param name="streamKind">Stream kind</param>
/// <param name="changeCaptureInterval">How often to track changes.</param>
/// <param name="commandTimeout">Timeout for sql commands issued by this source.</param>
/// <param name="lookBackRange">Timestamp to get minimum commit_ts from.</param>
/// <param name="fullLoadOnStart">Set to true to stream full current version of the table first.</param>
/// <param name="stopAfterFullLoad">Set to true if stream should stop after full load is finished</param>
/// <param name="partitioningExpression">Optional expression to use to generate <see cref="Constants.DATE_PARTITION_KEY"/></param>
/// <returns></returns>
[ExcludeFromCodeCoverage(Justification = "Factory method")]
public static SqlServerChangeTrackingSource Create(
string connectionString,
string schemaName,
string tableName,
string streamKind,
string partitioningExpression = null,
TimeSpan? changeCaptureInterval = null,
int commandTimeout = 3600,
int lookBackRange = 86400,
Expand All @@ -133,9 +135,16 @@ public static SqlServerChangeTrackingSource Create(
$"{nameof(fullLoadOnStart)} must be true if {nameof(stopAfterFullLoad)} is set to true");
}

return new SqlServerChangeTrackingSource(connectionString, schemaName, tableName,
changeCaptureInterval.GetValueOrDefault(TimeSpan.FromSeconds(15)), commandTimeout, lookBackRange,
fullLoadOnStart, stopAfterFullLoad);
return new SqlServerChangeTrackingSource(
connectionString: connectionString,
schemaName: schemaName,
tableName: tableName,
changeCaptureInterval: changeCaptureInterval.GetValueOrDefault(TimeSpan.FromSeconds(15)),
commandTimeout: commandTimeout,
lookBackRange: lookBackRange,
fullLoadOnstart: fullLoadOnStart,
stopAfterFullLoad: stopAfterFullLoad,
datePartitionExpression: partitioningExpression);
}

/// <inheritdoc cref="GraphStage{TShape}.CreateLogic"/>
Expand All @@ -145,34 +154,53 @@ protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
}

private string GetChangesQuery(string mergeExpression, string columnStatement, string matchStatement,
long changeTrackingId)
long changeTrackingId, string partitionExpression = null)
{
var sqlConBuilder = new SqlConnectionStringBuilder(this.connectionString);
return File
.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Sources", "SqlServer", "SqlSnippets",
"GetSelectDeltaQuery.sql"))
var baseQuery = string.IsNullOrEmpty(partitionExpression) switch
{
true => File
.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Sources", "SqlServer", "SqlSnippets",
"GetSelectDeltaQuery.sql")),
false => File
.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Sources", "SqlServer", "SqlSnippets",
"GetSelectDeltaQuery_date_partitioned.sql"))
};
return baseQuery
.Replace("{dbName}", sqlConBuilder.InitialCatalog)
.Replace("{schema}", this.schemaName)
.Replace("{tableName}", this.tableName)
.Replace("{ChangeTrackingColumnsStatement}", columnStatement)
.Replace("{ChangeTrackingMatchStatement}", matchStatement)
.Replace("{MERGE_EXPRESSION}", mergeExpression)
.Replace("{MERGE_KEY}", Constants.UPSERT_MERGE_KEY)
.Replace("{DATE_PARTITION_EXPRESSION}", partitionExpression)
.Replace("{DATE_PARTITION_KEY}", Constants.DATE_PARTITION_KEY)
.Replace("{lastId}", changeTrackingId.ToString());
}

private string GetAllQuery(string mergeExpression, string columnStatement)
private string GetAllQuery(string mergeExpression, string columnStatement, string partitionExpression = null)
{
var sqlConBuilder = new SqlConnectionStringBuilder(this.connectionString);
return File
.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Sources", "SqlServer", "SqlSnippets",
"GetSelectAllQuery.sql"))
var baseQuery = string.IsNullOrEmpty(partitionExpression) switch
{
true => File
.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Sources", "SqlServer", "SqlSnippets",
"GetSelectAllQuery.sql")),
false => File
.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Sources", "SqlServer", "SqlSnippets",
"GetSelectAllQuery_date_partitioned.sql"))
};

return baseQuery
.Replace("{dbName}", sqlConBuilder.InitialCatalog)
.Replace("{schema}", this.schemaName)
.Replace("{tableName}", this.tableName)
.Replace("{ChangeTrackingColumnsStatement}", columnStatement)
.Replace("{MERGE_EXPRESSION}", mergeExpression)
.Replace("{MERGE_KEY}", Constants.UPSERT_MERGE_KEY);
.Replace("{MERGE_KEY}", Constants.UPSERT_MERGE_KEY)
.Replace("{DATE_PARTITION_EXPRESSION}", partitionExpression)
.Replace("{DATE_PARTITION_KEY}", Constants.DATE_PARTITION_KEY);
}

private sealed class SourceLogic : PollingSourceLogic, IStopAfterBackfill
Expand Down Expand Up @@ -377,15 +405,15 @@ private void GetChanges()
var query = this.source.GetChangesQuery(this.mergeExpression,
this.columnExpression,
this.matchExpression,
newVersion.GetValueOrDefault(long.MaxValue) - 1);
newVersion.GetValueOrDefault(long.MaxValue) - 1, this.source.datePartitionExpression);
this.command = new SqlCommand(query, this.sqlConnection)
{
CommandTimeout = this.source.commandTimeout
};
this.TryExecuteReader();

if (this.reader.HasRows)
// reset current version so it can be updated from the source
// reset current version so it can be updated from the source
{
this.currentVersion = 0;
}
Expand Down Expand Up @@ -453,7 +481,7 @@ public override void PreStart()
this.Log.Info("Fetching all rows for the latest version of an entity {database}.{schema}.{table}",
this.sqlConnection.Database, this.source.schemaName, this.source.tableName);

var query = this.source.GetAllQuery(this.mergeExpression, this.GetChangeTrackingColumns("tq"));
var query = this.source.GetAllQuery(this.mergeExpression, this.GetChangeTrackingColumns("tq"), this.source.datePartitionExpression);
this.command = new SqlCommand(query, this.sqlConnection)
{
CommandTimeout = this.source.commandTimeout
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Full-load query template that also emits a date partition column.
-- Brace-delimited placeholders are filled in by plain string replacement before the query is executed.
-- Current change tracking version of the database, returned alongside every row.
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()

SELECT
{ChangeTrackingColumnsStatement},
@currentVersion AS 'ChangeTrackingVersion',
-- Lower-case hex SHA2_256 digest of the merge expression (convert style 2 = hex without the 0x prefix).
lower(convert(nvarchar(128), HashBytes('SHA2_256', {MERGE_EXPRESSION}),2)) as [{MERGE_KEY}],
-- Configured partition expression, evaluated by the server for each row and exposed under the partition key column name.
{DATE_PARTITION_EXPRESSION} as [{DATE_PARTITION_KEY}]
FROM [{dbName}].[{schema}].[{tableName}] tq
2 changes: 1 addition & 1 deletion src/Sources/SqlServer/SqlSnippets/GetSelectDeltaQuery.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ SELECT
{ChangeTrackingColumnsStatement},
@currentVersion AS 'ChangeTrackingVersion',
lower(convert(nvarchar(128), HashBytes('SHA2_256', {MERGE_EXPRESSION}),2)) as [{MERGE_KEY}]
FROM [{dbName}].[{schema}].[{tableName}] tq
FROM [{dbName}].[{schema}].[{tableName}] tq
RIGHT JOIN (SELECT ct.* FROM CHANGETABLE (CHANGES [{dbName}].[{schema}].[{tableName}], {lastId}) ct ) ct ON {ChangeTrackingMatchStatement}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Delta query template that also emits a date partition column.
-- Brace-delimited placeholders are filled in by plain string replacement before the query is executed.
-- Current change tracking version of the database, returned alongside every row.
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()

SELECT
{ChangeTrackingColumnsStatement},
@currentVersion AS 'ChangeTrackingVersion',
-- Lower-case hex SHA2_256 digest of the merge expression (convert style 2 = hex without the 0x prefix).
lower(convert(nvarchar(128), HashBytes('SHA2_256', {MERGE_EXPRESSION}),2)) as [{MERGE_KEY}],
-- Configured partition expression, evaluated by the server for each row and exposed under the partition key column name.
{DATE_PARTITION_EXPRESSION} as [{DATE_PARTITION_KEY}]
FROM [{dbName}].[{schema}].[{tableName}] tq
-- Limits the output to changes recorded after the supplied change tracking version.
-- RIGHT JOIN keeps every change-table row, so a change with no matching row in tq (presumably a delete; TODO confirm) has NULL tq columns.
-- NOTE(review): a partition expression that references tq columns is evaluated over NULLs for such rows; confirm this is intended.
RIGHT JOIN (SELECT ct.* FROM CHANGETABLE (CHANGES [{dbName}].[{schema}].[{tableName}], {lastId}) ct ) ct ON {ChangeTrackingMatchStatement}
18 changes: 13 additions & 5 deletions test/SinkTests/MultilineJsonSinkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,23 @@ public async Task OverwritesExistingSchemaMetadata()
{
new StreamPartition
{
Name = "date",
FieldName = "my_column_with_date",
FieldFormat = "datetime"
Description = "region",
FieldName = "my_column_with_region",
FieldFormat = "string"
},
new StreamPartition
{
Name = "sales_organisation",
Description = "sales_organisation",
FieldName = "my_column_with_sales_org",
FieldFormat = "string"
},
new StreamPartition
{
Description = "date_month",
FieldName = "",
FieldFormat = "",
FieldExpression = "date_format(cast('test' as date), 'yyyMM')",
IsDatePartition = true
}
});
var sink = MultilineJsonSink.Create(this.mockBlobStorageService.Object,
Expand All @@ -138,7 +146,7 @@ public async Task OverwritesExistingSchemaMetadata()
await Source.From(mockIn).Select(v => v.ToList()).RunWith(sink, this.akkaFixture.Materializer);

var expectedMetadata =
"""[{"name":"date","field_name":"my_column_with_date","field_format":"datetime"},{"name":"sales_organisation","field_name":"my_column_with_sales_org","field_format":"string"}]""";
"""[{"description":"region","field_name":"my_column_with_region","field_format":"string","field_expression":null,"is_date_partition":false},{"description":"sales_organisation","field_name":"my_column_with_sales_org","field_format":"string","field_expression":null,"is_date_partition":false},{"description":"date_month","field_name":"","field_format":"","field_expression":"date_format(cast(\u0027test\u0027 as date), \u0027yyyMM\u0027)","is_date_partition":true}]""";
this.mockBlobStorageService.Verify(m => m.SaveTextAsBlob(expectedMetadata, $"{basePath}/metadata", "v0/partitions.json"), Times.Once);
}
}
Loading

0 comments on commit 4a20ebc

Please sign in to comment.