spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ankurd...@apache.org
Subject spark git commit: [SPARK-3623][GraphX] GraphX should support the checkpoint operation
Date Sat, 06 Dec 2014 08:56:56 GMT
Repository: spark
Updated Branches:
  refs/heads/master 6eb1b6f62 -> e895e0cbe


[SPARK-3623][GraphX] GraphX should support the checkpoint operation

Author: GuoQiang Li <witgo@qq.com>

Closes #2631 from witgo/SPARK-3623 and squashes the following commits:

a70c500 [GuoQiang Li] Remove java related
4d1e249 [GuoQiang Li] Add comments
e682724 [GuoQiang Li] Graph should support the checkpoint operation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e895e0cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e895e0cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e895e0cb

Branch: refs/heads/master
Commit: e895e0cbecbbec1b412ff21321e57826d2d0a982
Parents: 6eb1b6f
Author: GuoQiang Li <witgo@qq.com>
Authored: Sat Dec 6 00:56:51 2014 -0800
Committer: Ankur Dave <ankurdave@gmail.com>
Committed: Sat Dec 6 00:56:51 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Graph.scala   |  8 ++++++++
 .../apache/spark/graphx/impl/GraphImpl.scala    |  5 +++++
 .../org/apache/spark/graphx/GraphSuite.scala    | 21 ++++++++++++++++++++
 3 files changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 6377915..23538b7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -97,6 +97,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   def cache(): Graph[VD, ED]
 
   /**
+   * Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint
+   * directory set with SparkContext.setCheckpointDir() and all references to its parent
+   * RDDs will be removed. It is strongly recommended that this Graph is persisted in
+   * memory, otherwise saving it on a file will require recomputation.
+   */
+  def checkpoint(): Unit
+
+  /**
    * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative
    * algorithms that modify the vertex attributes but reuse the edges. This method can be used to
    * uncache the vertex attributes of previous iterations once they are no longer needed, improving

http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 0eae2a6..a617d84 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     this
   }
 
+  override def checkpoint(): Unit = {
+    vertices.checkpoint()
+    replicatedVertexView.edges.checkpoint()
+  }
+
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
     vertices.unpersist(blocking)
     // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone

http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index a05d1dd..9da0064 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import com.google.common.io.Files
+
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpoint") {
+    val checkpointDir = Files.createTempDir()
+    checkpointDir.deleteOnExit()
+    withSpark { sc =>
+      sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+      val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
+      val rdd = sc.parallelize(ring)
+      val graph = Graph.fromEdges(rdd, 1.0F)
+      graph.checkpoint()
+      graph.edges.map(_.attr).count()
+      graph.vertices.map(_._2).count()
+
+      val edgesDependencies = graph.edges.partitionsRDD.dependencies
+      val verticesDependencies = graph.vertices.partitionsRDD.dependencies
+      assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+      assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message