Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix] Fix IllegalArgumentException thrown when creating a PIT #16781

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Fix illegal argument exception when creating a PIT ([#16781](https://github.com/opensearch-project/OpenSearch/pull/16781))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

/**
* Base class for all individual search phases like collecting distributed frequencies, fetching documents, querying shards.
Expand Down Expand Up @@ -69,11 +70,15 @@ public String getName() {
}

/**
* Returns the SearchPhase name as {@link SearchPhaseName}. Exception will come if SearchPhase name is not defined
* in {@link SearchPhaseName}
* @return {@link SearchPhaseName}
* Returns an Optional of the SearchPhase name as {@link SearchPhaseName}. If there's not a matching SearchPhaseName,
* returns an empty Optional.
* @return {@link Optional<SearchPhaseName>}
*/
public SearchPhaseName getSearchPhaseName() {
return SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT));
public Optional<SearchPhaseName> getSearchPhaseName() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change to a class annotated PublicApi so I think you'll want to make this change in way that keeps the existing signature.

try {
return Optional.of(SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT)));
} catch (IllegalArgumentException e) {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,22 @@ public long getTookMetric() {

@Override
protected void onPhaseStart(SearchPhaseContext context) {
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc();
context.getCurrentPhase().getSearchPhaseName().ifPresent(name -> phaseStatsMap.get(name).current.inc());
}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName());
phaseStats.current.dec();
phaseStats.total.inc();
phaseStats.timing.inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()));
context.getCurrentPhase().getSearchPhaseName().ifPresent(name -> {
StatsHolder phaseStats = phaseStatsMap.get(name);
phaseStats.current.dec();
phaseStats.total.inc();
phaseStats.timing.inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()));
});
}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
context.getCurrentPhase().getSearchPhaseName().ifPresent(name -> phaseStatsMap.get(name).current.dec());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,29 +399,29 @@ public void testOnPhaseFailureAndVerifyListeners() {
final List<SearchRequestOperationsListener> requestOperationListeners = List.of(testListener, assertingListener);
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
action.start();
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
action.onPhaseFailure(new SearchPhase("test") {
@Override
public void run() {

}
}, "message", null);
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName().get()));

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
requestOperationListeners
);
searchDfsQueryThenFetchAsyncAction.start();
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()));
searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") {
@Override
public void run() {

}
}, "message", null);
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName().get()));

FetchSearchPhase fetchPhase = createFetchSearchPhase();
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
Expand All @@ -430,15 +430,15 @@ public void run() {
action.skipShard(searchShardIterator);
action.start();
action.executeNextPhase(action, fetchPhase);
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName().get()));
action.onPhaseFailure(new SearchPhase("test") {
@Override
public void run() {

}
}, "message", null);
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName().get()));
}

public void testOnPhaseFailure() {
Expand Down Expand Up @@ -722,7 +722,7 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
action.start();

// Verify queryPhase current metric
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
TimeUnit.MILLISECONDS.sleep(delay);

FetchSearchPhase fetchPhase = createFetchSearchPhase();
Expand All @@ -733,31 +733,31 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
action.executeNextPhase(action, fetchPhase);

// Verify queryPhase total, current and latency metrics
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(action.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
assertThat(testListener.getPhaseMetric(action.getSearchPhaseName().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(action.getSearchPhaseName().get()));

// Verify fetchPhase current metric
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName().get()));
TimeUnit.MILLISECONDS.sleep(delay);

ExpandSearchPhase expandPhase = createExpandSearchPhase();
action.executeNextPhase(fetchPhase, expandPhase);
TimeUnit.MILLISECONDS.sleep(delay);

// Verify fetchPhase total, current and latency metrics
assertThat(testListener.getPhaseMetric(fetchPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(fetchPhase.getSearchPhaseName().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName().get()));

assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName().get()));

action.executeNextPhase(expandPhase, fetchPhase);
action.onPhaseDone(); /* finish phase since we don't have reponse being sent */

assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseName().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName().get()));
}

public void testOnPhaseListenersWithDfsType() throws InterruptedException {
Expand All @@ -772,7 +772,7 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException {

FetchSearchPhase fetchPhase = createFetchSearchPhase();
searchDfsQueryThenFetchAsyncAction.start();
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()));
TimeUnit.MILLISECONDS.sleep(delay);
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE);
Expand All @@ -786,9 +786,9 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException {
null
); /* finalizing the fetch phase since we do adhoc phase lifecycle calls */

assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()));
}

private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -30,18 +31,18 @@ public void testListenersAreExecuted() {

@Override
public void onPhaseStart(SearchPhaseContext context) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName().get()).current.inc();
}

@Override
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).total.inc();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName().get()).current.dec();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName().get()).total.inc();
}

@Override
public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName().get()).current.dec();
}
};

Expand All @@ -61,7 +62,7 @@ public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
when(ctx.getCurrentPhase()).thenReturn(searchPhase);
when(searchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(searchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
compositeListener.onPhaseStart(ctx);
assertEquals(totalListeners, searchPhaseMap.get(searchPhaseName).current.count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void testSearchRequestPhaseFailure() {
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
testRequestStats.onPhaseStart(ctx);
assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName));
testRequestStats.onPhaseFailure(ctx, new Throwable());
Expand All @@ -85,7 +86,7 @@ public void testSearchRequestStats() {
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
long tookTimeInMillis = randomIntBetween(1, 10);
testRequestStats.onPhaseStart(ctx);
long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis);
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedE
SearchPhaseContext ctx = mock(SearchPhaseContext.class);
SearchPhase mockSearchPhase = mock(SearchPhase.class);
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
Expand Down Expand Up @@ -145,7 +146,7 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc
SearchPhaseContext ctx = mock(SearchPhaseContext.class);
SearchPhase mockSearchPhase = mock(SearchPhase.class);
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
long tookTimeInMillis = randomIntBetween(1, 10);
long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis);
when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime);
Expand Down Expand Up @@ -188,7 +189,7 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte
SearchPhaseContext ctx = mock(SearchPhaseContext.class);
SearchPhase mockSearchPhase = mock(SearchPhase.class);
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
Expand All @@ -205,4 +206,25 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte
assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName));
}
}

public void testOtherPhaseNamesAreIgnored() {
// Unrecognized phase names shouldn't be tracked, but should not throw any error.
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testRequestStats = new SearchRequestStats(clusterSettings);
SearchPhaseContext ctx = mock(SearchPhaseContext.class);
SearchPhase mockSearchPhase = new SearchPhase("unrecognized_phase") {
@Override
public void run() {}
};
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
testRequestStats.onPhaseStart(ctx);
testRequestStats.onPhaseEnd(
ctx,
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
new SearchRequest(),
() -> null
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -86,7 +87,7 @@ public void testShardLevelSearchGroupStats() throws Exception {
SearchPhase mockSearchPhase = mock(SearchPhase.class);
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
when(mockSearchPhase.getStartTimeInNanos()).thenReturn(System.nanoTime() - TimeUnit.SECONDS.toNanos(paramValue));
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
for (int iterator = 0; iterator < paramValue; iterator++) {
onPhaseStart(testRequestStats, ctx);
onPhaseEnd(testRequestStats, ctx);
Expand Down
Loading