tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [18/24] tez git commit: TEZ-3103. Shuffle can hang when memory to memory merging enabled (jlowe)
Date Thu, 18 Feb 2016 09:58:12 GMT
TEZ-3103. Shuffle can hang when memory to memory merging enabled (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a2c590bc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a2c590bc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a2c590bc

Branch: refs/heads/TEZ-2980
Commit: a2c590bcb00de093e8365c4c423d5014777cfacf
Parents: 7e3d546
Author: Jason Lowe <jlowe@apache.org>
Authored: Fri Feb 12 18:19:29 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Fri Feb 12 18:19:29 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../shuffle/orderedgrouped/MergeManager.java    | 21 ++++++
 .../orderedgrouped/TestMergeManager.java        | 74 ++++++++++++++++++++
 3 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a2c590bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8cb7505..5f09280 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3103. Shuffle can hang when memory to memory merging enabled
   TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
   TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier
  TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc available to the InputFormat jobConf.
@@ -331,6 +332,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3103. Shuffle can hang when memory to memory merging enabled
   TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier
   TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin.
   TEZ-3089. TaskConcurrencyAnalyzer can return negative task count with very large jobs.

http://git-wip-us.apache.org/repos/asf/tez/blob/a2c590bc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index dfa509f..b56a9a8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -500,6 +500,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
              ", inMemoryMergedMapOutputs.size() -> " + 
              inMemoryMergedMapOutputs.size());
+
+    commitMemory += mapOutput.getSize();
+
+    if (commitMemory >= mergeThreshold) {
+      startMemToDiskMerge();
+    }
   }
 
   @Override
@@ -1155,4 +1161,19 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
                  comparator, progressable, spilledRecordsCounter, null,
                  additionalBytesRead, null);
   }
+
+  @VisibleForTesting
+  long getCommitMemory() {
+    return commitMemory;
+  }
+
+  @VisibleForTesting
+  long getUsedMemory() {
+    return usedMemory;
+  }
+
+  @VisibleForTesting
+  void waitForMemToMemMerge() throws InterruptedException {
+    memToMemMerger.waitForMerge();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a2c590bc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index b8f99de..c62c116 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -29,12 +29,15 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 
 import com.google.common.collect.Sets;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -52,6 +55,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.junit.After;
@@ -174,6 +178,76 @@ public class TestMergeManager {
     Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable);
   }
 
+  @Test(timeout=20000)
+  public void testIntermediateMemoryMergeAccounting() throws Exception {
+    Configuration conf = new TezConfiguration(defaultConf);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, true);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 2);
+
+    Path localDir = new Path(workDir, "local");
+    Path srcDir = new Path(workDir, "srcData");
+    localFs.mkdirs(localDir);
+    localFs.mkdirs(srcDir);
+
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+    ExceptionReporter exceptionReporter = mock(ExceptionReporter.class);
+
+    MergeManager mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+            exceptionReporter, 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    byte[] data1 = generateData(conf, 10);
+    byte[] data2 = generateData(conf, 20);
+    MapOutput firstMapOutput = mergeManager.reserve(null, data1.length, data1.length, 0);
+    MapOutput secondMapOutput = mergeManager.reserve(null, data2.length, data2.length, 0);
+    assertEquals(MapOutput.Type.MEMORY, firstMapOutput.getType());
+    assertEquals(MapOutput.Type.MEMORY, secondMapOutput.getType());
+    assertEquals(0, mergeManager.getCommitMemory());
+    assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
+
+    System.arraycopy(data1, 0, firstMapOutput.getMemory(), 0, data1.length);
+    System.arraycopy(data2, 0, secondMapOutput.getMemory(), 0, data2.length);
+
+    secondMapOutput.commit();
+    assertEquals(data2.length, mergeManager.getCommitMemory());
+    assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
+    firstMapOutput.commit();
+
+    mergeManager.waitForMemToMemMerge();
+    assertEquals(data1.length + data2.length, mergeManager.getCommitMemory());
+    assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
+  }
+
+  private byte[] generateData(Configuration conf, int numEntries) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+    IFile.Writer writer =
+        new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+    for (int i = 0; i < numEntries; ++i) {
+      writer.append(new IntWritable(i), new IntWritable(i));
+    }
+    writer.close();
+    int compressedLength = (int)writer.getCompressedLength();
+    int rawLength = (int)writer.getRawLength();
+    byte[] data = new byte[rawLength];
+    ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()),
+        rawLength, compressedLength, null, false, 0, LOG, "sometask");
+    return data;
+  }
+
   class InterruptingThread implements Runnable {
 
     MergeManager.OnDiskMerger mergeThread;


Mime
View raw message