spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-15322][SQL][FOLLOW-UP] Update deprecated accumulator usage into accumulatorV2
Date Thu, 19 May 2016 18:54:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master faafd1e9d -> f5065abf4


[SPARK-15322][SQL][FOLLOW-UP] Update deprecated accumulator usage into accumulatorV2

## What changes were proposed in this pull request?

This PR corrects another case that uses the deprecated `accumulableCollection`, replacing it with
`listAccumulator`; the previous PR appears to have missed this case.

Since `ArrayBuffer[InternalRow].asJava` is a `java.util.List[InternalRow]`, it seems safe to
replace the usage.
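
As a rough sketch of the API change (assuming a live `SparkContext` named `sc`, and a hypothetical `Item` case class standing in for `InternalRow`), the migration applied in the diff below looks like this, using `listAccumulator` as the API is named at the time of this commit:

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkContext

object AccumulatorMigrationSketch {
  // `Item` is a stand-in for InternalRow, purely for illustration.
  case class Item(sizeInBytes: Long)

  def demo(sc: SparkContext): Unit = {
    // Before: deprecated Accumulable[ArrayBuffer[Item], Item].
    val oldStats = sc.accumulableCollection(ArrayBuffer.empty[Item])
    oldStats += Item(8L)                   // += appends a single element
    val oldTotal = oldStats.value.map(_.sizeInBytes).sum

    // After: ListAccumulator[Item], whose value is a java.util.List[Item].
    val newStats = sc.listAccumulator[Item]
    newStats.add(Item(8L))                 // add() replaces +=
    // The value is a java.util.List, so convert via asScala before applying
    // Scala collection operations, mirroring batchStats.value.asScala below.
    val newTotal = newStats.value.asScala.map(_.sizeInBytes).sum

    assert(oldTotal == newTotal)
  }
}
```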

## How was this patch tested?

This change is covered by the related existing tests `InMemoryColumnarQuerySuite` and `CachedTableSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13187 from HyukjinKwon/SPARK-15322.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5065abf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5065abf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5065abf

Branch: refs/heads/master
Commit: f5065abf49dea0eac04b0ec219f2d832a0f6730a
Parents: faafd1e
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Thu May 19 11:54:50 2016 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu May 19 11:54:50 2016 -0700

----------------------------------------------------------------------
 .../execution/columnar/InMemoryTableScanExec.scala  | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5065abf/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 009fbaa..ba61940 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.execution.columnar
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 
 import org.apache.commons.lang.StringUtils
 
-import org.apache.spark.{Accumulable, Accumulator}
+import org.apache.spark.Accumulator
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, ListAccumulator}
 
 
 private[sql] object InMemoryRelation {
@@ -67,14 +67,14 @@ private[sql] case class InMemoryRelation(
     tableName: Option[String])(
     @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
     @transient private[sql] var _statistics: Statistics = null,
-    private[sql] var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
+    private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override def producedAttributes: AttributeSet = outputSet
 
-  private[sql] val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
+  private[sql] val batchStats: ListAccumulator[InternalRow] =
     if (_batchStats == null) {
-      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
+      child.sqlContext.sparkContext.listAccumulator[InternalRow]
     } else {
       _batchStats
     }
@@ -87,7 +87,7 @@ private[sql] case class InMemoryRelation(
         output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
         partitionStatistics.schema)
 
-    batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+    batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
   }
 
   // Statistics propagation contracts:
@@ -169,7 +169,7 @@ private[sql] case class InMemoryRelation(
           val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
                         .flatMap(_.values))
 
-          batchStats += stats
+          batchStats.add(stats)
           CachedBatch(rowCount, columnBuilders.map { builder =>
             JavaUtils.bufferToArray(builder.build())
           }, stats)


