spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: Replace use of .size with .length for Arrays
Date Tue, 07 Apr 2015 17:43:26 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7162ecf88 -> 2c32bef17


Replace use of .size with .length for Arrays

Invoking .size on arrays is valid, but requires an implicit conversion to ArrayOps (where .size
is inherited from SeqLike). This incurs a compile-time overhead and, more importantly, a runtime
overhead, as the Array must be wrapped before the method can be invoked. For example, the
difference in generated bytecode is:

  public int withSize();
    Code:
       0: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
       3: aload_0
       4: invokevirtual #25                 // Method array:()[I
       7: invokevirtual #29                 // Method scala/Predef$.intArrayOps:([I)Lscala/collection/mutable/ArrayOps;
      10: invokeinterface #34,  1           // InterfaceMethod scala/collection/mutable/ArrayOps.size:()I
      15: ireturn

  public int withLength();
    Code:
       0: aload_0
       1: invokevirtual #25                 // Method array:()[I
       4: arraylength
       5: ireturn

Author: sksamuel <sam@sksamuel.com>

Closes #5376 from sksamuel/master and squashes the following commits:

77ec261 [sksamuel] Replace use of .size with .length for Arrays.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c32bef1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c32bef1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c32bef1

Branch: refs/heads/master
Commit: 2c32bef1790dac6f77ef9674f6106c2e24ea0338
Parents: 7162ecf
Author: sksamuel <sam@sksamuel.com>
Authored: Tue Apr 7 10:43:22 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Tue Apr 7 10:43:22 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/network/nio/Connection.scala   |  2 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala  | 10 ++++-----
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |  2 +-
 .../org/apache/spark/rdd/CartesianRDD.scala     |  4 ++--
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  2 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  4 ++--
 .../org/apache/spark/rdd/CoalescedRDD.scala     |  2 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala   |  4 ++--
 .../apache/spark/rdd/OrderedRDDFunctions.scala  |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  2 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 22 ++++++++++----------
 .../apache/spark/rdd/RDDCheckpointData.scala    |  6 +++---
 .../org/apache/spark/rdd/SubtractedRDD.scala    |  2 +-
 .../scala/org/apache/spark/rdd/UnionRDD.scala   |  6 +++---
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  |  4 ++--
 .../apache/spark/rdd/ZippedWithIndexRDD.scala   |  2 +-
 .../org/apache/spark/storage/RDDInfo.scala      |  2 +-
 .../apache/spark/ui/ConsoleProgressBar.scala    |  4 ++--
 .../apache/spark/util/collection/BitSet.scala   |  2 +-
 19 files changed, 42 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
index 04eb2bf..6b898bd 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
@@ -181,7 +181,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
     buffer.get(bytes)
     bytes.foreach(x => print(x + " "))
     buffer.position(curPosition)
-    print(" (" + bytes.size + ")")
+    print(" (" + bytes.length + ")")
   }
 
   def printBuffer(buffer: ByteBuffer, position: Int, length: Int) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 646df28..3406a7e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -45,7 +45,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         }
         result
       },
-      Range(0, self.partitions.size),
+      Range(0, self.partitions.length),
       (index: Int, data: Long) => totalCount.addAndGet(data),
       totalCount.get())
   }
@@ -54,8 +54,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Returns a future for retrieving all elements of this RDD.
    */
   def collectAsync(): FutureAction[Seq[T]] = {
-    val results = new Array[Array[T]](self.partitions.size)
-    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
+    val results = new Array[Array[T]](self.partitions.length)
+    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
       (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
@@ -111,7 +111,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    */
   def foreachAsync(f: T => Unit): FutureAction[Unit] = {
     val cleanF = self.context.clean(f)
-    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
   }
 
@@ -119,7 +119,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
-    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
+    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index fffa191..71578d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -36,7 +36,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
 
   override def getPartitions: Array[Partition] = {
     assertValid()
-    (0 until blockIds.size).map(i => {
+    (0 until blockIds.length).map(i => {
       new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
     }).toArray
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 9059eb1..c1d6971 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -53,11 +53,11 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   extends RDD[Pair[T, U]](sc, Nil)
   with Serializable {
 
-  val numPartitionsInRdd2 = rdd2.partitions.size
+  val numPartitionsInRdd2 = rdd2.partitions.length
 
   override def getPartitions: Array[Partition] = {
     // create the cross product split
-    val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
+    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
     for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
       val idx = s1.index * numPartitionsInRdd2 + s2.index
       array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 760c0fa..0d130dd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -49,7 +49,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
     if (fs.exists(cpath)) {
       val dirContents = fs.listStatus(cpath).map(_.getPath)
       val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
-      val numPart =  partitionFiles.size
+      val numPart =  partitionFiles.length
       if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
           ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
         throw new SparkException("Invalid checkpoint directory: " + checkpointPath)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 07398a6..7021a33 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -99,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override def getPartitions: Array[Partition] = {
     val array = new Array[Partition](part.numPartitions)
-    for (i <- 0 until array.size) {
+    for (i <- 0 until array.length) {
       // Each CoGroupPartition will have a dependency per contributing RDD
       array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
         // Assume each RDD contributed a single dependency, and get it
@@ -120,7 +120,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
-    val numRdds = split.deps.size
+    val numRdds = split.deps.length
 
     // A list of (rdd iterator, dependency number) pairs
     val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 5117ccf..0c1b02c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -166,7 +166,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
 
   // determines the tradeoff between load-balancing the partitions sizes and their locality
   // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
-  val slack = (balanceSlack * prev.partitions.size).toInt
+  val slack = (balanceSlack * prev.partitions.length).toInt
 
   var noLocality = true  // if true if no preferredLocations exists for parent RDD
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 71e6e30..29ca3e9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -70,7 +70,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
   @Experimental
   def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
     val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
-    val evaluator = new MeanEvaluator(self.partitions.size, confidence)
+    val evaluator = new MeanEvaluator(self.partitions.length, confidence)
     self.context.runApproximateJob(self, processPartition, evaluator, timeout)
   }
 
@@ -81,7 +81,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
   @Experimental
   def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
     val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
-    val evaluator = new SumEvaluator(self.partitions.size, confidence)
+    val evaluator = new SumEvaluator(self.partitions.length, confidence)
     self.context.runApproximateJob(self, processPartition, evaluator, timeout)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 6fdfdb7..6afe501 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -56,7 +56,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
    * order of the keys).
    */
   // TODO: this currently doesn't work on P other than Tuple2!
-  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
+  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
       : RDD[(K, V)] =
   {
     val part = new RangePartitioner(numPartitions, self, ascending)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index bf1303d..05351ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -823,7 +823,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * RDD will be <= us.
    */
   def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
-    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
+    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))
 
   /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
   def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ddbfd56..d80d94a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -316,7 +316,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(): RDD[T] = distinct(partitions.size)
+  def distinct(): RDD[T] = distinct(partitions.length)
 
   /**
    * Return a new RDD that has exactly numPartitions partitions.
@@ -488,7 +488,7 @@ abstract class RDD[T: ClassTag](
   def sortBy[K](
       f: (T) => K,
       ascending: Boolean = true,
-      numPartitions: Int = this.partitions.size)
+      numPartitions: Int = this.partitions.length)
       (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
     this.keyBy[K](f)
         .sortByKey(ascending, numPartitions)
@@ -852,7 +852,7 @@ abstract class RDD[T: ClassTag](
    * RDD will be &lt;= us.
    */
   def subtract(other: RDD[T]): RDD[T] =
-    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
+    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
 
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
@@ -986,14 +986,14 @@ abstract class RDD[T: ClassTag](
       combOp: (U, U) => U,
       depth: Int = 2): U = {
     require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
-    if (partitions.size == 0) {
+    if (partitions.length == 0) {
       return Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
     }
     val cleanSeqOp = context.clean(seqOp)
     val cleanCombOp = context.clean(combOp)
     val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
     var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
-    var numPartitions = partiallyAggregated.partitions.size
+    var numPartitions = partiallyAggregated.partitions.length
     val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
     // If creating an extra level doesn't help reduce the wall-clock time, we stop tree aggregation.
     while (numPartitions > scale + numPartitions / scale) {
@@ -1026,7 +1026,7 @@ abstract class RDD[T: ClassTag](
       }
       result
     }
-    val evaluator = new CountEvaluator(partitions.size, confidence)
+    val evaluator = new CountEvaluator(partitions.length, confidence)
     sc.runApproximateJob(this, countElements, evaluator, timeout)
   }
 
@@ -1061,7 +1061,7 @@ abstract class RDD[T: ClassTag](
       }
       map
     }
-    val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
+    val evaluator = new GroupedCountEvaluator[T](partitions.length, confidence)
     sc.runApproximateJob(this, countPartition, evaluator, timeout)
   }
 
@@ -1140,7 +1140,7 @@ abstract class RDD[T: ClassTag](
    * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
    */
   def zipWithUniqueId(): RDD[(T, Long)] = {
-    val n = this.partitions.size.toLong
+    val n = this.partitions.length.toLong
     this.mapPartitionsWithIndex { case (k, iter) =>
       iter.zipWithIndex.map { case (item, i) =>
         (item, i * n + k)
@@ -1243,7 +1243,7 @@ abstract class RDD[T: ClassTag](
         queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
         Iterator.single(queue)
       }
-      if (mapRDDs.partitions.size == 0) {
+      if (mapRDDs.partitions.length == 0) {
         Array.empty
       } else {
         mapRDDs.reduce { (queue1, queue2) =>
@@ -1489,7 +1489,7 @@ abstract class RDD[T: ClassTag](
     }
     // The first RDD in the dependency stack has no parents, so no need for a +-
     def firstDebugString(rdd: RDD[_]): Seq[String] = {
-      val partitionStr = "(" + rdd.partitions.size + ")"
+      val partitionStr = "(" + rdd.partitions.length + ")"
       val leftOffset = (partitionStr.length - 1) / 2
       val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
 
@@ -1499,7 +1499,7 @@ abstract class RDD[T: ClassTag](
       } ++ debugChildren(rdd, nextPrefix)
     }
     def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
-      val partitionStr = "(" + rdd.partitions.size + ")"
+      val partitionStr = "(" + rdd.partitions.length + ")"
       val leftOffset = (partitionStr.length - 1) / 2
       val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
       val nextPrefix = (

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index f67e5f1..6afd63d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -94,10 +94,10 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
       new SerializableWritable(rdd.context.hadoopConfiguration))
     rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
     val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
-    if (newRDD.partitions.size != rdd.partitions.size) {
+    if (newRDD.partitions.length != rdd.partitions.length) {
       throw new SparkException(
-        "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
-          "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
+        "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.length + ") has different " +
+          "number of partitions than original RDD " + rdd + "(" + rdd.partitions.length + ")")
     }
 
     // Change the dependencies and partitions of the RDD

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index c27f435..e9d7455 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -76,7 +76,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
 
   override def getPartitions: Array[Partition] = {
     val array = new Array[Partition](part.numPartitions)
-    for (i <- 0 until array.size) {
+    for (i <- 0 until array.length) {
       // Each CoGroupPartition will depend on rdd1 and rdd2
       array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
         dependencies(j) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 4239e7e..3986645 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -63,7 +63,7 @@ class UnionRDD[T: ClassTag](
   extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies
 
   override def getPartitions: Array[Partition] = {
-    val array = new Array[Partition](rdds.map(_.partitions.size).sum)
+    val array = new Array[Partition](rdds.map(_.partitions.length).sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
@@ -76,8 +76,8 @@ class UnionRDD[T: ClassTag](
     val deps = new ArrayBuffer[Dependency[_]]
     var pos = 0
     for (rdd <- rdds) {
-      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)
-      pos += rdd.partitions.size
+      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
+      pos += rdd.partitions.length
     }
     deps
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index d0be304..a96b6c3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -52,8 +52,8 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
     if (preservesPartitioning) firstParent[Any].partitioner else None
 
   override def getPartitions: Array[Partition] = {
-    val numParts = rdds.head.partitions.size
-    if (!rdds.forall(rdd => rdd.partitions.size == numParts)) {
+    val numParts = rdds.head.partitions.length
+    if (!rdds.forall(rdd => rdd.partitions.length == numParts)) {
       throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
     }
     Array.tabulate[Partition](numParts) { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 8c43a55..523aaf2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -41,7 +41,7 @@ class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, L
 
   /** The start index of each partition. */
   @transient private val startIndices: Array[Long] = {
-    val n = prev.partitions.size
+    val n = prev.partitions.length
     if (n == 0) {
       Array[Long]()
     } else if (n == 1) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 0186eb3..034525b 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -52,6 +52,6 @@ class RDDInfo(
 private[spark] object RDDInfo {
   def fromRdd(rdd: RDD[_]): RDDInfo = {
     val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
-    new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
+    new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
index 67f572e..77c0bc8 100644
--- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -65,7 +65,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
     val stageIds = sc.statusTracker.getActiveStageIds()
     val stages = stageIds.map(sc.statusTracker.getStageInfo).flatten.filter(_.numTasks() > 1)
       .filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
-    if (stages.size > 0) {
+    if (stages.length > 0) {
       show(now, stages.take(3))  // display at most 3 stages in same time
     }
   }
@@ -81,7 +81,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
       val total = s.numTasks()
       val header = s"[Stage ${s.stageId()}:"
       val tailer = s"(${s.numCompletedTasks()} + ${s.numActiveTasks()}) / $total]"
-      val w = width - header.size - tailer.size
+      val w = width - header.length - tailer.length
       val bar = if (w > 0) {
         val percent = w * s.numCompletedTasks() / total
         (0 until w).map { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2c32bef1/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index f79e8e0..41cb8cf 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -39,7 +39,7 @@ class BitSet(numBits: Int) extends Serializable {
     val wordIndex = bitIndex >> 6 // divide by 64
     var i = 0
     while(i < wordIndex) { words(i) = -1; i += 1 }
-    if(wordIndex < words.size) {
+    if(wordIndex < words.length) {
       // Set the remaining bits (note that the mask could still be zero)
       val mask = ~(-1L << (bitIndex & 0x3f))
       words(wordIndex) |= mask


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message