tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [50/50] [abbrv] tez git commit: Merge branch 'master' into TEZ-3334-MERGE1
Date Wed, 24 May 2017 21:08:19 GMT
Merge branch 'master' into TEZ-3334-MERGE1


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/87f62a35
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/87f62a35
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/87f62a35

Branch: refs/heads/master
Commit: 87f62a35b984c7eed8da7c4b040ecb00d643a2ef
Parents: 7a17881 788c1ad
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Wed May 24 15:56:02 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Wed May 24 15:56:02 2017 -0500

----------------------------------------------------------------------
 .../apache/tez/hadoop/shim/HadoopShim28.java    | 16 ++++++
 .../tez/hadoop/shim/TestHadoopShim28.java       | 60 ++++++++++++++++++++
 .../org/apache/tez/hadoop/shim/HadoopShim.java  |  7 ++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  2 +-
 .../tez/dag/app/rm/TaskSchedulerManager.java    | 10 +++-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  3 +-
 .../dag/app/rm/TestTaskSchedulerManager.java    | 13 +++--
 .../library/api/TezRuntimeConfiguration.java    |  8 +++
 .../FairCartesianProductVertexManager.java      | 50 ++++++++--------
 .../common/shuffle/impl/ShuffleManager.java     | 17 ++++--
 .../orderedgrouped/ShuffleScheduler.java        | 14 +++--
 .../library/input/OrderedGroupedKVInput.java    |  1 +
 .../runtime/library/input/UnorderedKVInput.java |  1 +
 .../TestFairCartesianProductVertexManager.java  | 12 ++--
 .../impl/TestShuffleInputEventHandlerImpl.java  | 36 +++++++++++-
 .../common/shuffle/impl/TestShuffleManager.java | 53 +++++++++++++++--
 .../shuffle/orderedgrouped/TestShuffle.java     | 32 ++++++++++-
 ...tShuffleInputEventHandlerOrderedGrouped.java | 26 ++++++++-
 .../orderedgrouped/TestShuffleScheduler.java    | 50 +++++++++++++++-
 19 files changed, 346 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 1a8161d,0e05bd8..c42ffda
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@@ -54,8 -54,8 +54,9 @@@ import com.google.common.util.concurren
  import com.google.common.util.concurrent.ListeningExecutorService;
  import com.google.common.util.concurrent.MoreExecutors;
  import com.google.common.util.concurrent.ThreadFactoryBuilder;
+ 
  import org.apache.hadoop.io.compress.CompressionCodec;
 +import org.apache.tez.dag.api.TezConfiguration;
  import org.apache.tez.http.HttpConnectionParams;
  import org.apache.tez.common.CallableWithNdc;
  import org.apache.tez.common.security.JobTokenSecretManager;
@@@ -376,12 -372,18 +377,18 @@@ class ShuffleScheduler 
      this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
      SecretKey jobTokenSecret = ShuffleUtils
          .getJobTokenSecretFromTokenBytes(inputContext
 -            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
 +            .getServiceConsumerMetaData(auxiliaryService));
      this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret);
  
-     ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
-         new ThreadFactoryBuilder().setDaemon(true)
-             .setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build());
+     final ExecutorService fetcherRawExecutor;
+     if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
+         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) {
+       fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers,
+           "Fetcher_O {" + srcNameTrimmed + "} #%d");
+     } else {
+       fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder()
+           .setDaemon(true).setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build());
+     }
      this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
  
      this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);

http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index c934f6c,d8f2e25..683422b
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@@ -48,10 -52,8 +53,9 @@@ import org.apache.tez.dag.api.TezConfig
  import org.apache.tez.dag.api.TezConstants;
  import org.apache.tez.runtime.api.Event;
  import org.apache.tez.runtime.api.ExecutionContext;
- import org.apache.tez.runtime.api.TaskFailureType;
  import org.apache.tez.runtime.api.InputContext;
  import org.apache.tez.runtime.api.events.DataMovementEvent;
 +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
  import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
  import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
  import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@@ -169,9 -188,17 +190,18 @@@ public class TestShuffleInputEventHandl
      doReturn(new TezCounters()).when(inputContext).getCounters();
      doReturn("sourceVertex").when(inputContext).getSourceVertexName();
      doReturn(shuffleMetaData).when(inputContext)
 -        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
 +        .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
 +            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
      doReturn(executionContext).when(inputContext).getExecutionContext();
+     when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
+         new Answer<ExecutorService>() {
+           @Override
+           public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
+             return sharedExecutor.createExecutorService(
+                 invocation.getArgumentAt(0, Integer.class),
+                 invocation.getArgumentAt(1, String.class));
+           }
+         });
      return inputContext;
    }
  
@@@ -183,11 -209,11 +212,12 @@@
      conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, inputContext.getWorkDirs());
  
      DataOutputBuffer out = new DataOutputBuffer();
-     Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(), new JobTokenSecretManager(null));
+     Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(),
+         new JobTokenSecretManager(null));
      token.write(out);
      doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData(
 -        TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID);
 +        conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
 +            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
  
      FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
      ShuffleManager realShuffleManager = new ShuffleManager(inputContext, conf, 2,

http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index b3b8688,a6a5274..f361dc7
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@@ -145,9 -165,17 +166,18 @@@ public class TestShuffleManager 
      doReturn(new TezCounters()).when(inputContext).getCounters();
      doReturn("sourceVertex").when(inputContext).getSourceVertexName();
      doReturn(shuffleMetaData).when(inputContext)
 -        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
 +        .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
 +            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
      doReturn(executionContext).when(inputContext).getExecutionContext();
+     when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
+         new Answer<ExecutorService>() {
+           @Override
+           public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
+             return sharedExecutor.createExecutorService(
+                 invocation.getArgumentAt(0, Integer.class),
+                 invocation.getArgumentAt(1, String.class));
+           }
+         });
      return inputContext;
    }
  

http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index ff5ceab,384a982..cf4ff6a
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@@ -16,12 -18,13 +18,15 @@@ import org.apache.tez.runtime.api.Input
  import org.apache.tez.runtime.api.events.DataMovementEvent;
  import org.apache.tez.runtime.api.events.InputFailedEvent;
  import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
  import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
  import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+ import org.junit.After;
  import org.junit.Before;
  import org.junit.Test;
+ import org.mockito.invocation.InvocationOnMock;
+ import org.mockito.stubbing.Answer;
  
  import java.io.IOException;
  import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 52db21b,7b30bf3..c61391c
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@@ -49,8 -52,9 +52,10 @@@ import org.apache.tez.runtime.api.Execu
  import org.apache.tez.runtime.api.InputContext;
  import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
  import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
  import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+ import org.junit.After;
+ import org.junit.Before;
  import org.junit.Test;
  import org.mockito.invocation.InvocationOnMock;
  import org.mockito.stubbing.Answer;


Mime
View raw message