spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD
Date Thu, 26 Jan 2017 01:15:38 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a5c10ff23 -> 0d7e38524


[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD

## What changes were proposed in this pull request?

EdgeRDD/VertexRDD override checkpoint() and isCheckpointed() to forward these to the internal
partitionsRDD. So when checkpoint() is called on them, it's the partitionsRDD that actually gets
checkpointed. However, since isCheckpointed() is also overridden to call partitionsRDD.isCheckpointed,
EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually not checkpointed.

This would have been fine except the RDD's internal logic for computing the RDD depends on
isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing, Spark
tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed.
Through a crazy sequence of call forwarding, it reads checkpoint data of partitionsRDD and
tries to cast it to types in Vertex/EdgeRDD. This leads to ClassCastException.

The minimal fix that does not change any public behavior is to modify the RDD internals to not
use the public, overridable API for internal logic.

## How was this patch tested?

New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15396 from tdas/SPARK-14804.

(cherry picked from commit 47d5d0ddb06c7d2c86515d9556c41dc80081f560)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d7e3852
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d7e3852
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d7e3852

Branch: refs/heads/branch-2.1
Commit: 0d7e38524245e30f29c67aaeb5be67954a0a500c
Parents: a5c10ff
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Wed Jan 25 17:17:34 2017 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Jan 25 17:17:48 2017 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  5 ++--
 .../org/apache/spark/graphx/EdgeRDDSuite.scala  | 27 ++++++++++++++++++++
 .../apache/spark/graphx/VertexRDDSuite.scala    | 26 +++++++++++++++++++
 3 files changed, 56 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d7e3852/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 374abcc..66a773d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1610,14 +1610,15 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    */
-  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
+  def isCheckpointed: Boolean = isCheckpointedAndMaterialized
 
   /**
    * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
    * return value. Exposed for testing.
    */
-  private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
+  private[spark] def isCheckpointedAndMaterialized: Boolean =
+    checkpointData.exists(_.isCheckpointed)
 
   /**
    * Return whether this RDD is marked for local checkpointing.

http://git-wip-us.apache.org/repos/asf/spark/blob/0d7e3852/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index f1ecc9e..7a24e32 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpointing") {
+    withSpark { sc =>
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      edges.checkpoint()
+
+      // EdgeRDD not yet checkpointed
+      assert(!edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(!edges.partitionsRDD.isCheckpointed)
+      assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = edges.collect().toSeq // force checkpointing
+
+      // EdgeRDD shows up as checkpointed, but internally it is not.
+      // Only internal partitionsRDD is checkpointed.
+      assert(edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(edges.partitionsRDD.isCheckpointed)
+      assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(edges.collect().toSeq ===  data) // test checkpointed RDD
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d7e3852/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 0bb9e0a..8e63043 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
 import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpoint") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n)
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      verts.checkpoint()
+
+      // VertexRDD not yet checkpointed
+      assert(!verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(!verts.partitionsRDD.isCheckpointed)
+      assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = verts.collect().toSeq // force checkpointing
+
+      // VertexRDD shows up as checkpointed, but internally it is not.
+      // Only internal partitionsRDD is checkpointed.
+      assert(verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(verts.partitionsRDD.isCheckpointed)
+      assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(verts.collect().toSeq === data) // test checkpointed RDD
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message