Skip to content

Commit

Permalink
adding disableAutoCommit flag to ReadFn
Browse files Browse the repository at this point in the history
* unit tests
* CHANGES.md

* unit tests
* CHANGES.md

updating getDisableAutoCommit

variable naming

cleanup

cleanup

cleanup

cleanup

cleanup

whitespace cleanup

whitespace cleanup
  • Loading branch information
Chris Ashcraft authored and Chris Ashcraft committed Nov 5, 2024
1 parent 689af5b commit c5807c2
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)).
* (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming Java Legacy Runner without Streaming Engine ([#32892](https://github.com/apache/beam/issues/32892)).
* Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111))

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ public static <T> Read<T> read() {
return new AutoValue_JdbcIO_Read.Builder<T>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.build();
}

Expand All @@ -341,6 +342,7 @@ public static ReadRows readRows() {
return new AutoValue_JdbcIO_ReadRows.Builder()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setStatementPreparator(ignored -> {})
.build();
}
Expand All @@ -356,6 +358,7 @@ public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.build();
}

Expand All @@ -372,6 +375,7 @@ public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> read
.setPartitionColumnType(partitioningColumnType)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
Expand All @@ -389,6 +393,7 @@ public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> read
.setPartitionsHelper(partitionsHelper)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
Expand All @@ -400,6 +405,7 @@ public static <T> ReadWithPartitions<T, Long> readWithPartitions() {
private static final long DEFAULT_BATCH_SIZE = 1000L;
private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L;
private static final int DEFAULT_FETCH_SIZE = 50_000;
private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true;
// Default values used from fluent backoff.
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1);
private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000);
Expand Down Expand Up @@ -733,6 +739,9 @@ public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row
@Pure
abstract boolean getOutputParallelization();

@Pure
abstract boolean getDisableAutoCommit();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -748,6 +757,8 @@ abstract Builder setDataSourceProviderFn(

abstract Builder setOutputParallelization(boolean outputParallelization);

abstract Builder setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadRows build();
}

Expand Down Expand Up @@ -799,6 +810,11 @@ public ReadRows withOutputParallelization(boolean outputParallelization) {
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/** Whether to disable auto commit on read. Defaults to true if not provided. */
public ReadRows withDisableAutoCommit(boolean disableAutoCommit) {
return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
}

@Override
public PCollection<Row> expand(PBegin input) {
ValueProvider<String> query = checkStateNotNull(getQuery(), "withQuery() is required");
Expand All @@ -816,6 +832,7 @@ public PCollection<Row> expand(PBegin input) {
.withCoder(RowCoder.of(schema))
.withRowMapper(SchemaUtil.BeamRowMapper.of(schema))
.withFetchSize(getFetchSize())
.withDisableAutoCommit(getDisableAutoCommit())
.withOutputParallelization(getOutputParallelization())
.withStatementPreparator(checkStateNotNull(getStatementPreparator())));
rows.setRowSchema(schema);
Expand Down Expand Up @@ -872,6 +889,9 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
@Pure
abstract boolean getOutputParallelization();

@Pure
abstract boolean getDisableAutoCommit();

@Pure
abstract Builder<T> toBuilder();

Expand All @@ -892,6 +912,8 @@ abstract Builder<T> setDataSourceProviderFn(

abstract Builder<T> setOutputParallelization(boolean outputParallelization);

abstract Builder<T> setDisableAutoCommit(boolean disableAutoCommit);

abstract Read<T> build();
}

Expand Down Expand Up @@ -958,6 +980,11 @@ public Read<T> withOutputParallelization(boolean outputParallelization) {
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/** Whether to disable auto commit on read. Defaults to true if not provided. */
public Read<T> withDisableAutoCommit(boolean disableAutoCommit) {
return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
}

@Override
public PCollection<T> expand(PBegin input) {
ValueProvider<String> query = checkArgumentNotNull(getQuery(), "withQuery() is required");
Expand All @@ -974,6 +1001,7 @@ public PCollection<T> expand(PBegin input) {
.withRowMapper(rowMapper)
.withFetchSize(getFetchSize())
.withOutputParallelization(getOutputParallelization())
.withDisableAutoCommit(getDisableAutoCommit())
.withParameterSetter(
(element, preparedStatement) -> {
if (getStatementPreparator() != null) {
Expand Down Expand Up @@ -1029,6 +1057,8 @@ public abstract static class ReadAll<ParameterT, OutputT>

abstract boolean getOutputParallelization();

abstract boolean getDisableAutoCommit();

abstract Builder<ParameterT, OutputT> toBuilder();

@AutoValue.Builder
Expand All @@ -1049,6 +1079,8 @@ abstract Builder<ParameterT, OutputT> setParameterSetter(

abstract Builder<ParameterT, OutputT> setOutputParallelization(boolean outputParallelization);

abstract Builder<ParameterT, OutputT> setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadAll<ParameterT, OutputT> build();
}

Expand Down Expand Up @@ -1127,6 +1159,11 @@ public ReadAll<ParameterT, OutputT> withOutputParallelization(boolean outputPara
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/** Whether to disable auto commit on read. Defaults to true if not provided. */
public ReadAll<ParameterT, OutputT> withDisableAutoCommit(boolean disableAutoCommit) {
return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
}

private @Nullable Coder<OutputT> inferCoder(
CoderRegistry registry, SchemaRegistry schemaRegistry) {
if (getCoder() != null) {
Expand Down Expand Up @@ -1173,7 +1210,8 @@ public PCollection<OutputT> expand(PCollection<ParameterT> input) {
checkStateNotNull(getQuery()),
checkStateNotNull(getParameterSetter()),
checkStateNotNull(getRowMapper()),
getFetchSize())))
getFetchSize(),
getDisableAutoCommit())))
.setCoder(coder);

if (getOutputParallelization()) {
Expand Down Expand Up @@ -1254,6 +1292,9 @@ public abstract static class ReadWithPartitions<T, PartitionColumnT>
@Pure
abstract @Nullable JdbcReadWithPartitionsHelper<PartitionColumnT> getPartitionsHelper();

@Pure
abstract boolean getDisableAutoCommit();

@Pure
abstract Builder<T, PartitionColumnT> toBuilder();

Expand Down Expand Up @@ -1287,6 +1328,8 @@ abstract Builder<T, PartitionColumnT> setPartitionColumnType(
abstract Builder<T, PartitionColumnT> setPartitionsHelper(
JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper);

abstract Builder<T, PartitionColumnT> setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadWithPartitions<T, PartitionColumnT> build();
}

Expand Down Expand Up @@ -1419,7 +1462,8 @@ && getLowerBound() instanceof Comparable<?>) {
.withQuery(query)
.withDataSourceProviderFn(dataSourceProviderFn)
.withRowMapper(checkStateNotNull(partitionsHelper))
.withFetchSize(getFetchSize()))
.withFetchSize(getFetchSize())
.withDisableAutoCommit(getDisableAutoCommit()))
.apply(
MapElements.via(
new SimpleFunction<
Expand Down Expand Up @@ -1487,7 +1531,8 @@ public KV<Long, KV<PartitionColumnT, PartitionColumnT>> apply(
.withRowMapper(rowMapper)
.withFetchSize(getFetchSize())
.withParameterSetter(checkStateNotNull(partitionsHelper))
.withOutputParallelization(false);
.withOutputParallelization(false)
.withDisableAutoCommit(getDisableAutoCommit());

if (getUseBeamSchema()) {
checkStateNotNull(schema);
Expand Down Expand Up @@ -1537,6 +1582,7 @@ private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, Output
private final PreparedStatementSetter<ParameterT> parameterSetter;
private final RowMapper<OutputT> rowMapper;
private final int fetchSize;
private final boolean disableAutoCommit;

private @Nullable DataSource dataSource;
private @Nullable Connection connection;
Expand All @@ -1546,12 +1592,14 @@ private ReadFn(
ValueProvider<String> query,
PreparedStatementSetter<ParameterT> parameterSetter,
RowMapper<OutputT> rowMapper,
int fetchSize) {
int fetchSize,
boolean disableAutoCommit) {
this.dataSourceProviderFn = dataSourceProviderFn;
this.query = query;
this.parameterSetter = parameterSetter;
this.rowMapper = rowMapper;
this.fetchSize = fetchSize;
this.disableAutoCommit = disableAutoCommit;
}

@Setup
Expand All @@ -1575,10 +1623,15 @@ private Connection getConnection() throws SQLException {
public void processElement(ProcessContext context) throws Exception {
// Only acquire the connection if we need to perform a read.
Connection connection = getConnection();

// PostgreSQL requires autocommit to be disabled to enable cursor streaming
// see https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
LOG.info("Autocommit has been disabled");
connection.setAutoCommit(false);
// This option is configurable as Informix will error
// if calling setAutoCommit on a non-logged database
if (disableAutoCommit) {
LOG.info("Autocommit has been disabled");
connection.setAutoCommit(false);
}
try (PreparedStatement statement =
connection.prepareStatement(
query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (outputParallelization != null) {
readRows = readRows.withOutputParallelization(outputParallelization);
}
Boolean disableAutoCommit = config.getDisableAutoCommit();
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}
return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows));
}
}
Expand Down Expand Up @@ -174,6 +178,9 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser
@Nullable
public abstract Boolean getOutputParallelization();

@Nullable
public abstract Boolean getDisableAutoCommit();

@Nullable
public abstract String getDriverJars();

Expand Down Expand Up @@ -238,6 +245,8 @@ public abstract static class Builder {

public abstract Builder setOutputParallelization(Boolean value);

public abstract Builder setDisableAutoCommit(Boolean value);

public abstract Builder setDriverJars(String value);

public abstract JdbcReadSchemaTransformConfiguration build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public Schema configurationSchema() {
.addNullableField("readQuery", FieldType.STRING)
.addNullableField("writeStatement", FieldType.STRING)
.addNullableField("fetchSize", FieldType.INT16)
.addNullableField("disableAutoCommit", FieldType.BOOLEAN)
.addNullableField("outputParallelization", FieldType.BOOLEAN)
.addNullableField("autosharding", FieldType.BOOLEAN)
// Partitioning support. If you specify a partition column we will use that instead of
Expand Down Expand Up @@ -140,6 +141,11 @@ public PCollection<Row> expand(PBegin input) {
readRows = readRows.withFetchSize(fetchSize);
}

@Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit");
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}

return input.apply(readRows);
} else {

Expand All @@ -163,6 +169,11 @@ public PCollection<Row> expand(PBegin input) {
readRows = readRows.withOutputParallelization(outputParallelization);
}

@Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit");
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}

return input.apply(readRows);
}
}
Expand Down

0 comments on commit c5807c2

Please sign in to comment.