From: andrewor14@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-13071] Coalescing HadoopRDD overwrites existing input metrics
Date: Sat, 30 Jan 2016 02:03:21 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 70e69fc4d -> 12252d1da


[SPARK-13071] Coalescing HadoopRDD overwrites existing input metrics

This issue is causing tests to fail consistently in master with Hadoop 2.6 / 2.7.
This is because for Hadoop 2.5+ we overwrite existing values of
`InputMetrics#bytesRead` in each call to `HadoopRDD#compute`. In the case of
coalesce, e.g.

```
sc.textFile(..., 4).coalesce(2).count()
```

we will call `compute` multiple times in the same task, overwriting `bytesRead`
values from previous calls to `compute`.

For a regression test, see
`InputOutputMetricsSuite.input metrics for old hadoop with coalesce`. I did not
add a new regression test because it's impossible without significant
refactoring; there's a lot of existing duplicate code in this corner of Spark.

This was caused by #10835.

Author: Andrew Or

Closes #10973 from andrewor14/fix-input-metrics-coalesce.
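The gist of the fix: snapshot whatever `bytesRead` earlier partitions in the
same task have already recorded, and add the current partition's delta on top
instead of replacing it. Below is a minimal, self-contained sketch of that
pattern (pasteable into a Scala REPL). It is an illustration, not the real
Spark code: `InputMetrics` is stubbed out, and `getBytesRead` stands in for the
thread-local Hadoop FileSystem statistics callback, which (per the diff
comments) reports the bytes read on this thread since the callback was created.

```
// Stand-in for Spark's InputMetrics, just to keep the sketch self-contained.
class InputMetrics {
  private var _bytesRead = 0L
  def bytesRead: Long = _bytesRead
  def setBytesRead(v: Long): Unit = { _bytesRead = v }
}

// Simplified shape of HadoopRDD.compute(). `getBytesRead` returns the bytes
// read on this thread since the callback was created, i.e. a per-partition delta.
def computePartition(inputMetrics: InputMetrics, getBytesRead: () => Long): Unit = {
  // Snapshot what earlier partitions in this task already recorded ...
  val existingBytesRead = inputMetrics.bytesRead
  def updateBytesRead(): Unit = {
    // ... and accumulate on top of it instead of overwriting (the fix).
    inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
  }
  // (record reading elided; updateBytesRead() runs periodically and on close)
  updateBytesRead()
}

// Two partitions computed in the same task, as coalesce(2) over 4 splits would do:
val metrics = new InputMetrics
computePartition(metrics, () => 100L) // first split reads 100 bytes
computePartition(metrics, () => 100L) // second split reads another 100 bytes
assert(metrics.bytesRead == 200L)     // before the fix this came out as 100
```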
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12252d1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12252d1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12252d1d

Branch: refs/heads/master
Commit: 12252d1da90fa7d2dffa3a7c249ecc8821dee130
Parents: 70e69fc
Author: Andrew Or
Authored: Fri Jan 29 18:03:04 2016 -0800
Committer: Andrew Or
Committed: Fri Jan 29 18:03:08 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala    | 7 ++++++-
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 7 ++++++-
 .../spark/sql/execution/datasources/SqlNewHadoopRDD.scala   | 7 ++++++-
 3 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/12252d1d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3204e6a..e2ebd7f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -215,6 +215,7 @@ class HadoopRDD[K, V](

     // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
     val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+    val existingBytesRead = inputMetrics.bytesRead

     // Sets the thread local variable for the file's name
     split.inputSplit.value match {
@@ -230,9 +231,13 @@ class HadoopRDD[K, V](
       case _ => None
     }

+    // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+    // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+    // task and in the same thread, in which case we need to avoid overriding values written by
+    // previous partitions (SPARK-13071).
     def updateBytesRead(): Unit = {
       getBytesReadCallback.foreach { getBytesRead =>
-        inputMetrics.setBytesRead(getBytesRead())
+        inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
       }
     }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/12252d1d/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 4d2816e..e71d340 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -130,6 +130,7 @@ class NewHadoopRDD[K, V](

     val conf = getConf
     val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+    val existingBytesRead = inputMetrics.bytesRead

     // Find a function that will return the FileSystem bytes read by this thread. Do this before
     // creating RecordReader, because RecordReader's constructor might read some bytes
@@ -139,9 +140,13 @@ class NewHadoopRDD[K, V](
       case _ => None
     }

+    // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+    // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+    // task and in the same thread, in which case we need to avoid overriding values written by
+    // previous partitions (SPARK-13071).
     def updateBytesRead(): Unit = {
       getBytesReadCallback.foreach { getBytesRead =>
-        inputMetrics.setBytesRead(getBytesRead())
+        inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
       }
     }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/12252d1d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index edd87c2..9703b16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -127,6 +127,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](

     val conf = getConf(isDriverSide = false)
     val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+    val existingBytesRead = inputMetrics.bytesRead

     // Sets the thread local variable for the file's name
     split.serializableHadoopSplit.value match {
@@ -142,9 +143,13 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       case _ => None
     }

+    // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+    // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+    // task and in the same thread, in which case we need to avoid overriding values written by
+    // previous partitions (SPARK-13071).
     def updateBytesRead(): Unit = {
       getBytesReadCallback.foreach { getBytesRead =>
-        inputMetrics.setBytesRead(getBytesRead())
+        inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
       }
     }
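For a quick manual check of the scenario outside the test suite, the sketch
below approximates what the existing regression test in
`InputOutputMetricsSuite` verifies. It is a rough spark-shell sketch, not the
suite code: `sc` is the shell's SparkContext, `path` is a placeholder for a
local text file, and the metrics accessors shown match the 2.x-era API (in
1.x, `taskMetrics.inputMetrics` is an Option, so the accessor differs).

```
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sum bytesRead over all tasks of the job.
var totalBytesRead = 0L
sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    totalBytesRead += taskEnd.taskMetrics.inputMetrics.bytesRead
  }
})

// Four input splits coalesced into two tasks: each task runs compute() twice
// on the same thread. Before this patch, the second compute() overwrote the
// bytes recorded by the first, so the total came out short.
sc.textFile(path, 4).coalesce(2).count()

// Listener events are delivered asynchronously; the real suite drains the
// (private) listener bus, but a crude sleep is enough for a manual check.
Thread.sleep(1000)
assert(totalBytesRead >= new java.io.File(path).length())
```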