Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding task cancellation timestamp in task API #7445

Merged
merged 16 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
- Add task cancellation timestamp in task API ([#7455](https://github.com/opensearch-project/OpenSearch/pull/7455))
- Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466))
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))
- Provide mechanism to configure XContent parsing constraints (after update to Jackson 2.15.0 and above) ([#7550](https://github.com/opensearch-project/OpenSearch/pull/7550))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ static TaskInfo randomTaskInfo() {
boolean cancellable = randomBoolean();
boolean cancelled = cancellable == true ? randomBoolean() : false;
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
Long cancellationStartTime = null;
if (cancelled) {
cancellationStartTime = randomNonNegativeLong();
}
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
Expand All @@ -110,7 +114,8 @@ static TaskInfo randomTaskInfo() {
cancelled,
parentTaskId,
headers,
randomResourceStats()
randomResourceStats(),
cancellationStartTime
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,17 @@ public void testTasksCancellation() throws Exception {
.get();
assertEquals(1, cancelTasksResponse.getTasks().size());

// Tasks are marked as cancelled at this point but not yet completed.
List<TaskInfo> taskInfoList = client().admin()
.cluster()
.prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "*")
.get()
.getTasks();
for (TaskInfo taskInfo : taskInfoList) {
assertTrue(taskInfo.isCancelled());
assertNotNull(taskInfo.getCancellationStartTime());
}
future.get();

logger.info("--> checking that test tasks are not running");
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/tasks/CancellableTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ public abstract class CancellableTask extends Task {
private volatile String reason;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final TimeValue cancelAfterTimeInterval;
/**
* The time this task was cancelled as a wall clock time since epoch ({@link System#currentTimeMillis()} style).
*/
private Long cancellationStartTime = null;
/**
* The time this task was cancelled as a relative time ({@link System#nanoTime()} style).
*/
private Long cancellationStartTimeNanos = null;
dblock marked this conversation as resolved.
Show resolved Hide resolved

public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT);
Expand All @@ -74,6 +82,8 @@ public CancellableTask(
public void cancel(String reason) {
assert reason != null;
if (cancelled.compareAndSet(false, true)) {
this.cancellationStartTime = System.currentTimeMillis();
this.cancellationStartTimeNanos = System.nanoTime();
this.reason = reason;
onCancelled();
}
Expand All @@ -87,6 +97,14 @@ public boolean cancelOnParentLeaving() {
return true;
}

public Long getCancellationStartTime() {
return cancellationStartTime;
}

public Long getCancellationStartTimeNanos() {
return cancellationStartTimeNanos;
}

/**
* Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled.
*/
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
* Build a proper {@link TaskInfo} for this task.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) {
boolean cancelled = this instanceof CancellableTask && ((CancellableTask) this).isCancelled();
Long cancellationStartTime = null;
if (cancelled) {
cancellationStartTime = ((CancellableTask) this).getCancellationStartTime();
}
return new TaskInfo(
new TaskId(localNodeId, getId()),
getType(),
Expand All @@ -201,10 +206,11 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
startTime,
System.nanoTime() - startTimeNanos,
this instanceof CancellableTask,
this instanceof CancellableTask && ((CancellableTask) this).isCancelled(),
cancelled,
parentTask,
headers,
resourceStats
resourceStats,
cancellationStartTime
);
}

Expand Down
61 changes: 58 additions & 3 deletions server/src/main/java/org/opensearch/tasks/TaskInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {

private final boolean cancelled;

private final Long cancellationStartTime;

private final TaskId parentTaskId;

private final Map<String, String> headers;
Expand All @@ -104,6 +106,38 @@ public TaskInfo(
TaskId parentTaskId,
Map<String, String> headers,
TaskResourceStats resourceStats
) {
this(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers,
resourceStats,
null
);
}

public TaskInfo(
TaskId taskId,
String type,
String action,
String description,
Task.Status status,
long startTime,
long runningTimeNanos,
boolean cancellable,
boolean cancelled,
TaskId parentTaskId,
Map<String, String> headers,
TaskResourceStats resourceStats,
Long cancellationStartTime
) {
if (cancellable == false && cancelled == true) {
throw new IllegalArgumentException("task cannot be cancelled");
Expand All @@ -120,6 +154,7 @@ public TaskInfo(
this.parentTaskId = parentTaskId;
this.headers = headers;
this.resourceStats = resourceStats;
this.cancellationStartTime = cancellationStartTime;
}

/**
Expand Down Expand Up @@ -150,6 +185,11 @@ public TaskInfo(StreamInput in) throws IOException {
} else {
resourceStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
cancellationStartTime = in.readOptionalLong();
} else {
cancellationStartTime = null;
}
}

@Override
Expand All @@ -170,6 +210,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_1_0)) {
out.writeOptionalWriteable(resourceStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalLong(cancellationStartTime);
}
}

public TaskId getTaskId() {
Expand Down Expand Up @@ -228,6 +271,10 @@ public boolean isCancelled() {
return cancelled;
}

public Long getCancellationStartTime() {
return cancellationStartTime;
}

/**
* Returns the parent task id
*/
Expand Down Expand Up @@ -281,6 +328,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
resourceStats.toXContent(builder, params);
builder.endObject();
}
if (cancellationStartTime != null) {
builder.humanReadableField("cancellation_time_millis", "cancellation_time", new TimeValue(cancellationStartTime));
}
return builder;
}

Expand Down Expand Up @@ -308,6 +358,7 @@ public static TaskInfo fromXContent(XContentParser parser) {
}
@SuppressWarnings("unchecked")
TaskResourceStats resourceStats = (TaskResourceStats) a[i++];
Long cancellationStartTime = (Long) a[i++];
RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes);
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
return new TaskInfo(
Expand All @@ -322,7 +373,8 @@ public static TaskInfo fromXContent(XContentParser parser) {
cancelled,
parentTaskId,
headers,
resourceStats
resourceStats,
cancellationStartTime
);
});
static {
Expand All @@ -341,6 +393,7 @@ public static TaskInfo fromXContent(XContentParser parser) {
PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> TaskResourceStats.fromXContent(p), new ParseField("resource_stats"));
PARSER.declareLong(optionalConstructorArg(), new ParseField("cancellation_time_millis"));
}

@Override
Expand All @@ -366,7 +419,8 @@ public boolean equals(Object obj) {
&& Objects.equals(cancelled, other.cancelled)
&& Objects.equals(status, other.status)
&& Objects.equals(headers, other.headers)
&& Objects.equals(resourceStats, other.resourceStats);
&& Objects.equals(resourceStats, other.resourceStats)
&& Objects.equals(cancellationStartTime, other.cancellationStartTime);
}

@Override
Expand All @@ -383,7 +437,8 @@ public int hashCode() {
cancelled,
status,
headers,
resourceStats
resourceStats,
cancellationStartTime
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testCancellableOptionWhenCancelledTrue() {
long taskId = randomIntBetween(0, 100000);
long startTime = randomNonNegativeLong();
long runningTime = randomNonNegativeLong();
long cancellationStartTime = randomNonNegativeLong();
boolean cancellable = true;
boolean cancelled = true;
TaskInfo taskInfo = new TaskInfo(
Expand All @@ -103,12 +104,14 @@ public void testCancellableOptionWhenCancelledTrue() {
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar"),
randomResourceStats(randomBoolean())
randomResourceStats(randomBoolean()),
cancellationStartTime
);
String taskInfoString = taskInfo.toString();
Map<String, Object> map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2();
assertEquals(map.get("cancellable"), cancellable);
assertEquals(map.get("cancelled"), cancelled);
assertEquals(map.get("cancellation_time_millis"), cancellationStartTime);
}

public void testCancellableOptionWhenCancelledFalse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void testNonEmptyToString() {
{
put("dummy-type1", new TaskResourceUsage(100, 100));
}
})
}),
0L
);
ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList());
assertEquals(
Expand All @@ -105,7 +106,9 @@ public void testNonEmptyToString() {
+ " \"cpu_time_in_nanos\" : 100,\n"
+ " \"memory_in_bytes\" : 100\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"cancellation_time\" : \"0s\",\n"
+ " \"cancellation_time_millis\" : 0\n"
+ " }\n"
+ " ]\n"
+ "}",
Expand Down
Loading