Skip to content

Commit

Permalink
refactored and addressed pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Blazer-007 committed Oct 8, 2024
1 parent 7972d81 commit 9e02107
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@

package org.apache.gobblin.data.management.copy.iceberg;

import java.util.Arrays;
import java.io.IOException;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;

import com.google.common.annotations.VisibleForTesting;

import lombok.extern.slf4j.Slf4j;

/**
* Validator for Iceberg table metadata, ensuring that the source and destination tables have
* compatible schemas and partition specifications.
*/
@Slf4j
public class IcebergTableMetadataValidator {
public class IcebergTableMetadataValidatorUtils {

private IcebergTableMetadataValidator() {
private IcebergTableMetadataValidatorUtils() {
// Do not instantiate
}

Expand All @@ -43,9 +41,10 @@ private IcebergTableMetadataValidator() {
*
* @param srcTableMetadata the metadata of the source table
* @param destTableMetadata the metadata of the destination table
* @throws IllegalArgumentException if the schemas or partition specifications do not match
* @throws IOException if the schemas or partition specifications do not match
*/
public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata, TableMetadata destTableMetadata) {
public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata,
TableMetadata destTableMetadata) throws IOException {
log.info("Starting validation of Source : {} and Destination : {} Iceberg Tables Metadata",
srcTableMetadata.location(),
destTableMetadata.location());
Expand All @@ -60,42 +59,29 @@ public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcT
destTableMetadata.location());
}

/**
* Validates that the schemas of the source and destination tables are identical,
* using {@code Schema#sameSchema} as the only comparison criterion.
*
* <p>On a mismatch, an error message is built, logged at error level and then thrown.
* NOTE(review): this diff view shows two variants of the method side by side; the
* protected variant throwing IllegalArgumentException appears to be the pre-refactor one and
* the private variant throwing IOException the post-refactor one - confirm against the repository.
*
* @param srcTableSchema the schema of the source table
* @param destTableSchema the schema of the destination table
* @throws IllegalArgumentException if the schemas do not match (protected variant)
* @throws IOException if the schemas do not match (private variant)
*/
@VisibleForTesting
protected static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) {
private static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) throws IOException {
// TODO: Add support for schema evolution; currently only copying between Iceberg tables
// with the same schema is supported.
// This function needs to be broken down into multiple functions to support schema evolution.
// Possible cases - Src Schema == Dest Schema,
// - Src Schema is subset of Dest Schema [ Destination Schema Evolved ],
// - Src Schema is superset of Dest Schema [ Source Schema Evolved ],
// - Other cases?
// Also consider using the Strategy pattern (or another design pattern) to structure this better.
if (!srcTableSchema.sameSchema(destTableSchema)) {
// One variant of the message embeds the full schemas, the other only their schema ids.
String errMsg = String.format(
"Schema Mismatch between Source and Destination Iceberg Tables Schema - Source : {%s} and Destination : {%s}",
srcTableSchema,
destTableSchema
"Schema Mismatch between Source and Destination Iceberg Tables Schema - Source-Schema-Id : {%s} and "
+ "Destination-Schema-Id : {%s}",
srcTableSchema.schemaId(),
destTableSchema.schemaId()
);
log.error(errMsg);
throw new IllegalArgumentException(errMsg);
throw new IOException(errMsg);
}
}

/**
* Validates that the partition specifications of the source and destination tables are compatible.
*
* @param srcPartitionSpec the partition specification of the source table
* @param destPartitionSpec the partition specification of the destination table
* @throws IllegalArgumentException if the partition specifications do not match
*/
@VisibleForTesting
protected static void validatePartitionSpecForEquality(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec) {
private static void validatePartitionSpecForEquality(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec)
throws IOException {
// Currently, copying is supported only between Iceberg tables with the same partition spec
if (!srcPartitionSpec.compatibleWith(destPartitionSpec)) {
String errMsg = String.format(
Expand All @@ -104,24 +90,7 @@ protected static void validatePartitionSpecForEquality(PartitionSpec srcPartitio
destPartitionSpec
);
log.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
// .compatibleWith() does not check whether the partition fields of the two partition specs have the same
// Java classes, e.g. a partition field of type Integer in the source table and String in the destination
// table, so an additional check is needed for that.
// To see this, run testValidatePartitionSpecWithDiffType() in IcebergTableMetadataValidatorTest.java with
// this check commented out.
// TODO: This check can be removed after adding support for schema evolution
if (!Arrays.equals(srcPartitionSpec.javaClasses(), destPartitionSpec.javaClasses())) {
String errMsg = String.format(
"Partition Spec Have different types for same partition field between Source and Destination Iceberg Table - "
+ "Source : {%s} and Destination : {%s}",
srcPartitionSpec,
destPartitionSpec
);
log.error(errMsg);
throw new IllegalArgumentException(errMsg);
throw new IOException(errMsg);
}
}

}

This file was deleted.

Loading

0 comments on commit 9e02107

Please sign in to comment.