Revert "Fix for custom logical types in DatastreamIO (#1767)" (#1845)
This reverts commit 7832ae4.
shreyakhajanchi authored Sep 10, 2024
1 parent 583c0a9 commit 9f1308d
Showing 8 changed files with 67 additions and 222 deletions.
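In summary, the revert removes the time-interval-micros custom type and the getProp-based logical-type dispatch from FormatDatastreamRecordToJson, restores the NUMBER and VARCHAR branches inside handleLogicalFieldType, points the custom-transformation integration tests back at com.custom.CustomTransformationWithShardForIT, and resets the expected time_column values from formatted clock strings back to raw microsecond counts.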
2 changes: 1 addition & 1 deletion pom.xml
@@ -365,7 +365,7 @@
<exclude>**/constants/**</exclude>
<exclude>**/CustomTransformationImplFetcher.*</exclude>
<exclude>**/JarFileReader.*</exclude>
- <exclude>**/CustomTransformationWithShardFor*IT.*</exclude>
+ <exclude>**/CustomTransformationWithShardForIT.*</exclude>
</excludes>
</configuration>
<executions>
@@ -57,11 +57,8 @@ public class FormatDatastreamRecordToJson
public static class CustomAvroTypes {
public static final String VARCHAR = "varchar";
public static final String NUMBER = "number";
- public static final String TIME_INTERVAL_MICROS = "time-interval-micros";
}

- static final String LOGICAL_TYPE = "logicalType";
-
static final Logger LOG = LoggerFactory.getLogger(FormatDatastreamRecordToJson.class);
static final DateTimeFormatter DEFAULT_DATE_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE;
static final DateTimeFormatter DEFAULT_TIMESTAMP_WITH_TZ_FORMATTER =
@@ -359,19 +356,10 @@ public static void payloadToJson(GenericRecord payloadRecord, ObjectNode jsonNode

static void putField(
String fieldName, Schema fieldSchema, GenericRecord record, ObjectNode jsonObject) {
- // fieldSchema.getLogicalType() returns object of type org.apache.avro.LogicalType,
- // therefore, is null for custom logical types
if (fieldSchema.getLogicalType() != null) {
// Logical types should be handled separately.
handleLogicalFieldType(fieldName, fieldSchema, record, jsonObject);
return;
- } else if (fieldSchema.getProp(LOGICAL_TYPE) != null) {
- // Handling for custom logical types.
- boolean isSupportedCustomType =
- handleCustomLogicalType(fieldName, fieldSchema, record, jsonObject);
- if (isSupportedCustomType) {
- return;
- }
}

switch (fieldSchema.getType()) {
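Context for the dispatch removed above: Avro materializes only built-in or registered logical types as org.apache.avro.LogicalType objects, so a custom name attached as a plain schema property is invisible to getLogicalType() and reachable only through getProp(). A minimal standalone sketch (not from this repository) of the distinction:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class CustomLogicalTypeProbe {
      public static void main(String[] args) {
        // A LONG schema carrying a custom logical type only as a string property.
        Schema timeInterval = SchemaBuilder.builder().longType();
        timeInterval.addProp("logicalType", "time-interval-micros");

        // Built-in names (date, time-micros, ...) would resolve here;
        // an unregistered custom name does not, so this prints "null".
        System.out.println(timeInterval.getLogicalType());

        // The raw property is still readable, which is what the removed
        // getProp(LOGICAL_TYPE) branch keyed off.
        System.out.println(timeInterval.getProp("logicalType")); // time-interval-micros
      }
    }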
@@ -431,45 +419,6 @@ static void putField(
}
}

- static boolean handleCustomLogicalType(
- String fieldName, Schema fieldSchema, GenericRecord element, ObjectNode jsonObject) {
- if (fieldSchema.getProp(LOGICAL_TYPE).equals(CustomAvroTypes.TIME_INTERVAL_MICROS)) {
- Long timeMicrosTotal = (Long) element.get(fieldName);
- boolean isNegative = false;
- if (timeMicrosTotal < 0) {
- timeMicrosTotal *= -1;
- isNegative = true;
- }
- Long nanoseconds = timeMicrosTotal * TimeUnit.MICROSECONDS.toNanos(1);
- Long hours = TimeUnit.NANOSECONDS.toHours(nanoseconds);
- nanoseconds -= TimeUnit.HOURS.toNanos(hours);
- Long minutes = TimeUnit.NANOSECONDS.toMinutes(nanoseconds);
- nanoseconds -= TimeUnit.MINUTES.toNanos(minutes);
- Long seconds = TimeUnit.NANOSECONDS.toSeconds(nanoseconds);
- nanoseconds -= TimeUnit.SECONDS.toNanos(seconds);
- Long micros = TimeUnit.NANOSECONDS.toMicros(nanoseconds);
- // Pad 0 if single digit hour.
- String timeString =
- (hours < 10) ? String.format("%02d", hours) : String.format("%d", hours);
- timeString += String.format(":%02d:%02d", minutes, seconds);
- if (micros > 0) {
- timeString += String.format(".%d", micros);
- }
- String resultString = isNegative ? "-" + timeString : timeString;
- jsonObject.put(fieldName, resultString);
- return true;
- } else if (fieldSchema.getProp(LOGICAL_TYPE).equals(CustomAvroTypes.NUMBER)) {
- String number = element.get(fieldName).toString();
- jsonObject.put(fieldName, number);
- return true;
- } else if (fieldSchema.getProp(LOGICAL_TYPE).equals(CustomAvroTypes.VARCHAR)) {
- String varcharValue = element.get(fieldName).toString();
- jsonObject.put(fieldName, varcharValue);
- return true;
- }
- return false;
- }
-
static void handleLogicalFieldType(
String fieldName, Schema fieldSchema, GenericRecord element, ObjectNode jsonObject) {
// TODO(pabloem) Actually test this.
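The deleted handleCustomLogicalType above rendered a microsecond count as a clock string. A condensed, self-contained sketch of that conversion (an assumed equivalent, not the original class) shows why the integration-test expectations below move from strings like "08:15:30" back to raw counts like "29730000000":

    import java.util.concurrent.TimeUnit;

    public class TimeIntervalCheck {
      // Condensed version of the reverted micros -> [-]HH:MM:SS[.micros] logic.
      static String format(long timeMicrosTotal) {
        boolean isNegative = timeMicrosTotal < 0;
        long nanos = Math.abs(timeMicrosTotal) * TimeUnit.MICROSECONDS.toNanos(1);
        long hours = TimeUnit.NANOSECONDS.toHours(nanos);
        nanos -= TimeUnit.HOURS.toNanos(hours);
        long minutes = TimeUnit.NANOSECONDS.toMinutes(nanos);
        nanos -= TimeUnit.MINUTES.toNanos(minutes);
        long seconds = TimeUnit.NANOSECONDS.toSeconds(nanos);
        nanos -= TimeUnit.SECONDS.toNanos(seconds);
        long micros = TimeUnit.NANOSECONDS.toMicros(nanos);
        String time = String.format("%02d:%02d:%02d", hours, minutes, seconds);
        if (micros > 0) {
          time += String.format(".%d", micros);
        }
        return isNegative ? "-" + time : time;
      }

      public static void main(String[] args) {
        System.out.println(format(29730000000L)); // 08:15:30
        System.out.println(format(55845000000L)); // 15:30:45
      }
    }

With the formatter gone, the pipeline emits the unformatted microsecond value, which is what the tests below now assert.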
@@ -507,6 +456,12 @@ static void handleLogicalFieldType(
jsonObject.put(
fieldName,
timestamp.atOffset(ZoneOffset.UTC).format(DEFAULT_TIMESTAMP_WITH_TZ_FORMATTER));
+ } else if (fieldSchema.getLogicalType().getName().equals(CustomAvroTypes.NUMBER)) {
+ String number = (String) element.get(fieldName);
+ jsonObject.put(fieldName, number);
+ } else if (fieldSchema.getLogicalType().getName().equals(CustomAvroTypes.VARCHAR)) {
+ String varcharValue = (String) element.get(fieldName);
+ jsonObject.put(fieldName, varcharValue);
} else {
LOG.error(
"Unknown field type {} for field {} in {}. Ignoring it.",
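Note that the restored branches above compare fieldSchema.getLogicalType().getName() against the custom names, which can only match if "number" and "varchar" resolve to registered logical types. A hypothetical sketch of such a registration (this diff does not show where, or whether, the pipeline registers them):

    import org.apache.avro.LogicalType;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class RegisterCustomTypes {
      public static void main(String[] args) {
        // Hypothetical registration that makes "varchar" resolvable by name.
        LogicalTypes.register("varchar", schema -> new LogicalType("varchar"));

        Schema varchar = new Schema.Parser().parse(
            "{\"type\": \"string\", \"logicalType\": \"varchar\"}");
        // With a factory registered, the parser should attach the logical
        // type, so getLogicalType() is non-null (assumed Avro behavior).
        System.out.println(varchar.getLogicalType().getName()); // varchar
      }
    }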
@@ -87,7 +87,7 @@ public void setUp() throws IOException, InterruptedException {
createAndUploadJarToGcs("DatatypeIT");
CustomTransformation customTransformation =
CustomTransformation.builder(
- "customTransformation.jar", "com.custom.CustomTransformationWithShardForLiveIT")
+ "customTransformation.jar", "com.custom.CustomTransformationWithShardForIT")
.build();
jobInfo =
launchDataflowJob(
@@ -410,7 +410,7 @@ private void assertAllDatatypeColumnsTableBackfillContents() {
row.put("decimal_column", "456.12");
row.put("datetime_column", "2024-02-08T08:15:30Z");
row.put("timestamp_column", "2024-02-08T08:15:30Z");
- row.put("time_column", "08:15:30");
+ row.put("time_column", "29730000000");
row.put("year_column", "2022");
// text, char, tinytext, mediumtext, longtext are BYTE columns
row.put("text_column", "/u/9n58P");
@@ -448,7 +448,7 @@ private void assertAllDatatypeColumnsTableBackfillContents() {
row.put("decimal_column", 123.45);
row.put("datetime_column", "2024-02-09T15:30:45Z");
row.put("timestamp_column", "2024-02-09T15:30:45Z");
- row.put("time_column", "15:30:45");
+ row.put("time_column", "55845000000");
row.put("year_column", "2023");
// text, char, tinytext, mediumtext, longtext are BYTE columns
row.put("text_column", "/u/9n58f");
@@ -496,7 +496,7 @@ private void assertAllDatatypeColumnsTableCdcContents() {
row.put("decimal_column", "456.12");
row.put("datetime_column", "2024-02-08T08:15:30Z");
row.put("timestamp_column", "2024-02-08T08:15:30Z");
- row.put("time_column", "08:15:30");
+ row.put("time_column", "29730000000");
row.put("year_column", "2022");
// text, char, tinytext, mediumtext, longtext are BYTE columns
row.put("text_column", "/u/9n58P");
@@ -545,7 +545,7 @@ private void assertAllDatatypeColumns2TableBackfillContents() {
row.put("decimal_column", 456.12);
row.put("datetime_column", "2024-02-08T08:15:30Z");
row.put("timestamp_column", "2024-02-08T08:15:30Z");
- row.put("time_column", "08:15:30");
+ row.put("time_column", "29730000000");
row.put("year_column", "2022");
row.put("char_column", "char_1");
// Source column value: 74696e79626c6f625f646174615f31 ( in BYTES, "tinyblob_data_1" in STRING)
@@ -578,7 +578,7 @@ private void assertAllDatatypeColumns2TableBackfillContents() {
row.put("decimal_column", 123.45);
row.put("datetime_column", "2024-02-09T15:30:45Z");
row.put("timestamp_column", "2024-02-09T15:30:45Z");
- row.put("time_column", "15:30:45");
+ row.put("time_column", "55845000000");
row.put("year_column", "2023");
row.put("char_column", "char_2");
row.put("tinyblob_column", "dGlueWJsb2JfZGF0YV8y");
@@ -621,7 +621,8 @@ private void assertAllDatatypeTransformationTableBackfillContents() {
row.put("decimal_column", 23457.78);
row.put("datetime_column", "2022-12-31T23:59:58Z");
row.put("timestamp_column", "2022-12-31T23:59:58Z");
- row.put("time_column", "00:59:59");
+ // TODO (b/349257952): update once TIME handling is made consistent for bulk and live.
+ // row.put("time_column", "86399001000");
row.put("year_column", "2023");
row.put("blob_column", "V29ybWQ=");
row.put("enum_column", "1");
@@ -642,7 +643,8 @@ private void assertAllDatatypeTransformationTableBackfillContents() {
row.put("decimal_column", 34568.89);
row.put("datetime_column", "2023-12-31T23:59:59Z");
row.put("timestamp_column", "2023-12-31T23:59:59Z");
- row.put("time_column", "01:00:00");
+ // TODO (b/349257952): update once TIME handling is made consistent for bulk and live.
+ // row.put("time_column", "1000");
row.put("year_column", "2025");
row.put("blob_column", "V29ybWQ=");
row.put("enum_column", "1");
@@ -663,7 +665,8 @@ private void assertAllDatatypeTransformationTableBackfillContents() {
row.put("decimal_column", 45679.90);
row.put("datetime_column", "2021-11-11T11:11:10Z");
row.put("timestamp_column", "2021-11-11T11:11:10Z");
- row.put("time_column", "12:11:11");
+ // TODO (b/349257952): update once TIME handling is made consistent for bulk and live.
+ // row.put("time_column", "40271001000");
row.put("year_column", "2022");
row.put("blob_column", "V29ybWQ=");
row.put("enum_column", "1");
@@ -674,7 +677,7 @@ private void assertAllDatatypeTransformationTableBackfillContents() {

SpannerAsserts.assertThatStructs(
spannerResourceManager.runQuery(
- "SELECT varchar_column, tinyint_column, text_column, date_column, int_column, bigint_column, float_column, double_column, decimal_column, datetime_column, timestamp_column, time_column, year_column, blob_column, enum_column, bool_column, binary_column, bit_column FROM AllDatatypeTransformation"))
+ "SELECT varchar_column, tinyint_column, text_column, date_column, int_column, bigint_column, float_column, double_column, decimal_column, datetime_column, timestamp_column, year_column, blob_column, enum_column, bool_column, binary_column, bit_column FROM AllDatatypeTransformation"))
.hasRecordsUnorderedCaseInsensitiveColumns(events);
}

@@ -692,7 +695,7 @@ private void assertAllDatatypeTransformationTableCdcContents() {
row.put("decimal_column", 23456.79);
row.put("datetime_column", "2023-01-01T12:00:00Z");
row.put("timestamp_column", "2023-01-01T12:00:00Z");
- row.put("time_column", "12:00:00");
+ row.put("time_column", "43200000000");
row.put("year_column", "2023");
row.put("blob_column", "EjRWeJCrze8=");
row.put("enum_column", "3");
@@ -713,7 +716,7 @@ private void assertAllDatatypeTransformationTableCdcContents() {
row.put("decimal_column", 34567.90);
row.put("datetime_column", "2024-01-02T00:00:00Z");
row.put("timestamp_column", "2024-01-02T00:00:00Z");
- row.put("time_column", "01:00:00");
+ row.put("time_column", "3600000000");
row.put("year_column", "2025");
row.put("blob_column", "q83vEjRWeJA=");
row.put("enum_column", "1");
@@ -743,7 +746,7 @@ private void assertAllDatatypeColumns2TableCdcContents() {
row.put("decimal_column", 456.12);
row.put("datetime_column", "2024-02-08T08:15:30Z");
row.put("timestamp_column", "2024-02-08T08:15:30Z");
- row.put("time_column", "08:15:30");
+ row.put("time_column", "29730000000");
row.put("year_column", "2022");
row.put("char_column", "char_1");
// Source column value: 74696e79626c6f625f646174615f31 ( in BYTES, "tinyblob_data_1" in STRING)
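As a quick cross-check of the restored expectations (a sketch using plain java.time, not part of the test code): each raw time_column value is a microsecond-of-day count.

    import java.time.LocalTime;

    public class ExpectedTimeValues {
      public static void main(String[] args) {
        long[] micros = {29730000000L, 55845000000L, 43200000000L, 3600000000L};
        for (long m : micros) {
          // LocalTime.ofNanoOfDay takes nanoseconds since midnight.
          System.out.println(m + " -> " + LocalTime.ofNanoOfDay(m * 1_000L));
        }
        // Prints 08:15:30, 15:30:45, 12:00 and 01:00 (LocalTime.toString()
        // omits the seconds field when it is zero).
      }
    }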
@@ -104,7 +104,7 @@ public void setUp() throws IOException, InterruptedException {
createAndUploadJarToGcs("shard1");
CustomTransformation customTransformation =
CustomTransformation.builder(
- "customTransformation.jar", "com.custom.CustomTransformationWithShardForLiveIT")
+ "customTransformation.jar", "com.custom.CustomTransformationWithShardForIT")
.build();
if (jobInfo1 == null) {
jobInfo1 =
@@ -99,7 +99,7 @@ public void setUp() throws IOException, InterruptedException {
createAndUploadJarToGcs(gcsResourceManager);
CustomTransformation customTransformation =
CustomTransformation.builder(
- "customTransformation.jar", "com.custom.CustomTransformationWithShardForLiveIT")
+ "customTransformation.jar", "com.custom.CustomTransformationWithShardForIT")
.build();
launchWriterDataflowJob(customTransformation);
}
@@ -81,7 +81,7 @@ public void simpleTest() throws Exception {
createAndUploadJarToGcs("CustomTransformationAllTypes");
CustomTransformation customTransformation =
CustomTransformation.builder(
- "customTransformation.jar", "com.custom.CustomTransformationWithShardForBulkIT")
+ "customTransformation.jar", "com.custom.CustomTransformationWithShardForIT")
.build();
jobInfo =
launchDataflowJob(

This file was deleted.
