diff --git a/src/Sources/CdmChangeFeedSource/Extensions/CsvOperations.cs b/src/Sources/Extensions/CsvOperations.cs
similarity index 93%
rename from src/Sources/CdmChangeFeedSource/Extensions/CsvOperations.cs
rename to src/Sources/Extensions/CsvOperations.cs
index 21bc174..db52b29 100644
--- a/src/Sources/CdmChangeFeedSource/Extensions/CsvOperations.cs
+++ b/src/Sources/Extensions/CsvOperations.cs
@@ -1,8 +1,8 @@
-using System;
+using System;
using System.Linq;
using System.Text.RegularExpressions;
-namespace Arcane.Framework.Sources.CdmChangeFeedSource.Extensions;
+namespace Arcane.Framework.Sources.Extensions;
///
/// Contains operations for parsing CSV files.
@@ -16,7 +16,7 @@ internal static class CsvOperations
/// Number of expected headers.
/// Delimiter used in the line.
/// A string array of individual values.
- public static string[] ParseCsvLine(string line, int headerCount, char delimiter = ',')
+ public static string[] ParseCsvLine(this string line, int headerCount, char delimiter = ',')
{
var result = new string[headerCount];
var fieldCounter = 0;
@@ -91,7 +91,7 @@ public static bool IsComplete(string csvLine)
///
/// A CSV line to purge newlines from.
/// A CSV line without newline characters inside column values.
- public static string ReplaceQuotedNewlines(string csvLine)
+ public static string ReplaceQuotedNewlines(this string csvLine)
{
return Regex.Replace(csvLine, "\"[^\"]*(?:\"\"[^\"]*)*\"", m => m.Value.Replace("\n", "")).Replace("\r", "");
}
diff --git a/src/Sources/SalesForce/Exceptions/SalesForceJobAbortedException.cs b/src/Sources/SalesForce/Exceptions/SalesForceJobAbortedException.cs
new file mode 100644
index 0000000..d2838e7
--- /dev/null
+++ b/src/Sources/SalesForce/Exceptions/SalesForceJobAbortedException.cs
@@ -0,0 +1,19 @@
+using System;
+using System.Diagnostics.CodeAnalysis;
+
+namespace Arcane.Framework.Sources.SalesForce.Exceptions;
+
+///
+/// Thrown if the Salesforce job was aborted.
+///
+[ExcludeFromCodeCoverage(Justification = "Trivial")]
+public class SalesForceJobAbortedException : SalesForceJobException
+{
+ public SalesForceJobAbortedException(string message) : base(message)
+ {
+ }
+
+ public SalesForceJobAbortedException(string message, Exception inner) : base(message, inner)
+ {
+ }
+}
diff --git a/src/Sources/SalesForce/Exceptions/SalesForceJobException.cs b/src/Sources/SalesForce/Exceptions/SalesForceJobException.cs
new file mode 100644
index 0000000..2412bdc
--- /dev/null
+++ b/src/Sources/SalesForce/Exceptions/SalesForceJobException.cs
@@ -0,0 +1,21 @@
+using System;
+using System.Diagnostics.CodeAnalysis;
+
+namespace Arcane.Framework.Sources.SalesForce.Exceptions;
+
+///
+/// Base class for all Salesforce-related exceptions
+///
+[ExcludeFromCodeCoverage(Justification = "Trivial")]
+public class SalesForceJobException : Exception
+{
+ public SalesForceJobException(string message)
+ : base(message)
+ {
+ }
+
+ public SalesForceJobException(string message, Exception inner)
+ : base(message, inner)
+ {
+ }
+}
diff --git a/src/Sources/SalesForce/Exceptions/SalesForceJobFailedException.cs b/src/Sources/SalesForce/Exceptions/SalesForceJobFailedException.cs
new file mode 100644
index 0000000..18b1ffa
--- /dev/null
+++ b/src/Sources/SalesForce/Exceptions/SalesForceJobFailedException.cs
@@ -0,0 +1,19 @@
+using System;
+using System.Diagnostics.CodeAnalysis;
+
+namespace Arcane.Framework.Sources.SalesForce.Exceptions;
+
+///
+/// Thrown if the Salesforce job return with failed status.
+///
+[ExcludeFromCodeCoverage(Justification = "Trivial")]
+public class SalesForceJobFailedException : SalesForceJobException
+{
+ public SalesForceJobFailedException(string message) : base(message)
+ {
+ }
+
+ public SalesForceJobFailedException(string message, Exception inner) : base(message, inner)
+ {
+ }
+}
diff --git a/src/Sources/SalesForce/Models/SalesForceAttribute.cs b/src/Sources/SalesForce/Models/SalesForceAttribute.cs
new file mode 100644
index 0000000..74050b3
--- /dev/null
+++ b/src/Sources/SalesForce/Models/SalesForceAttribute.cs
@@ -0,0 +1,93 @@
+using System;
+using System.Collections.Generic;
+using System.Text.Json.Serialization;
+
+namespace Arcane.Framework.Sources.SalesForce.Models;
+
+///
+/// Represents Salesforce attribute
+///
+public class SalesForceAttribute
+{
+ private static readonly Dictionary salesforceTypeMap = new()
+ {
+ { "string", typeof(string) },
+ { "id", typeof(string) },
+ { "datetime", typeof(DateTime) },
+ { "date", typeof(DateTime) },
+ { "decimal", typeof(decimal) },
+ { "integer", typeof(int) },
+ { "long", typeof(long) },
+ { "double", typeof(double) },
+ { "boolean", typeof(bool) },
+ };
+
+ ///
+ /// Attribute name
+ ///
+ [JsonPropertyName("Name")]
+ public string Name { get; set; }
+
+ ///
+ /// Attribute data type
+ ///
+ [JsonPropertyName("ValueTypeId")]
+ public string DataType { get; set; }
+
+
+ ///
+ /// Attribute comparer
+ ///
+ public static IEqualityComparer SalesForceAttributeComparer { get; } =
+ new SalesForceAttributeEqualityComparer();
+
+ ///
+ /// Maps Salesforce type to .NET type
+ ///
+ /// Salesforce type name
+ /// .NET type instance
+ /// Thrown if type is not supported
+ public static Type MapSalesforceType(string salesforceTypeName)
+ {
+ if (salesforceTypeMap.ContainsKey(salesforceTypeName.ToLower()))
+ {
+ return salesforceTypeMap[salesforceTypeName.ToLower()];
+ }
+
+ throw new InvalidOperationException($"Unsupported type: {salesforceTypeName}");
+ }
+
+ private sealed class SalesForceAttributeEqualityComparer : IEqualityComparer
+ {
+ public bool Equals(SalesForceAttribute x, SalesForceAttribute y)
+ {
+ if (ReferenceEquals(x, y))
+ {
+ return true;
+ }
+
+ if (ReferenceEquals(x, null))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(y, null))
+ {
+ return false;
+ }
+
+ if (x.GetType() != y.GetType())
+ {
+ return false;
+ }
+
+ return x.Name == y.Name &&
+ x.DataType == y.DataType;
+ }
+
+ public int GetHashCode(SalesForceAttribute obj)
+ {
+ return HashCode.Combine(obj.Name, obj.DataType);
+ }
+ }
+}
diff --git a/src/Sources/SalesForce/Models/SalesForceEntity.cs b/src/Sources/SalesForce/Models/SalesForceEntity.cs
new file mode 100644
index 0000000..7977f3a
--- /dev/null
+++ b/src/Sources/SalesForce/Models/SalesForceEntity.cs
@@ -0,0 +1,98 @@
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Text.Json;
+
+namespace Arcane.Framework.Sources.SalesForce.Models;
+
+///
+/// Represents Salesforce entity
+///
+public class SalesForceEntity
+{
+ ///
+ /// Entity name
+ ///
+ public string EntityName { get; set; }
+
+
+ ///
+ /// Attributes collection
+ ///
+ public SalesForceAttribute[] Attributes { get; set; }
+
+ ///
+ /// Comparer class
+ ///
+ public static IEqualityComparer SalesForceEntityComparer { get; } =
+ new SalesForceEntityEqualityComparer();
+
+ ///
+ /// Parse Salesforce entity from a JSON document
+ ///
+ /// Name of the Salesforce entity
+ /// Json document to parse
+ /// Parsed SalesForceEntity object
+ public static SalesForceEntity FromJson(string entityName, JsonDocument document)
+ {
+ var entity = new SalesForceEntity
+ {
+ EntityName = entityName,
+ Attributes = document.RootElement.GetProperty("records").Deserialize()
+ };
+
+
+ return entity;
+ }
+
+ ///
+ /// Create DataReader for the entity
+ ///
+ /// DataReader instance
+ public IDataReader GetReader()
+ {
+ var dt = new DataTable();
+
+ foreach (var attr in this.Attributes)
+ {
+ dt.Columns.Add(new DataColumn(attr.Name, SalesForceAttribute.MapSalesforceType(attr.DataType)));
+ }
+
+ return dt.CreateDataReader();
+ }
+
+ private sealed class SalesForceEntityEqualityComparer : IEqualityComparer
+ {
+ public bool Equals(SalesForceEntity x, SalesForceEntity y)
+ {
+ if (ReferenceEquals(x, y))
+ {
+ return true;
+ }
+
+ if (ReferenceEquals(x, null))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(y, null))
+ {
+ return false;
+ }
+
+ if (x.GetType() != y.GetType())
+ {
+ return false;
+ }
+
+ return x.EntityName == y.EntityName
+ && x.Attributes.SequenceEqual(y.Attributes, SalesForceAttribute.SalesForceAttributeComparer);
+ }
+
+ public int GetHashCode(SalesForceEntity obj)
+ {
+ return HashCode.Combine(obj.EntityName, obj.Attributes);
+ }
+ }
+}
diff --git a/src/Sources/SalesForce/Models/SalesForceJob.cs b/src/Sources/SalesForce/Models/SalesForceJob.cs
new file mode 100644
index 0000000..8d4ad26
--- /dev/null
+++ b/src/Sources/SalesForce/Models/SalesForceJob.cs
@@ -0,0 +1,55 @@
+using System.Text.Json.Serialization;
+
+namespace Arcane.Framework.Sources.SalesForce.Models;
+
+///
+/// Job status types
+///
+[JsonConverter(typeof(JsonStringEnumConverter))]
+public enum SalesforceJobStatus
+{
+ UploadComplete,
+ InProgress,
+ Aborted,
+ JobComplete,
+ Failed,
+ None
+}
+
+///
+/// Represents Salesforce job
+///
+public class SalesForceJob
+{
+ ///
+ /// Id
+ ///
+ [JsonPropertyName("id")]
+ public string Id { get; set; }
+
+
+ ///
+ /// Job status
+ ///
+ [JsonPropertyName("state")]
+ public SalesforceJobStatus Status { get; set; }
+
+ ///
+ /// object
+ ///
+ [JsonPropertyName("object")]
+ public string Object { get; set; }
+
+ ///
+ /// Total processing time of the job
+ ///
+ [JsonPropertyName("totalProcessingTime")]
+ public long? TotalProcessingTime { get; set; }
+
+ ///
+ /// Numbers of records processed by the job
+ ///
+ [JsonPropertyName("numberRecordsProcessed")]
+ public long? NumberRecordsProcessed { get; set; }
+
+}
diff --git a/src/Sources/SalesForce/SalesForceSource.cs b/src/Sources/SalesForce/SalesForceSource.cs
new file mode 100644
index 0000000..25038a9
--- /dev/null
+++ b/src/Sources/SalesForce/SalesForceSource.cs
@@ -0,0 +1,264 @@
+using System;
+using System.Collections.Generic;
+using System.Configuration;
+using System.IO;
+using System.Linq;
+using System.Net.Http;
+using Akka.Actor;
+using Akka.Event;
+using Akka.Streams;
+using Akka.Streams.Stage;
+using Akka.Util;
+using Arcane.Framework.Contracts;
+using Arcane.Framework.Sinks.Parquet;
+using Arcane.Framework.Sinks.Parquet.Models;
+using Arcane.Framework.Sources.Base;
+using Arcane.Framework.Sources.Exceptions;
+using Arcane.Framework.Sources.SalesForce.Models;
+using Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders;
+using Arcane.Framework.Sources.SalesForce.Exceptions;
+using Parquet.Data;
+
+namespace Arcane.Framework.Sources.SalesForce;
+
+///
+/// Source for reading data from SalesForce BULK v2 API.
+///
+public class SalesForceSource : GraphStage>>, IParquetSource, ITaggedSource
+{
+ private readonly string entityName;
+ private readonly SalesForceJobProvider jobProvider;
+ private readonly HttpClient httpClient;
+
+ private readonly TimeSpan changeCaptureInterval;
+
+ private SalesForceSource(
+ SalesForceJobProvider jobProvider,
+ string entityName,
+ TimeSpan changeCaptureInterval
+ )
+ {
+ this.jobProvider = jobProvider;
+ this.httpClient = new HttpClient();
+ this.entityName = entityName;
+ this.changeCaptureInterval = changeCaptureInterval;
+
+ this.Shape = new SourceShape>(this.Out);
+ }
+
+
+ ///
+ /// Only use this constructor for unit tests to mock http calls.
+ ///
+ /// Salesforce Job Provider
+ /// Name of Salesforce entity
+ /// Http client for making requests
+ /// How often to track changes
+ ///
+ private SalesForceSource(
+
+ SalesForceJobProvider jobProvider,
+ string entityName,
+ HttpClient httpClient,
+ TimeSpan changeCaptureInterval
+ ) : this(jobProvider, entityName, changeCaptureInterval)
+ {
+ this.httpClient = httpClient;
+ }
+
+
+ ///
+ protected override Attributes InitialAttributes { get; } = Attributes.CreateName(nameof(SalesForceSource));
+
+ ///
+ /// Source outlet
+ ///
+ public Outlet> Out { get; } = new($"{nameof(SalesForceSource)}.Out");
+
+ ///
+ public override SourceShape> Shape { get; }
+
+ ///
+ public Schema GetParquetSchema()
+ {
+ var schema = this.jobProvider.GetSchema(httpClient, this.entityName).Result;
+
+ return schema.Value.GetReader().ToParquetSchema();
+
+ }
+
+ ///
+ public SourceTags GetDefaultTags()
+ {
+ return new SourceTags
+ {
+ SourceEntity = this.entityName,
+ SourceLocation = this.jobProvider.accountName
+
+ };
+ }
+
+ ///
+ /// Creates a new instance of
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static SalesForceSource Create(
+ SalesForceJobProvider jobProvider,
+ HttpClient httpClient,
+ string entityName,
+ TimeSpan changeCaptureInterval
+ )
+ {
+ return new SalesForceSource(jobProvider
+ , entityName, httpClient, changeCaptureInterval);
+ }
+
+
+
+ ///
+ protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
+ {
+ return new SourceLogic(this);
+ }
+
+ private sealed class SourceLogic : TimerGraphStageLogic
+ {
+ private const string TimerKey = nameof(SalesForceSource);
+ private readonly LocalOnlyDecider decider;
+ private readonly HttpClient httpClient;
+ private readonly SalesForceSource source;
+ private SalesForceEntity entitySchema;
+ private Option currentJob;
+ private Option nextJobLocator;
+
+
+
+ public SourceLogic(SalesForceSource source) : base(source.Shape)
+ {
+ this.source = source;
+
+ this.httpClient = this.source.httpClient ?? new HttpClient();
+
+ this.decider = Decider.From((ex) => ex.GetType().Name switch
+ {
+ nameof(ArgumentException) => Directive.Stop,
+ nameof(ArgumentNullException) => Directive.Stop,
+ nameof(InvalidOperationException) => Directive.Stop,
+ nameof(ConfigurationErrorsException) => Directive.Stop,
+ nameof(ObjectDisposedException) => Directive.Stop,
+ nameof(IOException) => Directive.Restart,
+ nameof(TimeoutException) => Directive.Restart,
+ nameof(HttpRequestException) => Directive.Restart,
+ _ => Directive.Stop
+ });
+
+ this.currentJob = Option.None;
+ this.nextJobLocator = Option.None;
+
+
+ this.SetHandler(source.Out, this.PullChanges, this.Finish);
+ }
+
+ private void Finish(Exception cause)
+ {
+ if (cause is not null && cause is not SubscriptionWithCancelException.NonFailureCancellation)
+ {
+ this.FailStage(cause);
+ }
+
+ this.httpClient.Dispose();
+ }
+
+ public override void PreStart()
+ {
+ this.UpdateSchema();
+ }
+
+ public void UpdateSchema()
+ {
+ var newSchema = this.source.jobProvider.GetSchema(httpClient, this.source.entityName).Result;
+
+ if (newSchema.IsEmpty)
+ {
+ this.Log.Warning("Could not update schema");
+ }
+ else
+ {
+ var schemaUnchanged =
+ SalesForceEntity.SalesForceEntityComparer.Equals(this.entitySchema, newSchema.Value);
+ this.entitySchema = (this.entitySchema == null, schemaEquals: schemaUnchanged) switch
+ {
+ (true, _) or (false, true) => newSchema.Value,
+ (false, false) => throw new SchemaMismatchException()
+ };
+ }
+
+ }
+
+ private void CreateNewJob()
+ {
+ var job = this.source.jobProvider.CreateJob(httpClient, this.entitySchema).Result;
+
+ if (job.IsEmpty)
+ {
+ this.Log.Warning("Could not create job");
+ }
+ else
+ {
+ this.currentJob = job;
+ }
+ this.ScheduleOnce(TimerKey, TimeSpan.FromSeconds(1));
+ }
+
+ private void UpdateJobStatus()
+ {
+ var response = this.source.jobProvider.GetJobStatus(httpClient, this.currentJob.Value).Result;
+
+ if (response.IsEmpty)
+ {
+ this.Log.Warning("Could not create job");
+ }
+ else
+ {
+ this.currentJob = response;
+ }
+ this.ScheduleOnce(TimerKey, TimeSpan.FromSeconds(1));
+ }
+
+ private void ProcessResult()
+ {
+
+ var (rows, nextJobLocator) = this.source.jobProvider.GetJobResult(httpClient, this.currentJob.Value, this.entitySchema, this.nextJobLocator).Result;
+ this.nextJobLocator = nextJobLocator;
+ this.EmitMultiple(this.source.Out, rows);
+
+ if (nextJobLocator.IsEmpty)
+ {
+ this.currentJob = Option.None;
+ this.ScheduleOnce(TimerKey, this.source.changeCaptureInterval);
+ }
+
+ }
+
+ private void PullChanges()
+ {
+ switch (this.currentJob.Select(job => job.Status).GetOrElse(SalesforceJobStatus.None))
+ {
+ case SalesforceJobStatus.UploadComplete:; this.UpdateJobStatus(); break;
+ case SalesforceJobStatus.InProgress: this.UpdateJobStatus(); break;
+ case SalesforceJobStatus.Aborted: this.FailStage(new SalesForceJobAbortedException($"job : {this.currentJob.Value.Id} was aborted by source")); break;
+ case SalesforceJobStatus.Failed: this.FailStage(new SalesForceJobFailedException($"job : {this.currentJob.Value.Id} returned with failure")); break;
+ case SalesforceJobStatus.JobComplete: this.ProcessResult(); break;
+ case SalesforceJobStatus.None: this.CreateNewJob(); break;
+ };
+ }
+
+ protected override void OnTimer(object timerKey)
+ {
+ this.PullChanges();
+ }
+ }
+}
diff --git a/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs
new file mode 100644
index 0000000..9d0dfde
--- /dev/null
+++ b/src/Sources/SalesForce/Services/AuthenticatedMessageProviders/SalesForceJobProvider.cs
@@ -0,0 +1,316 @@
+using System;
+using System.Collections.Generic;
+using System.ComponentModel;
+using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Net.Http.Json;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Akka.Util;
+using Akka.Util.Extensions;
+using Arcane.Framework.Sinks.Parquet.Models;
+using Arcane.Framework.Sources.Extensions;
+using Arcane.Framework.Sources.SalesForce.Models;
+using Snd.Sdk.Tasks;
+
+namespace Arcane.Framework.Sources.SalesForce.Services.AuthenticatedMessageProviders;
+
+///
+/// Salesforce Job provider that handles talking with the Bulk V2 API
+///
+public record SalesForceJobProvider
+{
+ private readonly TimeSpan expirationPeriod;
+ private string currentToken;
+ private DateTimeOffset? validTo;
+ private readonly Uri tokenSource;
+ ///
+ /// Salesforce Account
+ ///
+ public readonly string accountName;
+ private readonly string clientId;
+ private readonly string clientSecret;
+ private readonly string username;
+ private readonly string password;
+ private readonly string securityToken;
+ private readonly string apiVersion;
+ private readonly Option rowsPerPage;
+
+ ///
+ /// Salesforce Job provider that handles talking with the Bulk V2 API.
+ ///
+
+ public SalesForceJobProvider(string accountName, string clientId, string clientSecret, string username, string password, string securityToken, string apiVersion, int rowsPerPage)
+ {
+ this.tokenSource = new Uri($"https://{accountName}/services/oauth2/token");
+ this.accountName = accountName;
+ this.clientId = clientId;
+ this.clientSecret = clientSecret;
+ this.username = username;
+ this.password = password;
+ this.securityToken = securityToken;
+ this.apiVersion = apiVersion;
+ this.rowsPerPage = rowsPerPage;
+ }
+
+ ///
+ /// Salesforce Job provider that handles talking with the Bulk V2 API
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public SalesForceJobProvider(string accountName, string clientId, string clientSecret, string username, string password, string securityToken, string apiVersion)
+ {
+ this.tokenSource = new Uri($"https://{accountName}/services/oauth2/token");
+ this.accountName = accountName;
+ this.clientId = clientId;
+ this.clientSecret = clientSecret;
+ this.username = username;
+ this.password = password;
+ this.securityToken = securityToken;
+ this.apiVersion = apiVersion;
+ this.rowsPerPage = Option.None;
+ this.expirationPeriod = TimeSpan.FromMinutes(19);
+ }
+
+ ///
+ /// Generates authenticated message for the REST API request.
+ ///
+ /// HTTP client
+ /// Authenticated message
+ public Task GetAuthenticatedMessage(HttpClient httpClient)
+ {
+ if (this.validTo.GetValueOrDefault(DateTimeOffset.MaxValue) <
+ DateTimeOffset.UtcNow.Subtract(TimeSpan.FromMinutes(1)))
+ {
+ return Task.FromResult(new HttpRequestMessage
+ {
+ Headers = { Authorization = new AuthenticationHeaderValue("Bearer", this.currentToken) }
+ });
+ }
+ var dict = new Dictionary
+ {
+ { "grant_type", "password" },
+ { "client_id", this.clientId },
+ { "client_secret", this.clientSecret },
+ { "username", this.username },
+ { "password", $"{this.password}{this.securityToken}" },
+ };
+ var tokenHrm = new HttpRequestMessage(HttpMethod.Post, this.tokenSource)
+ {
+ Content = new FormUrlEncodedContent(dict)
+ };
+
+ return httpClient.SendAsync(tokenHrm, CancellationToken.None).Map(response =>
+ {
+ response.EnsureSuccessStatusCode();
+ return response.Content.ReadAsStringAsync();
+ }).FlatMap(result =>
+ {
+ var tokenResponse = JsonSerializer.Deserialize(result);
+ this.currentToken = tokenResponse.GetProperty("access_token").GetString();
+ this.validTo = DateTimeOffset.UtcNow.Add(this.expirationPeriod);
+ return new HttpRequestMessage
+ {
+ Headers = { Authorization = new AuthenticationHeaderValue("Bearer", this.currentToken) }
+ };
+ });
+ }
+
+ ///
+ /// Get the schema of a Salesforce entity
+ ///
+ ///
+ ///
+ public Task