Skip to content

Commit

Permalink
More code for preventing parsing on replica
Browse files Browse the repository at this point in the history
  • Loading branch information
mgodwan committed Aug 23, 2023
1 parent eb58389 commit 3811d44
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

/**
Expand All @@ -67,14 +68,23 @@ public BulkShardRequest(StreamInput in) throws IOException {
super(in);
final ShardId itemShardId = in.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION) ? shardId : null;
items = in.readArray(i -> i.readOptionalWriteable(inpt -> new BulkItemRequest(itemShardId, inpt)), BulkItemRequest[]::new);
parsedEntities = new HashMap[items.length];
parsedEntities = in.readArray(is -> {
Map<String, Object> map = new ConcurrentHashMap<>();
int sz = is.readInt();
for (int i = 0; i < sz; i ++) {
String key = is.readString();
String val = is.readString();
map.put(key, val);
}
return map;
}, ConcurrentHashMap[]::new);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
super(shardId);
this.items = items;
setRefreshPolicy(refreshPolicy);
parsedEntities = new HashMap[items.length];
parsedEntities = new ConcurrentHashMap[items.length];
}

public BulkItemRequest[] items() {
Expand Down Expand Up @@ -109,6 +119,15 @@ public void writeTo(StreamOutput out) throws IOException {
o.writeBoolean(false);
}
} : StreamOutput::writeOptionalWriteable, items);
out.writeArray((o, v) -> {
o.writeInt(v == null ? 0 : v.size());
if (v != null) {
for (Map.Entry<String, Object> entry : v.entrySet()) {
o.writeString(entry.getKey());
o.writeString(entry.getValue().toString());
}
}
}, parsedEntities);
}

@Override
Expand All @@ -132,6 +151,11 @@ public String toString() {
case NONE:
break;
}

b.append("Parsed Entities: " + parsedEntities.length + " \n");
for (Map<String, Object> v: parsedEntities) {
b.append(v);
}
return b.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.compress.CompressedXContent;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -112,6 +113,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -597,7 +599,7 @@ static boolean executeBulkItemRequest(
Map<String, Object> parsedEntity = context.getCurrentParsedFields();

if (parsedEntity == null) {
context.getBulkShardRequest().parsedEntities[context.getCurrentIndex()] = new HashMap<>();
context.getBulkShardRequest().parsedEntities[context.getCurrentIndex()] = new ConcurrentHashMap<>();
parsedEntity = context.getBulkShardRequest().parsedEntities[context.getCurrentIndex()];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
}
} else {
dateAsString = context.parser().textOrNull();
context.sourceToParse().parsedFields.put(this.name(), dateAsString);
}

long timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ ParsedDocument parseDocument(SourceToParse source, MetadataFieldMapper[] metadat
if(internalParseDocument(mapping, metadataFieldsMappers, context, parser)) {
validateEnd(parser);
}
//context.sourceToParse().parsedFields.forEach((k,v) -> System.out.println(k + " -> " + v));
} catch (Exception e) {
throw wrapInMapperParsingException(source, e);
}
Expand Down Expand Up @@ -127,18 +128,19 @@ private static boolean internalParseDocument(
metadataMapper.preParse(context);
}

boolean val = false;
if (mapping.root.isEnabled() == false) {
// entire type is disabled
parser.skipChildren();
} else if (emptyDoc == false) {
parseObjectOrNested(context, mapping.root);
val = parseObjectOrNested(context, mapping.root);
}

for (MetadataFieldMapper metadataMapper : metadataFieldsMappers) {
metadataMapper.postParse(context);
}

return true;
return val;
}

private static void validateStart(XContentParser parser) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
value = nullValue;
} else {
value = parser.textOrNull();
context.sourceToParse().parsedFields.put(this.name(), value);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
value = context.externalValue().toString();
} else {
value = context.parser().textOrNull();
context.sourceToParse().parsedFields.put(this.name(), value);
}

if (value == null) {
Expand Down

0 comments on commit 3811d44

Please sign in to comment.