Added spark 3.4 to the project #346

Merged 8 commits on May 16, 2023
4 changes: 2 additions & 2 deletions .github/codecov.yml
@@ -3,7 +3,7 @@ codecov:
branch: main
notify:
wait_for_ci: true
after_n_builds: 4
after_n_builds: 5

coverage:
precision: 2
@@ -37,4 +37,4 @@ comment:
layout: "reach,diff,flags,files,footer"
behavior: once # once: update, if exists. Otherwise, post new. Skip if deleted.
require_changes: false # if true: only post the comment if coverage changes
after_n_builds: 4
after_n_builds: 5
4 changes: 4 additions & 0 deletions .github/labeler.yml
@@ -26,3 +26,7 @@ spark_3.2:
spark_3.3:
- core/src/main/spark*3.3*/**/*
- core/src/main/scala/**/*

spark_3.4:
- core/src/main/spark*3.4*/**/*
- core/src/main/scala/**/*
5 changes: 5 additions & 0 deletions .github/workflows/matrix_includes.json
@@ -113,5 +113,10 @@
"spark": "3.3",
"scope": "test-uploadReport",
"isRelease": "release"
},
{
"spark": "3.4",
"scope": "test-uploadReport",
"isRelease": "release"
}
]
8 changes: 7 additions & 1 deletion README.md
@@ -18,6 +18,7 @@ Type-safe columns for spark DataFrames!
| 3.1.x | [![Maven Central](https://img.shields.io/maven-central/v/org.hablapps/doric_3-1_2.12)](https://mvnrepository.com/artifact/org.hablapps/doric_3-1_2.12/0.0.6) | [![Codecov](https://img.shields.io/codecov/c/github/hablapps/doric?flag=spark-3.1.x&label=codecov&logo=codecov&token=N7ZXUXZX1I)](https://codecov.io/gh/hablapps/doric) |
| 3.2.x | [![Maven Central](https://img.shields.io/maven-central/v/org.hablapps/doric_3-2_2.12)](https://mvnrepository.com/artifact/org.hablapps/doric_3-2_2.12/0.0.6) | [![Codecov](https://img.shields.io/codecov/c/github/hablapps/doric?flag=spark-3.2.x&label=codecov&logo=codecov&token=N7ZXUXZX1I)](https://codecov.io/gh/hablapps/doric) |
| 3.3.x | [![Maven Central](https://img.shields.io/maven-central/v/org.hablapps/doric_3-3_2.12)](https://mvnrepository.com/artifact/org.hablapps/doric_3-3_2.12/0.0.6) | [![Codecov](https://img.shields.io/codecov/c/github/hablapps/doric?flag=spark-3.3.x&label=codecov&logo=codecov&token=N7ZXUXZX1I)](https://codecov.io/gh/hablapps/doric) |
| 3.4.x | [![Maven Central](https://img.shields.io/maven-central/v/org.hablapps/doric_3-4_2.12)](https://mvnrepository.com/artifact/org.hablapps/doric_3-4_2.12/0.0.6) | [![Codecov](https://img.shields.io/codecov/c/github/hablapps/doric?flag=spark-3.4.x&label=codecov&logo=codecov&token=N7ZXUXZX1I)](https://codecov.io/gh/hablapps/doric) |
----

Doric offers type-safety in DataFrame column expressions at a minimum
@@ -85,7 +86,12 @@ following spark versions.
| 3.2.0 | 2.12 | ✅ | - |
| 3.2.1 | 2.12 | ✅ | - |
| 3.2.2 | 2.12 | ✅ | [![Maven Central](https://img.shields.io/maven-central/v/org.hablapps/doric_3-2_2.12)](https://mvnrepository.com/artifact/org.hablapps/doric_3-2_2.12/0.0.6) |
| 3.3.0 | 2.12 | ✅ | [![Maven Central](https://img.shields.io/maven-central/v/org.hablapps/doric_3-3_2.12)](https://mvnrepository.com/artifact/org.hablapps/doric_3-3_2.12/0.0.6) |
| 3.3.0 | 2.12 | ✅ | - |
| 3.3.1 | 2.12 | ✅ | - |
| 3.3.2 | 2.12 | ✅ | [![Maven Central](https://img.shields.io/maven-central/v/org.hablapps/doric_3-3_2.12)](https://mvnrepository.com/artifact/org.hablapps/doric_3-3_2.12/0.0.6) |
| 3.4.0 | 2.12 | ✅ | [![Maven Central](https://img.shields.io/maven-central/v/org.hablapps/doric_3-4_2.12)](https://mvnrepository.com/artifact/org.hablapps/doric_3-4_2.12/0.0.6) |




## Contributing
8 changes: 5 additions & 3 deletions build.sbt
@@ -4,12 +4,13 @@ import sbt.Compile

val stableVersion = "0.0.6"

val sparkDefaultShortVersion = "3.3"
val sparkDefaultShortVersion = "3.4"
val spark24Version = "2.4.8"
val spark30Version = "3.0.3"
val spark31Version = "3.1.3"
val spark32Version = "3.2.3"
val spark33Version = "3.3.2"
val spark34Version = "3.4.0"

val versionRegex = """^(.*)\.(.*)\.(.*)$""".r
val versionRegexShort = """^(.*)\.(.*)$""".r
@@ -24,6 +25,7 @@ val parserSparkVersion: String => String = {
case versionRegexShort("3", "1") => spark31Version
case versionRegexShort("3", "2") => spark32Version
case versionRegexShort("3", "3") => spark33Version
case versionRegexShort("3", "4") => spark34Version
case versionRegex(a, b, c) => s"$a.$b.$c"
}

@@ -37,7 +39,7 @@ val scalaVersionSelect: String => List[String] = {
case versionRegex("3", "1", _) => List(scala212)
case versionRegex("3", "2", _) => List(scala212, scala213)
case versionRegex("3", "3", _) => List(scala212, scala213)

case versionRegex("3", "4", _) => List(scala212, scala213)
}

val catsVersion: String => String = {
@@ -215,7 +217,7 @@ lazy val docs = project
}
}
)
.enablePlugins(plugins: _*)
.enablePlugins(plugins *)

// Scoverage settings
Global / coverageEnabled := false
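Taken together, the build.sbt changes above route the new version through two matchers: `parserSparkVersion` resolves a short version such as "3.4" to the pinned patch release, and `scalaVersionSelect` declares which Scala versions that Spark line cross-builds for. A minimal, self-contained sketch of that dispatch follows; the regexes and the "3.4" branches mirror the diff, while `scala212` and `scala213` are placeholders, since their exact values sit outside this diff:

```scala
// Sketch of build.sbt's version dispatch with the new "3.4" cases.
object VersionDispatchSketch {
  val spark34Version = "3.4.0"
  val scala212       = "2.12.x" // placeholder
  val scala213       = "2.13.x" // placeholder

  val versionRegex      = """^(.*)\.(.*)\.(.*)$""".r
  val versionRegexShort = """^(.*)\.(.*)$""".r

  // "3.4" resolves to the pinned patch release; a full x.y.z passes through.
  val parserSparkVersion: String => String = {
    case versionRegexShort("3", "4") => spark34Version
    case versionRegex(a, b, c)       => s"$a.$b.$c"
  }

  // Spark 3.4 cross-builds for Scala 2.12 and 2.13, like 3.2 and 3.3.
  val scalaVersionSelect: String => List[String] = {
    case versionRegex("3", "4", _) => List(scala212, scala213)
  }

  def main(args: Array[String]): Unit = {
    println(parserSparkVersion("3.4"))   // 3.4.0
    println(scalaVersionSelect("3.4.0")) // List(2.12.x, 2.13.x)
  }
}
```

Keeping the short-version keys in one place means the CI matrix entries above stay decoupled from patch bumps: when 3.4.1 lands, only `spark34Version` needs to change.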
15 changes: 8 additions & 7 deletions core/src/main/scala/doric/syntax/ArrayColumns.scala
@@ -1,16 +1,17 @@
package doric
package syntax

import scala.jdk.CollectionConverters._
import scala.language.higherKinds
import scala.reflect.ClassTag

import cats.data.Kleisli
import cats.implicits._
import doric.types.{CollectionType, LiteralSparkType, SparkType}
import org.apache.spark.sql.catalyst.expressions.LambdaFunction.identity
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.{Column, Dataset, Row, functions => f}

import scala.jdk.CollectionConverters._
import scala.language.higherKinds
import scala.reflect.ClassTag
import org.apache.spark.sql.{Column, Dataset, Row, functions => f}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.LambdaFunction.identity

protected final case class Zipper[T1, T2, F[_]: CollectionType](
col: DoricColumn[F[T1]],
@@ -392,7 +393,7 @@ protected trait ArrayColumns {
* Null elements will be placed at the end of the returned array.
*
* @group Array Type
* @see [[org.apache.spark.sql.functions.array_sort]]
* @see [[https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/functions$.html#array_sort(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column]]
*/
def sortAscNullsLast: DoricColumn[F[T]] = col.elem.map(f.array_sort).toDC

@@ -2,6 +2,7 @@ package doric
package syntax

import doric.types.NumericType

import org.apache.spark.sql.{functions => f}

protected trait NumericColumns31 {
@@ -2,6 +2,7 @@ package doric
package syntax

import doric.sem.Location

import org.apache.spark.sql.{functions => f}

protected trait StringColumns31 {
@@ -2,8 +2,8 @@ package doric
package syntax

import cats.implicits._
import org.apache.spark.sql.Column
import org.apache.spark.sql.{functions => f}

import org.apache.spark.sql.{Column, functions => f}
import org.apache.spark.sql.catalyst.expressions.{ShiftLeft, ShiftRight, ShiftRightUnsigned}

protected trait NumericColumns32 {
31 changes: 31 additions & 0 deletions core/src/main/spark_3.4_mount/scala/doric/syntax/All.scala
@@ -0,0 +1,31 @@
package doric.syntax

private[doric] trait All
extends ArrayColumns
with TypeMatcher
with CommonColumns
with DStructs
with LiteralConversions
with MapColumns
with NumericColumns
with DateColumns
with TimestampColumns
with BooleanColumns
with StringColumns
with ControlStructures
with AggregationColumns
with CNameOps
with BinaryColumns
with Interpolators
with AggregationColumns31
with BooleanColumns31
with NumericColumns31
with NumericColumns32
with StringColumns31
with BinaryColumns32
with ArrayColumns3x
with CommonColumns3x
with MapColumns3x
with StringColumn3x
with AggregationColumns32
with DStructs3x
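This aggregator is what the new spark_3.4_mount source set contributes: one trait stacking every syntax mix-in available on the 3.4 line, matching the 3.3 set. The pattern itself is plain trait composition; here is a toy sketch with invented names (only the structure mirrors `All`, which is `private[doric]` and presumably surfaced through the library's package object):

```scala
// Toy model of the aggregator pattern: each version-mounted source set
// stacks the syntax mix-ins for its Spark line into a single trait.
trait ArrayOps  { def firstOrZero(xs: Seq[Int]): Int = xs.headOption.getOrElse(0) }
trait StringOps { def backtick(s: String): String    = s"`$s`"                    }

// Counterpart of All: one trait pulling in the whole syntax surface.
trait AllOps extends ArrayOps with StringOps

// A single object then exposes the combined API.
object api extends AllOps

// api.backtick("col")   // returns "`col`"
// api.firstOrZero(Nil)  // returns 0
```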
13 changes: 11 additions & 2 deletions core/src/test/scala/doric/sem/ChildColumnNotFound.scala
@@ -1,10 +1,19 @@
package doric.sem

import org.apache.spark.sql.SparkSession

object ChildColumnNotFound {
def apply(expectedCol: String, foundCols: List[String]): SparkErrorWrapper = {
def apply(expectedCol: String, foundCols: List[String])(implicit
location: Location,
sparkSession: SparkSession
): SparkErrorWrapper = {
SparkErrorWrapper(
new Throwable(
s"No such struct field $expectedCol in ${foundCols.mkString(", ")}"
if (!sparkSession.version.startsWith("3.4"))
s"No such struct field $expectedCol in ${foundCols.mkString(", ")}"
else
s"[FIELD_NOT_FOUND] No such struct field `$expectedCol` in ${foundCols
.mkString("`", "`, `", "`")}."
)
)
}
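Spark 3.4 reports this error through its error-class framework (the `[FIELD_NOT_FOUND]` prefix), so the expected message changes shape and the helper now needs the running SparkSession, hence the added implicit. A standalone sketch of the same selection, with `sparkVersion` standing in for `sparkSession.version` and both message formats copied from the diff:

```scala
object ExpectedFieldNotFound {
  // Both message formats are copied from the helper above.
  def expected(
      sparkVersion: String,
      expectedCol: String,
      foundCols: List[String]
  ): String =
    if (!sparkVersion.startsWith("3.4"))
      s"No such struct field $expectedCol in ${foundCols.mkString(", ")}"
    else
      s"[FIELD_NOT_FOUND] No such struct field `$expectedCol` in " +
        foundCols.mkString("`", "`, `", "`") + "."

  def main(args: Array[String]): Unit = {
    println(expected("3.3.2", "_3", List("_1", "_2")))
    // No such struct field _3 in _1, _2
    println(expected("3.4.0", "_3", List("_1", "_2")))
    // [FIELD_NOT_FOUND] No such struct field `_3` in `_1`, `_2`.
  }
}
```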
8 changes: 6 additions & 2 deletions core/src/test/scala/doric/sem/ColumnNotFound.scala
@@ -10,8 +10,12 @@ object ColumnNotFound {

SparkErrorWrapper(
new Throwable(
s"""Cannot resolve column name "$expectedCol" among (${foundCols
.mkString(", ")})"""
if (!sparkSession.version.startsWith("3.4"))
s"""Cannot resolve column name "$expectedCol" among (${foundCols
.mkString(", ")})"""
else
s"[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `$expectedCol` cannot be resolved. Did you mean one of the following? [${foundCols
.mkString("`", "`, `", "`")}]."
)
)
}
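ColumnNotFound gets the same version gate, and the payoff shows in the test diffs that follow: ErrorsSpec, JoinOpsSpec, and ArrayColumnsSpec stop hardcoding Spark's wording and build expected errors through these helpers. Substituting the ErrorsSpec arguments (column "notFound" among "value") into the two formats above gives, as a worksheet-style check:

```scala
// ColumnNotFound("notFound", List("value")) expects, before Spark 3.4:
val expectedPre34 =
  """Cannot resolve column name "notFound" among (value)"""

// and on Spark 3.4, the error-class form:
val expected34 =
  "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with " +
    "name `notFound` cannot be resolved. Did you mean one of the following? [`value`]."
```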
4 changes: 1 addition & 3 deletions core/src/test/scala/doric/sem/ErrorsSpec.scala
@@ -65,9 +65,7 @@ class ErrorsSpec
val err = intercept[DoricMultiError] {
Seq(1, 2, 3).toDF("value").select(colInt("notFound"))
}
val err2 = SparkErrorWrapper(
new Exception("Cannot resolve column name \"notFound\" among (value)")
)
val err2 = ColumnNotFound("notFound", List("value"))

err.errors.head.equals(err2) shouldBe true
}
15 changes: 4 additions & 11 deletions core/src/test/scala/doric/sem/JoinOpsSpec.scala
@@ -2,6 +2,7 @@ package doric
package sem

import doric.implicitConversions._

import org.apache.spark.sql.types.{LongType, StringType}

class JoinOpsSpec extends DoricTestElements {
@@ -58,7 +59,7 @@ class JoinOpsSpec extends DoricTestElements {

val badJoinFunction: DoricJoinColumn =
LeftDF.colString(id) ===
RightDF.colString(id + "entifier")
RightDF.colString("identifier")

intercept[DoricMultiError] {
left.join(right, "inner", badJoinFunction)
@@ -68,11 +69,7 @@
isLeft = true
),
JoinDoricSingleError(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"" + id + "entifier\" among (" + id + ", " + otherColumn + ")"
)
),
ColumnNotFound("identifier", List("id", "otherColumn")),
isLeft = false
)
)
@@ -94,11 +91,7 @@
isLeft = true
),
JoinDoricSingleError(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"" + id + "entifier\" among (" + id + ", " + otherColumn + ")"
)
),
ColumnNotFound("identifier", List("id", "otherColumn")),
isLeft = false
)
)
9 changes: 7 additions & 2 deletions core/src/test/scala/doric/sem/TransformOpsSpec.scala
@@ -120,10 +120,15 @@ class TransformOpsSpec
)
}
error.getMessage should startWith(
"Found duplicate column(s) in given column names:"
if (!spark.version.startsWith("3.4"))
"Found duplicate column(s) in given column names:"
else
"[COLUMN_ALREADY_EXISTS] The column `a` already exists. Consider to choose another name or rename the existing column."
)
error.getMessage should include("`a`")
error.getMessage should include("`b`")
if (!spark.version.startsWith("3.4")) {
error.getMessage should include("`b`")
}
}

it("should work with 'withNamedColumns' as with 'namedColumns'") {
49 changes: 13 additions & 36 deletions core/src/test/scala/doric/syntax/ArrayColumnsSpec.scala
@@ -1,15 +1,16 @@
package doric
package syntax

import scala.jdk.CollectionConverters._

import doric.SparkAuxFunctions.createLambda
import doric.sem.{ChildColumnNotFound, ColumnTypeError, DoricMultiError, SparkErrorWrapper}
import doric.sem.{ChildColumnNotFound, ColumnNotFound, ColumnTypeError, DoricMultiError}
import doric.types.SparkType
import org.apache.spark.sql.catalyst.expressions.{ArrayExists, ZipWith}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Column, Row, functions => f}

import java.sql.Timestamp
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.{Column, Row, functions => f}
import org.apache.spark.sql.catalyst.expressions.{ArrayExists, ZipWith}
import org.apache.spark.sql.types._

class ArrayColumnsSpec extends DoricTestElements {

@@ -52,11 +53,7 @@ class ArrayColumnsSpec extends DoricTestElements {
.transform(_ => colString("something"))
)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnTypeError("something", StringType, IntegerType)
)

@@ -72,11 +69,7 @@
)
}
errors should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnTypeError("_1", LongType, IntegerType),
ChildColumnNotFound("_3", List("_1", "_2"))
)
@@ -179,11 +172,7 @@
.transformWithIndex(_ + _ + colInt("something2"))
)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnTypeError("something", IntegerType, StringType)
)
}
@@ -212,11 +201,7 @@
.aggregate(colInt("something2"))(_ + _ + colInt("something"))
)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnTypeError("something", IntegerType, StringType)
)
}
@@ -249,16 +234,8 @@
)
)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something3\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnNotFound("something3", List("col", "something")),
ColumnTypeError("something", IntegerType, StringType)
)
}