Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Adds support for dynamically updatable search analyzers (#290)
Browse files Browse the repository at this point in the history
* Squashed commit of the following:

commit c8c8233
Author: Himanshu Setia <[email protected]>
Date:   Tue Sep 8 19:21:58 2020 -0700

    removing comments, linting changes

commit 52526e8
Author: Himanshu Setia <[email protected]>
Date:   Tue Sep 8 19:09:30 2020 -0700

    Making refresh_synonym_analyzer uri odfe compatible

commit d4cc71e
Author: Himanshu Setia <[email protected]>
Date:   Tue Sep 8 18:33:39 2020 -0700

    Adding copyright disclaimer to new files

commit 9af7fdf
Author: Himanshu Setia <[email protected]>
Date:   Tue Sep 8 17:26:53 2020 -0700

    Misc - removing comments, adding newline, etc.

commit 1c85f18
Author: Himanshu Setia <[email protected]>
Date:   Tue Sep 8 16:39:26 2020 -0700

    Adding _refresh_synonym_analyzer API to support dynamic update for search analyzers

* Refactoring - API renaming

* Response parsing changes

* Fixed multinode response parsing issue

* Refactoring and logging

* Fixing klint errors

* Adding UTs to validate stream  parsing

* CR comments, code refactoring and more UTs

Squashed commit of the following:

commit d87842e4ccb592a39d6b6897a2b9cde9ddc2839f
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 23:36:24 2020 -0700

    Reverting RefreshSearchAnalyzerResponse stream input output parsing test for fixing later

commit 33ef3223acde38a64575255f7ec702f807d0502e
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 23:34:49 2020 -0700

    Fixing missing indices test

commit 8516d2142362c61630682bf7353e019bf9f6a9eb
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 21:59:55 2020 -0700

    CR comments and added more UTs

commit d306421ca462f2614756a73231baeeb56b388365
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 20:15:59 2020 -0700

    shardFailure refactoring

commit da80de7a68fef05e524e624ea1a8da6afdbf11d4
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 18:14:45 2020 -0700

    Working changes after refactoring shardFailure

commit be9bf2daaa31fb6444f799a8f3ed3d0383551471
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 17:45:31 2020 -0700

    Corrected response parsing for indices with some failed shards

commit 256f06ed23ac4ab2d78724601a1dd7e9d1423331
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 17:12:00 2020 -0700

    Working

* Integ test fixes by enforcing refresh during ingestion

* CR comments - reading shardResponses as List from inputStream

* API path change and few minor tweaks

* misc changes

* Reverting action name to follow the code convention

Co-authored-by: Drew Baugher <[email protected]>
  • Loading branch information
setiah and dbbaughe authored Sep 12, 2020
1 parent 1042dd6 commit a095266
Show file tree
Hide file tree
Showing 14 changed files with 852 additions and 8 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ integTest.runner {
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath
systemProperty 'tests.path.repo', repo.absolutePath
systemProperty 'buildDir', buildDir
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for
// requests. The 'doFirst' delays reading the debug setting on the cluster till execution time.
doFirst {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -117,6 +120,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
nodesInCluster: Supplier<DiscoveryNodes>
): List<RestHandler> {
return listOf(
RestRefreshSearchAnalyzerAction(),
RestIndexPolicyAction(settings, clusterService, indexManagementIndices),
RestGetPolicyAction(),
RestDeletePolicyAction(),
Expand Down Expand Up @@ -190,6 +194,10 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
ActionPlugin.ActionHandler(
UpdateManagedIndexMetaDataAction.INSTANCE,
TransportUpdateManagedIndexMetaDataAction::class.java
),
ActionPlugin.ActionHandler(
RefreshSearchAnalyzerAction.INSTANCE,
TransportRefreshSearchAnalyzerAction::class.java
)
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.ActionType
import org.elasticsearch.common.io.stream.Writeable

class RefreshSearchAnalyzerAction : ActionType<RefreshSearchAnalyzerResponse>(NAME, reader) {
companion object {
const val NAME = "indices:admin/refresh_search_analyzers"
val INSTANCE = RefreshSearchAnalyzerAction()
val reader = Writeable.Reader { inp -> RefreshSearchAnalyzerResponse(inp) }
}

override fun getResponseReader(): Writeable.Reader<RefreshSearchAnalyzerResponse> = reader
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.support.broadcast.BroadcastRequest
import org.elasticsearch.common.io.stream.StreamInput
import java.io.IOException

class RefreshSearchAnalyzerRequest : BroadcastRequest<RefreshSearchAnalyzerRequest> {
constructor(vararg indices: String) : super(*indices)

@Throws(IOException::class)
constructor(inp: StreamInput) : super(inp)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.action.support.broadcast.BroadcastResponse
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ConstructingObjectParser
import org.elasticsearch.common.xcontent.ToXContent.Params
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.rest.action.RestActions
import java.io.IOException
import java.util.function.Function

class RefreshSearchAnalyzerResponse : BroadcastResponse {

private lateinit var shardResponses: MutableList<RefreshSearchAnalyzerShardResponse>
private lateinit var shardFailures: MutableList<DefaultShardOperationFailedException>

@Throws(IOException::class)
constructor(inp: StreamInput) : super(inp) {
inp.readList(::RefreshSearchAnalyzerShardResponse)
inp.readList(DefaultShardOperationFailedException::readShardOperationFailed)
}

constructor(
totalShards: Int,
successfulShards: Int,
failedShards: Int,
shardFailures: List<DefaultShardOperationFailedException>,
shardResponses: List<RefreshSearchAnalyzerShardResponse>
) : super(
totalShards, successfulShards, failedShards, shardFailures
) {
this.shardResponses = shardResponses.toMutableList()
this.shardFailures = shardFailures.toMutableList()
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder? {
builder.startObject()
RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, -1, failedShards, shardFailures.toTypedArray())
builder.startArray("successful_refresh_details")
val successfulIndices = getSuccessfulRefreshDetails()
for (index in successfulIndices.keys) {
val reloadedAnalyzers = successfulIndices.get(index)!!
builder.startObject().field("index", index).startArray("refreshed_analyzers")
for (analyzer in reloadedAnalyzers) {
builder.value(analyzer)
}
builder.endArray().endObject()
}
builder.endArray().endObject()
return builder
}

// TODO: restrict it for testing
fun getSuccessfulRefreshDetails(): MutableMap<String, List<String>> {
var successfulRefreshDetails: MutableMap<String, List<String>> = HashMap()
var failedIndices = mutableSetOf<String>()
for (failure in shardFailures) {
failedIndices.add(failure.index()!!)
}
for (response in shardResponses) {
if (!failedIndices.contains(response.index)) {
successfulRefreshDetails.putIfAbsent(response.index, response.reloadedAnalyzers)
}
}
return successfulRefreshDetails
}

companion object {
private val PARSER = ConstructingObjectParser<RefreshSearchAnalyzerResponse, Void>("_refresh_search_analyzers", true,
Function { arg: Array<Any> ->
val response = arg[0] as RefreshSearchAnalyzerResponse
RefreshSearchAnalyzerResponse(response.totalShards, response.successfulShards, response.failedShards,
response.shardFailures, response.shardResponses)
})
init {
declareBroadcastFields(PARSER)
}
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeCollection(shardResponses)
out.writeCollection(shardFailures)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.support.broadcast.BroadcastShardResponse
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.index.shard.ShardId
import java.io.IOException

class RefreshSearchAnalyzerShardResponse : BroadcastShardResponse {
var reloadedAnalyzers: List<String>

constructor(si: StreamInput) : super(si) {
reloadedAnalyzers = si.readStringArray().toList()
}

constructor(shardId: ShardId, reloadedAnalyzers: List<String>) : super(shardId) {
this.reloadedAnalyzers = reloadedAnalyzers
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeStringArray(reloadedAnalyzers.toTypedArray())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.OPEN_DISTRO_BASE_URI
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.Strings
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.RestHandler.Route
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.POST
import org.elasticsearch.rest.action.RestToXContentListener
import java.io.IOException

class RestRefreshSearchAnalyzerAction : BaseRestHandler() {

override fun getName(): String = "refresh_search_analyzer_action"

override fun routes(): List<Route> {
return listOf(
Route(POST, REFRESH_SEARCH_ANALYZER_BASE_URI),
Route(POST, "$REFRESH_SEARCH_ANALYZER_BASE_URI/{index}")
)
}

// TODO: Add indicesOptions?

@Throws(IOException::class)
@Suppress("SpreadOperator")
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val indices: Array<String>? = Strings.splitStringByCommaToArray(request.param("index"))

if (indices.isNullOrEmpty()) {
throw IllegalArgumentException("Missing indices")
}

val refreshSearchAnalyzerRequest: RefreshSearchAnalyzerRequest = RefreshSearchAnalyzerRequest()
.indices(*indices)

return RestChannelConsumer { channel ->
client.execute(RefreshSearchAnalyzerAction.INSTANCE, refreshSearchAnalyzerRequest, RestToXContentListener(channel))
}
}

companion object {
const val REFRESH_SEARCH_ANALYZER_BASE_URI = "$OPEN_DISTRO_BASE_URI/_refresh_search_analyzers"
}
}
Loading

0 comments on commit a095266

Please sign in to comment.