diff --git a/CHANGES.md b/CHANGES.md index cdedce22e975..7b2e717990a7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -95,6 +95,7 @@ * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)). * (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming Java Legacy Runner without Streaming Engine ([#32892](https://github.com/apache/beam/issues/32892)). +* (Java) Added flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 2f164fa3bb78..e4e38fa0fc05 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -333,6 +333,7 @@ public static Read read() { return new AutoValue_JdbcIO_Read.Builder() .setFetchSize(DEFAULT_FETCH_SIZE) .setOutputParallelization(true) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .build(); } @@ -341,6 +342,7 @@ public static ReadRows readRows() { return new AutoValue_JdbcIO_ReadRows.Builder() .setFetchSize(DEFAULT_FETCH_SIZE) .setOutputParallelization(true) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setStatementPreparator(ignored -> {}) .build(); } @@ -356,6 +358,7 @@ public static ReadAll readAll() { return new AutoValue_JdbcIO_ReadAll.Builder() .setFetchSize(DEFAULT_FETCH_SIZE) .setOutputParallelization(true) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .build(); } @@ -372,6 +375,7 @@ public static ReadWithPartitions read
.setPartitionColumnType(partitioningColumnType) .setNumPartitions(DEFAULT_NUM_PARTITIONS) .setFetchSize(DEFAULT_FETCH_SIZE) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setUseBeamSchema(false) .build(); } @@ -389,6 +393,7 @@ public static ReadWithPartitions read .setPartitionsHelper(partitionsHelper) .setNumPartitions(DEFAULT_NUM_PARTITIONS) .setFetchSize(DEFAULT_FETCH_SIZE) + .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT) .setUseBeamSchema(false) .build(); } @@ -400,6 +405,7 @@ public static ReadWithPartitions readWithPartitions() { private static final long DEFAULT_BATCH_SIZE = 1000L; private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L; private static final int DEFAULT_FETCH_SIZE = 50_000; + private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true; // Default values used from fluent backoff. private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1); private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000); @@ -733,6 +739,9 @@ public abstract static class ReadRows extends PTransform expand(PBegin input) { ValueProvider query = checkStateNotNull(getQuery(), "withQuery() is required"); @@ -816,6 +832,7 @@ public PCollection expand(PBegin input) { .withCoder(RowCoder.of(schema)) .withRowMapper(SchemaUtil.BeamRowMapper.of(schema)) .withFetchSize(getFetchSize()) + .withDisableAutoCommit(getDisableAutoCommit()) .withOutputParallelization(getOutputParallelization()) .withStatementPreparator(checkStateNotNull(getStatementPreparator()))); rows.setRowSchema(schema); @@ -872,6 +889,9 @@ public abstract static class Read extends PTransform> @Pure abstract boolean getOutputParallelization(); + @Pure + abstract boolean getDisableAutoCommit(); + @Pure abstract Builder toBuilder(); @@ -892,6 +912,8 @@ abstract Builder setDataSourceProviderFn( abstract Builder setOutputParallelization(boolean outputParallelization); + abstract Builder setDisableAutoCommit(boolean disableAutoCommit); + 
abstract Read build(); } @@ -958,6 +980,11 @@ public Read withOutputParallelization(boolean outputParallelization) { return toBuilder().setOutputParallelization(outputParallelization).build(); } + /** Whether to disable auto commit on read. Defaults to true if not provided. */ + public Read withDisableAutoCommit(boolean disableAutoCommit) { + return toBuilder().setDisableAutoCommit(disableAutoCommit).build(); + } + @Override public PCollection expand(PBegin input) { ValueProvider query = checkArgumentNotNull(getQuery(), "withQuery() is required"); @@ -974,6 +1001,7 @@ public PCollection expand(PBegin input) { .withRowMapper(rowMapper) .withFetchSize(getFetchSize()) .withOutputParallelization(getOutputParallelization()) + .withDisableAutoCommit(getDisableAutoCommit()) .withParameterSetter( (element, preparedStatement) -> { if (getStatementPreparator() != null) { @@ -1029,6 +1057,8 @@ public abstract static class ReadAll abstract boolean getOutputParallelization(); + abstract boolean getDisableAutoCommit(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1049,6 +1079,8 @@ abstract Builder setParameterSetter( abstract Builder setOutputParallelization(boolean outputParallelization); + abstract Builder setDisableAutoCommit(boolean disableAutoCommit); + abstract ReadAll build(); } @@ -1127,6 +1159,11 @@ public ReadAll withOutputParallelization(boolean outputPara return toBuilder().setOutputParallelization(outputParallelization).build(); } + /** Whether to disable auto commit on read. Defaults to true if not provided. 
*/ + public ReadAll withDisableAutoCommit(boolean disableAutoCommit) { + return toBuilder().setDisableAutoCommit(disableAutoCommit).build(); + } + private @Nullable Coder inferCoder( CoderRegistry registry, SchemaRegistry schemaRegistry) { if (getCoder() != null) { @@ -1173,7 +1210,8 @@ public PCollection expand(PCollection input) { checkStateNotNull(getQuery()), checkStateNotNull(getParameterSetter()), checkStateNotNull(getRowMapper()), - getFetchSize()))) + getFetchSize(), + getDisableAutoCommit()))) .setCoder(coder); if (getOutputParallelization()) { @@ -1254,6 +1292,9 @@ public abstract static class ReadWithPartitions @Pure abstract @Nullable JdbcReadWithPartitionsHelper getPartitionsHelper(); + @Pure + abstract boolean getDisableAutoCommit(); + @Pure abstract Builder toBuilder(); @@ -1287,6 +1328,8 @@ abstract Builder setPartitionColumnType( abstract Builder setPartitionsHelper( JdbcReadWithPartitionsHelper partitionsHelper); + abstract Builder setDisableAutoCommit(boolean disableAutoCommit); + abstract ReadWithPartitions build(); } @@ -1419,7 +1462,8 @@ && getLowerBound() instanceof Comparable) { .withQuery(query) .withDataSourceProviderFn(dataSourceProviderFn) .withRowMapper(checkStateNotNull(partitionsHelper)) - .withFetchSize(getFetchSize())) + .withFetchSize(getFetchSize()) + .withDisableAutoCommit(getDisableAutoCommit())) .apply( MapElements.via( new SimpleFunction< @@ -1487,7 +1531,8 @@ public KV> apply( .withRowMapper(rowMapper) .withFetchSize(getFetchSize()) .withParameterSetter(checkStateNotNull(partitionsHelper)) - .withOutputParallelization(false); + .withOutputParallelization(false) + .withDisableAutoCommit(getDisableAutoCommit()); if (getUseBeamSchema()) { checkStateNotNull(schema); @@ -1537,6 +1582,7 @@ private static class ReadFn extends DoFn parameterSetter; private final RowMapper rowMapper; private final int fetchSize; + private final boolean disableAutoCommit; private @Nullable DataSource dataSource; private @Nullable Connection connection; 
@@ -1546,12 +1592,14 @@ private ReadFn( ValueProvider query, PreparedStatementSetter parameterSetter, RowMapper rowMapper, - int fetchSize) { + int fetchSize, + boolean disableAutoCommit) { this.dataSourceProviderFn = dataSourceProviderFn; this.query = query; this.parameterSetter = parameterSetter; this.rowMapper = rowMapper; this.fetchSize = fetchSize; + this.disableAutoCommit = disableAutoCommit; } @Setup @@ -1575,10 +1623,15 @@ private Connection getConnection() throws SQLException { public void processElement(ProcessContext context) throws Exception { // Only acquire the connection if we need to perform a read. Connection connection = getConnection(); + // PostgreSQL requires autocommit to be disabled to enable cursor streaming // see https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor - LOG.info("Autocommit has been disabled"); - connection.setAutoCommit(false); + // This option is configurable as Informix will error + // if calling setAutoCommit on a non-logged database + if (disableAutoCommit) { + LOG.info("Autocommit has been disabled"); + connection.setAutoCommit(false); + } try (PreparedStatement statement = connection.prepareStatement( query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index 0139207235a0..435bfc138b5b 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -117,6 +117,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { if (outputParallelization != null) { readRows = readRows.withOutputParallelization(outputParallelization); } + Boolean disableAutoCommit = config.getDisableAutoCommit(); + if (disableAutoCommit != 
null) { + readRows = readRows.withDisableAutoCommit(disableAutoCommit); + } return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows)); } } @@ -174,6 +178,9 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser @Nullable public abstract Boolean getOutputParallelization(); + @Nullable + public abstract Boolean getDisableAutoCommit(); + @Nullable public abstract String getDriverJars(); @@ -238,6 +245,8 @@ public abstract static class Builder { public abstract Builder setOutputParallelization(Boolean value); + public abstract Builder setDisableAutoCommit(Boolean value); + public abstract Builder setDriverJars(String value); public abstract JdbcReadSchemaTransformConfiguration build(); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java index 4b5dc0d7e24a..30012465eb9e 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java @@ -65,6 +65,7 @@ public Schema configurationSchema() { .addNullableField("readQuery", FieldType.STRING) .addNullableField("writeStatement", FieldType.STRING) .addNullableField("fetchSize", FieldType.INT16) + .addNullableField("disableAutoCommit", FieldType.BOOLEAN) .addNullableField("outputParallelization", FieldType.BOOLEAN) .addNullableField("autosharding", FieldType.BOOLEAN) // Partitioning support. 
If you specify a partition column we will use that instead of @@ -140,6 +141,11 @@ public PCollection expand(PBegin input) { readRows = readRows.withFetchSize(fetchSize); } + @Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit"); + if (disableAutoCommit != null) { + readRows = readRows.withDisableAutoCommit(disableAutoCommit); + } + return input.apply(readRows); } else { @@ -163,6 +169,11 @@ public PCollection expand(PBegin input) { readRows = readRows.withOutputParallelization(outputParallelization); } + @Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit"); + if (disableAutoCommit != null) { + readRows = readRows.withDisableAutoCommit(disableAutoCommit); + } + return input.apply(readRows); } }