spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-22895][SQL] Push down the deterministic predicates that are after the first non-deterministic
Date Sun, 31 Dec 2017 07:07:02 GMT
Repository: spark
Updated Branches:
  refs/heads/master ee3af15fe -> cfbe11e81


[SPARK-22895][SQL] Push down the deterministic predicates that are after the first non-deterministic

## What changes were proposed in this pull request?
Currently, we do not guarantee the evaluation order of conjuncts in either the Filter or the
Join operator. The same is true of mainstream RDBMS vendors such as DB2 and MS SQL Server. Thus,
we should also push down the deterministic predicates that come after the first non-deterministic
predicate, if possible.
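
For illustration only (`Pred` and `PushdownSketch` below are hypothetical stand-ins, not Catalyst
types), here is a minimal sketch of the core idea: switching the split of conjuncts from `span` to
`partition` lets deterministic conjuncts that follow a non-deterministic one still become pushdown
candidates.

```scala
// Hypothetical stand-in for a conjunct: a label plus a deterministic flag.
case class Pred(name: String, deterministic: Boolean)

object PushdownSketch extends App {
  // a > 1 AND rand() > 0.3 AND b > 4
  val conjuncts = Seq(
    Pred("a > 1", deterministic = true),
    Pred("rand() > 0.3", deterministic = false),
    Pred("b > 4", deterministic = true))

  // Old behavior: span keeps only the deterministic prefix, so `b > 4` is never pushed down.
  val (oldPushDown, oldStayUp) = conjuncts.span(_.deterministic)
  // oldPushDown = Seq(a > 1), oldStayUp = Seq(rand() > 0.3, b > 4)

  // New behavior: partition collects every deterministic conjunct, so `b > 4` is also a candidate.
  val (newPushDown, newStayUp) = conjuncts.partition(_.deterministic)
  // newPushDown = Seq(a > 1, b > 4), newStayUp = Seq(rand() > 0.3)

  println(s"span:      push=${oldPushDown.map(_.name)} stay=${oldStayUp.map(_.name)}")
  println(s"partition: push=${newPushDown.map(_.name)} stay=${newStayUp.map(_.name)}")
}
```

In the patch itself the non-deterministic conjuncts always remain in the upper Filter
(`rest ++ nonDeterministic`), so only deterministic predicates are ever pushed below the child
operators; see the Optimizer.scala changes below.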

## How was this patch tested?
Updated the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20069 from gatorsmile/morePushDown.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfbe11e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfbe11e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfbe11e8

Branch: refs/heads/master
Commit: cfbe11e8164c04cd7d388e4faeded21a9331dac4
Parents: ee3af15
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Sun Dec 31 15:06:54 2017 +0800
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Sun Dec 31 15:06:54 2017 +0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  1 +
 .../sql/catalyst/optimizer/Optimizer.scala      | 40 ++++++++------------
 .../optimizer/FilterPushdownSuite.scala         | 33 ++++++++--------
 .../v2/PushDownOperatorsToDataSource.scala      | 10 ++---
 .../execution/python/ExtractPythonUDFs.scala    |  6 +--
 .../StreamingSymmetricHashJoinHelper.scala      |  5 +--
 .../python/BatchEvalPythonExecSuite.scala       | 10 +++--
 .../StreamingSymmetricHashJoinHelperSuite.scala | 14 +++----
 8 files changed, 54 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4b5f56c..dc3e384 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1636,6 +1636,7 @@ options.
 
   - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
   - The `percentile_approx` function previously accepted numeric type input and output double type results. Now it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.
+  - Since Spark 2.3, the Join/Filter's deterministic predicates that are after the first non-deterministic predicate are also pushed down/through the child operators, if possible. In prior Spark versions, these filters were not eligible for predicate pushdown.
   - Partition column inference previously found incorrect common type for different inferred types, for example, previously it ended up with double type as the common type for double type and date type. Now it finds the correct common type for such conflicts. The conflict resolution follows the table below:
 
     <table class="table">

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index eeb1b13..0d4b02c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -805,15 +805,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 
       // For each filter, expand the alias and check if the filter can be evaluated using
       // attributes produced by the aggregate operator's child operator.
-      val (candidates, containingNonDeterministic) =
-        splitConjunctivePredicates(condition).span(_.deterministic)
+      val (candidates, nonDeterministic) =
+        splitConjunctivePredicates(condition).partition(_.deterministic)
 
       val (pushDown, rest) = candidates.partition { cond =>
         val replaced = replaceAlias(cond, aliasMap)
         cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet)
       }
 
-      val stayUp = rest ++ containingNonDeterministic
+      val stayUp = rest ++ nonDeterministic
 
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
@@ -835,14 +835,14 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
       val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
 
-      val (candidates, containingNonDeterministic) =
-        splitConjunctivePredicates(condition).span(_.deterministic)
+      val (candidates, nonDeterministic) =
+        splitConjunctivePredicates(condition).partition(_.deterministic)
 
       val (pushDown, rest) = candidates.partition { cond =>
         cond.references.subsetOf(partitionAttrs)
       }
 
-      val stayUp = rest ++ containingNonDeterministic
+      val stayUp = rest ++ nonDeterministic
 
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
@@ -854,7 +854,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 
     case filter @ Filter(condition, union: Union) =>
       // Union could change the rows, so non-deterministic predicate can't be pushed down
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition(_.deterministic)
 
       if (pushDown.nonEmpty) {
         val pushDownCond = pushDown.reduceLeft(And)
@@ -878,13 +878,9 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       }
 
     case filter @ Filter(condition, watermark: EventTimeWatermark) =>
-      // We can only push deterministic predicates which don't reference the watermark attribute.
-      // We could in theory span() only on determinism and pull out deterministic predicates
-      // on the watermark separately. But it seems unnecessary and a bit confusing to not simply
-      // use the prefix as we do for nondeterminism in other cases.
-
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
-        p => p.deterministic && !p.references.contains(watermark.eventTime))
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { p =>
+        p.deterministic && !p.references.contains(watermark.eventTime)
+      }
 
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduceLeft(And)
@@ -925,14 +921,14 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // come from grandchild.
     // TODO: non-deterministic predicates could be pushed through some operators that do not change
     // the rows.
-    val (candidates, containingNonDeterministic) =
-      splitConjunctivePredicates(filter.condition).span(_.deterministic)
+    val (candidates, nonDeterministic) =
+      splitConjunctivePredicates(filter.condition).partition(_.deterministic)
 
     val (pushDown, rest) = candidates.partition { cond =>
       cond.references.subsetOf(grandchild.outputSet)
     }
 
-    val stayUp = rest ++ containingNonDeterministic
+    val stayUp = rest ++ nonDeterministic
 
     if (pushDown.nonEmpty) {
       val newChild = insertFilter(pushDown.reduceLeft(And))
@@ -975,23 +971,19 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   /**
    * Splits join condition expressions or filter predicates (on a given join's output) into three
    * categories based on the attributes required to evaluate them. Note that we explicitly exclude
-   * on-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
+   * non-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
    * canEvaluateInRight to prevent pushing these predicates on either side of the join.
    *
    * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
    */
   private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
-    // Note: In order to ensure correctness, it's important to not change the relative ordering of
-    // any deterministic expression that follows a non-deterministic expression. To achieve this,
-    // we only consider pushing down those expressions that precede the first non-deterministic
-    // expression in the condition.
-    val (pushDownCandidates, containingNonDeterministic) = condition.span(_.deterministic)
+    val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
     val (leftEvaluateCondition, rest) =
       pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
     val (rightEvaluateCondition, commonCondition) =
         rest.partition(expr => expr.references.subsetOf(right.outputSet))
 
-    (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ containingNonDeterministic)
+    (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index a9c2306..85a5e97 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -831,9 +831,9 @@ class FilterPushdownSuite extends PlanTest {
     val optimized = Optimize.execute(originalQuery.analyze)
 
     val correctAnswer = Union(Seq(
-      testRelation.where('a === 2L),
-      testRelation2.where('d === 2L)))
-      .where('b + Rand(10).as("rnd") === 3 && 'c > 5L)
+      testRelation.where('a === 2L && 'c > 5L),
+      testRelation2.where('d === 2L && 'f > 5L)))
+      .where('b + Rand(10).as("rnd") === 3)
       .analyze
 
     comparePlans(optimized, correctAnswer)
@@ -1134,12 +1134,13 @@ class FilterPushdownSuite extends PlanTest {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)
 
-    // Verify that all conditions preceding the first non-deterministic condition are pushed down
+    // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
    val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && "y.a".attr === 5 &&
       "x.a".attr === Rand(10) && "y.b".attr === 5))
-    val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5),
-        condition = Some("x.a".attr === Rand(10) && "y.b".attr === 5))
+    val correctAnswer =
+      x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5),
+        condition = Some("x.a".attr === Rand(10)))
 
     // CheckAnalysis will ensure nondeterministic expressions not appear in join condition.
     // TODO support nondeterministic expressions in join condition.
@@ -1147,16 +1148,16 @@ class FilterPushdownSuite extends PlanTest {
       checkAnalysis = false)
   }
 
-  test("watermark pushdown: no pushdown on watermark attribute") {
+  test("watermark pushdown: no pushdown on watermark attribute #1") {
     val interval = new CalendarInterval(2, 2000L)
 
-    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
     val originalQuery = EventTimeWatermark('b, interval, testRelation)
       .where('a === 5 && 'b === 10 && 'c === 5)
     val correctAnswer = EventTimeWatermark(
-      'b, interval, testRelation.where('a === 5))
-      .where('b === 10 && 'c === 5)
+      'b, interval, testRelation.where('a === 5 && 'c === 5))
+      .where('b === 10)
 
     comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
       checkAnalysis = false)
@@ -1165,7 +1166,7 @@ class FilterPushdownSuite extends PlanTest {
   test("watermark pushdown: no pushdown for nondeterministic filter") {
     val interval = new CalendarInterval(2, 2000L)
 
-    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
     val originalQuery = EventTimeWatermark('c, interval, testRelation)
       .where('a === 5 && 'b === Rand(10) && 'c === 5)
@@ -1180,7 +1181,7 @@ class FilterPushdownSuite extends PlanTest {
   test("watermark pushdown: full pushdown") {
     val interval = new CalendarInterval(2, 2000L)
 
-    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
     val originalQuery = EventTimeWatermark('c, interval, testRelation)
       .where('a === 5 && 'b === 10)
@@ -1191,15 +1192,15 @@ class FilterPushdownSuite extends PlanTest {
       checkAnalysis = false)
   }
 
-  test("watermark pushdown: empty pushdown") {
+  test("watermark pushdown: no pushdown on watermark attribute #2") {
     val interval = new CalendarInterval(2, 2000L)
 
-    // Verify that all conditions preceding the first watermark touching condition are pushed down
-    // by the optimizer and others are not.
     val originalQuery = EventTimeWatermark('a, interval, testRelation)
       .where('a === 5 && 'b === 10)
+    val correctAnswer = EventTimeWatermark(
+      'a, interval, testRelation.where('b === 10)).where('a === 5)
 
-    comparePlans(Optimize.execute(originalQuery.analyze), originalQuery.analyze,
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
       checkAnalysis = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 0c17081..df034ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -40,12 +40,8 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
     // top-down, then we can simplify the logic here and only collect target operators.
     val filterPushed = plan transformUp {
       case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) =>
-        // Non-deterministic expressions are stateful and we must keep the input sequence unchanged
-        // to avoid changing the result. This means, we can't evaluate the filter conditions that
-        // are after the first non-deterministic condition ahead. Here we only try to push down
-        // deterministic conditions that are before the first non-deterministic condition.
-        val (candidates, containingNonDeterministic) =
-          splitConjunctivePredicates(condition).span(_.deterministic)
+        val (candidates, nonDeterministic) =
+          splitConjunctivePredicates(condition).partition(_.deterministic)
 
         val stayUpFilters: Seq[Expression] = reader match {
           case r: SupportsPushDownCatalystFilters =>
@@ -74,7 +70,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
           case _ => candidates
         }
 
-        val filterCondition = (stayUpFilters ++ containingNonDeterministic).reduceLeftOption(And)
+        val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And)
         val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
         if (withFilter.output == fields) {
           withFilter

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index f5a4cbc..2f53fe7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -202,12 +202,12 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
   private def trySplitFilter(plan: SparkPlan): SparkPlan = {
     plan match {
       case filter: FilterExec =>
-        val (candidates, containingNonDeterministic) =
-          splitConjunctivePredicates(filter.condition).span(_.deterministic)
+        val (candidates, nonDeterministic) =
+          splitConjunctivePredicates(filter.condition).partition(_.deterministic)
         val (pushDown, rest) = candidates.partition(!hasPythonUDF(_))
         if (pushDown.nonEmpty) {
           val newChild = FilterExec(pushDown.reduceLeft(And), filter.child)
-          FilterExec((rest ++ containingNonDeterministic).reduceLeft(And), newChild)
+          FilterExec((rest ++ nonDeterministic).reduceLeft(And), newChild)
         } else {
           filter
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
index 167e991..217e98a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
@@ -72,8 +72,7 @@ object StreamingSymmetricHashJoinHelper extends Logging {
    * left AND right AND joined is equivalent to full.
    *
    * Note that left and right do not necessarily contain *all* conjuncts which satisfy
-   * their condition. Any conjuncts after the first nondeterministic one are treated as
-   * nondeterministic for purposes of the split.
+   * their condition.
    *
    * @param leftSideOnly Deterministic conjuncts which reference only the left side of the join.
    * @param rightSideOnly Deterministic conjuncts which reference only the right side of the join.
@@ -111,7 +110,7 @@ object StreamingSymmetricHashJoinHelper extends Logging {
           // Span rather than partition, because nondeterministic expressions don't commute
           // across AND.
           val (deterministicConjuncts, nonDeterministicConjuncts) =
-            splitConjunctivePredicates(condition.get).span(_.deterministic)
+            splitConjunctivePredicates(condition.get).partition(_.deterministic)
 
          val (leftConjuncts, nonLeftConjuncts) = deterministicConjuncts.partition { cond =>
             cond.references.subsetOf(left.outputSet)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
index 9e4a2e8..d456c93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
@@ -75,13 +75,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     assert(qualifiedPlanNodes.size == 2)
   }
 
-  test("Python UDF: no push down on predicates starting from the first non-deterministic")
{
+  test("Python UDF: push down on deterministic predicates after the first non-deterministic")
{
     val df = Seq(("Hello", 4)).toDF("a", "b")
       .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
     val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
-      case f @ FilterExec(And(_: And, _: GreaterThan), InputAdapter(_: BatchEvalPythonExec)) => f
+      case f @ FilterExec(
+          And(_: AttributeReference, _: GreaterThan),
+          InputAdapter(_: BatchEvalPythonExec)) => f
+      case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(_: FilterExec)) => b
     }
-    assert(qualifiedPlanNodes.size == 1)
+    assert(qualifiedPlanNodes.size == 2)
   }
 
   test("Python UDF refers to the attributes from more than one child") {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
index 2a854e3..69b7154 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
@@ -18,10 +18,8 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan}
-import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
 import org.apache.spark.sql.types._
 
@@ -95,19 +93,17 @@ class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
   }
 
   test("conjuncts after nondeterministic") {
-    // All conjuncts after a nondeterministic conjunct shouldn't be split because they don't
-    // commute across it.
     val predicate =
-      (rand() > lit(0)
+      (rand(9) > lit(0)
         && leftColA > leftColB
         && rightColC > rightColD
         && leftColA === rightColC
         && lit(1) === lit(1)).expr
     val split = JoinConditionSplitPredicates(Some(predicate), left, right)
 
-    assert(split.leftSideOnly.isEmpty)
-    assert(split.rightSideOnly.isEmpty)
-    assert(split.bothSides.contains(predicate))
+    assert(split.leftSideOnly.contains((leftColA > leftColB && lit(1) === lit(1)).expr))
+    assert(split.rightSideOnly.contains((rightColC > rightColD && lit(1) === lit(1)).expr))
+    assert(split.bothSides.contains((leftColA === rightColC && rand(9) > lit(0)).expr))
     assert(split.full.contains(predicate))
   }
 



