Skip to content

Commit

Permalink
[Hotfix][Zeta] Fix that batch tasks cannot be stopped after retrying (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Jul 10, 2024
1 parent 49d397c commit 57e5627
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.e2e.sink.inmemory;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
Expand All @@ -32,6 +33,11 @@ public class InMemoryAggregatedCommitter
private static final List<String> events = new ArrayList<>();
private static final List<InMemoryMultiTableResourceManager> resourceManagers =
new ArrayList<>();
private ReadonlyConfig config;

public InMemoryAggregatedCommitter(ReadonlyConfig config) {
this.config = config;
}

public static List<String> getEvents() {
return events;
Expand Down Expand Up @@ -62,6 +68,9 @@ public void setMultiTableResourceManager(
@Override
public List<InMemoryAggregatedCommitInfo> commit(
List<InMemoryAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
if (config.get(InMemorySinkFactory.THROW_EXCEPTION_OF_COMMITTER)) {
throw new IOException("commit failed");
}
return new ArrayList<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Optional<Serializer<InMemoryCommitInfo>> getCommitInfoSerializer() {
@Override
public Optional<SinkAggregatedCommitter<InMemoryCommitInfo, InMemoryAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
return Optional.of(new InMemoryAggregatedCommitter());
return Optional.of(new InMemoryAggregatedCommitter(config));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class InMemorySinkFactory
public static final Option<Boolean> CHECKPOINT_SLEEP =
Options.key("checkpoint_sleep").booleanType().defaultValue(false);

public static final Option<Boolean> THROW_EXCEPTION_OF_COMMITTER =
Options.key("throw_exception_of_committer").booleanType().defaultValue(false);

@Override
public String factoryIdentifier() {
return "InMemory";
Expand All @@ -49,7 +52,11 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.optional(THROW_EXCEPTION, THROW_OUT_OF_MEMORY, CHECKPOINT_SLEEP)
.optional(
THROW_EXCEPTION,
THROW_OUT_OF_MEMORY,
CHECKPOINT_SLEEP,
THROW_EXCEPTION_OF_COMMITTER)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) {
private void restoreTaskState(TaskLocation taskLocation) {
List<ActionSubtaskState> states = new ArrayList<>();
if (latestCompletedCheckpoint != null) {
if (!latestCompletedCheckpoint.isRestored()) {
latestCompletedCheckpoint.setRestored(true);
}
final Integer currentParallelism = pipelineTasks.get(taskLocation.getTaskVertexId());
plan.getSubtaskActions()
.get(taskLocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class CompletedCheckpoint implements Checkpoint, Serializable {

private final Map<Long, TaskStatistics> taskStatistics;

@Getter @Setter private boolean isRestored = false;
@Getter @Setter private volatile boolean isRestored = false;

public CompletedCheckpoint(
long jobId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.checkpoint;

import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.master.JobMaster;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

@DisabledOnOs(OS.WINDOWS)
public class CheckpointErrorRestoreEndTest
extends AbstractSeaTunnelServerTest<CheckpointErrorRestoreEndTest> {
public static String STREAM_CONF_WITH_ERROR_PATH =
"batch_fakesource_to_inmemory_with_commit_error.conf";

@Test
public void testCheckpointRestoreToFailEnd() {
long jobId = System.currentTimeMillis();
startJob(jobId, STREAM_CONF_WITH_ERROR_PATH, false);

JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);
Assertions.assertEquals(1, jobMaster.getPhysicalPlan().getPipelineList().size());
await().atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
3,
jobMaster
.getPhysicalPlan()
.getPipelineList()
.get(0)
.getPipelineRestoreNum()));
await().atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
server.getCoordinatorService().getJobStatus(jobId),
JobStatus.FAILED));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 2
job.mode = "BATCH"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
row.num = 100
split.num = 5
split.read-interval = 3000
parallelism = 1
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}
}

transform {
}

sink {
InMemory {
source_table_name="fake"
throw_exception_of_committer=true
}
}

0 comments on commit 57e5627

Please sign in to comment.