Add code to handle more fields

mgodwan committed Aug 24, 2023
1 parent 3811d44 commit 8b27ec0

Showing 9 changed files with 99 additions and 12 deletions.

@@ -401,6 +401,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
}
}
value = numericValue;
context.sourceToParse().parsedFields.put(this.name(), numericValue);
}

if (value == null) {
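
A note on the pattern introduced above: after the mapper resolves the numeric value, it also records it in context.sourceToParse().parsedFields under the field name. The same capture is repeated below for geo shapes, dates, and the other numeric mappers, presumably so the already-parsed values can be carried along (for example in the per-item parsedEntities maps that BulkShardRequest serializes further down) instead of being re-parsed from the source. A minimal standalone sketch of the idea, using a plain java.util map rather than OpenSearch's ParseContext; the field names here are made up for illustration:

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Each field parser drops the value it produced into a shared per-document map,
// keyed by field name, so a later stage can reuse the parsed value directly.
public final class ParsedFieldsSketch {

    public static void main(String[] args) {
        Map<String, Object> parsedFields = new HashMap<>();

        // What a numeric mapper would record after parsing the raw token
        String rawPrice = "42.5";
        parsedFields.put("price", Double.parseDouble(rawPrice));

        // What a date mapper would record after resolving a timestamp
        parsedFields.put("created_at", Instant.parse("2023-08-24T00:00:00Z").toEpochMilli());

        // A later stage reads the values without touching the raw source again
        System.out.println(parsedFields); // e.g. {created_at=1692835200000, price=42.5}
    }
}
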
@@ -38,15 +38,13 @@
import org.opensearch.Version;
import org.opensearch.action.support.replication.ReplicatedWriteRequest;
import org.opensearch.action.support.replication.ReplicationRequest;
import org.opensearch.common.geo.GeoPoint;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.shard.ShardId;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

@@ -64,6 +62,17 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> i

public Map<String, Object>[] parsedEntities;

private static enum Type {
INT,
LONG,
SHORT,
BYTE,
DOUBLE,
STRING,
FLOAT,
GEO_POINT
}

public BulkShardRequest(StreamInput in) throws IOException {
super(in);
final ShardId itemShardId = in.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION) ? shardId : null;
@@ -73,7 +82,34 @@ public BulkShardRequest(StreamInput in) throws IOException {
int sz = is.readInt();
for (int i = 0; i < sz; i ++) {
String key = is.readString();
String val = is.readString();
Object val = null;
Type t = in.readEnum(Type.class);
switch (t) {
case INT:
val = in.readInt();
break;
case LONG:
val = in.readLong();
break;
case FLOAT:
val = in.readFloat();
break;
case DOUBLE:
val = in.readDouble();
break;
case SHORT:
val = in.readShort();
break;
case BYTE:
val = in.readByte();
break;
case STRING:
val = in.readString();
break;
case GEO_POINT:
val = in.readNamedWriteableList(GeoPoint.class);
break;
}
map.put(key, val);
}
return map;
@@ -108,6 +144,9 @@ public String[] indices() {
return indices.toArray(new String[0]);
}


private static final List<? extends GeoPoint> gpl = new ArrayList<>();

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -124,7 +163,35 @@ public void writeTo(StreamOutput out) throws IOException {
if (v != null) {
for (Map.Entry<String, Object> entry : v.entrySet()) {
o.writeString(entry.getKey());
o.writeString(entry.getValue().toString());
Object val = entry.getValue();
if (val instanceof Number) {
if (val instanceof Integer) {
o.writeEnum(Type.INT);
o.writeInt((int) val);
} else if (val instanceof Long) {
o.writeEnum(Type.LONG);
o.writeLong((long) val);
} else if (val instanceof Float) {
o.writeEnum(Type.FLOAT);
o.writeFloat((float) val);
} else if (val instanceof Double) {
o.writeEnum(Type.DOUBLE);
o.writeDouble((double) val);
} else if (val instanceof Short) {
o.writeEnum(Type.SHORT);
o.writeShort((short) val);
} else if (val instanceof Byte) {
o.writeEnum(Type.BYTE);
o.writeByte((byte) val);
}
} else if (val.getClass().isAssignableFrom(gpl.getClass())) {
o.writeEnum(Type.GEO_POINT);
o.writeNamedWriteableList((List<? extends GeoPoint>) entry.getValue());
} else {
o.writeEnum(Type.STRING);
o.writeString(entry.getValue().toString());
}
}
}
}, parsedEntities);
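
The readFrom/writeTo pair above replaces the old write-everything-as-a-string approach with a tagged encoding: each parsed value is prefixed with a Type enum so the reading side knows how to decode it, and anything the writer does not recognize falls back to Type.STRING with toString(). For illustration, a self-contained sketch of the same tag-then-value round trip using plain java.io streams in place of OpenSearch's StreamInput/StreamOutput (class and tag names here are illustrative only):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Self-contained illustration of the tag-then-value encoding above: every value is
// prefixed with a type tag so the reading side knows how to decode it, and unknown
// types degrade to their string form.
public final class TaggedValueCodecSketch {

    private enum Tag { INT, LONG, DOUBLE, STRING }

    static void write(DataOutputStream out, Map<String, Object> fields) throws IOException {
        out.writeInt(fields.size());
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            out.writeUTF(e.getKey());
            Object v = e.getValue();
            if (v instanceof Integer) {
                out.writeUTF(Tag.INT.name());
                out.writeInt((Integer) v);
            } else if (v instanceof Long) {
                out.writeUTF(Tag.LONG.name());
                out.writeLong((Long) v);
            } else if (v instanceof Double) {
                out.writeUTF(Tag.DOUBLE.name());
                out.writeDouble((Double) v);
            } else {
                out.writeUTF(Tag.STRING.name());
                out.writeUTF(String.valueOf(v));
            }
        }
    }

    static Map<String, Object> read(DataInputStream in) throws IOException {
        int size = in.readInt();
        Map<String, Object> fields = new HashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            Tag tag = Tag.valueOf(in.readUTF());
            switch (tag) {
                case INT:    fields.put(key, in.readInt());    break;
                case LONG:   fields.put(key, in.readLong());   break;
                case DOUBLE: fields.put(key, in.readDouble()); break;
                case STRING: fields.put(key, in.readUTF());    break;
            }
        }
        return fields;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Object> original = new HashMap<>();
        original.put("count", 7);
        original.put("price", 42.5);
        original.put("id", "doc-1");

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        write(new DataOutputStream(bytes), original);
        Map<String, Object> decoded = read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(decoded.equals(original)); // true
    }
}

The real request above adds more tags (SHORT, BYTE, FLOAT, GEO_POINT) and uses writeEnum/readEnum for the tag itself; the STRING fallback keeps the stream decodable for unexpected types, at the cost of the value arriving on the other side as a plain String.
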
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/common/geo/GeoPoint.java
@@ -40,8 +40,10 @@
import org.apache.lucene.util.BytesRef;
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.geo.GeoUtils.EffectivePoint;
import org.opensearch.common.io.stream.NamedWriteable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.geometry.Geometry;
@@ -63,13 +65,19 @@
*
* @opensearch.internal
*/
public class GeoPoint implements ToXContentFragment {
public class GeoPoint implements ToXContentFragment, NamedWriteable {

protected double lat;
protected double lon;

public GeoPoint() {}

public static final String NAME = "geo_point";
@Override
public String getWriteableName() {
return NAME;
}

/**
* Create a new Geopoint from a string. This String must either be a geohash
* or a lat-lon tuple.
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -472,6 +472,8 @@ public abstract static class Result {
private Translog.Location translogLocation;
private long took;

public int fieldCount;

protected Result(Operation.TYPE operationType, Exception failure, long version, long term, long seqNo) {
this.operationType = operationType;
this.failure = Objects.requireNonNull(failure);

@@ -403,6 +403,7 @@ public void parse(ParseContext context) throws IOException {
return;
}
shape = geometryIndexer.prepareForIndexing(geometry);
context.sourceToParse().parsedFields.put(this.name(), shape);
}

List<IndexableField> fields = new ArrayList<>();

@@ -669,27 +669,30 @@ protected DateFieldMapper clone() {
@Override
protected void parseCreateField(ParseContext context) throws IOException {
String dateAsString;
long timestamp = -1;

if (context.externalValueSet()) {
Object dateAsObject = context.externalValue();
if (dateAsObject == null) {
dateAsString = null;
} else {
dateAsString = dateAsObject.toString();
}
timestamp = Long.parseLong(dateAsString);
} else {
dateAsString = context.parser().textOrNull();
context.sourceToParse().parsedFields.put(this.name(), dateAsString);
}

long timestamp;

if (dateAsString == null) {
if (nullValue == null) {
return;
}
timestamp = nullValue;
} else {
} else if (timestamp == -1) {
try {
timestamp = fieldType().parse(dateAsString);
context.sourceToParse().parsedFields.put(this.name(), timestamp);
} catch (IllegalArgumentException | OpenSearchParseException | DateTimeException | ArithmeticException e) {
if (ignoreMalformed) {
context.addIgnoredField(mappedFieldType.name());
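
The DateFieldMapper change above adds a fast path: when an external value is set, the string is treated directly as an epoch-millis number via Long.parseLong, and the later format-based parse only runs if timestamp is still -1; the string read from the parser and the timestamp resolved by the date format are both recorded in parsedFields. A standalone sketch of the two resolution paths, using java.time as a stand-in for the mapper's configured date format:

import java.time.Instant;

// Simplified view of the two paths above: an externally supplied value is assumed
// to already be epoch millis, while a value read from the document source goes
// through date-format parsing (ISO-8601 here as a stand-in).
public final class DateResolutionSketch {

    static long resolveTimestamp(String value, boolean externalValue) {
        if (externalValue) {
            return Long.parseLong(value);           // fast path, no format parsing
        }
        return Instant.parse(value).toEpochMilli(); // normal path
    }

    public static void main(String[] args) {
        System.out.println(resolveTimestamp("1692835200000", true));         // 1692835200000
        System.out.println(resolveTimestamp("2023-08-24T00:00:00Z", false)); // 1692835200000
    }
}
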
@@ -1404,6 +1404,7 @@ protected void parseCreateField(ParseContext context) throws IOException {
numericValue = fieldType().type.parse(value, coerce.value());
}

context.sourceToParse().parsedFields.put(this.name(), numericValue);
context.doc().addAll(fieldType().type.createFields(fieldType().name(), numericValue, indexed, hasDocValues, stored));

if (hasDocValues == false && (stored || indexed)) {

@@ -1026,7 +1026,8 @@ private Engine.IndexResult applyIndexOperation(
return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
}

return index(engine, operation);
Engine.IndexResult result = index(engine, operation);
return result;
}

public static Engine.Index prepareIndex(
@@ -1050,7 +1051,7 @@ public static Engine.Index prepareIndex(
doc.addDynamicMappingsUpdate(docMapper.getMapping());
}
Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(doc.id()));
return new Engine.Index(
Engine.Index index = new Engine.Index(
uid,
doc,
seqNo,
Expand All @@ -1064,6 +1065,7 @@ public static Engine.Index prepareIndex(
ifSeqNo,
ifPrimaryTerm
);
return index;
}

private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {

@@ -37,6 +37,7 @@
import org.opensearch.action.admin.indices.rollover.MaxDocsCondition;
import org.opensearch.action.admin.indices.rollover.MaxSizeCondition;
import org.opensearch.action.resync.TransportResyncReplicationAction;
import org.opensearch.common.geo.GeoPoint;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.ParseField;
import org.opensearch.common.inject.AbstractModule;
@@ -115,6 +116,7 @@ private void registerBuiltinWritables() {
namedWritables.add(new NamedWriteableRegistry.Entry(Condition.class, MaxAgeCondition.NAME, MaxAgeCondition::new));
namedWritables.add(new NamedWriteableRegistry.Entry(Condition.class, MaxDocsCondition.NAME, MaxDocsCondition::new));
namedWritables.add(new NamedWriteableRegistry.Entry(Condition.class, MaxSizeCondition.NAME, MaxSizeCondition::new));
namedWritables.add(new NamedWriteableRegistry.Entry(GeoPoint.class, GeoPoint.NAME, GeoPoint::new));
}

public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
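
This registry entry is the read-side counterpart of the GeoPoint change earlier in the commit: GeoPoint.NAME ("geo_point") returned by getWriteableName() is mapped here to the GeoPoint::new factory, which is what lets readNamedWriteableList(GeoPoint.class) in BulkShardRequest reconstruct the points. A minimal sketch of that name-to-factory lookup, with a plain map and Supplier standing in for NamedWriteableRegistry and the StreamInput-based constructor:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// The wire format carries a name (e.g. "geo_point"); the reader looks the name up
// in a registry to find the factory that reconstructs the object. In OpenSearch the
// factory takes a StreamInput; a plain Supplier stands in for it here.
public final class NamedFactorySketch {

    private final Map<String, Supplier<?>> factories = new HashMap<>();

    void register(String name, Supplier<?> factory) {
        factories.put(name, factory);
    }

    Object read(String name) {
        Supplier<?> factory = factories.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("unknown writeable name: " + name);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        NamedFactorySketch registry = new NamedFactorySketch();
        registry.register("geo_point", () -> new double[] { 12.0, 77.0 }); // lat, lon placeholder
        System.out.println(((double[]) registry.read("geo_point")).length); // 2
    }
}
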
