spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator
Date Thu, 25 May 2017 00:19:49 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b7a2a16b1 -> 2405afce4


[SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator

## What changes were proposed in this pull request?

A one-liner change in `ShuffleExchange.nodeName` to cover the case when `coordinator` is `null`,
so that the match expression is exhaustive.

Please refer to [SPARK-20872](https://issues.apache.org/jira/browse/SPARK-20872) for a description
of the symptoms.
TL;DR is that inspecting a `ShuffleExchange` (directly or transitively) on the Executor side
can hit a case where the `coordinator` field of a `ShuffleExchange` is null, and thus will
trigger a `MatchError` in `ShuffleExchange.nodeName()`'s inexhaustive match expression.

Also changed two other match conditions in `ShuffleExchange` on the `coordinator` field to
be consistent.

## How was this patch tested?

Manually tested this change with a case where the `coordinator` is null to make sure `ShuffleExchange.nodeName`
doesn't throw a `MatchError` any more.

Author: Kris Mok <kris.mok@databricks.com>

Closes #18095 from rednaxelafx/shuffleexchange-nodename.

(cherry picked from commit c0b3e45e3b46a5235b748cb85ad200c9ec1bb426)
Signed-off-by: Xiao Li <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2405afce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2405afce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2405afce

Branch: refs/heads/branch-2.2
Commit: 2405afce4e87c0486f2aef1d068f17aea2480b17
Parents: b7a2a16
Author: Kris Mok <kris.mok@databricks.com>
Authored: Wed May 24 17:19:35 2017 -0700
Committer: Xiao Li <gatorsmile@gmail.com>
Committed: Wed May 24 17:19:46 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/exchange/ShuffleExchange.scala      | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2405afce/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index f06544e..eebe6ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -40,6 +40,9 @@ case class ShuffleExchange(
     child: SparkPlan,
     @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
 
+  // NOTE: coordinator can be null after serialization/deserialization,
+  //       e.g. it can be null on the Executor side
+
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
 
@@ -47,7 +50,7 @@ case class ShuffleExchange(
     val extraInfo = coordinator match {
       case Some(exchangeCoordinator) =>
         s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
-      case None => ""
+      case _ => ""
     }
 
     val simpleNodeName = "Exchange"
@@ -70,7 +73,7 @@ case class ShuffleExchange(
     // the plan.
     coordinator match {
       case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
-      case None =>
+      case _ =>
     }
   }
 
@@ -117,7 +120,7 @@ case class ShuffleExchange(
           val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
           assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
           shuffleRDD
-        case None =>
+        case _ =>
           val shuffleDependency = prepareShuffleDependency()
           preparePostShuffleRDD(shuffleDependency)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message