tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. (sseth)
Date Sat, 22 Aug 2015 00:10:20 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 387e051ce -> ed1631776


TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ed163177
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ed163177
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ed163177

Branch: refs/heads/branch-0.6
Commit: ed1631776de0d20dfd2053b1c3592c98bbc5628b
Parents: 387e051
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Aug 21 17:10:02 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Aug 21 17:10:02 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../orderedgrouped/TestMergeManager.java        | 138 +++++++++++++++++++
 2 files changed, 140 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ed163177/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 794761f..eefd735 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2734. Add a test to verify the filename generated by OnDiskMerge
   TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option
@@ -225,6 +226,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2734. Add a test to verify the filename generated by OnDiskMerge
   TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2635. Limit number of attempts being downloaded in unordered fetch.

http://git-wip-us.apache.org/repos/asf/tez/blob/ed163177/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 7615ba7..69cc7ca 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -18,6 +18,9 @@
 
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -189,6 +192,141 @@ public class TestMergeManager {
 
   }
 
+  @Test(timeout = 10000)
+  public void testOnDiskMergerFilenames() throws IOException, InterruptedException {
+    Configuration conf = new TezConfiguration(defaultConf);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+
+    Path localDir = new Path(workDir, "local");
+    Path srcDir = new Path(workDir, "srcData");
+    localFs.mkdirs(localDir);
+    localFs.mkdirs(srcDir);
+
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+    ExceptionReporter exceptionReporter = mock(ExceptionReporter.class);
+
+    MergeManager mergeManagerReal =
+        new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+            exceptionReporter, 1 * 1024l * 1024l, null, false, -1);
+    MergeManager mergeManager = spy(mergeManagerReal);
+
+    // Partition 0 Keys 0-2, Partition 1 Keys 3-5
+    SrcFileInfo file1Info =
+        createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src1.out"),
+            2, 3, 6);
+
+    SrcFileInfo file2Info =
+        createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src2.out"),
+            2, 3, 0);
+
+    InputAttemptIdentifier iIdentifier1 =
+        new InputAttemptIdentifier(0, 0, file1Info.path.getName());
+    InputAttemptIdentifier iIdentifier2 =
+        new InputAttemptIdentifier(1, 0, file2Info.path.getName());
+
+    MapOutput mapOutput1 =
+        getMapOutputForDirectDiskFetch(iIdentifier1, file1Info.path, file1Info.indexedRecords[0],
+            mergeManager);
+    MapOutput mapOutput2 =
+        getMapOutputForDirectDiskFetch(iIdentifier2, file2Info.path, file2Info.indexedRecords[0],
+            mergeManager);
+
+    mapOutput1.commit();
+    mapOutput2.commit();
+    verify(mergeManager).closeOnDiskFile(mapOutput1.getOutputPath());
+    verify(mergeManager).closeOnDiskFile(mapOutput2.getOutputPath());
+
+    List<FileChunk> mergeFiles = new LinkedList<FileChunk>();
+    mergeFiles.addAll(mergeManager.onDiskMapOutputs);
+    mergeManager.onDiskMapOutputs.clear();
+
+    mergeManager.onDiskMerger.merge(mergeFiles);
+    Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+    FileChunk fcMerged1 = mergeManager.onDiskMapOutputs.iterator().next();
+    Path m1Path = fcMerged1.getPath();
+    assertTrue(m1Path.toString().endsWith("merged0"));
+
+    // Add another file. Make sure the filename is different, and does not get clobbered.
+
+    SrcFileInfo file3Info =
+        createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src3.out"),
+            2, 22, 5);
+    InputAttemptIdentifier iIdentifier3 =
+        new InputAttemptIdentifier(2, 0, file1Info.path.getName());
+    MapOutput mapOutput3 =
+        getMapOutputForDirectDiskFetch(iIdentifier3, file3Info.path, file3Info.indexedRecords[0],
+            mergeManager);
+    mapOutput3.commit();
+    verify(mergeManager).closeOnDiskFile(mapOutput3.getOutputPath());
+
+    mergeFiles = new LinkedList<FileChunk>();
+    mergeFiles.addAll(mergeManager.onDiskMapOutputs);
+    mergeManager.onDiskMapOutputs.clear();
+
+    mergeManager.onDiskMerger.merge(mergeFiles);
+    Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+    FileChunk fcMerged2 = mergeManager.onDiskMapOutputs.iterator().next();
+    Path m2Path = fcMerged2.getPath();
+    assertTrue(m2Path.toString().endsWith("merged1"));
+    assertNotEquals(m1Path, m2Path);
+
+    // Add another file. This time add it to the head of the list.
+    SrcFileInfo file4Info =
+        createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src4.out"),
+            2, 45, 35);
+    InputAttemptIdentifier iIdentifier4 =
+        new InputAttemptIdentifier(3, 0, file4Info.path.getName());
+    MapOutput mapOutput4 =
+        getMapOutputForDirectDiskFetch(iIdentifier4, file4Info.path, file4Info.indexedRecords[0],
+            mergeManager);
+    mapOutput4.commit();
+    verify(mergeManager).closeOnDiskFile(mapOutput4.getOutputPath());
+
+    // Add in reverse order this time.
+    List<FileChunk> tmpList = new LinkedList<FileChunk>();
+    mergeFiles = new LinkedList<FileChunk>();
+    assertEquals(2, mergeManager.onDiskMapOutputs.size());
+    tmpList.addAll(mergeManager.onDiskMapOutputs);
+    mergeFiles.add(tmpList.get(1));
+    mergeFiles.add(tmpList.get(0));
+
+    mergeManager.onDiskMapOutputs.clear();
+
+    mergeManager.onDiskMerger.merge(mergeFiles);
+    Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+    FileChunk fcMerged3 = mergeManager.onDiskMapOutputs.iterator().next();
+    Path m3Path = fcMerged3.getPath();
+
+    assertTrue(m3Path.toString().endsWith("merged2"));
+    assertNotEquals(m2Path, m3Path);
+
+    // Ensure the lengths are the same - since the source file names are the same. No append happening.
+    assertEquals(m1Path.toString().length(), m2Path.toString().length());
+    assertEquals(m2Path.toString().length(), m3Path.toString().length());
+
+    // Ensure the filenames are used correctly - based on the first file given to the merger.
+    String m1Prefix = m1Path.toString().substring(0, m1Path.toString().indexOf("."));
+    String m2Prefix = m2Path.toString().substring(0, m2Path.toString().indexOf("."));
+    String m3Prefix = m3Path.toString().substring(0, m3Path.toString().indexOf("."));
+
+    assertEquals(m1Prefix, m2Prefix);
+    assertNotEquals(m1Prefix, m3Prefix);
+    assertNotEquals(m2Prefix, m3Prefix);
+
+  }
+
   private InputContext createMockInputContext(String uniqueId) {
     InputContext inputContext = mock(InputContext.class);
     doReturn(new TezCounters()).when(inputContext).getCounters();


Mime
View raw message