spark-commits mailing list archives

From sro...@apache.org
Subject spark git commit: [SPARK-25674][FOLLOW-UP] Update the stats for each ColumnarBatch
Date Tue, 16 Oct 2018 02:20:54 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 d64b35588 -> 8bc7ab03d


[SPARK-25674][FOLLOW-UP] Update the stats for each ColumnarBatch

## What changes were proposed in this pull request?
This PR is a follow-up to https://github.com/apache/spark/pull/22594. This alternative avoids
unneeded computation in the hot code path:

- For the row-based scan, we keep the original approach.
- For the columnar scan, we just need to update the stats once after each batch (a minimal sketch follows this list).
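
For illustration, here is a minimal, self-contained sketch of the two update strategies. All names in it are hypothetical stand-ins: `SimpleInputMetrics`, `UpdateInterval`, and `refreshBytesRead` stand in for Spark's internal `InputMetrics`, `SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS`, and the bytes-read callback; the real change is in the diff below.

```scala
object MetricsUpdateSketch {
  // Hypothetical stand-in for Spark's InputMetrics.
  final class SimpleInputMetrics {
    var recordsRead: Long = 0L
    var bytesRead: Long = 0L
    def incRecordsRead(n: Long): Unit = recordsRead += n
    def setBytesRead(b: Long): Unit = bytesRead = b
  }

  // Stand-in for SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS.
  val UpdateInterval: Long = 1000L

  def main(args: Array[String]): Unit = {
    val metrics = new SimpleInputMetrics
    var bytesSeen = 0L
    // Stand-in for the getBytesReadCallback()-based update, assumed too
    // costly to run once per record.
    def refreshBytesRead(): Unit = metrics.setBytesRead(bytesSeen)

    // Columnar path: refresh the byte count once per batch, then add all
    // of the batch's rows in a single increment.
    val batchNumRows = 4096L
    bytesSeen += 1024 * 1024
    refreshBytesRead()
    metrics.incRecordsRead(batchNumRows)

    // Row path: increment per record, but refresh the byte count only
    // every UpdateInterval records.
    for (_ <- 1 to 2500) {
      if (metrics.recordsRead % UpdateInterval == 0) refreshBytesRead()
      metrics.incRecordsRead(1)
      bytesSeen += 100
    }
    println(s"recordsRead=${metrics.recordsRead}, bytesRead=${metrics.bytesRead}")
  }
}
```

The plain modulo check suffices on the row path because `recordsRead` grows by exactly 1 there; the bucket-crossing comparison removed by this patch existed only because records could previously be incremented by more than 1 at a time.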

## How was this patch tested?
N/A

Closes #22731 from gatorsmile/udpateStatsFileScanRDD.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4cee191c04f14d7272347e4b29201763c6cfb6bf)
Signed-off-by: Sean Owen <sean.owen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bc7ab03
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bc7ab03
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bc7ab03

Branch: refs/heads/branch-2.4
Commit: 8bc7ab03dd417f0f9cca509df4566d621e633305
Parents: d64b355
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Tue Oct 16 08:58:29 2018 +0800
Committer: Sean Owen <sean.owen@databricks.com>
Committed: Mon Oct 15 21:20:49 2018 -0500

----------------------------------------------------------------------
 .../sql/execution/datasources/FileScanRDD.scala      | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8bc7ab03/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index dd3c154..ffea33c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -85,7 +85,7 @@ class FileScanRDD(
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
-      private def updateBytesRead(): Unit = {
+      private def incTaskInputMetricsBytesRead(): Unit = {
         inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
       }
 
@@ -106,15 +106,16 @@ class FileScanRDD(
         // don't need to run this `if` for every record.
         val preNumRecordsRead = inputMetrics.recordsRead
         if (nextElement.isInstanceOf[ColumnarBatch]) {
+          incTaskInputMetricsBytesRead()
           inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
         } else {
+          // too costly to update every record
+          if (inputMetrics.recordsRead %
+              SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+            incTaskInputMetricsBytesRead()
+          }
           inputMetrics.incRecordsRead(1)
         }
-        // The records may be incremented by more than 1 at a time.
-        if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
-          inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
-          updateBytesRead()
-        }
         nextElement
       }
 
@@ -201,7 +202,7 @@ class FileScanRDD(
       }
 
       override def close(): Unit = {
-        updateBytesRead()
+        incTaskInputMetricsBytesRead()
         InputFileBlockHolder.unset()
       }
     }

