diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java index adc4c1cf2f2..e63175c4f6e 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.e2e.sink.inmemory; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.MultiTableResourceManager; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter; @@ -32,6 +33,11 @@ public class InMemoryAggregatedCommitter private static final List events = new ArrayList<>(); private static final List resourceManagers = new ArrayList<>(); + private ReadonlyConfig config; + + public InMemoryAggregatedCommitter(ReadonlyConfig config) { + this.config = config; + } public static List getEvents() { return events; @@ -62,6 +68,9 @@ public void setMultiTableResourceManager( @Override public List commit( List aggregatedCommitInfo) throws IOException { + if (config.get(InMemorySinkFactory.THROW_EXCEPTION_OF_COMMITTER)) { + throw new IOException("commit failed"); + } return new ArrayList<>(); } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java index 8f1eba9af47..9e3852fb3cf 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java @@ -68,7 +68,7 @@ public Optional> getCommitInfoSerializer() { @Override public Optional> createAggregatedCommitter() throws IOException { - return Optional.of(new InMemoryAggregatedCommitter()); + return Optional.of(new InMemoryAggregatedCommitter(config)); } @Override diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java index 7b06ec99d97..1ab973652f9 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java @@ -41,6 +41,9 @@ public class InMemorySinkFactory public static final Option CHECKPOINT_SLEEP = Options.key("checkpoint_sleep").booleanType().defaultValue(false); + public static final Option THROW_EXCEPTION_OF_COMMITTER = + Options.key("throw_exception_of_committer").booleanType().defaultValue(false); + @Override public String factoryIdentifier() { return "InMemory"; @@ -49,7 +52,11 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .optional(THROW_EXCEPTION, THROW_OUT_OF_MEMORY, CHECKPOINT_SLEEP) + .optional( + THROW_EXCEPTION, + THROW_OUT_OF_MEMORY, + CHECKPOINT_SLEEP, + THROW_EXCEPTION_OF_COMMITTER) .build(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 2b7498f4856..8735048eac3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -293,6 +293,9 @@ private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) { private void restoreTaskState(TaskLocation taskLocation) { List states = new ArrayList<>(); if (latestCompletedCheckpoint != null) { + if (!latestCompletedCheckpoint.isRestored()) { + latestCompletedCheckpoint.setRestored(true); + } final Integer currentParallelism = pipelineTasks.get(taskLocation.getTaskVertexId()); plan.getSubtaskActions() .get(taskLocation) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java index 74be7952051..7865b9c4dc2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java @@ -46,7 +46,7 @@ public class CompletedCheckpoint implements Checkpoint, Serializable { private final Map taskStatistics; - @Getter @Setter private boolean isRestored = false; + @Getter @Setter private volatile boolean isRestored = false; public CompletedCheckpoint( long jobId, diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java new file mode 100644 index 00000000000..4893bd2c2b5 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.checkpoint; + +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.master.JobMaster; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +@DisabledOnOs(OS.WINDOWS) +public class CheckpointErrorRestoreEndTest + extends AbstractSeaTunnelServerTest { + public static String STREAM_CONF_WITH_ERROR_PATH = + "batch_fakesource_to_inmemory_with_commit_error.conf"; + + @Test + public void testCheckpointRestoreToFailEnd() { + long jobId = System.currentTimeMillis(); + startJob(jobId, STREAM_CONF_WITH_ERROR_PATH, false); + + JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId); + Assertions.assertEquals(1, jobMaster.getPhysicalPlan().getPipelineList().size()); + await().atMost(120, TimeUnit.SECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + 3, + jobMaster + .getPhysicalPlan() + .getPipelineList() + .get(0) + .getPipelineRestoreNum())); + await().atMost(120, TimeUnit.SECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + server.getCoordinatorService().getJobStatus(jobId), + JobStatus.FAILED)); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf new file mode 100644 index 00000000000..b89ee138e27 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 2 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + row.num = 100 + split.num = 5 + split.read-interval = 3000 + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + parallelism = 1 + } +} + +transform { +} + +sink { + InMemory { + source_table_name="fake" + throw_exception_of_committer=true + } +} \ No newline at end of file