tez-commits mailing list archives

From: ss...@apache.org
Subject: [09/37] tez git commit: TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task (rbalamohan)
Date: Tue, 28 Apr 2015 20:40:50 GMT
TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aa87a14c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aa87a14c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aa87a14c

Branch: refs/heads/TEZ-2003
Commit: aa87a14c5197d5aa3ddc8e829311f1f3534fd62d
Parents: b08ca37
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Tue Apr 28 03:22:25 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Tue Apr 28 03:22:25 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../common/shuffle/DiskFetchedInput.java        |   3 +-
 .../shuffle/orderedgrouped/MapOutput.java       |   2 +-
 .../shuffle/orderedgrouped/MergeManager.java    |  61 ++++-
 .../common/sort/impl/IFileInputStream.java      |   1 +
 .../common/task/local/output/TezTaskOutput.java |  12 +-
 .../task/local/output/TezTaskOutputFiles.java   |  31 +--
 .../apache/tez/test/TestPipelinedShuffle.java   | 258 +++++++++++++++++++
 8 files changed, 334 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
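
Background for the diff below: with pipelined shuffle a single source task can produce
several spill events, but the input-side file naming keyed only on the source index, so
every spill fetched from one source mapped to the same local path. The following sketch
(plain Java with a hypothetical class name, not Tez code) illustrates the collision and
how the new "%s_src_%d_spill_%d.out" pattern avoids it.

public class SpillNameCollisionDemo {
  // Patterns mirror TezTaskOutputFiles: the old SPILL_FILE_PATTERN and the
  // new one introduced by this commit.
  static final String OLD_PATTERN = "%s_spill_%d.out";
  static final String NEW_PATTERN = "%s_src_%d_spill_%d.out";

  public static void main(String[] args) {
    String uniqueId = "attempt_1418684642047_0006_1_00_000000_0_10004";
    int srcIndex = 10;

    // Old input-side naming passed the source index as the spill number, so
    // two spill events from the same source task produced identical paths.
    String firstSpill = String.format(OLD_PATTERN, uniqueId, srcIndex);
    String secondSpill = String.format(OLD_PATTERN, uniqueId, srcIndex);
    System.out.println(firstSpill.equals(secondSpill));  // true -> clobbering

    // New naming adds the spill event id, keeping each spill distinct.
    String fixedFirst = String.format(NEW_PATTERN, uniqueId, srcIndex, 0);
    String fixedSecond = String.format(NEW_PATTERN, uniqueId, srcIndex, 1);
    System.out.println(fixedFirst.equals(fixedSecond));  // false
  }
}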


http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5c4a57..e42a79e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task.
   TEZ-2342. TestFaultTolerance.testRandomFailingTasks fails due to timeout.
   TEZ-2362. State Change Notifier Thread should be stopped when dag is
   completed

http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
index e25301e..6432b55 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
@@ -50,7 +50,8 @@ public class DiskFetchedInput extends FetchedInput {
 
     this.localFS = FileSystem.getLocal(conf);
     this.outputPath = filenameAllocator.getInputFileForWrite(
-        this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), actualSize);
+        this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), this
+            .inputAttemptIdentifier.getSpillEventId(), actualSize);
     // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
    // otherwise fetches for the same task but from different attempts would clobber each other.
     this.tmpOutputPath = outputPath.suffix(String.valueOf(id));

http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index e999af6..55c80aa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -107,7 +107,7 @@ class MapOutput {
       IOException {
     FileSystem fs = FileSystem.getLocal(conf);
     Path outputpath = mapOutputFile.getInputFileForWrite(
-        attemptIdentifier.getInputIdentifier().getInputIndex(), size);
+        attemptIdentifier.getInputIdentifier().getInputIndex(), attemptIdentifier.getSpillEventId(), size);
     // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
    // otherwise fetches for the same task but from different attempts would clobber each other.
     Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));

http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index d5f7be1..2e6ebd9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -27,6 +27,8 @@ import java.util.TreeSet;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -436,7 +438,8 @@ public class MergeManager {
     inMemoryMapOutputs.add(mapOutput);
     LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
         + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
-        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
+        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ",
mapOutput=" +
+        mapOutput);
 
     commitMemory+= mapOutput.getSize();
 
@@ -476,7 +479,21 @@ public class MergeManager {
   }
   
   public synchronized void closeOnDiskFile(FileChunk file) {
+    //including only path & offset for validations.
+    for (FileChunk fileChunk : onDiskMapOutputs) {
+      if (fileChunk.getPath().equals(file.getPath())) {
+        //ensure offsets are not the same.
+        Preconditions.checkArgument(fileChunk.getOffset() != file.getOffset(),
+            "Can't have a file with same path and offset."
+            + "OldFilePath=" + fileChunk.getPath() + ", OldFileOffset=" + fileChunk.getOffset()
+
+            ", newFilePath=" + file.getPath() + ", newFileOffset=" + file.getOffset());
+      }
+    }
+
     onDiskMapOutputs.add(file);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("close onDiskFile=" + file.getPath() + ", len=" + file.getLength());
+    }
 
     synchronized (onDiskMerger) {
       if (!onDiskMerger.isInProgress() &&
@@ -623,8 +640,9 @@ public class MergeManager {
      // All disk writes done by this merge are overhead - due to the lack of
       // adequate memory to keep all segments in memory.
       Path outputPath = mapOutputFile.getInputFileForWrite(
-          srcTaskIdentifier.getInputIdentifier().getInputIndex(),
+          srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
           mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
+      LOG.info("Patch..InMemoryMerger outputPath: " + outputPath);
 
       Writer writer = null;
       long outFileLen = 0;
@@ -720,6 +738,11 @@ public class MergeManager {
         final long offset = fileChunk.getOffset();
         final long size = fileChunk.getLength();
         final boolean preserve = fileChunk.isLocalFile();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("InputAttemptIdentifier=" + fileChunk.getInputAttemptIdentifier()
+              + ", len=" + fileChunk.getLength() + ", offset=" + fileChunk.getOffset()
+              + ", path=" + fileChunk.getPath());
+        }
         final Path file = fileChunk.getPath();
         approxOutputSize += size;
         Segment segment = new Segment(rfs, file, offset, size, codec, ifileReadAhead,
@@ -737,8 +760,8 @@ public class MergeManager {
       if (file0.isLocalFile()) {
         // This is setup the same way a type DISK MapOutput is setup when fetching.
         namePart = mapOutputFile.getSpillFileName(
-            file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex());
-
+            file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex(),
+            file0.getInputAttemptIdentifier().getSpillEventId());
       } else {
         namePart = file0.getPath().getName().toString();
       }
@@ -859,8 +882,18 @@ public class MergeManager {
     LOG.info("finalMerge called with " + 
              inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
              onDiskMapOutputs.size() + " on-disk map-outputs");
-    
-    
+
+    if (LOG.isDebugEnabled()) {
+      for (MapOutput inMemoryMapOutput : inMemoryMapOutputs) {
+        LOG.debug("inMemoryOutput=" + inMemoryMapOutput + ", size=" + inMemoryMapOutput
+            .getSize());
+      }
+
+      for (FileChunk onDiskMapOutput : onDiskMapOutputs) {
+        LOG.debug("onDiskMapOutput=" + onDiskMapOutput.getPath() + ", size=" + onDiskMapOutput
+                .getLength());
+      }
+    }
     
 
     // merge config params
@@ -876,7 +909,7 @@ public class MergeManager {
     boolean mergePhaseFinished = false;
     if (inMemoryMapOutputs.size() > 0) {
       int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
-      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
+      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
                                                 memDiskSegments,
                                                 this.postMergeMemLimit);
       final int numMemDiskSegments = memDiskSegments.size();
@@ -892,10 +925,11 @@ public class MergeManager {
         
         mergePhaseFinished = true;
         // must spill to disk, but can't retain in-mem for intermediate merge
+        // Can not use spill id in final merge as it would clobber with other files, hence using
+        // Integer.MAX_VALUE
         final Path outputPath = 
-          mapOutputFile.getInputFileForWrite(srcTaskId,
-                                             inMemToDiskBytes).suffix(
-                                                 Constants.MERGED_OUTPUT_PREFIX);
+          mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE,
+              inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
         final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
             memDiskSegments, numMemDiskSegments, tmpDir, comparator, nullProgressable,
             spilledRecordsCounter, null, additionalBytesRead, null);
@@ -925,12 +959,12 @@ public class MergeManager {
 
         LOG.info("Merged " + numMemDiskSegments + " segments, " +
                  inMemToDiskBytes + " bytes to disk to satisfy " +
-                 "reduce memory limit");
+                 "reduce memory limit. outputPath=" + outputPath);
         inMemToDiskBytes = 0;
         memDiskSegments.clear();
       } else if (inMemToDiskBytes != 0) {
         LOG.info("Keeping " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes in memory for " +
+            inMemToDiskBytes + " bytes in memory for " +
                  "intermediate, on-disk merge");
       }
     }
@@ -942,7 +976,8 @@ public class MergeManager {
     for (FileChunk fileChunk : onDisk) {
       final long fileLength = fileChunk.getLength();
       onDiskBytes += fileLength;
-      LOG.debug("Disk file: " + fileChunk.getPath() + " Length is " + fileLength);
+      LOG.info("Disk file=" + fileChunk.getPath() + ", len=" + fileLength + ", isLocal="
+
+          fileChunk.isLocalFile());
 
       final Path file = fileChunk.getPath();
       TezCounter counter =

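The closeOnDiskFile() change above guards against registering the same file chunk twice:
chunks may legally share a path (several spills appended to one file at different
offsets), but a repeated (path, offset) pair indicates a duplicate. A minimal standalone
sketch of the same guard, using a hypothetical Chunk stand-in for Tez's FileChunk and
Guava's Preconditions:

import com.google.common.base.Preconditions;

import java.util.ArrayList;
import java.util.List;

public class DuplicateChunkGuardDemo {

  // Stand-in for Tez's FileChunk; only path and offset matter for the check.
  static class Chunk {
    final String path;
    final long offset;
    Chunk(String path, long offset) { this.path = path; this.offset = offset; }
  }

  static final List<Chunk> onDiskOutputs = new ArrayList<>();

  static void closeOnDiskFile(Chunk chunk) {
    for (Chunk existing : onDiskOutputs) {
      if (existing.path.equals(chunk.path)) {
        // Same path is fine as long as the offsets differ.
        Preconditions.checkArgument(existing.offset != chunk.offset,
            "Can't have a file with same path and offset. path=%s, offset=%s",
            chunk.path, chunk.offset);
      }
    }
    onDiskOutputs.add(chunk);
  }

  public static void main(String[] args) {
    closeOnDiskFile(new Chunk("spill_0.out", 0L));
    closeOnDiskFile(new Chunk("spill_0.out", 4096L));   // ok: new offset
    try {
      closeOnDiskFile(new Chunk("spill_0.out", 0L));    // duplicate -> rejected
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}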
http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index 64c1b51..d116242 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -264,6 +264,7 @@ public class IFileInputStream extends InputStream {
     }
     
     if (currentOffset == dataLength) {
+      //TODO: add checksumSize to currentOffset.
       // The last four bytes are checksum. Strip them and verify
       sum.update(buffer, 0, offset);
       csum = new byte[checksumSize];

http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
index e9f33af..c41e4a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -122,18 +122,20 @@ public abstract class TezTaskOutput {
    * Create a local input file name.
    *
    * @param srcIdentifier The identifier for the source
-   * @param size the size of the file
-   * @return path the path to the input file.
+   * @param spillNum
+   * @param size the size of the file  @return path the path to the input file.
    * @throws IOException
    */
-  public abstract Path getInputFileForWrite(
-      int srcIdentifier, long size) throws IOException;
+  public abstract Path getInputFileForWrite(int srcIdentifier,
+      int spillNum, long size) throws IOException;
 
   /**
    * Construct a spill file name, given a spill number
+   *
+   * @param srcId
    * @param spillNum
    * @return a spill file name independent of the unique identifier and local directories
    */
-  public abstract String getSpillFileName(int spillNum);
+  public abstract String getSpillFileName(int srcId, int spillNum);
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 6382e3a..1e6fca3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -49,7 +49,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   private static final String SPILL_FILE_DIR_PATTERN = "%s_%d";
 
-  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_FILE_PATTERN = "%s_src_%d_spill_%d.out";
 
   /*
  Under YARN, this defaults to one or more of the local directories, along with the appId in the path.
@@ -233,35 +233,36 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   /**
    * Create a local input file name.
    *
-   * ${appDir}/${uniqueId}_spill_${spillNumber}.out
-   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10004_spill_0.out
+   * ${appDir}/${uniqueId}_src_{$srcId}_spill_${spillNumber}.out
+   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10004_src_10_spill_0.out
    *
-   * This is currently equivalent to getSpillFileForWrite. Files are not clobbered due to the uniqueId
-   * being different for Outputs / Inputs within the same task (and across tasks)
+   * Files are not clobbered due to the uniqueId along with spillId being different for Outputs /
+   * Inputs within the same task (and across tasks)
    *
    * @param srcIdentifier The identifier for the source
-   * @param size the size of the file
-   * @return path the path to the input file.
+   * @param spillNum
+   * @param size the size of the file  @return path the path to the input file.
    * @throws IOException
    */
   @Override
   public Path getInputFileForWrite(int srcIdentifier,
-      long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcIdentifier),
-        size, conf);
+      int spillNum, long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcIdentifier, spillNum), size, conf);
   }
 
   /**
-   * Construct a spill file name, given a spill number
+   * Construct a spill file name, given a spill number and src id
+   *
+   * ${uniqueId}_src_${srcId}_spill_${spillNumber}.out
+   * e.g. attempt_1418684642047_0006_1_00_000000_0_10004_src_10_spill_0.out
    *
-   * ${uniqueId}_spill_${spillNumber}.out
-   * e.g. attempt_1418684642047_0006_1_00_000000_0_10004_spill_0.out
    *
+   * @param srcId
    * @param spillNum
    * @return a spill file name independent of the unique identifier and local directories
    */
   @Override
-  public String getSpillFileName(int spillNum) {
-    return String.format(SPILL_FILE_PATTERN, uniqueId, spillNum);
+  public String getSpillFileName(int srcId, int spillNum) {
+    return String.format(SPILL_FILE_PATTERN, uniqueId, srcId, spillNum);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
new file mode 100644
index 0000000..2a63293
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestPipelinedShuffle {
+
+  private static MiniDFSCluster miniDFSCluster;
+  private static MiniTezCluster miniTezCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fs;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestPipelinedShuffle.class.getName() + "-tmpDir";
+
+  private static final int KEYS_PER_MAPPER = 5000;
+
+  @BeforeClass
+  public static void setupDFSCluster() throws Exception {
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+    conf.set("fs.defaultFS", fs.getUri().toString());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
+  }
+
+  @AfterClass
+  public static void shutdownDFSCluster() {
+    if (miniDFSCluster != null) {
+      //shutdown
+      miniDFSCluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setupTezCluster() throws Exception {
+    //With 1 MB sort buffer and with good amount of dataset, it would spill records
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
+
+    //Enable PipelinedShuffle
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
+
+    //Enable local fetch
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+
+    // 3 seconds should be good enough in local machine
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
+    //set to low value so that it can detect failures quickly
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
+
+    miniTezCluster = new MiniTezCluster(TestPipelinedShuffle.class.getName(), 1, 1, 1);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+  }
+
+  @After
+  public void shutdownTezCluster() throws IOException {
+    if (miniTezCluster != null) {
+      miniTezCluster.stop();
+    }
+  }
+
+  @Test
+  public void baseTest() throws Exception {
+    PipelinedShuffleJob pipelinedShuffle = new PipelinedShuffleJob();
+    pipelinedShuffle.setConf(new Configuration(miniTezCluster.getConfig()));
+
+    String[] args = new String[] { };
+    assertEquals(0, pipelinedShuffle.run(args));
+  }
+
+  /**
+   *
+   * mapper1 --\
+   *            --> reducer
+   * mapper2 --/
+   *
+   * Mappers just generate dummy data, but ensure that there are enough spills.
+   * Reducer should process them correctly and validate the total number of records.
+   * Only record count is validated in the reducer which is fine for this test.
+   */
+  public static class PipelinedShuffleJob extends Configured implements Tool {
+    private TezConfiguration tezConf;
+
+    public static class DataGenerator extends SimpleMRProcessor {
+
+      public DataGenerator(ProcessorContext context) {
+        super(context);
+      }
+
+      @Override public void run() throws Exception {
+        Preconditions.checkArgument(getInputs().size() == 0);
+        Preconditions.checkArgument(getOutputs().size() == 1);
+        KeyValueWriter writer = (KeyValueWriter) getOutputs().get("reducer").getWriter();
+
+        for (int i = 0; i < KEYS_PER_MAPPER; i++) {
+          writer.write(new Text(RandomStringUtils.randomAlphanumeric(1000)),
+              new Text(RandomStringUtils.randomAlphanumeric(1000)));
+        }
+      }
+    }
+
+    public static class SimpleReduceProcessor extends SimpleMRProcessor {
+
+      public SimpleReduceProcessor(ProcessorContext context) {
+        super(context);
+      }
+
+      private long readData(KeyValuesReader reader) throws IOException {
+        long records = 0;
+        while (reader.next()) {
+          reader.getCurrentKey();
+          for (Object val : reader.getCurrentValues()) {
+            records++;
+          }
+        }
+        return records;
+      }
+
+      @Override
+      public void run() throws Exception {
+        Preconditions.checkArgument(getInputs().size() == 2);
+
+        long totalRecords = 0;
+
+        KeyValuesReader reader1 = (KeyValuesReader) getInputs().get("mapper1").getReader();
+        totalRecords += readData(reader1);
+
+        KeyValuesReader reader2 = (KeyValuesReader) getInputs().get("mapper2").getReader();
+        totalRecords += readData(reader2);
+
+        //Verify if correct number of records are retrieved.
+        assertEquals(2 * KEYS_PER_MAPPER, totalRecords);
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      this.tezConf = new TezConfiguration(getConf());
+      String dagName = "pipelinedShuffleTest";
+      DAG dag = DAG.create(dagName);
+
+      Vertex m1_Vertex = Vertex.create("mapper1",
+          ProcessorDescriptor.create(DataGenerator.class.getName()), 1);
+
+      Vertex m2_Vertex = Vertex.create("mapper2",
+          ProcessorDescriptor.create(DataGenerator.class.getName()), 1);
+
+      Vertex reducerVertex = Vertex.create("reducer",
+          ProcessorDescriptor.create(SimpleReduceProcessor.class.getName()), 1);
+
+      Edge mapper1_to_reducer = Edge.create(m1_Vertex, reducerVertex,
+          OrderedPartitionedKVEdgeConfig
+              .newBuilder(Text.class.getName(), Text.class.getName(),
+                  HashPartitioner.class.getName())
+              .setFromConfiguration(tezConf).build().createDefaultEdgeProperty());
+
+      Edge mapper2_to_reducer = Edge.create(m2_Vertex, reducerVertex,
+          OrderedPartitionedKVEdgeConfig
+              .newBuilder(Text.class.getName(), Text.class.getName(),
+                  HashPartitioner.class.getName())
+              .setFromConfiguration(tezConf).build().createDefaultEdgeProperty());
+
+      dag.addVertex(m1_Vertex);
+      dag.addVertex(m2_Vertex);
+      dag.addVertex(reducerVertex);
+
+      dag.addEdge(mapper1_to_reducer).addEdge(mapper2_to_reducer);
+
+      TezClient client = TezClient.create(dagName, tezConf);
+      client.start();
+      client.waitTillReady();
+
+      DAGClient dagClient = client.submitDAG(dag);
+      Set<StatusGetOpts> getOpts = Sets.newHashSet();
+      getOpts.add(StatusGetOpts.GET_COUNTERS);
+
+      DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts);
+
+      System.out.println(dagStatus.getDAGCounters());
+      TezCounters counters = dagStatus.getDAGCounters();
+
+      //Ensure that at least 10 spills were there in this job.
+      assertTrue(counters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT).getValue() > 10);
+
+      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+        System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
+        return -1;
+      }
+      return 0;
+    }
+  }
+
+}

