diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java index e003ee7..bff80bc 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java @@ -18,7 +18,7 @@ import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.inmemory.ILoaderFactory; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkArrayLoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyArrayLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkRunLengthEncodingArrayLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionArrayLoader; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -37,7 +37,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) throws IOException { if (columnBinary == null) { // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyArrayLoader(vector, loadSize); } switch (getLoadType(columnBinary, loadSize)) { case ARRAY: @@ -47,8 +47,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) case UNION: return new SparkUnionArrayLoader(vector, loadSize); default: - // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyArrayLoader(vector, loadSize); } } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java index b6a38ba..fc06b4c 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java @@ -17,8 +17,8 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.inmemory.ILoaderFactory; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyMapLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkMapLoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionMapLoader; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -36,7 +36,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) throws IOException { if (columnBinary == null) { // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyMapLoader(vector, loadSize); } switch (getLoadType(columnBinary, loadSize)) { case SPREAD: @@ -45,7 +45,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) return new SparkUnionMapLoader(vector, loadSize); default: // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyMapLoader(vector, loadSize); } } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java index 3c4fb3d..0f0b9f8 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java @@ -17,7 +17,7 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.inmemory.ILoaderFactory; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyStructLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkStructLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionStructLoader; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -36,7 +36,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) throws IOException { if (columnBinary == null) { // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyStructLoader(vector, loadSize); } switch (getLoadType(columnBinary, loadSize)) { case SPREAD: @@ -45,7 +45,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) return new SparkUnionStructLoader(vector, loadSize); default: // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyStructLoader(vector, loadSize); } } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java index f99512a..a272314 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java @@ -38,7 +38,7 @@ public int getLoadSize() { @Override public void setNull(final int index) throws IOException { - vector.putNull(index); + vector.putArray(index, 0, 0); } @Override diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoader.java new file mode 100644 index 0000000..29ad378 --- /dev/null +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoader.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import jp.co.yahoo.yosegi.inmemory.LoadType; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; + +import java.io.IOException; + +public class SparkEmptyArrayLoader implements ILoader { + + private final WritableColumnVector vector; + private final int loadSize; + + public SparkEmptyArrayLoader(final WritableColumnVector vector, final int loadSize) { + this.vector = vector; + this.loadSize = loadSize; + this.vector.getChild(0).reset(); + this.vector.getChild(0).reserve(0); + if (this.vector.getChild(0).hasDictionary()) { + this.vector.getChild(0).reserveDictionaryIds(0); + this.vector.getChild(0).setDictionary(null); + } + } + + @Override + public LoadType getLoaderType() { + return LoadType.NULL; + } + + @Override + public int getLoadSize() { + return loadSize; + } + + @Override + public void setNull(final int index) throws IOException { + // FIXME: + } + + @Override + public void finish() throws IOException { + // FIXME: + } + + @Override + public WritableColumnVector build() throws IOException { + for (int i = 0; i < loadSize; i++) { + vector.putArray(i, 0, 0); + } + return vector; + } + + @Override + public boolean isLoadingSkipped() { + return true; + } +} diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoader.java new file mode 100644 index 0000000..7669968 --- /dev/null +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoader.java @@ -0,0 +1,23 @@ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; + +public class SparkEmptyLoader { + public static void load(final WritableColumnVector vector, final int loadSize) throws IOException { + final Class klass = vector.dataType().getClass(); + if (klass == ArrayType.class) { + new SparkEmptyArrayLoader(vector, loadSize).build(); + } else if (klass == StructType.class) { + new SparkEmptyStructLoader(vector, loadSize).build(); + } else if (klass == MapType.class) { + new SparkEmptyMapLoader(vector, loadSize).build(); + } else { + new SparkNullLoader(vector, loadSize).build(); + } + } +} diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoader.java new file mode 100644 index 0000000..7e2f598 --- /dev/null +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoader.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import jp.co.yahoo.yosegi.inmemory.LoadType; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; + +import java.io.IOException; + +public class SparkEmptyMapLoader implements ILoader { + + private final WritableColumnVector vector; + private final int loadSize; + + public SparkEmptyMapLoader(final WritableColumnVector vector, final int loadSize) { + this.vector = vector; + this.loadSize = loadSize; + } + + @Override + public LoadType getLoaderType() { + return LoadType.NULL; + } + + @Override + public int getLoadSize() { + return loadSize; + } + + @Override + public void setNull(final int index) throws IOException { + // FIXME: + } + + @Override + public void finish() throws IOException { + // FIXME: + } + + @Override + public WritableColumnVector build() throws IOException { + vector.getChild(0).reset(); + vector.getChild(0).reserve(0); + vector.getChild(1).reset(); + vector.getChild(1).reserve(0); + return vector; + } + + @Override + public boolean isLoadingSkipped() { + return true; + } +} diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoader.java new file mode 100644 index 0000000..cabc740 --- /dev/null +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoader.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import jp.co.yahoo.yosegi.inmemory.LoadType; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; + +public class SparkEmptyStructLoader implements ILoader { + + private final WritableColumnVector vector; + private final int loadSize; + private final String[] names; + + public SparkEmptyStructLoader(final WritableColumnVector vector, final int loadSize) { + this.vector = vector; + this.loadSize = loadSize; + final StructType structType = (StructType) vector.dataType(); + this.names = structType.fieldNames(); + for (int i = 0; i < names.length; i++) { + vector.getChild(i).reset(); + vector.getChild(i).reserve(loadSize); + if (vector.getChild(i).hasDictionary()) { + vector.getChild(i).reserveDictionaryIds(0); + vector.getChild(i).setDictionary(null); + } + } + } + + @Override + public LoadType getLoaderType() { + return LoadType.NULL; + } + + @Override + public int getLoadSize() { + return loadSize; + } + + @Override + public void setNull(final int index) throws IOException { + // FIXME: + } + + @Override + public void finish() throws IOException { + // FIXME: + } + + @Override + public WritableColumnVector build() throws IOException { + for (int i = 0; i < names.length; i++) { + SparkEmptyLoader.load(vector.getChild(i), loadSize); + } + return vector; + } + + @Override + public boolean isLoadingSkipped() { + return true; + } +} diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java index d8d04d1..9f8112f 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java @@ -14,13 +14,13 @@ */ package jp.co.yahoo.yosegi.spark.inmemory.loader; -import jp.co.yahoo.yosegi.inmemory.ISequentialLoader; +import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.inmemory.LoadType; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import java.io.IOException; -public class SparkNullLoader implements ISequentialLoader { +public class SparkNullLoader implements ILoader { private final WritableColumnVector vector; private final int loadSize; @@ -42,17 +42,22 @@ public int getLoadSize() { @Override public void setNull(final int index) throws IOException { - // TODO: + // FIXME: } @Override public void finish() throws IOException { // FIXME: - vector.putNulls(0, loadSize); } @Override public WritableColumnVector build() throws IOException { + vector.putNulls(0, loadSize); return vector; } + + @Override + public boolean isLoadingSkipped() { + return true; + } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java index 9c5dda1..3ebb236 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java @@ -49,7 +49,7 @@ public WritableColumnVector build() throws IOException { @Override public void setNull(final int index) throws IOException { - vector.putNull(index); + vector.putArray(index, 0, 0); } @Override @@ -59,7 +59,7 @@ public void setRowGroupCount(final int count) throws IOException {} public void setNullAndRepetitions( final int startIndex, final int repetitions, final int rowGroupIndex) throws IOException { for (int i = 0; i < repetitions; i++) { - vector.putNull(rowId); + vector.putArray(rowId, 0, 0); rowId++; } } @@ -83,6 +83,10 @@ public void setRowGourpIndexAndRepetitions( public void loadChild(final ColumnBinary columnBinary, final int childLength) throws IOException { vector.getChild(0).reset(); vector.getChild(0).reserve(childLength); + if (vector.getChild(0).hasDictionary()) { + vector.getChild(0).reserveDictionaryIds(0); + vector.getChild(0).setDictionary(null); + } SparkLoaderFactoryUtil.createLoaderFactory(vector.getChild(0)) .create(columnBinary, childLength); } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java index fb2e52e..828a1c7 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java @@ -70,7 +70,7 @@ public WritableColumnVector build() throws IOException { // NOTE: Fill unloaded columns with nulls. for (int i = 0; i < names.length; i++) { if (loaderFactoryMap.containsKey(names[i])) { - vector.getChild(i).putNulls(0, loadSize); + SparkEmptyLoader.load(vector.getChild(i), loadSize); } } return vector; diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java index 346fe26..0950840 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java @@ -29,6 +29,12 @@ public class SparkUnionArrayLoader implements IUnionLoader public SparkUnionArrayLoader(final WritableColumnVector vector, final int loadSize) { this.vector = vector; this.loadSize = loadSize; + this.vector.getChild(0).reset(); + this.vector.getChild(0).reserve(0); + if (this.vector.getChild(0).hasDictionary()) { + this.vector.getChild(0).reserveDictionaryIds(0); + this.vector.getChild(0).setDictionary(null); + } } @Override @@ -38,12 +44,12 @@ public int getLoadSize() { @Override public void setNull(final int index) throws IOException { - vector.putNull(index); + vector.putArray(index, 0, 0); } @Override public void finish() throws IOException { - // + // FIXME: } @Override @@ -55,13 +61,14 @@ public WritableColumnVector build() throws IOException { public void setIndexAndColumnType(final int index, final ColumnType columnType) throws IOException { // FIXME: if (columnType != ColumnType.ARRAY) { - vector.putNull(index); + vector.putArray(index, 0, 0); } } @Override public void loadChild(final ColumnBinary columnBinary, final int childLoadSize) throws IOException { if (columnBinary.columnType == ColumnType.ARRAY) { + vector.getChild(0).reserve(childLoadSize); SparkLoaderFactoryUtil.createLoaderFactory(vector).create(columnBinary, childLoadSize); } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java index 107ccb0..68274e2 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java @@ -29,6 +29,10 @@ public class SparkUnionMapLoader implements IUnionLoader { public SparkUnionMapLoader(WritableColumnVector vector, int loadSize) { this.vector = vector; this.loadSize = loadSize; + this.vector.getChild(0).reset(); + this.vector.getChild(0).reserve(0); + this.vector.getChild(1).reset(); + this.vector.getChild(1).reserve(0); } @Override @@ -38,12 +42,12 @@ public int getLoadSize() { @Override public void setNull(int index) throws IOException { - vector.putNull(index); + // FIXME: } @Override public void finish() throws IOException { - // + // FIXME: } @Override @@ -54,9 +58,6 @@ public WritableColumnVector build() throws IOException { @Override public void setIndexAndColumnType(int index, ColumnType columnType) throws IOException { // FIXME: - if (columnType != ColumnType.SPREAD) { - vector.putNull(index); - } } @Override diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java index c4e86de..dfc2341 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java @@ -25,6 +25,7 @@ public class SparkUnionStructLoader implements IUnionLoader { private final WritableColumnVector vector; private final int loadSize; + private boolean childLoaded; public SparkUnionStructLoader(WritableColumnVector vector, int loadSize) { this.vector = vector; @@ -38,30 +39,31 @@ public int getLoadSize() { @Override public void setNull(int index) throws IOException { - vector.putNull(index); + // FIXME: } @Override public void finish() throws IOException { - // + // FIXME: } @Override public WritableColumnVector build() throws IOException { + if (!childLoaded) { + new SparkEmptyStructLoader(vector, loadSize).build(); + } return vector; } @Override public void setIndexAndColumnType(int index, ColumnType columnType) throws IOException { // FIXME: - if (columnType != ColumnType.SPREAD) { - vector.putNull(index); - } } @Override public void loadChild(ColumnBinary columnBinary, int childLoadSize) throws IOException { if (columnBinary.columnType == ColumnType.SPREAD) { + childLoaded = true; SparkLoaderFactoryUtil.createLoaderFactory(vector).create(columnBinary, childLoadSize); } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java index fb38e8f..19b9a58 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java @@ -17,9 +17,11 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.inmemory.IRawConverter; import jp.co.yahoo.yosegi.spark.inmemory.SparkLoaderFactoryUtil; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyLoader; import jp.co.yahoo.yosegi.spark.utils.PartitionColumnUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -72,10 +74,10 @@ public ColumnarBatch convert(final List raw, final int loadSize) t isSet[index] = true; SparkLoaderFactoryUtil.createLoaderFactory(((WritableColumnVector) childColumns[index])).create(columnBinary, loadSize); } - // NOTE: null columns + // NOTE: Empty columns for (int i = 0; i < schema.length(); i++) { if (!isSet[i]) { - ((WritableColumnVector) childColumns[i]).putNulls(0, loadSize); + SparkEmptyLoader.load((WritableColumnVector) childColumns[i], loadSize); } } // NOTE: partitionColumns diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java index 0b8b6c6..65ddaf7 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java @@ -322,33 +322,24 @@ void T_load_Array_Array_Integer_1() throws IOException { final int jIndex = ldfj.get(i).fieldIndex("aa"); final int yIndex = ldfy.get(i).fieldIndex("aa"); if (ldfj.get(i).isNullAt(jIndex)) { - if (ldfy.get(i).isNullAt(yIndex)) { - // NOTE: json:null, yosegi:null - assertTrue(ldfy.get(i).isNullAt(yIndex)); - } else { - // FIXME: json:null, yosegi:[] - assertTrue(false); - } + // NOTE: json:null, yosegi:[] + assertEquals(0, ldfy.get(i).getList(yIndex).size()); } else { if (ldfy.get(i).isNullAt(yIndex)) { // NOTE: json:[], yosegi:null - assertEquals(0, ldfj.get(i).getList(jIndex).size()); + assertTrue(false); } else { final List> ldfj2 = ldfj.get(i).getList(jIndex); final List> ldfy2 = ldfy.get(i).getList(yIndex); for (int j = 0; j < ldfj2.size(); j++) { final WrappedArray waj = ldfj2.get(j); final WrappedArray way = ldfy2.get(j); - if (way == null) { - if (waj == null) { - // NOTE: json:[null], yosegi:[null] - assertNull(waj); - } else { - // NOTE: json:[[]], yosegi:[null] - assertEquals(0, waj.size()); - } + if (waj == null) { + // NOTE: json:[null], yosegi:[[]] + assertEquals(0, way.size()); } else { // NOTE: json:[[]], yosegi:[[]] + assertEquals(waj.size(), way.size()); for (int k = 0; k < waj.size(); k++) { assertEquals(waj.apply(k), way.apply(k)); } @@ -397,59 +388,28 @@ void T_load_Array_Struct_Primitive_1() throws IOException { final int jIndex = ldfj.get(i).fieldIndex("as"); final int yIndex = ldfy.get(i).fieldIndex("as"); if (ldfj.get(i).isNullAt(jIndex)) { - if (ldfy.get(i).isNullAt(yIndex)) { - // NOTE: json:null, yosegi:null - assertTrue(ldfy.get(i).isNullAt(yIndex)); - } else { - // FIXME: json:null, yosegi:[] - assertTrue(false); - } + // NOTE: json:null, yosegi:[] + assertEquals(0, ldfy.get(i).getList(yIndex).size()); } else { if (ldfy.get(i).isNullAt(yIndex)) { - final List lrj = ldfj.get(i).getList(jIndex); - for (int j = 0; j < lrj.size(); j++) { - final Row rj = lrj.get(j); - if (rj == null) { - // NOTE: json:[null], yosegi:null - assertNull(rj); - } else { - // NOTE: json[as.field:null], yosegi:null - for (final StructField field : fields) { - final String name = field.name(); - assertNull(rj.getAs(name)); - } - } - } + assertTrue(false); } else { final List lrj = ldfj.get(i).getList(jIndex); final List lry = ldfy.get(i).getList(yIndex); for (int j = 0; j < lrj.size(); j++) { final Row rj = lrj.get(j); final Row ry = lry.get(j); - if (ry == null) { - if (rj == null) { - // NOTE: json:[null], yosegi:[null] - assertNull(rj); - } else { - // NOTE: json:[{}], yosegi:[null] - for (final StructField field : fields) { - final String name = field.name(); - assertNull(rj.getAs(name)); - } + if (rj == null) { + // NOTE: json:[null], yosegi:[{}] + for (final StructField field : fields) { + final String name = field.name(); + assertNull(ry.getAs(name)); } } else { - if (rj == null) { - // NOTE: json:[null], yosegi:[{}] - for (final StructField field : fields) { - final String name = field.name(); - assertNull(ry.getAs(name)); - } - } else { - // NOTE: json:[{}], yosegi:[{}] - for (final StructField field : fields) { - final String name = field.name(); - assertEquals((Object) rj.getAs(name), (Object) ry.getAs(name)); - } + // NOTE: json:[{}], yosegi:[{}] + for (final StructField field : fields) { + final String name = field.name(); + assertEquals((Object) rj.getAs(name), (Object) ry.getAs(name)); } } } @@ -484,31 +444,12 @@ void T_load_Array_Map_Integer_1() throws IOException { final int jIndex = ldfj.get(i).fieldIndex("am"); final int yIndex = ldfy.get(i).fieldIndex("am"); if (ldfj.get(i).isNullAt(jIndex)) { - if (ldfy.get(i).isNullAt(yIndex)) { - // NOTE: json:null, yosegi:null - assertTrue(ldfy.get(i).isNullAt(yIndex)); - } else { - // FIXME: json:null, yosegi:[] - assertTrue(false); - } + // NOTE: json:null, yosegi:[] + assertEquals(0, ldfy.get(i).getList(yIndex).size()); } else { if (ldfy.get(i).isNullAt(yIndex)) { // NOTE: json:[], yosegi:null - final List> lmj = ldfj.get(i).getList(jIndex); - for (int j = 0; j < lmj.size(); j++) { - final Map mj = lmj.get(j); - if (mj == null) { - // NOTE: json:[null], yosegi:null - assertNull(mj); - } else { - // NOTE: json:[am.key:null], yosegi:null - final Iterator iter = mj.keysIterator(); - while (iter.hasNext()) { - final String key = iter.next(); - assertNull(mj.get(key).get()); - } - } - } + assertTrue(false); } else { // NOTE: json:[], yosegi:[] final List> lmj = ldfj.get(i).getList(jIndex); @@ -516,38 +457,20 @@ void T_load_Array_Map_Integer_1() throws IOException { for (int j = 0; j < lmj.size(); j++) { final Map mj = lmj.get(j); final Map my = lmy.get(j); - if (my == null) { - if (mj == null) { - // NOTE: json:[null], yosegi:[null] - assertNull(mj); - } else { - // NOTE: json:[{}], yosegi:[null] - final Iterator iter = mj.keysIterator(); - while (iter.hasNext()) { - final String key = iter.next(); - assertNull(mj.get(key).get()); - } - } + if (mj == null) { + // NOTE: json:[null], yosegi:[{}] + assertEquals(0, my.size()); } else { - if (mj == null) { - // NOTE: json:[null], yosegi:[{}] - final Iterator iter = my.keysIterator(); - while (iter.hasNext()) { - final String key = iter.next(); - assertNull(my.get(key).get()); - } - } else { - // NOTE: json[{}], yosegi:[{}] - final Iterator iter = mj.keysIterator(); - while (iter.hasNext()) { - final String key = iter.next(); - if (mj.get(key).get() == null) { - // NOTE: json:[{key:null}], yosegi:[{key:not exist}] - assertTrue(my.get(key).isEmpty()); - } else { - // NOTE: json:[{key}], yosegi:[{key}] - assertEquals(mj.get(key), my.get(key)); - } + // NOTE: json[{}], yosegi:[{}] + final Iterator iter = mj.keysIterator(); + while (iter.hasNext()) { + final String key = iter.next(); + if (mj.get(key).get() == null) { + // NOTE: json:[{key:null}], yosegi:[{key:not exist}] + assertTrue(my.get(key).isEmpty()); + } else { + // NOTE: json:[{key}], yosegi:[{key}] + assertEquals(mj.get(key), my.get(key)); } } } diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactoryTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactoryTest.java index 7cab7a4..0dbab82 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactoryTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactoryTest.java @@ -19,7 +19,7 @@ import jp.co.yahoo.yosegi.binary.maker.MaxLengthBasedArrayColumnBinaryMaker; import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkArrayLoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyArrayLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkRunLengthEncodingArrayLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionArrayLoader; import jp.co.yahoo.yosegi.spark.test.Utils; @@ -99,6 +99,6 @@ void T_createLoader_Null() throws IOException { final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); final ILoader loader = new SparkArrayLoaderFactory(vector).createLoader(columnBinary, loadSize); - assertTrue(loader instanceof SparkNullLoader); + assertTrue(loader instanceof SparkEmptyArrayLoader); } } diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactoryTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactoryTest.java index 21b2f8e..1a823fe 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactoryTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactoryTest.java @@ -17,8 +17,8 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.binary.maker.DumpSpreadColumnBinaryMaker; import jp.co.yahoo.yosegi.inmemory.ILoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyMapLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkMapLoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; import jp.co.yahoo.yosegi.spark.test.Utils; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -64,6 +64,6 @@ void T_createLoader_Null() throws IOException { final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); final ILoader loader = new SparkMapLoaderFactory(vector).createLoader(columnBinary, loadSize); - assertTrue(loader instanceof SparkNullLoader); + assertTrue(loader instanceof SparkEmptyMapLoader); } } diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactoryTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactoryTest.java index 1a458fd..1e66d38 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactoryTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactoryTest.java @@ -17,7 +17,7 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.binary.maker.DumpSpreadColumnBinaryMaker; import jp.co.yahoo.yosegi.inmemory.ILoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyStructLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkStructLoader; import jp.co.yahoo.yosegi.spark.test.Utils; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; @@ -89,6 +89,6 @@ void T_createLoader_Null() throws IOException { final ILoader loader = new SparkStructLoaderFactory(vector).createLoader(columnBinary, loadSize); - assertTrue(loader instanceof SparkNullLoader); + assertTrue(loader instanceof SparkEmptyStructLoader); } } diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoaderTest.java new file mode 100644 index 0000000..db61a1a --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoaderTest.java @@ -0,0 +1,104 @@ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class SparkEmptyArrayLoaderTest { + public static Stream data() { + final int loadSize = 5; + return Stream.of( + arguments(DataTypes.BooleanType, loadSize), + arguments(DataTypes.ByteType, loadSize), + arguments(DataTypes.StringType, loadSize), + arguments(DataTypes.createDecimalType(), loadSize), + arguments(DataTypes.DoubleType, loadSize), + arguments(DataTypes.FloatType, loadSize), + arguments(DataTypes.IntegerType, loadSize), + arguments(DataTypes.LongType, loadSize), + arguments(DataTypes.ShortType, loadSize) + ); + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Primitive(final DataType elmDataType, final int loadSize) throws IOException { + final DataType dataType = DataTypes.createArrayType(elmDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyArrayLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Array(final DataType elmDataType, final int loadSize) throws IOException { + final DataType arrayDataType = DataTypes.createArrayType(elmDataType); + final DataType dataType = DataTypes.createArrayType(arrayDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyArrayLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Map(final DataType elmDataType, final int loadSize) throws IOException { + final DataType mapDataType = DataTypes.createMapType(DataTypes.StringType, elmDataType, true); + final DataType dataType = DataTypes.createArrayType(mapDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyArrayLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @Test + void T_load_Array_Struct() throws IOException { + final List fields = + Arrays.asList( + DataTypes.createStructField("bo", DataTypes.BooleanType, true), + DataTypes.createStructField("by", DataTypes.ByteType, true), + DataTypes.createStructField("bi", DataTypes.BinaryType, true), + DataTypes.createStructField("do", DataTypes.DoubleType, true), + DataTypes.createStructField("fl", DataTypes.FloatType, true), + DataTypes.createStructField("in", DataTypes.IntegerType, true), + DataTypes.createStructField("lo", DataTypes.LongType, true), + DataTypes.createStructField("sh", DataTypes.ShortType, true), + DataTypes.createStructField("st", DataTypes.StringType, true)); + final DataType structDataType = DataTypes.createStructType(fields); + final DataType dataType = DataTypes.createArrayType(structDataType, true); + final int loadSize = 5; + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyArrayLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } +} \ No newline at end of file diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoaderTest.java new file mode 100644 index 0000000..b7a23f4 --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoaderTest.java @@ -0,0 +1,186 @@ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class SparkEmptyLoaderTest { + public static Stream data() { + final int loadSize = 5; + return Stream.of( + arguments(DataTypes.BooleanType, loadSize), + arguments(DataTypes.ByteType, loadSize), + arguments(DataTypes.StringType, loadSize), + arguments(DataTypes.createDecimalType(), loadSize), + arguments(DataTypes.DoubleType, loadSize), + arguments(DataTypes.FloatType, loadSize), + arguments(DataTypes.IntegerType, loadSize), + arguments(DataTypes.LongType, loadSize), + arguments(DataTypes.ShortType, loadSize) + ); + } + + public void assertNull(final WritableColumnVector vector, final int loadSize) { + for (int i = 0; i < loadSize; i++) { + assertTrue(vector.isNullAt(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Primitive(final DataType dataType, final int loadSize) throws IOException { + final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + assertNull(vector, loadSize); + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Primitive(final DataType elmDataType, final int loadSize) throws IOException { + final DataType dataType = DataTypes.createArrayType(elmDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Array(final DataType elmDataType, final int loadSize) throws IOException { + final DataType arrayDataType = DataTypes.createArrayType(elmDataType); + final DataType dataType = DataTypes.createArrayType(arrayDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Map(final DataType elmDataType, final int loadSize) throws IOException { + final DataType mapDataType = DataTypes.createMapType(DataTypes.StringType, elmDataType, true); + final DataType dataType = DataTypes.createArrayType(mapDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @Test + void T_load_Array_Struct() throws IOException { + final List fields = + Arrays.asList( + DataTypes.createStructField("bo", DataTypes.BooleanType, true), + DataTypes.createStructField("by", DataTypes.ByteType, true), + DataTypes.createStructField("bi", DataTypes.BinaryType, true), + DataTypes.createStructField("do", DataTypes.DoubleType, true), + DataTypes.createStructField("fl", DataTypes.FloatType, true), + DataTypes.createStructField("in", DataTypes.IntegerType, true), + DataTypes.createStructField("lo", DataTypes.LongType, true), + DataTypes.createStructField("sh", DataTypes.ShortType, true), + DataTypes.createStructField("st", DataTypes.StringType, true)); + final DataType structDataType = DataTypes.createStructType(fields); + final DataType dataType = DataTypes.createArrayType(structDataType, true); + final int loadSize = 5; + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Map(final DataType elmDataType, final int loadSize) throws Exception { + final DataType dataType = DataTypes.createMapType(DataTypes.StringType, elmDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + final ColumnarMap cm = vector.getMap(i); + assertEquals(0, cm.numElements()); + } + } + + @Test + void T_load_Struct() throws IOException { + final List e4Fields = + Arrays.asList( + DataTypes.createStructField("e1", DataTypes.LongType, true), + DataTypes.createStructField("e2", DataTypes.createArrayType(DataTypes.LongType, true), true), + DataTypes.createStructField("e3", + DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType, true), true) + ); + final List fields = + Arrays.asList( + DataTypes.createStructField("e1", DataTypes.StringType, true), + DataTypes.createStructField("e2", DataTypes.createArrayType(DataTypes.StringType, true), true), + DataTypes.createStructField("e3", + DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, true), true), + DataTypes.createStructField("e4", DataTypes.createStructType(e4Fields), true) + ); + final DataType dataType = DataTypes.createStructType(fields); + final int loadSize = 5; + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + for (int j = 0; j < fields.size(); j++) { + final StructField field = fields.get(j); + final String name = field.name(); + final DataType type = field.dataType(); + if (name.equals("e1")) { + assertTrue(vector.getChild(j).isNullAt(i)); + } else if (name.equals("e2")) { + assertFalse(vector.getChild(j).isNullAt(i)); + assertEquals(0, vector.getChild(j).getArrayLength(i)); + } else if (name.equals("e3")) { + assertFalse(vector.getChild(j).isNullAt(i)); + final ColumnarMap cm = vector.getChild(j).getMap(i); + assertEquals(0, cm.numElements()); + } else if (name.equals("e4")) { + for (int k = 0; k < e4Fields.size(); k++) { + final StructField e4Field = e4Fields.get(k); + final String e4Name = e4Field.name(); + final DataType e4Type = e4Field.dataType(); + if (e4Name.equals("e1")) { + assertTrue(vector.getChild(j).getChild(k).isNullAt(i)); + } else if (e4Name.equals("e2")) { + assertFalse(vector.getChild(j).isNullAt(i)); + assertEquals(0, vector.getChild(j).getChild(k).getArrayLength(i)); + } else if (e4Name.equals("e3")) { + assertFalse(vector.getChild(j).isNullAt(i)); + final ColumnarMap e4Cm = vector.getChild(j).getChild(k).getMap(i); + assertEquals(0, e4Cm.numElements()); + } else { + assertTrue(false); + } + } + } else { + assertTrue(false); + } + } + } + } +} \ No newline at end of file diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoaderTest.java new file mode 100644 index 0000000..208a377 --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoaderTest.java @@ -0,0 +1,48 @@ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class SparkEmptyMapLoaderTest { + public static Stream data() { + final int loadSize = 5; + return Stream.of( + arguments(DataTypes.BooleanType, loadSize), + arguments(DataTypes.ByteType, loadSize), + arguments(DataTypes.StringType, loadSize), + arguments(DataTypes.createDecimalType(), loadSize), + arguments(DataTypes.DoubleType, loadSize), + arguments(DataTypes.FloatType, loadSize), + arguments(DataTypes.IntegerType, loadSize), + arguments(DataTypes.LongType, loadSize), + arguments(DataTypes.ShortType, loadSize) + ); + } + + @ParameterizedTest + @MethodSource("data") + void T_load(final DataType elmDataType, final int loadSize) throws Exception { + final DataType dataType = DataTypes.createMapType(DataTypes.StringType, elmDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyMapLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + final ColumnarMap cm = vector.getMap(i); + assertEquals(0, cm.numElements()); + } + } +} \ No newline at end of file diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoaderTest.java new file mode 100644 index 0000000..869e167 --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoaderTest.java @@ -0,0 +1,81 @@ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class SparkEmptyStructLoaderTest { + @Test + void T_load() throws IOException { + final List e4Fields = + Arrays.asList( + DataTypes.createStructField("e1", DataTypes.LongType, true), + DataTypes.createStructField("e2", DataTypes.createArrayType(DataTypes.LongType, true), true), + DataTypes.createStructField("e3", + DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType, true), true) + ); + final List fields = + Arrays.asList( + DataTypes.createStructField("e1", DataTypes.StringType, true), + DataTypes.createStructField("e2", DataTypes.createArrayType(DataTypes.StringType, true), true), + DataTypes.createStructField("e3", + DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, true), true), + DataTypes.createStructField("e4", DataTypes.createStructType(e4Fields), true) + ); + final DataType dataType = DataTypes.createStructType(fields); + final int loadSize = 5; + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyStructLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + for (int j = 0; j < fields.size(); j++) { + final StructField field = fields.get(j); + final String name = field.name(); + final DataType type = field.dataType(); + if (name.equals("e1")) { + assertTrue(vector.getChild(j).isNullAt(i)); + } else if (name.equals("e2")) { + assertFalse(vector.getChild(j).isNullAt(i)); + assertEquals(0, vector.getChild(j).getArrayLength(i)); + } else if (name.equals("e3")) { + assertFalse(vector.getChild(j).isNullAt(i)); + final ColumnarMap cm = vector.getChild(j).getMap(i); + assertEquals(0, cm.numElements()); + } else if (name.equals("e4")) { + for (int k = 0; k < e4Fields.size(); k++) { + final StructField e4Field = e4Fields.get(k); + final String e4Name = e4Field.name(); + final DataType e4Type = e4Field.dataType(); + if (e4Name.equals("e1")) { + assertTrue(vector.getChild(j).getChild(k).isNullAt(i)); + } else if (e4Name.equals("e2")) { + assertFalse(vector.getChild(j).isNullAt(i)); + assertEquals(0, vector.getChild(j).getChild(k).getArrayLength(i)); + } else if (e4Name.equals("e3")) { + assertFalse(vector.getChild(j).isNullAt(i)); + final ColumnarMap e4Cm = vector.getChild(j).getChild(k).getMap(i); + assertEquals(0, e4Cm.numElements()); + } else { + assertTrue(false); + } + } + } else { + assertTrue(false); + } + } + } + } +} \ No newline at end of file diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoaderTest.java index 6893be5..86ae384 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoaderTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoaderTest.java @@ -14,7 +14,7 @@ */ package jp.co.yahoo.yosegi.spark.inmemory.loader; -import jp.co.yahoo.yosegi.inmemory.ISequentialLoader; +import jp.co.yahoo.yosegi.inmemory.ILoader; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.DataType; @@ -38,8 +38,8 @@ void T_load_Boolean_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.BooleanType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -51,8 +51,8 @@ void T_load_Byte_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.ByteType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -64,8 +64,8 @@ void T_load_Bytes_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.BinaryType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -77,8 +77,8 @@ void T_load_Double_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.DoubleType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -90,8 +90,8 @@ void T_load_Float_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.FloatType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -103,8 +103,8 @@ void T_load_Integer_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.IntegerType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -116,8 +116,8 @@ void T_load_Long_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.LongType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -129,8 +129,8 @@ void T_load_Short_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.ShortType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -142,8 +142,8 @@ void T_load_String_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.StringType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize);