tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-3103. Shuffle can hang when memory to memory merging enabled (jlowe)
Date Fri, 12 Feb 2016 18:36:12 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 6852f79aa -> 126217147


TEZ-3103. Shuffle can hang when memory to memory merging enabled (jlowe)

Conflicts:

	CHANGES.txt
	tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/12621714
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/12621714
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/12621714

Branch: refs/heads/branch-0.7
Commit: 1262171477cfe5ad4dfeba7d3fbcdd6d497850af
Parents: 6852f79
Author: Jason Lowe <jlowe@apache.org>
Authored: Fri Feb 12 18:34:04 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Fri Feb 12 18:35:17 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../shuffle/orderedgrouped/MergeManager.java    | 21 ++++++
 .../orderedgrouped/TestMergeManager.java        | 73 ++++++++++++++++++++
 3 files changed, 95 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/12621714/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e991566..adf23a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES
   TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
+  TEZ-3103. Shuffle can hang when memory to memory merging enabled
   TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier
   TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin
   TEZ-3089. TaskConcurrencyAnalyzer can return negative task count with very large jobs

http://git-wip-us.apache.org/repos/asf/tez/blob/12621714/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index f5e95cb..def8175 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -488,6 +488,12 @@ public class MergeManager {
     LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
              ", inMemoryMergedMapOutputs.size() -> " + 
              inMemoryMergedMapOutputs.size());
+
+    commitMemory += mapOutput.getSize();
+
+    if (commitMemory >= mergeThreshold) {
+      startMemToDiskMerge();
+    }
   }
 
   public FileSystem getLocalFileSystem() {
@@ -1051,4 +1057,19 @@ public class MergeManager {
                  comparator, progressable, spilledRecordsCounter, null,
                  additionalBytesRead, null);
   }
+
+  @VisibleForTesting
+  long getCommitMemory() {
+    return commitMemory;
+  }
+
+  @VisibleForTesting
+  long getUsedMemory() {
+    return usedMemory;
+  }
+
+  @VisibleForTesting
+  void waitForMemToMemMerge() throws InterruptedException {
+    memToMemMerger.waitForMerge();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/12621714/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index a1b3003..72273e0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -29,6 +29,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
@@ -50,6 +52,7 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.junit.After;
@@ -172,6 +175,76 @@ public class TestMergeManager {
     Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable);
   }
 
+  @Test(timeout=20000)
+  public void testIntermediateMemoryMergeAccounting() throws Exception {
+    Configuration conf = new TezConfiguration(defaultConf);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, true);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 2);
+
+    Path localDir = new Path(workDir, "local");
+    Path srcDir = new Path(workDir, "srcData");
+    localFs.mkdirs(localDir);
+    localFs.mkdirs(srcDir);
+
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+    ExceptionReporter exceptionReporter = mock(ExceptionReporter.class);
+
+    MergeManager mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+            exceptionReporter, 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    byte[] data1 = generateData(conf, 10);
+    byte[] data2 = generateData(conf, 20);
+    MapOutput firstMapOutput = mergeManager.reserve(null, data1.length, data1.length, 0);
+    MapOutput secondMapOutput = mergeManager.reserve(null, data2.length, data2.length, 0);
+    assertEquals(MapOutput.Type.MEMORY, firstMapOutput.getType());
+    assertEquals(MapOutput.Type.MEMORY, secondMapOutput.getType());
+    assertEquals(0, mergeManager.getCommitMemory());
+    assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
+
+    System.arraycopy(data1, 0, firstMapOutput.getMemory(), 0, data1.length);
+    System.arraycopy(data2, 0, secondMapOutput.getMemory(), 0, data2.length);
+
+    secondMapOutput.commit();
+    assertEquals(data2.length, mergeManager.getCommitMemory());
+    assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
+    firstMapOutput.commit();
+
+    mergeManager.waitForMemToMemMerge();
+    assertEquals(data1.length + data2.length, mergeManager.getCommitMemory());
+    assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
+  }
+
+  private byte[] generateData(Configuration conf, int numEntries) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+    IFile.Writer writer =
+        new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+    for (int i = 0; i < numEntries; ++i) {
+      writer.append(new IntWritable(i), new IntWritable(i));
+    }
+    writer.close();
+    int compressedLength = (int)writer.getCompressedLength();
+    int rawLength = (int)writer.getRawLength();
+    byte[] data = new byte[rawLength];
+    ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()),
+        rawLength, compressedLength, null, false, 0, LOG, "sometask");
+    return data;
+  }
+
   @Test(timeout = 10000)
   public void testLocalDiskMergeMultipleTasks() throws IOException {
 


Mime
View raw message