tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From haris...@apache.org
Subject tez git commit: TEZ-3691. Setup fetchers to use shared executor. (harishjp)
Date Wed, 24 May 2017 06:26:24 GMT
Repository: tez
Updated Branches:
  refs/heads/master 8c4407798 -> 51972efec


TEZ-3691. Setup fetchers to use shared executor. (harishjp)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/51972efe
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/51972efe
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/51972efe

Branch: refs/heads/master
Commit: 51972efece985ae1ad7e3ca5250f2d38a768c528
Parents: 8c44077
Author: Harish JP <harishjp@gmail.com>
Authored: Wed May 24 11:56:09 2017 +0530
Committer: Harish JP <harishjp@gmail.com>
Committed: Wed May 24 11:56:09 2017 +0530

----------------------------------------------------------------------
 .../library/api/TezRuntimeConfiguration.java    |  8 +++
 .../common/shuffle/impl/ShuffleManager.java     | 17 ++++---
 .../orderedgrouped/ShuffleScheduler.java        | 14 ++++--
 .../library/input/OrderedGroupedKVInput.java    |  1 +
 .../runtime/library/input/UnorderedKVInput.java |  1 +
 .../impl/TestShuffleInputEventHandlerImpl.java  | 36 +++++++++++--
 .../common/shuffle/impl/TestShuffleManager.java | 53 +++++++++++++++++---
 .../shuffle/orderedgrouped/TestShuffle.java     | 32 ++++++++++--
 ...tShuffleInputEventHandlerOrderedGrouped.java | 26 +++++++++-
 .../orderedgrouped/TestShuffleScheduler.java    | 50 +++++++++++++++++-
 10 files changed, 213 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 2eec276..23f1f9b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -427,6 +427,13 @@ public class TezRuntimeConfiguration {
   public static final boolean TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT =
       false;
 
+  @Private
+  @Unstable
+  @ConfigurationProperty(type = "boolean")
+  public static final String TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL = TEZ_RUNTIME_PREFIX
+
+      "shuffle.fetcher.use-shared-pool";
+  public static final boolean TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT = false;
+
   @ConfigurationProperty(type = "float")
   public static final String TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT = TEZ_RUNTIME_PREFIX
+
       "task.input.post-merge.buffer.percent";
@@ -601,6 +608,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS);
     tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
     tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
 
     defaultConf.addResource("core-default.xml");
     defaultConf.addResource("core-site.xml");

http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 8716b92..d9cdae4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -233,13 +233,18 @@ public class ShuffleManager implements FetcherCallback {
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
     
     this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
-    
-    ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
-        numFetchers,
-        new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build());
+
+    final ExecutorService fetcherRawExecutor;
+    if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) {
+      fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers,
+          "Fetcher_B {" + srcNameTrimmed + "} #%d");
+    } else {
+      fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder()
+          .setDaemon(true).setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build());
+    }
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
-    
+
     ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}").build());
     this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);

http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 39f2138..0e05bd8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -54,6 +54,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.CallableWithNdc;
@@ -374,9 +375,15 @@ class ShuffleScheduler {
             .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
     this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret);
 
-    ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
-        new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build());
+    final ExecutorService fetcherRawExecutor;
+    if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) {
+      fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers,
+          "Fetcher_O {" + srcNameTrimmed + "} #%d");
+    } else {
+      fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder()
+          .setDaemon(true).setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build());
+    }
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
 
     this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
@@ -1340,7 +1347,6 @@ class ShuffleScheduler {
 
     @Override
     protected Void callInternal() throws InterruptedException {
-      outer:
       while (!isShutdown.get() && remainingMaps.get() > 0) {
         synchronized (ShuffleScheduler.this) {
           if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 8e653ed..6c12a99 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -392,6 +392,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
         .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
     confKeys.add(TezRuntimeConfiguration
         .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS);

http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index c085844..9a46cbd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -268,6 +268,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);

http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index af52f90..d8f2e25 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.library.common.shuffle.impl;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -26,6 +27,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -33,13 +35,16 @@ import java.util.BitSet;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -47,14 +52,17 @@ import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.google.protobuf.ByteString;
 
@@ -65,6 +73,18 @@ public class TestShuffleInputEventHandlerImpl {
   private static final String PATH_COMPONENT = "attempttmp";
   private final Configuration conf = new Configuration();
 
+  private TezExecutors sharedExecutor;
+
+  @Before
+  public void setup() {
+    sharedExecutor = new TezSharedExecutor(conf);
+  }
+
+  @After
+  public void cleanup() {
+    sharedExecutor.shutdownNow();
+  }
+
   @Test(timeout = 5000)
   public void testSimple() throws IOException {
     InputContext inputContext = mock(InputContext.class);
@@ -159,6 +179,7 @@ public class TestShuffleInputEventHandlerImpl {
     DataOutputBuffer port_dob = new DataOutputBuffer();
     port_dob.writeInt(PORT);
     final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+    port_dob.close();
 
     ExecutionContext executionContext = mock(ExecutionContext.class);
     doReturn(HOST).when(executionContext).getHostName();
@@ -169,10 +190,18 @@ public class TestShuffleInputEventHandlerImpl {
     doReturn(shuffleMetaData).when(inputContext)
         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
     doReturn(executionContext).when(inputContext).getExecutionContext();
+    when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
+        new Answer<ExecutorService>() {
+          @Override
+          public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
+            return sharedExecutor.createExecutorService(
+                invocation.getArgumentAt(0, Integer.class),
+                invocation.getArgumentAt(1, String.class));
+          }
+        });
     return inputContext;
   }
 
-  @SuppressWarnings("unchecked")
   private ShuffleManager createShuffleManager(InputContext inputContext) throws IOException
{
     Path outDirBase = new Path(".", "outDir");
     String[] outDirs = new String[] { outDirBase.toString() };
@@ -180,7 +209,8 @@ public class TestShuffleInputEventHandlerImpl {
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, inputContext.getWorkDirs());
 
     DataOutputBuffer out = new DataOutputBuffer();
-    Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(), new JobTokenSecretManager(null));
+    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(),
+        new JobTokenSecretManager(null));
     token.write(out);
     doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData(
         TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID);

http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index a5608ef..a6a5274 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -20,19 +20,23 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +44,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -49,6 +55,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
@@ -57,6 +64,8 @@ import org.apache.tez.runtime.library.common.shuffle.FetchResult;
 import org.apache.tez.runtime.library.common.shuffle.InputHost;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -68,6 +77,17 @@ public class TestShuffleManager {
   private static final int PORT = 8080;
   private static final String PATH_COMPONENT = "attempttmp";
   private final Configuration conf = new Configuration();
+  private TezExecutors sharedExecutor;
+
+  @Before
+  public void setup() {
+    sharedExecutor = new TezSharedExecutor(conf);
+  }
+
+  @After
+  public void cleanup() {
+    sharedExecutor.shutdownNow();
+  }
 
   /**
    * One reducer fetches multiple partitions from each mapper.
@@ -136,6 +156,7 @@ public class TestShuffleManager {
     port_dob.writeInt(PORT);
     final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0,
         port_dob.getLength());
+    port_dob.close();
 
     ExecutionContext executionContext = mock(ExecutionContext.class);
     doReturn(FETCHER_HOST).when(executionContext).getHostName();
@@ -146,10 +167,30 @@ public class TestShuffleManager {
     doReturn(shuffleMetaData).when(inputContext)
         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
     doReturn(executionContext).when(inputContext).getExecutionContext();
+    when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
+        new Answer<ExecutorService>() {
+          @Override
+          public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
+            return sharedExecutor.createExecutorService(
+                invocation.getArgumentAt(0, Integer.class),
+                invocation.getArgumentAt(1, String.class));
+          }
+        });
     return inputContext;
   }
 
-  @SuppressWarnings("unchecked")
+  @Test(timeout=5000)
+  public void testUseSharedExecutor() throws Exception {
+    InputContext inputContext = createInputContext();
+    createShuffleManager(inputContext, 2);
+    verify(inputContext, times(0)).createTezFrameworkExecutorService(anyInt(), anyString());
+
+    inputContext = createInputContext();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
true);
+    createShuffleManager(inputContext, 2);
+    verify(inputContext).createTezFrameworkExecutorService(anyInt(), anyString());
+  }
+
   private ShuffleManagerForTest createShuffleManager(
       InputContext inputContext, int expectedNumOfPhysicalInputs)
           throws IOException {
@@ -160,7 +201,7 @@ public class TestShuffleManager {
         inputContext.getWorkDirs());
 
     DataOutputBuffer out = new DataOutputBuffer();
-    Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(),
+    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(),
         new JobTokenSecretManager(null));
     token.write(out);
     doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).
@@ -199,9 +240,9 @@ public class TestShuffleManager {
           conf));
       final FetchResult mockFetcherResult = mock(FetchResult.class);
       try {
-        doAnswer(new Answer() {
+        doAnswer(new Answer<FetchResult>() {
           @Override
-          public Object answer(InvocationOnMock invocation) throws Throwable {
+          public FetchResult answer(InvocationOnMock invocation) throws Throwable {
             for(InputAttemptIdentifier input : fetcher.getSrcAttempts()) {
               ShuffleManagerForTest.this.fetchSucceeded(
                   fetcher.getHost(), input, new TestFetchedInput(input), 0, 0,

http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
index 855aedf..cad9523 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
@@ -16,6 +16,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -26,11 +27,15 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -40,14 +45,26 @@ import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.common.Constants;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestShuffle {
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestShuffle.class);
+  private TezExecutors sharedExecutor;
+
+  @Before
+  public void setup() {
+    sharedExecutor = new TezSharedExecutor(new Configuration());
+  }
+
+  @After
+  public void cleanup() {
+    sharedExecutor.shutdownNow();
+  }
 
   @Test(timeout = 10000)
   public void testSchedulerTerminatesOnException() throws IOException, InterruptedException
{
@@ -107,6 +124,15 @@ public class TestShuffle {
         new JobTokenSecretManager());
     ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
     doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
+    when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
+        new Answer<ExecutorService>() {
+          @Override
+          public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
+            return sharedExecutor.createExecutorService(
+                invocation.getArgumentAt(0, Integer.class),
+                invocation.getArgumentAt(1, String.class));
+          }
+        });
     return inputContext;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 695a307..384a982 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -6,6 +6,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -18,8 +20,11 @@ import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -27,9 +32,11 @@ import java.util.BitSet;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -65,6 +72,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
   private ShuffleScheduler realScheduler;
   private MergeManager mergeManager;
 
+  private TezExecutors sharedExecutor;
+
   private InputContext createTezInputContext() throws IOException {
     ApplicationId applicationId = ApplicationId.newInstance(1, 1);
     InputContext inputContext = mock(InputContext.class);
@@ -79,6 +88,15 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         new JobTokenSecretManager());
     ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
     doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
+    when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
+        new Answer<ExecutorService>() {
+          @Override
+          public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
+            return sharedExecutor.createExecutorService(
+                invocation.getArgumentAt(0, Integer.class),
+                invocation.getArgumentAt(1, String.class));
+          }
+        });
     return inputContext;
   }
 
@@ -134,7 +152,13 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
 
   @Before
   public void setup() throws Exception {
-   setupScheduler(2);
+    sharedExecutor = new TezSharedExecutor(new Configuration());
+    setupScheduler(2);
+  }
+
+  @After
+  public void cleanup() {
+    sharedExecutor.shutdownNow();
   }
 
   private void setupScheduler(int numInputs) throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 31da4d0..7b30bf3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -17,6 +17,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
@@ -41,6 +42,8 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -50,12 +53,25 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 public class TestShuffleScheduler {
 
+  private TezExecutors sharedExecutor;
+
+  @Before
+  public void setup() {
+    sharedExecutor = new TezSharedExecutor(new Configuration());
+  }
+
+  @After
+  public void cleanup() {
+    sharedExecutor.shutdownNow();
+  }
 
   @Test (timeout = 10000)
   public void testNumParallelScheduledFetchers() throws IOException, InterruptedException
{
@@ -106,6 +122,27 @@ public class TestShuffleScheduler {
     }
   }
 
+  @Test(timeout=5000)
+  public void testUseSharedExecutor() throws Exception {
+    InputContext inputContext = createTezInputContext();
+    Configuration conf = new TezConfiguration();
+    int numInputs = 10;
+    Shuffle shuffle = mock(Shuffle.class);
+    MergeManager mergeManager = mock(MergeManager.class);
+
+    ShuffleSchedulerForTest scheduler = new ShuffleSchedulerForTest(inputContext, conf, numInputs,
+        shuffle, mergeManager, mergeManager, System.currentTimeMillis(), null, false, 0,
"srcName");
+    verify(inputContext, times(0)).createTezFrameworkExecutorService(anyInt(), anyString());
+    scheduler.close();
+
+    inputContext = createTezInputContext();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
true);
+    scheduler = new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
+        mergeManager, System.currentTimeMillis(), null, false, 0, "srcName");
+    verify(inputContext).createTezFrameworkExecutorService(anyInt(), anyString());
+    scheduler.close();
+  }
+
   @Test(timeout = 5000)
   public void testSimpleFlow() throws Exception {
     InputContext inputContext = createTezInputContext();
@@ -909,6 +946,15 @@ public class TestShuffleScheduler {
         new JobTokenSecretManager());
     ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
     doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
+    when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
+        new Answer<ExecutorService>() {
+          @Override
+          public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
+            return sharedExecutor.createExecutorService(
+                invocation.getArgumentAt(0, Integer.class),
+                invocation.getArgumentAt(1, String.class));
+          }
+        });
     return inputContext;
   }
 
@@ -948,9 +994,9 @@ public class TestShuffleScheduler {
     FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
       numFetchersCreated.incrementAndGet();
       FetcherOrderedGrouped mockFetcher = mock(FetcherOrderedGrouped.class);
-      doAnswer(new Answer() {
+      doAnswer(new Answer<Void>() {
         @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
+        public Void answer(InvocationOnMock invocation) throws Throwable {
           if (fetcherShouldWait) {
             Thread.sleep(100000l);
           }


Mime
View raw message