Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Oct 16, 2024
2 parents 2f77678 + e76647d commit d6a99af
Show file tree
Hide file tree
Showing 26 changed files with 1,420 additions and 121 deletions.
42 changes: 42 additions & 0 deletions .github/workflows/contributor-open-pr-comment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Posts a welcome comment, with a link to the author's contributor card, when a pull request is
# opened.
name: PR Comment

on:
  pull_request:
    types: [opened]

permissions:
  pull-requests: write

jobs:
  post-pr-opened-comment:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      - name: Get and Format Username (PR only)
        if: github.event_name == 'pull_request'
        env:
          # Hand the login to the shell as data through an environment variable instead of
          # expanding the expression inside the script text, where it would be parsed as code.
          PR_USER_LOGIN: ${{ github.event.pull_request.user.login }}
        run: |
          formatted_username=$(echo "$PR_USER_LOGIN" | tr '[:upper:]' '[:lower:]' | sed 's/ /-/g')
          echo "FORMATTED_USERNAME=$formatted_username" >> "$GITHUB_ENV"

      - name: Create Comment (PR only)
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            if (context.payload.pull_request) {
              // Lower-cased, dash-separated login exported by the previous step.
              const prUser = process.env.FORMATTED_USERNAME;
              // Unmodified login, read from the event payload rather than interpolated into
              // this script with a workflow expression.
              const prLogin = context.payload.pull_request.user.login;
              const url = `https://contributors.datahubproject.io/${prUser}`;
              const body = `Hello @${prUser} :smile: \n\n Thank you so much for opening a pull request!\n\n![Image](https://contributors.datahubproject.io/api/og?userId=${prLogin})\nYou can check out your contributor card and see all your past stats [here](${url})!`;
              // Create a comment on the PR
              await github.rest.issues.createComment({
                owner: context.repo.owner,
                repo: context.repo.repo,
                issue_number: context.payload.pull_request.number,
                body: body
              });
            } else {
              console.log('Not a pull request event.');
            }
1 change: 1 addition & 0 deletions .github/workflows/metadata-ingestion.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ concurrency:
jobs:
metadata-ingestion:
runs-on: ubuntu-latest
timeout-minutes: 40
env:
SPARK_VERSION: 3.3.2
DATAHUB_TELEMETRY_ENABLED: false
Expand Down
9 changes: 4 additions & 5 deletions .github/workflows/metadata-io.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,16 @@ jobs:
- name: Disk Check
run: df -h . && docker images
- uses: acryldata/sane-checkout-action@v3
- uses: actions/setup-python@v5
with:
python-version: "3.10"
cache: "pip"
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
distribution: "zulu"
java-version: 17
- uses: gradle/actions/setup-gradle@v3
- uses: actions/setup-python@v5
if: ${{ needs.setup.outputs.ingestion_change == 'true' }}
with:
python-version: "3.10"
cache: "pip"
- name: Gradle build (and test)
run: |
./gradlew :metadata-io:test
Expand Down
12 changes: 12 additions & 0 deletions docs/advanced/mcp-mcl.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ To mitigate these downsides, we are committed to providing cross-language client

Ultimately, we intend to realize a state in which the Entities and Aspect schemas can be altered without requiring generated code and without maintaining a single mega-model schema (looking at you, Snapshot.pdl). The intention is that changes to the metadata model become even easier than they are today.

### Synchronous Ingestion Architecture

<p align="center">
<img width="70%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/advanced/mcp-mcl/sync-ingestion.svg"/>
</p>

### Asynchronous Ingestion Architecture

<p align="center">
<img width="70%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/advanced/mcp-mcl/async-ingestion.svg"/>
</p>

## Modeling

A Metadata Change Proposal is defined (in PDL) as follows
Expand Down
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11484 - Metadata service authentication enabled by default
- #11484 - Rest API authorization enabled by default
- #10472 - `SANDBOX` added as a FabricType. Once metadata with this fabric type has been added, rollbacks are not possible without manual cleanup in the databases.
- #11619 - schema field/column paths can no longer be empty strings
- #11619 - schema field/column paths can no longer be duplicated within the schema

### Potential Downtime

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.linkedin.metadata.aspect.validation;

import static com.linkedin.metadata.Constants.*;

import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator;
import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.schema.EditableSchemaFieldInfo;
import com.linkedin.schema.EditableSchemaMetadata;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaMetadata;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

/**
 * Validates the field paths carried by schema aspects:
 *
 * <ol>
 *   <li>all field IDs must be unique across all fields within a schema, as required by the Schema
 *       Field Path specification;
 *   <li>a field path must not be null or empty.
 * </ol>
 *
 * @see <a href="https://datahubproject.io/docs/advanced/field-path-spec-v2/#requirements">Field
 *     Path V2 docs</a>
 */
@Setter
@Getter
@Accessors(chain = true)
public class FieldPathValidator extends AspectPayloadValidator {
  @Nonnull private AspectPluginConfig config;

  /**
   * Prevents any MCP whose schema aspect contains duplicated or empty field paths.
   *
   * @param mcpItems the proposed batch items to inspect
   * @param retrieverContext not used by this validator
   * @return every validation failure collected across {@code mcpItems}
   */
  @Override
  protected Stream<AspectValidationException> validateProposedAspects(
      @Nonnull Collection<? extends BatchItem> mcpItems,
      @Nonnull RetrieverContext retrieverContext) {

    ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection();

    for (BatchItem item : mcpItems) {
      if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
        processSchemaMetadataAspect(item, exceptions);
      } else {
        // NOTE(review): every aspect other than schemaMetadata is read as EditableSchemaMetadata.
        // Presumably the plugin configuration only routes those two aspects here - confirm.
        processEditableSchemaMetadataAspect(item, exceptions);
      }
    }

    return exceptions.streamAllExceptions();
  }

  /** No pre-commit validation is performed; always returns an empty stream. */
  @Override
  protected Stream<AspectValidationException> validatePreCommitAspects(
      @Nonnull Collection<ChangeMCP> changeMCPs, @Nonnull RetrieverContext retrieverContext) {
    return Stream.of();
  }

  /**
   * Checks the field paths of an {@code EditableSchemaMetadata} aspect and records a failure when
   * any path is empty or when the same path occurs more than once.
   */
  private static void processEditableSchemaMetadataAspect(
      BatchItem i, ValidationExceptionCollection exceptions) {
    final EditableSchemaMetadata schemaMetadata = i.getAspect(EditableSchemaMetadata.class);
    final long uniquePaths =
        validateAndCount(
            i,
            schemaMetadata.getEditableSchemaFieldInfo().stream()
                .map(EditableSchemaFieldInfo::getFieldPath),
            exceptions);

    // Fewer distinct paths than fields means at least one path is repeated.
    if (uniquePaths != schemaMetadata.getEditableSchemaFieldInfo().size()) {
      exceptions.addException(
          i,
          String.format(
              "Cannot perform %s action on proposal. EditableSchemaMetadata aspect has duplicated field paths",
              i.getChangeType()));
    }
  }

  /**
   * Checks the field paths of a {@code SchemaMetadata} aspect and records a failure when any path
   * is empty or when the same path occurs more than once.
   */
  private static void processSchemaMetadataAspect(
      BatchItem i, ValidationExceptionCollection exceptions) {
    final SchemaMetadata schemaMetadata = i.getAspect(SchemaMetadata.class);
    final long uniquePaths =
        validateAndCount(
            i, schemaMetadata.getFields().stream().map(SchemaField::getFieldPath), exceptions);

    // Fewer distinct paths than fields means at least one path is repeated.
    if (uniquePaths != schemaMetadata.getFields().size()) {
      exceptions.addException(
          i,
          String.format(
              "Cannot perform %s action on proposal. SchemaMetadata aspect has duplicated field paths",
              i.getChangeType()));
    }
  }

  /**
   * Validates each distinct field path and reports how many distinct paths there are.
   *
   * @param i the batch item any failure is attributed to
   * @param fieldPaths the field paths of the aspect, in declaration order
   * @param exceptions collection that receives one entry per distinct invalid path
   * @return the number of distinct field paths
   */
  private static long validateAndCount(
      BatchItem i, Stream<String> fieldPaths, ValidationExceptionCollection exceptions) {
    // Materialize the distinct paths and validate them in an explicit loop. Doing the validation
    // inside Stream.peek() ahead of count() is not guaranteed to run: peek() is documented as a
    // debugging aid and count() may skip the pipeline when it can compute the size directly.
    final String[] distinctPaths = fieldPaths.distinct().toArray(String[]::new);
    for (String fieldPath : distinctPaths) {
      validateFieldPath(fieldPath).ifPresent(message -> exceptions.addException(i, message));
    }
    return distinctPaths.length;
  }

  /**
   * Returns an error message when {@code fieldPath} is null or empty, otherwise an empty optional.
   */
  private static Optional<String> validateFieldPath(String fieldPath) {
    if (fieldPath == null || fieldPath.isEmpty()) {
      // NOTE(review): this message names SchemaMetadata even when the path came from an
      // EditableSchemaMetadata aspect; left unchanged in case callers match on the text.
      return Optional.of("SchemaMetadata aspect has empty field path.");
    }
    return Optional.empty();
  }
}
Loading

0 comments on commit d6a99af

Please sign in to comment.