spark-commits mailing list archives

From r...@apache.org
Subject [12/12] git commit: Merge pull request #130 from aarondav/shuffle
Date Tue, 05 Nov 2013 01:54:26 GMT
Merge pull request #130 from aarondav/shuffle

Memory-optimized shuffle file consolidation

Reduces the per-shuffle-block overhead of consolidation from >300 bytes to 8 bytes (one
primitive Long). Verified via profiler testing with 1 million shuffle blocks: net overhead was
~8,400,000 bytes, i.e. ~8.4 bytes per block, consistent with the 8-byte figure.

Although the memory-optimized implementation incurs extra CPU overhead, the shuffle phase in
this test was only around 2% slower, and the reduce phase was 40% faster, than with no shuffle
file consolidation at all.

This is accomplished by replacing the map from ShuffleBlockId to FileSegment (i.e., from block
id to its location on disk), which had high overhead because it was a gigantic, timestamped,
concurrent map, with a more space-efficient structure. Namely, the following are introduced (I
have omitted the word "Shuffle" from some names for clarity; a sketch of both structures follows
their descriptions below):
**ShuffleFile** - there is one ShuffleFile per consolidated shuffle file on disk. We store
an array of offsets into the physical shuffle file for each ShuffleMapTask that wrote into
the file. This is sufficient to reconstruct FileSegments for mappers that are in the file.
**FileGroup** - contains a set of ShuffleFiles, one per reducer, that a MapTask can use to
write its output. There is one FileGroup created per _concurrent_ MapTask. The FileGroup contains
an array of the mapIds that have been written to all files in the group. The positions of
elements in this array map directly onto the positions in each ShuffleFile's offsets array.
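
As a rough illustration, here is a minimal, self-contained Scala sketch of the two structures
described above. The names (ShuffleFileSketch, ShuffleFileGroupSketch, recordMapOutput, etc.)
and fields are illustrative assumptions, not the actual classes in ShuffleBlockManager.scala:

```scala
import scala.collection.mutable.ArrayBuffer

// One ShuffleFile per consolidated shuffle file on disk. It records, for each
// ShuffleMapTask that appended to the file, the byte offset at which that
// task's output begins; the next offset (or the file length) marks its end.
class ShuffleFileSketch(val path: String) {
  val offsets = new ArrayBuffer[Long]()   // one entry per mapper that wrote to this file
  var length: Long = 0L                   // current length of the physical file

  // Called after a map task has appended `bytesWritten` bytes to this file.
  def recordMapOutput(bytesWritten: Long): Unit = {
    offsets += length
    length += bytesWritten
  }

  // Reconstruct the (offset, length) of the index-th mapper's FileSegment.
  def segment(index: Int): (Long, Long) = {
    val start = offsets(index)
    val end   = if (index + 1 < offsets.length) offsets(index + 1) else length
    (start, end - start)
  }
}

// One FileGroup per *concurrent* map task. It holds one ShuffleFile per
// reducer plus the mapIds written so far; element i of `mapIds` corresponds
// to element i of every ShuffleFile's `offsets` array.
class ShuffleFileGroupSketch(val files: Array[ShuffleFileSketch]) {
  val mapIds = new ArrayBuffer[Int]()

  // Record that `mapId` wrote `bytesPerReducer(r)` bytes to files(r), for every reducer r.
  def recordMapOutput(mapId: Int, bytesPerReducer: Array[Long]): Unit = {
    mapIds += mapId
    files.zip(bytesPerReducer).foreach { case (f, bytes) => f.recordMapOutput(bytes) }
  }
}
```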

In order to locate the FileSegment associated with a BlockId, we have another structure that
maps each reducer to the set of ShuffleFiles created for it. (There will be as many
ShuffleFiles per reducer as there are FileGroups.) To look up a given ShuffleBlockId (shuffleId,
reducerId, mapId), we therefore search through all ShuffleFiles associated with that reducer.
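
A hedged sketch of that lookup path, reusing the hypothetical ShuffleFileSketch and
ShuffleFileGroupSketch classes from the sketch above (the real logic lives in
ShuffleBlockManager.scala and differs in detail):

```scala
import scala.collection.mutable.ArrayBuffer

// Per-shuffle state: every FileGroup created for this shuffle. For reducer r,
// its ShuffleFiles are exactly fileGroups.map(_.files(r)), one per FileGroup.
class ShuffleStateSketch {
  val fileGroups = new ArrayBuffer[ShuffleFileGroupSketch]()

  // Locate the segment for block (reducerId, mapId): one probe per FileGroup.
  // Returns (path, offset, length) of the FileSegment, if mapId was written.
  def findSegment(reducerId: Int, mapId: Int): Option[(String, Long, Long)] = {
    fileGroups.view
      .map(group => (group, group.mapIds.indexOf(mapId)))
      .collectFirst { case (group, index) if index >= 0 =>
        val file = group.files(reducerId)
        val (offset, len) = file.segment(index)
        (file.path, offset, len)
      }
  }
}
```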

As a time optimization, we ensure that FileGroups are reused only for MapTasks with monotonically
increasing mapIds. This allows a binary search to locate a mapId inside a group, and also enables
potential future optimizations based on the usual monotonic access order.
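
A minimal sketch of that binary search, assuming the group's mapIds are held in a plain sorted
Array[Int] (the commit itself adds PrimitiveVector and extends PrimitiveKeyOpenHashMap for
compact storage, so this is illustrative only):

```scala
import java.util.Arrays

object MapIdSearchSketch {
  // `mapIds` holds a group's mapIds in strictly increasing order, so a
  // standard binary search applies.
  def indexOfMapId(mapIds: Array[Int], mapId: Int): Int = {
    val pos = Arrays.binarySearch(mapIds, mapId)  // O(log n) instead of a linear scan
    if (pos >= 0) pos else -1                     // negative => mapId was not written in this group
  }
}
```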


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7a26104a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7a26104a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7a26104a

Branch: refs/heads/master
Commit: 7a26104ab7cb492b347ba761ef1f17ca1b9078e4
Parents: b5dc339 1ba11b1
Author: Reynold Xin <rxin@apache.org>
Authored: Mon Nov 4 17:54:06 2013 -0800
Committer: Reynold Xin <rxin@apache.org>
Committed: Mon Nov 4 17:54:06 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/ShuffleMapTask.scala |  23 +--
 .../org/apache/spark/storage/BlockManager.scala |  10 +-
 .../spark/storage/BlockObjectWriter.scala       |  15 +-
 .../apache/spark/storage/DiskBlockManager.scala |  49 +----
 .../org/apache/spark/storage/DiskStore.scala    |   4 +-
 .../spark/storage/ShuffleBlockManager.scala     | 189 +++++++++++++++----
 .../spark/storage/StoragePerfTester.scala       |  10 +-
 .../org/apache/spark/util/MetadataCleaner.scala |   2 +-
 .../collection/PrimitiveKeyOpenHashMap.scala    |   6 +
 .../spark/util/collection/PrimitiveVector.scala |  51 +++++
 .../spark/storage/DiskBlockManagerSuite.scala   |  84 +++++++++
 11 files changed, 333 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


