tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [03/50] [abbrv] tez git commit: TEZ-3238. Shuffle service name should be configurable and should not be hardcoded to ‘mapreduce_shuffle’ (jeagles)
Date Wed, 24 May 2017 21:07:32 GMT
TEZ-3238. Shuffle service name should be configurable and should not be hardcoded to ‘mapreduce_shuffle’ (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/53ea6f5b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/53ea6f5b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/53ea6f5b

Branch: refs/heads/master
Commit: 53ea6f5b3adfa71f3c235467a50757c72528d653
Parents: 397d6af
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Fri Aug 12 13:11:40 2016 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Fri Aug 12 13:11:40 2016 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |  1 +
 .../org/apache/tez/client/TezClientUtils.java   |  4 +++-
 .../apache/tez/dag/api/TezConfiguration.java    | 10 ++++++++++
 .../org/apache/tez/dag/api/TezConstants.java    |  6 ------
 .../app/launcher/LocalContainerLauncher.java    | 16 ++++++++-------
 .../app/rm/container/AMContainerHelpers.java    |  9 ++++++---
 .../tez/service/impl/ContainerRunnerImpl.java   |  8 +++++---
 .../apache/tez/service/impl/TezTestService.java |  5 ++++-
 .../tez/mapreduce/processor/MapUtils.java       |  7 +++++--
 .../processor/reduce/TestReduceProcessor.java   |  7 +++++--
 .../org/apache/tez/runtime/task/TezChild.java   |  4 +++-
 .../library/common/shuffle/ShuffleUtils.java    | 21 ++++++++++++--------
 .../common/shuffle/impl/ShuffleManager.java     |  9 ++++++---
 .../orderedgrouped/ShuffleScheduler.java        |  7 +++++--
 .../common/sort/impl/PipelinedSorter.java       |  4 ++--
 .../common/sort/impl/dflt/DefaultSorter.java    |  2 +-
 .../writers/UnorderedPartitionedKVWriter.java   |  5 ++++-
 .../output/OrderedPartitionedKVOutput.java      |  2 +-
 .../common/shuffle/TestShuffleUtils.java        | 12 ++++++-----
 .../impl/TestShuffleInputEventHandlerImpl.java  |  7 +++++--
 .../common/shuffle/impl/TestShuffleManager.java |  7 +++++--
 .../common/sort/impl/TestPipelinedSorter.java   |  8 +++++---
 .../sort/impl/dflt/TestDefaultSorter.java       |  4 +++-
 .../TestUnorderedPartitionedKVWriter.java       | 15 ++++++++------
 .../library/output/TestOnFileSortedOutput.java  |  4 +++-
 .../output/TestOnFileUnorderedKVOutput.java     |  4 +++-
 26 files changed, 123 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index ef0a1cb..2122cca 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3238. Shuffle service name should be configureable and should not be hardcoded to ‘mapreduce_shuffle’
   TEZ-3390. Package Shuffle Handler as a shaded uber-jar
   TEZ-3378. Move Shuffle Handler configuration into the Tez namespace
   TEZ-3377. Remove ShuffleHandler dependency on mapred.FadvisedChunkedFile and mapred.FadvisedFileRegion

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index eb1a95e..fbe36f1 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -692,7 +692,9 @@ public class TezClientUtils {
     // provide this to AuxServices running on the AM node - in case tasks run within the AM,
     // and no other task runs on this node.
     Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-    serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    serviceData.put(auxiliaryService,
         TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(amLaunchCredentials)));
 
     // Setup ContainerLaunchContext for AM container

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 11c50cf..732fee9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -138,6 +138,16 @@ public class TezConfiguration extends Configuration {
   public static final boolean TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT = true;
 
   /**
+   * String value. Specifies the name of the shuffle auxiliary service.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty
+  public static final String TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID = TEZ_AM_PREFIX +
+      "shuffle.auxiliary-service.id";
+  public static final String TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT =
+      "mapreduce_shuffle";
+
+  /**
    * String value. Specifies a directory where Tez can create temporary job artifacts.
    */
   @ConfigurationScope(Scope.AM)

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 43f09a2..31c1e66 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -69,12 +69,6 @@ public class TezConstants {
     TezConfiguration.TEZ_SESSION_PREFIX + "local-resources.pb";
   
   public static final String TEZ_APPLICATION_TYPE = "TEZ";
-
-  /**
-   * The service id for the NodeManager plugin used to share intermediate data
-   * between vertices.
-   */
-  public static final String TEZ_SHUFFLE_HANDLER_SERVICE_ID = "tez_shuffle";
   
   public static final String TEZ_PREWARM_DAG_NAME_PREFIX = "TezPreWarmDAG";
 

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 1e9d1e6..153b2e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -118,13 +118,6 @@ public class LocalContainerLauncher extends ContainerLauncher {
     this.tal = taskCommunicatorManagerInterface;
     this.workingDirectory = workingDirectory;
     this.isLocalMode = isLocalMode;
-    if (isLocalMode) {
-      localEnv = Maps.newHashMap();
-      AuxiliaryServiceHelper.setServiceDataIntoEnv(
-          ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
-    } else {
-      localEnv = System.getenv();
-    }
 
     // Check if the hostname is set in the environment before overriding it.
     String host = isLocalMode ? InetAddress.getLocalHost().getHostName() :
@@ -138,6 +131,15 @@ public class LocalContainerLauncher extends ContainerLauncher {
       throw new TezUncheckedException(
           "Failed to parse user payload for " + LocalContainerLauncher.class.getSimpleName(), e);
     }
+    if (isLocalMode) {
+      String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+      localEnv = Maps.newHashMap();
+      AuxiliaryServiceHelper.setServiceDataIntoEnv(
+          auxiliaryService, ByteBuffer.allocate(4).putInt(0), localEnv);
+    } else {
+      localEnv = System.getenv();
+    }
     numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
         TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
     Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor");

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 11b5006..51e954d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -91,10 +91,11 @@ public class AMContainerHelpers {
    * Create the common {@link ContainerLaunchContext} for all attempts.
    *
    * @param applicationACLs
+   * @param conf
    */
   private static ContainerLaunchContext createCommonContainerLaunchContext(
       Map<ApplicationAccessType, String> applicationACLs,
-      Credentials credentials, Map<String, LocalResource> localResources) {
+      Credentials credentials, Map<String, LocalResource> localResources, Configuration conf) {
 
     // Application environment
     Map<String, String> environment = new HashMap<String, String>();
@@ -128,7 +129,9 @@ public class AMContainerHelpers {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Putting shuffle token in serviceData in common CLC");
       }
-      serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+      String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+      serviceData.put(auxiliaryService,
           TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
     } catch (IOException e) {
       throw new TezUncheckedException(e);
@@ -159,7 +162,7 @@ public class AMContainerHelpers {
     synchronized (commonContainerSpecLock) {
       if (!commonContainerSpecs.containsKey(tezDAGID)) {
         commonContainerSpec =
-            createCommonContainerLaunchContext(acls, credentials, commonDAGLRs);
+            createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, conf);
         commonContainerSpecs.put(tezDAGID, commonContainerSpec);
       } else {
         commonContainerSpec = commonContainerSpecs.get(tezDAGID);

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index f9de995..b3d6d54 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -132,9 +132,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
   public void serviceStart() {
   }
 
-  public void setShufflePort(int shufflePort) {
+  public void setShufflePort(String auxiliaryService, int shufflePort) {
     AuxiliaryServiceHelper.setServiceDataIntoEnv(
-        TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+        auxiliaryService,
         ByteBuffer.allocate(4).putInt(shufflePort), localEnv);
   }
 
@@ -417,7 +417,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
 
       Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
       Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
-      serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+      String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+      serviceConsumerMetadata.put(auxiliaryService,
           TezCommonUtils.convertJobTokenToBytes(jobToken));
       Multimap<String, String> startedInputsMap = HashMultimap.create();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
index 322be00..8d9436e 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.service.ContainerRunner;
 import org.apache.tez.shufflehandler.ShuffleHandler;
@@ -86,7 +87,9 @@ public class TezTestService extends AbstractService implements ContainerRunner {
   @Override
   public void serviceStart() throws Exception {
     ShuffleHandler.initializeAndStart(shuffleHandlerConf);
-    containerRunner.setShufflePort(ShuffleHandler.get().getPort());
+    String auxiliaryService = getConfig().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    containerRunner.setShufflePort(auxiliaryService, ShuffleHandler.get().getPort());
     server.start();
     containerRunner.start();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 8309966..47ce377 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -216,12 +217,14 @@ public class MapUtils {
         outputSpecs, null, null);
 
     Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
-    serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+    String auxiliaryService = jobConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    serviceConsumerMetadata.put(auxiliaryService,
         ShuffleUtils.convertJobTokenToBytes(shuffleToken));
     Map<String, String> envMap = new HashMap<String, String>();
     ByteBuffer shufflePortBb = ByteBuffer.allocate(4).putInt(0, 8000);
     AuxiliaryServiceHelper
-        .setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, shufflePortBb,
+        .setServiceDataIntoEnv(auxiliaryService, shufflePortBb,
             envMap);
 
     LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 1922c53..043bd94 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -208,12 +209,14 @@ public class TestReduceProcessor {
         Collections.singletonList(reduceOutputSpec), null, null);
 
     Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
-    serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+    String auxiliaryService = jobConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    serviceConsumerMetadata.put(auxiliaryService,
         ShuffleUtils.convertJobTokenToBytes(shuffleToken));
     Map<String, String> serviceProviderEnvMap = new HashMap<String, String>();
     ByteBuffer shufflePortBb = ByteBuffer.allocate(4).putInt(0, 8000);
     AuxiliaryServiceHelper
-        .setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, shufflePortBb,
+        .setServiceDataIntoEnv(auxiliaryService, shufflePortBb,
             serviceProviderEnvMap);
 
     LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 07810d9..3ee89cf 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -175,7 +175,9 @@ public class TezChild {
     UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
     Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
 
-    serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    serviceConsumerMetadata.put(auxiliaryService,
         TezCommonUtils.convertJobTokenToBytes(jobToken));
 
     if (umbilical == null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index e194298..fa8533c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -39,6 +39,7 @@ import com.google.protobuf.ByteString;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.http.BaseHttpConnection;
 import org.apache.tez.http.HttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
@@ -73,7 +74,6 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DetailedP
 public class ShuffleUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
-  public static final String SHUFFLE_HANDLER_SERVICE_ID = "tez_shuffle";
   private static final long MB = 1024l * 1024l;
 
   //Shared by multiple threads
@@ -278,12 +278,13 @@ public class ShuffleUtils {
    * @param finalMergeEnabled
    * @param isLastEvent
    * @param pathComponent
+   * @param conf
    * @return ByteBuffer
    * @throws IOException
    */
   static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails,
-      int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context,
-      int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent)
+                                       int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context,
+                                       int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Configuration conf)
       throws IOException {
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();
@@ -312,8 +313,11 @@ public class ShuffleUtils {
 
     if (!sendEmptyPartitionDetails || outputGenerated) {
       String host = context.getExecutionContext().getHostName();
+      String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
       ByteBuffer shuffleMetadata = context
-          .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+          .getServiceProviderMetaData(auxiliaryService);
       int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
       payloadBuilder.setHost(host);
       payloadBuilder.setPort(shufflePort);
@@ -398,12 +402,13 @@ public class ShuffleUtils {
    * @param numPhysicalOutputs
    * @param pathComponent
    * @param partitionStats
+   * @param conf
    * @throws IOException
    */
   public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled,
-      boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
-      int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
-      @Nullable long[] partitionStats, boolean reportDetailedPartitionStats)
+                                          boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
+                                          int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
+                                          @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Configuration conf)
       throws IOException {
     Preconditions.checkArgument(eventList != null, "EventList can't be null");
 
@@ -421,7 +426,7 @@ public class ShuffleUtils {
 
     ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs,
         spillRecord, context, spillId,
-        finalMergeEnabled, isLastEvent, pathComponent);
+        finalMergeEnabled, isLastEvent, pathComponent, conf);
 
     if (finalMergeEnabled || isLastEvent) {
       VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats,

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index c80713b..a8fea9e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import javax.crypto.SecretKey;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
@@ -244,10 +245,12 @@ public class ShuffleManager implements FetcherCallback {
     
     this.startTime = System.currentTimeMillis();
     this.lastProgressTime = startTime;
-    
+
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
     SecretKey shuffleSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+            .getServiceConsumerMetaData(auxiliaryService));
     this.jobTokenSecretMgr = new JobTokenSecretManager(shuffleSecret);
     this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
     httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
@@ -261,7 +264,7 @@ public class ShuffleManager implements FetcherCallback {
         localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
     this.localhostName = inputContext.getExecutionContext().getHostName();
     final ByteBuffer shuffleMetaData =
-        inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+        inputContext.getServiceProviderMetaData(auxiliaryService);
     this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetaData);
 
     /**

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index afd280b..a83c301 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -55,6 +55,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -327,8 +328,10 @@ class ShuffleScheduler {
     this.applicationId = inputContext.getApplicationId().toString();
     this.dagId = inputContext.getDagIdentifier();
     this.localHostname = inputContext.getExecutionContext().getHostName();
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
     final ByteBuffer shuffleMetadata =
-        inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+        inputContext.getServiceProviderMetaData(auxiliaryService);
     this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
 
     this.referee = new Referee();
@@ -371,7 +374,7 @@ class ShuffleScheduler {
         this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
     SecretKey jobTokenSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+            .getServiceConsumerMetaData(auxiliaryService));
     this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret);
 
     ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 897d7d7..e468a55 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -350,7 +350,7 @@ public class PipelinedSorter extends ExternalSorter {
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false,
         outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1),
         partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
-        reportDetailedPartitionStats());
+        reportDetailedPartitionStats(), this.conf);
     outputContext.sendEvents(events);
     LOG.info(outputContext.getDestinationVertexName() +
         ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
@@ -673,7 +673,7 @@ public class PipelinedSorter extends ExternalSorter {
           ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
               outputContext, i, indexCacheList.get(i), partitions,
               sendEmptyPartitionDetails, pathComponent, partitionStats,
-              reportDetailedPartitionStats());
+              reportDetailedPartitionStats(), this.conf);
           LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
         }
         outputContext.sendEvents(events);

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 69bfdb8..21c40e9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -1133,7 +1133,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
     String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
         outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
-        partitionStats, reportDetailedPartitionStats());
+        partitionStats, reportDetailedPartitionStats(), this.conf);
 
     LOG.info(outputContext.getDestinationVertexName() + ": " +
         "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 152096c..760daf5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -52,6 +52,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.OutputContext;
@@ -1084,8 +1085,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   @VisibleForTesting
   int getShufflePort() throws IOException {
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
     ByteBuffer shuffleMetadata = outputContext
-        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+        .getServiceProviderMetaData(auxiliaryService);
     int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
     return shufflePort;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 9a3d778..13e27eb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -200,7 +200,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
       ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent,
           getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf),
           getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
-          sorter.getPartitionStats(), sorter.reportDetailedPartitionStats());
+          sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), this.conf);
     }
     return eventList;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 4233f5d..03ddfa5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -17,6 +17,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.OutputContext;
@@ -100,7 +101,8 @@ public class TestShuffleUtils {
     serviceProviderMetaData.writeInt(80);
     doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
         .getServiceProviderMetaData
-            (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+            (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+                TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
 
 
     doReturn(1).when(outputContext).getTaskVertexIndex();
@@ -115,8 +117,8 @@ public class TestShuffleUtils {
 
   @Before
   public void setup() throws Exception {
-    outputContext = createTezOutputContext();
     conf = new Configuration();
+    outputContext = createTezOutputContext();
     conf.set("fs.defaultFS", "file:///");
     localFs = FileSystem.getLocal(conf);
 
@@ -163,7 +165,7 @@ public class TestShuffleUtils {
     String pathComponent = "/attempt_x_y_0/file.out";
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false);
+            physicalOutputs, true, pathComponent, null, false, this.conf);
 
     Assert.assertTrue(events.size() == 1);
     Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -202,7 +204,7 @@ public class TestShuffleUtils {
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false);
+            physicalOutputs, true, pathComponent, null, false, this.conf);
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -243,7 +245,7 @@ public class TestShuffleUtils {
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false);
+            physicalOutputs, true, pathComponent, null, false, this.conf);
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index 6bcbeb6..e085d1a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -44,6 +44,7 @@ import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
@@ -167,7 +168,8 @@ public class TestShuffleInputEventHandlerImpl {
     doReturn(new TezCounters()).when(inputContext).getCounters();
     doReturn("sourceVertex").when(inputContext).getSourceVertexName();
     doReturn(shuffleMetaData).when(inputContext)
-        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+        .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
     doReturn(executionContext).when(inputContext).getExecutionContext();
     return inputContext;
   }
@@ -183,7 +185,8 @@ public class TestShuffleInputEventHandlerImpl {
     Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(), new JobTokenSecretManager(null));
     token.write(out);
     doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData(
-        TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID);
+        conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
 
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
     ShuffleManager realShuffleManager = new ShuffleManager(inputContext, conf, 2,

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index a5608ef..34ca13f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -44,6 +44,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
@@ -144,7 +145,8 @@ public class TestShuffleManager {
     doReturn(new TezCounters()).when(inputContext).getCounters();
     doReturn("sourceVertex").when(inputContext).getSourceVertexName();
     doReturn(shuffleMetaData).when(inputContext)
-        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+        .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
     doReturn(executionContext).when(inputContext).getExecutionContext();
     return inputContext;
   }
@@ -165,7 +167,8 @@ public class TestShuffleManager {
     token.write(out);
     doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).
         getServiceConsumerMetaData(
-            TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID);
+            conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+                TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
 
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
     return new ShuffleManagerForTest(inputContext, conf,

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 80e7b14..c3f8dda 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -34,6 +34,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.OutputContext;
@@ -99,7 +100,7 @@ public class TestPipelinedSorter {
     ApplicationId appId = ApplicationId.newInstance(10000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    this.outputContext = createMockOutputContext(counters, appId, uniqueId);
+    this.outputContext = createMockOutputContext(counters, appId, uniqueId, getConf());
   }
 
   public static Configuration getConf() {
@@ -753,7 +754,7 @@ public class TestPipelinedSorter {
   }
 
   private static OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
-      String uniqueId) throws IOException {
+      String uniqueId, Configuration conf) throws IOException {
     OutputContext outputContext = mock(OutputContext.class);
 
     ExecutionContext execContext = new ExecutionContextImpl("localhost");
@@ -762,7 +763,8 @@ public class TestPipelinedSorter {
     serviceProviderMetaData.writeInt(80);
     doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
         .getServiceProviderMetaData
-            (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+            (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+                TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
 
     doReturn(execContext).when(outputContext).getExecutionContext();
     doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter();

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index e0374a3..73d249c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -49,6 +49,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
@@ -463,7 +464,8 @@ public class TestDefaultSorter {
     doReturn("v1").when(context).getDestinationVertexName();
     doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
         .getServiceProviderMetaData
-            (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+            (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+                TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
     doAnswer(new Answer() {
       @Override public Object answer(InvocationOnMock invocation) throws Throwable {
         long requestedSize = (Long) invocation.getArguments()[0];

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 41b2b97..4a0d1d5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -48,6 +48,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import com.google.protobuf.ByteString;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -117,6 +118,7 @@ public class TestUnorderedPartitionedKVWriter {
 
   private boolean shouldCompress;
   private ReportPartitionStats reportPartitionStats;
+  private Configuration defaultConf = new Configuration();
 
   public TestUnorderedPartitionedKVWriter(boolean shouldCompress,
       ReportPartitionStats reportPartitionStats) {
@@ -160,7 +162,7 @@ public class TestUnorderedPartitionedKVWriter {
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
 
     int maxSingleBufferSizeBytes = 2047;
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
@@ -256,7 +258,7 @@ public class TestUnorderedPartitionedKVWriter {
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
     Random random = new Random();
 
     Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress,
@@ -524,7 +526,7 @@ public class TestUnorderedPartitionedKVWriter {
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
 
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
         shouldCompress, -1);
@@ -708,7 +710,7 @@ public class TestUnorderedPartitionedKVWriter {
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
 
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
         shouldCompress, -1);
@@ -899,7 +901,7 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
-      String uniqueId) {
+      String uniqueId, Configuration conf) {
     OutputContext outputContext = mock(OutputContext.class);
     doReturn(counters).when(outputContext).getCounters();
     doReturn(appId).when(outputContext).getApplicationId();
@@ -922,7 +924,8 @@ public class TestUnorderedPartitionedKVWriter {
         portBuffer.reset();
         return portBuffer;
       }
-    }).when(outputContext).getServiceProviderMetaData(eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID));
+    }).when(outputContext).getServiceProviderMetaData(eq(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)));
 
     Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId);
     String[] outDirs = new String[] { outDirBase.toString() };

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 93c4f92..7762025 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.Event;
@@ -407,7 +408,8 @@ public class TestOnFileSortedOutput {
     doReturn("v1").when(context).getDestinationVertexName();
     doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
         .getServiceProviderMetaData
-            (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+            (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+                TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
     doAnswer(new Answer() {
       @Override public Object answer(InvocationOnMock invocation) throws Throwable {
         long requestedSize = (Long) invocation.getArguments()[0];

http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 38a60a2..bf55cb3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import com.google.protobuf.ByteString;
 
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -230,7 +231,8 @@ public class TestOnFileUnorderedKVOutput {
     ByteBuffer bb = ByteBuffer.allocate(4);
     bb.putInt(shufflePort);
     bb.position(0);
-    AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, bb, auxEnv);
+    AuxiliaryServiceHelper.setServiceDataIntoEnv(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT), bb, auxEnv);
 
 
     OutputDescriptor outputDescriptor = mock(OutputDescriptor.class);


Mime
View raw message