spark-commits mailing list archives

From m...@apache.org
Subject spark git commit: [SPARK-5461] [graphx] Add isCheckpointed, getCheckpointedFiles methods to Graph
Date Mon, 02 Feb 2015 22:35:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master 5a5526164 -> 842d00032


[SPARK-5461] [graphx] Add isCheckpointed, getCheckpointedFiles methods to Graph

Added the two methods to Graph and GraphImpl. Both delegate to the underlying vertex and
edge RDDs.

This is needed for another PR (for LDA): [https://github.com/apache/spark/pull/4047]

Notes:
* getCheckpointFiles is plural and returns a Seq[String] instead of an Option[String].
* I attempted to test that the methods return the correct values after checkpointing.
  It did not work; I guess that checkpointing does not occur quickly enough? I noticed that
  there are no checkpointing tests for RDDs; is it just hard to test well?
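
For reference, a minimal sketch of how the two new methods might be exercised, mirroring the GraphSuite change in this commit. RDD.checkpoint() only marks an RDD for checkpointing; the data is written once a job runs on it afterwards, so the sketch forces an action on both the edges and the vertices before checking again. The object name, the run helper, the ring size, and the local[2] master are assumptions made for illustration and are not part of the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Hypothetical helper object, not part of this commit.
object GraphCheckpointSketch {
  // Returns (isCheckpointed before, file count before,
  //          isCheckpointed after,  file count after).
  def run(checkpointDir: String): (Boolean, Int, Boolean, Int) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("graph-checkpoint-sketch")
    val sc = new SparkContext(conf)
    try {
      // A checkpoint directory must be set before checkpoint() is called.
      sc.setCheckpointDir(checkpointDir)

      // Small ring graph: 0 -> 1 -> ... -> 9 -> 0.
      val ring = (0L until 10L).map(i => Edge(i, (i + 1) % 10, 1))
      val graph = Graph.fromEdges(sc.parallelize(ring), 1.0F)

      val before = (graph.isCheckpointed, graph.getCheckpointFiles.size)

      // Mark both underlying RDDs for checkpointing, then run a job on each
      // so that the checkpoint data is actually written.
      graph.checkpoint()
      graph.edges.map(_.attr).count()
      graph.vertices.map(_._2).count()

      (before._1, before._2, graph.isCheckpointed, graph.getCheckpointFiles.size)
    } finally {
      sc.stop()
    }
  }
}
```

The GraphSuite assertions in this commit expect no checkpoint files before checkpoint() and two afterwards (one for the vertices RDD, one for the edges RDD).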

CC: rxin

CC: mengxr  (since related to LDA)

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #4253 from jkbradley/graphx-checkpoint and squashes the following commits:

b680148 [Joseph K. Bradley] added class tag to firstParent call in VertexRDDImpl.isCheckpointed,
though not needed to compile
250810e [Joseph K. Bradley] In EdgeRDDImpl, VertexRDDImpl, added transient back to partitionsRDD,
and made isCheckpointed check firstParent instead of partitionsRDD
695b7a3 [Joseph K. Bradley] changed partitionsRDD in EdgeRDDImpl, VertexRDDImpl to be non-transient
cc00767 [Joseph K. Bradley] added overrides for isCheckpointed, getCheckpointFile in EdgeRDDImpl,
VertexRDDImpl. The corresponding Graph methods now work.
188665f [Joseph K. Bradley] improved documentation
235738c [Joseph K. Bradley] Added isCheckpointed and getCheckpointFiles to Graph, GraphImpl


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/842d0003
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/842d0003
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/842d0003

Branch: refs/heads/master
Commit: 842d00032d0b09fb1f9cfc77359b77693e70a614
Parents: 5a55261
Author: Joseph K. Bradley <joseph@databricks.com>
Authored: Mon Feb 2 14:34:48 2015 -0800
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Mon Feb 2 14:34:48 2015 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/graphx/Graph.scala  | 12 ++++++++++++
 .../org/apache/spark/graphx/impl/EdgeRDDImpl.scala      | 10 +++++++++-
 .../scala/org/apache/spark/graphx/impl/GraphImpl.scala  | 11 +++++++++++
 .../org/apache/spark/graphx/impl/VertexRDDImpl.scala    | 10 +++++++++-
 .../test/scala/org/apache/spark/graphx/GraphSuite.scala |  4 ++++
 project/MimaExcludes.scala                              |  6 ++++++
 6 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/842d0003/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index ab56580..8494d06 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -105,6 +105,18 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   def checkpoint(): Unit
 
   /**
+   * Return whether this Graph has been checkpointed or not.
+   * This returns true iff both the vertices RDD and edges RDD have been checkpointed.
+   */
+  def isCheckpointed: Boolean
+
+  /**
+   * Gets the name of the files to which this Graph was checkpointed.
+   * (The vertices RDD and edges RDD are checkpointed separately.)
+   */
+  def getCheckpointFiles: Seq[String]
+
+  /**
    * Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that
    * build a new graph in each iteration.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/842d0003/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index f1550ac..6c35d70 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -73,7 +73,15 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
   override def checkpoint() = {
     partitionsRDD.checkpoint()
   }
-    
+
+  override def isCheckpointed: Boolean = {
+    firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
+  }
+
+  override def getCheckpointFile: Option[String] = {
+    partitionsRDD.getCheckpointFile
+  }
+
   /** The number of edges in the RDD. */
   override def count(): Long = {
     partitionsRDD.map(_._2.size.toLong).reduce(_ + _)

http://git-wip-us.apache.org/repos/asf/spark/blob/842d0003/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 3f4a900..90a74d2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -70,6 +70,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     replicatedVertexView.edges.checkpoint()
   }
 
+  override def isCheckpointed: Boolean = {
+    vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed
+  }
+
+  override def getCheckpointFiles: Seq[String] = {
+    Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap {
+      case Some(path) => Seq(path)
+      case None => Seq()
+    }
+  }
+
   override def unpersist(blocking: Boolean = true): Graph[VD, ED] = {
     unpersistVertices(blocking)
     replicatedVertexView.edges.unpersist(blocking)
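
The way getCheckpointFiles combines the two Option[String] values above can be tried without Spark. The sketch below reproduces the flatMap pattern from the hunk on plain Options and compares it with Seq.flatten, which should give the same result. The object and method names are hypothetical and the paths are placeholders; this is an illustration, not a proposed change to the patch.

```scala
// Hypothetical stand-alone illustration; no Spark dependency required.
object CheckpointFilesSketch {
  // Same shape as GraphImpl.getCheckpointFiles in this commit:
  // keep the paths that are defined, drop the ones that are None.
  def collectPaths(vertexFile: Option[String], edgeFile: Option[String]): Seq[String] =
    Seq(vertexFile, edgeFile).flatMap {
      case Some(path) => Seq(path)
      case None => Seq()
    }

  // Alternative spelling using flatten on a Seq of Options.
  def collectPathsFlatten(vertexFile: Option[String], edgeFile: Option[String]): Seq[String] =
    Seq(vertexFile, edgeFile).flatten
}
```

Either form yields an empty Seq when neither RDD has a checkpoint file and preserves the vertices-then-edges order when both do.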

http://git-wip-us.apache.org/repos/asf/spark/blob/842d0003/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 9732c5b..3e4968d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -74,7 +74,15 @@ class VertexRDDImpl[VD] private[graphx] (
   override def checkpoint() = {
     partitionsRDD.checkpoint()
   }
-    
+
+  override def isCheckpointed: Boolean = {
+    firstParent[ShippableVertexPartition[VD]].isCheckpointed
+  }
+
+  override def getCheckpointFile: Option[String] = {
+    partitionsRDD.getCheckpointFile
+  }
+
   /** The number of vertices in the RDD. */
   override def count(): Long = {
     partitionsRDD.map(_.size).reduce(_ + _)

http://git-wip-us.apache.org/repos/asf/spark/blob/842d0003/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index ed9876b..59a57ba 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -375,6 +375,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
       val rdd = sc.parallelize(ring)
       val graph = Graph.fromEdges(rdd, 1.0F)
+      assert(!graph.isCheckpointed)
+      assert(graph.getCheckpointFiles.size === 0)
       graph.checkpoint()
       graph.edges.map(_.attr).count()
       graph.vertices.map(_._2).count()
@@ -383,6 +385,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val verticesDependencies = graph.vertices.partitionsRDD.dependencies
       assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
       assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+      assert(graph.isCheckpointed)
+      assert(graph.getCheckpointFiles.size === 2)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/842d0003/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 14ba03e..45be1db 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -127,6 +127,12 @@ object MimaExcludes {
             // SPARK-5315 Spark Streaming Java API returns Scala DStream
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
+          ) ++ Seq(
+            // SPARK-5461 Graph should have isCheckpointed, getCheckpointFiles methods
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.graphx.Graph.getCheckpointFiles"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.graphx.Graph.isCheckpointed")
           )
 
         case v if v.startsWith("1.2") =>



