Skip to content
This repository has been archived by the owner on Mar 20, 2019. It is now read-only.

Commit

Permalink
Fully supported custom json data types with low footprint serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
arvid committed Jan 17, 2014
1 parent c6442e2 commit a7a491e
Show file tree
Hide file tree
Showing 65 changed files with 628 additions and 579 deletions.
2 changes: 1 addition & 1 deletion meteor/meteor-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<artifactId>meteor</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.4</version>
<version>0.4.1</version>
</parent>

<artifactId>meteor-client</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion meteor/meteor-meteor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>meteor</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.4</version>
<version>0.4.1</version>
</parent>
<artifactId>meteor-meteor</artifactId>
<name>meteor-meteor</name>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// $ANTLR 3.5 /home/arv/workspace/stratosphere-sopremo/meteor/meteor-meteor/src/main/java/eu/stratosphere/meteor/Meteor.g 2014-01-13 14:02:54
// $ANTLR 3.5 /home/arv/workspace/stratosphere-sopremo/meteor/meteor-meteor/src/main/java/eu/stratosphere/meteor/Meteor.g 2014-01-17 14:21:14

package eu.stratosphere.meteor;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// $ANTLR 3.5 /home/arv/workspace/stratosphere-sopremo/meteor/meteor-meteor/src/main/java/eu/stratosphere/meteor/Meteor.g 2014-01-13 14:02:53
// $ANTLR 3.5 /home/arv/workspace/stratosphere-sopremo/meteor/meteor-meteor/src/main/java/eu/stratosphere/meteor/Meteor.g 2014-01-17 14:21:14

package eu.stratosphere.meteor;

Expand Down Expand Up @@ -2278,7 +2278,7 @@ public final MeteorParser.comparisonExpression_return comparisonExpression() thr
}

// AST REWRITE
// elements: e1, e2, e1, e1, e2, e1, e2
// elements: e1, e2, e1, e2, e2, e1, e1
// token labels:
// rule labels: retval, e1, e2
// token list labels:
Expand Down Expand Up @@ -2455,7 +2455,7 @@ else if ( (LA17_0==44) ) {
}

// AST REWRITE
// elements: e1, e2, e1
// elements: e1, e1, e2
// token labels:
// rule labels: retval, e1, e2
// token list labels:
Expand Down
2 changes: 1 addition & 1 deletion meteor/meteor-testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>meteor</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.4</version>
<version>0.4.1</version>
</parent>
<artifactId>meteor-testing</artifactId>
<name>meteor-testing</name>
Expand Down
2 changes: 1 addition & 1 deletion meteor/meteor-webfrontend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<artifactId>meteor</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.4</version>
<version>0.4.1</version>
</parent>

<name>meteor-webfrontend</name>
Expand Down
2 changes: 1 addition & 1 deletion meteor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>hll</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.4</version>
<version>0.4.1</version>
</parent>
<artifactId>meteor</artifactId>
<packaging>pom</packaging>
Expand Down
2 changes: 1 addition & 1 deletion sopremo-meteor-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>hll</artifactId>
<version>0.4</version>
<version>0.4.1</version>
</parent>

<artifactId>sopremo-meteor-dist</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion sopremo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hll</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.4</version>
<version>0.4.1</version>
</parent>
<artifactId>sopremo</artifactId>
<packaging>pom</packaging>
Expand Down
2 changes: 1 addition & 1 deletion sopremo/sopremo-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>sopremo</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.4</version>
<version>0.4.1</version>
</parent>
<artifactId>sopremo-base</artifactId>
<name>sopremo-base</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.sopremo.AbstractSopremoType;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.expressions.EvaluationExpression;
import eu.stratosphere.sopremo.expressions.ObjectAccess;
import eu.stratosphere.sopremo.expressions.PathSegmentExpression;
Expand All @@ -15,7 +14,6 @@
import eu.stratosphere.sopremo.operator.Property;
import eu.stratosphere.sopremo.pact.JsonCollector;
import eu.stratosphere.sopremo.pact.SopremoMap;
import eu.stratosphere.sopremo.serialization.SopremoRecordLayout;
import eu.stratosphere.sopremo.type.IJsonNode;
import eu.stratosphere.sopremo.type.IObjectNode;
import eu.stratosphere.sopremo.type.IntNode;
Expand Down Expand Up @@ -144,11 +142,10 @@ public GlobalEnumeration withValueFieldName(final String valueFieldName) {
* eu.stratosphere.sopremo.serialization.SopremoRecordLayout)
*/
@Override
protected void configureOperator(final Operator contract, final Configuration stubConfiguration,
final EvaluationContext context, final SopremoRecordLayout layout) {
protected void configureOperator(final Operator contract, final Configuration stubConfiguration) {
if (this.enumerationExpression == AUTO_ENUMERATION)
this.enumerationExpression = new AutoProjection(this.idFieldName, this.valueFieldName);
super.configureOperator(contract, stubConfiguration, context, layout);
super.configureOperator(contract, stubConfiguration);
if (this.enumerationExpression instanceof AutoProjection)
this.enumerationExpression = AUTO_ENUMERATION;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package eu.stratosphere.sopremo.base;

import eu.stratosphere.pact.common.plan.PactModule;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.expressions.EvaluationExpression;
import eu.stratosphere.sopremo.operator.ElementaryOperator;
import eu.stratosphere.sopremo.operator.InputCardinality;
import eu.stratosphere.sopremo.operator.Name;
import eu.stratosphere.sopremo.operator.OutputCardinality;
import eu.stratosphere.sopremo.pact.JsonCollector;
import eu.stratosphere.sopremo.pact.SopremoMap;
import eu.stratosphere.sopremo.serialization.SopremoRecordLayout;
import eu.stratosphere.sopremo.type.IJsonNode;

@InputCardinality(1)
Expand All @@ -22,10 +20,10 @@ public class Projection extends ElementaryOperator<Projection> {
* eu.stratosphere.sopremo.serialization.SopremoRecordLayout)
*/
@Override
public PactModule asPactModule(final EvaluationContext context, final SopremoRecordLayout layout) {
public PactModule asPactModule() {
if (this.getResultProjection() == EvaluationExpression.VALUE)
return this.createShortCircuitModule();
return super.asPactModule(context, layout);
return super.asPactModule();
}

public static class ProjectionFunction extends SopremoMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import eu.stratosphere.api.common.operators.util.OperatorUtil;
import eu.stratosphere.pact.common.plan.PactModule;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.base.join.AntiJoin;
import eu.stratosphere.sopremo.base.join.EquiJoin;
import eu.stratosphere.sopremo.base.join.EquiJoin.Mode;
Expand All @@ -29,7 +28,6 @@
import eu.stratosphere.sopremo.operator.Internal;
import eu.stratosphere.sopremo.operator.Name;
import eu.stratosphere.sopremo.operator.Property;
import eu.stratosphere.sopremo.serialization.SopremoRecordLayout;

@Internal
public class TwoSourceJoin extends TwoSourceJoinBase<TwoSourceJoin> {
Expand Down Expand Up @@ -69,7 +67,7 @@ public void appendAsString(final Appendable appendable) throws IOException {
* eu.stratosphere.sopremo.serialization.SopremoRecordLayout)
*/
@Override
public PactModule asPactModule(final EvaluationContext context, final SopremoRecordLayout layout) {
public PactModule asPactModule() {
if (this.inverseInputs)
this.strategy.setResultProjection(this.getResultProjection().clone().replace(
Predicates.instanceOf(InputSelection.class), new TransformFunction() {
Expand All @@ -88,7 +86,7 @@ public EvaluationExpression apply(final EvaluationExpression argument) {
if (this.getDegreeOfParallelism() != STANDARD_DEGREE_OF_PARALLELISM)
this.strategy.setDegreeOfParallelism(this.getDegreeOfParallelism());

final PactModule pactModule = this.strategy.asPactModule(context, layout);
final PactModule pactModule = this.strategy.asPactModule();
if (this.inverseInputs)
OperatorUtil.swapInputs(pactModule.getOutput(0).getInputs().get(0), 0, 1);
return pactModule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.base.MapOperatorBase;
import eu.stratosphere.pact.common.plan.PactModule;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.operator.ElementaryOperator;
import eu.stratosphere.sopremo.operator.InputCardinality;
import eu.stratosphere.sopremo.operator.JsonStream;
import eu.stratosphere.sopremo.operator.Name;
import eu.stratosphere.sopremo.pact.SopremoNop;
import eu.stratosphere.sopremo.serialization.SopremoRecordLayout;

/**
* Unifies the input json streams in a bag semantic.
Expand All @@ -25,7 +23,7 @@ public class UnionAll extends ElementaryOperator<UnionAll> {
* @see eu.stratosphere.sopremo.ElementaryOperator#asPactModule(eu.stratosphere.sopremo.EvaluationContext)
*/
@Override
public PactModule asPactModule(final EvaluationContext context, final SopremoRecordLayout layout) {
public PactModule asPactModule() {
final List<JsonStream> inputs = this.getInputs();
final PactModule module = new PactModule(inputs.size(), 1);
// // TODO: remove identity map, when Pact/Nephele can deal with direct source->sink connections
Expand Down
2 changes: 1 addition & 1 deletion sopremo/sopremo-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>sopremo</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.4</version>
<version>0.4.1</version>
</parent>
<artifactId>sopremo-client</artifactId>
<name>sopremo-client</name>
Expand Down
2 changes: 1 addition & 1 deletion sopremo/sopremo-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>sopremo</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.4</version>
<version>0.4.1</version>
</parent>
<artifactId>sopremo-common</artifactId>
<name>sopremo-common</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,15 @@

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import com.esotericsoftware.kryo.Kryo;

import eu.stratosphere.core.fs.Path;
import eu.stratosphere.sopremo.expressions.EvaluationExpression;
import eu.stratosphere.sopremo.packages.DefaultNameChooserProvider;
import eu.stratosphere.sopremo.packages.DefaultTypeRegistry;
import eu.stratosphere.sopremo.packages.ITypeRegistry;
import eu.stratosphere.sopremo.packages.NameChooserProvider;
import eu.stratosphere.sopremo.type.BooleanNode;
import eu.stratosphere.sopremo.type.IArrayNode;
import eu.stratosphere.sopremo.type.IJsonNode;
import eu.stratosphere.sopremo.type.IObjectNode;
import eu.stratosphere.sopremo.type.MissingNode;
import eu.stratosphere.sopremo.type.NullNode;
import eu.stratosphere.sopremo.type.TextNode;
import eu.stratosphere.sopremo.type.TypeCoercer;
import eu.stratosphere.util.SopremoKryo;

/**
Expand All @@ -53,8 +37,6 @@ public class EvaluationContext extends AbstractSopremoType {

private EvaluationExpression resultProjection = EvaluationExpression.VALUE;

private final ITypeRegistry typeRegistry;

private final NameChooserProvider nameChooserProvider;

// public LinkedList<Operator<?>> getOperatorStack() {
Expand All @@ -63,48 +45,33 @@ public class EvaluationContext extends AbstractSopremoType {

private int taskId;

private final transient Kryo kryo, dataKryo;
private final transient Kryo kryo;

private final Map<String, Object> contextParameters = new HashMap<String, Object>();

/**
* Initializes EvaluationContext.
*/
public EvaluationContext() {
this(new DefaultTypeRegistry(), new DefaultNameChooserProvider());
this(new DefaultNameChooserProvider());
}

/**
* Initializes EvaluationContext.
*/
public EvaluationContext(final ITypeRegistry typeRegistry, final NameChooserProvider nameChooserProvider) {
this.typeRegistry = typeRegistry;
public EvaluationContext(final NameChooserProvider nameChooserProvider) {
this.nameChooserProvider = nameChooserProvider;

this.workingPath = new Path(new File(".").toURI().toString()).toString();

this.dataKryo = new Kryo();
this.dataKryo.setReferences(false);
for (final Class<? extends IJsonNode> type : TypeCoercer.NUMERIC_TYPES)
this.dataKryo.register(type);
final List<Class<? extends Object>> defaultTypes =
Arrays.asList(BooleanNode.class, TextNode.class, IObjectNode.class, IArrayNode.class, NullNode.class,
MissingNode.class, TreeMap.class, ArrayList.class, BigInteger.class, BigDecimal.class);
for (final Class<?> type : defaultTypes)
this.dataKryo.register(type);

final List<Class<? extends IJsonNode>> types = typeRegistry.getTypes();
for (final Class<? extends IJsonNode> type : types)
this.dataKryo.register(type);

this.kryo = new SopremoKryo();
}

/**
* Initializes EvaluationContext.
*/
protected EvaluationContext(final EvaluationContext context) {
this(context.typeRegistry, context.nameChooserProvider);
this(context.nameChooserProvider);
this.contextParameters.putAll(context.contextParameters);
this.copyPropertiesFrom(context);
}
Expand All @@ -130,17 +97,6 @@ public EvaluationContext clone() {
return (EvaluationContext) super.clone();
}

// /**
// * Returns the classResolver.
// *
// * @return the classResolver
// */
// public ClassResolver getClassResolver() {
// if(this.classResolver == null)
// this.classResolver = new SopremoClassResolver(this.getTypeRegistry());
// return this.classResolver;
// }
//
/*
* (non-Javadoc)
* @see eu.stratosphere.sopremo.AbstractSopremoType#copyPropertiesFrom(eu.
Expand Down Expand Up @@ -178,11 +134,6 @@ public boolean equals(final Object obj) {
return false;
if (this.taskId != other.taskId)
return false;
if (this.typeRegistry == null) {
if (other.typeRegistry != null)
return false;
} else if (!this.typeRegistry.equals(other.typeRegistry))
return false;
if (this.workingPath == null) {
if (other.workingPath != null)
return false;
Expand All @@ -199,14 +150,6 @@ public Kryo getKryo() {
return this.kryo;
}

/*
* (non-Javadoc)
* @see eu.stratosphere.sopremo.AbstractSopremoType#getKryo()
*/
public Kryo getKryoForDataSerialization() {
return this.dataKryo;
}

/**
* Returns the nameChooserProvider.
*
Expand Down Expand Up @@ -239,15 +182,6 @@ public int getTaskId() {
return this.taskId;
}

/**
* Returns the typeRegistry.
*
* @return the typeRegistry
*/
public ITypeRegistry getTypeRegistry() {
return this.typeRegistry;
}

/**
* Returns the hdfsPath.
*
Expand All @@ -265,7 +199,6 @@ public int hashCode() {
result = prime * result + (this.operatorDescription == null ? 0 : this.operatorDescription.hashCode());
result = prime * result + (this.resultProjection == null ? 0 : this.resultProjection.hashCode());
result = prime * result + this.taskId;
result = prime * result + (this.typeRegistry == null ? 0 : this.typeRegistry.hashCode());
result = prime * result + (this.workingPath == null ? 0 : this.workingPath.hashCode());
return result;
}
Expand Down
Loading

0 comments on commit a7a491e

Please sign in to comment.