tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [37/50] [abbrv] tez git commit: TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler (jeagles)
Date Wed, 24 May 2017 21:08:06 GMT
TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler
(jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/287194c2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/287194c2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/287194c2

Branch: refs/heads/master
Commit: 287194c2099a934eb2c0a2520508ef538019fbdf
Parents: c3a7c21
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Thu Apr 6 09:14:49 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu Apr 6 09:14:49 2017 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |  1 +
 .../tez/dag/app/launcher/DagDeleteRunnable.java | 10 +++++-----
 .../dag/app/launcher/DeletionTrackerImpl.java   |  2 +-
 .../app/rm/container/AMContainerHelpers.java    | 10 +++++-----
 .../runtime/library/common/TezRuntimeUtils.java |  6 ++----
 .../library/common/shuffle/ShuffleUtils.java    | 13 +++++-------
 .../common/shuffle/orderedgrouped/Shuffle.java  |  3 ++-
 .../ShuffleInputEventHandlerOrderedGrouped.java |  4 ++--
 .../common/sort/impl/PipelinedSorter.java       |  9 ++++++---
 .../common/sort/impl/dflt/DefaultSorter.java    |  6 +++++-
 .../output/OrderedPartitionedKVOutput.java      |  4 +++-
 .../common/shuffle/TestShuffleUtils.java        | 13 +++++++++---
 ...tShuffleInputEventHandlerOrderedGrouped.java |  3 ++-
 .../common/sort/impl/TestPipelinedSorter.java   | 10 +++++-----
 .../TestUnorderedPartitionedKVWriter.java       | 21 +++++++++++++-------
 15 files changed, 68 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 0fb021e..7ef35c6 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler
   TEZ-3628. Give Tez shuffle handler threads custom names
   TEZ-3621. Optimize the Shuffle Handler content length calculation for keep alive
   TEZ-3620. UnorderedPartitionedKVOutput is missing the shuffle service config in the confKeys set

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
index 669d539..6d966b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
@@ -18,11 +18,11 @@
 
 package org.apache.tez.dag.app.launcher;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 
 import java.net.URL;
@@ -33,15 +33,15 @@ class DagDeleteRunnable implements Runnable {
   final JobTokenSecretManager jobTokenSecretManager;
   final String tezDefaultComponentName;
   final int shufflePort;
-  final Configuration conf;
+  final HttpConnectionParams httpConnectionParams;
 
   public DagDeleteRunnable(NodeId nodeId, int shufflePort, TezDAGID currentDag,
-                           Configuration conf,
+                           HttpConnectionParams httpConnectionParams,
                            JobTokenSecretManager jobTokenSecretMgr, String tezDefaultComponent) {
     this.nodeId = nodeId;
     this.shufflePort = shufflePort;
     this.dag = currentDag;
-    this.conf = conf;
+    this.httpConnectionParams = httpConnectionParams;
     this.jobTokenSecretManager = jobTokenSecretMgr;
     this.tezDefaultComponentName = tezDefaultComponent;
   }
@@ -53,7 +53,7 @@ class DagDeleteRunnable implements Runnable {
           nodeId.getHost(), shufflePort,
           dag.getApplicationId().toString(), dag.getId(), false);
       BaseHttpConnection httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL,
-          TezRuntimeUtils.getHttpConnectionParams(conf), "DAGDelete", jobTokenSecretManager);
+          httpConnectionParams, "DAGDelete", jobTokenSecretManager);
       httpConnection.connect();
       httpConnection.getInputStream();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
index 4a4a4ae..625aabb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
@@ -50,7 +50,7 @@ public class DeletionTrackerImpl extends DeletionTracker {
       //TODO: add check for healthy node
       if (shufflePort != TezRuntimeUtils.INVALID_PORT) {
         DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId,
-            shufflePort, dag, conf, jobTokenSecretManager, this.pluginName);
+            shufflePort, dag, TezRuntimeUtils.getHttpConnectionParams(conf), jobTokenSecretManager, this.pluginName);
         dagDeleteService.submit(dagDeleteRunnable);
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 51e954d..ba3ecad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -91,11 +91,11 @@ public class AMContainerHelpers {
    * Create the common {@link ContainerLaunchContext} for all attempts.
    *
    * @param applicationACLs
-   * @param conf
+   * @param auxiliaryService
    */
   private static ContainerLaunchContext createCommonContainerLaunchContext(
       Map<ApplicationAccessType, String> applicationACLs,
-      Credentials credentials, Map<String, LocalResource> localResources, Configuration conf) {
+      Credentials credentials, Map<String, LocalResource> localResources, String auxiliaryService) {
 
     // Application environment
     Map<String, String> environment = new HashMap<String, String>();
@@ -129,8 +129,6 @@ public class AMContainerHelpers {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Putting shuffle token in serviceData in common CLC");
       }
-      String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
       serviceData.put(auxiliaryService,
           TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
     } catch (IOException e) {
@@ -161,8 +159,10 @@ public class AMContainerHelpers {
     ContainerLaunchContext commonContainerSpec = null;
     synchronized (commonContainerSpecLock) {
       if (!commonContainerSpecs.containsKey(tezDAGID)) {
+        String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
         commonContainerSpec =
-            createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, conf);
+            createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, auxiliaryService);
         commonContainerSpecs.put(tezDAGID, commonContainerSpec);
       } else {
         commonContainerSpec = commonContainerSpecs.get(tezDAGID);

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index d39d554..8e13c13 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -154,13 +154,11 @@ public class TezRuntimeUtils {
     return partitioner;
   }
 
-  public static TezTaskOutput instantiateTaskOutputManager(
-      Configuration conf, OutputContext outputContext) {
+  public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, OutputContext outputContext) {
     Class<?> clazz = conf.getClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
         TezTaskOutputFiles.class);
     try {
-      Constructor<?> ctor = clazz.getConstructor(Configuration.class, String
-          .class, int.class);
+      Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class, int.class);
       ctor.setAccessible(true);
       TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf,
           outputContext.getUniqueIdentifier(),

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 1d644aa..25b9b4f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -265,14 +265,14 @@ public class ShuffleUtils {
    * @param finalMergeEnabled
    * @param isLastEvent
    * @param pathComponent
-   * @param conf
+   * @param auxiliaryService
    * @param deflater
    * @return ByteBuffer
    * @throws IOException
    */
   static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails,
       int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context,
-      int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Configuration conf, Deflater deflater)
+      int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, String auxiliaryService, Deflater deflater)
       throws IOException {
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();
@@ -301,9 +301,6 @@ public class ShuffleUtils {
 
     if (!sendEmptyPartitionDetails || outputGenerated) {
       String host = context.getExecutionContext().getHostName();
-      String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
-
       ByteBuffer shuffleMetadata = context
           .getServiceProviderMetaData(auxiliaryService);
       int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
@@ -391,13 +388,13 @@ public class ShuffleUtils {
    * @param numPhysicalOutputs
    * @param pathComponent
    * @param partitionStats
-   * @param conf
+   * @param auxiliaryService
    * @throws IOException
    */
   public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled,
       boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
       int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
-      @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Configuration conf, Deflater deflater)
+      @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, String auxiliaryService, Deflater deflater)
       throws IOException {
     Preconditions.checkArgument(eventList != null, "EventList can't be null");
 
@@ -415,7 +412,7 @@ public class ShuffleUtils {
 
     ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs,
         spillRecord, context, spillId,
-        finalMergeEnabled, isLastEvent, pathComponent, conf, deflater);
+        finalMergeEnabled, isLastEvent, pathComponent, auxiliaryService, deflater);
 
     if (finalMergeEnabled || isLastEvent) {
       VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats,

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index b3d8a6f..f787c59 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -188,7 +189,7 @@ public class Shuffle implements ExceptionReporter {
     eventHandler= new ShuffleInputEventHandlerOrderedGrouped(
         inputContext,
         scheduler,
-        conf);
+        ShuffleUtils.isTezShuffleHandler(conf));
     
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build());

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index fda899f..116098f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -61,10 +61,10 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
 
   public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext,
                                                 ShuffleScheduler scheduler,
-                                                Configuration conf) {
+                                                boolean compositeFetch) {
     this.inputContext = inputContext;
     this.scheduler = scheduler;
-    this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
+    this.compositeFetch = compositeFetch;
     this.inflater = TezCommonUtils.newInflater();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 5203851..755a131 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,6 +117,7 @@ public class PipelinedSorter extends ExternalSorter {
   private final int MIN_BLOCK_SIZE;
   private final boolean lazyAllocateMem;
   private final Deflater deflater;
+  private final String auxiliaryService;
 
   // TODO Set additional countesr - total bytes written, spills etc.
 
@@ -155,7 +157,8 @@ public class PipelinedSorter extends ExternalSorter {
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
 
     pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;
-
+    auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
     //sanity checks
     final long sortmb = this.availableMemoryMb;
 
@@ -354,7 +357,7 @@ public class PipelinedSorter extends ExternalSorter {
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false,
         outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1),
         partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
-        reportDetailedPartitionStats(), this.conf, deflater);
+        reportDetailedPartitionStats(), auxiliaryService, deflater);
     outputContext.sendEvents(events);
     LOG.info(outputContext.getDestinationVertexName() +
         ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
@@ -677,7 +680,7 @@ public class PipelinedSorter extends ExternalSorter {
           ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
               outputContext, i, indexCacheList.get(i), partitions,
               sendEmptyPartitionDetails, pathComponent, partitionStats,
-              reportDetailedPartitionStats(), this.conf, deflater);
+              reportDetailedPartitionStats(), auxiliaryService, deflater);
           LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
         }
         outputContext.sendEvents(events);

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 8ff1c99..a6f7cf7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -32,6 +32,7 @@ import java.util.zip.Deflater;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +116,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
   volatile boolean spillThreadRunning = false;
   final SpillThread spillThread = new SpillThread();
   private final Deflater deflater;
+  private final String auxiliaryService;
 
   final ArrayList<TezSpillRecord> indexCacheList =
     new ArrayList<TezSpillRecord>();
@@ -153,6 +155,8 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
           TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " does not work "
           + "with DefaultSorter. It is supported only with PipelinedSorter.");
     }
+    auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
 
     // buffers and accounting
     int maxMemUsage = sortmb << 20;
@@ -1137,7 +1141,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
     String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
         outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
-        partitionStats, reportDetailedPartitionStats(), this.conf, deflater);
+        partitionStats, reportDetailedPartitionStats(), auxiliaryService, deflater);
 
     LOG.info(outputContext.getDestinationVertexName() + ": " +
         "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 6b14f8d..98e14be 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -201,10 +201,12 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     List<Event> eventList = Lists.newLinkedList();
     if (finalMergeEnabled && !pipelinedShuffle) {
       boolean isLastEvent = true;
+      String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
       ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent,
           getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf),
           getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
-          sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), this.conf, deflater);
+          sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), auxiliaryService, deflater);
     }
     return eventList;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index ec19f67..4a28224 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -163,9 +163,12 @@ public class TestShuffleUtils {
     int spillId = 0;
     int physicalOutputs = 10;
     String pathComponent = "/attempt_x_y_0/file.out";
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false, this.conf, TezCommonUtils.newBestCompressionDeflater());
+            physicalOutputs, true, pathComponent, null, false, auxiliaryService, TezCommonUtils.newBestCompressionDeflater());
 
     Assert.assertTrue(events.size() == 1);
     Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -200,11 +203,13 @@ public class TestShuffleUtils {
     int spillId = 0;
     int physicalOutputs = 10;
     String pathComponent = "/attempt_x_y_0/file.out";
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
 
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false, this.conf, TezCommonUtils.newBestCompressionDeflater());
+            physicalOutputs, true, pathComponent, null, false, auxiliaryService, TezCommonUtils.newBestCompressionDeflater());
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -241,11 +246,13 @@ public class TestShuffleUtils {
     int spillId = 0;
     int physicalOutputs = 10;
     String pathComponent = "/attempt_x_y_0/file.out";
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
 
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false, this.conf, TezCommonUtils.newBestCompressionDeflater());
+            physicalOutputs, true, pathComponent, null, false, auxiliaryService, TezCommonUtils.newBestCompressionDeflater());
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 1d4afde..72cba80 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -18,6 +18,7 @@ import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.Before;
 import org.junit.Test;
@@ -154,7 +155,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         0,
         "src vertex");
     scheduler = spy(realScheduler);
-    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, config);
+    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, ShuffleUtils.isTezShuffleHandler(config));
     mergeManager = mock(MergeManager.class);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index c3f8dda..454cf22 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -100,7 +100,9 @@ public class TestPipelinedSorter {
     ApplicationId appId = ApplicationId.newInstance(10000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    this.outputContext = createMockOutputContext(counters, appId, uniqueId, getConf());
+    String auxiliaryService = getConf().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
   }
 
   public static Configuration getConf() {
@@ -754,7 +756,7 @@ public class TestPipelinedSorter {
   }
 
   private static OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
-      String uniqueId, Configuration conf) throws IOException {
+      String uniqueId, String auxiliaryService) throws IOException {
     OutputContext outputContext = mock(OutputContext.class);
 
     ExecutionContext execContext = new ExecutionContextImpl("localhost");
@@ -762,9 +764,7 @@ public class TestPipelinedSorter {
     DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
     serviceProviderMetaData.writeInt(80);
     doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
-        .getServiceProviderMetaData
-            (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-                TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
+        .getServiceProviderMetaData(auxiliaryService);
 
     doReturn(execContext).when(outputContext).getExecutionContext();
     doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter();

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 031b44d..07feb20 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -162,7 +162,9 @@ public class TestUnorderedPartitionedKVWriter {
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
+    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
 
     int maxSingleBufferSizeBytes = 2047;
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
@@ -259,7 +261,9 @@ public class TestUnorderedPartitionedKVWriter {
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     int dagId = 1;
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
+    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
     Random random = new Random();
 
     Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress,
@@ -528,7 +532,9 @@ public class TestUnorderedPartitionedKVWriter {
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     int dagId = 1;
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
+    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
 
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
         shouldCompress, -1);
@@ -713,7 +719,9 @@ public class TestUnorderedPartitionedKVWriter {
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     int dagId = 1;
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
+    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
 
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
         shouldCompress, -1);
@@ -904,7 +912,7 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
-      String uniqueId, Configuration conf) {
+      String uniqueId, String auxiliaryService) {
     OutputContext outputContext = mock(OutputContext.class);
     doReturn(counters).when(outputContext).getCounters();
     doReturn(appId).when(outputContext).getApplicationId();
@@ -927,8 +935,7 @@ public class TestUnorderedPartitionedKVWriter {
         portBuffer.reset();
         return portBuffer;
       }
-    }).when(outputContext).getServiceProviderMetaData(eq(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)));
+    }).when(outputContext).getServiceProviderMetaData(eq(auxiliaryService));
 
     Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId);
     String[] outDirs = new String[] { outDirBase.toString() };


Mime
View raw message