spark-redshift generates corrupted Parquet files #449

Open
java8964 opened this issue Dec 3, 2019 · 0 comments
java8964 commented Dec 3, 2019

Hi,
We are using Spark 2.2.0 with Spark-Redshift version 2.0.1.
We recently found that Parquet files generated with Spark-Redshift from an existing Redshift table can no longer be read by Spark. This may be caused by a change on the Redshift side, but we have not been able to pin it down.
We like the performance and throughput of Spark-Redshift, and we use the following sample code to bring one table's data from Redshift to HDFS:

val df = spark.sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", s"$url?user=$user&password=$jdbcPassword")
  .option("dbtable", s"$tablename")
  .option("tempdir", s"$s3Dest")
  .option("forward_spark_s3_credentials", "true")
  .load()

df.toDF(df.columns.map(_.toLowerCase): _*)
  .write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .save(stagePath.toString)
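
For completeness, this is the scaffolding the snippet above assumes (a minimal sketch only; the session setup, JDBC URL, credentials, table name, and paths are placeholders, not our real settings):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("redshift-to-parquet")
  .getOrCreate()

// Placeholder values referenced by the snippet above (not our real settings).
val url          = "jdbc:redshift://<host>:5439/<database>"
val user         = "<user>"
val jdbcPassword = "<password>"
val tablename    = "<schema>.<table>"
val s3Dest       = "s3a://<bucket>/<temp-prefix>/"
val stagePath    = new org.apache.hadoop.fs.Path("/redshift_parquet_files/")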

After that, when we try to read the generated Parquet files:
val df = spark.read.parquet(stagePath.toString)
df.count works, but the following Spark code does not:
spark-shell
SPARK_MAJOR_VERSION is set to 2, using Spark2
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.2.6.3.0-235
      /_/
scala> val df = spark.read.parquet("/redshift_parquet_files/")
df: org.apache.spark.sql.DataFrame = [domain: string, scan_last_update_time: timestamp ... 161 more fields]
scala> df.count
19/12/03 13:04:56 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
res0: Long = 32053319
scala> df.filter("domain is not null").count
[Stage 3:=============================================> (1036 + 100) / 1162]19/12/03 13:07:40 WARN TaskSetManager: Lost task 1120.0 in stage 3.0 (TID 2302, p2-hdp117.ad.prodcc.net, executor 125): java.lang.ArrayIndexOutOfBoundsException
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putBytes(OnHeapColumnVector.java:163)
at org.apache.spark.sql.execution.vectorized.ColumnVector.appendBytes(ColumnVector.java:733)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:410)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:167)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBinarys(VectorizedRleValuesReader.java:402)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:419)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:203)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:123)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Basically, filtering on any column with isNull or isNotNull causes the java.lang.ArrayIndexOutOfBoundsException.
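
The stack trace goes through Spark's vectorized Parquet reader (OnHeapColumnVector / VectorizedColumnReader). As a diagnostic only, not a fix, a sketch like the following first retries the failing filter with the vectorized reader disabled (falling back to the parquet-mr read path) and then reads each output file individually to isolate the corrupted one(s); the path and the domain column are taken from the transcript above:

// 1. Retry the failing filter with the vectorized Parquet reader disabled.
//    If the non-vectorized (parquet-mr) path also fails, the files themselves
//    are likely bad rather than just tripping the vectorized reader.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.read.parquet("/redshift_parquet_files/")
  .filter("domain is not null")
  .count

// 2. Re-enable the vectorized reader and read each Parquet file on its own
//    to find which file(s) reproduce the exception.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.Try

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val badFiles = fs.listStatus(new Path("/redshift_parquet_files/"))
  .map(_.getPath.toString)
  .filter(_.endsWith(".parquet"))
  .filter(f => Try(spark.read.parquet(f).filter("domain is not null").count).isFailure)

badFiles.foreach(println)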
