Skip to content

Commit

Permalink
adding disableAutoCommit flag to ReadFn
Browse files Browse the repository at this point in the history
* unit tests
* CHANGES.md

* unit tests
* CHANGES.md

updating getDisableAutoCommit

variable naming

cleanup

cleanup

cleanup

cleanup

cleanup
  • Loading branch information
Chris Ashcraft authored and Chris Ashcraft committed Oct 31, 2024
1 parent 61e7258 commit ccd1b57
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)).
* (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming Java Legacy Runner without Streaming Engine ([#32892](https://github.com/apache/beam/issues/32892)).
* Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111))

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
* .withPassword("password"))
* .withQuery("select id,name from Person")
* .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
* public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
* withDisableAutoCommit KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
* return KV.of(resultSet.getInt(1), resultSet.getString(2));
* }
* })
Expand Down Expand Up @@ -332,6 +332,7 @@ public class JdbcIO {
public static <T> Read<T> read() {
return new AutoValue_JdbcIO_Read.Builder<T>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setOutputParallelization(true)
.build();
}
Expand All @@ -340,6 +341,7 @@ public static <T> Read<T> read() {
public static ReadRows readRows() {
return new AutoValue_JdbcIO_ReadRows.Builder()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setOutputParallelization(true)
.setStatementPreparator(ignored -> {})
.build();
Expand All @@ -355,6 +357,7 @@ public static ReadRows readRows() {
public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setOutputParallelization(true)
.build();
}
Expand All @@ -372,6 +375,7 @@ public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> read
.setPartitionColumnType(partitioningColumnType)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
Expand All @@ -389,6 +393,7 @@ public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> read
.setPartitionsHelper(partitionsHelper)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
Expand All @@ -400,6 +405,7 @@ public static <T> ReadWithPartitions<T, Long> readWithPartitions() {
private static final long DEFAULT_BATCH_SIZE = 1000L;
private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L;
private static final int DEFAULT_FETCH_SIZE = 50_000;
private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true;
// Default values used from fluent backoff.
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1);
private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000);
Expand Down Expand Up @@ -733,6 +739,9 @@ public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row
@Pure
abstract boolean getOutputParallelization();

@Pure
abstract boolean getDisableAutoCommit();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -748,6 +757,8 @@ abstract Builder setDataSourceProviderFn(

abstract Builder setOutputParallelization(boolean outputParallelization);

abstract Builder setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadRows build();
}

Expand Down Expand Up @@ -799,6 +810,11 @@ public ReadRows withOutputParallelization(boolean outputParallelization) {
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/** Whether to disable auto commit on read. Defaults to true if not provided. */
public ReadRows withDisableAutoCommit(boolean disableAutoCommit) {
return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
}

@Override
public PCollection<Row> expand(PBegin input) {
ValueProvider<String> query = checkStateNotNull(getQuery(), "withQuery() is required");
Expand Down Expand Up @@ -872,6 +888,9 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
@Pure
abstract boolean getOutputParallelization();

@Pure
abstract boolean getDisableAutoCommit();

@Pure
abstract Builder<T> toBuilder();

Expand All @@ -884,6 +903,8 @@ abstract Builder<T> setDataSourceProviderFn(

abstract Builder<T> setStatementPreparator(StatementPreparator statementPreparator);

abstract Builder<T> setDisableAutoCommit(boolean disableAutoCommit);

abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);

abstract Builder<T> setCoder(Coder<T> coder);
Expand Down Expand Up @@ -958,6 +979,11 @@ public Read<T> withOutputParallelization(boolean outputParallelization) {
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/** Whether to disable auto commit on read. Defaults to true if not provided. */
public Read<T> withDisableAutoCommit(boolean disableAutoCommit) {
return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
}

@Override
public PCollection<T> expand(PBegin input) {
ValueProvider<String> query = checkArgumentNotNull(getQuery(), "withQuery() is required");
Expand Down Expand Up @@ -1029,6 +1055,8 @@ public abstract static class ReadAll<ParameterT, OutputT>

abstract boolean getOutputParallelization();

abstract boolean getDisableAutoCommit();

abstract Builder<ParameterT, OutputT> toBuilder();

@AutoValue.Builder
Expand All @@ -1049,6 +1077,8 @@ abstract Builder<ParameterT, OutputT> setParameterSetter(

abstract Builder<ParameterT, OutputT> setOutputParallelization(boolean outputParallelization);

abstract Builder<ParameterT, OutputT> setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadAll<ParameterT, OutputT> build();
}

Expand Down Expand Up @@ -1127,6 +1157,11 @@ public ReadAll<ParameterT, OutputT> withOutputParallelization(boolean outputPara
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/** Whether to disable auto commit on read. Defaults to true if not provided. */
public ReadAll<ParameterT, OutputT> withDisableAutoCommit(boolean disableAutoCommit) {
return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
}

private @Nullable Coder<OutputT> inferCoder(
CoderRegistry registry, SchemaRegistry schemaRegistry) {
if (getCoder() != null) {
Expand Down Expand Up @@ -1173,7 +1208,8 @@ public PCollection<OutputT> expand(PCollection<ParameterT> input) {
checkStateNotNull(getQuery()),
checkStateNotNull(getParameterSetter()),
checkStateNotNull(getRowMapper()),
getFetchSize())))
getFetchSize(),
getDisableAutoCommit())))
.setCoder(coder);

if (getOutputParallelization()) {
Expand Down Expand Up @@ -1254,6 +1290,9 @@ public abstract static class ReadWithPartitions<T, PartitionColumnT>
@Pure
abstract @Nullable JdbcReadWithPartitionsHelper<PartitionColumnT> getPartitionsHelper();

@Pure
abstract boolean getDisableAutoCommit();

@Pure
abstract Builder<T, PartitionColumnT> toBuilder();

Expand Down Expand Up @@ -1287,6 +1326,8 @@ abstract Builder<T, PartitionColumnT> setPartitionColumnType(
abstract Builder<T, PartitionColumnT> setPartitionsHelper(
JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper);

abstract Builder<T, PartitionColumnT> setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadWithPartitions<T, PartitionColumnT> build();
}

Expand Down Expand Up @@ -1537,6 +1578,7 @@ private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, Output
private final PreparedStatementSetter<ParameterT> parameterSetter;
private final RowMapper<OutputT> rowMapper;
private final int fetchSize;
private final boolean disableAutoCommit;

private @Nullable DataSource dataSource;
private @Nullable Connection connection;
Expand All @@ -1546,12 +1588,14 @@ private ReadFn(
ValueProvider<String> query,
PreparedStatementSetter<ParameterT> parameterSetter,
RowMapper<OutputT> rowMapper,
int fetchSize) {
int fetchSize,
boolean disableAutoCommit) {
this.dataSourceProviderFn = dataSourceProviderFn;
this.query = query;
this.parameterSetter = parameterSetter;
this.rowMapper = rowMapper;
this.fetchSize = fetchSize;
this.disableAutoCommit = disableAutoCommit;
}

@Setup
Expand All @@ -1575,10 +1619,15 @@ private Connection getConnection() throws SQLException {
public void processElement(ProcessContext context) throws Exception {
// Only acquire the connection if we need to perform a read.
Connection connection = getConnection();

// PostgreSQL requires autocommit to be disabled to enable cursor streaming
// see https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
LOG.info("Autocommit has been disabled");
connection.setAutoCommit(false);
// This option is configurable as Informix will error
// if calling setAutoCommit on a non-logged database
if (disableAutoCommit) {
LOG.info("Autocommit has been disabled");
connection.setAutoCommit(false);
}
try (PreparedStatement statement =
connection.prepareStatement(
query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
Expand Down

0 comments on commit ccd1b57

Please sign in to comment.