Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add salesforce connector #57

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open

Add salesforce connector #57

wants to merge 11 commits into from

Conversation

jeppe742
Copy link

Implements #56

Scope

Implemented:

  • Add new source for Salesforce Bulk API
    Rough steps done to fetch data
  1. Generate bearer token
  2. Get schema of table
  3. Create a query job
  4. wait until it's completed
  5. fetch results as CSV and convert to parquet

Additional changes:

  • Moved CsvOperations to common extensions since it's used both in CDM and Salesforce

Checklist

  • GitHub issue exists for this change.
  • Unit tests added and they pass.
  • Line Coverage is at least 80%.
  • Review requested on latest commit.

@jeppe742 jeppe742 requested a review from a team as a code owner June 14, 2024 12:39
if (nextJobLocator.IsEmpty)
{
this.currentJob = Option<SalesForceJob>.None;
this.ScheduleOnce(TimerKey, this.source.changeCaptureInterval);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something is not working here.
The stream keeps trying to fetch data instead of waiting 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why this issue is happening is how Akka actually works.

Every Akka stream is a set of an objects (stages) running in parallel. Each stage can push an element to the downstream stage or pull an element from the upstream stage. When a sink is scheduled for execution and has no element inside its processing que, it will pull the source to obtain a new element for processing.

If we look in the execution stack when execution hits the PullChanges method, we can observe two kinds of stack:

  • when the next element is pulled by the sink:
    image

  • when the pull was caused by timer:
    image

To ensure this source operates correctly, we can examine the source code of the Pulse stage. This stage's function is essentially very similar to what you need to accomplish: it attempts to retrieve data from a certain data source, and if data is available, it pushes it to the subsequent stage. However, if data is not available or if the sink pulls from your source too frequently, the appropriate response is to do nothing.

Copy link
Contributor

@s-vitaliy s-vitaliy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments, we can discuss the code a bit next week.

[ExcludeFromCodeCoverage(Justification = "Trivial")]
public class SalesForceJobAbortedException : SalesForceJobException
{
public SalesForceJobAbortedException(string message) : base(message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warnings about comment should be fixed, here and below.

/// <summary>
/// Source for reading data from SalesForce BULK v2 API.
/// </summary>
public class SalesForceSource : GraphStage<SourceShape<List<DataCell>>>, IParquetSource, ITaggedSource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this source support backfill?

Comment on lines +114 to +117
{
return new SalesForceSource(jobProvider
, entityName, httpClient, changeCaptureInterval);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{
return new SalesForceSource(jobProvider
, entityName, httpClient, changeCaptureInterval);
}
{
return new SalesForceSource(jobProvider, entityName, httpClient, changeCaptureInterval);
}

/// <summary>
/// Salesforce Job provider that handles talking with the Bulk V2 API
/// </summary>
public record SalesForceJobProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I can see, in this class you are providing HttpClient as a parameter in every call. It would be better to provide HttpClient from constructor and make it a private field and implement the IDisposable interface fot this class.

/// <returns>Authenticated message</returns>
public Task<HttpRequestMessage> GetAuthenticatedMessage(HttpClient httpClient)
{
if (this.validTo.GetValueOrDefault(DateTimeOffset.MaxValue) <
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the only way to update the token? What happens if for some reason the old token is expired?

/// <param name="job"></param>
public Task<Option<SalesForceJob>> GetJobStatus(HttpClient httpClient, SalesForceJob job)
{
return this.GetAuthenticatedMessage(httpClient).Map(msg =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe FlatMap here?

});
}

private (Type, object) ConvertToSalesforceType(string salesforceDataType, string value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be static?

if (nextJobLocator.IsEmpty)
{
this.currentJob = Option<SalesForceJob>.None;
this.ScheduleOnce(TimerKey, this.source.changeCaptureInterval);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why this issue is happening is how Akka actually works.

Every Akka stream is a set of an objects (stages) running in parallel. Each stage can push an element to the downstream stage or pull an element from the upstream stage. When a sink is scheduled for execution and has no element inside its processing que, it will pull the source to obtain a new element for processing.

If we look in the execution stack when execution hits the PullChanges method, we can observe two kinds of stack:

  • when the next element is pulled by the sink:
    image

  • when the pull was caused by timer:
    image

To ensure this source operates correctly, we can examine the source code of the Pulse stage. This stage's function is essentially very similar to what you need to accomplish: it attempts to retrieve data from a certain data source, and if data is available, it pushes it to the subsequent stage. However, if data is not available or if the sink pulls from your source too frequently, the appropriate response is to do nothing.

}

[Fact]
public async Task TokenGeneration()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's usually better to distinguish the different steps of a test (like Arrange, Act and Assert) with comments as described here: https://automationpanda.com/2020/07/07/arrange-act-assert-a-pattern-for-writing-good-tests/

You can see an example in Arcane.Operator

/// <summary>
/// Represents Salesforce entity
/// </summary>
public class SalesForceEntity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The classes that are not intended for use by plugins should always be internal, not public. This is because the documentation for internal classes is not published as API docs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants