Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 25CC4180A5 for ; Mon, 19 Oct 2015 23:16:37 +0000 (UTC) Received: (qmail 14511 invoked by uid 500); 19 Oct 2015 23:16:34 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 14436 invoked by uid 500); 19 Oct 2015 23:16:34 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 14361 invoked by uid 99); 19 Oct 2015 23:16:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Oct 2015 23:16:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 78D67E0508; Mon, 19 Oct 2015 23:16:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andrewor14@apache.org To: commits@spark.apache.org Message-Id: <4683e79cf3db40238ac4e2e5e3fe5794@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-11051][CORE] Do not allow local checkpointing after the RDD is materialized and checkpointed Date: Mon, 19 Oct 2015 23:16:34 +0000 (UTC) Repository: spark Updated Branches: refs/heads/master 7ab0ce650 -> a1413b366 [SPARK-11051][CORE] Do not allow local checkpointing after the RDD is materialized and checkpointed JIRA: https://issues.apache.org/jira/browse/SPARK-11051 When a `RDD` is materialized and checkpointed, its partitions and dependencies are cleared. If we allow local checkpointing on it and assign `LocalRDDCheckpointData` to its `checkpointData`. Next time when the RDD is materialized again, the error will be thrown. Author: Liang-Chi Hsieh Closes #9072 from viirya/no-localcheckpoint-after-checkpoint. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1413b36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1413b36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1413b36 Branch: refs/heads/master Commit: a1413b3662250dd5e980e8b1f7c3dc4585ab4766 Parents: 7ab0ce6 Author: Liang-Chi Hsieh Authored: Mon Oct 19 16:16:31 2015 -0700 Committer: Andrew Or Committed: Mon Oct 19 16:16:31 2015 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/rdd/RDD.scala | 35 ++++++++++++++++---- .../org/apache/spark/CheckpointSuite.scala | 4 +++ 2 files changed, 32 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a1413b36/core/src/main/scala/org/apache/spark/rdd/RDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a56e542..a97bb17 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -294,7 +294,11 @@ abstract class RDD[T: ClassTag]( */ private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { - if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context) + if (isCheckpointedAndMaterialized) { + firstParent[T].iterator(split, context) + } else { + compute(split, context) + } } /** @@ -1520,21 +1524,38 @@ abstract class RDD[T: ClassTag]( persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true) } - checkpointData match { - case Some(reliable: ReliableRDDCheckpointData[_]) => logWarning( - "RDD was already marked for reliable checkpointing: overriding with local checkpoint.") - case _ => + // If this RDD is already checkpointed and materialized, its lineage is already truncated. + // We must not override our `checkpointData` in this case because it is needed to recover + // the checkpointed data. If it is overridden, next time materializing on this RDD will + // cause error. + if (isCheckpointedAndMaterialized) { + logWarning("Not marking RDD for local checkpoint because it was already " + + "checkpointed and materialized") + } else { + // Lineage is not truncated yet, so just override any existing checkpoint data with ours + checkpointData match { + case Some(_: ReliableRDDCheckpointData[_]) => logWarning( + "RDD was already marked for reliable checkpointing: overriding with local checkpoint.") + case _ => + } + checkpointData = Some(new LocalRDDCheckpointData(this)) } - checkpointData = Some(new LocalRDDCheckpointData(this)) this } /** - * Return whether this RDD is marked for checkpointing, either reliably or locally. + * Return whether this RDD is checkpointed and materialized, either reliably or locally. */ def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed) /** + * Return whether this RDD is checkpointed and materialized, either reliably or locally. + * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the + * return value. Exposed for testing. + */ + private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed + + /** * Return whether this RDD is marked for local checkpointing. * Exposed for testing. */ http://git-wip-us.apache.org/repos/asf/spark/blob/a1413b36/core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 4d70bfe..119e5fc 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -241,9 +241,13 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging val rdd = new BlockRDD[Int](sc, Array[BlockId]()) assert(rdd.partitions.size === 0) assert(rdd.isCheckpointed === false) + assert(rdd.isCheckpointedAndMaterialized === false) checkpoint(rdd, reliableCheckpoint) + assert(rdd.isCheckpointed === false) + assert(rdd.isCheckpointedAndMaterialized === false) assert(rdd.count() === 0) assert(rdd.isCheckpointed === true) + assert(rdd.isCheckpointedAndMaterialized === true) assert(rdd.partitions.size === 0) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org