Openlineage endpoint and Spark Lineage Beta Plugin (datahub-project#9870)

Co-authored-by: David Leifker <[email protected]>
treff7es and david-leifker authored Mar 1, 2024
1 parent 05593f4 commit 6eb5f80
Showing 66 changed files with 9,328 additions and 20 deletions.
3 changes: 3 additions & 0 deletions build.gradle
@@ -52,6 +52,8 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.5.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'linkedin'

@@ -176,6 +178,7 @@ project.ext.externalDependency = [
'kafkaClients': "org.apache.kafka:kafka-clients:$kafkaVersion",
'snappy': 'org.xerial.snappy:snappy-java:1.1.10.4',
'logbackClassic': "ch.qos.logback:logback-classic:$logbackClassic",
'logbackClassicJava8' : "ch.qos.logback:logback-classic:$logbackClassicJava8",
'slf4jApi': "org.slf4j:slf4j-api:$slf4jVersion",
'log4jCore': "org.apache.logging.log4j:log4j-core:$log4jVersion",
'log4jApi': "org.apache.logging.log4j:log4j-api:$log4jVersion",
4 changes: 4 additions & 0 deletions docs-website/sidebars.js
@@ -299,6 +299,7 @@ module.exports = {
"metadata-integration/java/spark-lineage/README",
"metadata-ingestion/integration_docs/great-expectations",
"metadata-integration/java/datahub-protobuf/README",
"metadata-integration/java/spark-lineage-beta/README",
//"metadata-ingestion/source-docs-template",
{
type: "autogenerated",
@@ -746,6 +747,9 @@ module.exports = {
//"docs/how/build-metadata-service",
//"docs/how/graph-onboarding",
//"docs/demo/graph-onboarding",
//"metadata-integration/java/spark-lineage/README",
// "metadata-integration/java/spark-lineage-beta/README.md
// "metadata-integration/java/openlineage-converter/README"
//"metadata-ingestion-modules/airflow-plugin/README"
// "metadata-ingestion/schedule_docs/datahub", // we can delete this
// TODO: change the titles of these, removing the "What is..." portion from the sidebar"
AbstractMultiFieldPatchBuilder.java
@@ -12,6 +12,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.ImmutableTriple;

public abstract class AbstractMultiFieldPatchBuilder<T extends AbstractMultiFieldPatchBuilder<T>> {
@@ -64,6 +65,14 @@ public T urn(Urn urn) {
*/
protected abstract String getEntityType();

protected static String encodeValue(@Nonnull String value) {
return value.replace("~ ", "~0").replace("/", "~1");
}

protected static String encodeValueUrn(@Nonnull Urn urn) {
return encodeValue(urn.toString());
}

/**
* Overrides basic behavior to construct multiple patches based on properties
*
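The new encodeValue helper applies RFC 6901 JSON Pointer escaping so that a URN containing "/" stays a single segment of a JSON Patch path. A minimal standalone sketch of the behavior (class name and example URN are illustrative, not from the commit):

// Hypothetical demo of the RFC 6901 escaping performed by encodeValue.
public class EncodeValueDemo {

  // "~" must be escaped before "/": otherwise the "~1" produced for "/"
  // would itself be rewritten to "~01".
  static String encodeValue(String value) {
    return value.replace("~", "~0").replace("/", "~1");
  }

  public static void main(String[] args) {
    String urn = "urn:li:dataset:(urn:li:dataPlatform:hive,db/table,PROD)";
    System.out.println("/inputDatasetEdges/" + encodeValue(urn));
    // Prints: /inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:hive,db~1table,PROD)
  }
}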
DataJobInputOutputPatchBuilder.java
@@ -32,7 +32,7 @@ public DataJobInputOutputPatchBuilder addInputDatajobEdge(@Nonnull DataJobUrn da
pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
INPUT_DATA_JOB_EDGES_PATH_START + dataJobUrn,
INPUT_DATA_JOB_EDGES_PATH_START + encodeValue(dataJobUrn.toString()),
value));
return this;
}
@@ -41,7 +41,7 @@ public DataJobInputOutputPatchBuilder removeInputDatajobEdge(@Nonnull DataJobUrn
pathValues.add(
ImmutableTriple.of(
PatchOperationType.REMOVE.getValue(),
INPUT_DATA_JOB_EDGES_PATH_START + dataJobUrn,
INPUT_DATA_JOB_EDGES_PATH_START + encodeValue(dataJobUrn.toString()),
null));
return this;
}
@@ -51,15 +51,17 @@ public DataJobInputOutputPatchBuilder addInputDatasetEdge(@Nonnull DatasetUrn da

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(), INPUT_DATASET_EDGES_PATH_START + datasetUrn, value));
PatchOperationType.ADD.getValue(),
INPUT_DATASET_EDGES_PATH_START + encodeValue(datasetUrn.toString()),
value));
return this;
}

public DataJobInputOutputPatchBuilder removeInputDatasetEdge(@Nonnull DatasetUrn datasetUrn) {
pathValues.add(
ImmutableTriple.of(
PatchOperationType.REMOVE.getValue(),
INPUT_DATASET_EDGES_PATH_START + datasetUrn,
INPUT_DATASET_EDGES_PATH_START + encodeValue(datasetUrn.toString()),
null));
return this;
}
@@ -70,7 +72,7 @@ public DataJobInputOutputPatchBuilder addOutputDatasetEdge(@Nonnull DatasetUrn d
pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
OUTPUT_DATASET_EDGES_PATH_START + datasetUrn,
OUTPUT_DATASET_EDGES_PATH_START + encodeValue(datasetUrn.toString()),
value));
return this;
}
@@ -79,7 +81,7 @@ public DataJobInputOutputPatchBuilder removeOutputDatasetEdge(@Nonnull DatasetUr
pathValues.add(
ImmutableTriple.of(
PatchOperationType.REMOVE.getValue(),
OUTPUT_DATASET_EDGES_PATH_START + datasetUrn,
OUTPUT_DATASET_EDGES_PATH_START + encodeValue(datasetUrn.toString()),
null));
return this;
}
@@ -88,31 +90,39 @@ public DataJobInputOutputPatchBuilder addInputDatasetField(@Nonnull Urn urn) {
TextNode textNode = instance.textNode(urn.toString());
pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(), INPUT_DATASET_FIELDS_PATH_START + urn, textNode));
PatchOperationType.ADD.getValue(),
INPUT_DATASET_FIELDS_PATH_START + encodeValue(urn.toString()),
textNode));

return this;
}

public DataJobInputOutputPatchBuilder removeInputDatasetField(@Nonnull Urn urn) {
pathValues.add(
ImmutableTriple.of(
PatchOperationType.REMOVE.getValue(), INPUT_DATASET_FIELDS_PATH_START + urn, null));
PatchOperationType.REMOVE.getValue(),
INPUT_DATASET_FIELDS_PATH_START + encodeValue(urn.toString()),
null));
return this;
}

public DataJobInputOutputPatchBuilder addOutputDatasetField(@Nonnull Urn urn) {
TextNode textNode = instance.textNode(urn.toString());
pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(), OUTPUT_DATASET_FIELDS_PATH_START + urn, textNode));
PatchOperationType.ADD.getValue(),
OUTPUT_DATASET_FIELDS_PATH_START + encodeValue(urn.toString()),
textNode));

return this;
}

public DataJobInputOutputPatchBuilder removeOutputDatasetField(@Nonnull Urn urn) {
pathValues.add(
ImmutableTriple.of(
PatchOperationType.REMOVE.getValue(), OUTPUT_DATASET_FIELDS_PATH_START + urn, null));
PatchOperationType.REMOVE.getValue(),
OUTPUT_DATASET_FIELDS_PATH_START + encodeValue(urn.toString()),
null));
return this;
}

@@ -147,17 +157,17 @@ private String getEdgePath(@Nonnull Edge edge, LineageDirection direction) {

if (DATASET_ENTITY_NAME.equals(destinationUrn.getEntityType())
&& LineageDirection.UPSTREAM.equals(direction)) {
return INPUT_DATASET_EDGES_PATH_START + destinationUrn;
return INPUT_DATASET_EDGES_PATH_START + encodeValue(destinationUrn.toString());
}

if (DATASET_ENTITY_NAME.equals(destinationUrn.getEntityType())
&& LineageDirection.DOWNSTREAM.equals(direction)) {
return INPUT_DATASET_EDGES_PATH_START + destinationUrn;
return OUTPUT_DATASET_EDGES_PATH_START + encodeValue(destinationUrn.toString());
}

if (DATA_JOB_ENTITY_NAME.equals(destinationUrn.getEntityType())
&& LineageDirection.UPSTREAM.equals(direction)) {
return INPUT_DATA_JOB_EDGES_PATH_START + destinationUrn;
return INPUT_DATA_JOB_EDGES_PATH_START + encodeValue(destinationUrn.toString());
}

// TODO: Output Data Jobs not supported by aspect, add here if this changes
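For context, a hedged usage sketch of the builder these hunks modify. Import paths and URN literals are assumptions; build() returning a MetadataChangeProposal follows the builders' usual pattern:

// Usage sketch only; package paths and URNs are assumptions.
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.metadata.aspect.patch.builder.DataJobInputOutputPatchBuilder;
import com.linkedin.mxe.MetadataChangeProposal;

public class DataJobPatchExample {
  public static void main(String[] args) throws Exception {
    DataJobUrn jobUrn = DataJobUrn.createFromString(
        "urn:li:dataJob:(urn:li:dataFlow:(spark,my_flow,PROD),my_task)");
    DatasetUrn inputUrn = DatasetUrn.createFromString(
        "urn:li:dataset:(urn:li:dataPlatform:hive,db.input_table,PROD)");

    // Each add*/remove* call records an (op, path, value) triple; with this
    // commit, the URN embedded in the path is RFC 6901-escaped via encodeValue.
    MetadataChangeProposal proposal = new DataJobInputOutputPatchBuilder()
        .urn(jobUrn)
        .addInputDatasetEdge(inputUrn)
        .build();
  }
}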
UpstreamLineagePatchBuilder.java
@@ -9,6 +9,8 @@
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.dataset.FineGrainedLineageDownstreamType;
import com.linkedin.dataset.FineGrainedLineageUpstreamType;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -40,14 +42,60 @@ public UpstreamLineagePatchBuilder addUpstream(

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(), UPSTREAMS_PATH_START + datasetUrn, value));
PatchOperationType.ADD.getValue(),
UPSTREAMS_PATH_START + encodeValueUrn(datasetUrn),
value));
return this;
}

public UpstreamLineagePatchBuilder removeUpstream(@Nonnull DatasetUrn datasetUrn) {
pathValues.add(
ImmutableTriple.of(
PatchOperationType.REMOVE.getValue(), UPSTREAMS_PATH_START + datasetUrn, null));
PatchOperationType.REMOVE.getValue(),
UPSTREAMS_PATH_START + encodeValueUrn(datasetUrn),
null));
return this;
}

/**
* Adds a field as a fine-grained upstream
*
* @param schemaFieldUrn a schema field to be marked as upstream, format:
*     urn:li:schemaField:(DATASET_URN,COLUMN_NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the upstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
@Nonnull Urn schemaFieldUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageUpstreamType type) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "upstreamType"
+ "/"
+ finalType
+ "/"
+ encodeValueUrn(schemaFieldUrn),
instance.numberNode(finalConfidenceScore)));

return this;
}

@@ -91,9 +139,50 @@ public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
+ "/"
+ finalQueryUrn
+ "/"
+ upstreamSchemaField,
fineGrainedLineageNode));
+ encodeValueUrn(upstreamSchemaField),
instance.numberNode(finalConfidenceScore)));

return this;
}

/**
* Adds a field as a fine-grained downstream
*
* @param schemaFieldUrn a schema field to be marked as downstream, format:
*     urn:li:schemaField:(DATASET_URN,COLUMN_NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the downstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedDownstreamField(
@Nonnull Urn schemaFieldUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageDownstreamType type) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "downstreamType"
+ "/"
+ finalType
+ "/"
+ encodeValueUrn(schemaFieldUrn),
instance.numberNode(finalConfidenceScore)));
return this;
}

@@ -142,7 +231,7 @@ public UpstreamLineagePatchBuilder removeFineGrainedUpstreamField(
+ "/"
+ finalQueryUrn
+ "/"
+ upstreamSchemaFieldUrn,
+ encodeValueUrn(upstreamSchemaFieldUrn),
null));

return this;
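To make the concatenation in addFineGrainedUpstreamField and addFineGrainedDownstreamField concrete, a sketch of the JSON Patch path they assemble. The literal value of FINE_GRAINED_PATH_START is an assumption based on the aspect's field name; the value stored at that path is the confidence score:

// Path-construction sketch; FINE_GRAINED_PATH_START's value is assumed.
public class FineGrainedPathDemo {
  static final String FINE_GRAINED_PATH_START = "/fineGrainedLineages/"; // assumption

  static String encodeValue(String value) {
    return value.replace("~", "~0").replace("/", "~1");
  }

  public static void main(String[] args) {
    String transformationOperation = "TRANSFORM";
    String finalType = "FIELD_SET"; // the default when no type is passed
    String schemaFieldUrn =
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db.t,PROD),user_id)";

    // Mirrors the string concatenation in addFineGrainedUpstreamField.
    String path = FINE_GRAINED_PATH_START
        + transformationOperation
        + "/upstreamType/"
        + finalType
        + "/"
        + encodeValue(schemaFieldUrn);
    System.out.println(path);
    // -> /fineGrainedLineages/TRANSFORM/upstreamType/FIELD_SET/urn:li:schemaField:(...,user_id)
  }
}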
AspectTemplateEngine.java (file name inferred from the hunk)
@@ -81,7 +81,6 @@ public RecordTemplate getDefaultTemplate(String aspectSpecName) {
public <T extends RecordTemplate> RecordTemplate applyPatch(
RecordTemplate recordTemplate, Patch jsonPatch, AspectSpec aspectSpec)
throws JsonProcessingException, JsonPatchException {

Template<T> template = getTemplate(aspectSpec);
return template.applyPatch(recordTemplate, jsonPatch);
}
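The applyPatch change above is where these patches are eventually applied to an aspect. A generic RFC 6902 illustration using the json-patch library implied by the Patch and JsonPatchException types (library choice and document shape are assumptions; real aspect JSON is more involved):

// Generic JSON Patch illustration; the document shape is simplified.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.fge.jsonpatch.JsonPatch;

public class ApplyPatchDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    JsonNode doc = mapper.readTree("{\"upstreams\":{}}");
    // Note the ~1 escape inside the path segment, as produced by encodeValue.
    JsonNode patchJson = mapper.readTree(
        "[{\"op\": \"add\","
            + " \"path\": \"/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,db~1t,PROD)\","
            + " \"value\": {\"type\": \"TRANSFORMED\"}}]");
    JsonNode result = JsonPatch.fromJson(patchJson).apply(doc);
    System.out.println(result);
  }
}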
1 change: 1 addition & 0 deletions metadata-integration/java/datahub-client/build.gradle
@@ -16,6 +16,7 @@ jar.enabled = false // Since we only want to build shadow jars, disabling the re

dependencies {
implementation project(':entity-registry')
implementation project(':metadata-integration:java:datahub-event')
implementation(externalDependency.kafkaAvroSerializer) {
exclude group: "org.apache.avro"
}
61 changes: 61 additions & 0 deletions metadata-integration/java/datahub-event/build.gradle
@@ -0,0 +1,61 @@
plugins {
id("com.palantir.git-version") apply false
id 'java'
id 'com.github.johnrengelman.shadow'
id 'jacoco'
id 'signing'
id 'io.codearte.nexus-staging'
id 'maven-publish'
}

apply from: "../versioning.gradle"

dependencies {
implementation project(':metadata-models')
implementation project(path: ':metadata-models', configuration: "dataTemplate")

implementation externalDependency.slf4jApi
implementation externalDependency.jacksonDataBind
runtimeOnly externalDependency.jna

compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok
// VisibleForTesting
compileOnly externalDependency.guava
testImplementation externalDependency.testng
testImplementation externalDependency.mockito
testImplementation externalDependency.testContainers
testImplementation externalDependency.httpAsyncClient
testRuntimeOnly externalDependency.logbackClassicJava8
}

jacocoTestReport {
dependsOn test // tests are required to run before generating the report
}

task copyAvroSchemas {
dependsOn(':metadata-events:mxe-schemas:renameNamespace')
copy {
from file('../../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeProposal.avsc')
into file('./src/main/resources')
}
}

compileJava.dependsOn copyAvroSchemas

test {
// to avoid simultaneous executions of tests when complete build is run
mustRunAfter(":metadata-io:test")
useJUnit()
finalizedBy jacocoTestReport
}

// task sourcesJar(type: Jar) {
// archiveClassifier = 'sources'
// from sourceSets.main.allSource
//}

//task javadocJar(type: Jar) {
// archiveClassifier = 'javadoc'
// from javadoc
//}
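Because copyAvroSchemas bundles the MetadataChangeProposal schema into the jar's resources, a consumer of datahub-event could load it as below. A sketch assuming Avro on the classpath and the resource name from the task above:

// Sketch: load the bundled MCP Avro schema; the Avro dependency is assumed.
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;

public class McpSchemaLoader {
  public static Schema loadMcpSchema() throws IOException {
    try (InputStream in =
        McpSchemaLoader.class.getResourceAsStream("/MetadataChangeProposal.avsc")) {
      if (in == null) {
        throw new IOException("MetadataChangeProposal.avsc not found on classpath");
      }
      return new Schema.Parser().parse(in);
    }
  }
}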