Skip to content

Commit

Permalink
Use short key for map
Browse files Browse the repository at this point in the history
  • Loading branch information
mgodwan committed Aug 25, 2023
1 parent 8b27ec0 commit 08a2d31
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
}
}
value = numericValue;
context.sourceToParse().parsedFields.put(this.name(), numericValue);
context.sourceToParse().parsedFields.put(Keys.getId(this.name()), numericValue);
}

if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public DocWriteRequest<?> getCurrent() {
return getCurrentItem().request();
}

public Map<String, Object> getCurrentParsedFields() {
public Map<Short, Object> getCurrentParsedFields() {
return request.parsedEntities[currentIndex];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,53 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> i

private final BulkItemRequest[] items;

public Map<String, Object>[] parsedEntities;
public Map<Short, Object>[] parsedEntities;

private static enum Type {
INT,
LONG,
SHORT,
BYTE,
DOUBLE,
STRING,
FLOAT,
GEO_POINT
INT((short) 0),
LONG((short) 1),
SHORT((short) 2),
BYTE((short) 3),
DOUBLE((short) 4),
STRING((short) 5),
FLOAT((short) 6),
GEO_POINT((short) 7);

short s;
Type(short s) {
this.s = s;
}
short getval() {
return s;
}

static Type [] arr;

static {
arr = new Type[Type.values().length];
for (Type t: Type.values()) {
arr[t.getval()] = t;
}
}

static Type from(short s) {
return arr[s];
}
}



public BulkShardRequest(StreamInput in) throws IOException {
super(in);
final ShardId itemShardId = in.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION) ? shardId : null;
items = in.readArray(i -> i.readOptionalWriteable(inpt -> new BulkItemRequest(itemShardId, inpt)), BulkItemRequest[]::new);
parsedEntities = in.readArray(is -> {
Map<String, Object> map = new ConcurrentHashMap<>();
int sz = is.readInt();
Map<Short, Object> map = new ConcurrentHashMap<>(sz);
for (int i = 0; i < sz; i ++) {
String key = is.readString();
short key = is.readShort();
Object val = null;
Type t = in.readEnum(Type.class);
Type t = Type.from(in.readShort());
switch (t) {
case INT:
val = in.readInt();
Expand All @@ -107,7 +130,10 @@ public BulkShardRequest(StreamInput in) throws IOException {
val = in.readString();
break;
case GEO_POINT:
val = in.readNamedWriteableList(GeoPoint.class);
int cnt = in.readInt();
List<GeoPoint> points = new ArrayList<GeoPoint>(cnt);
for (int ii = 0; ii < cnt; ii ++) points.add(new GeoPoint(in));
val = points;
break;
}
map.put(key, val);
Expand Down Expand Up @@ -161,35 +187,37 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeArray((o, v) -> {
o.writeInt(v == null ? 0 : v.size());
if (v != null) {
for (Map.Entry<String, Object> entry : v.entrySet()) {
o.writeString(entry.getKey());
for (Map.Entry<Short, Object> entry : v.entrySet()) {
o.writeShort(entry.getKey());
Object val = entry.getValue();
if (val instanceof Number) {
if (val instanceof Integer) {
o.writeEnum(Type.INT);
o.writeShort(Type.INT.getval());
o.writeInt((int) val);
} else if (val instanceof Long) {
o.writeEnum(Type.LONG);
o.writeShort(Type.LONG.getval());
o.writeLong((long) val);
} else if (val instanceof Float) {
o.writeEnum(Type.FLOAT);
o.writeShort(Type.FLOAT.getval());
o.writeFloat((float) val);
} else if (val instanceof Double) {
o.writeEnum(Type.DOUBLE);
o.writeShort(Type.DOUBLE.getval());
o.writeDouble((double) val);
} else if (val instanceof Short) {
o.writeEnum(Type.SHORT);
o.writeShort(Type.SHORT.getval());
o.writeShort((short) val);
}
else if (val instanceof Byte) {
o.writeEnum(Type.BYTE);
o.writeShort(Type.BYTE.getval());
o.writeByte((byte) val);
}
} else if (val.getClass().isAssignableFrom(gpl.getClass())) {
o.writeEnum(Type.GEO_POINT);
o.writeNamedWriteableList((List<? extends GeoPoint>) entry.getValue());
o.writeShort(Type.GEO_POINT.getval());
List<? extends GeoPoint> gpls = (List<? extends GeoPoint>) val;
o.writeInt(gpls.size());
for (int i = 0; i < gpls.size(); i ++) gpls.get(i).writeTo(o);
} else {
o.writeEnum(Type.STRING);
o.writeShort(Type.STRING.getval());
o.writeString(entry.getValue().toString());
}
}
Expand Down Expand Up @@ -220,7 +248,7 @@ public String toString() {
}

b.append("Parsed Entities: " + parsedEntities.length + " \n");
for (Map<String, Object> v: parsedEntities) {
for (Map<Short, Object> v: parsedEntities) {
b.append(v);
}
return b.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ static boolean executeBulkItemRequest(
ActionListener<Void> itemDoneListener
) throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();
Map<String, Object> parsedEntity = context.getCurrentParsedFields();
Map<Short, Object> parsedEntity = context.getCurrentParsedFields();

if (parsedEntity == null) {
context.getBulkShardRequest().parsedEntities[context.getCurrentIndex()] = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -896,7 +896,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
private static Engine.Result performOpOnReplica(
DocWriteResponse primaryResponse,
DocWriteRequest<?> docWriteRequest,
Map<String, Object> parsedFields,
Map<Short, Object> parsedFields,
IndexShard replica
) throws Exception {
final Engine.Result result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -97,6 +99,19 @@ public static XContentParser createParser(
}
}

private static final BytesReference DUMMY_SOURCE = BytesReference.fromByteBuffer(ByteBuffer.wrap("{}".getBytes(StandardCharsets.UTF_8)));
public static XContentParser createParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
BytesReference bytes,
MediaType mediaType,
Map map
) throws IOException {
if (!map.isEmpty()) {
return createParser(xContentRegistry, deprecationHandler, DUMMY_SOURCE, mediaType);
}
return createParser(xContentRegistry, deprecationHandler, bytes, mediaType);
}
/**
* Creates a parser for the bytes using the supplied content-type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public void parse(ParseContext context) throws IOException {
return;
}
shape = geometryIndexer.prepareForIndexing(geometry);
context.sourceToParse().parsedFields.put(this.name(), shape);
context.sourceToParse().parsedFields.put(Keys.getId(this.name()), shape);
}

List<IndexableField> fields = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
} else if (timestamp == -1) {
try {
timestamp = fieldType().parse(dateAsString);
context.sourceToParse().parsedFields.put(this.name(), timestamp);
context.sourceToParse().parsedFields.put(Keys.getId(this.name()), timestamp);
} catch (IllegalArgumentException | OpenSearchParseException | DateTimeException | ArithmeticException e) {
if (ignoreMalformed) {
context.addIgnoredField(mappedFieldType.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ ParsedDocument parseDocument(SourceToParse source, MetadataFieldMapper[] metadat
docMapperParser.getXContentRegistry(),
LoggingDeprecationHandler.INSTANCE,
source.source(),
xContentType
xContentType,
source.parsedFields
)
) {
context = new ParseContext.InternalParseContext(indexSettings, docMapperParser, docMapper, source, parser);
Expand Down Expand Up @@ -122,7 +123,7 @@ private static boolean internalParseDocument(
ParseContext.InternalParseContext context,
XContentParser parser
) throws IOException {
final boolean emptyDoc = isEmptyDoc(mapping, parser);
final boolean emptyDoc = false; //isEmptyDoc(mapping, parser);

for (MetadataFieldMapper metadataMapper : metadataFieldsMappers) {
metadataMapper.preParse(context);
Expand Down Expand Up @@ -378,8 +379,8 @@ private static ObjectMapper createUpdate(ObjectMapper parent, String[] nameParts

static boolean parseObjectOrNested(ParseContext context, ObjectMapper mapper) throws IOException {
if (!context.sourceToParse().parsedFields.isEmpty()) {
for (Map.Entry<String, Object> entry: context.sourceToParse().parsedFields.entrySet()) {
String key = entry.getKey();
for (Map.Entry<Short, Object> entry: context.sourceToParse().parsedFields.entrySet()) {
String key = Keys.getKey(entry.getKey());
Object val = entry.getValue();
Mapper mapper1 = getMapper(context, mapper, key, new String[]{key});
ParseContext finContext = context.createExternalValueContext(val);
Expand Down
58 changes: 58 additions & 0 deletions server/src/main/java/org/opensearch/index/mapper/Keys.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.mapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Keys {

private static List<String> keys = List.of( "cab_color",
"dropoff_datetime",
"dropoff_location",
"ehail_fee",
"extra",
"fare_amount",
"improvement_surcharge",
"mta_tax",
"passenger_count",
"payment_type",
"pickup_datetime",
"pickup_location",
"rate_code_id",
"store_and_fwd_flag",
"surcharge",
"tip_amount",
"tolls_amount",
"total_amount",
"trip_distance",
"trip_type",
"vendor_id",
"vendor_name");

private static final Map<String, Short> key_to_val = new HashMap<>();
private static final Map<Short, String> val_to_key = new HashMap<>();

static {
for (int i = 0; i < keys.size(); i ++) {
key_to_val.put(keys.get(i), (short) i);
val_to_key.put((short) i, keys.get(i));
}
}

public static short getId(String name) {
return key_to_val.get(name);
}

public static String getKey(short val) {
return val_to_key.get(val);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
value = nullValue;
} else {
value = parser.textOrNull();
context.sourceToParse().parsedFields.put(this.name(), value);
context.sourceToParse().parsedFields.put(Keys.getId(this.name()), value);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
numericValue = fieldType().type.parse(value, coerce.value());
}

context.sourceToParse().parsedFields.put(this.name(), numericValue);
context.sourceToParse().parsedFields.put(Keys.getId(this.name()), numericValue);
context.doc().addAll(fieldType().type.createFields(fieldType().name(), numericValue, indexed, hasDocValues, stored));

if (hasDocValues == false && (stored || indexed)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class SourceToParse {

private final XContentType xContentType;

public Map<String, Object> parsedFields;
public Map<Short, Object> parsedFields;

public SourceToParse(String index, String id, BytesReference source, XContentType xContentType, @Nullable String routing) {
this.index = Objects.requireNonNull(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
value = context.externalValue().toString();
} else {
value = context.parser().textOrNull();
context.sourceToParse().parsedFields.put(this.name(), value);
context.sourceToParse().parsedFields.put(Keys.getId(this.name()), value);
}

if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
long ifPrimaryTerm,
long autoGeneratedTimestamp,
boolean isRetry,
Map<String, Object> parsedFields
Map<Short, Object> parsedFields
) throws IOException {
assert versionType.validateVersionForWrites(version);
return applyIndexOperation(
Expand All @@ -942,7 +942,7 @@ public Engine.IndexResult applyIndexOperationOnReplica(
long autoGeneratedTimeStamp,
boolean isRetry,
SourceToParse sourceToParse,
Map<String, Object> parsedFields
Map<Short, Object> parsedFields
) throws IOException {
if (indexSettings.isSegRepEnabled()) {
Engine.Index index = new Engine.Index(
Expand Down Expand Up @@ -989,7 +989,7 @@ private Engine.IndexResult applyIndexOperation(
boolean isRetry,
Engine.Operation.Origin origin,
SourceToParse sourceToParse,
Map<String, Object> parsedFields
Map<Short, Object> parsedFields
) throws IOException {
assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ "
+ opPrimaryTerm
Expand Down Expand Up @@ -1042,7 +1042,7 @@ public static Engine.Index prepareIndex(
boolean isRetry,
long ifSeqNo,
long ifPrimaryTerm,
Map<String, Object> parsedFields
Map<Short, Object> parsedFields
) {
long startTime = System.nanoTime();
source.parsedFields = parsedFields;
Expand Down

0 comments on commit 08a2d31

Please sign in to comment.