Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2452010286 for ; Thu, 10 Apr 2014 22:04:52 +0000 (UTC) Received: (qmail 37002 invoked by uid 500); 10 Apr 2014 22:04:51 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 36939 invoked by uid 500); 10 Apr 2014 22:04:48 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@spark.apache.org Delivered-To: mailing list commits@spark.apache.org Received: (qmail 36885 invoked by uid 99); 10 Apr 2014 22:04:46 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Apr 2014 22:04:46 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0DF58989D2D; Thu, 10 Apr 2014 22:04:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pwendell@apache.org To: commits@spark.apache.org Date: Thu, 10 Apr 2014 22:04:46 -0000 Message-Id: <8e11540974034d89a7bcb9cb0d092780@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] Remove Unnecessary Whitespace's Repository: spark Updated Branches: refs/heads/master f04666252 -> 930b70f05 http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/main/scala/org/apache/spark/util/NextIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala index 8266e5e..e5c732a 100644 --- a/core/src/main/scala/org/apache/spark/util/NextIterator.scala +++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala @@ -19,7 +19,7 @@ package org.apache.spark.util /** Provides a basic/boilerplate Iterator implementation. */ private[spark] abstract class NextIterator[U] extends Iterator[U] { - + private var gotNext = false private var nextValue: U = _ private var closed = false @@ -34,7 +34,7 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] { * This convention is required because `null` may be a valid value, * and using `Option` seems like it might create unnecessary Some/None * instances, given some iterators might be called in a tight loop. - * + * * @return U, or set 'finished' when done */ protected def getNext(): U http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/main/scala/org/apache/spark/util/StatCounter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala index 732748a..d80eed4 100644 --- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala +++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala @@ -62,10 +62,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { if (n == 0) { mu = other.mu m2 = other.m2 - n = other.n + n = other.n maxValue = other.maxValue minValue = other.minValue - } else if (other.n != 0) { + } else if (other.n != 0) { val delta = other.mu - mu if (other.n * 10 < n) { mu = mu + (delta * other.n) / (n + other.n) http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/main/scala/org/apache/spark/util/Vector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala index 3c8f94a..1a647fa 100644 --- a/core/src/main/scala/org/apache/spark/util/Vector.scala +++ b/core/src/main/scala/org/apache/spark/util/Vector.scala @@ -136,7 +136,7 @@ object Vector { def ones(length: Int) = Vector(length, _ => 1) /** - * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers + * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided. */ def random(length: Int, random: Random = new XORShiftRandom()) = http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala index 8a4cdea..7f22038 100644 --- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala @@ -25,28 +25,28 @@ import scala.util.hashing.MurmurHash3 import org.apache.spark.util.Utils.timeIt /** - * This class implements a XORShift random number generator algorithm + * This class implements a XORShift random number generator algorithm * Source: * Marsaglia, G. (2003). Xorshift RNGs. Journal of Statistical Software, Vol. 8, Issue 14. * @see Paper * This implementation is approximately 3.5 times faster than * {@link java.util.Random java.util.Random}, partly because of the algorithm, but also due - * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class + * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class * uses a regular Long. We can forgo thread safety since we use a new instance of the RNG * for each thread. */ private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) { - + def this() = this(System.nanoTime) private var seed = XORShiftRandom.hashSeed(init) // we need to just override next - this will be called by nextInt, nextDouble, // nextGaussian, nextLong, etc. - override protected def next(bits: Int): Int = { + override protected def next(bits: Int): Int = { var nextSeed = seed ^ (seed << 21) nextSeed ^= (nextSeed >>> 35) - nextSeed ^= (nextSeed << 4) + nextSeed ^= (nextSeed << 4) seed = nextSeed (nextSeed & ((1L << bits) -1)).asInstanceOf[Int] } @@ -89,7 +89,7 @@ private[spark] object XORShiftRandom { val million = 1e6.toInt val javaRand = new JavaRandom(seed) val xorRand = new XORShiftRandom(seed) - + // this is just to warm up the JIT - we're not timing anything timeIt(1e6.toInt) { javaRand.nextInt() @@ -97,9 +97,9 @@ private[spark] object XORShiftRandom { } val iters = timeIt(numIters)(_) - + /* Return results as a map instead of just printing to screen - in case the user wants to do something with them */ + in case the user wants to do something with them */ Map("javaTime" -> iters {javaRand.nextInt()}, "xorTime" -> iters {xorRand.nextInt()}) http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala index c5f24c6..c645e4c 100644 --- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala @@ -37,7 +37,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val securityManager = new SecurityManager(conf); val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) @@ -54,14 +54,14 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { assert(securityManagerBad.isAuthenticationEnabled() === true) - val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") val timeout = AkkaUtils.lookupTimeout(conf) - intercept[akka.actor.ActorNotFound] { - slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) + intercept[akka.actor.ActorNotFound] { + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) } actorSystem.shutdown() @@ -75,7 +75,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val securityManager = new SecurityManager(conf); val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) @@ -91,7 +91,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { badconf.set("spark.authenticate.secret", "good") val securityManagerBad = new SecurityManager(badconf); - val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = badconf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( @@ -127,7 +127,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val securityManager = new SecurityManager(conf); val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) @@ -180,7 +180,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val securityManager = new SecurityManager(conf); val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) @@ -204,8 +204,8 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val selection = slaveSystem.actorSelection( s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") val timeout = AkkaUtils.lookupTimeout(conf) - intercept[akka.actor.ActorNotFound] { - slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) + intercept[akka.actor.ActorNotFound] { + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) } actorSystem.shutdown() http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/DriverSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala index 9cbdfc5..7f59bdc 100644 --- a/core/src/test/scala/org/apache/spark/DriverSuite.scala +++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala @@ -39,7 +39,7 @@ class DriverSuite extends FunSuite with Timeouts { failAfter(60 seconds) { Utils.executeAndGetOutput( Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master), - new File(sparkHome), + new File(sparkHome), Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/FileServerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index aee9ab9..d651fbb 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -45,7 +45,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { val pw = new PrintWriter(textFile) pw.println("100") pw.close() - + val jarFile = new File(tmpDir, "test.jar") val jarStream = new FileOutputStream(jarFile) val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest()) @@ -53,7 +53,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { val jarEntry = new JarEntry(textFile.getName) jar.putNextEntry(jarEntry) - + val in = new FileInputStream(textFile) val buffer = new Array[Byte](10240) var nRead = 0 http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/FileSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 01af940..b9b668d 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -106,7 +106,7 @@ class FileSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val tempDir = Files.createTempDir() val outputDir = new File(tempDir, "output").getAbsolutePath - val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x)) + val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x)) nums.saveAsSequenceFile(outputDir) // Try reading the output back as a SequenceFile val output = sc.sequenceFile[IntWritable, Text](outputDir) http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala index 0b5ed6d..5e538d6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala @@ -45,4 +45,4 @@ class WorkerWatcherSuite extends FunSuite { actorRef.underlyingActor.receive(new DisassociatedEvent(null, otherAkkaAddress, false)) assert(!actorRef.underlyingActor.isShutDown) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala index 09e35bf..e89b296 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala @@ -42,7 +42,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll { override def beforeAll() { sc = new SparkContext("local", "test") - + // Set the block size of local file system to test whether files are split right or not. sc.hadoopConfiguration.setLong("fs.local.block.size", 32) } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala index a4381a8..4df3655 100644 --- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala @@ -34,14 +34,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices(1).mkString(",") === "2") assert(slices(2).mkString(",") === "3") } - + test("one slice") { val data = Array(1, 2, 3) val slices = ParallelCollectionRDD.slice(data, 1) assert(slices.size === 1) assert(slices(0).mkString(",") === "1,2,3") } - + test("equal slices") { val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) val slices = ParallelCollectionRDD.slice(data, 3) @@ -50,7 +50,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices(1).mkString(",") === "4,5,6") assert(slices(2).mkString(",") === "7,8,9") } - + test("non-equal slices") { val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val slices = ParallelCollectionRDD.slice(data, 3) @@ -77,14 +77,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices(1).mkString(",") === (33 to 66).mkString(",")) assert(slices(2).mkString(",") === (67 to 100).mkString(",")) } - + test("empty data") { val data = new Array[Int](0) val slices = ParallelCollectionRDD.slice(data, 5) assert(slices.size === 5) for (slice <- slices) assert(slice.size === 0) } - + test("zero slices") { val data = Array(1, 2, 3) intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, 0) } @@ -94,7 +94,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { val data = Array(1, 2, 3) intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, -5) } } - + test("exclusive ranges sliced into ranges") { val data = 1 until 100 val slices = ParallelCollectionRDD.slice(data, 3) @@ -102,7 +102,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices.map(_.size).reduceLeft(_+_) === 99) assert(slices.forall(_.isInstanceOf[Range])) } - + test("inclusive ranges sliced into ranges") { val data = 1 to 100 val slices = ParallelCollectionRDD.slice(data, 3) @@ -124,7 +124,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(range.step === 1, "slice " + i + " step") } } - + test("random array tests") { val gen = for { d <- arbitrary[List[Int]] @@ -141,7 +141,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { } check(prop) } - + test("random exclusive range tests") { val gen = for { a <- Gen.choose(-100, 100) @@ -177,7 +177,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { } check(prop) } - + test("exclusive ranges of longs") { val data = 1L until 100L val slices = ParallelCollectionRDD.slice(data, 3) @@ -185,7 +185,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices.map(_.size).reduceLeft(_+_) === 99) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) } - + test("inclusive ranges of longs") { val data = 1L to 100L val slices = ParallelCollectionRDD.slice(data, 3) @@ -193,7 +193,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices.map(_.size).reduceLeft(_+_) === 100) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) } - + test("exclusive ranges of doubles") { val data = 1.0 until 100.0 by 1.0 val slices = ParallelCollectionRDD.slice(data, 3) @@ -201,7 +201,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices.map(_.size).reduceLeft(_+_) === 99) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) } - + test("inclusive ranges of doubles") { val data = 1.0 to 100.0 by 1.0 val slices = ParallelCollectionRDD.slice(data, 3) http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index dc704e0..4cdccdd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -216,7 +216,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc test("onTaskGettingResult() called when result fetched remotely") { val listener = new SaveTaskEvents sc.addSparkListener(listener) - + // Make a task whose result is larger than the akka frame size System.setProperty("spark.akka.frameSize", "1") val akkaFrameSize = @@ -236,7 +236,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc test("onTaskGettingResult() not called when result sent directly") { val listener = new SaveTaskEvents sc.addSparkListener(listener) - + // Make a task whose result is larger than the akka frame size val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x } assert(result === 2) http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 356e28d..2fb750d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -264,7 +264,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin test("Scheduler does not always schedule tasks on the same workers") { sc = new SparkContext("local", "TaskSchedulerImplSuite") - val taskScheduler = new TaskSchedulerImpl(sc) + val taskScheduler = new TaskSchedulerImpl(sc) taskScheduler.initialize(new FakeSchedulerBackend) // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. val dagScheduler = new DAGScheduler(sc, taskScheduler) { http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/ui/UISuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 45c3224..2f9739f 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -33,8 +33,8 @@ class UISuite extends FunSuite { val server = new Server(startPort) Try { server.start() } match { - case Success(s) => - case Failure(e) => + case Success(s) => + case Failure(e) => // Either case server port is busy hence setup for test complete } val serverInfo1 = JettyUtils.startJettyServer( http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 439e564..d7e48e6 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -69,7 +69,7 @@ object TestObject { class TestClass extends Serializable { var x = 5 - + def getX = x def run(): Int = { http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala index e1446cb..32d74d0 100644 --- a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala @@ -32,7 +32,7 @@ class NextIteratorSuite extends FunSuite with ShouldMatchers { i.hasNext should be === false intercept[NoSuchElementException] { i.next() } } - + test("two iterations") { val i = new StubIterator(Buffer(1, 2)) i.hasNext should be === true @@ -70,7 +70,7 @@ class NextIteratorSuite extends FunSuite with ShouldMatchers { class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] { var closeCalled = 0 - + override def getNext() = { if (ints.size == 0) { finished = true http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala index 757476e..39199a1 100644 --- a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala @@ -29,12 +29,12 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers { val xorRand = new XORShiftRandom(seed) val hundMil = 1e8.toInt } - + /* - * This test is based on a chi-squared test for randomness. The values are hard-coded + * This test is based on a chi-squared test for randomness. The values are hard-coded * so as not to create Spark's dependency on apache.commons.math3 just to call one * method for calculating the exact p-value for a given number of random numbers - * and bins. In case one would want to move to a full-fledged test based on + * and bins. In case one would want to move to a full-fledged test based on * apache.commons.math3, the relevant class is here: * org.apache.commons.math3.stat.inference.ChiSquareTest */ @@ -49,19 +49,19 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers { // populate bins based on modulus of the random number times(f.hundMil) {bins(math.abs(f.xorRand.nextInt) % 10) += 1} - /* since the seed is deterministic, until the algorithm is changed, we know the result will be - * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, - * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%) - * significance level. However, should the RNG implementation change, the test should still - * pass at the same significance level. The chi-squared test done in R gave the following + /* since the seed is deterministic, until the algorithm is changed, we know the result will be + * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, + * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%) + * significance level. However, should the RNG implementation change, the test should still + * pass at the same significance level. The chi-squared test done in R gave the following * results: * > chisq.test(c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, * 10000790, 10002286, 9998699)) * Chi-squared test for given probabilities - * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790, + * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790, * 10002286, 9998699) * X-squared = 11.975, df = 9, p-value = 0.2147 - * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million + * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million * random numbers * and 10 bins will happen at X-squared of ~16.9196. So, the test will fail if X-squared * is greater than or equal to that number. http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 41e813d..1204cfb 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -48,41 +48,41 @@ import org.apache.spark.streaming.dstream._ * @param storageLevel RDD storage level. */ -private[streaming] +private[streaming] class MQTTInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_) with Logging { - + def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] } } -private[streaming] +private[streaming] class MQTTReceiver(brokerUrl: String, topic: String, storageLevel: StorageLevel ) extends NetworkReceiver[Any] { lazy protected val blockGenerator = new BlockGenerator(storageLevel) - + def onStop() { blockGenerator.stop() } - + def onStart() { blockGenerator.start() - // Set up persistence for messages + // Set up persistence for messages var peristance: MqttClientPersistence = new MemoryPersistence() // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) - // Connect to MqttBroker + // Connect to MqttBroker client.connect() // Subscribe to Mqtt topic @@ -91,7 +91,7 @@ class MQTTReceiver(brokerUrl: String, // Callback automatically triggers as and when new message arrives on specified topic var callback: MqttCallback = new MqttCallback() { - // Handles Mqtt message + // Handles Mqtt message override def messageArrived(arg0: String, arg1: MqttMessage) { blockGenerator += new String(arg1.getPayload()) } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 3316b6d..843a4a7 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel * @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials. * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is * such that this may return a sampled subset of all tweets during each interval. -* +* * If no Authorization object is provided, initializes OAuth authorization using the system * properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret. */ @@ -42,13 +42,13 @@ class TwitterInputDStream( filters: Seq[String], storageLevel: StorageLevel ) extends NetworkInputDStream[Status](ssc_) { - + private def createOAuthAuthorization(): Authorization = { new OAuthAuthorization(new ConfigurationBuilder().build()) } private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) - + override def getReceiver(): NetworkReceiver[Status] = { new TwitterReceiver(authorization, filters, storageLevel) } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 377d9d6..5635287 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -172,7 +172,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali "EdgeDirection.Either instead.") } } - + /** * Join the vertices with an RDD and then apply a function from the * the vertex and RDD entry to a new vertex value. The input table http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala ---------------------------------------------------------------------- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index 6386306..a467ca1 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -55,7 +55,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { } } } - + test ("filter") { withSpark { sc => val n = 5 http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala index e41d9bb..7f6d945 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala @@ -30,7 +30,7 @@ import org.apache.spark.mllib.linalg.Vector trait Optimizer extends Serializable { /** - * Solve the provided convex optimization problem. + * Solve the provided convex optimization problem. */ def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 3bd0017..d969e7a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -26,7 +26,7 @@ import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.linalg.{Vectors, Vector} /** - * GeneralizedLinearModel (GLM) represents a model trained using + * GeneralizedLinearModel (GLM) represents a model trained using * GeneralizedLinearAlgorithm. GLMs consist of a weight vector and * an intercept. * @@ -38,7 +38,7 @@ abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double /** * Predict the result given a data point and the weights learned. - * + * * @param dataMatrix Row vector containing the features for this data point * @param weightMatrix Column vector containing the weights of the model * @param intercept Intercept of the model. http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index a30dcfd..687e85c 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -35,7 +35,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI, * used to load classes defined by the interpreter when the REPL is used. * Allows the user to specify if user class path should be first - */ + */ class ExecutorClassLoader(classUri: String, parent: ClassLoader, userClassPathFirst: Boolean) extends ClassLoader { val uri = new URI(classUri) @@ -94,7 +94,7 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader, case e: Exception => None } } - + def readAndTransformClass(name: String, in: InputStream): Array[Byte] = { if (name.startsWith("line") && name.endsWith("$iw$")) { // Class seems to be an interpreter "wrapper" object storing a val or var. http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala index 8f61a5e..419796b 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala @@ -187,7 +187,7 @@ trait SparkImports { if (currentImps contains imv) addWrapper() val objName = req.lineRep.readPath val valName = "$VAL" + newValId(); - + if(!code.toString.endsWith(".`" + imv + "`;\n")) { // Which means already imported code.append("val " + valName + " = " + objName + ".INSTANCE;\n") code.append("import " + valName + req.accessPath + ".`" + imv + "`;\n") http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 1711849..1f3fab0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -28,7 +28,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { override def toString = s"CAST($child, $dataType)" type EvaluatedType = Any - + def nullOrCast[T](a: Any, func: T => Any): Any = if(a == null) { null } else { @@ -40,7 +40,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { case BinaryType => nullOrCast[Array[Byte]](_, new String(_, "UTF-8")) case _ => nullOrCast[Any](_, _.toString) } - + // BinaryConverter def castToBinary: Any => Any = child.dataType match { case StringType => nullOrCast[String](_, _.getBytes("UTF-8")) @@ -58,7 +58,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { case DoubleType => nullOrCast[Double](_, _ != 0) case FloatType => nullOrCast[Float](_, _ != 0) } - + // TimestampConverter def castToTimestamp: Any => Any = child.dataType match { case StringType => nullOrCast[String](_, s => { http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 8a1db8e..dd9332a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -86,7 +86,7 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Evaluation helper function for 2 Numeric children expressions. Those expressions are supposed + * Evaluation helper function for 2 Numeric children expressions. Those expressions are supposed * to be in the same data type, and also the return type. * Either one of the expressions result is null, the evaluation result should be null. */ @@ -120,7 +120,7 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Evaluation helper function for 2 Fractional children expressions. Those expressions are + * Evaluation helper function for 2 Fractional children expressions. Those expressions are * supposed to be in the same data type, and also the return type. * Either one of the expressions result is null, the evaluation result should be null. */ @@ -153,7 +153,7 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Evaluation helper function for 2 Integral children expressions. Those expressions are + * Evaluation helper function for 2 Integral children expressions. Those expressions are * supposed to be in the same data type, and also the return type. * Either one of the expressions result is null, the evaluation result should be null. */ @@ -186,12 +186,12 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Evaluation helper function for 2 Comparable children expressions. Those expressions are + * Evaluation helper function for 2 Comparable children expressions. Those expressions are * supposed to be in the same data type, and the return type should be Integer: * Negative value: 1st argument less than 2nd argument * Zero: 1st argument equals 2nd argument * Positive value: 1st argument greater than 2nd argument - * + * * Either one of the expressions result is null, the evaluation result should be null. */ @inline @@ -213,7 +213,7 @@ abstract class Expression extends TreeNode[Expression] { null } else { e1.dataType match { - case i: NativeType => + case i: NativeType => f.asInstanceOf[(Ordering[i.JvmType], i.JvmType, i.JvmType) => Boolean]( i.ordering, evalE1.asInstanceOf[i.JvmType], evalE2.asInstanceOf[i.JvmType]) case other => sys.error(s"Type $other does not support ordered operations") http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala index a27c71d..ddc16ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala @@ -28,19 +28,19 @@ trait StringRegexExpression { self: BinaryExpression => type EvaluatedType = Any - + def escape(v: String): String def matches(regex: Pattern, str: String): Boolean - + def nullable: Boolean = true def dataType: DataType = BooleanType - - // try cache the pattern for Literal + + // try cache the pattern for Literal private lazy val cache: Pattern = right match { case x @ Literal(value: String, StringType) => compile(value) case _ => null } - + protected def compile(str: String): Pattern = if(str == null) { null } else { @@ -49,7 +49,7 @@ trait StringRegexExpression { } protected def pattern(str: String) = if(cache == null) compile(str) else cache - + override def eval(input: Row): Any = { val l = left.eval(input) if (l == null) { @@ -73,11 +73,11 @@ trait StringRegexExpression { /** * Simple RegEx pattern matching function */ -case class Like(left: Expression, right: Expression) +case class Like(left: Expression, right: Expression) extends BinaryExpression with StringRegexExpression { - + def symbol = "LIKE" - + // replace the _ with .{1} exactly match 1 time of any character // replace the % with .*, match 0 or more times with any character override def escape(v: String) = { @@ -98,19 +98,19 @@ case class Like(left: Expression, right: Expression) sb.append(Pattern.quote(Character.toString(n))); } } - + i += 1 } - + sb.toString() } - + override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).matches() } -case class RLike(left: Expression, right: Expression) +case class RLike(left: Expression, right: Expression) extends BinaryExpression with StringRegexExpression { - + def symbol = "RLIKE" override def escape(v: String): String = v override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).find(0) http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index cdeb01a..da34bd3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -55,9 +55,9 @@ case object BooleanType extends NativeType { case object TimestampType extends NativeType { type JvmType = Timestamp - + @transient lazy val tag = typeTag[JvmType] - + val ordering = new Ordering[JvmType] { def compare(x: Timestamp, y: Timestamp) = x.compareTo(y) } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala index 888a19d..2cd0d2b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala @@ -144,7 +144,7 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation("abc" like "b%", false) checkEvaluation("abc" like "bc%", false) } - + test("LIKE Non-literal Regular Expression") { val regEx = 'a.string.at(0) checkEvaluation("abcd" like regEx, null, new GenericRow(Array[Any](null))) @@ -164,7 +164,7 @@ class ExpressionEvaluationSuite extends FunSuite { test("RLIKE literal Regular Expression") { checkEvaluation("abdef" rlike "abdef", true) checkEvaluation("abbbbc" rlike "a.*c", true) - + checkEvaluation("fofo" rlike "^fo", true) checkEvaluation("fo\no" rlike "^fo\no$", true) checkEvaluation("Bn" rlike "^Ba*n", true) @@ -196,9 +196,9 @@ class ExpressionEvaluationSuite extends FunSuite { evaluate("abbbbc" rlike regEx, new GenericRow(Array[Any]("**"))) } } - + test("data type casting") { - + val sts = "1970-01-01 00:00:01.0" val ts = Timestamp.valueOf(sts) @@ -236,7 +236,7 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation("23" cast ShortType, 23) checkEvaluation("2012-12-11" cast DoubleType, null) checkEvaluation(Literal(123) cast IntegerType, 123) - + intercept[Exception] {evaluate(Literal(1) cast BinaryType, null)} } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala index 65eae33..1cbf973 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala @@ -56,4 +56,4 @@ class ScalaReflectionRelationSuite extends FunSuite { val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]] assert(result.toSeq === Seq[Byte](1)) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 93023e8..ac56ff7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -59,7 +59,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) } } -private[streaming] +private[streaming] object Checkpoint extends Logging { val PREFIX = "checkpoint-" val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r @@ -79,7 +79,7 @@ object Checkpoint extends Logging { def sortFunc(path1: Path, path2: Path): Boolean = { val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } - (time1 < time2) || (time1 == time2 && bk1) + (time1 < time2) || (time1 == time2 && bk1) } val path = new Path(checkpointDir) @@ -95,7 +95,7 @@ object Checkpoint extends Logging { } } else { logInfo("Checkpoint directory " + path + " does not exist") - Seq.empty + Seq.empty } } } @@ -160,7 +160,7 @@ class CheckpointWriter( }) } - // All done, print success + // All done, print success val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") @@ -227,14 +227,14 @@ object CheckpointReader extends Logging { { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) - - // Try to find the checkpoint files + + // Try to find the checkpoint files val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse if (checkpointFiles.isEmpty) { return None } - // Try to read the checkpoint files in the order + // Try to read the checkpoint files in the order logInfo("Checkpoint files found: " + checkpointFiles.mkString(",")) val compressionCodec = CompressionCodec.createCodec(conf) checkpointFiles.foreach(file => { http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala index 16479a0..ad4f3fd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala @@ -20,11 +20,11 @@ package org.apache.spark.streaming private[streaming] class Interval(val beginTime: Time, val endTime: Time) { def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs)) - + def duration(): Duration = endTime - beginTime def + (time: Duration): Interval = { - new Interval(beginTime + time, endTime + time) + new Interval(beginTime + time, endTime + time) } def - (time: Duration): Interval = { @@ -40,9 +40,9 @@ class Interval(val beginTime: Time, val endTime: Time) { } def <= (that: Interval) = (this < that || this == that) - + def > (that: Interval) = !(this <= that) - + def >= (that: Interval) = !(this < that) override def toString = "[" + beginTime + ", " + endTime + "]" http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/Time.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala index 2678334..6a6b00a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala @@ -32,7 +32,7 @@ case class Time(private val millis: Long) { def <= (that: Time): Boolean = (this.millis <= that.millis) def > (that: Time): Boolean = (this.millis > that.millis) - + def >= (that: Time): Boolean = (this.millis >= that.millis) def + (that: Duration): Time = new Time(millis + that.milliseconds) @@ -43,7 +43,7 @@ case class Time(private val millis: Long) { def floor(that: Duration): Time = { val t = that.milliseconds - val m = math.floor(this.millis / t).toLong + val m = math.floor(this.millis / t).toLong new Time(m * t) } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 903e3f3..f33c0ce 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -51,7 +51,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) .map(x => (x._1, x._2.getCheckpointFile.get)) logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) - // Add the checkpoint files to the data to be serialized + // Add the checkpoint files to the data to be serialized if (!checkpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 8a60516..e878285 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -232,7 +232,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } logDebug("Accepted " + path) } catch { - case fnfe: java.io.FileNotFoundException => + case fnfe: java.io.FileNotFoundException => logWarning("Error finding new files", fnfe) reset() return false http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 97325f8..6376cff 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -31,11 +31,11 @@ class QueueInputDStream[T: ClassTag]( oneAtATime: Boolean, defaultRDD: RDD[T] ) extends InputDStream[T](ssc) { - + override def start() { } - + override def stop() { } - + override def compute(validTime: Time): Option[RDD[T]] = { val buffer = new ArrayBuffer[RDD[T]]() if (oneAtATime && queue.size > 0) { @@ -55,5 +55,5 @@ class QueueInputDStream[T: ClassTag]( None } } - + } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index 44eb275..f5984d0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -47,7 +47,7 @@ object ReceiverSupervisorStrategy { * the API for pushing received data into Spark Streaming for being processed. * * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * + * * @example {{{ * class MyActor extends Actor with Receiver{ * def receive { http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala index c5ef2cc..39145a3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala @@ -19,34 +19,34 @@ package org.apache.spark.streaming.util private[streaming] trait Clock { - def currentTime(): Long + def currentTime(): Long def waitTillTime(targetTime: Long): Long } private[streaming] class SystemClock() extends Clock { - + val minPollTime = 25L - + def currentTime(): Long = { System.currentTimeMillis() - } - + } + def waitTillTime(targetTime: Long): Long = { var currentTime = 0L currentTime = System.currentTimeMillis() - + var waitTime = targetTime - currentTime if (waitTime <= 0) { return currentTime } - + val pollTime = { if (waitTime / 10.0 > minPollTime) { (waitTime / 10.0).toLong } else { - minPollTime - } + minPollTime + } } while (true) { @@ -55,7 +55,7 @@ class SystemClock() extends Clock { if (waitTime <= 0) { return currentTime } - val sleepTime = + val sleepTime = if (waitTime < pollTime) { waitTime } else { @@ -69,7 +69,7 @@ class SystemClock() extends Clock { private[streaming] class ManualClock() extends Clock { - + var time = 0L def currentTime() = time @@ -85,13 +85,13 @@ class ManualClock() extends Clock { this.synchronized { time += timeToAdd this.notifyAll() - } + } } def waitTillTime(targetTime: Long): Long = { this.synchronized { while (time < targetTime) { this.wait(100) - } + } } currentTime() } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index 07021eb..bd1df55 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -25,8 +25,8 @@ import scala.collection.JavaConversions.mapAsScalaMap private[streaming] object RawTextHelper { - /** - * Splits lines and counts the words in them using specialized object-to-long hashmap + /** + * Splits lines and counts the words in them using specialized object-to-long hashmap * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) */ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { @@ -55,13 +55,13 @@ object RawTextHelper { map.toIterator.map{case (k, v) => (k, v)} } - /** + /** * Gets the top k words in terms of word counts. Assumes that each word exists only once * in the `data` iterator (that is, the counts have been reduced). */ def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = { val taken = new Array[(String, Long)](k) - + var i = 0 var len = 0 var done = false @@ -93,7 +93,7 @@ object RawTextHelper { } taken.toIterator } - + /** * Warms up the SparkContext in master and slave by running tasks to force JIT kick in * before real workload starts. @@ -106,11 +106,11 @@ object RawTextHelper { .count() } } - - def add(v1: Long, v2: Long) = (v1 + v2) - def subtract(v1: Long, v2: Long) = (v1 - v2) + def add(v1: Long, v2: Long) = (v1 + v2) + + def subtract(v1: Long, v2: Long) = (v1 - v2) - def max(v1: Long, v2: Long) = math.max(v1, v2) + def max(v1: Long, v2: Long) = math.max(v1, v2) } http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index f71938a..e016377 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -22,10 +22,10 @@ import org.apache.spark.Logging private[streaming] class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String) extends Logging { - + private val thread = new Thread("RecurringTimer - " + name) { setDaemon(true) - override def run() { loop } + override def run() { loop } } @volatile private var prevTime = -1L @@ -104,11 +104,11 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: private[streaming] object RecurringTimer { - + def main(args: Array[String]) { var lastRecurTime = 0L val period = 1000 - + def onRecur(time: Long) { val currentTime = System.currentTimeMillis() println("" + currentTime + ": " + (currentTime - lastRecurTime)) http://git-wip-us.apache.org/repos/asf/spark/blob/930b70f0/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 13fa648..a0b1bbc 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1673,7 +1673,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testSocketString() { - + class Converter implements Function> { public Iterable call(InputStream in) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(in));