Merge pull request #166 from holdenk/upgrade-to-modern-spark
Upgrade to modern spark (3.4)
colindean authored Apr 10, 2024
2 parents 2eb1389 + faa81ab commit b65ec2a
Showing 11 changed files with 88 additions and 33 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci.yaml
@@ -23,5 +23,7 @@ jobs:
- uses: coursier/setup-action@v1
with:
jvm: adopt:1.8
- name: Build, test, and package project
run: bin/sbt clean compile test package makePom
- name: Build, test, and package project on Spark 3.5
run: bin/sbt clean compile test package makePom -DsparkVersion=3.5.1
- name: Build and package project on "legacy" Spark
run: bin/sbt clean compile package makePom
2 changes: 2 additions & 0 deletions .github/workflows/release.yaml
@@ -25,3 +25,5 @@ jobs:
# uses sbt-github-packages, see build.sbt
- name: Publish with SBT
run: bin/sbt publish
- name: Publish with SBT
run: bin/sbt publish -DsparkVersion=3.5.1
3 changes: 3 additions & 0 deletions README.md
@@ -65,6 +65,9 @@ Usage: data-validator [options]
--help Show this help message and exit.
```
If you want to build with Java 11 or newer, set the `MODERN_JAVA` environment variable to `TRUE` (it takes effect when building for Spark 3.x).
This may become the default in the future.
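For example, a full modern-stack build might look like this (an illustrative command based on the CI configuration; adjust the Spark version to your target):

```bash
# Java 11+ build against Spark 3.5.x; MODERN_JAVA=TRUE makes the build pass the
# --add-opens flags that the forked test JVM needs on newer JDKs.
MODERN_JAVA=TRUE bin/sbt clean compile test package makePom -DsparkVersion=3.5.1
```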
## Example Run
With the JAR directly:
66 changes: 56 additions & 10 deletions build.sbt
@@ -1,11 +1,42 @@
name := "data-validator"
organization := "com.target"

scalaVersion := "2.11.12"
val sparkVersion = settingKey[String]("Spark version")

val sparkVersion = "2.3.4"
sparkVersion := System.getProperty("sparkVersion", "2.3.4")

val circeVersion = "0.11.2"
scalaVersion := {
if (sparkVersion.value > "3.0") {
"2.12.19"
} else {
"2.11.12"
}
}

val sparkValidationVersion = settingKey[String]("Version of package")

sparkValidationVersion := "0.15.0"

version := sparkVersion.value + "_" + sparkValidationVersion.value

val circeVersion = settingKey[String]("Circe version")
val circeYamlVersion = settingKey[String]("Circe YAML version")

circeVersion := {
if (sparkVersion.value > "3.0") {
"0.14.6"
} else {
"0.11.2"
}
}

circeYamlVersion := {
if (sparkVersion.value > "3.0") {
"0.15.1"
} else {
"0.10.1"
}
}

//addDependencyTreePlugin
enablePlugins(GitVersioning)
@@ -35,18 +66,28 @@ libraryDependencies ++= Seq(
"com.github.scopt" %% "scopt" % "4.1.0",
"com.sun.mail" % "javax.mail" % "1.6.2",
"com.lihaoyi" %% "scalatags" % "0.12.0",
"io.circe" %% "circe-yaml" % "0.10.1",
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"io.circe" %% "circe-yaml" % circeYamlVersion.value,
"io.circe" %% "circe-core" % circeVersion.value,
"io.circe" %% "circe-generic" % circeVersion.value,
"io.circe" %% "circe-parser" % circeVersion.value,
"org.apache.spark" %% "spark-sql" % sparkVersion.value % Provided,
"junit" % "junit" % "4.13.2" % Test,
"org.scalatest" %% "scalatest" % "3.2.17" % Test,
"com.github.sbt" % "junit-interface" % "0.13.3" % Test exclude ("junit", "junit-dep")
)

Test / fork := true
javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:+CMSClassUnloadingEnabled")
javaOptions ++= (if (sparkVersion.value > "3.0" && System.getenv("MODERN_JAVA") == "TRUE") {
// For modern Java we need to open up a lot of config options.
Seq("-Xms4048M", "-Xmx4048M",
// these were added in JDK 11 and newer, apparently.
"-Dio.netty.tryReflectionSetAccessible=true",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.io=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED")
} else {
Seq("-Xms4048M", "-Xmx4048M")
})
Test / parallelExecution := false
// required for unit tests, but not set in some environments
Test / envVars ++= Map(
@@ -57,6 +98,11 @@ Test / envVars ++= Map(

assembly / mainClass := Some("com.target.data_validator.Main")

assembly / assemblyShadeRules := Seq(
ShadeRule.rename("shapeless.**" -> "new_shapeless.@1").inAll,
ShadeRule.rename("cats.kernel.**" -> s"new_cats.kernel.@1").inAll
)

// Enforces scalastyle checks
val compileScalastyle = TaskKey[Unit]("compileScalastyle")
scalastyleFailOnWarning := true
@@ -75,6 +121,6 @@ compileScalastyle := (Compile / scalastyle).toTask("").value

(Compile / runMain) := Defaults.runMainTask(Compile / fullClasspath, Compile / run / runner).evaluated
TaskKey[Unit]("generateTestData") := {
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion.value
(Compile / runMain).toTask(" com.target.data_validator.GenTestData").value
}
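Taken together, these build.sbt changes let a single checkout target both Spark lines, with the artifact version derived from the Spark version plus the validator version. A rough sketch of the two local invocations (illustrative commands mirroring the CI steps above; the exact version string may also be influenced by the git-versioning plugin):

```bash
# "Legacy" build: defaults to Spark 2.3.4 / Scala 2.11; versioned roughly as 2.3.4_0.15.0
bin/sbt clean compile package makePom

# Modern build: Spark 3.5.1 / Scala 2.12; versioned roughly as 3.5.1_0.15.0
bin/sbt clean compile test package makePom -DsparkVersion=3.5.1
```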
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -2,4 +2,5 @@ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("com.github.sbt" % "sbt-git" % "2.0.1")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
addSbtPlugin("com.codecommit" % "sbt-github-packages" % "0.5.3")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.10.4")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
@@ -86,7 +86,7 @@ case class MinNumRows(minNumRows: Json) extends ColumnBased("", ValidatorBase.L0
}

case class ColumnMaxCheck(column: String, value: Json)
extends ColumnBased(column, Max(UnresolvedAttribute(column)).toAggregateExpression()) {
extends ColumnBased(column, Max(UnresolvedAttribute.quoted(column)).toAggregateExpression()) {

override def substituteVariables(dict: VarSubstitution): ValidatorBase = {
val ret = copy(column = getVarSub(column, "column", dict), value = getVarSubJson(value, "value", dict))
@@ -16,7 +16,7 @@ case class ColumnSumCheck(
minValue: Option[Json] = None,
maxValue: Option[Json] = None,
inclusive: Option[Json] = None
) extends ColumnBased(column, Sum(UnresolvedAttribute(column)).toAggregateExpression()) {
) extends ColumnBased(column, Sum(UnresolvedAttribute.quoted(column)).toAggregateExpression()) {

private val minOrMax: Either[String, Unit] = if (minValue.isEmpty && maxValue.isEmpty) {
Left("'minValue' or 'maxValue' or both must be defined")
@@ -78,13 +78,14 @@ case class ColumnSumCheck(
}

def getData(pctError: String): ListMap[String, String] = {
((minValue, maxValue) match {
val initial: ListMap[String, String] = ((minValue, maxValue) match {
case (Some(x), Some(y)) =>
ListMap("lower_bound" -> x.asNumber.get.toString, "upper_bound" -> y.asNumber.get.toString)
case (None, Some(y)) => ListMap("upper_bound" -> y.asNumber.get.toString)
case (Some(x), None) => ListMap("lower_bound" -> x.asNumber.get.toString)
case (None, None) => throw new RuntimeException("Must define at least one of minValue or maxValue.")
}) + ("inclusive" -> isInclusive.toString, "actual" -> r(idx).toString, "relative_error" -> pctError)
})
initial ++ List("inclusive" -> isInclusive.toString, "actual" -> r(idx).toString, "relative_error" -> pctError)
}

val actualSum: Double = dataType match {
@@ -141,8 +141,8 @@ object ValidatorBase extends LazyLogging {
private val backtick = "`"
val I0: Literal = Literal.create(0, IntegerType)
val D0: Literal = Literal.create(0.0, DoubleType)
val L0: Literal = Literal.create(0, LongType)
val L1: Literal = Literal.create(1, LongType)
val L0: Literal = Literal.create(0L, LongType)
val L1: Literal = Literal.create(1L, LongType)

def isValueColumn(v: String): Boolean = v.startsWith(backtick)

@@ -305,7 +305,7 @@ class RangeCheckSpec extends AnyFunSpec with Matchers with TestingSparkSession {
ValidatorQuickCheckError(
("item", "Eggs") :: Nil,
5.99,
"RangeCheck failed! max = 5.99 and (('max <= 6.0) || ('max >= 10.0))"
"RangeCheck failed! max = 5.99 and (('max <= 6.0) OR ('max >= 10.0))"
)
)
}
@@ -341,7 +341,7 @@ class RangeCheckSpec extends AnyFunSpec with Matchers with TestingSparkSession {
val dict = new VarSubstitution
val df = mkDataFrame(spark, defData)
assert(!sut.configCheck(df))
assert(sut.colTest(df.schema, dict).sql == "((`avg` < `min`) OR (`avg` > `max`))")
assert(sut.colTest(df.schema, dict).sql == "((avg < min) OR (avg > max))")
}

it("bad minValue column") {
@@ -395,7 +395,7 @@ class RangeCheckSpec extends AnyFunSpec with Matchers with TestingSparkSession {
ValidatorQuickCheckError(
("item", "Bread") :: Nil,
0.99,
"RangeCheck failed! avg = 0.99 and (('avg <= 'min) || ('avg >= 'max))"
"RangeCheck failed! avg = 0.99 and (('avg <= 'min) OR ('avg >= 'max))"
)
)
}
@@ -327,13 +327,13 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringLengthCheck failed! item = I and ((length('item) < 5) || (length('item) > 6))"
"StringLengthCheck failed! item = I and ((length('item) < 5) OR (length('item) > 6))"
)) ^
(sut.getEvents contains
ValidatorQuickCheckError(
("item", "") :: Nil,
"",
"StringLengthCheck failed! item = and ((length('item) < 5) || (length('item) > 6))"
"StringLengthCheck failed! item = and ((length('item) < 5) OR (length('item) > 6))"
))
)
}
@@ -364,7 +364,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringLengthCheck failed! item = I and ((length('item) < 5) || (length('item) > 6))"
"StringLengthCheck failed! item = I and ((length('item) < 5) OR (length('item) > 6))"
)
)

@@ -373,7 +373,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "") :: Nil,
"",
"StringLengthCheck failed! item = and ((length('item) < 5) || (length('item) > 6))"
"StringLengthCheck failed! item = and ((length('item) < 5) OR (length('item) > 6))"
)
)
}
@@ -404,7 +404,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringLengthCheck failed! item = I and ((length('item) < 5) || (length('item) > 5))"
"StringLengthCheck failed! item = I and ((length('item) < 5) OR (length('item) > 5))"
)
)

@@ -413,7 +413,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "") :: Nil,
"",
"StringLengthCheck failed! item = and ((length('item) < 5) || (length('item) > 5))"
"StringLengthCheck failed! item = and ((length('item) < 5) OR (length('item) > 5))"
)
)

@@ -422,7 +422,7 @@ class StringLengthCheckSpec extends AnyFunSpec with Matchers with TestingSparkSe
ValidatorQuickCheckError(
("item", "Item23") :: Nil,
"Item23",
"StringLengthCheck failed! item = Item23 and ((length('item) < 5) || (length('item) > 5))"
"StringLengthCheck failed! item = Item23 and ((length('item) < 5) OR (length('item) > 5))"
)
)
}
@@ -183,7 +183,7 @@ class StringRegexCheckSpec extends AnyFunSpec with Matchers with TestingSparkSes
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringRegexCheck failed! item = I and (NOT 'item RLIKE ^It && isnotnull('item))"
"StringRegexCheck failed! item = I and (NOT RLIKE('item, ^It) AND isnotnull('item))"
)
)
}
@@ -214,7 +214,7 @@ class StringRegexCheckSpec extends AnyFunSpec with Matchers with TestingSparkSes
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringRegexCheck failed! item = I and (NOT 'item RLIKE ^Item2 && isnotnull('item))"
"StringRegexCheck failed! item = I and (NOT RLIKE('item, ^Item2) AND isnotnull('item))"
)
)

@@ -223,7 +223,7 @@ class StringRegexCheckSpec extends AnyFunSpec with Matchers with TestingSparkSes
ValidatorQuickCheckError(
("item", "Item1") :: Nil,
"Item1",
"StringRegexCheck failed! item = Item1 and (NOT 'item RLIKE ^Item2 && isnotnull('item))"
"StringRegexCheck failed! item = Item1 and (NOT RLIKE('item, ^Item2) AND isnotnull('item))"
)
)
}
@@ -254,13 +254,13 @@ class StringRegexCheckSpec extends AnyFunSpec with Matchers with TestingSparkSes
ValidatorQuickCheckError(
("item", "I") :: Nil,
"I",
"StringRegexCheck failed! item = I and (NOT 'item RLIKE ^Item2 && isnotnull('item))"
"StringRegexCheck failed! item = I and (NOT RLIKE('item, ^Item2) AND isnotnull('item))"
)) ^
(sut.getEvents contains
ValidatorQuickCheckError(
("item", "Item1") :: Nil,
"Item1",
"StringRegexCheck failed! item = Item1 and (NOT 'item RLIKE ^Item2 && isnotnull('item))"
"StringRegexCheck failed! item = Item1 and (NOT RLIKE('item, ^Item2) AND isnotnull('item))"
))
)
}