Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK-7134 - Re-implement Aggregations Stats for all Catalog Browsers #100

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
/**
* Created by jtarraga on 09/03/17.
*/

public class FacetField {
private String name;
private long count;
private Long count;
private List<Bucket> buckets;
private String aggregationName;
private List<Double> aggregationValues;
Expand Down Expand Up @@ -68,17 +67,20 @@ public FacetField setName(String name) {
return this;
}

public long getCount() {
public Long getCount() {
return count;
}

public FacetField setCount(long count) {
public FacetField setCount(Long count) {
this.count = count;
return this;
}

public FacetField addCount(long delta) {
this.count += delta;
if (this.count == null) {
this.count = 0L;
}
this.count = this.count.longValue() + delta;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ private <T> DataResult<T> endQuery(List result, long numMatches, double start) {
long end = System.currentTimeMillis();
int numResults = (result != null) ? result.size() : 0;

DataResult<T> queryResult = new DataResult((int) (end - start), Collections.emptyList(), numResults, result, numMatches, null);
return queryResult;
return new DataResult((int) (end - start), Collections.emptyList(), numResults, result, numMatches, null);
}

private DataResult endWrite(long start) {
Expand Down Expand Up @@ -331,31 +330,25 @@ public <T> DataResult<T> aggregate(List<? extends Bson> operations, ComplexTypeC
QueryOptions options) {

long start = startQuery();

DataResult<T> queryResult;
MongoDBIterator<T> iterator = mongoDBNativeQuery.aggregate(operations, converter, options);
// MongoCursor<Document> iterator = output.iterator();
List<T> list = new LinkedList<>();
if (queryResultWriter != null) {
try {
queryResultWriter.open();
if (operations != null && !operations.isEmpty()) {
MongoDBIterator<T> iterator = mongoDBNativeQuery.aggregate(operations, converter, options);
if (queryResultWriter != null) {
try {
queryResultWriter.open();
while (iterator.hasNext()) {
queryResultWriter.write(iterator.next());
}
queryResultWriter.close();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
} else {
while (iterator.hasNext()) {
queryResultWriter.write(iterator.next());
list.add(iterator.next());
}
queryResultWriter.close();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
} else {
// if (converter != null) {
// while (iterator.hasNext()) {
// list.add(converter.convertToDataModelType(iterator.next()));
// }
// } else {
while (iterator.hasNext()) {
list.add((T) iterator.next());
}
// }
}
queryResult = endQuery(list, start);
return queryResult;
Expand Down Expand Up @@ -435,7 +428,7 @@ public DataResult update(ClientSession clientSession, List<? extends Bson> queri

return endWrite(
wr.getMatchedCount(),
wr.getInsertedCount() + wr.getUpserts().size(),
(long) wr.getInsertedCount() + wr.getUpserts().size(),
wr.getModifiedCount(),
wr.getDeletedCount(),
0,
Expand Down Expand Up @@ -553,8 +546,7 @@ public DataResult createIndex(Bson keys, ObjectMap options) {
}

mongoDBNativeQuery.createIndex(keys, i);
DataResult dataResult = endQuery(Collections.emptyList(), start);
return dataResult;
return endQuery(Collections.emptyList(), start);
}

public void dropIndexes() {
Expand All @@ -564,15 +556,13 @@ public void dropIndexes() {
public DataResult dropIndex(Bson keys) {
long start = startQuery();
mongoDBNativeQuery.dropIndex(keys);
DataResult dataResult = endQuery(Collections.emptyList(), start);
return dataResult;
return endQuery(Collections.emptyList(), start);
}

public DataResult<Document> getIndex() {
long start = startQuery();
List<Document> index = mongoDBNativeQuery.getIndex();
DataResult<Document> queryResult = endQuery(index, start);
return queryResult;
return endQuery(index, start);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package org.opencb.commons.datastore.mongodb;

import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.opencb.commons.datastore.core.ComplexTypeConverter;
import org.opencb.commons.datastore.core.FacetField;

import java.util.*;

import static org.opencb.commons.datastore.mongodb.MongoDBQueryUtils.*;
import static org.opencb.commons.datastore.mongodb.MongoDBQueryUtils.Accumulator.*;

public class MongoDBDocumentToFacetFieldsConverter implements ComplexTypeConverter<List<FacetField>, Document> {

@Override
public List<FacetField> convertToDataModelType(Document document) {
if (document == null || document.entrySet().size() == 0) {
return Collections.emptyList();
}

List<FacetField> facets = new ArrayList<>();
for (Map.Entry<String, Object> entry : document.entrySet()) {
String key = entry.getKey();
List<Document> documentValues = (List<Document>) entry.getValue();
if (key.endsWith(COUNTS_SUFFIX)) {
List<FacetField.Bucket> buckets = new ArrayList<>(documentValues.size());
long total = 0;
for (Document documentValue : documentValues) {
long counter = documentValue.getInteger(count.name());
String bucketValue = "";
Object internalIdValue = documentValue.get(INTERNAL_ID);
if (internalIdValue instanceof String) {
bucketValue = (String) internalIdValue;
} else if (internalIdValue instanceof Boolean
|| internalIdValue instanceof Integer
|| internalIdValue instanceof Double) {
bucketValue = internalIdValue.toString();
} else if (internalIdValue instanceof Document) {
bucketValue = StringUtils.join(((Document) internalIdValue).values(), AND_SEPARATOR);
}
buckets.add(new FacetField.Bucket(bucketValue, counter, null));
total += counter;
}
key = key.substring(0, key.length() - COUNTS_SUFFIX.length());
facets.add(new FacetField(key, total, buckets));
} else if (key.endsWith(RANGES_SUFFIX)) {
List<Double> facetFieldValues = new ArrayList<>();
Number start = null;
Number end = null;
Number step = null;
Double other = null;
for (Document value : documentValues) {
if (value.get(INTERNAL_ID) instanceof String && OTHER.equals(value.getString(INTERNAL_ID))) {
other = 1.0d * value.getInteger(count.name());
} else {
Double range = value.getDouble(INTERNAL_ID);
Integer counter = value.getInteger(count.name());
facetFieldValues.add(1.0d * counter);
if (start == null) {
start = range;
}
end = range;
if (step == null && start != end) {
step = end.doubleValue() - start.doubleValue();
}
}
}
key = key.substring(0, key.length() - RANGES_SUFFIX.length()).replace(GenericDocumentComplexConverter.TO_REPLACE_DOTS, ".");
if (other != null) {
key += " (counts out of range: " + other + ")";
}
FacetField facetField = new FacetField(key, "range", facetFieldValues)
.setStart(start)
.setEnd(end)
.setStep(step);
facets.add(facetField);
} else {
Document documentValue = ((List<Document>) entry.getValue()).get(0);
MongoDBQueryUtils.Accumulator accumulator = getAccumulator(documentValue);
switch (accumulator) {
case max:
case min:
case avg:
case stdDevPop:
case stdDevSamp: {
List<Double> fieldValues = new ArrayList<>();
if (documentValue.get(accumulator.name()) instanceof Integer) {
fieldValues.add(1.0d * documentValue.getInteger(accumulator.name()));
} else if (documentValue.get(accumulator.name()) instanceof Long) {
fieldValues.add(1.0d * documentValue.getLong(accumulator.name()));
} else if (documentValue.get(accumulator.name()) instanceof List) {
List<Number> list = (List<Number>) documentValue.get(accumulator.name());
for (Number number : list) {
fieldValues.add(number.doubleValue());
}
} else {
fieldValues.add(documentValue.getDouble(accumulator.name()));
}
facets.add(new FacetField(documentValue.getString(INTERNAL_ID), accumulator.name(), fieldValues));
break;
}
default: {
// Do nothing, exception is raised
}
}
}
}
return facets;
}

private MongoDBQueryUtils.Accumulator getAccumulator(Document document) {
for (Map.Entry<String, Object> entry : document.entrySet()) {
try {
MongoDBQueryUtils.Accumulator accumulator = MongoDBQueryUtils.Accumulator.valueOf(entry.getKey());
return accumulator;
} catch (IllegalArgumentException e) {
// Do nothing
}
}
throw new IllegalArgumentException("No accumulators found in facet document: " + StringUtils.join(document.keySet(), ",")
+ " Valid accumulator functions: " + StringUtils.join(Arrays.asList(count, max, min, avg, stdDevPop, stdDevSamp), ","));
}

@Override
public Document convertToStorageType(List<FacetField> facetFields) {
throw new RuntimeException("Not yet implemented");
}
}
Loading
Loading