Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EE7C0109B6 for ; Sat, 11 Jan 2014 00:26:21 +0000 (UTC) Received: (qmail 75205 invoked by uid 500); 11 Jan 2014 00:26:21 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 75174 invoked by uid 500); 11 Jan 2014 00:26:21 -0000 Mailing-List: contact commits-help@spark.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@spark.incubator.apache.org Delivered-To: mailing list commits@spark.incubator.apache.org Received: (qmail 75167 invoked by uid 99); 11 Jan 2014 00:26:21 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 11 Jan 2014 00:26:21 +0000 X-ASF-Spam-Status: No, hits=-2000.1 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Sat, 11 Jan 2014 00:26:17 +0000 Received: (qmail 73013 invoked by uid 99); 11 Jan 2014 00:25:55 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 11 Jan 2014 00:25:55 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A002732AF1C; Sat, 11 Jan 2014 00:25:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pwendell@apache.org To: commits@spark.incubator.apache.org Date: Sat, 11 Jan 2014 00:25:58 -0000 Message-Id: <0ee537587f2042a38de504d36428ef04@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [04/20] git commit: Merge branch 'filestream-fix' into driver-test X-Virus-Checked: Checked by ClamAV on 
apache.org Merge branch 'filestream-fix' into driver-test Conflicts: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/23947945 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/23947945 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/23947945 Branch: refs/heads/master Commit: 23947945913cafb4f6549167c53a3cdd4a09fef0 Parents: 8e88db3 fcd17a1 Author: Tathagata Das Authored: Mon Jan 6 02:23:53 2014 +0000 Committer: Tathagata Das Committed: Mon Jan 6 02:23:53 2014 +0000 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 23 +-- .../spark/api/java/JavaSparkContext.scala | 15 +- .../org/apache/spark/rdd/CheckpointRDD.scala | 32 ++-- .../apache/spark/rdd/RDDCheckpointData.scala | 15 +- .../scala/org/apache/spark/JavaAPISuite.java | 4 +- python/pyspark/context.py | 9 +- python/pyspark/tests.py | 4 +- .../org/apache/spark/streaming/Checkpoint.scala | 50 +++--- .../spark/streaming/StreamingContext.scala | 32 ++-- .../api/java/JavaStreamingContext.scala | 13 +- .../streaming/dstream/FileInputDStream.scala | 153 +++++++++++-------- .../streaming/scheduler/JobGenerator.scala | 67 +++++--- .../spark/streaming/CheckpointSuite.scala | 44 +++--- .../spark/streaming/InputStreamsSuite.scala | 2 +- 14 files changed, 267 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---------------------------------------------------------------------- diff --cc 
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 139e2c0,4960a85..09b184b --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@@ -21,12 -21,13 +21,13 @@@ import java.io._ import java.util.concurrent.Executors import java.util.concurrent.RejectedExecutionException - import org.apache.hadoop.fs.Path + import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.conf.Configuration -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.io.CompressionCodec import org.apache.spark.util.MetadataCleaner + import org.apache.spark.deploy.SparkHadoopUtil private[streaming] @@@ -141,16 -151,11 +151,15 @@@ class CheckpointWriter(checkpointDir: S private[streaming] object CheckpointReader extends Logging { - def read(path: String): Checkpoint = { + def doesCheckpointExist(path: String): Boolean = { + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) val fs = new Path(path).getFileSystem(new Configuration()) - val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), - new Path(path), new Path(path + ".bk")) + (attempts.count(p => fs.exists(p)) > 1) + } + def read(path: String): Checkpoint = { + val fs = new Path(path).getFileSystem(new Configuration()) + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) - val compressionCodec = CompressionCodec.createCodec() attempts.foreach(file => { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ----------------------------------------------------------------------