From 3811d44a81d16a7be5b5633b09b28a6b39e02039 Mon Sep 17 00:00:00 2001 From: Mohit Godwani Date: Thu, 24 Aug 2023 01:16:34 +0530 Subject: [PATCH] More code for preventing parsing on replica --- .../action/bulk/BulkShardRequest.java | 28 +++++++++++++++++-- .../action/bulk/TransportShardBulkAction.java | 4 ++- .../index/mapper/DateFieldMapper.java | 1 + .../index/mapper/DocumentParser.java | 6 ++-- .../index/mapper/KeywordFieldMapper.java | 1 + .../index/mapper/TextFieldMapper.java | 1 + 6 files changed, 36 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkShardRequest.java index 018c69921aaa5..f0ed39e112135 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkShardRequest.java @@ -47,6 +47,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; /** @@ -67,14 +68,23 @@ public BulkShardRequest(StreamInput in) throws IOException { super(in); final ShardId itemShardId = in.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION) ? shardId : null; items = in.readArray(i -> i.readOptionalWriteable(inpt -> new BulkItemRequest(itemShardId, inpt)), BulkItemRequest[]::new); - parsedEntities = new HashMap[items.length]; + parsedEntities = in.readArray(is -> { + Map map = new ConcurrentHashMap<>(); + int sz = is.readInt(); + for (int i = 0; i < sz; i ++) { + String key = is.readString(); + String val = is.readString(); + map.put(key, val); + } + return map; + }, ConcurrentHashMap[]::new); } public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { super(shardId); this.items = items; setRefreshPolicy(refreshPolicy); - parsedEntities = new HashMap[items.length]; + parsedEntities = new ConcurrentHashMap[items.length]; } public BulkItemRequest[] items() { @@ -109,6 +119,15 @@ public void writeTo(StreamOutput out) throws IOException { o.writeBoolean(false); } } : StreamOutput::writeOptionalWriteable, items); + out.writeArray((o, v) -> { + o.writeInt(v == null ? 0 : v.size()); + if (v != null) { + for (Map.Entry entry : v.entrySet()) { + o.writeString(entry.getKey()); + o.writeString(entry.getValue().toString()); + } + } + }, parsedEntities); } @Override @@ -132,6 +151,11 @@ public String toString() { case NONE: break; } + + b.append("Parsed Entities: " + parsedEntities.length + " \n"); + for (Map v: parsedEntities) { + b.append(v); + } return b.toString(); } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 763b4d78ef30a..8525ae090ccfa 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -71,6 +71,7 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.compress.CompressedXContent; import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.OutputStreamStreamOutput; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.settings.Settings; @@ -112,6 +113,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; @@ -597,7 +599,7 @@ static boolean executeBulkItemRequest( Map parsedEntity = context.getCurrentParsedFields(); if (parsedEntity == null) { - context.getBulkShardRequest().parsedEntities[context.getCurrentIndex()] = new HashMap<>(); + context.getBulkShardRequest().parsedEntities[context.getCurrentIndex()] = new ConcurrentHashMap<>(); parsedEntity = context.getBulkShardRequest().parsedEntities[context.getCurrentIndex()]; } diff --git a/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java index fca74330ab940..a789e1549fd59 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/DateFieldMapper.java @@ -678,6 +678,7 @@ protected void parseCreateField(ParseContext context) throws IOException { } } else { dateAsString = context.parser().textOrNull(); + context.sourceToParse().parsedFields.put(this.name(), dateAsString); } long timestamp; diff --git a/server/src/main/java/org/opensearch/index/mapper/DocumentParser.java b/server/src/main/java/org/opensearch/index/mapper/DocumentParser.java index daf3fa480ec83..3d27cf36ad980 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/opensearch/index/mapper/DocumentParser.java @@ -88,6 +88,7 @@ ParsedDocument parseDocument(SourceToParse source, MetadataFieldMapper[] metadat if(internalParseDocument(mapping, metadataFieldsMappers, context, parser)) { validateEnd(parser); } + //context.sourceToParse().parsedFields.forEach((k,v) -> System.out.println(k + " -> " + v)); } catch (Exception e) { throw wrapInMapperParsingException(source, e); } @@ -127,18 +128,19 @@ private static boolean internalParseDocument( metadataMapper.preParse(context); } + boolean val = false; if (mapping.root.isEnabled() == false) { // entire type is disabled parser.skipChildren(); } else if (emptyDoc == false) { - parseObjectOrNested(context, mapping.root); + val = parseObjectOrNested(context, mapping.root); } for (MetadataFieldMapper metadataMapper : metadataFieldsMappers) { metadataMapper.postParse(context); } - return true; + return val; } private static void validateStart(XContentParser parser) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java index 92ee8067ee4a0..12cdbeb713433 100644 --- a/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java @@ -449,6 +449,7 @@ protected void parseCreateField(ParseContext context) throws IOException { value = nullValue; } else { value = parser.textOrNull(); + context.sourceToParse().parsedFields.put(this.name(), value); } } diff --git a/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java index a570c1c473cfd..a167387e9ffcb 100644 --- a/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java @@ -1025,6 +1025,7 @@ protected void parseCreateField(ParseContext context) throws IOException { value = context.externalValue().toString(); } else { value = context.parser().textOrNull(); + context.sourceToParse().parsedFields.put(this.name(), value); } if (value == null) {