spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-13071] Coalescing HadoopRDD overwrites existing input metrics
Date Sat, 30 Jan 2016 02:03:21 GMT
Repository: spark
Updated Branches:
  refs/heads/master 70e69fc4d -> 12252d1da


[SPARK-13071] Coalescing HadoopRDD overwrites existing input metrics

This issue is causing tests to fail consistently in master with Hadoop 2.6 / 2.7. This is
because for Hadoop 2.5+ we overwrite existing values of `InputMetrics#bytesRead` in each call
to `HadoopRDD#compute`. In the case of coalesce, e.g.
```
sc.textFile(..., 4).coalesce(2).count()
```
we will call `compute` multiple times in the same task, overwriting `bytesRead` values from
previous calls to `compute`.
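
To make the failure concrete, here is a minimal standalone sketch (illustrative names only,
not Spark's actual internals) contrasting the per-partition overwrite with the snapshot-and-add
behavior introduced by the diff below:
```
// Sketch: two partitions computed in the same task, sharing one bytesRead metric.
object OverwrittenBytesRead {
  def main(args: Array[String]): Unit = {
    var bytesRead = 0L                      // stands in for the task-level InputMetrics#bytesRead
    val partitionBytes = Seq(100L, 50L)     // bytes read by each coalesced partition

    // Buggy behavior: each compute() writes its own per-partition delta straight into the metric.
    partitionBytes.foreach { delta => bytesRead = delta }
    println(s"overwritten: $bytesRead")     // 50 -- partition 1's 100 bytes are lost

    // Fixed behavior: snapshot the metric at the start of each compute() and add the delta.
    bytesRead = 0L
    partitionBytes.foreach { delta =>
      val existingBytesRead = bytesRead     // snapshot, as in the patch below
      bytesRead = existingBytesRead + delta
    }
    println(s"accumulated: $bytesRead")     // 150 -- the expected total
  }
}
```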

For a regression test, see the existing `InputOutputMetricsSuite.input metrics for old hadoop with coalesce`.
I did not add a new regression test because that is impossible without significant refactoring;
there's a lot of existing duplicate code in this corner of Spark.
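
For reference, the shape of such an end-to-end check can be sketched with a task-end listener.
The snippet below assumes a Spark 2.x-style listener API and an illustrative input path; it is
not the actual test code:
```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object CoalesceInputMetricsCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("coalesce-input-metrics-check"))

    // Sum bytesRead across all tasks; with the bug, coalesced tasks under-report their input.
    var totalBytesRead = 0L
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        Option(taskEnd.taskMetrics).foreach { tm => totalBytesRead += tm.inputMetrics.bytesRead }
      }
    })

    // Four input partitions coalesced into two tasks, so each task computes two partitions.
    val inputPath = "/tmp/coalesce-metrics-input.txt"   // illustrative path; the file must exist
    sc.textFile(inputPath, 4).coalesce(2).count()

    // A real test waits for the listener bus to drain before asserting against the file size.
    println(s"bytes read reported by task metrics: $totalBytesRead")
    sc.stop()
  }
}
```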

This was caused by #10835.

Author: Andrew Or <andrew@databricks.com>

Closes #10973 from andrewor14/fix-input-metrics-coalesce.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12252d1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12252d1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12252d1d

Branch: refs/heads/master
Commit: 12252d1da90fa7d2dffa3a7c249ecc8821dee130
Parents: 70e69fc
Author: Andrew Or <andrew@databricks.com>
Authored: Fri Jan 29 18:03:04 2016 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Fri Jan 29 18:03:08 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala      | 7 ++++++-
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala   | 7 ++++++-
 .../spark/sql/execution/datasources/SqlNewHadoopRDD.scala     | 7 ++++++-
 3 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12252d1d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3204e6a..e2ebd7f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -215,6 +215,7 @@ class HadoopRDD[K, V](
       // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
 
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
@@ -230,9 +231,13 @@ class HadoopRDD[K, V](
         case _ => None
       }
 
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid override values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
 

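The `existingBytesRead` snapshot is needed because the `getBytesRead` callback reports only the
bytes read since it was created at the start of `compute`, not a task-lifetime total. A standalone
sketch of that behavior (hypothetical names, not `SparkHadoopUtil`'s actual implementation):
```
// Sketch: a "bytes read since callback creation" closure over a thread-local counter.
object ThreadLocalBytesRead {
  private val bytesReadOnThread = new ThreadLocal[Long] {
    override def initialValue(): Long = 0L
  }

  // Stand-in for Hadoop's FileSystem updating its per-thread read statistics.
  def recordRead(n: Long): Unit = bytesReadOnThread.set(bytesReadOnThread.get + n)

  // Plays the role of the getBytesRead callback: bytes read since this closure was created.
  def bytesReadCallback(): () => Long = {
    val baseline = bytesReadOnThread.get
    () => bytesReadOnThread.get - baseline
  }

  def main(args: Array[String]): Unit = {
    recordRead(100L)                        // bytes read by an earlier partition in this task
    val getBytesRead = bytesReadCallback()  // created at the start of the next compute()
    recordRead(50L)                         // bytes read by the current partition
    println(getBytesRead())                 // 50, not 150 -- hence existingBytesRead + getBytesRead()
  }
}
```
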
http://git-wip-us.apache.org/repos/asf/spark/blob/12252d1d/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 4d2816e..e71d340 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -130,6 +130,7 @@ class NewHadoopRDD[K, V](
       val conf = getConf
 
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
@@ -139,9 +140,13 @@ class NewHadoopRDD[K, V](
         case _ => None
       }
 
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid override values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/12252d1d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index edd87c2..9703b16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -127,6 +127,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       val conf = getConf(isDriverSide = false)
 
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.serializableHadoopSplit.value match {
@@ -142,9 +143,13 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         case _ => None
       }
 
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid override values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
 



