tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3595. Composite Fetch account error for disk direct (jeagles)
Date Mon, 30 Jan 2017 21:21:02 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 4b30f8e2c -> 3b20be06f


TEZ-3595. Composite Fetch account error for disk direct (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b20be06
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b20be06
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b20be06

Branch: refs/heads/TEZ-3334
Commit: 3b20be06fe89cc65e56a4520336a01afa100d811
Parents: 4b30f8e
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Mon Jan 30 15:20:45 2017 -0600
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Mon Jan 30 15:20:45 2017 -0600

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |   1 +
 .../orderedgrouped/FetcherOrderedGrouped.java   |  66 ++++---
 .../shuffle/orderedgrouped/TestFetcher.java     | 198 ++++++++++++++++---
 3 files changed, 213 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3b20be06/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 4703d95..e40759f 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3595. Composite Fetch account error for disk direct
   TEZ-3590. Remove google.protobuf from the tez-auxservices shaded jar
   TEZ-3587. Fetcher fetchInputs() can NPE on srcAttempt due to missing entry in pathToAttemptMap
   TEZ-3586. Remove fusesource.leveldbjni from the tez-auxservices shaded jar

http://git-wip-us.apache.org/repos/asf/tez/blob/3b20be06/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 18b824a..f213268 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -711,37 +711,47 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
         }
         InputAttemptIdentifier srcAttemptId = iter.next();
         MapOutput mapOutput = null;
-        try {
-          long startTime = System.currentTimeMillis();
-          Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
-
-          TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(),
-              minPartition);
-
-          mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
-          long endTime = System.currentTimeMillis();
-          scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
-              indexRecord.getRawLength(), (endTime - startTime), mapOutput, true);
-          iter.remove();
-          metrics.successFetch();
-        } catch (IOException e) {
-          if (mapOutput != null) {
-            mapOutput.abort();
-          }
-          if (!stopped) {
-            metrics.failedFetch();
-            ioErrs.increment(1);
-            scheduler.copyFailed(srcAttemptId, host, true, false, true);
-            LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
-                host.getHostIdentifier(), e);
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Ignoring fetch error during local disk copy since fetcher has already
been stopped");
+        boolean hasFailures = false;
+        // Fetch partition count number of map outputs (handles auto-reduce case)
+        for (int curPartition = minPartition; curPartition <= maxPartition; curPartition++)
{
+          try {
+            long startTime = System.currentTimeMillis();
+
+            // Partition id is the base partition id plus the relative offset
+            int reduceId = host.getPartitionId() + curPartition - minPartition;
+            srcAttemptId = scheduler.getIdentifierForFetchedOutput(srcAttemptId.getPathComponent(),
reduceId);
+            Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
+            TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(),
reduceId);
+
+            mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
+            long endTime = System.currentTimeMillis();
+            scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
+                indexRecord.getRawLength(), (endTime - startTime), mapOutput, true);
+            metrics.successFetch();
+          } catch (IOException e) {
+            if (mapOutput != null) {
+              mapOutput.abort();
             }
-            return;
+            if (!stopped) {
+              hasFailures = true;
+              metrics.failedFetch();
+              ioErrs.increment(1);
+              scheduler.copyFailed(srcAttemptId, host, true, false, true);
+              LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
+                  host.getHostIdentifier(), e);
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Ignoring fetch error during local disk copy since fetcher has already
been stopped");
+              }
+              return;
+            }
+
           }
         }
+        if (!hasFailures) {
+          iter.remove();
+        }
       }
     } finally {
       putBackRemainingMapOutputs(host);

http://git-wip-us.apache.org/repos/asf/tez/blob/3b20be06/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index a6e4c21..3686d17 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -47,12 +47,15 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.collect.Lists;
 
 import org.apache.tez.http.HttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -235,12 +238,12 @@ public class TestFetcher {
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
 
 
-    List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
-        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
-        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
-        new InputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2"),
-        new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3"),
-        new InputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4")
+    final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
+        new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
1),
+        new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
1),
+        new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
1),
+        new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
1),
+        new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4",
1)
     );
     final int FIRST_FAILED_ATTEMPT_IDX = 2;
     final int SECOND_FAILED_ATTEMPT_IDX = 4;
@@ -248,6 +251,24 @@ public class TestFetcher {
 
     doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
 
+    final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap
= new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>();
+    for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) {
+      for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) {
+        ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(),
host.getPartitionId() + i);
+        pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i));
+        }
+      }
+    doAnswer(new Answer<InputAttemptIdentifier>() {
+      @Override
+      public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable
{
+        Object[] args = invocation.getArguments();
+        String path = (String) args[0];
+        int reduceId = (int) args[1];
+        return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId));
+      }
+    }).when(scheduler)
+        .getIdentifierForFetchedOutput(any(String.class), any(int.class));
+
     doAnswer(new Answer<MapOutput>() {
       @Override
       public MapOutput answer(InvocationOnMock invocation) throws Throwable {
@@ -269,20 +290,22 @@ public class TestFetcher {
       }
     }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());
 
-    doAnswer(new Answer<TezIndexRecord>() {
-      @Override
-      public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
-        Object[] args = invocation.getArguments();
-        String pathComponent = (String) args[0];
-        int len = pathComponent.length();
-        long p = Long.valueOf(pathComponent.substring(len - 1, len));
-        if (p == FIRST_FAILED_ATTEMPT_IDX || p == SECOND_FAILED_ATTEMPT_IDX) {
-          throw new IOException("failing to simulate failure case");
+    for (int i = 0; i < host.getPartitionCount(); i++) {
+      doAnswer(new Answer<TezIndexRecord>() {
+        @Override
+        public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
+          Object[] args = invocation.getArguments();
+          String pathComponent = (String) args[0];
+          int len = pathComponent.length();
+          long p = Long.valueOf(pathComponent.substring(len - 1, len));
+          if (p == FIRST_FAILED_ATTEMPT_IDX || p == SECOND_FAILED_ATTEMPT_IDX) {
+            throw new IOException("failing to simulate failure case");
+          }
+          // match with params for copySucceeded below.
+          return new TezIndexRecord(p * 10, p * 1000, p * 100);
         }
-        // match with params for copySucceeded below.
-        return new TezIndexRecord(p * 10, p * 1000, p * 100);
-      }
-    }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId()));
+      }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i));
+    }
 
     doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
         anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
@@ -295,10 +318,12 @@ public class TestFetcher {
 
     // should have exactly 3 success and 1 failure.
     for (int i : sucessfulAttemptsIndexes) {
-      verifyCopySucceeded(scheduler, host, srcAttempts, i);
+      for (int j = 0; j < host.getPartitionCount(); j++) {
+        verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
+      }
     }
-    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false,
true);
-    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true,
false, true);
+    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host,
true, false, true);
+    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host,
true, false, true);
 
     verify(metrics, times(3)).successFetch();
     verify(metrics, times(2)).failedFetch();
@@ -308,11 +333,136 @@ public class TestFetcher {
     verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
   }
 
+  @Test(timeout = 5000000)
+  public void testSetupLocalDiskFetchAutoReduce() throws Exception {
+    Configuration conf = new TezConfiguration();
+    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+    MergeManager merger = mock(MergeManager.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    Shuffle shuffle = mock(Shuffle.class);
+    InputContext inputContext = mock(InputContext.class);
+    when(inputContext.getCounters()).thenReturn(new TezCounters());
+    when(inputContext.getSourceVertexName()).thenReturn("");
+
+    MapHost host = new MapHost(HOST, PORT, 1, 2);
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics,
shuffle, null, false, 0,
+        null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter,
badIdErrsCounter,
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+        false, false, true, false);
+    FetcherOrderedGrouped spyFetcher = spy(fetcher);
+
+
+    final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
+        new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
host.getPartitionCount()),
+        new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
host.getPartitionCount()),
+        new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
host.getPartitionCount()),
+        new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
host.getPartitionCount()),
+        new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4",
host.getPartitionCount())
+    );
+    final int FIRST_FAILED_ATTEMPT_IDX = 2;
+    final int SECOND_FAILED_ATTEMPT_IDX = 4;
+    final int[] sucessfulAttemptsIndexes = { 0, 1, 3 };
+
+    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
+    final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap
+        = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>();
+    for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) {
+      for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) {
+        ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(),
host.getPartitionId() + i);
+        pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i));
+      }
+    }
+
+    doAnswer(new Answer<InputAttemptIdentifier>() {
+        @Override
+        public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable
{
+          Object[] args = invocation.getArguments();
+          String path = (String) args[0];
+          int reduceId = (int) args[1];
+          return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId));
+        }
+      }).when(scheduler)
+          .getIdentifierForFetchedOutput(any(String.class), any(int.class));
+
+    doAnswer(new Answer<MapOutput>() {
+      @Override
+      public MapOutput answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        MapOutput mapOutput = mock(MapOutput.class);
+        doReturn(MapOutput.Type.DISK_DIRECT).when(mapOutput).getType();
+        doReturn(args[0]).when(mapOutput).getAttemptIdentifier();
+        return mapOutput;
+      }
+    }).when(spyFetcher)
+        .getMapOutputForDirectDiskFetch(any(InputAttemptIdentifier.class), any(Path.class),
+            any(TezIndexRecord.class));
+
+    doAnswer(new Answer<Path>() {
+      @Override
+      public Path answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
+      }
+    }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());
+
+    for (int i = 0; i < host.getPartitionCount(); i++) {
+      doAnswer(new Answer<TezIndexRecord>() {
+        @Override
+        public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
+          Object[] args = invocation.getArguments();
+          String pathComponent = (String) args[0];
+          int len = pathComponent.length();
+          long p = Long.valueOf(pathComponent.substring(len - 1, len));
+
+          if (pathComponent.equals(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).getPathComponent())
||
+              pathComponent.equals(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).getPathComponent()))
{
+            throw new IOException("Thowing exception to simulate failure case");
+          }
+          // match with params for copySucceeded below.
+          return new TezIndexRecord(p * 10, p * 1000, p * 100);
+        }
+      }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i));
+    }
+
+    doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
+        anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
+    doNothing().when(scheduler).putBackKnownMapOutput(host,
+        srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0));
+    doNothing().when(scheduler).putBackKnownMapOutput(host,
+        srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1));
+    doNothing().when(scheduler).putBackKnownMapOutput(host,
+        srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0));
+    doNothing().when(scheduler).putBackKnownMapOutput(host,
+        srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1));
+
+    spyFetcher.setupLocalDiskFetch(host);
+
+    // should have exactly 3 success and 1 failure.
+    for (int i : sucessfulAttemptsIndexes) {
+      for (int j = 0; j < host.getPartitionCount(); j++) {
+        verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
+      }
+    }
+    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host,
true, false, true);
+    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(1), host,
true, false, true);
+    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host,
true, false, true);
+    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1), host,
true, false, true);
+
+    verify(metrics, times(6)).successFetch();
+    verify(metrics, times(4)).failedFetch();
+
+    verify(spyFetcher).putBackRemainingMapOutputs(host);
+    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
+    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
+    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
+    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
+  }
+
   private void verifyCopySucceeded(ShuffleScheduler scheduler, MapHost host,
-      List<InputAttemptIdentifier> srcAttempts, long p) throws
+      List<CompositeInputAttemptIdentifier> srcAttempts, long p, int j) throws
       IOException {
     // need to verify filename, offsets, sizes wherever they are used.
-    InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p);
+    InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p).expand(j);
     String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent();
     ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class);
     verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100),


Mime
View raw message