Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9A812200CB6 for ; Wed, 24 May 2017 23:07:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 99433160BE3; Wed, 24 May 2017 21:07:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 759F4160BDD for ; Wed, 24 May 2017 23:07:37 +0200 (CEST) Received: (qmail 22511 invoked by uid 500); 24 May 2017 21:07:36 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 21130 invoked by uid 99); 24 May 2017 21:07:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 May 2017 21:07:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 93066F4A62; Wed, 24 May 2017 21:07:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeagles@apache.org To: commits@tez.apache.org Date: Wed, 24 May 2017 21:08:19 -0000 Message-Id: <50093f9c5bd2430d8454d5a68ae6d8ce@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [50/50] [abbrv] tez git commit: Merge branch 'master' into TEZ-3334-MERGE1 archived-at: Wed, 24 May 2017 21:07:38 -0000 Merge branch 'master' into TEZ-3334-MERGE1 Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/87f62a35 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/87f62a35 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/87f62a35 Branch: refs/heads/master Commit: 87f62a35b984c7eed8da7c4b040ecb00d643a2ef Parents: 7a17881 788c1ad Author: Jonathan Eagles Authored: Wed May 24 15:56:02 2017 -0500 Committer: Jonathan Eagles Committed: Wed May 24 15:56:02 2017 -0500 ---------------------------------------------------------------------- .../apache/tez/hadoop/shim/HadoopShim28.java | 16 ++++++ .../tez/hadoop/shim/TestHadoopShim28.java | 60 ++++++++++++++++++++ .../org/apache/tez/hadoop/shim/HadoopShim.java | 7 ++- .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../tez/dag/app/rm/TaskSchedulerManager.java | 10 +++- .../dag/app/rm/TestTaskSchedulerHelpers.java | 3 +- .../dag/app/rm/TestTaskSchedulerManager.java | 13 +++-- .../library/api/TezRuntimeConfiguration.java | 8 +++ .../FairCartesianProductVertexManager.java | 50 ++++++++-------- .../common/shuffle/impl/ShuffleManager.java | 17 ++++-- .../orderedgrouped/ShuffleScheduler.java | 14 +++-- .../library/input/OrderedGroupedKVInput.java | 1 + .../runtime/library/input/UnorderedKVInput.java | 1 + .../TestFairCartesianProductVertexManager.java | 12 ++-- .../impl/TestShuffleInputEventHandlerImpl.java | 36 +++++++++++- .../common/shuffle/impl/TestShuffleManager.java | 53 +++++++++++++++-- .../shuffle/orderedgrouped/TestShuffle.java | 32 ++++++++++- ...tShuffleInputEventHandlerOrderedGrouped.java | 26 ++++++++- .../orderedgrouped/TestShuffleScheduler.java | 50 +++++++++++++++- 19 files changed, 346 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 1a8161d,0e05bd8..c42ffda --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@@ -54,8 -54,8 +54,9 @@@ import com.google.common.util.concurren import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.security.JobTokenSecretManager; @@@ -376,12 -372,18 +377,18 @@@ class ShuffleScheduler this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); SecretKey jobTokenSecret = ShuffleUtils .getJobTokenSecretFromTokenBytes(inputContext - .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID)); + .getServiceConsumerMetaData(auxiliaryService)); this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret); - ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build()); + final ExecutorService fetcherRawExecutor; + if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) { + fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers, + "Fetcher_O {" + srcNameTrimmed + "} #%d"); + } else { + fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build()); + } this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5); http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index c934f6c,d8f2e25..683422b --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@@ -48,10 -52,8 +53,9 @@@ import org.apache.tez.dag.api.TezConfig import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; - import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; @@@ -169,9 -188,17 +190,18 @@@ public class TestShuffleInputEventHandl doReturn(new TezCounters()).when(inputContext).getCounters(); doReturn("sourceVertex").when(inputContext).getSourceVertexName(); doReturn(shuffleMetaData).when(inputContext) - .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); doReturn(executionContext).when(inputContext).getExecutionContext(); + when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer( + new Answer() { + @Override + public ExecutorService answer(InvocationOnMock invocation) throws Throwable { + return sharedExecutor.createExecutorService( + invocation.getArgumentAt(0, Integer.class), + invocation.getArgumentAt(1, String.class)); + } + }); return inputContext; } @@@ -183,11 -209,11 +212,12 @@@ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, inputContext.getWorkDirs()); DataOutputBuffer out = new DataOutputBuffer(); - Token token = new Token(new JobTokenIdentifier(), new JobTokenSecretManager(null)); + Token token = new Token(new JobTokenIdentifier(), + new JobTokenSecretManager(null)); token.write(out); doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData( - TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID); + conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleManager realShuffleManager = new ShuffleManager(inputContext, conf, 2, http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java index b3b8688,a6a5274..f361dc7 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@@ -145,9 -165,17 +166,18 @@@ public class TestShuffleManager doReturn(new TezCounters()).when(inputContext).getCounters(); doReturn("sourceVertex").when(inputContext).getSourceVertexName(); doReturn(shuffleMetaData).when(inputContext) - .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); doReturn(executionContext).when(inputContext).getExecutionContext(); + when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer( + new Answer() { + @Override + public ExecutorService answer(InvocationOnMock invocation) throws Throwable { + return sharedExecutor.createExecutorService( + invocation.getArgumentAt(0, Integer.class), + invocation.getArgumentAt(1, String.class)); + } + }); return inputContext; } http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index ff5ceab,384a982..cf4ff6a --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@@ -16,12 -18,13 +18,15 @@@ import org.apache.tez.runtime.api.Input import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; + import org.junit.After; import org.junit.Before; import org.junit.Test; + import org.mockito.invocation.InvocationOnMock; + import org.mockito.stubbing.Answer; import java.io.IOException; import java.nio.ByteBuffer; http://git-wip-us.apache.org/repos/asf/tez/blob/87f62a35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index 52db21b,7b30bf3..c61391c --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@@ -49,8 -52,9 +52,10 @@@ import org.apache.tez.runtime.api.Execu import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; + import org.junit.After; + import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer;