spark-commits mailing list archives

From pwend...@apache.org
Subject [26/33] git commit: Merge remote-tracking branch 'apache/master' into conf2
Date Thu, 02 Jan 2014 05:30:04 GMT
Merge remote-tracking branch 'apache/master' into conf2

Conflicts:
	core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
	streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ba9338f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ba9338f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ba9338f1

Branch: refs/heads/master
Commit: ba9338f104ccc71d4f342a3f96624a9b36895f48
Parents: 0fa5809 63b411d
Author: Matei Zaharia <matei@databricks.com>
Authored: Tue Dec 31 18:23:14 2013 -0500
Committer: Matei Zaharia <matei@databricks.com>
Committed: Tue Dec 31 18:23:14 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/network/netty/FileClient.java  |  32 ++--
 .../netty/FileClientChannelInitializer.java     |   6 +-
 .../spark/network/netty/FileClientHandler.java  |  12 +-
 .../apache/spark/network/netty/FileServer.java  |  29 ++--
 .../netty/FileServerChannelInitializer.java     |   3 +-
 .../spark/network/netty/FileServerHandler.java  |  18 ++-
 .../scala/org/apache/spark/SparkContext.scala   |  23 +--
 .../spark/api/java/JavaSparkContext.scala       |  15 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  32 ++--
 .../apache/spark/rdd/RDDCheckpointData.scala    |  15 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   2 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |   2 +-
 .../apache/spark/scheduler/SparkListener.scala  |  20 ++-
 .../spark/scheduler/SparkListenerBus.scala      |   2 +-
 .../apache/spark/scheduler/TaskSetManager.scala |   6 -
 .../spark/ui/jobs/JobProgressListener.scala     |   2 +-
 .../scala/org/apache/spark/JavaAPISuite.java    |   4 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |   2 +-
 .../spark/scheduler/SparkListenerSuite.scala    |   2 +-
 pom.xml                                         |   2 +-
 project/SparkBuild.scala                        |   2 +-
 python/pyspark/context.py                       |   9 +-
 python/pyspark/tests.py                         |   4 +-
 .../org/apache/spark/streaming/Checkpoint.scala |  53 ++++---
 .../spark/streaming/StreamingContext.scala      |  31 ++--
 .../api/java/JavaStreamingContext.scala         |  13 +-
 .../streaming/dstream/FileInputDStream.scala    | 153 +++++++++++--------
 .../streaming/scheduler/JobGenerator.scala      |  66 +++++---
 .../spark/streaming/CheckpointSuite.scala       |  44 +++---
 .../spark/streaming/InputStreamsSuite.scala     |   2 +-
 30 files changed, 347 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/SparkContext.scala
index 8134ce7,7514ce5..fbc7a78
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@@ -19,15 -19,14 +19,15 @@@ package org.apache.spar
  
  import java.io._
  import java.net.URI
- import java.util.Properties
+ import java.util.{UUID, Properties}
  import java.util.concurrent.atomic.AtomicInteger
  
 -import scala.collection.Map
 +import scala.collection.{Map, Set, immutable}
  import scala.collection.generic.Growable
 -import scala.collection.mutable.ArrayBuffer
 -import scala.collection.mutable.HashMap
 +
 +import scala.collection.mutable.{ArrayBuffer, HashMap}
  import scala.reflect.{ClassTag, classTag}
 +import scala.util.Try
  
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 2897c4b,293a7d1..172ba6b
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@@ -118,10 -125,14 +124,14 @@@ private[spark] object CheckpointRDD ext
      }
    }
  
-   def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+   def readFromFile[T](
+       path: Path,
+       broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+       context: TaskContext
+     ): Iterator[T] = {
      val env = SparkEnv.get
-     val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+     val fs = path.getFileSystem(broadcastedConf.value.value)
 -    val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
 +    val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
      val fileInputStream = fs.open(path, bufferSize)
      val serializer = env.serializer.newInstance()
      val deserializeStream = serializer.deserializeStream(fileInputStream)
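
The hunk above changes CheckpointRDD.readFromFile to reuse a Hadoop Configuration broadcast
from the driver instead of building a fresh one per task, and to read the buffer size from the
per-application conf. Below is a minimal sketch of that broadcast pattern, not the commit's
code; the object name and the readWithBroadcastConf helper are illustrative assumptions.

import org.apache.hadoop.fs.Path
import org.apache.spark.{SerializableWritable, SparkContext}

object BroadcastConfSketch {
  def readWithBroadcastConf(sc: SparkContext, checkpointPath: String): Unit = {
    // Wrap the (non-serializable) Hadoop Configuration so it can be broadcast once from the driver.
    val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
    sc.parallelize(1 to 4).foreach { _ =>
      // On the executor: rebuild a FileSystem from the broadcast value rather than
      // constructing a new Configuration for every task.
      val fs = new Path(checkpointPath).getFileSystem(broadcastedConf.value.value)
      // ... open and deserialize checkpoint files with `fs` ...
    }
  }
}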

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d752e6f,7929051..b99664a
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@@ -114,13 -112,9 +114,9 @@@ private[spark] class TaskSetManager
    // Task index, start and finish time for each task attempt (indexed by task ID)
    val taskInfos = new HashMap[Long, TaskInfo]
  
-   // Did the TaskSet fail?
-   var failed = false
-   var causeOfFailure = ""
- 
    // How frequently to reprint duplicate exceptions in full, in milliseconds
    val EXCEPTION_PRINT_INTERVAL =
 -    System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
 +    conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
  
    // Map of recent exceptions (identified by string representation and top stack frame) to
    // duplicate count (how many times the same exception has appeared) and time the full exception
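
As in CheckpointRDD above, the TaskSetManager hunk replaces a JVM-wide System.getProperty
lookup with a read from the per-application conf. A tiny standalone sketch of that lookup,
assuming this branch's SparkConf with a getOrElse(key, default) accessor as used in the diff;
the object and value names are illustrative only.

import org.apache.spark.SparkConf

object ConfLookupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Scoped to this application's conf instead of a global system property:
    val exceptionPrintInterval =
      conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
    println(s"exception print interval: $exceptionPrintInterval ms")
  }
}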

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/project/SparkBuild.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/python/pyspark/context.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 35e23c1,4960a85..af44327
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@@ -21,12 -21,13 +21,13 @@@ import java.io.
  import java.util.concurrent.Executors
  import java.util.concurrent.RejectedExecutionException
  
- import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.hadoop.conf.Configuration
  
 -import org.apache.spark.Logging
 +import org.apache.spark.{SparkConf, Logging}
  import org.apache.spark.io.CompressionCodec
  import org.apache.spark.util.MetadataCleaner
+ import org.apache.spark.deploy.SparkHadoopUtil
  
  
  private[streaming]
@@@ -54,23 -55,21 +55,23 @@@ class Checkpoint(@transient ssc: Stream
  
  
  /**
-  * Convenience class to speed up the writing of graph checkpoint to file
+  * Convenience class to handle the writing of graph checkpoint to file
   */
  private[streaming]
- class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
 -class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
++class CheckpointWriter(conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
++  extends Logging
++{
    val file = new Path(checkpointDir, "graph")
-   // The file to which we actually write - and then "move" to file.
-   private val writeFile = new Path(file.getParent, file.getName + ".next")
-   private val bakFile = new Path(file.getParent, file.getName + ".bk")
- 
-   private var stopped = false
- 
-   val hadoopConf = new Configuration()
-   var fs = file.getFileSystem(hadoopConf)
-   val maxAttempts = 3
+   val MAX_ATTEMPTS = 3
    val executor = Executors.newFixedThreadPool(1)
+   val compressionCodec = CompressionCodec.createCodec()
+   // The file to which we actually write - and then "move" to file
+   val writeFile = new Path(file.getParent, file.getName + ".next")
+   // The file to which existing checkpoint is backed up (i.e. "moved")
+   val bakFile = new Path(file.getParent, file.getName + ".bk")
  
-   private val compressionCodec = CompressionCodec.createCodec(conf)
+   private var stopped = false
+   private var fs_ : FileSystem = _
  
    // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
    // I did not notice any errors - reintroduce it ?
@@@ -141,12 -151,12 +153,12 @@@
  private[streaming]
  object CheckpointReader extends Logging {
  
 -  def read(path: String): Checkpoint = {
 +  def read(conf: SparkConf, path: String): Checkpoint = {
      val fs = new Path(path).getFileSystem(new Configuration())
-     val attempts = Seq(
-       new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+     val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
+       new Path(path), new Path(path + ".bk"))
  
 -    val compressionCodec = CompressionCodec.createCodec()
 +    val compressionCodec = CompressionCodec.createCodec(conf)
  
      attempts.foreach(file => {
        if (fs.exists(file)) {
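
The CheckpointWriter comments above describe writing the new graph checkpoint to a ".next"
file, moving any existing checkpoint to a ".bk" backup, and then renaming ".next" into place.
A rough standalone sketch of that sequence against the Hadoop FileSystem API, assuming the
serialized checkpoint bytes are already in hand; none of these names are the commit's.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object CheckpointRenameSketch {
  def writeCheckpoint(checkpointDir: String, bytes: Array[Byte], hadoopConf: Configuration): Unit = {
    val file = new Path(checkpointDir, "graph")
    val writeFile = new Path(file.getParent, file.getName + ".next")  // written first
    val bakFile = new Path(file.getParent, file.getName + ".bk")      // backup of the previous checkpoint
    val fs = file.getFileSystem(hadoopConf)

    // 1. Write the new checkpoint to the temporary ".next" file.
    val out = fs.create(writeFile)
    out.write(bytes)
    out.close()

    // 2. Back up any existing checkpoint, then promote ".next" to the live file name.
    if (fs.exists(file)) {
      fs.delete(bakFile, true)   // drop a stale backup if present
      fs.rename(file, bakFile)   // the current checkpoint becomes ".bk"
    }
    fs.rename(writeFile, file)   // ".next" becomes the live checkpoint
  }
}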

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index dbd0841,921a33a..e448211
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@@ -32,14 -39,23 +39,23 @@@ class JobGenerator(jobScheduler: JobSch
    initLogging()
  
    val ssc = jobScheduler.ssc
-   val clockClass = ssc.sc.conf.getOrElse(
-     "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-   val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
-   val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
-     longTime => generateJobs(new Time(longTime)))
    val graph = ssc.graph
+   val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+     def receive = {
+       case event: JobGeneratorEvent =>
+         logDebug("Got event of type " + event.getClass.getName)
+         processEvent(event)
+     }
+   }))
+   val clock = {
 -    val clockClass = System.getProperty(
++    val clockClass = ssc.sc.conf.getOrElse(
+       "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
 -    Class.forName(clockClass).newInstance().asInstanceOf[Clock]
++    val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+   }
+   val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+     longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
    lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
-     new CheckpointWriter(ssc.conf, ssc.checkpointDir)
 -    new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
++    new CheckpointWriter(ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
    } else {
      null
    }
@@@ -52,10 -66,9 +66,9 @@@
      } else {
        startFirstTime()
      }
-     logInfo("JobGenerator started")
    }
 -  
 +
-   def stop() = synchronized {
+   def stop() {
      timer.stop()
      if (checkpointWriter != null) checkpointWriter.stop()
      ssc.graph.stop()
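
The JobGenerator hunk above routes all generation work through a single actor: the
RecurringTimer now posts GenerateJobs messages to eventProcessorActor instead of calling
generateJobs directly. A simplified, self-contained sketch of that event-loop shape, assuming
the Akka 2.2-era API Spark used at the time; JobGeneratorEvent and GenerateJobs here are
stand-ins for the commit's event types, and the wiring is not the real one.

import akka.actor.{Actor, ActorSystem, Props}

sealed trait JobGeneratorEvent
case class GenerateJobs(timeMs: Long) extends JobGeneratorEvent

class EventProcessorSketch extends Actor {
  def receive = {
    case event: JobGeneratorEvent =>
      // All generator work funnels through this one actor, so the timer thread posts
      // messages rather than mutating generator state directly.
      processEvent(event)
  }
  private def processEvent(event: JobGeneratorEvent): Unit = event match {
    case GenerateJobs(timeMs) => println(s"generate jobs for batch time $timeMs")
  }
}

object EventLoopSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    val eventProcessorActor = system.actorOf(Props(new EventProcessorSketch))
    // What the RecurringTimer callback would send on each batch interval:
    eventProcessorActor ! GenerateJobs(System.currentTimeMillis())
    Thread.sleep(500)
    system.shutdown()
  }
}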

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --cc streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c60a3f5,4e25c95..8dc80ac
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@@ -63,10 -57,10 +57,10 @@@ class CheckpointSuite extends TestSuite
  
      assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
  
 -    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
 +    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
  
      val stateStreamCheckpointInterval = Seconds(1)
- 
+     val fs = FileSystem.getLocal(new Configuration())
      // this ensure checkpointing occurs at least once
      val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
      val secondNumBatches = firstNumBatches
@@@ -132,11 -129,11 +129,12 @@@
      ssc = new StreamingContext(checkpointDir)
      stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
      logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n")
+ "]")
-     assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
+     assert(!stateStream.generatedRDDs.isEmpty,
+       "No restored RDDs in state stream after recovery from second failure")
  
 -    // Adjust manual clock time as if it is being restarted after a delay
 -    System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
 +    // Adjust manual clock time as if it is being restarted after a delay; this is a hack because
 +    // we modify the conf object, but it works for this one property
 +    ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
      ssc.start()
      advanceTimeWithRealDelay(ssc, 4)
      ssc.stop()
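
In the test above, the clock implementation is now chosen through the conf rather than a
global system property. A small usage sketch of that setup, reusing the SparkConf.set call
shown in the diff; constructing the StreamingContext itself is omitted, and the object name
is an illustrative assumption.

import org.apache.spark.SparkConf

object ManualClockConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
    // A StreamingContext built from this conf would pick up ManualClock via the
    // reflective lookup shown in JobGenerator, instead of the default SystemClock.
  }
}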

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba9338f1/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------

