spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-15569] Reduce frequency of updateBytesWritten function in Disk…
Date Fri, 27 May 2016 18:22:46 GMT
Repository: spark
Updated Branches:
  refs/heads/master 5bdbedf22 -> ce756daa4


[SPARK-15569] Reduce frequency of updateBytesWritten function in Disk…

## What changes were proposed in this pull request?

While profiling a Spark job that spills a large amount of intermediate data, we found that a
significant portion of time is spent in the DiskObjectWriter.updateBytesWritten function.
Looking at the code, we see that the function is called too frequently to update the number of
bytes written to disk. We should reduce the frequency to avoid this overhead.
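The pattern behind this change can be sketched as follows. This is a hypothetical, simplified stand-in (class and field names are illustrative, not Spark's actual `DiskBlockObjectWriter`): the writer counts records and only syncs the bytes-written metric once every `updateInterval` records, amortizing the cost of querying the underlying stream position.

```scala
// Minimal sketch of batched metric updates, assuming the real cost is in
// querying the output stream's position on every record.
class BatchedMetricsWriter(updateInterval: Int = 16384) {
  private var numRecordsWritten = 0L
  private var reportedBytes = 0L
  private var actualBytes = 0L // stands in for the file channel position

  def write(recordSize: Int): Unit = {
    actualBytes += recordSize
    numRecordsWritten += 1
    // Only refresh the metric periodically, not on every record.
    if (numRecordsWritten % updateInterval == 0) {
      updateBytesWritten()
    }
  }

  // In the real writer this reads the stream position; here we just copy it.
  private def updateBytesWritten(): Unit = {
    reportedBytes = actualBytes
  }

  def bytesWritten: Long = reportedBytes
}
```

The trade-off is staleness: between syncs, `bytesWritten` lags the true file size, which is why the test below only asserts `bytesWritten > 0` after a full batch of writes.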

## How was this patch tested?

Tested by running the job on a cluster; this change yielded a 20% CPU gain.

Author: Sital Kedia <skedia@fb.com>

Closes #13332 from sitalkedia/DiskObjectWriter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce756daa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce756daa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce756daa

Branch: refs/heads/master
Commit: ce756daa4f012ebdc5a41bf5a89ff11b6dfdab8c
Parents: 5bdbedf
Author: Sital Kedia <skedia@fb.com>
Authored: Fri May 27 11:22:39 2016 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Fri May 27 11:22:39 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/storage/DiskBlockObjectWriter.scala    |  3 +--
 .../spark/storage/DiskBlockObjectWriterSuite.scala      | 12 ++++++------
 2 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce756daa/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index ab97d2e..5b493f4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -203,8 +203,7 @@ private[spark] class DiskBlockObjectWriter(
     numRecordsWritten += 1
     writeMetrics.incRecordsWritten(1)
 
-    // TODO: call updateBytesWritten() less frequently.
-    if (numRecordsWritten % 32 == 0) {
+    if (numRecordsWritten % 16384 == 0) {
       updateBytesWritten()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce756daa/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 8eff3c2..ec4ef4b 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -53,13 +53,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.bytesWritten == 0)
-    // After 32 writes, metrics should update
-    for (i <- 0 until 32) {
+    // After 16384 writes, metrics should update
+    for (i <- 0 until 16384) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
     assert(writeMetrics.bytesWritten > 0)
-    assert(writeMetrics.recordsWritten === 33)
+    assert(writeMetrics.recordsWritten === 16385)
     writer.commitAndClose()
     assert(file.length() == writeMetrics.bytesWritten)
   }
@@ -75,13 +75,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.bytesWritten == 0)
-    // After 32 writes, metrics should update
-    for (i <- 0 until 32) {
+    // After 16384 writes, metrics should update
+    for (i <- 0 until 16384) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
     assert(writeMetrics.bytesWritten > 0)
-    assert(writeMetrics.recordsWritten === 33)
+    assert(writeMetrics.recordsWritten === 16385)
     writer.revertPartialWritesAndClose()
     assert(writeMetrics.bytesWritten == 0)
     assert(writeMetrics.recordsWritten == 0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

