From: pwendell@apache.org
To: commits@spark.incubator.apache.org
Date: Thu, 02 Jan 2014 05:30:04 -0000
Subject: [26/33] git commit: Merge remote-tracking branch 'apache/master' into conf2

Merge remote-tracking branch 'apache/master' into conf2

Conflicts:
	core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
	streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ba9338f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ba9338f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ba9338f1

Branch: refs/heads/master
Commit: ba9338f104ccc71d4f342a3f96624a9b36895f48
Parents: 0fa5809 63b411d
Author: Matei Zaharia
Authored: Tue Dec 31 18:23:14 2013 -0500
Committer: Matei Zaharia
Committed: Tue Dec 31 18:23:14 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/network/netty/FileClient.java  |  32 ++--
 .../netty/FileClientChannelInitializer.java     |   6 +-
 .../spark/network/netty/FileClientHandler.java  |  12 +-
 .../apache/spark/network/netty/FileServer.java  |  29 ++--
 .../netty/FileServerChannelInitializer.java     |   3 +-
 .../spark/network/netty/FileServerHandler.java  |  18 ++-
 .../scala/org/apache/spark/SparkContext.scala   |  23 +--
 .../spark/api/java/JavaSparkContext.scala       |  15 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  32 ++--
 .../apache/spark/rdd/RDDCheckpointData.scala    |  15 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   2 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |   2 +-
 .../apache/spark/scheduler/SparkListener.scala  |  20 ++-
 .../spark/scheduler/SparkListenerBus.scala      |   2 +-
 .../apache/spark/scheduler/TaskSetManager.scala |   6 -
 .../spark/ui/jobs/JobProgressListener.scala     |   2 +-
 .../scala/org/apache/spark/JavaAPISuite.java    |   4 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |   2 +-
 .../spark/scheduler/SparkListenerSuite.scala    |   2 +-
 pom.xml                                         |   2 +-
 project/SparkBuild.scala                        |   2 +-
 python/pyspark/context.py                       |   9 +-
 python/pyspark/tests.py                         |   4 +-
 .../org/apache/spark/streaming/Checkpoint.scala |  53 ++++---
 .../spark/streaming/StreamingContext.scala      |  31 ++--
 .../api/java/JavaStreamingContext.scala         |  13 +-
 .../streaming/dstream/FileInputDStream.scala    | 153 +++++++++++--------
 .../streaming/scheduler/JobGenerator.scala      |  66 +++++---
 .../spark/streaming/CheckpointSuite.scala       |  44 +++---
 .../spark/streaming/InputStreamsSuite.scala     |   2 +-
 30 files changed, 347 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/SparkContext.scala
index 8134ce7,7514ce5..fbc7a78
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@@ -19,15 -19,14 +19,15 @@@ package org.apache.spar
  import java.io._
  import java.net.URI
- import java.util.Properties
+ import java.util.{UUID, Properties}
  import java.util.concurrent.atomic.AtomicInteger
 -import scala.collection.Map
 +import scala.collection.{Map, Set, immutable}
  import scala.collection.generic.Growable
 -import scala.collection.mutable.ArrayBuffer
 -import scala.collection.mutable.HashMap
 +
 +import scala.collection.mutable.{ArrayBuffer, HashMap}
  import scala.reflect.{ClassTag, classTag}
 +import scala.util.Try
  
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 2897c4b,293a7d1..172ba6b
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@@ -118,10 -125,14 +124,14 @@@ private[spark] object CheckpointRDD ext
      }
    }
  
-   def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+   def readFromFile[T](
+       path: Path,
+       broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+       context: TaskContext
+     ): Iterator[T] = {
      val env = SparkEnv.get
-     val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+     val fs = path.getFileSystem(broadcastedConf.value.value)
-     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+     val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
      val fileInputStream = fs.open(path, bufferSize)
      val serializer = env.serializer.newInstance()
      val deserializeStream = serializer.deserializeStream(fileInputStream)
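
A minimal driver-side sketch of how the broadcast Hadoop configuration expected by the new
readFromFile signature might be produced. This is illustrative only, not part of the commit:
the helper name checkpointInputFor and its parameters are hypothetical, and it assumes the
SparkContext.broadcast / hadoopConfiguration API as it existed around this merge.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.{SerializableWritable, SparkContext}

    // Hypothetical helper: wrap the (non-serializable) Hadoop Configuration in a
    // SerializableWritable and broadcast it once, so every task can rebuild a
    // FileSystem from it instead of constructing a fresh Configuration per read.
    def checkpointInputFor(sc: SparkContext, checkpointPath: String) = {
      val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
      val path = new Path(checkpointPath)
      // A task holding a TaskContext `context` would then call:
      //   CheckpointRDD.readFromFile(path, broadcastedConf, context)
      (path, broadcastedConf)
    }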
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d752e6f,7929051..b99664a
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@@ -114,13 -112,9 +114,9 @@@ private[spark] class TaskSetManager
    // Task index, start and finish time for each task attempt (indexed by task ID)
    val taskInfos = new HashMap[Long, TaskInfo]
  
-   // Did the TaskSet fail?
-   var failed = false
-   var causeOfFailure = ""
-
    // How frequently to reprint duplicate exceptions in full, in milliseconds
    val EXCEPTION_PRINT_INTERVAL =
-     System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+     conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
  
    // Map of recent exceptions (identified by string representation and top stack frame) to
    // duplicate count (how many times the same exception has appeared) and time the full exception


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/project/SparkBuild.scala
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/python/pyspark/context.py
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 35e23c1,4960a85..af44327
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@@ -21,12 -21,13 +21,13 @@@ import java.io.
  import java.util.concurrent.Executors
  import java.util.concurrent.RejectedExecutionException
  
- import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.hadoop.conf.Configuration
  
 -import org.apache.spark.Logging
 +import org.apache.spark.{SparkConf, Logging}
  import org.apache.spark.io.CompressionCodec
  import org.apache.spark.util.MetadataCleaner
+ import org.apache.spark.deploy.SparkHadoopUtil
  
  
  private[streaming]
@@@ -54,23 -55,21 +55,23 @@@ class Checkpoint(@transient ssc: Stream
  
  /**
-  * Convenience class to speed up the writing of graph checkpoint to file
+  * Convenience class to handle the writing of graph checkpoint to file
   */
  private[streaming]
- class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
 -class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
++class CheckpointWriter(conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
++  extends Logging
++{
    val file = new Path(checkpointDir, "graph")
-   // The file to which we actually write - and then "move" to file.
-   private val writeFile = new Path(file.getParent, file.getName + ".next")
-   private val bakFile = new Path(file.getParent, file.getName + ".bk")
-
-   private var stopped = false
-
-   val hadoopConf = new Configuration()
-   var fs = file.getFileSystem(hadoopConf)
-   val maxAttempts = 3
+   val MAX_ATTEMPTS = 3
    val executor = Executors.newFixedThreadPool(1)
+   val compressionCodec = CompressionCodec.createCodec()
+   // The file to which we actually write - and then "move" to file
+   val writeFile = new Path(file.getParent, file.getName + ".next")
+   // The file to which existing checkpoint is backed up (i.e. "moved")
+   val bakFile = new Path(file.getParent, file.getName + ".bk")
-   private val compressionCodec = CompressionCodec.createCodec(conf)
+   private var stopped = false
+   private var fs_ : FileSystem = _
  
    // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
    // I did not notice any errors - reintroduce it ?
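
A minimal sketch of the write-then-rename pattern the CheckpointWriter comments above describe
(write to a ".next" file, back the old checkpoint up to ".bk", then promote the new file into
place), using only plain Hadoop FileSystem calls; the method name and parameters are
illustrative, not taken from the class itself:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical helper showing the rename-based promotion of a new checkpoint file.
    def atomicCheckpointWrite(fs: FileSystem, file: Path, bytes: Array[Byte]): Unit = {
      val writeFile = new Path(file.getParent, file.getName + ".next") // temp file
      val bakFile = new Path(file.getParent, file.getName + ".bk")     // previous checkpoint
      val out = fs.create(writeFile)
      try out.write(bytes) finally out.close()
      if (fs.exists(file)) {                       // keep the old checkpoint as a backup
        if (fs.exists(bakFile)) fs.delete(bakFile, true)
        fs.rename(file, bakFile)
      }
      fs.rename(writeFile, file)                   // promote the new checkpoint into place
    }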
@@@ -141,12 -151,12 +153,12 @@@
  
  private[streaming]
  object CheckpointReader extends Logging {
  
-   def read(path: String): Checkpoint = {
+   def read(conf: SparkConf, path: String): Checkpoint = {
      val fs = new Path(path).getFileSystem(new Configuration())
-     val attempts = Seq(
-       new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+     val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
+       new Path(path), new Path(path + ".bk"))
-     val compressionCodec = CompressionCodec.createCodec()
+     val compressionCodec = CompressionCodec.createCodec(conf)
  
      attempts.foreach(file => {
        if (fs.exists(file)) {


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index dbd0841,921a33a..e448211
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@@ -32,14 -39,23 +39,23 @@@ class JobGenerator(jobScheduler: JobSch
  
    initLogging()
  
    val ssc = jobScheduler.ssc
-   val clockClass = ssc.sc.conf.getOrElse(
-     "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-   val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
-   val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
-     longTime => generateJobs(new Time(longTime)))
    val graph = ssc.graph
+   val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+     def receive = {
+       case event: JobGeneratorEvent =>
+         logDebug("Got event of type " + event.getClass.getName)
+         processEvent(event)
+     }
+   }))
+   val clock = {
-     val clockClass = System.getProperty(
++    val clockClass = ssc.sc.conf.getOrElse(
+       "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-     Class.forName(clockClass).newInstance().asInstanceOf[Clock]
++    val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+   }
+   val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+     longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
  
    lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
-     new CheckpointWriter(ssc.conf, ssc.checkpointDir)
 -    new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
++    new CheckpointWriter(ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
    } else {
      null
    }
@@@ -52,10 -66,9 +66,9 @@@
      } else {
        startFirstTime()
      }
-     logInfo("JobGenerator started")
    }
-  
+  
-   def stop() = synchronized {
+   def stop() {
      timer.stop()
      if (checkpointWriter != null) checkpointWriter.stop()
      ssc.graph.stop()


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --cc streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c60a3f5,4e25c95..8dc80ac
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@@ -63,10 -57,10 +57,10 @@@ class CheckpointSuite extends TestSuite
  
    assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
  
-   System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+   conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
  
    val stateStreamCheckpointInterval = Seconds(1)
-  
+   val fs = FileSystem.getLocal(new Configuration())
    // this ensure checkpointing occurs at least once
    val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
    val secondNumBatches = firstNumBatches
@@@ -132,11 -129,11 +129,12 @@@
    ssc = new StreamingContext(checkpointDir)
    stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
    logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
-   assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
+   assert(!stateStream.generatedRDDs.isEmpty,
+     "No restored RDDs in state stream after recovery from second failure")
  
-   // Adjust manual clock time as if it is being restarted after a delay
-   System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
+   // Adjust manual clock time as if it is being restarted after a delay; this is a hack because
+   // we modify the conf object, but it works for this one property
+   ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
    ssc.start()
    advanceTimeWithRealDelay(ssc, 4)
    ssc.stop()


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
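
A minimal sketch of selecting the manual clock through the configuration rather than a
JVM-wide system property, as the CheckpointSuite change above does. It assumes the SparkConf
builder API (setMaster/setAppName/set) as it settled once this conf2 branch merged; the
master, app name, and variable names are illustrative:

    import org.apache.spark.SparkConf

    // Build a conf that a StreamingContext (or a test suite's base conf) can be created from.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("checkpoint-clock-sketch")
      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
    // A JobGenerator built from a context with this conf would then instantiate ManualClock
    // via the conf-based lookup added above, instead of reading a global system property.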