diff --git a/CHANGES.md b/CHANGES.md index 1a9d2045cbf6..9aeaf65a7bf3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -93,6 +93,7 @@ * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)). * (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming Java Legacy Runner without Streaming Engine ([#32892](https://github.com/apache/beam/issues/32892)). +* Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 2f164fa3bb78..bbe65d0e2312 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -131,7 +131,7 @@ * .withPassword("password")) * .withQuery("select id,name from Person") * .withRowMapper(new JdbcIO.RowMapper>() { - * public KV mapRow(ResultSet resultSet) throws Exception { + * public KV mapRow(ResultSet resultSet) throws Exception { * return KV.of(resultSet.getInt(1), resultSet.getString(2)); * } * }) @@ -332,6 +332,7 @@ public class JdbcIO { public static Read read() { return new AutoValue_JdbcIO_Read.Builder() .setFetchSize(DEFAULT_FETCH_SIZE) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setOutputParallelization(true) .build(); } @@ -340,6 +341,7 @@ public static Read read() { public static ReadRows readRows() { return new AutoValue_JdbcIO_ReadRows.Builder() .setFetchSize(DEFAULT_FETCH_SIZE) +
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setOutputParallelization(true) .setStatementPreparator(ignored -> {}) .build(); @@ -355,6 +357,7 @@ public static ReadRows readRows() { public static ReadAll readAll() { return new AutoValue_JdbcIO_ReadAll.Builder() .setFetchSize(DEFAULT_FETCH_SIZE) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setOutputParallelization(true) .build(); } @@ -372,6 +375,7 @@ public static ReadWithPartitions read .setPartitionColumnType(partitioningColumnType) .setNumPartitions(DEFAULT_NUM_PARTITIONS) .setFetchSize(DEFAULT_FETCH_SIZE) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setUseBeamSchema(false) .build(); } @@ -389,6 +393,7 @@ public static ReadWithPartitions read .setPartitionsHelper(partitionsHelper) .setNumPartitions(DEFAULT_NUM_PARTITIONS) .setFetchSize(DEFAULT_FETCH_SIZE) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setUseBeamSchema(false) .build(); } @@ -400,6 +405,7 @@ public static ReadWithPartitions readWithPartitions() { private static final long DEFAULT_BATCH_SIZE = 1000L; private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L; private static final int DEFAULT_FETCH_SIZE = 50_000; + private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true; // Default values used from fluent backoff. 
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1); private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000); @@ -733,6 +739,9 @@ public abstract static class ReadRows extends PTransform expand(PBegin input) { ValueProvider query = checkStateNotNull(getQuery(), "withQuery() is required"); @@ -872,6 +888,9 @@ public abstract static class Read extends PTransform> @Pure abstract boolean getOutputParallelization(); + @Pure + abstract boolean getDisableAutoCommit(); + @Pure abstract Builder toBuilder(); @@ -884,6 +903,8 @@ abstract Builder setDataSourceProviderFn( abstract Builder setStatementPreparator(StatementPreparator statementPreparator); + abstract Builder setDisableAutoCommit(boolean disableAutoCommit); + abstract Builder setRowMapper(RowMapper rowMapper); abstract Builder setCoder(Coder coder); @@ -958,6 +979,11 @@ public Read withOutputParallelization(boolean outputParallelization) { return toBuilder().setOutputParallelization(outputParallelization).build(); } + /** Whether to disable auto commit on read. Defaults to true if not provided. 
*/ + public Read withDisableAutoCommit(boolean disableAutoCommit) { + return toBuilder().setDisableAutoCommit(disableAutoCommit).build(); + } + @Override public PCollection expand(PBegin input) { ValueProvider query = checkArgumentNotNull(getQuery(), "withQuery() is required"); @@ -1029,6 +1055,8 @@ public abstract static class ReadAll abstract boolean getOutputParallelization(); + abstract boolean getDisableAutoCommit(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1049,6 +1077,8 @@ abstract Builder setParameterSetter( abstract Builder setOutputParallelization(boolean outputParallelization); + abstract Builder setDisableAutoCommit(boolean disableAutoCommit); + abstract ReadAll build(); } @@ -1127,6 +1157,11 @@ public ReadAll withOutputParallelization(boolean outputPara return toBuilder().setOutputParallelization(outputParallelization).build(); } + /** Whether to disable auto commit on read. Defaults to true if not provided. */ + public ReadAll withDisableAutoCommit(boolean disableAutoCommit) { + return toBuilder().setDisableAutoCommit(disableAutoCommit).build(); + } + private @Nullable Coder inferCoder( CoderRegistry registry, SchemaRegistry schemaRegistry) { if (getCoder() != null) { @@ -1173,7 +1208,8 @@ public PCollection expand(PCollection input) { checkStateNotNull(getQuery()), checkStateNotNull(getParameterSetter()), checkStateNotNull(getRowMapper()), - getFetchSize()))) + getFetchSize(), + getDisableAutoCommit()))) .setCoder(coder); if (getOutputParallelization()) { @@ -1254,6 +1290,9 @@ public abstract static class ReadWithPartitions @Pure abstract @Nullable JdbcReadWithPartitionsHelper getPartitionsHelper(); + @Pure + abstract boolean getDisableAutoCommit(); + @Pure abstract Builder toBuilder(); @@ -1287,6 +1326,8 @@ abstract Builder setPartitionColumnType( abstract Builder setPartitionsHelper( JdbcReadWithPartitionsHelper partitionsHelper); + abstract Builder setDisableAutoCommit(boolean disableAutoCommit); + abstract ReadWithPartitions 
build(); } @@ -1537,6 +1578,7 @@ private static class ReadFn extends DoFn parameterSetter; private final RowMapper rowMapper; private final int fetchSize; + private final boolean disableAutoCommit; private @Nullable DataSource dataSource; private @Nullable Connection connection; @@ -1546,12 +1588,14 @@ private ReadFn( ValueProvider query, PreparedStatementSetter parameterSetter, RowMapper rowMapper, - int fetchSize) { + int fetchSize, + boolean disableAutoCommit) { this.dataSourceProviderFn = dataSourceProviderFn; this.query = query; this.parameterSetter = parameterSetter; this.rowMapper = rowMapper; this.fetchSize = fetchSize; + this.disableAutoCommit = disableAutoCommit; } @Setup @@ -1575,10 +1619,15 @@ private Connection getConnection() throws SQLException { public void processElement(ProcessContext context) throws Exception { // Only acquire the connection if we need to perform a read. Connection connection = getConnection(); + // PostgreSQL requires autocommit to be disabled to enable cursor streaming // see https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor - LOG.info("Autocommit has been disabled"); - connection.setAutoCommit(false); + // This option is configurable as Informix will error + // if calling setAutoCommit on a non-logged database + if (disableAutoCommit) { + LOG.info("Autocommit has been disabled"); + connection.setAutoCommit(false); + } try (PreparedStatement statement = connection.prepareStatement( query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {