Skip to content

Commit

Permalink
Support legacy DATE and TIME logical types in xlang JdbcIO (#28382)
Browse files Browse the repository at this point in the history
* Support legacy DATE and TIME logical types in xlang JdbcIO

* Add identifier to  URN for legacy Java logical types

* Implement JdbcIO logical type javasdk_date and javasdk_time in Python
  • Loading branch information
Abacn authored Sep 18, 2023
1 parent 8c7e8b0 commit a0e9775
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
Expand All @@ -74,7 +75,23 @@ public class SchemaTranslation {
private static final Logger LOG = LoggerFactory.getLogger(SchemaTranslation.class);

private static final String URN_BEAM_LOGICAL_DECIMAL = FixedPrecisionNumeric.BASE_IDENTIFIER;
private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1";

/**
 * Returns the URN under which a logical type with the given identifier is written to proto.
 *
 * <p>An identifier that already starts with {@code beam:logical_type:} is returned unchanged.
 * Any other identifier is stripped of every character that is not an ASCII letter, digit or
 * underscore, lower-cased, and embedded as {@code beam:logical_type:javasdk_<identifier>:v1}.
 * If nothing remains after stripping, the bare {@code beam:logical_type:javasdk:v1} URN is
 * returned.
 *
 * @param identifier the logical type identifier; must not be null
 * @return the URN for the logical type
 */
private static String getLogicalTypeUrn(String identifier) {
  if (identifier.startsWith("beam:logical_type:")) {
    return identifier;
  }
  // Lower-case with Locale.ROOT so the URN does not depend on the JVM default locale. With a
  // Turkish default locale, for example, "I" would otherwise become a dotless "ı" and the URN
  // would contain a non-ASCII character.
  String filtered =
      identifier.replaceAll("[^0-9A-Za-z_]", "").toLowerCase(java.util.Locale.ROOT);
  if (filtered.isEmpty()) {
    // The raw "javasdk" name should only be a last resort. Types defined in Beam should have
    // their own URN.
    return "beam:logical_type:javasdk:v1";
  }
  // URNs for non-standard Java SDK logical types are assigned as javasdk_<identifier>.
  return String.format("beam:logical_type:javasdk_%s:v1", filtered);
}

private static final String URN_BEAM_LOGICAL_MILLIS_INSTANT =
SchemaApi.LogicalTypes.Enum.MILLIS_INSTANT
.getValueDescriptor()
Expand All @@ -84,18 +101,18 @@ public class SchemaTranslation {
// TODO(https://github.com/apache/beam/issues/19715): Populate this with a LogicalTypeRegistrar,
// which includes a way to construct
// the LogicalType given an argument.
private static final ImmutableMap<String, Class<? extends LogicalType<?, ?>>>
STANDARD_LOGICAL_TYPES =
ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder()
.put(FixedPrecisionNumeric.IDENTIFIER, FixedPrecisionNumeric.class)
.put(MicrosInstant.IDENTIFIER, MicrosInstant.class)
.put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class)
.put(PythonCallable.IDENTIFIER, PythonCallable.class)
.put(FixedBytes.IDENTIFIER, FixedBytes.class)
.put(VariableBytes.IDENTIFIER, VariableBytes.class)
.put(FixedString.IDENTIFIER, FixedString.class)
.put(VariableString.IDENTIFIER, VariableString.class)
.build();
/**
 * Logical type classes keyed by the {@code IDENTIFIER} constant of each type.
 *
 * <p>Package-private rather than private so that tests in this package can consult it.
 */
@VisibleForTesting
static final ImmutableMap<String, Class<? extends LogicalType<?, ?>>> STANDARD_LOGICAL_TYPES =
ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder()
.put(FixedPrecisionNumeric.IDENTIFIER, FixedPrecisionNumeric.class)
.put(MicrosInstant.IDENTIFIER, MicrosInstant.class)
.put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class)
.put(PythonCallable.IDENTIFIER, PythonCallable.class)
.put(FixedBytes.IDENTIFIER, FixedBytes.class)
.put(VariableBytes.IDENTIFIER, VariableBytes.class)
.put(FixedString.IDENTIFIER, FixedString.class)
.put(VariableString.IDENTIFIER, VariableString.class)
.build();

public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) {
String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
Expand Down Expand Up @@ -179,11 +196,7 @@ static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean seriali
fieldValueToProto(logicalType.getArgumentType(), logicalType.getArgument()));
}
} else {
// TODO(https://github.com/apache/beam/issues/19715): "javasdk" types should only
// be a last resort. Types defined in Beam should have their own URN, and there
// should be a mechanism for users to register their own types by URN.
String urn =
identifier.startsWith("beam:logical_type:") ? identifier : URN_BEAM_LOGICAL_JAVASDK;
String urn = getLogicalTypeUrn(identifier);
logicalTypeBuilder =
SchemaApi.LogicalType.newBuilder()
.setRepresentation(
Expand Down Expand Up @@ -429,15 +442,22 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p
} else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) {
return FieldType.DECIMAL;
} else if (urn.startsWith("beam:logical_type:")) {
try {
return FieldType.logicalType(
(LogicalType)
SerializableUtils.deserializeFromByteArray(
logicalType.getPayload().toByteArray(), "logicalType"));
} catch (IllegalArgumentException e) {
LOG.warn(
"Unable to deserialize the logical type {} from proto. Mark as UnknownLogicalType.",
urn);
if (!logicalType.getPayload().isEmpty()) {
// logical type has a payload, try to recover the instance by deserialization
try {
return FieldType.logicalType(
(LogicalType)
SerializableUtils.deserializeFromByteArray(
logicalType.getPayload().toByteArray(), "logicalType"));
} catch (IllegalArgumentException e) {
LOG.warn(
"Unable to deserialize the logical type {} from proto. Mark as UnknownLogicalType.",
urn);
}
} else {
// The logical type does not have a payload. This happens when it is passed across SDKs (xlang).
// TODO(yathu): this path appears to be called heavily; consider caching the instance.
LOG.debug("Constructing non-standard logical type {} as UnknownLogicalType", urn);
}
}
// assemble an UnknownLogicalType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.schemas;

import static org.apache.beam.sdk.schemas.SchemaTranslation.STANDARD_LOGICAL_TYPES;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -48,6 +49,7 @@
import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable;
import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -186,7 +188,8 @@ public static Iterable<Schema> data() {
.withOptions(optionsBuilder))
.add(
Schema.of(
Field.of("null_argument", FieldType.logicalType(new NullArgumentLogicalType()))))
Field.of(
"null_argument", FieldType.logicalType(new PortableNullArgLogicalType()))))
.add(Schema.of(Field.of("logical_argument", FieldType.logicalType(new DateTime()))))
.add(
Schema.of(Field.of("single_arg_argument", FieldType.logicalType(FixedBytes.of(100)))))
Expand Down Expand Up @@ -348,14 +351,14 @@ public static Iterable<Row> data() {
.add(simpleRow(FieldType.row(row.getSchema()), row))
.add(simpleRow(FieldType.DATETIME, new Instant(23L)))
.add(simpleRow(FieldType.DECIMAL, BigDecimal.valueOf(100000)))
.add(simpleRow(FieldType.logicalType(new NullArgumentLogicalType()), "str"))
.add(simpleRow(FieldType.logicalType(new PortableNullArgLogicalType()), "str"))
.add(simpleRow(FieldType.logicalType(new DateTime()), LocalDateTime.of(2000, 1, 3, 3, 1)))
.add(simpleNullRow(FieldType.STRING))
.add(simpleNullRow(FieldType.INT32))
.add(simpleNullRow(FieldType.map(FieldType.STRING, FieldType.INT32)))
.add(simpleNullRow(FieldType.array(FieldType.STRING)))
.add(simpleNullRow(FieldType.row(row.getSchema())))
.add(simpleNullRow(FieldType.logicalType(new NullArgumentLogicalType())))
.add(simpleNullRow(FieldType.logicalType(new PortableNullArgLogicalType())))
.add(simpleNullRow(FieldType.logicalType(new DateTime())))
.add(simpleNullRow(FieldType.DECIMAL))
.add(simpleNullRow(FieldType.DATETIME))
Expand Down Expand Up @@ -419,14 +422,16 @@ public static Iterable<Schema.FieldType> data() {
.add(FieldType.logicalType(FixedString.of(10)))
.add(FieldType.logicalType(VariableString.of(10)))
.add(FieldType.logicalType(FixedPrecisionNumeric.of(10)))
.add(FieldType.logicalType(new PortableNullArgLogicalType()))
.add(FieldType.logicalType(new NullArgumentLogicalType()))
.build();
}

@Parameter(0)
public Schema.FieldType fieldType;

@Test
public void testPortableLogicalTypeSerializeDeserilizeCorrectly() {
public void testLogicalTypeSerializeDeserilizeCorrectly() {
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true);
Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto);

Expand All @@ -438,14 +443,64 @@ public void testPortableLogicalTypeSerializeDeserilizeCorrectly() {
assertThat(
translated.getLogicalType().getArgument(),
equalTo(fieldType.getLogicalType().getArgument()));
assertThat(
translated.getLogicalType().getIdentifier(),
equalTo(fieldType.getLogicalType().getIdentifier()));
}

/**
 * Round-trips the parameterized field type through proto without serializing the logical type
 * and checks the class, argument type, argument and identifier of the result.
 */
@Test
public void testLogicalTypeFromToProtoCorrectly() {
  Schema.FieldType translated =
      SchemaTranslation.fieldTypeFromProto(SchemaTranslation.fieldTypeToProto(fieldType, false));

  Schema.LogicalType<?, ?> expectedType = fieldType.getLogicalType();
  Schema.LogicalType<?, ?> actualType = translated.getLogicalType();

  boolean isStandard = STANDARD_LOGICAL_TYPES.containsKey(actualType.getIdentifier());
  if (!isStandard) {
    // A non-standard type is assembled into an UnknownLogicalType.
    assertThat(actualType.getClass(), equalTo(UnknownLogicalType.class));
  } else {
    // A standard logical type should fully recover the original type.
    assertThat(actualType.getClass(), equalTo(expectedType.getClass()));
  }

  assertThat(actualType.getArgumentType(), equalTo(expectedType.getArgumentType()));
  assertThat(actualType.getArgument(), equalTo(expectedType.getArgument()));

  // A portable logical type keeps its URN; a non-portable one gets a "javasdk_<IDENTIFIER>" URN.
  String originalIdentifier = expectedType.getIdentifier();
  String expectedIdentifier =
      originalIdentifier.startsWith("beam:logical_type:")
          ? originalIdentifier
          : String.format(
              "beam:logical_type:javasdk_%s:v1",
              originalIdentifier.toLowerCase().replaceAll("[^0-9A-Za-z_]", ""));
  assertThat(actualType.getIdentifier(), equalTo(expectedIdentifier));
}
}

/** A simple logical type that has no argument. */
private static class NullArgumentLogicalType implements Schema.LogicalType<String, String> {
/** A portable logical type that has no argument. */
private static class PortableNullArgLogicalType extends NullArgumentLogicalType {
public static final String IDENTIFIER = "beam:logical_type:null_argument:v1";

public NullArgumentLogicalType() {}
@Override
public String getIdentifier() {
return IDENTIFIER;
}
}

/** A non-portable (Java SDK) logical type that has no argument. */
private static class NullArgumentLogicalType implements Schema.LogicalType<String, String> {
public static final String IDENTIFIER = "NULL_ARGUMENT";

@Override
public String toBaseType(String input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.sql.JDBCType;
import java.time.Instant;
import java.util.Objects;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
Expand All @@ -30,11 +29,11 @@
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType} implementations of JDBC types. */
class LogicalTypes {
// The logical types of the following static members are not portable and are preserved for
// compatibility reasons. Consider using portable logical types when adding new ones.
static final Schema.FieldType JDBC_BIT_TYPE =
Schema.FieldType.logicalType(
new PassThroughLogicalType<Boolean>(
Expand Down Expand Up @@ -110,69 +109,4 @@ static Schema.LogicalType<byte[], byte[]> fixedOrVariableBytes(String name, int
return FixedBytes.of(name, length);
}
}

/**
 * Base class for JDBC logical types whose input type and base type are the same Java type.
 *
 * <p>Instances carry an identifier, an argument type, a base type and an argument, all fixed at
 * construction. {@link #toBaseType} returns its input unchanged.
 *
 * @param <T> the Java type used for both the input and the base representation
 */
abstract static class JdbcLogicalType<T extends @NonNull Object>
    implements Schema.LogicalType<T, T> {
  protected final String identifier;
  protected final Schema.FieldType argumentType;
  protected final Schema.FieldType baseType;
  protected final Object argument;

  /** Stores the four components of the logical type as given. */
  protected JdbcLogicalType(
      String identifier,
      Schema.FieldType argumentType,
      Schema.FieldType baseType,
      Object argument) {
    this.identifier = identifier;
    this.argumentType = argumentType;
    this.baseType = baseType;
    this.argument = argument;
  }

  /** Returns the identifier supplied at construction. */
  @Override
  public String getIdentifier() {
    return identifier;
  }

  /** Returns the base type supplied at construction. */
  @Override
  public Schema.FieldType getBaseType() {
    return baseType;
  }

  /** Returns the argument type supplied at construction. */
  @Override
  public FieldType getArgumentType() {
    return argumentType;
  }

  /** Returns the argument supplied at construction, cast to the type the caller expects. */
  @Override
  @SuppressWarnings("TypeParameterUnusedInFormals")
  public <ArgumentT> ArgumentT getArgument() {
    return (ArgumentT) argument;
  }

  /** Returns {@code input} unchanged. */
  @Override
  public T toBaseType(T input) {
    return input;
  }

  /** Hashes the same three fields that {@link #equals} compares. */
  @Override
  public int hashCode() {
    return Objects.hash(identifier, baseType, argument);
  }

  /**
   * Two instances are equal when their identifier, base type and argument are equal. The
   * argument type is not part of the comparison.
   */
  @Override
  public boolean equals(@Nullable Object o) {
    if (o == this) {
      return true;
    }
    if (o instanceof JdbcLogicalType) {
      JdbcLogicalType<?> other = (JdbcLogicalType<?>) o;
      return Objects.equals(identifier, other.identifier)
          && Objects.equals(baseType, other.baseType)
          && Objects.equals(argument, other.argument);
    }
    return false;
  }
}
}
16 changes: 12 additions & 4 deletions sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

# pytype: skip-file

import datetime
import logging
import time
import typing
Expand Down Expand Up @@ -60,7 +61,8 @@
"JdbcTestRow",
[("f_id", int), ("f_float", float), ("f_char", str), ("f_varchar", str),
("f_bytes", bytes), ("f_varbytes", bytes), ("f_timestamp", Timestamp),
("f_decimal", Decimal)],
("f_decimal", Decimal), ("f_date", datetime.date),
("f_time", datetime.time)],
)
coders.registry.register_coder(JdbcTestRow, coders.RowCoder)

Expand Down Expand Up @@ -132,7 +134,7 @@ def test_xlang_jdbc_write_read(self, database):
"f_float DOUBLE PRECISION, " + "f_char CHAR(10), " +
"f_varchar VARCHAR(10), " + f"f_bytes {binary_type[0]}, " +
f"f_varbytes {binary_type[1]}, " + "f_timestamp TIMESTAMP(3), " +
"f_decimal DECIMAL(10, 2))")
"f_decimal DECIMAL(10, 2), " + "f_date DATE, " + "f_time TIME(3))")
inserted_rows = [
JdbcTestRow(
i,
Expand All @@ -144,7 +146,11 @@ def test_xlang_jdbc_write_read(self, database):
# In alignment with Java Instant which supports milli precision.
Timestamp.of(seconds=round(time.time(), 3)),
# Test both positive and negative numbers.
Decimal(f'{i-1}.23')) for i in range(ROW_COUNT)
Decimal(f'{i-1}.23'),
# Test both date before or after EPOCH
datetime.date(1969 + i, i % 12 + 1, i % 31 + 1),
datetime.time(i % 24, i % 60, i % 60, (i * 1000) % 1_000_000))
for i in range(ROW_COUNT)
]
expected_row = []
for row in inserted_rows:
Expand All @@ -163,7 +169,9 @@ def test_xlang_jdbc_write_read(self, database):
f_bytes,
row.f_bytes,
row.f_timestamp,
row.f_decimal))
row.f_decimal,
row.f_date,
row.f_time))

with TestPipeline() as p:
p.not_use_test_runner_api = True
Expand Down
Loading

0 comments on commit a0e9775

Please sign in to comment.