From d03e662ead8793397df1c055ecfb06b760b83a3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Thu, 13 Jun 2024 16:02:45 +0200 Subject: [PATCH 01/10] Move CsvOperations to common extensions --- .../{CdmChangeFeedSource => }/Extensions/CsvOperations.cs | 8 ++++---- test/Operations/CsvOperationsTests.cs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) rename src/Sources/{CdmChangeFeedSource => }/Extensions/CsvOperations.cs (93%) diff --git a/src/Sources/CdmChangeFeedSource/Extensions/CsvOperations.cs b/src/Sources/Extensions/CsvOperations.cs similarity index 93% rename from src/Sources/CdmChangeFeedSource/Extensions/CsvOperations.cs rename to src/Sources/Extensions/CsvOperations.cs index 0d891ca..02b9aff 100644 --- a/src/Sources/CdmChangeFeedSource/Extensions/CsvOperations.cs +++ b/src/Sources/Extensions/CsvOperations.cs @@ -1,8 +1,8 @@ -using System; +using System; using System.Linq; using System.Text.RegularExpressions; -namespace Arcane.Framework.Sources.CdmChangeFeedSource.Extensions; +namespace Arcane.Framework.Sources.Extensions; /// /// Contains operations for parsing CSV files. @@ -16,7 +16,7 @@ public static class CsvOperations /// Number of expected headers. /// Delimiter used in the line. /// A string array of individual values. - public static string[] ParseCsvLine(string line, int headerCount, char delimiter = ',') + public static string[] ParseCsvLine(this string line, int headerCount, char delimiter = ',') { var result = new string[headerCount]; var fieldCounter = 0; @@ -91,7 +91,7 @@ public static bool IsComplete(string csvLine) /// /// A CSV line to purge newlines from. /// A CSV line without newline characters inside column values. 
- public static string ReplaceQuotedNewlines(string csvLine) + public static string ReplaceQuotedNewlines(this string csvLine) { return Regex.Replace(csvLine, "\"[^\"]*(?:\"\"[^\"]*)*\"", m => m.Value.Replace("\n", "")).Replace("\r", ""); } diff --git a/test/Operations/CsvOperationsTests.cs b/test/Operations/CsvOperationsTests.cs index cbb4696..de35f7d 100644 --- a/test/Operations/CsvOperationsTests.cs +++ b/test/Operations/CsvOperationsTests.cs @@ -1,5 +1,5 @@ using System; -using Arcane.Framework.Sources.CdmChangeFeedSource.Extensions; +using Arcane.Framework.Sources.Extensions; using Xunit; namespace Arcane.Framework.Tests.Operations; From 0e77bee5e028b4d97b353af3f7f0e58362697f71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Thu, 13 Jun 2024 16:04:01 +0200 Subject: [PATCH 02/10] Initial working version --- .../SalesForce/Models/SalesForceAttribute.cs | 127 +++++ .../SalesForce/Models/SalesForceEntity.cs | 100 ++++ .../SalesForce/Models/SalesForceJob.cs | 50 ++ src/Sources/SalesForce/SalesForceSource.cs | 523 ++++++++++++++++++ ...ISalesForceAuthenticatedMessageProvider.cs | 17 + ...namicBearerAuthenticatedMessageProvider.cs | 83 +++ test/Sources/SalesforceSourceTests.cs | 299 ++++++++++ 7 files changed, 1199 insertions(+) create mode 100644 src/Sources/SalesForce/Models/SalesForceAttribute.cs create mode 100644 src/Sources/SalesForce/Models/SalesForceEntity.cs create mode 100644 src/Sources/SalesForce/Models/SalesForceJob.cs create mode 100644 src/Sources/SalesForce/SalesForceSource.cs create mode 100644 src/Sources/SalesForce/Services/AuthenticatedMessageProviders/Base/ISalesForceAuthenticatedMessageProvider.cs create mode 100644 src/Sources/SalesForce/Services/AuthenticatedMessageProviders/DynamicBearerAuthenticatedMessageProvider.cs create mode 100644 test/Sources/SalesforceSourceTests.cs diff --git a/src/Sources/SalesForce/Models/SalesForceAttribute.cs b/src/Sources/SalesForce/Models/SalesForceAttribute.cs new file 
mode 100644 index 0000000..596244d --- /dev/null +++ b/src/Sources/SalesForce/Models/SalesForceAttribute.cs @@ -0,0 +1,127 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Arcane.Framework.Sources.SalesForce.Models; + +/// +/// Represents CDM Change Feed attribute +/// +public class SalesForceAttribute +{ + private static readonly Dictionary salesforceTypeMap = new() + { + { "string", typeof(string) }, + { "id", typeof(string) }, + { "address", typeof(string) }, + { "datetime", typeof(DateTime) }, + { "date", typeof(DateTime) }, + { "decimal", typeof(decimal) }, + { "integer", typeof(int) }, + { "long", typeof(long) }, + { "double", typeof(double) }, + { "boolean", typeof(bool) }, + }; + + /// + /// Attribute name + /// + [JsonPropertyName("Name")] + public string Name { get; set; } + + /// + /// Attribute data format + /// + [JsonPropertyName("ValueTypeId")] + public string DataType { get; set; } + + + /// + /// Attribute comparer + /// + public static IEqualityComparer SalesForceAttributeComparer { get; } = + new SalesForceAttributeEqualityComparer(); + + /// + /// Returns true if the type is composition of other types + /// + /// + /// + // public static bool IsComplexType(string cdmTypeName) + // { + // return !cdmTypeMap.ContainsKey(cdmTypeName.ToLower()); + // } + + // /// + // /// // Maps CDM type to .NET type + // /// + // /// CDM type name + // /// .NET type instance + // /// Thrown if type is not supported + public static Type MapSalesforceType(string salesforceTypeName) + { + if (salesforceTypeMap.ContainsKey(salesforceTypeName.ToLower())) + { + return salesforceTypeMap[salesforceTypeName.ToLower()]; + } + + throw new InvalidOperationException($"Unsupported type: {salesforceTypeName}"); + } + + // /// + // /// Resolves complex type + // /// + // /// Start element for type resolution + // /// Element types list + // /// + // public static string 
ResolveComplexType(JsonElement startFrom, IEnumerable types) + // { + // var objectKind = startFrom.GetProperty("extendsDataType").ValueKind; + // var typeName = objectKind == JsonValueKind.Object + // ? startFrom.GetProperty("extendsDataType").GetProperty("dataTypeReference").GetString() + // : startFrom.GetProperty("extendsDataType").GetString(); + // if (!IsComplexType(typeName)) + // { + // return typeName; + // } + + // return ResolveComplexType( + // types.Where(tp => tp.GetProperty("dataTypeName").GetString() == typeName).FirstOrDefault(), types); + // } + + private sealed class SalesForceAttributeEqualityComparer : IEqualityComparer + { + public bool Equals(SalesForceAttribute x, SalesForceAttribute y) + { + if (ReferenceEquals(x, y)) + { + return true; + } + + if (ReferenceEquals(x, null)) + { + return false; + } + + if (ReferenceEquals(y, null)) + { + return false; + } + + if (x.GetType() != y.GetType()) + { + return false; + } + + return x.Name == y.Name && + x.DataType == y.DataType; + } + + public int GetHashCode(SalesForceAttribute obj) + { + return HashCode.Combine(obj.Name, obj.DataType); + } + } +} diff --git a/src/Sources/SalesForce/Models/SalesForceEntity.cs b/src/Sources/SalesForce/Models/SalesForceEntity.cs new file mode 100644 index 0000000..affd8f7 --- /dev/null +++ b/src/Sources/SalesForce/Models/SalesForceEntity.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text.Json; + +namespace Arcane.Framework.Sources.SalesForce.Models; + +/// +/// Represents CDM Change Feed entity +/// +public class SalesForceEntity +{ + /// + /// Entity name + /// + public string EntityName { get; set; } + + + /// + /// Attributes collection + /// + public SalesForceAttribute[] Attributes { get; set; } + + /// + /// Comparer class + /// + public static IEqualityComparer SalesForceEntityComparer { get; } = + new SalesForceEntityEqualityComparer(); + + /// + /// Parse CDM entity from a JSON 
document + /// + /// Json document to parse + /// Parsed SimpleCdmEntity object + public static SalesForceEntity FromJson(string entityName, JsonDocument document) + { + var entity = new SalesForceEntity + { + EntityName = entityName, + Attributes = document.RootElement.GetProperty("records").Deserialize() + }; + + + return entity; + } + + /// + /// Create DataReader for the entity + /// + /// Column to merge by + /// DataReader instance + public IDataReader GetReader() + { + var dt = new DataTable(); + + foreach (var attr in this.Attributes) + { + dt.Columns.Add(new DataColumn(attr.Name, SalesForceAttribute.MapSalesforceType(attr.DataType))); + } + + // dt.Columns.Add(new DataColumn(mergeColumnName, typeof(string))); + + return dt.CreateDataReader(); + } + + private sealed class SalesForceEntityEqualityComparer : IEqualityComparer + { + public bool Equals(SalesForceEntity x, SalesForceEntity y) + { + if (ReferenceEquals(x, y)) + { + return true; + } + + if (ReferenceEquals(x, null)) + { + return false; + } + + if (ReferenceEquals(y, null)) + { + return false; + } + + if (x.GetType() != y.GetType()) + { + return false; + } + + return x.EntityName == y.EntityName + && x.Attributes.SequenceEqual(y.Attributes, SalesForceAttribute.SalesForceAttributeComparer); + } + + public int GetHashCode(SalesForceEntity obj) + { + return HashCode.Combine(obj.EntityName, obj.Attributes); + } + } +} diff --git a/src/Sources/SalesForce/Models/SalesForceJob.cs b/src/Sources/SalesForce/Models/SalesForceJob.cs new file mode 100644 index 0000000..de5afd2 --- /dev/null +++ b/src/Sources/SalesForce/Models/SalesForceJob.cs @@ -0,0 +1,50 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Http; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Arcane.Framework.Sources.SalesForce.Models; + + +[JsonConverter(typeof(JsonStringEnumConverter))] +public enum SalesforceJobStatus +{ + UploadComplete, + InProgress, + Aborted, + 
JobComplete, + Failed, + None +} + +/// +/// Represents CDM Change Feed entity +/// +public class SalesForceJob +{ + /// + /// Id + /// + [JsonPropertyName("id")] + public string Id { get; set; } + + + /// + /// Attributes collection + /// + [JsonPropertyName("state")] + public SalesforceJobStatus Status { get; set; } + + [JsonPropertyName("object")] + public string Object { get; set; } + + + [JsonPropertyName("totalProcessingTime")] + public long? TotalProcessingTime { get; set; } + + [JsonPropertyName("numberRecordsProcessed")] + public long? NumberRecordsProcessed { get; set; } + +} diff --git a/src/Sources/SalesForce/SalesForceSource.cs b/src/Sources/SalesForce/SalesForceSource.cs new file mode 100644 index 0000000..3538bfe --- /dev/null +++ b/src/Sources/SalesForce/SalesForceSource.cs @@ -0,0 +1,523 @@ +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Configuration; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Json; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Event; +using Akka.Streams; +using Akka.Streams.Stage; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Framework.Contracts; +using Arcane.Framework.Sinks.Parquet; +using Arcane.Framework.Sinks.Parquet.Models; +using Arcane.Framework.Sources.Base; +using Arcane.Framework.Sources.Exceptions; +using Arcane.Framework.Sources.Extensions; +using Arcane.Framework.Sources.SalesForce.Models; +using Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders; +using Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders.Base; +using Arcane.Framework.Sources.SalesForce.Services.UriProviders; +using Arcane.Framework.Sources.SalesForce.Services.UriProviders.Base; +using Cassandra; +using Microsoft.IdentityModel.Tokens; +using Parquet.Data; +using Polly.RateLimit; +using Snd.Sdk.Tasks; +using ZstdNet; + 
+namespace Arcane.Framework.Sources.SalesForce; + +/// +/// Source for reading data from SalesForce BULK v2 API. +/// +public class SalesForceSource : GraphStage>>, IParquetSource, ITaggedSource +{ + private readonly string entityName; + private readonly ISalesForceAuthenticatedMessageProvider _authenticatedMessageProvider; + private readonly bool fullLoadOnStart; + private readonly HttpClient httpClient; + // private readonly AsyncRateLimitPolicy rateLimitPolicy; + // private readonly bool stopAfterFullLoad; + // private readonly ISalesForceUriProvider uriProvider; + private readonly TimeSpan changeCaptureInterval; + + private SalesForceSource( + + ISalesForceAuthenticatedMessageProvider authenticatedMessageProvider, + + string entityName + ) + { + + this._authenticatedMessageProvider = authenticatedMessageProvider; + + this.httpClient = new HttpClient(); + this.entityName = entityName; + + this.Shape = new SourceShape>(this.Out); + } + + + /// + /// Only use this constructor for unit tests to mock http calls. + /// + /// URI provider + /// Authenticated message provider + /// How often to track changes. + /// Look back interval + /// Http client for making requests + /// Api Schema + /// Response property key chain + /// Rate limiting policy instance + /// Set to true to stream full current version of the table first. 
+ /// Set to true if stream should stop after full load is finished + private SalesForceSource( + + ISalesForceAuthenticatedMessageProvider authenticatedMessageProvider, + string entityName, + HttpClient httpClient, + TimeSpan changeCaptureInterval) : this(authenticatedMessageProvider, entityName) + { + this.httpClient = httpClient; + } + + + /// + protected override Attributes InitialAttributes { get; } = Attributes.CreateName(nameof(SalesForceSource)); + + /// + /// Source outlet + /// + public Outlet> Out { get; } = new($"{nameof(SalesForceSource)}.Out"); + + /// + public override SourceShape> Shape { get; } + + /// + public Schema GetParquetSchema() + { + // TODO + var response = this._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => + + { + msg.RequestUri = new Uri($"https://test.my.salesforce.com/services/data/v60.0/query?q=SELECT Name,DataType,ValueTypeId FROM EntityParticle WHERE EntityDefinition.QualifiedApiName ='{this.entityName}' and dataType != 'address'"); + + return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => + { + if (response.IsSuccessStatusCode) + { + return response.Content.ReadAsStringAsync().Map(value => + { + + return JsonSerializer.Deserialize(value); + + }); + } + + var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; + + // this.Log.Warning(errorMsg); + + throw new HttpRequestException(errorMsg, null, response.StatusCode); + }).Flatten(); + }).Flatten().TryMap(result => result, exception => exception switch + { + HttpRequestException + { + StatusCode: HttpStatusCode.TooManyRequests + } => Option.None, // API rate limit, in case configured rate limit is not good enough + HttpRequestException + { + StatusCode: HttpStatusCode.RequestTimeout + } => Option.None, // Potential server-side timeout due to overload + _ => throw exception + 
}).Result; + + + + if (response.HasValue) + { + return SalesForceEntity.FromJson(this.entityName, response.Value).GetReader().ToParquetSchema(); + + } + throw new Exception("Yikes"); + // return this.apiSchema.ToParquetSchema(); + } + + /// + public SourceTags GetDefaultTags() + { + return new SourceTags + { + SourceEntity = this.entityName, + }; + } + + /// + /// Creates new instance of + /// + /// URI provider + /// How often to track changes. + /// Look back interval + /// Stream kind + /// Http request rimeout + /// Api Schema + /// Rate limiting policy instance + /// Set to true to stream full current version of the table first. + /// Set to true if stream should stop after full load is finished + /// Authenticated message provider + public static SalesForceSource Create( + DynamicBearerAuthenticatedMessageProvider headerAuthenticatedMessageProvider, + HttpClient httpClient, + string entityName, + TimeSpan changeCaptureInterval + ) + { + return new SalesForceSource(headerAuthenticatedMessageProvider + , entityName, httpClient, changeCaptureInterval); + } + + + + /// + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new SourceLogic(this); + } + + private sealed class SourceLogic : TimerGraphStageLogic + { + private const string TimerKey = nameof(SalesForceSource); + private readonly LocalOnlyDecider decider; + private readonly HttpClient httpClient; + private readonly SalesForceSource source; + private HttpResponseMessage currentResponse; + private SalesForceEntity entitySchema; + private Option currentJob; + private Option jobLocator; + + + // private SalesForceEntity salesForceEntity; + private Action>> responseReceived; + + public SourceLogic(SalesForceSource source) : base(source.Shape) + { + this.source = source; + + this.httpClient = this.source.httpClient ?? 
new HttpClient(); + + this.decider = Decider.From((ex) => ex.GetType().Name switch + { + nameof(ArgumentException) => Directive.Stop, + nameof(ArgumentNullException) => Directive.Stop, + nameof(InvalidOperationException) => Directive.Stop, + nameof(ConfigurationErrorsException) => Directive.Stop, + nameof(ObjectDisposedException) => Directive.Stop, + nameof(IOException) => Directive.Restart, + nameof(TimeoutException) => Directive.Restart, + nameof(HttpRequestException) => Directive.Restart, + _ => Directive.Stop + }); + + this.currentJob = new SalesForceJob + { + Id = "", + NumberRecordsProcessed = 0, + Object = this.source.entityName, + TotalProcessingTime = 0, + Status = SalesforceJobStatus.None + }; + + this.jobLocator = Option.None; + + this.SetHandler(source.Out, this.PullChanges, this.Finish); + } + + /// + public bool IsRunningInBackfillMode { get; set; } + + /// + + private void Finish(Exception cause) + { + if (cause is not null && cause is not SubscriptionWithCancelException.NonFailureCancellation) + { + this.FailStage(cause); + } + + this.httpClient.Dispose(); + } + + public override void PreStart() + { + this.Log.Info("Prestart"); + this.UpdateSchema(); + // base.PreStart(); + + + if (this.source.fullLoadOnStart) + { + this.IsRunningInBackfillMode = true; + } + } + + public void UpdateSchema() + { + var response = this.source._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => + + { + msg.RequestUri = new Uri($"https://test.my.salesforce.com/services/data/v60.0/query?q=SELECT Name,DataType,ValueTypeId FROM EntityParticle WHERE EntityDefinition.QualifiedApiName ='{this.source.entityName}' and dataType != 'address'"); + + return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => + { + if (response.IsSuccessStatusCode) + { + return response.Content.ReadAsStringAsync().Map(value => + { + this.Log.Info(value); + + return JsonSerializer.Deserialize(value); + + }); + } + + var errorMsg = $"API request to 
{msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; + + // this.Log.Warning(errorMsg); + + throw new HttpRequestException(errorMsg, null, response.StatusCode); + }).Flatten(); + }).Flatten().TryMap(result => result, exception => exception switch + { + HttpRequestException + { + StatusCode: HttpStatusCode.TooManyRequests + } => Option.None, // API rate limit, in case configured rate limit is not good enough + HttpRequestException + { + StatusCode: HttpStatusCode.RequestTimeout + } => Option.None, // Potential server-side timeout due to overload + _ => throw exception + }).Result; + + + + if (response.IsEmpty) + { + this.Log.Warning("Could not update schema"); + } + else + { + var newSchema = SalesForceEntity.FromJson(this.source.entityName, response.Value); + var schemaUnchanged = + SalesForceEntity.SalesForceEntityComparer.Equals(this.entitySchema, newSchema); + this.entitySchema = (this.entitySchema == null, schemaEquals: schemaUnchanged) switch + { + (true, _) or (false, true) => newSchema, + (false, false) => throw new SchemaMismatchException() + }; + } + + } + + private void CreateNewJob() + { + // this.UpdateSchema(); + this.Log.Info("Creating new job"); + var response = this.source._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => + + { + msg.RequestUri = new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query"); + msg.Content = JsonContent.Create(new + { + operation = "query", + query = $"SELECT {this.entitySchema.Attributes.Where(e => e.DataType != "address").Select(e => e.Name).Aggregate((a, b) => a + ", " + b)} FROM {this.entitySchema.EntityName} limit 100", + + }); + msg.Method = HttpMethod.Post; + + return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => + { + if (response.IsSuccessStatusCode) + { + return response.Content.ReadAsStringAsync().Map(value => + { + 
this.Log.Info(value); + return JsonSerializer.Deserialize(value); + + }); + } + + var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; + + // this.Log.Warning(errorMsg); + + throw new HttpRequestException(errorMsg, null, response.StatusCode); + }).Flatten(); + }).Flatten().TryMap(result => result, exception => exception switch + { + HttpRequestException + { + StatusCode: HttpStatusCode.TooManyRequests + } => Option.None, // API rate limit, in case configured rate limit is not good enough + HttpRequestException + { + StatusCode: HttpStatusCode.RequestTimeout + } => Option.None, // Potential server-side timeout due to overload + _ => throw exception + }).Result; + + if (response.IsEmpty) + { + this.Log.Warning("Could not create job"); + } + else + { + this.currentJob = response.Value; + } + this.ScheduleOnce(TimerKey, TimeSpan.FromSeconds(1)); + } + + private void UpdateJobStatus() + { + this.Log.Info("Updating job status"); + var response = this.source._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => + + { + msg.RequestUri = new Uri($"https://test.my.salesforce.com/services/data/v60.0/jobs/query/{this.currentJob.Value.Id}"); + + + return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => + { + if (response.IsSuccessStatusCode) + { + return response.Content.ReadAsStringAsync().Map(value => + { + this.Log.Info(value); + return JsonSerializer.Deserialize(value); + + }); + } + + var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; + + // this.Log.Warning(errorMsg); + + throw new HttpRequestException(errorMsg, null, response.StatusCode); + }).Flatten(); + }).Flatten().TryMap(result => result, exception => 
exception switch + { + HttpRequestException + { + StatusCode: HttpStatusCode.TooManyRequests + } => Option.None, // API rate limit, in case configured rate limit is not good enough + HttpRequestException + { + StatusCode: HttpStatusCode.RequestTimeout + } => Option.None, // Potential server-side timeout due to overload + _ => throw exception + }).Result; + + if (response.IsEmpty) + { + this.Log.Warning("Could not create job"); + } + else + { + this.currentJob = response.Value; + } + this.ScheduleOnce(TimerKey, TimeSpan.FromSeconds(1)); + } + + + private (Type, object) ConvertToSalesforceType(string salesforceDataType, string value) + { + var tp = SalesForceAttribute.MapSalesforceType(salesforceDataType); + var converter = TypeDescriptor.GetConverter(tp); + return (tp, value == "" ? null : converter.ConvertFromInvariantString(value)); + } + + + private void ProcessResult() + { + this.Log.Info("Processing results. bip bop"); + var response = this.source._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => + { + var locatorString = this.jobLocator.HasValue ? $"locator={this.jobLocator.Value}" : ""; + msg.RequestUri = new Uri($"https://test.my.salesforce.com/services/data/v60.0/jobs/query/{this.currentJob.Value.Id}/results?maxRecords=50&{locatorString}"); + msg.Headers.Add("Accept", "text/csv"); + return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => + { + if (response.IsSuccessStatusCode) + { + return response.Content.ReadAsStringAsync().Map(value => + { + this.Log.Info(value); + this.jobLocator = response.Headers.GetValues("Sforce-Locator").Select(v => v == "null" ? 
Option.None : v.AsOption()).First(); + return value; + + }); + } + + var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; + + // this.Log.Warning(errorMsg); + + throw new HttpRequestException(errorMsg, null, response.StatusCode); + }).Flatten(); + }).Flatten().Result; + + var rows = response.ReplaceQuotedNewlines().Split("\n").Skip(1).Where(line => !string.IsNullOrEmpty(line)).Select(line => + { + var cells = line.ParseCsvLine(this.entitySchema.Attributes.Length).Select( + (v, ix) => + { + var (tp, value) = this.ConvertToSalesforceType(this.entitySchema.Attributes[ix].DataType, v); + return new DataCell(this.entitySchema.Attributes[ix].Name, tp, + value); + }).ToList(); + return cells; + }); + this.EmitMultiple(this.source.Out, rows); + + if (this.jobLocator.IsEmpty) + { + this.currentJob = Option.None; + this.ScheduleOnce(TimerKey, this.source.changeCaptureInterval); + } + + } + + private void PullChanges() + { + switch (this.currentJob.Select(job => job.Status).GetOrElse(SalesforceJobStatus.None)) + { + case SalesforceJobStatus.UploadComplete:; this.UpdateJobStatus(); break; + case SalesforceJobStatus.InProgress: this.UpdateJobStatus(); break; + case SalesforceJobStatus.Aborted: this.FailStage(new Exception("Something bad happened")); break; + case SalesforceJobStatus.Failed: this.FailStage(new Exception("Something bad happened")); break; + case SalesforceJobStatus.JobComplete: this.ProcessResult(); break; + case SalesforceJobStatus.None: this.CreateNewJob(); break; + default: this.FailStage(new Exception("Something bad happened")); break; + }; + + + + + } + + protected override void OnTimer(object timerKey) + { + this.PullChanges(); + } + } +} diff --git a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/Base/ISalesForceAuthenticatedMessageProvider.cs 
b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/Base/ISalesForceAuthenticatedMessageProvider.cs new file mode 100644 index 0000000..e87d9be --- /dev/null +++ b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/Base/ISalesForceAuthenticatedMessageProvider.cs @@ -0,0 +1,17 @@ +using System.Net.Http; +using System.Threading.Tasks; + +namespace Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders.Base; + +/// +/// Authentication message provider interface for various REST API authentication methods. +/// +public interface ISalesForceAuthenticatedMessageProvider +{ + /// + /// Generates authenticated message for the REST API request. + /// + /// HTTP client + /// Authenticated message + Task GetAuthenticatedMessage(HttpClient httpClient); +} diff --git a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/DynamicBearerAuthenticatedMessageProvider.cs b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/DynamicBearerAuthenticatedMessageProvider.cs new file mode 100644 index 0000000..118f794 --- /dev/null +++ b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/DynamicBearerAuthenticatedMessageProvider.cs @@ -0,0 +1,83 @@ +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders.Base; +using Snd.Sdk.Tasks; + +namespace Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders; + +/// +/// Authenticated message provider that generated dynamic bearer token header. +/// +public record DynamicBearerAuthenticatedMessageProvider : ISalesForceAuthenticatedMessageProvider +{ + private readonly TimeSpan expirationPeriod; + private string currentToken; + private DateTimeOffset? 
validTo; + private readonly Uri tokenSource; + private readonly string accountName; + private readonly string clientId; + private readonly string clientSecret; + private readonly string username; + private readonly string password; + private readonly string securityToken; + + /// + /// Authenticated message provider that generated dynamic bearer token header. + /// + + public DynamicBearerAuthenticatedMessageProvider(string accountName, string clientId, string clientSecret, string username, string password, string securityToken) + { + this.tokenSource = new Uri($"https://{accountName}/services/oauth2/token"); + this.clientId = clientId; + this.clientSecret = clientSecret; + this.username = username; + this.password = password; + this.securityToken = securityToken; + } + + /// + public Task GetAuthenticatedMessage(HttpClient httpClient) + { + if (this.validTo.GetValueOrDefault(DateTimeOffset.MaxValue) < + DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(1))) + { + return Task.FromResult(new HttpRequestMessage + { + Headers = { Authorization = new AuthenticationHeaderValue("Bearer", this.currentToken) } + }); + } + var dict = new Dictionary + { + { "grant_type", "password" }, + { "client_id", this.clientId }, + { "client_secret", this.clientSecret }, + { "username", this.username }, + { "password", $"{this.password}{this.securityToken}" }, + }; + var tokenHrm = new HttpRequestMessage(HttpMethod.Post, this.tokenSource) + { + Content = new FormUrlEncodedContent(dict) + }; + + return httpClient.SendAsync(tokenHrm, CancellationToken.None).Map(response => + { + response.EnsureSuccessStatusCode(); + return response.Content.ReadAsStringAsync(); + }).FlatMap(result => + { + var tokenResponse = JsonSerializer.Deserialize(result); + this.currentToken = tokenResponse.GetProperty("access_token").GetString(); + this.validTo = DateTimeOffset.UtcNow.Add(this.expirationPeriod); + return new HttpRequestMessage + { + Headers = { Authorization = new 
AuthenticationHeaderValue("Bearer", this.currentToken) } + }; + }); + } +} diff --git a/test/Sources/SalesforceSourceTests.cs b/test/Sources/SalesforceSourceTests.cs new file mode 100644 index 0000000..c9809ad --- /dev/null +++ b/test/Sources/SalesforceSourceTests.cs @@ -0,0 +1,299 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams.Dsl; +using Akka.Util; +using Arcane.Framework.Sinks.Parquet.Models; +using Arcane.Framework.Sources.SalesForce; +using Arcane.Framework.Sources.SalesForce.Models; +using Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders; +using Arcane.Framework.Tests.Fixtures; +using Microsoft.OpenApi.Models; +using Moq; +using Polly; +using Snd.Sdk.Tasks; +using Xunit; + +namespace Arcane.Framework.Tests.Sources; + +public class SalesforceSourceTests : IClassFixture +{ + private readonly AkkaFixture akkaFixture; + // private readonly SimpleUriProvider db; + private readonly DynamicBearerAuthenticatedMessageProvider dynamicAuth; + private readonly Mock mockHttp; + // private readonly PagedUriProvider pdb; + + public SalesforceSourceTests(AkkaFixture akkaFixture) + { + this.akkaFixture = akkaFixture; + this.mockHttp = new Mock(); + // this.db = new SimpleUriProvider("https://localhost/data?date=@date", new List + // { + // new() + // { + // FieldName = "date", FieldType = TemplatedFieldType.FILTER_DATE_FROM, + // FormatString = "yyyy-MM-ddTHH:mm:ssZ", Placement = TemplatedFieldPlacement.URL + // } + // }, new DateTimeOffset(2023, 1, 1, 0, 0, 0, TimeSpan.Zero), HttpMethod.Get); + // this.fa = new FixedHeaderAuthenticatedMessageProvider(new Dictionary { { "Bearer", "test" } }); + // this.dynamicAuth = + // new DynamicBearerAuthenticatedMessageProvider("https://localhost/auth", "token", "expiresIn"); + // this.pdb = new PagedUriProvider( 
+ // "https://localhost/data_paged?page=@page&filter=updatedAt>=<@dateFrom,@dateTo", + // new List + // { + // new() + // { + // FieldName = "page", FieldType = TemplatedFieldType.RESPONSE_PAGE, FormatString = string.Empty, + // Placement = TemplatedFieldPlacement.URL + // }, + // new() + // { + // FieldName = "dateFrom", FieldType = TemplatedFieldType.FILTER_DATE_BETWEEN_FROM, + // FormatString = "yyyyMMddHHmmss", Placement = TemplatedFieldPlacement.URL + // }, + // new() + // { + // FieldName = "dateTo", FieldType = TemplatedFieldType.FILTER_DATE_BETWEEN_TO, + // FormatString = "yyyyMMddHHmmss", Placement = TemplatedFieldPlacement.URL + // } + // }, + // new DateTimeOffset(2023, 1, 1, 0, 0, 0, TimeSpan.Zero), + // HttpMethod.Get) + // .WithPageResolver(new PageResolverConfiguration + // { ResolverPropertyKeyChain = new[] { "TotalPages" }, ResolverType = PageResolverType.COUNTER }); + } + + [Fact] + public async Task TokenGeneration() + { + var mockContent = new + { + access_token = "", + instance_url = "https://test.my.salesforce.com", + id = "https://test.salesforce.com/id/00D3E000000D2OCUA0/0057Y0000068CtmQAE", + token_type = "Bearer", + issued_at = "1718179384536", + signature = "abc" + }; + + this.mockHttp + .Setup(http => + http.SendAsync( + It.Is(msg => + msg.RequestUri == new Uri("https://test.my.salesforce.com/services/oauth2/token")), + It.IsAny())) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.OK) + { Content = new StringContent(JsonSerializer.Serialize(mockContent)) }); + + + var auth = new DynamicBearerAuthenticatedMessageProvider("test.my.salesforce.com", "client_id", "client_secret", "user_name", "password", "security_token"); + + var token = await auth.GetAuthenticatedMessage(this.mockHttp.Object); + Assert.Equal("", token.Headers.Authorization.Parameter); + } + + + + [Fact] + public async Task JobDeserialization() + { + + + var job = JsonSerializer.Deserialize(@"{ + ""id"": ""750QI000007MuMUYA0"", + ""operation"": ""query"", + ""object"": 
""ECCO_Consent__c"", + ""createdById"": ""0057Y0000068CtmQAE"", + ""createdDate"": ""2024-06-11T08:30:10.000+0000"", + ""systemModstamp"": ""2024-06-11T08:30:49.000+0000"", + ""state"": ""JobComplete"", + ""concurrencyMode"": ""Parallel"", + ""contentType"": ""CSV"", + ""apiVersion"": 60.0, + ""jobType"": ""V2Query"", + ""lineEnding"": ""LF"", + ""columnDelimiter"": ""COMMA"", + ""numberRecordsProcessed"": 449464, + ""retries"": 0, + ""totalProcessingTime"": 63674, + ""isPkChunkingSupported"": true +}"); + + } + + [Fact] + public async void RunStream() + { + + var mockTokenContent = new + { + access_token = "", + instance_url = "https://test.my.salesforce.com", + id = "https://test.salesforce.com/id/00D3E000000D2OCUA0/0057Y0000068CtmQAE", + token_type = "Bearer", + issued_at = "1718179384536", + signature = "abc" + }; + + this.mockHttp + .Setup(http => + http.SendAsync( + It.Is(msg => + msg.RequestUri == new Uri("https://test.my.salesforce.com/services/oauth2/token")), + It.IsAny())) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.OK) + { Content = new StringContent(JsonSerializer.Serialize(mockTokenContent)) }); + + var mockCreateJobContent = new + { + id = "750QI000007QKdRYAW", + operation = "query", + @object = "ECCO_Consent__c", + createdById = "0057Y0000068CtmQAE", + createdDate = "2024-06-13T06:57:56.000+0000", + systemModstamp = "2024-06-13T06:57:56.000+0000", + state = "UploadComplete", + concurrencyMode = "Parallel", + contentType = "CSV", + apiVersion = 60.0, + lineEnding = "LF", + columnDelimiter = "COMMA" + }; + + this.mockHttp + .Setup(http => + http.SendAsync( + It.Is(msg => + msg.RequestUri == new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query") && + msg.Method == HttpMethod.Post), + It.IsAny())) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.OK) + { Content = new StringContent(JsonSerializer.Serialize(mockCreateJobContent)) }); + + var mockGetSchemaContent = new + { + totalSize = 2, + done = true, + records = 
new[]{ + new { + + Name = "Id", + @DataType = "id", + ValueTypeId = "id" + }, + new { + + Name = "Name", + @DataType = "string", + ValueTypeId = "string" + } + } + }; + + this.mockHttp + .Setup(http => + http.SendAsync( + It.Is(msg => + msg.RequestUri.ToString().Contains("https://test.my.salesforce.com/services/data/v60.0/query?q=")), + It.IsAny())) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.OK) + { Content = new StringContent(JsonSerializer.Serialize(mockGetSchemaContent)) }); + + + + var mockUpdateJobStatusContent = new + { + id = "750QI000007QKdRYAW", + operation = "query", + @object = "ECCO_Consent__c", + createdById = "0057Y0000068CtmQAE", + createdDate = "2024-06-12T08:03:19.000+0000", + systemModstamp = "2024-06-12T08:03:41.000+0000", + state = "JobComplete", + concurrencyMode = "Parallel", + contentType = "CSV", + apiVersion = 60.0, + jobType = "V2Query", + lineEnding = "LF", + columnDelimiter = "COMMA", + numberRecordsProcessed = 449465, + retries = 0, + totalProcessingTime = 72011, + isPkChunkingSupported = true + }; + + this.mockHttp + .Setup(http => + http.SendAsync( + It.Is(msg => + msg.RequestUri == new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query/750QI000007QKdRYAW")), + It.IsAny())) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.OK) + { Content = new StringContent(JsonSerializer.Serialize(mockUpdateJobStatusContent)) }); + + var mockGetResultContent = @"""Id"",""Name"" +""1a"",""aa"" +""2b"",""bb"" +""3c"",""cc"" +""4d"",""dd"" +""5e"",""ee"""; + var mockGetResultResponse = new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent(mockGetResultContent) + }; + mockGetResultResponse.Headers.Add("Sforce-Locator", "abcdefg"); + + + this.mockHttp + .Setup(http => + http.SendAsync( + It.Is(msg => + msg.RequestUri == new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query/750QI000007QKdRYAW/results?maxRecords=50&")), + It.IsAny())) + .ReturnsAsync(mockGetResultResponse); + + var 
mockGetResultContent2 = @"""Id"",""Name"" +""6f"",""ff"" +""7g"",""gg"" +""8h"",""hh"" +""9i"",""ii"" +""10j"",""jj"""; + var mockGetResultResponse2 = new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent(mockGetResultContent2) + }; + mockGetResultResponse2.Headers.Add("Sforce-Locator", "null"); + + + this.mockHttp + .Setup(http => + http.SendAsync( + It.Is(msg => + msg.RequestUri == new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query/750QI000007QKdRYAW/results?maxRecords=50&locator=abcdefg")), + It.IsAny())) + .ReturnsAsync(mockGetResultResponse2); + + var auth = new DynamicBearerAuthenticatedMessageProvider("test.my.salesforce.com", "client_id", "client_secret", "user_name", "password", "security_token"); + + var source = SalesForceSource.Create(auth, this.mockHttp.Object, "account", TimeSpan.FromSeconds(5)); + + var result = await Source.FromGraph(source) + .Take(10) + .RunWith(Sink.Seq>(), this.akkaFixture.Materializer); + + Assert.Equal(10, result.Count); + } + + private class MockResult + { + public string MockValue { get; set; } + } +} From 6e7caedfa050e67de0de8f514484ef99db62faf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Fri, 14 Jun 2024 11:44:13 +0200 Subject: [PATCH 03/10] Clean up --- .../SalesForceJobAbortedException.cs | 19 + .../Exceptions/SalesForceJobException.cs | 21 ++ .../SalesForceJobFailedException.cs | 19 + .../SalesForce/Models/SalesForceAttribute.cs | 45 +-- .../SalesForce/Models/SalesForceEntity.cs | 4 +- .../SalesForce/Models/SalesForceJob.cs | 23 +- src/Sources/SalesForce/SalesForceSource.cs | 337 ++---------------- ...ISalesForceAuthenticatedMessageProvider.cs | 17 - ...namicBearerAuthenticatedMessageProvider.cs | 83 ----- .../SalesForceJobProvider.cs | 321 +++++++++++++++++ test/Sources/SalesforceSourceTests.cs | 93 +---- 11 files changed, 449 insertions(+), 533 deletions(-) create mode 100644 
src/Sources/SalesForce/Exceptions/SalesForceJobAbortedException.cs create mode 100644 src/Sources/SalesForce/Exceptions/SalesForceJobException.cs create mode 100644 src/Sources/SalesForce/Exceptions/SalesForceJobFailedException.cs delete mode 100644 src/Sources/SalesForce/Services/AuthenticatedMessageProviders/Base/ISalesForceAuthenticatedMessageProvider.cs delete mode 100644 src/Sources/SalesForce/Services/AuthenticatedMessageProviders/DynamicBearerAuthenticatedMessageProvider.cs create mode 100644 src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs diff --git a/src/Sources/SalesForce/Exceptions/SalesForceJobAbortedException.cs b/src/Sources/SalesForce/Exceptions/SalesForceJobAbortedException.cs new file mode 100644 index 0000000..d2838e7 --- /dev/null +++ b/src/Sources/SalesForce/Exceptions/SalesForceJobAbortedException.cs @@ -0,0 +1,19 @@ +using System; +using System.Diagnostics.CodeAnalysis; + +namespace Arcane.Framework.Sources.SalesForce.Exceptions; + +/// +/// Thrown if the Salesforce job was aborted. 
+/// +[ExcludeFromCodeCoverage(Justification = "Trivial")] +public class SalesForceJobAbortedException : SalesForceJobException +{ + public SalesForceJobAbortedException(string message) : base(message) + { + } + + public SalesForceJobAbortedException(string message, Exception inner) : base(message, inner) + { + } +} diff --git a/src/Sources/SalesForce/Exceptions/SalesForceJobException.cs b/src/Sources/SalesForce/Exceptions/SalesForceJobException.cs new file mode 100644 index 0000000..2412bdc --- /dev/null +++ b/src/Sources/SalesForce/Exceptions/SalesForceJobException.cs @@ -0,0 +1,21 @@ +using System; +using System.Diagnostics.CodeAnalysis; + +namespace Arcane.Framework.Sources.SalesForce.Exceptions; + +/// +/// Base class for all Salesforce job-related exceptions. +/// +[ExcludeFromCodeCoverage(Justification = "Trivial")] +public class SalesForceJobException : Exception +{ + public SalesForceJobException(string message) + : base(message) + { + } + + public SalesForceJobException(string message, Exception inner) + : base(message, inner) + { + } +} diff --git a/src/Sources/SalesForce/Exceptions/SalesForceJobFailedException.cs b/src/Sources/SalesForce/Exceptions/SalesForceJobFailedException.cs new file mode 100644 index 0000000..18b1ffa --- /dev/null +++ b/src/Sources/SalesForce/Exceptions/SalesForceJobFailedException.cs @@ -0,0 +1,19 @@ +using System; +using System.Diagnostics.CodeAnalysis; + +namespace Arcane.Framework.Sources.SalesForce.Exceptions; + +/// +/// Thrown if the Salesforce job returned with a failed status.
+/// +[ExcludeFromCodeCoverage(Justification = "Trivial")] +public class SalesForceJobFailedException : SalesForceJobException +{ + public SalesForceJobFailedException(string message) : base(message) + { + } + + public SalesForceJobFailedException(string message, Exception inner) : base(message, inner) + { + } +} diff --git a/src/Sources/SalesForce/Models/SalesForceAttribute.cs b/src/Sources/SalesForce/Models/SalesForceAttribute.cs index 596244d..0721c23 100644 --- a/src/Sources/SalesForce/Models/SalesForceAttribute.cs +++ b/src/Sources/SalesForce/Models/SalesForceAttribute.cs @@ -1,13 +1,11 @@ using System; using System.Collections.Generic; -using System.Linq; -using System.Text.Json; using System.Text.Json.Serialization; namespace Arcane.Framework.Sources.SalesForce.Models; /// -/// Represents CDM Change Feed attribute +/// Represents Salesforce attribute /// public class SalesForceAttribute { @@ -32,7 +30,7 @@ public class SalesForceAttribute public string Name { get; set; } /// - /// Attribute data format + /// Attribute data type /// [JsonPropertyName("ValueTypeId")] public string DataType { get; set; } @@ -45,21 +43,11 @@ public class SalesForceAttribute new SalesForceAttributeEqualityComparer(); /// - /// Returns true if the type is composition of other types + /// Maps Salesforce type to .NET type /// - /// - /// - // public static bool IsComplexType(string cdmTypeName) - // { - // return !cdmTypeMap.ContainsKey(cdmTypeName.ToLower()); - // } - - // /// - // /// // Maps CDM type to .NET type - // /// - // /// CDM type name - // /// .NET type instance - // /// Thrown if type is not supported + /// Salesforce type name + /// .NET type instance + /// Thrown if type is not supported public static Type MapSalesforceType(string salesforceTypeName) { if (salesforceTypeMap.ContainsKey(salesforceTypeName.ToLower())) @@ -70,27 +58,6 @@ public static Type MapSalesforceType(string salesforceTypeName) throw new InvalidOperationException($"Unsupported type:
{salesforceTypeName}"); } - // /// - // /// Resolves complex type - // /// - // /// Start element for type resolution - // /// Element types list - // /// - // public static string ResolveComplexType(JsonElement startFrom, IEnumerable types) - // { - // var objectKind = startFrom.GetProperty("extendsDataType").ValueKind; - // var typeName = objectKind == JsonValueKind.Object - // ? startFrom.GetProperty("extendsDataType").GetProperty("dataTypeReference").GetString() - // : startFrom.GetProperty("extendsDataType").GetString(); - // if (!IsComplexType(typeName)) - // { - // return typeName; - // } - - // return ResolveComplexType( - // types.Where(tp => tp.GetProperty("dataTypeName").GetString() == typeName).FirstOrDefault(), types); - // } - private sealed class SalesForceAttributeEqualityComparer : IEqualityComparer { public bool Equals(SalesForceAttribute x, SalesForceAttribute y) diff --git a/src/Sources/SalesForce/Models/SalesForceEntity.cs b/src/Sources/SalesForce/Models/SalesForceEntity.cs index affd8f7..260e25c 100644 --- a/src/Sources/SalesForce/Models/SalesForceEntity.cs +++ b/src/Sources/SalesForce/Models/SalesForceEntity.cs @@ -29,8 +29,9 @@ public class SalesForceEntity new SalesForceEntityEqualityComparer(); /// - /// Parse CDM entity from a JSON document + /// Parse Salesforce entity from a JSON document /// + /// Name of the Salesforce entity /// Json document to parse /// Parsed SimpleCdmEntity object public static SalesForceEntity FromJson(string entityName, JsonDocument document) @@ -48,7 +49,6 @@ public static SalesForceEntity FromJson(string entityName, JsonDocument document /// /// Create DataReader for the entity /// - /// Column to merge by /// DataReader instance public IDataReader GetReader() { diff --git a/src/Sources/SalesForce/Models/SalesForceJob.cs b/src/Sources/SalesForce/Models/SalesForceJob.cs index de5afd2..8d4ad26 100644 --- a/src/Sources/SalesForce/Models/SalesForceJob.cs +++ b/src/Sources/SalesForce/Models/SalesForceJob.cs @@ 
-1,13 +1,10 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net.Http; -using System.Text.Json; using System.Text.Json.Serialization; namespace Arcane.Framework.Sources.SalesForce.Models; - +/// +/// Job status types +/// [JsonConverter(typeof(JsonStringEnumConverter))] public enum SalesforceJobStatus { @@ -20,7 +17,7 @@ public enum SalesforceJobStatus } /// -/// Represents CDM Change Feed entity +/// Represents Salesforce job /// public class SalesForceJob { @@ -32,18 +29,26 @@ public class SalesForceJob /// - /// Attributes collection + /// Job status /// [JsonPropertyName("state")] public SalesforceJobStatus Status { get; set; } + /// + /// Name of the Salesforce object the job operates on + /// [JsonPropertyName("object")] public string Object { get; set; } - + /// + /// Total processing time of the job + /// [JsonPropertyName("totalProcessingTime")] public long? TotalProcessingTime { get; set; } + /// + /// Number of records processed by the job + /// [JsonPropertyName("numberRecordsProcessed")] public long?
NumberRecordsProcessed { get; set; } diff --git a/src/Sources/SalesForce/SalesForceSource.cs b/src/Sources/SalesForce/SalesForceSource.cs index 3538bfe..5a96726 100644 --- a/src/Sources/SalesForce/SalesForceSource.cs +++ b/src/Sources/SalesForce/SalesForceSource.cs @@ -1,38 +1,23 @@ using System; using System.Collections.Generic; -using System.ComponentModel; using System.Configuration; using System.IO; using System.Linq; -using System.Net; using System.Net.Http; -using System.Net.Http.Json; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; using Akka.Actor; using Akka.Event; using Akka.Streams; using Akka.Streams.Stage; using Akka.Util; -using Akka.Util.Extensions; using Arcane.Framework.Contracts; using Arcane.Framework.Sinks.Parquet; using Arcane.Framework.Sinks.Parquet.Models; using Arcane.Framework.Sources.Base; using Arcane.Framework.Sources.Exceptions; -using Arcane.Framework.Sources.Extensions; using Arcane.Framework.Sources.SalesForce.Models; using Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders; -using Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders.Base; -using Arcane.Framework.Sources.SalesForce.Services.UriProviders; -using Arcane.Framework.Sources.SalesForce.Services.UriProviders.Base; -using Cassandra; -using Microsoft.IdentityModel.Tokens; +using Arcane.Framework.Sources.SalesForce.Exceptions; using Parquet.Data; -using Polly.RateLimit; -using Snd.Sdk.Tasks; -using ZstdNet; namespace Arcane.Framework.Sources.SalesForce; @@ -42,26 +27,21 @@ namespace Arcane.Framework.Sources.SalesForce; public class SalesForceSource : GraphStage>>, IParquetSource, ITaggedSource { private readonly string entityName; - private readonly ISalesForceAuthenticatedMessageProvider _authenticatedMessageProvider; - private readonly bool fullLoadOnStart; + private readonly SalesForceJobProvider jobProvider; private readonly HttpClient httpClient; - // private readonly AsyncRateLimitPolicy 
rateLimitPolicy; - // private readonly bool stopAfterFullLoad; - // private readonly ISalesForceUriProvider uriProvider; + private readonly TimeSpan changeCaptureInterval; private SalesForceSource( - - ISalesForceAuthenticatedMessageProvider authenticatedMessageProvider, - - string entityName + SalesForceJobProvider jobProvider, + string entityName, + TimeSpan changeCaptureInterval ) { - - this._authenticatedMessageProvider = authenticatedMessageProvider; - + this.jobProvider = jobProvider; this.httpClient = new HttpClient(); this.entityName = entityName; + this.changeCaptureInterval = changeCaptureInterval; this.Shape = new SourceShape>(this.Out); } @@ -70,22 +50,18 @@ string entityName /// /// Only use this constructor for unit tests to mock http calls. /// - /// URI provider - /// Authenticated message provider - /// How often to track changes. - /// Look back interval + /// Salesforce Job Provider + /// Name of Salesforce entity /// Http client for making requests - /// Api Schema - /// Response property key chain - /// Rate limiting policy instance - /// Set to true to stream full current version of the table first. 
- /// Set to true if stream should stop after full load is finished + /// How often to track changes + /// private SalesForceSource( - ISalesForceAuthenticatedMessageProvider authenticatedMessageProvider, + SalesForceJobProvider jobProvider, string entityName, HttpClient httpClient, - TimeSpan changeCaptureInterval) : this(authenticatedMessageProvider, entityName) + TimeSpan changeCaptureInterval + ) : this(jobProvider, entityName, changeCaptureInterval) { this.httpClient = httpClient; } @@ -105,52 +81,10 @@ private SalesForceSource( /// public Schema GetParquetSchema() { - // TODO - var response = this._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => - - { - msg.RequestUri = new Uri($"https://test.my.salesforce.com/services/data/v60.0/query?q=SELECT Name,DataType,ValueTypeId FROM EntityParticle WHERE EntityDefinition.QualifiedApiName ='{this.entityName}' and dataType != 'address'"); - - return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => - { - if (response.IsSuccessStatusCode) - { - return response.Content.ReadAsStringAsync().Map(value => - { - - return JsonSerializer.Deserialize(value); + var schema = this.jobProvider.GetSchema(httpClient, this.entityName).Result; - }); - } + return schema.Value.GetReader().ToParquetSchema(); - var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; - - // this.Log.Warning(errorMsg); - - throw new HttpRequestException(errorMsg, null, response.StatusCode); - }).Flatten(); - }).Flatten().TryMap(result => result, exception => exception switch - { - HttpRequestException - { - StatusCode: HttpStatusCode.TooManyRequests - } => Option.None, // API rate limit, in case configured rate limit is not good enough - HttpRequestException - { - StatusCode: HttpStatusCode.RequestTimeout - } => Option.None, // Potential server-side timeout 
due to overload - _ => throw exception - }).Result; - - - - if (response.HasValue) - { - return SalesForceEntity.FromJson(this.entityName, response.Value).GetReader().ToParquetSchema(); - - } - throw new Exception("Yikes"); - // return this.apiSchema.ToParquetSchema(); } /// @@ -163,26 +97,20 @@ public SourceTags GetDefaultTags() } /// - /// Creates new instance of + /// Creates a new instance of /// - /// URI provider - /// How often to track changes. - /// Look back interval - /// Stream kind - /// Http request rimeout - /// Api Schema - /// Rate limiting policy instance - /// Set to true to stream full current version of the table first. - /// Set to true if stream should stop after full load is finished - /// Authenticated message provider + /// + /// + /// + /// public static SalesForceSource Create( - DynamicBearerAuthenticatedMessageProvider headerAuthenticatedMessageProvider, + SalesForceJobProvider jobProvider, HttpClient httpClient, string entityName, TimeSpan changeCaptureInterval ) { - return new SalesForceSource(headerAuthenticatedMessageProvider + return new SalesForceSource(jobProvider , entityName, httpClient, changeCaptureInterval); } @@ -200,14 +128,10 @@ private sealed class SourceLogic : TimerGraphStageLogic private readonly LocalOnlyDecider decider; private readonly HttpClient httpClient; private readonly SalesForceSource source; - private HttpResponseMessage currentResponse; private SalesForceEntity entitySchema; private Option currentJob; - private Option jobLocator; - // private SalesForceEntity salesForceEntity; - private Action>> responseReceived; public SourceLogic(SalesForceSource source) : base(source.Shape) { @@ -228,25 +152,12 @@ public SourceLogic(SalesForceSource source) : base(source.Shape) _ => Directive.Stop }); - this.currentJob = new SalesForceJob - { - Id = "", - NumberRecordsProcessed = 0, - Object = this.source.entityName, - TotalProcessingTime = 0, - Status = SalesforceJobStatus.None - }; + this.currentJob = Option.None; - 
this.jobLocator = Option.None; this.SetHandler(source.Out, this.PullChanges, this.Finish); } - /// - public bool IsRunningInBackfillMode { get; set; } - - /// - private void Finish(Exception cause) { if (cause is not null && cause is not SubscriptionWithCancelException.NonFailureCancellation) @@ -261,68 +172,23 @@ public override void PreStart() { this.Log.Info("Prestart"); this.UpdateSchema(); - // base.PreStart(); - - - if (this.source.fullLoadOnStart) - { - this.IsRunningInBackfillMode = true; - } } public void UpdateSchema() { - var response = this.source._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => + var newSchema = this.source.jobProvider.GetSchema(httpClient, this.source.entityName).Result; - { - msg.RequestUri = new Uri($"https://test.my.salesforce.com/services/data/v60.0/query?q=SELECT Name,DataType,ValueTypeId FROM EntityParticle WHERE EntityDefinition.QualifiedApiName ='{this.source.entityName}' and dataType != 'address'"); - - return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => - { - if (response.IsSuccessStatusCode) - { - return response.Content.ReadAsStringAsync().Map(value => - { - this.Log.Info(value); - - return JsonSerializer.Deserialize(value); - - }); - } - - var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; - - // this.Log.Warning(errorMsg); - - throw new HttpRequestException(errorMsg, null, response.StatusCode); - }).Flatten(); - }).Flatten().TryMap(result => result, exception => exception switch - { - HttpRequestException - { - StatusCode: HttpStatusCode.TooManyRequests - } => Option.None, // API rate limit, in case configured rate limit is not good enough - HttpRequestException - { - StatusCode: HttpStatusCode.RequestTimeout - } => Option.None, // Potential server-side timeout due to overload - _ => throw exception - 
}).Result; - - - - if (response.IsEmpty) + if (newSchema.IsEmpty) { this.Log.Warning("Could not update schema"); } else { - var newSchema = SalesForceEntity.FromJson(this.source.entityName, response.Value); var schemaUnchanged = - SalesForceEntity.SalesForceEntityComparer.Equals(this.entitySchema, newSchema); + SalesForceEntity.SalesForceEntityComparer.Equals(this.entitySchema, newSchema.Value); this.entitySchema = (this.entitySchema == null, schemaEquals: schemaUnchanged) switch { - (true, _) or (false, true) => newSchema, + (true, _) or (false, true) => newSchema.Value, (false, false) => throw new SchemaMismatchException() }; } @@ -331,58 +197,16 @@ public void UpdateSchema() private void CreateNewJob() { - // this.UpdateSchema(); this.Log.Info("Creating new job"); - var response = this.source._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => + var job = this.source.jobProvider.CreateJob(httpClient, this.entitySchema).Result; - { - msg.RequestUri = new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query"); - msg.Content = JsonContent.Create(new - { - operation = "query", - query = $"SELECT {this.entitySchema.Attributes.Where(e => e.DataType != "address").Select(e => e.Name).Aggregate((a, b) => a + ", " + b)} FROM {this.entitySchema.EntityName} limit 100", - - }); - msg.Method = HttpMethod.Post; - - return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => - { - if (response.IsSuccessStatusCode) - { - return response.Content.ReadAsStringAsync().Map(value => - { - this.Log.Info(value); - return JsonSerializer.Deserialize(value); - - }); - } - - var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; - - // this.Log.Warning(errorMsg); - - throw new HttpRequestException(errorMsg, null, response.StatusCode); - }).Flatten(); - }).Flatten().TryMap(result 
=> result, exception => exception switch - { - HttpRequestException - { - StatusCode: HttpStatusCode.TooManyRequests - } => Option.None, // API rate limit, in case configured rate limit is not good enough - HttpRequestException - { - StatusCode: HttpStatusCode.RequestTimeout - } => Option.None, // Potential server-side timeout due to overload - _ => throw exception - }).Result; - - if (response.IsEmpty) + if (job.IsEmpty) { this.Log.Warning("Could not create job"); } else { - this.currentJob = response.Value; + this.currentJob = job; } this.ScheduleOnce(TimerKey, TimeSpan.FromSeconds(1)); } @@ -390,42 +214,7 @@ private void CreateNewJob() private void UpdateJobStatus() { this.Log.Info("Updating job status"); - var response = this.source._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => - - { - msg.RequestUri = new Uri($"https://test.my.salesforce.com/services/data/v60.0/jobs/query/{this.currentJob.Value.Id}"); - - - return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => - { - if (response.IsSuccessStatusCode) - { - return response.Content.ReadAsStringAsync().Map(value => - { - this.Log.Info(value); - return JsonSerializer.Deserialize(value); - - }); - } - - var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; - - // this.Log.Warning(errorMsg); - - throw new HttpRequestException(errorMsg, null, response.StatusCode); - }).Flatten(); - }).Flatten().TryMap(result => result, exception => exception switch - { - HttpRequestException - { - StatusCode: HttpStatusCode.TooManyRequests - } => Option.None, // API rate limit, in case configured rate limit is not good enough - HttpRequestException - { - StatusCode: HttpStatusCode.RequestTimeout - } => Option.None, // Potential server-side timeout due to overload - _ => throw exception - }).Result; + var response = 
this.source.jobProvider.GetJobStatus(httpClient, this.currentJob.Value).Result; if (response.IsEmpty) { @@ -433,63 +222,18 @@ private void UpdateJobStatus() } else { - this.currentJob = response.Value; + this.currentJob = response; } this.ScheduleOnce(TimerKey, TimeSpan.FromSeconds(1)); } - - private (Type, object) ConvertToSalesforceType(string salesforceDataType, string value) - { - var tp = SalesForceAttribute.MapSalesforceType(salesforceDataType); - var converter = TypeDescriptor.GetConverter(tp); - return (tp, value == "" ? null : converter.ConvertFromInvariantString(value)); - } - - private void ProcessResult() { - this.Log.Info("Processing results. bip bop"); - var response = this.source._authenticatedMessageProvider.GetAuthenticatedMessage(httpClient).Map(msg => - { - var locatorString = this.jobLocator.HasValue ? $"locator={this.jobLocator.Value}" : ""; - msg.RequestUri = new Uri($"https://test.my.salesforce.com/services/data/v60.0/jobs/query/{this.currentJob.Value.Id}/results?maxRecords=50&{locatorString}"); - msg.Headers.Add("Accept", "text/csv"); - return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => - { - if (response.IsSuccessStatusCode) - { - return response.Content.ReadAsStringAsync().Map(value => - { - this.Log.Info(value); - this.jobLocator = response.Headers.GetValues("Sforce-Locator").Select(v => v == "null" ? 
Option.None : v.AsOption()).First(); - return value; - }); - } - - var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; - - // this.Log.Warning(errorMsg); - - throw new HttpRequestException(errorMsg, null, response.StatusCode); - }).Flatten(); - }).Flatten().Result; - - var rows = response.ReplaceQuotedNewlines().Split("\n").Skip(1).Where(line => !string.IsNullOrEmpty(line)).Select(line => - { - var cells = line.ParseCsvLine(this.entitySchema.Attributes.Length).Select( - (v, ix) => - { - var (tp, value) = this.ConvertToSalesforceType(this.entitySchema.Attributes[ix].DataType, v); - return new DataCell(this.entitySchema.Attributes[ix].Name, tp, - value); - }).ToList(); - return cells; - }); + var rows = this.source.jobProvider.GetJobResult(httpClient, this.currentJob.Value, this.entitySchema).Result; this.EmitMultiple(this.source.Out, rows); - if (this.jobLocator.IsEmpty) + if (!rows.Any()) { this.currentJob = Option.None; this.ScheduleOnce(TimerKey, this.source.changeCaptureInterval); @@ -503,16 +247,11 @@ private void PullChanges() { case SalesforceJobStatus.UploadComplete:; this.UpdateJobStatus(); break; case SalesforceJobStatus.InProgress: this.UpdateJobStatus(); break; - case SalesforceJobStatus.Aborted: this.FailStage(new Exception("Something bad happened")); break; - case SalesforceJobStatus.Failed: this.FailStage(new Exception("Something bad happened")); break; + case SalesforceJobStatus.Aborted: this.FailStage(new SalesForceJobAbortedException("Something bad happened")); break; + case SalesforceJobStatus.Failed: this.FailStage(new SalesForceJobFailedException("Something bad happened")); break; case SalesforceJobStatus.JobComplete: this.ProcessResult(); break; case SalesforceJobStatus.None: this.CreateNewJob(); break; - default: this.FailStage(new Exception("Something bad happened")); break; }; - - -
- } protected override void OnTimer(object timerKey) diff --git a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/Base/ISalesForceAuthenticatedMessageProvider.cs b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/Base/ISalesForceAuthenticatedMessageProvider.cs deleted file mode 100644 index e87d9be..0000000 --- a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/Base/ISalesForceAuthenticatedMessageProvider.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System.Net.Http; -using System.Threading.Tasks; - -namespace Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders.Base; - -/// -/// Authentication message provider interface for various REST API authentication methods. -/// -public interface ISalesForceAuthenticatedMessageProvider -{ - /// - /// Generates authenticated message for the REST API request. - /// - /// HTTP client - /// Authenticated message - Task GetAuthenticatedMessage(HttpClient httpClient); -} diff --git a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/DynamicBearerAuthenticatedMessageProvider.cs b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/DynamicBearerAuthenticatedMessageProvider.cs deleted file mode 100644 index 118f794..0000000 --- a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/DynamicBearerAuthenticatedMessageProvider.cs +++ /dev/null @@ -1,83 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Net.Http; -using System.Net.Http.Headers; -using System.Text; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; -using Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders.Base; -using Snd.Sdk.Tasks; - -namespace Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders; - -/// -/// Authenticated message provider that generated dynamic bearer token header. 
-/// -public record DynamicBearerAuthenticatedMessageProvider : ISalesForceAuthenticatedMessageProvider -{ - private readonly TimeSpan expirationPeriod; - private string currentToken; - private DateTimeOffset? validTo; - private readonly Uri tokenSource; - private readonly string accountName; - private readonly string clientId; - private readonly string clientSecret; - private readonly string username; - private readonly string password; - private readonly string securityToken; - - /// - /// Authenticated message provider that generated dynamic bearer token header. - /// - - public DynamicBearerAuthenticatedMessageProvider(string accountName, string clientId, string clientSecret, string username, string password, string securityToken) - { - this.tokenSource = new Uri($"https://{accountName}/services/oauth2/token"); - this.clientId = clientId; - this.clientSecret = clientSecret; - this.username = username; - this.password = password; - this.securityToken = securityToken; - } - - /// - public Task GetAuthenticatedMessage(HttpClient httpClient) - { - if (this.validTo.GetValueOrDefault(DateTimeOffset.MaxValue) < - DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(1))) - { - return Task.FromResult(new HttpRequestMessage - { - Headers = { Authorization = new AuthenticationHeaderValue("Bearer", this.currentToken) } - }); - } - var dict = new Dictionary - { - { "grant_type", "password" }, - { "client_id", this.clientId }, - { "client_secret", this.clientSecret }, - { "username", this.username }, - { "password", $"{this.password}{this.securityToken}" }, - }; - var tokenHrm = new HttpRequestMessage(HttpMethod.Post, this.tokenSource) - { - Content = new FormUrlEncodedContent(dict) - }; - - return httpClient.SendAsync(tokenHrm, CancellationToken.None).Map(response => - { - response.EnsureSuccessStatusCode(); - return response.Content.ReadAsStringAsync(); - }).FlatMap(result => - { - var tokenResponse = JsonSerializer.Deserialize(result); - this.currentToken = 
tokenResponse.GetProperty("access_token").GetString(); - this.validTo = DateTimeOffset.UtcNow.Add(this.expirationPeriod); - return new HttpRequestMessage - { - Headers = { Authorization = new AuthenticationHeaderValue("Bearer", this.currentToken) } - }; - }); - } -} diff --git a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs new file mode 100644 index 0000000..188376c --- /dev/null +++ b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs @@ -0,0 +1,321 @@ +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Net.Http.Json; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Framework.Sinks.Parquet.Models; +using Arcane.Framework.Sources.Extensions; +using Arcane.Framework.Sources.SalesForce.Models; +using Snd.Sdk.Tasks; + +namespace Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders; + +/// +/// Authenticated message provider that generated dynamic bearer token header. +/// +public record SalesForceJobProvider +{ + private readonly TimeSpan expirationPeriod; + private string currentToken; + private DateTimeOffset? validTo; + private readonly Uri tokenSource; + private readonly string accountName; + private readonly string clientId; + private readonly string clientSecret; + private readonly string username; + private readonly string password; + private readonly string securityToken; + private readonly string apiVersion; + private Option currentJobLocator; + private readonly Option rowsPerPage; + + /// + /// Authenticated message provider that generated dynamic bearer token header. 
+ /// + + public SalesForceJobProvider(string accountName, string clientId, string clientSecret, string username, string password, string securityToken, string apiVersion, int rowsPerPage) + { + this.tokenSource = new Uri($"https://{accountName}/services/oauth2/token"); + this.accountName = accountName; + this.clientId = clientId; + this.clientSecret = clientSecret; + this.username = username; + this.password = password; + this.securityToken = securityToken; + this.apiVersion = apiVersion; + this.currentJobLocator = Option.None; + this.rowsPerPage = rowsPerPage; + } + + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public SalesForceJobProvider(string accountName, string clientId, string clientSecret, string username, string password, string securityToken, string apiVersion) + { + this.tokenSource = new Uri($"https://{accountName}/services/oauth2/token"); + this.accountName = accountName; + this.clientId = clientId; + this.clientSecret = clientSecret; + this.username = username; + this.password = password; + this.securityToken = securityToken; + this.apiVersion = apiVersion; + this.currentJobLocator = Option.None; + this.rowsPerPage = Option.None; + this.expirationPeriod = TimeSpan.FromMinutes(19); + } + + /// + /// Generates authenticated message for the REST API request. 
+ /// + /// HTTP client + /// Authenticated message + public Task GetAuthenticatedMessage(HttpClient httpClient) + { + if (this.validTo.GetValueOrDefault(DateTimeOffset.MaxValue) < + DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(1))) + { + return Task.FromResult(new HttpRequestMessage + { + Headers = { Authorization = new AuthenticationHeaderValue("Bearer", this.currentToken) } + }); + } + var dict = new Dictionary + { + { "grant_type", "password" }, + { "client_id", this.clientId }, + { "client_secret", this.clientSecret }, + { "username", this.username }, + { "password", $"{this.password}{this.securityToken}" }, + }; + var tokenHrm = new HttpRequestMessage(HttpMethod.Post, this.tokenSource) + { + Content = new FormUrlEncodedContent(dict) + }; + + return httpClient.SendAsync(tokenHrm, CancellationToken.None).Map(response => + { + response.EnsureSuccessStatusCode(); + return response.Content.ReadAsStringAsync(); + }).FlatMap(result => + { + var tokenResponse = JsonSerializer.Deserialize(result); + this.currentToken = tokenResponse.GetProperty("access_token").GetString(); + this.validTo = DateTimeOffset.UtcNow.Add(this.expirationPeriod); + return new HttpRequestMessage + { + Headers = { Authorization = new AuthenticationHeaderValue("Bearer", this.currentToken) } + }; + }); + } + + /// + /// + /// + /// + /// + /// + public Task> GetSchema(HttpClient httpClient, string entityName) + { + return this.GetAuthenticatedMessage(httpClient).Map(msg => + + { + msg.RequestUri = new Uri($"https://{this.accountName}/services/data/{this.apiVersion}/query?q=SELECT Name,DataType,ValueTypeId FROM EntityParticle WHERE EntityDefinition.QualifiedApiName ='{entityName}' and dataType != 'address'"); + + return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => + { + if (response.IsSuccessStatusCode) + { + return response.Content.ReadAsStringAsync().Map(value => + { + + return SalesForceEntity.FromJson(entityName, JsonSerializer.Deserialize(value)).AsOption(); + 
+ }); + } + + var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; + + // this.Log.Warning(errorMsg); + + throw new HttpRequestException(errorMsg, null, response.StatusCode); + }).Flatten(); + }).Flatten().TryMap(result => result, exception => exception switch + { + HttpRequestException + { + StatusCode: HttpStatusCode.TooManyRequests + } => Option.None, // API rate limit, in case configured rate limit is not good enough + HttpRequestException + { + StatusCode: HttpStatusCode.RequestTimeout + } => Option.None, // Potential server-side timeout due to overload + _ => throw exception + }); + } + + /// + /// + /// + /// + /// + /// + public Task> CreateJob(HttpClient httpClient, SalesForceEntity entitySchema) + { + return this.GetAuthenticatedMessage(httpClient).Map(msg => + + { + msg.RequestUri = new Uri($"https://{this.accountName}/services/data/{this.apiVersion}/jobs/query"); + msg.Content = JsonContent.Create(new + { + operation = "query", + query = $"SELECT {entitySchema.Attributes.Where(e => e.DataType != "address").Select(e => e.Name).Aggregate((a, b) => a + ", " + b)} FROM {entitySchema.EntityName}", + + }); + msg.Method = HttpMethod.Post; + + return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => + { + if (response.IsSuccessStatusCode) + { + return response.Content.ReadAsStringAsync().Map(value => + { + + return JsonSerializer.Deserialize(value).AsOption(); + + }); + } + + var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; + + throw new HttpRequestException(errorMsg, null, response.StatusCode); + }).Flatten(); + }).Flatten().TryMap(result => result, exception => exception switch + { + HttpRequestException + { + 
StatusCode: HttpStatusCode.TooManyRequests + } => Option.None, // API rate limit, in case configured rate limit is not good enough + HttpRequestException + { + StatusCode: HttpStatusCode.RequestTimeout + } => Option.None, // Potential server-side timeout due to overload + _ => throw exception + }); + } + + /// + /// + /// + /// + /// + /// + public Task> GetJobStatus(HttpClient httpClient, SalesForceJob job) + { + return this.GetAuthenticatedMessage(httpClient).Map(msg => + + { + msg.RequestUri = new Uri($"https://{this.accountName}/services/data/{this.apiVersion}/jobs/query/{job.Id}"); + + return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => + { + if (response.IsSuccessStatusCode) + { + return response.Content.ReadAsStringAsync().Map(value => + { + + return JsonSerializer.Deserialize(value).AsOption(); + + }); + } + + var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; + + throw new HttpRequestException(errorMsg, null, response.StatusCode); + }).Flatten(); + }).Flatten().TryMap(result => result, exception => exception switch + { + HttpRequestException + { + StatusCode: HttpStatusCode.TooManyRequests + } => Option.None, // API rate limit, in case configured rate limit is not good enough + HttpRequestException + { + StatusCode: HttpStatusCode.RequestTimeout + } => Option.None, // Potential server-side timeout due to overload + _ => throw exception + }); + } + + private (Type, object) ConvertToSalesforceType(string salesforceDataType, string value) + { + var tp = SalesForceAttribute.MapSalesforceType(salesforceDataType); + var converter = TypeDescriptor.GetConverter(tp); + return (tp, value == "" ? 
null : converter.ConvertFromInvariantString(value)); + } + + /// + /// + /// + /// + /// + /// + /// + public Task>> GetJobResult(HttpClient httpClient, SalesForceJob job, SalesForceEntity entitySchema) + { + return this.GetAuthenticatedMessage(httpClient).Map(msg => + + { + var maxRowString = this.rowsPerPage.HasValue ? $"maxRecords={this.rowsPerPage.Value}" : ""; + var locatorString = this.currentJobLocator.HasValue ? $"locator={this.currentJobLocator.Value}" : ""; + var urlParams = string.Join("&", new[] { maxRowString, locatorString }); + msg.RequestUri = new Uri($"https://{this.accountName}/services/data/{this.apiVersion}/jobs/query/{job.Id}/results?{urlParams}"); + msg.Headers.Add("Accept", "text/csv"); + + return httpClient.SendAsync(msg, default(CancellationToken)).Map(response => + { + if (response.IsSuccessStatusCode) + { + return response.Content.ReadAsStringAsync().Map(value => + { + + this.currentJobLocator = response.Headers.GetValues("Sforce-Locator").Select(h => h == "null" ? 
Option.None : h.AsOption()).First(); + var rows = value.ReplaceQuotedNewlines().Split("\n").Skip(1).Where(line => !string.IsNullOrEmpty(line)).Select(line => + { + var cells = line.ParseCsvLine(entitySchema.Attributes.Length).Select( + (v, ix) => + { + var (tp, value) = this.ConvertToSalesforceType(entitySchema.Attributes[ix].DataType, v); + return new DataCell(entitySchema.Attributes[ix].Name, tp, + value); + }).ToList(); + return cells; + }); + return rows; + + + }); + } + + var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; + + throw new HttpRequestException(errorMsg, null, response.StatusCode); + }).Flatten(); + }).Flatten(); + } +} diff --git a/test/Sources/SalesforceSourceTests.cs b/test/Sources/SalesforceSourceTests.cs index c9809ad..e640761 100644 --- a/test/Sources/SalesforceSourceTests.cs +++ b/test/Sources/SalesforceSourceTests.cs @@ -3,21 +3,15 @@ using System.Linq; using System.Net; using System.Net.Http; -using System.Net.Http.Headers; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Akka.Streams.Dsl; -using Akka.Util; using Arcane.Framework.Sinks.Parquet.Models; using Arcane.Framework.Sources.SalesForce; -using Arcane.Framework.Sources.SalesForce.Models; using Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders; using Arcane.Framework.Tests.Fixtures; -using Microsoft.OpenApi.Models; using Moq; -using Polly; -using Snd.Sdk.Tasks; using Xunit; namespace Arcane.Framework.Tests.Sources; @@ -25,50 +19,17 @@ namespace Arcane.Framework.Tests.Sources; public class SalesforceSourceTests : IClassFixture { private readonly AkkaFixture akkaFixture; - // private readonly SimpleUriProvider db; - private readonly DynamicBearerAuthenticatedMessageProvider dynamicAuth; + + private readonly SalesForceJobProvider jobProvider; private readonly Mock 
mockHttp; - // private readonly PagedUriProvider pdb; + public SalesforceSourceTests(AkkaFixture akkaFixture) { this.akkaFixture = akkaFixture; this.mockHttp = new Mock(); - // this.db = new SimpleUriProvider("https://localhost/data?date=@date", new List - // { - // new() - // { - // FieldName = "date", FieldType = TemplatedFieldType.FILTER_DATE_FROM, - // FormatString = "yyyy-MM-ddTHH:mm:ssZ", Placement = TemplatedFieldPlacement.URL - // } - // }, new DateTimeOffset(2023, 1, 1, 0, 0, 0, TimeSpan.Zero), HttpMethod.Get); - // this.fa = new FixedHeaderAuthenticatedMessageProvider(new Dictionary { { "Bearer", "test" } }); - // this.dynamicAuth = - // new DynamicBearerAuthenticatedMessageProvider("https://localhost/auth", "token", "expiresIn"); - // this.pdb = new PagedUriProvider( - // "https://localhost/data_paged?page=@page&filter=updatedAt>=<@dateFrom,@dateTo", - // new List - // { - // new() - // { - // FieldName = "page", FieldType = TemplatedFieldType.RESPONSE_PAGE, FormatString = string.Empty, - // Placement = TemplatedFieldPlacement.URL - // }, - // new() - // { - // FieldName = "dateFrom", FieldType = TemplatedFieldType.FILTER_DATE_BETWEEN_FROM, - // FormatString = "yyyyMMddHHmmss", Placement = TemplatedFieldPlacement.URL - // }, - // new() - // { - // FieldName = "dateTo", FieldType = TemplatedFieldType.FILTER_DATE_BETWEEN_TO, - // FormatString = "yyyyMMddHHmmss", Placement = TemplatedFieldPlacement.URL - // } - // }, - // new DateTimeOffset(2023, 1, 1, 0, 0, 0, TimeSpan.Zero), - // HttpMethod.Get) - // .WithPageResolver(new PageResolverConfiguration - // { ResolverPropertyKeyChain = new[] { "TotalPages" }, ResolverType = PageResolverType.COUNTER }); + this.jobProvider = new SalesForceJobProvider("test.my.salesforce.com", "client_id", "client_secret", "user_name", "password", "security_token", "v60.0", 5); + } [Fact] @@ -94,41 +55,11 @@ public async Task TokenGeneration() { Content = new StringContent(JsonSerializer.Serialize(mockContent)) }); - var auth = 
new DynamicBearerAuthenticatedMessageProvider("test.my.salesforce.com", "client_id", "client_secret", "user_name", "password", "security_token"); - - var token = await auth.GetAuthenticatedMessage(this.mockHttp.Object); + var token = await this.jobProvider.GetAuthenticatedMessage(this.mockHttp.Object); Assert.Equal("", token.Headers.Authorization.Parameter); } - - [Fact] - public async Task JobDeserialization() - { - - - var job = JsonSerializer.Deserialize(@"{ - ""id"": ""750QI000007MuMUYA0"", - ""operation"": ""query"", - ""object"": ""ECCO_Consent__c"", - ""createdById"": ""0057Y0000068CtmQAE"", - ""createdDate"": ""2024-06-11T08:30:10.000+0000"", - ""systemModstamp"": ""2024-06-11T08:30:49.000+0000"", - ""state"": ""JobComplete"", - ""concurrencyMode"": ""Parallel"", - ""contentType"": ""CSV"", - ""apiVersion"": 60.0, - ""jobType"": ""V2Query"", - ""lineEnding"": ""LF"", - ""columnDelimiter"": ""COMMA"", - ""numberRecordsProcessed"": 449464, - ""retries"": 0, - ""totalProcessingTime"": 63674, - ""isPkChunkingSupported"": true -}"); - - } - [Fact] public async void RunStream() { @@ -256,7 +187,7 @@ public async void RunStream() .Setup(http => http.SendAsync( It.Is(msg => - msg.RequestUri == new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query/750QI000007QKdRYAW/results?maxRecords=50&")), + msg.RequestUri == new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query/750QI000007QKdRYAW/results?maxRecords=5&")), It.IsAny())) .ReturnsAsync(mockGetResultResponse); @@ -277,13 +208,11 @@ public async void RunStream() .Setup(http => http.SendAsync( It.Is(msg => - msg.RequestUri == new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query/750QI000007QKdRYAW/results?maxRecords=50&locator=abcdefg")), + msg.RequestUri == new Uri("https://test.my.salesforce.com/services/data/v60.0/jobs/query/750QI000007QKdRYAW/results?maxRecords=5&locator=abcdefg")), It.IsAny())) .ReturnsAsync(mockGetResultResponse2); - var auth = new 
DynamicBearerAuthenticatedMessageProvider("test.my.salesforce.com", "client_id", "client_secret", "user_name", "password", "security_token"); - - var source = SalesForceSource.Create(auth, this.mockHttp.Object, "account", TimeSpan.FromSeconds(5)); + var source = SalesForceSource.Create(this.jobProvider, this.mockHttp.Object, "account", TimeSpan.FromSeconds(5)); var result = await Source.FromGraph(source) .Take(10) @@ -292,8 +221,4 @@ public async void RunStream() Assert.Equal(10, result.Count); } - private class MockResult - { - public string MockValue { get; set; } - } } From 6f566fa53c0e3744711bd2bca15cf30c401f1d82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Fri, 14 Jun 2024 11:47:35 +0200 Subject: [PATCH 04/10] Remove address type since it's not supported --- src/Sources/SalesForce/Models/SalesForceAttribute.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Sources/SalesForce/Models/SalesForceAttribute.cs b/src/Sources/SalesForce/Models/SalesForceAttribute.cs index 0721c23..1af865e 100644 --- a/src/Sources/SalesForce/Models/SalesForceAttribute.cs +++ b/src/Sources/SalesForce/Models/SalesForceAttribute.cs @@ -13,7 +13,6 @@ public class SalesForceAttribute { { "string", typeof(string) }, { "id", typeof(string) }, - { "address", typeof(string) }, { "datetime", typeof(DateTime) }, { "date", typeof(DateTime) }, { "decimal", typeof(decimal) }, From e497a467d864901e38d1cb32da097b949f4103a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Fri, 14 Jun 2024 11:48:17 +0200 Subject: [PATCH 05/10] clean up comment --- src/Sources/SalesForce/Models/SalesForceAttribute.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Sources/SalesForce/Models/SalesForceAttribute.cs b/src/Sources/SalesForce/Models/SalesForceAttribute.cs index 1af865e..74050b3 100644 --- a/src/Sources/SalesForce/Models/SalesForceAttribute.cs +++ b/src/Sources/SalesForce/Models/SalesForceAttribute.cs @@ 
-42,7 +42,7 @@ public class SalesForceAttribute new SalesForceAttributeEqualityComparer(); /// - /// // Maps Salesforce type to .NET type + /// Maps Salesforce type to .NET type /// /// Salesforce type name /// .NET type instance From dda3893a8f8d60985ae83d04a76c6e7521243636 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Fri, 14 Jun 2024 11:49:48 +0200 Subject: [PATCH 06/10] clean up --- src/Sources/SalesForce/Models/SalesForceEntity.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Sources/SalesForce/Models/SalesForceEntity.cs b/src/Sources/SalesForce/Models/SalesForceEntity.cs index 260e25c..2425690 100644 --- a/src/Sources/SalesForce/Models/SalesForceEntity.cs +++ b/src/Sources/SalesForce/Models/SalesForceEntity.cs @@ -7,7 +7,7 @@ namespace Arcane.Framework.Sources.SalesForce.Models; /// -/// Represents CDM Change Feed entity +/// Represents Salesforce entity /// public class SalesForceEntity { @@ -33,7 +33,7 @@ public class SalesForceEntity /// /// Name of the Salesforce entity /// Json document to parse - /// Parsed SimpleCdmEntity object + /// Parsed SalesForceEntity object public static SalesForceEntity FromJson(string entityName, JsonDocument document) { var entity = new SalesForceEntity From 1f9febafd746f76216a1e8be24e44cb27e0d18aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Fri, 14 Jun 2024 11:55:42 +0200 Subject: [PATCH 07/10] Add location tag --- src/Sources/SalesForce/Models/SalesForceEntity.cs | 2 -- src/Sources/SalesForce/SalesForceSource.cs | 2 ++ .../AuthenticatedMessageProviders/SalesForceJobProvider.cs | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/Sources/SalesForce/Models/SalesForceEntity.cs b/src/Sources/SalesForce/Models/SalesForceEntity.cs index 2425690..7977f3a 100644 --- a/src/Sources/SalesForce/Models/SalesForceEntity.cs +++ b/src/Sources/SalesForce/Models/SalesForceEntity.cs @@ -59,8 +59,6 @@ public 
IDataReader GetReader() dt.Columns.Add(new DataColumn(attr.Name, SalesForceAttribute.MapSalesforceType(attr.DataType))); } - // dt.Columns.Add(new DataColumn(mergeColumnName, typeof(string))); - return dt.CreateDataReader(); } diff --git a/src/Sources/SalesForce/SalesForceSource.cs b/src/Sources/SalesForce/SalesForceSource.cs index 5a96726..c39e397 100644 --- a/src/Sources/SalesForce/SalesForceSource.cs +++ b/src/Sources/SalesForce/SalesForceSource.cs @@ -93,6 +93,8 @@ public SourceTags GetDefaultTags() return new SourceTags { SourceEntity = this.entityName, + SourceLocation = this.jobProvider.accountName + }; } diff --git a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs index 188376c..ac58b1f 100644 --- a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs +++ b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs @@ -27,7 +27,10 @@ public record SalesForceJobProvider private string currentToken; private DateTimeOffset? 
validTo; private readonly Uri tokenSource; - private readonly string accountName; + /// + /// Salesforce Account + /// + public readonly string accountName; private readonly string clientId; private readonly string clientSecret; private readonly string username; From b2df90d2888c7e0dc1c646155d7ebc2a804032ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Fri, 14 Jun 2024 12:07:11 +0200 Subject: [PATCH 08/10] more descriptive error codes --- src/Sources/SalesForce/SalesForceSource.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Sources/SalesForce/SalesForceSource.cs b/src/Sources/SalesForce/SalesForceSource.cs index c39e397..a8fa7c1 100644 --- a/src/Sources/SalesForce/SalesForceSource.cs +++ b/src/Sources/SalesForce/SalesForceSource.cs @@ -249,8 +249,8 @@ private void PullChanges() { case SalesforceJobStatus.UploadComplete:; this.UpdateJobStatus(); break; case SalesforceJobStatus.InProgress: this.UpdateJobStatus(); break; - case SalesforceJobStatus.Aborted: this.FailStage(new SalesForceJobFailedException("Something bad happened")); break; - case SalesforceJobStatus.Failed: this.FailStage(new SalesForceJobAbortedException("Something bad happened")); break; + case SalesforceJobStatus.Aborted: this.FailStage(new SalesForceJobAbortedException($"job : {this.currentJob.Value.Id} was aborted by source")); break; + case SalesforceJobStatus.Failed: this.FailStage(new SalesForceJobFailedException($"job : {this.currentJob.Value.Id} returned with failure")); break; case SalesforceJobStatus.JobComplete: this.ProcessResult(); break; case SalesforceJobStatus.None: this.CreateNewJob(); break; }; From 77967e782be7ad426d8f005f0eecd9efb5d746a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Fri, 14 Jun 2024 12:42:43 +0200 Subject: [PATCH 09/10] more cleanup --- .../SalesForceJobProvider.cs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git 
a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs index ac58b1f..59474fc 100644 --- a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs +++ b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs @@ -19,7 +19,7 @@ namespace Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders; /// -/// Authenticated message provider that generated dynamic bearer token header. +/// Salesforce Job provider that handles talking with the Bulk V2 API /// public record SalesForceJobProvider { @@ -41,7 +41,7 @@ public record SalesForceJobProvider private readonly Option rowsPerPage; /// - /// Authenticated message provider that generated dynamic bearer token header. + /// Salesforce Job provider that handles talking with the Bulk V2 API. /// public SalesForceJobProvider(string accountName, string clientId, string clientSecret, string username, string password, string securityToken, string apiVersion, int rowsPerPage) @@ -59,7 +59,7 @@ public SalesForceJobProvider(string accountName, string clientId, string clientS } /// - /// + /// Salesforce Job provider that handles talking with the Bulk V2 API /// /// /// @@ -128,11 +128,10 @@ public Task GetAuthenticatedMessage(HttpClient httpClient) } /// - /// + /// Get the schema of a Salesforce entity /// /// /// - /// public Task> GetSchema(HttpClient httpClient, string entityName) { return this.GetAuthenticatedMessage(httpClient).Map(msg => @@ -154,7 +153,6 @@ public Task> GetSchema(HttpClient httpClient, string en var errorMsg = $"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}"; - // this.Log.Warning(errorMsg); throw new HttpRequestException(errorMsg, null, response.StatusCode); 
}).Flatten(); @@ -173,11 +171,10 @@ public Task> GetSchema(HttpClient httpClient, string en } /// - /// + /// Submit a new query job /// /// /// - /// public Task> CreateJob(HttpClient httpClient, SalesForceEntity entitySchema) { return this.GetAuthenticatedMessage(httpClient).Map(msg => @@ -223,11 +220,10 @@ public Task> CreateJob(HttpClient httpClient, SalesForceEn } /// - /// + /// Get status of query job /// /// /// - /// public Task> GetJobStatus(HttpClient httpClient, SalesForceJob job) { return this.GetAuthenticatedMessage(httpClient).Map(msg => @@ -273,12 +269,11 @@ public Task> GetJobStatus(HttpClient httpClient, SalesForc } /// - /// + /// Get result of query job /// /// /// /// - /// public Task>> GetJobResult(HttpClient httpClient, SalesForceJob job, SalesForceEntity entitySchema) { return this.GetAuthenticatedMessage(httpClient).Map(msg => From 8e408dde464baa3b1907b4e656d8b095ad3c6bcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeppe=20Johan=20Waarkj=C3=A6r=20Olsen?= Date: Fri, 14 Jun 2024 14:28:11 +0200 Subject: [PATCH 10/10] Fix locator logic --- src/Sources/SalesForce/SalesForceSource.cs | 10 +++++----- .../SalesForceJobProvider.cs | 11 ++++------- test/Sources/SalesforceSourceTests.cs | 4 ++-- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/Sources/SalesForce/SalesForceSource.cs b/src/Sources/SalesForce/SalesForceSource.cs index a8fa7c1..25038a9 100644 --- a/src/Sources/SalesForce/SalesForceSource.cs +++ b/src/Sources/SalesForce/SalesForceSource.cs @@ -132,6 +132,7 @@ private sealed class SourceLogic : TimerGraphStageLogic private readonly SalesForceSource source; private SalesForceEntity entitySchema; private Option currentJob; + private Option nextJobLocator; @@ -155,6 +156,7 @@ public SourceLogic(SalesForceSource source) : base(source.Shape) }); this.currentJob = Option.None; + this.nextJobLocator = Option.None; this.SetHandler(source.Out, this.PullChanges, this.Finish); @@ -172,7 +174,6 @@ private void Finish(Exception cause) 
public override void PreStart() { - this.Log.Info("Prestart"); this.UpdateSchema(); } @@ -199,7 +200,6 @@ public void UpdateSchema() private void CreateNewJob() { - this.Log.Info("Creating new job"); var job = this.source.jobProvider.CreateJob(httpClient, this.entitySchema).Result; if (job.IsEmpty) @@ -215,7 +215,6 @@ private void CreateNewJob() private void UpdateJobStatus() { - this.Log.Info("Updating job status"); var response = this.source.jobProvider.GetJobStatus(httpClient, this.currentJob.Value).Result; if (response.IsEmpty) @@ -232,10 +231,11 @@ private void UpdateJobStatus() private void ProcessResult() { - var rows = this.source.jobProvider.GetJobResult(httpClient, this.currentJob.Value, this.entitySchema).Result; + var (rows, nextJobLocator) = this.source.jobProvider.GetJobResult(httpClient, this.currentJob.Value, this.entitySchema, this.nextJobLocator).Result; + this.nextJobLocator = nextJobLocator; this.EmitMultiple(this.source.Out, rows); - if (!rows.Any()) + if (nextJobLocator.IsEmpty) { this.currentJob = Option.None; this.ScheduleOnce(TimerKey, this.source.changeCaptureInterval); diff --git a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs index 59474fc..9d0dfde 100644 --- a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs +++ b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs @@ -37,7 +37,6 @@ public record SalesForceJobProvider private readonly string password; private readonly string securityToken; private readonly string apiVersion; - private Option currentJobLocator; private readonly Option rowsPerPage; /// @@ -54,7 +53,6 @@ public SalesForceJobProvider(string accountName, string clientId, string clientS this.password = password; this.securityToken = securityToken; this.apiVersion = apiVersion; - this.currentJobLocator = Option.None; 
this.rowsPerPage = rowsPerPage; } @@ -78,7 +76,6 @@ public SalesForceJobProvider(string accountName, string clientId, string clientS this.password = password; this.securityToken = securityToken; this.apiVersion = apiVersion; - this.currentJobLocator = Option.None; this.rowsPerPage = Option.None; this.expirationPeriod = TimeSpan.FromMinutes(19); } @@ -274,13 +271,13 @@ public Task> GetJobStatus(HttpClient httpClient, SalesForc /// /// /// - public Task>> GetJobResult(HttpClient httpClient, SalesForceJob job, SalesForceEntity entitySchema) + public Task<(IEnumerable>, Option)> GetJobResult(HttpClient httpClient, SalesForceJob job, SalesForceEntity entitySchema, Option jobLocator) { return this.GetAuthenticatedMessage(httpClient).Map(msg => { var maxRowString = this.rowsPerPage.HasValue ? $"maxRecords={this.rowsPerPage.Value}" : ""; - var locatorString = this.currentJobLocator.HasValue ? $"locator={this.currentJobLocator.Value}" : ""; + var locatorString = jobLocator.HasValue ? $"locator={jobLocator.Value}" : ""; var urlParams = string.Join("&", new[] { maxRowString, locatorString }); msg.RequestUri = new Uri($"https://{this.accountName}/services/data/{this.apiVersion}/jobs/query/{job.Id}/results?{urlParams}"); msg.Headers.Add("Accept", "text/csv"); @@ -292,7 +289,7 @@ public Task>> GetJobResult(HttpClient httpClient, Sal return response.Content.ReadAsStringAsync().Map(value => { - this.currentJobLocator = response.Headers.GetValues("Sforce-Locator").Select(h => h == "null" ? Option.None : h.AsOption()).First(); + var newjobLocator = response.Headers.GetValues("Sforce-Locator").Select(h => h == "null" ? 
Option.None : h.AsOption()).First(); var rows = value.ReplaceQuotedNewlines().Split("\n").Skip(1).Where(line => !string.IsNullOrEmpty(line)).Select(line => { var cells = line.ParseCsvLine(entitySchema.Attributes.Length).Select( @@ -304,7 +301,7 @@ public Task>> GetJobResult(HttpClient httpClient, Sal }).ToList(); return cells; }); - return rows; + return (rows, newjobLocator); }); diff --git a/test/Sources/SalesforceSourceTests.cs b/test/Sources/SalesforceSourceTests.cs index e640761..5cf66b0 100644 --- a/test/Sources/SalesforceSourceTests.cs +++ b/test/Sources/SalesforceSourceTests.cs @@ -212,10 +212,10 @@ public async void RunStream() It.IsAny())) .ReturnsAsync(mockGetResultResponse2); - var source = SalesForceSource.Create(this.jobProvider, this.mockHttp.Object, "account", TimeSpan.FromSeconds(5)); + var source = SalesForceSource.Create(this.jobProvider, this.mockHttp.Object, "account", TimeSpan.FromSeconds(60)); var result = await Source.FromGraph(source) - .Take(10) + .TakeWithin(TimeSpan.FromSeconds(10)) .RunWith(Sink.Seq>(), this.akkaFixture.Materializer); Assert.Equal(10, result.Count);