From: ankurdave@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-3623][GraphX] GraphX should support the checkpoint operation
Date: Sat, 6 Dec 2014 08:56:56 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 6eb1b6f62 -> e895e0cbe

[SPARK-3623][GraphX] GraphX should support the checkpoint operation

Author: GuoQiang Li

Closes #2631 from witgo/SPARK-3623 and squashes the following commits:

a70c500 [GuoQiang Li] Remove java related
4d1e249 [GuoQiang Li] Add comments
e682724 [GuoQiang Li] Graph should support the checkpoint operation

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e895e0cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e895e0cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e895e0cb

Branch: refs/heads/master
Commit: e895e0cbecbbec1b412ff21321e57826d2d0a982
Parents: 6eb1b6f
Author: GuoQiang Li
Authored: Sat Dec 6 00:56:51 2014 -0800
Committer: Ankur Dave
Committed: Sat Dec 6 00:56:51 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Graph.scala   |  8 ++++++++
 .../apache/spark/graphx/impl/GraphImpl.scala    |  5 +++++
 .../org/apache/spark/graphx/GraphSuite.scala    | 21 ++++++++++++++++++++
 3 files changed, 34 insertions(+)
----------------------------------------------------------------------
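For context, the new operation is used as follows. This is a minimal sketch,
not part of the commit itself; the app name, master setting, and checkpoint
path are illustrative placeholders.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._

    object CheckpointExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("checkpoint-example").setMaster("local[2]")
        val sc = new SparkContext(conf)
        // The checkpoint directory must be set before calling checkpoint().
        sc.setCheckpointDir("/tmp/spark-checkpoints")

        // A small ring graph: 0 -> 1 -> ... -> 9 -> 0.
        val edges = sc.parallelize((0L until 10L).map(i => Edge(i, (i + 1) % 10, 1)))
        // Persist first, as the new doc comment advises, so checkpointing
        // does not force a recomputation.
        val graph = Graph.fromEdges(edges, defaultValue = 1.0).cache()

        graph.checkpoint()     // marks both the vertex and edge RDDs for checkpointing
        graph.edges.count()    // an action is required to materialize the checkpoint
        graph.vertices.count()

        sc.stop()
      }
    }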
http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 6377915..23538b7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -97,6 +97,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   def cache(): Graph[VD, ED]
 
   /**
+   * Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint
+   * directory set with SparkContext.setCheckpointDir() and all references to its parent
+   * RDDs will be removed. It is strongly recommended that this Graph be persisted in
+   * memory; otherwise saving it to a file will require recomputation.
+   */
+  def checkpoint(): Unit
+
+  /**
    * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative
    * algorithms that modify the vertex attributes but reuse the edges. This method can be used to
    * uncache the vertex attributes of previous iterations once they are no longer needed, improving

http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 0eae2a6..a617d84 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     this
   }
 
+  override def checkpoint(): Unit = {
+    vertices.checkpoint()
+    replicatedVertexView.edges.checkpoint()
+  }
+
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
     vertices.unpersist(blocking)
     // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone

http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index a05d1dd..9da0064 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import com.google.common.io.Files
+
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpoint") {
+    val checkpointDir = Files.createTempDir()
+    checkpointDir.deleteOnExit()
+    withSpark { sc =>
+      sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+      val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) }
+      val rdd = sc.parallelize(ring)
+      val graph = Graph.fromEdges(rdd, 1.0F)
+      graph.checkpoint()
+      graph.edges.map(_.attr).count()
+      graph.vertices.map(_._2).count()
+
+      val edgesDependencies = graph.edges.partitionsRDD.dependencies
+      val verticesDependencies = graph.vertices.partitionsRDD.dependencies
+      assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+      assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+    }
+  }
+
 }
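The test above verifies the checkpoint by asserting that the dependencies of
the underlying partition RDDs have been replaced by CheckpointRDDs. In
application code, the main motivation for checkpoint() is iterative
algorithms, where each iteration extends the RDD lineage; checkpointing
periodically truncates that chain. Below is a hedged sketch of that pattern,
where updateRanks is a hypothetical per-iteration transformation and the
interval of 25 iterations is arbitrary.

    import org.apache.spark.graphx._

    // Hypothetical per-iteration step; any Graph => Graph transformation fits here.
    def updateRanks(g: Graph[Double, Int]): Graph[Double, Int] =
      g.mapVertices((_, rank) => rank * 0.85 + 0.15)

    def iterate(initial: Graph[Double, Int], numIters: Int): Graph[Double, Int] = {
      var g = initial.cache()
      for (i <- 1 to numIters) {
        val next = updateRanks(g).cache()
        if (i % 25 == 0) {
          next.checkpoint()    // cut the lineage chain every 25 iterations
        }
        next.edges.count()     // materialize before unpersisting the old graph
        next.vertices.count()
        g.unpersistVertices(blocking = false)
        g = next
      }
      g
    }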