spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-3850] Trim trailing spaces for examples/streaming/yarn.
Date Sun, 31 May 2015 07:48:33 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 a7c217166 -> f63eab950


[SPARK-3850] Trim trailing spaces for examples/streaming/yarn.

Author: Reynold Xin <rxin@databricks.com>

Closes #6530 from rxin/trim-whitespace-1 and squashes the following commits:

7b7b3a0 [Reynold Xin] Reset again.
dc14597 [Reynold Xin] Reset scalastyle.
cd556c4 [Reynold Xin] YARN, Kinesis, Flume.
4223fe1 [Reynold Xin] [SPARK-3850] Trim trailing spaces for examples/streaming.

(cherry picked from commit 564bc11e9827915c8652bc06f4bd591809dea4b1)
Signed-off-by: Reynold Xin <rxin@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f63eab95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f63eab95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f63eab95

Branch: refs/heads/branch-1.4
Commit: f63eab950b1ec460fb1de2c2b6f123ef065e1c3f
Parents: a7c2171
Author: Reynold Xin <rxin@databricks.com>
Authored: Sun May 31 00:47:56 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Sun May 31 00:48:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/examples/LogQuery.scala    |  2 +-
 .../examples/mllib/DenseGaussianMixture.scala   | 10 +++----
 .../examples/streaming/MQTTWordCount.scala      | 10 +++----
 .../streaming/flume/FlumeInputDStream.scala     | 10 +++----
 .../flume/FlumePollingInputDStream.scala        |  2 +-
 .../streaming/flume/FlumeStreamSuite.scala      |  2 +-
 .../spark/streaming/kafka/KafkaCluster.scala    |  2 +-
 .../spark/streaming/kafka/KafkaUtils.scala      |  8 +++---
 .../streaming/KinesisWordCountASL.scala         | 12 ++++-----
 .../kinesis/KinesisCheckpointState.scala        |  8 +++---
 .../streaming/kinesis/KinesisReceiver.scala     |  8 +++---
 .../kinesis/KinesisRecordProcessor.scala        | 28 ++++++++++----------
 .../org/apache/spark/graphx/EdgeSuite.scala     | 10 +++----
 .../streaming/receiver/BlockGenerator.scala     |  2 +-
 .../receiver/ReceivedBlockHandler.scala         |  2 +-
 .../spark/streaming/UISeleniumSuite.scala       |  4 ---
 .../streaming/util/WriteAheadLogSuite.scala     |  4 +--
 .../yarn/ClientDistributedCacheManager.scala    | 26 +++++++++---------
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  4 +--
 .../ClientDistributedCacheManagerSuite.scala    | 24 ++++++++---------
 20 files changed, 87 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 32e02ea..75c8211 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -22,7 +22,7 @@ import org.apache.spark.SparkContext._
 
 /**
  * Executes a roll up-style query against Apache logs.
- *  
+ *
  * Usage: LogQuery [logFile]
  */
 object LogQuery {

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala
b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala
index 9a1aab0..f8c71cc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala
@@ -41,22 +41,22 @@ object DenseGaussianMixture {
   private def run(inputFile: String, k: Int, convergenceTol: Double, maxIterations: Int)
{
     val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example")
     val ctx = new SparkContext(conf)
-    
+
     val data = ctx.textFile(inputFile).map { line =>
       Vectors.dense(line.trim.split(' ').map(_.toDouble))
     }.cache()
-      
+
     val clusters = new GaussianMixture()
       .setK(k)
       .setConvergenceTol(convergenceTol)
       .setMaxIterations(maxIterations)
       .run(data)
-    
+
     for (i <- 0 until clusters.k) {
-      println("weight=%f\nmu=%s\nsigma=\n%s\n" format 
+      println("weight=%f\nmu=%s\nsigma=\n%s\n" format
         (clusters.weights(i), clusters.gaussians(i).mu, clusters.gaussians(i).sigma))
     }
-    
+
     println("Cluster labels (first <= 100):")
     val clusterLabels = clusters.predict(data)
     clusterLabels.take(100).foreach { x =>

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index b336751..813c855 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -40,7 +40,7 @@ object MQTTPublisher {
     StreamingExamples.setStreamingLogLevels()
 
     val Seq(brokerUrl, topic) = args.toSeq
-    
+
     var client: MqttClient = null
 
     try {
@@ -59,10 +59,10 @@ object MQTTPublisher {
           println(s"Published data. topic: ${msgtopic.getName()}; Message: $message")
         } catch {
          case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
-            Thread.sleep(10) 
+            Thread.sleep(10)
             println("Queue is full, wait for to consume data from the message queue")
-        }  
-      }      
+        }
+      }
     } catch {
       case e: MqttException => println("Exception Caught: " + e)
     } finally {
@@ -107,7 +107,7 @@ object MQTTWordCount {
     val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
     val words = lines.flatMap(x => x.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    
+
     wordCounts.print()
     ssc.start()
     ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 60e2994..1e32a36 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -152,9 +152,9 @@ class FlumeReceiver(
       val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                                                              Executors.newCachedThreadPool())
       val channelPipelineFactory = new CompressionChannelPipelineFactory()
-      
+
       new NettyServer(
-        responder, 
+        responder,
         new InetSocketAddress(host, port),
         channelFactory,
         channelPipelineFactory,
@@ -188,12 +188,12 @@ class FlumeReceiver(
 
   override def preferredLocation: Option[String] = Option(host)
 
-  /** A Netty Pipeline factory that will decompress incoming data from 
+  /** A Netty Pipeline factory that will decompress incoming data from
     * and the Netty client and compress data going back to the client.
     *
     * The compression on the return is required because Flume requires
-    * a successful response to indicate it can remove the event/batch 
-    * from the configured channel 
+    * a successful response to indicate it can remove the event/batch
+    * from the configured channel
     */
   private[streaming]
   class CompressionChannelPipelineFactory extends ChannelPipelineFactory {

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 92fa5b4..583e7dc 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -110,7 +110,7 @@ private[streaming] class FlumePollingReceiver(
 }
 
 /**
- * A wrapper around the transceiver and the Avro IPC API. 
+ * A wrapper around the transceiver and the Avro IPC API.
  * @param transceiver The transceiver to use for communication with Flume
  * @param client The client that the callbacks are received on.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 39e6754..4ea0f88 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -138,7 +138,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
       val status = client.appendBatch(inputEvents.toList)
       status should be (avro.Status.OK)
     }
-    
+
     eventually(timeout(10 seconds), interval(100 milliseconds)) {
       val outputEvents = outputBuffer.flatten.map { _.event }
       outputEvents.foreach {

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index 6cf254a..65d51d8 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -113,7 +113,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
       r.flatMap { tm: TopicMetadata =>
         tm.partitionsMetadata.map { pm: PartitionMetadata =>
           TopicAndPartition(tm.topic, pm.partitionId)
-        }    
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 8be2707..0b8a391 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -315,7 +315,7 @@ object KafkaUtils {
    * Points to note:
    *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
+   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
    *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    *    You can access the offsets used in each batch from the generated RDDs (see
    *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
@@ -363,7 +363,7 @@ object KafkaUtils {
    * Points to note:
    *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
+   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
    *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    *    You can access the offsets used in each batch from the generated RDDs (see
    *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
@@ -427,7 +427,7 @@ object KafkaUtils {
    * Points to note:
    *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
+   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
    *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    *    You can access the offsets used in each batch from the generated RDDs (see
    *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
@@ -489,7 +489,7 @@ object KafkaUtils {
    * Points to note:
    *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on 
+   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
    *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    *    You can access the offsets used in each batch from the generated RDDs (see
    *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index 97c3476..be8b62d 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -119,7 +119,7 @@ object KinesisWordCountASL extends Logging {
     val batchInterval = Milliseconds(2000)
 
     // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
-    // on sequence number of records that have been received. Same as batchInterval for this 
+    // on sequence number of records that have been received. Same as batchInterval for this
     // example.
     val kinesisCheckpointInterval = batchInterval
 
@@ -145,7 +145,7 @@ object KinesisWordCountASL extends Logging {
 
     // Map each word to a (word, 1) tuple so we can reduce by key to count the words
     val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
- 
+
     // Print the first 10 wordCounts
     wordCounts.print()
 
@@ -210,14 +210,14 @@ object KinesisWordProducerASL {
 
     val randomWords = List("spark", "you", "are", "my", "father")
     val totals = scala.collection.mutable.Map[String, Int]()
-  
+
     // Create the low-level Kinesis Client from the AWS Java SDK.
     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
     kinesisClient.setEndpoint(endpoint)
 
     println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
         s" $recordsPerSecond records per second and $wordsPerRecord words per record")
-  
+
     // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
     for (i <- 1 to 10) {
       // Generate recordsPerSec records to put onto the stream
@@ -255,8 +255,8 @@ object KinesisWordProducerASL {
   }
 }
 
-/** 
- *  Utility functions for Spark Streaming examples. 
+/**
+ *  Utility functions for Spark Streaming examples.
  *  This has been lifted from the examples/ project to remove the circular dependency.
  */
 private[streaming] object StreamingExamples extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 1c9b0c2..83a4537 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -23,20 +23,20 @@ import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 /**
  * This is a helper class for managing checkpoint clocks.
  *
- * @param checkpointInterval 
+ * @param checkpointInterval
  * @param currentClock.  Default to current SystemClock if none is passed in (mocking purposes)
  */
 private[kinesis] class KinesisCheckpointState(
-    checkpointInterval: Duration, 
+    checkpointInterval: Duration,
     currentClock: Clock = new SystemClock())
   extends Logging {
-  
+
  /* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
   val checkpointClock = new ManualClock()
   checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds)
 
   /**
-   * Check if it's time to checkpoint based on the current time and the derived time 
+   * Check if it's time to checkpoint based on the current time and the derived time
    *   for the next checkpoint
    *
    * @return true if it's time to checkpoint

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 7dd8bfd..1a8a4ce 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -44,12 +44,12 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
  * https://github.com/awslabs/amazon-kinesis-client
  * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described
here:
  *   http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers to run within a 
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a
  *   Spark Executor.
  *
  * @param appName  Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
  *                 by the Kinesis Client Library.  If you change the App name or Stream name,
- *                 the KCL will throw errors.  This usually requires deleting the backing 
+ *                 the KCL will throw errors.  This usually requires deleting the backing
  *                 DynamoDB table with the same name this Kinesis application.
  * @param streamName   Kinesis stream name
  * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
@@ -87,7 +87,7 @@ private[kinesis] class KinesisReceiver(
    */
 
   /**
-   * workerId is used by the KCL should be based on the ip address of the actual Spark Worker 
+   * workerId is used by the KCL should be based on the ip address of the actual Spark Worker
    * where this code runs (not the driver's IP address.)
    */
   private var workerId: String = null
@@ -121,7 +121,7 @@ private[kinesis] class KinesisReceiver(
 
    /*
     *  RecordProcessorFactory creates impls of IRecordProcessor.
-    *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the 
+    *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
     *  IRecordProcessor.processRecords() method.
     *  We're using our custom KinesisRecordProcessor in this case.
     */

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index f65e743..fe9e3a0 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -35,9 +35,9 @@ import com.amazonaws.services.kinesis.model.Record
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
  * This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each 
- *   shard in the Kinesis stream upon startup.  This is normally done in separate threads, 
- *   but the KCLs within the KinesisReceivers will balance themselves out if you create 
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ *   shard in the Kinesis stream upon startup.  This is normally done in separate threads,
+ *   but the KCLs within the KinesisReceivers will balance themselves out if you create
  *   multiple Receivers.
  *
  * @param receiver Kinesis receiver
@@ -69,14 +69,14 @@ private[kinesis] class KinesisRecordProcessor(
    * and Spark Streaming's Receiver.store().
    *
    * @param batch list of records from the Kinesis stream shard
-   * @param checkpointer used to update Kinesis when this batch has been processed/stored 
+   * @param checkpointer used to update Kinesis when this batch has been processed/stored
    *   in the DStream
    */
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer)
{
     if (!receiver.isStopped()) {
       try {
         /*
-         * Notes:  
+         * Notes:
          * 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
          *    Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
          *    internally-configured Spark serializer (kryo, etc).
@@ -84,19 +84,19 @@ private[kinesis] class KinesisRecordProcessor(
          *    ourselves from Spark's internal serialization strategy.
          * 3) For performance, the BlockGenerator is asynchronously queuing elements within
its
          *    memory before creating blocks.  This prevents the small block scenario, but
requires
-         *    that you register callbacks to know when a block has been generated and stored

+         *    that you register callbacks to know when a block has been generated and stored
          *    (WAL is sufficient for storage) before can checkpoint back to the source.
         */
         batch.foreach(record => receiver.store(record.getData().array()))
-        
+
         logDebug(s"Stored:  Worker $workerId stored ${batch.size} records for shardId $shardId")
 
         /*
-         * Checkpoint the sequence number of the last record successfully processed/stored 
+         * Checkpoint the sequence number of the last record successfully processed/stored
          *   in the batch.
          * In this implementation, we're checkpointing after the given checkpointIntervalMillis.
-         * Note that this logic requires that processRecords() be called AND that it's time to 
-         *   checkpoint.  I point this out because there is no background thread running the 
+         * Note that this logic requires that processRecords() be called AND that it's time to
+         *   checkpoint.  I point this out because there is no background thread running the
         *   checkpointer.  Checkpointing is tested and trigger only when a new batch comes in.
          * If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below).
          * However, if the worker dies unexpectedly, a checkpoint may not happen.
@@ -130,16 +130,16 @@ private[kinesis] class KinesisRecordProcessor(
       }
     } else {
       /* RecordProcessor has been stopped. */
-      logInfo(s"Stopped:  The Spark KinesisReceiver has stopped for workerId $workerId" + 
+      logInfo(s"Stopped:  The Spark KinesisReceiver has stopped for workerId $workerId" +
           s" and shardId $shardId.  No more records will be processed.")
     }
   }
 
   /**
    * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
-   * 1) the stream is resharding by splitting or merging adjacent shards 
+   * 1) the stream is resharding by splitting or merging adjacent shards
    *     (ShutdownReason.TERMINATE)
-   * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason 
+   * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
    *     (ShutdownReason.ZOMBIE)
    *
    * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
@@ -153,7 +153,7 @@ private[kinesis] class KinesisRecordProcessor(
        * Checkpoint to indicate that all records from the shard have been drained and processed.
        * It's now OK to read from the new shards that resulted from a resharding event.
        */
-      case ShutdownReason.TERMINATE => 
+      case ShutdownReason.TERMINATE =>
         KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
 
       /*

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/graphx/src/test/scala/org/apache/spark/graphx/EdgeSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeSuite.scala
index 5a2c73b..8143606 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeSuite.scala
@@ -23,15 +23,15 @@ class EdgeSuite extends FunSuite {
   test ("compare") {
     // decending order
     val testEdges: Array[Edge[Int]] = Array(
-      Edge(0x7FEDCBA987654321L, -0x7FEDCBA987654321L, 1), 
-      Edge(0x2345L, 0x1234L, 1), 
-      Edge(0x1234L, 0x5678L, 1), 
-      Edge(0x1234L, 0x2345L, 1), 
+      Edge(0x7FEDCBA987654321L, -0x7FEDCBA987654321L, 1),
+      Edge(0x2345L, 0x1234L, 1),
+      Edge(0x1234L, 0x5678L, 1),
+      Edge(0x1234L, 0x2345L, 1),
       Edge(-0x7FEDCBA987654321L, 0x7FEDCBA987654321L, 1)
     )
     // to ascending order
     val sortedEdges = testEdges.sorted(Edge.lexicographicOrdering[Int])
-    
+
     for (i <- 0 until testEdges.length) {
       assert(sortedEdges(i) == testEdges(testEdges.length - i - 1))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 0588517..8d73593 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -191,7 +191,7 @@ private[streaming] class BlockGenerator(
     logError(message, t)
     listener.onError(message, t)
   }
-  
+
   private def pushBlock(block: Block) {
     listener.onPushBlock(block.id, block.buffer)
     logInfo("Pushed block " + block.id)

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 651b534..207d64d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -62,7 +62,7 @@ private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockI
 private[streaming] class BlockManagerBasedBlockHandler(
     blockManager: BlockManager, storageLevel: StorageLevel)
   extends ReceivedBlockHandler with Logging {
-  
+
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult
= {
     val putResult: Seq[(BlockId, BlockStatus)] = block match {
       case ArrayBufferBlock(arrayBuffer) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 441bbf9..43368de 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -28,9 +28,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 
-
-
-
 /**
  * Selenium tests for the Spark Web UI.
  */
@@ -197,4 +194,3 @@ class UISeleniumSuite
     }
   }
 }
-  

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 79098bc..616903b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.{SparkConf, SparkException}
 class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
 
   import WriteAheadLogSuite._
-  
+
   val hadoopConf = new Configuration()
   var tempDir: File = null
   var testDir: String = null
@@ -359,7 +359,7 @@ object WriteAheadLogSuite {
     ): FileBasedWriteAheadLog = {
     if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
     val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
-    
+
     // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
       manualClock.advance(500)

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 4ca6c90..3d3a966 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -43,22 +43,22 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
    * Add a resource to the list of distributed cache resources. This list can
    * be sent to the ApplicationMaster and possibly the executors so that it can
    * be downloaded into the Hadoop distributed cache for use by this application.
-   * Adds the LocalResource to the localResources HashMap passed in and saves 
+   * Adds the LocalResource to the localResources HashMap passed in and saves
   * the stats of the resources so they can be sent to the executors and verified.
    *
    * @param fs FileSystem
    * @param conf Configuration
    * @param destPath path to the resource
    * @param localResources localResource hashMap to insert the resource into
-   * @param resourceType LocalResourceType 
+   * @param resourceType LocalResourceType
    * @param link link presented in the distributed cache to the destination
-   * @param statCache cache to store the file/directory stats 
+   * @param statCache cache to store the file/directory stats
    * @param appMasterOnly Whether to only add the resource to the app master
    */
   def addResource(
       fs: FileSystem,
       conf: Configuration,
-      destPath: Path, 
+      destPath: Path,
       localResources: HashMap[String, LocalResource],
       resourceType: LocalResourceType,
       link: String,
@@ -74,15 +74,15 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
     amJarRsrc.setSize(destStatus.getLen())
     if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link
name")
     localResources(link) = amJarRsrc
-    
+
     if (!appMasterOnly) {
       val uri = destPath.toUri()
       val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
       if (resourceType == LocalResourceType.FILE) {
-        distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), 
+        distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
           destStatus.getModificationTime().toString(), visibility.name())
       } else {
-        distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(), 
+        distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
           destStatus.getModificationTime().toString(), visibility.name())
       }
     }
@@ -96,11 +96,11 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
     val (sizes, timeStamps, visibilities) = tupleValues.unzip3
     if (keys.size > 0) {
       env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc, n) => acc + ","
+ n }
-      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = 
+      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
         timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = 
+      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
         sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = 
+      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
         visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
     }
   }
@@ -113,11 +113,11 @@ private[spark] class ClientDistributedCacheManager() extends Logging
{
     val (sizes, timeStamps, visibilities) = tupleValues.unzip3
     if (keys.size > 0) {
       env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc, n) => acc + ","
+ n }
-      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = 
+      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
         timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
       env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
         sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = 
+      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
         visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
     }
   }
@@ -197,7 +197,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
   def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus
= {
     val stat = statCache.get(uri) match {
       case Some(existstat) => existstat
-      case None => 
+      case None =>
         val newStat = fs.getFileStatus(new Path(uri))
         statCache.put(uri, newStat)
         newStat

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ba91872..34ac089 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -139,9 +139,9 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 }
 
 object YarnSparkHadoopUtil {
-  // Additional memory overhead 
+  // Additional memory overhead
   // 10% was arrived at experimentally. In the interest of minimizing memory waste while
covering
-  // the common cases. Memory overhead tends to grow with container size. 
+  // the common cases. Memory overhead tends to grow with container size.
 
   val MEMORY_OVERHEAD_FACTOR = 0.10
   val MEMORY_OVERHEAD_MIN = 384

http://git-wip-us.apache.org/repos/asf/spark/blob/f63eab95/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index 80b57d1..24a53f9 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -40,12 +40,12 @@ import scala.collection.mutable.Map
 class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
 
   class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
-    override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):

+    override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
         LocalResourceVisibility = {
       LocalResourceVisibility.PRIVATE
     }
   }
-  
+
   test("test getFileStatus empty") {
     val distMgr = new ClientDistributedCacheManager()
     val fs = mock[FileSystem]
@@ -60,7 +60,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar
{
     val distMgr = new ClientDistributedCacheManager()
     val fs = mock[FileSystem]
     val uri = new URI("/tmp/testing")
-    val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner", 
+    val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
       null, new Path("/tmp/testing"))
     when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
@@ -77,7 +77,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar
{
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
     when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
 
-    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",

+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
       statCache, false)
     val resource = localResources("link")
     assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
@@ -100,11 +100,11 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar
{
     assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
 
     // add another one and verify both there and order correct
-    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
       null, new Path("/tmp/testing2"))
     val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
     when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
-    distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",

+    distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
       statCache, false)
     val resource2 = localResources("link2")
     assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
@@ -116,7 +116,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar
{
     val env2 = new HashMap[String, String]()
     distMgr.setDistFilesEnv(env2)
     val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
-    val files = env2("SPARK_YARN_CACHE_FILES").split(',') 
+    val files = env2("SPARK_YARN_CACHE_FILES").split(',')
     val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
     val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
     assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
@@ -140,7 +140,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar
{
     when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
 
     intercept[Exception] {
-      distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,

+      distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
         statCache, false)
     }
     assert(localResources.get("link") === None)
@@ -154,11 +154,11 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar
{
     val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
     val localResources = HashMap[String, LocalResource]()
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
       null, new Path("/tmp/testing"))
     when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
 
-    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",

+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
       statCache, true)
     val resource = localResources("link")
     assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
@@ -188,11 +188,11 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar
{
     val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
     val localResources = HashMap[String, LocalResource]()
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
       null, new Path("/tmp/testing"))
     when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
 
-    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",

+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
       statCache, false)
     val resource = localResources("link")
     assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message