Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5BF51200CB6 for ; Wed, 24 May 2017 23:07:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5AC99160BB6; Wed, 24 May 2017 21:07:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E1D97160BDA for ; Wed, 24 May 2017 23:07:38 +0200 (CEST) Received: (qmail 23699 invoked by uid 500); 24 May 2017 21:07:37 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 21111 invoked by uid 99); 24 May 2017 21:07:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 May 2017 21:07:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 66DD9F4A55; Wed, 24 May 2017 21:07:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeagles@apache.org To: commits@tez.apache.org Date: Wed, 24 May 2017 21:08:06 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [37/50] [abbrv] tez git commit: TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler (jeagles) archived-at: Wed, 24 May 2017 21:07:40 -0000 TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/287194c2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/287194c2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/287194c2 Branch: refs/heads/master Commit: 287194c2099a934eb2c0a2520508ef538019fbdf Parents: c3a7c21 Author: Jonathan Eagles Authored: Thu Apr 6 09:14:49 2017 -0500 Committer: Jonathan Eagles Committed: Thu Apr 6 09:14:49 2017 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../tez/dag/app/launcher/DagDeleteRunnable.java | 10 +++++----- .../dag/app/launcher/DeletionTrackerImpl.java | 2 +- .../app/rm/container/AMContainerHelpers.java | 10 +++++----- .../runtime/library/common/TezRuntimeUtils.java | 6 ++---- .../library/common/shuffle/ShuffleUtils.java | 13 +++++------- .../common/shuffle/orderedgrouped/Shuffle.java | 3 ++- .../ShuffleInputEventHandlerOrderedGrouped.java | 4 ++-- .../common/sort/impl/PipelinedSorter.java | 9 ++++++--- .../common/sort/impl/dflt/DefaultSorter.java | 6 +++++- .../output/OrderedPartitionedKVOutput.java | 4 +++- .../common/shuffle/TestShuffleUtils.java | 13 +++++++++--- ...tShuffleInputEventHandlerOrderedGrouped.java | 3 ++- .../common/sort/impl/TestPipelinedSorter.java | 10 +++++----- .../TestUnorderedPartitionedKVWriter.java | 21 +++++++++++++------- 15 files changed, 68 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index 0fb021e..7ef35c6 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler TEZ-3628. Give Tez shuffle handler threads custom names TEZ-3621. Optimize the Shuffle Handler content length calculation for keep alive TEZ-3620. UnorderedPartitionedKVOutput is missing the shuffle service config in the confKeys set http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java index 669d539..6d966b0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java @@ -18,11 +18,11 @@ package org.apache.tez.dag.app.launcher; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.http.BaseHttpConnection; +import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import java.net.URL; @@ -33,15 +33,15 @@ class DagDeleteRunnable implements Runnable { final JobTokenSecretManager jobTokenSecretManager; final String tezDefaultComponentName; final int shufflePort; - final Configuration conf; + final HttpConnectionParams httpConnectionParams; public DagDeleteRunnable(NodeId nodeId, int shufflePort, TezDAGID currentDag, - Configuration conf, + HttpConnectionParams httpConnectionParams, JobTokenSecretManager jobTokenSecretMgr, String tezDefaultComponent) { this.nodeId = nodeId; this.shufflePort = shufflePort; this.dag = currentDag; - this.conf = conf; + this.httpConnectionParams = httpConnectionParams; this.jobTokenSecretManager = jobTokenSecretMgr; this.tezDefaultComponentName = tezDefaultComponent; } @@ -53,7 +53,7 @@ class DagDeleteRunnable implements Runnable { nodeId.getHost(), shufflePort, dag.getApplicationId().toString(), dag.getId(), false); BaseHttpConnection httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL, - TezRuntimeUtils.getHttpConnectionParams(conf), "DAGDelete", jobTokenSecretManager); + httpConnectionParams, "DAGDelete", jobTokenSecretManager); httpConnection.connect(); httpConnection.getInputStream(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java index 4a4a4ae..625aabb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java @@ -50,7 +50,7 @@ public class DeletionTrackerImpl extends DeletionTracker { //TODO: add check for healthy node if (shufflePort != TezRuntimeUtils.INVALID_PORT) { DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId, - shufflePort, dag, conf, jobTokenSecretManager, this.pluginName); + shufflePort, dag, TezRuntimeUtils.getHttpConnectionParams(conf), jobTokenSecretManager, this.pluginName); dagDeleteService.submit(dagDeleteRunnable); } } http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 51e954d..ba3ecad 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -91,11 +91,11 @@ public class AMContainerHelpers { * Create the common {@link ContainerLaunchContext} for all attempts. * * @param applicationACLs - * @param conf + * @param auxiliaryService */ private static ContainerLaunchContext createCommonContainerLaunchContext( Map applicationACLs, - Credentials credentials, Map localResources, Configuration conf) { + Credentials credentials, Map localResources, String auxiliaryService) { // Application environment Map environment = new HashMap(); @@ -129,8 +129,6 @@ public class AMContainerHelpers { if (LOG.isDebugEnabled()) { LOG.debug("Putting shuffle token in serviceData in common CLC"); } - String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, - TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); serviceData.put(auxiliaryService, TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials))); } catch (IOException e) { @@ -161,8 +159,10 @@ public class AMContainerHelpers { ContainerLaunchContext commonContainerSpec = null; synchronized (commonContainerSpecLock) { if (!commonContainerSpecs.containsKey(tezDAGID)) { + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); commonContainerSpec = - createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, conf); + createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, auxiliaryService); commonContainerSpecs.put(tezDAGID, commonContainerSpec); } else { commonContainerSpec = commonContainerSpecs.get(tezDAGID); http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index d39d554..8e13c13 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -154,13 +154,11 @@ public class TezRuntimeUtils { return partitioner; } - public static TezTaskOutput instantiateTaskOutputManager( - Configuration conf, OutputContext outputContext) { + public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, OutputContext outputContext) { Class clazz = conf.getClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER, TezTaskOutputFiles.class); try { - Constructor ctor = clazz.getConstructor(Configuration.class, String - .class, int.class); + Constructor ctor = clazz.getConstructor(Configuration.class, String.class, int.class); ctor.setAccessible(true); TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier(), http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 1d644aa..25b9b4f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -265,14 +265,14 @@ public class ShuffleUtils { * @param finalMergeEnabled * @param isLastEvent * @param pathComponent - * @param conf + * @param auxiliaryService * @param deflater * @return ByteBuffer * @throws IOException */ static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails, int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context, - int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Configuration conf, Deflater deflater) + int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, String auxiliaryService, Deflater deflater) throws IOException { DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto .newBuilder(); @@ -301,9 +301,6 @@ public class ShuffleUtils { if (!sendEmptyPartitionDetails || outputGenerated) { String host = context.getExecutionContext().getHostName(); - String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, - TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); - ByteBuffer shuffleMetadata = context .getServiceProviderMetaData(auxiliaryService); int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); @@ -391,13 +388,13 @@ public class ShuffleUtils { * @param numPhysicalOutputs * @param pathComponent * @param partitionStats - * @param conf + * @param auxiliaryService * @throws IOException */ public static void generateEventOnSpill(List eventList, boolean finalMergeEnabled, boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord, int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent, - @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Configuration conf, Deflater deflater) + @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, String auxiliaryService, Deflater deflater) throws IOException { Preconditions.checkArgument(eventList != null, "EventList can't be null"); @@ -415,7 +412,7 @@ public class ShuffleUtils { ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs, spillRecord, context, spillId, - finalMergeEnabled, isLastEvent, pathComponent, conf, deflater); + finalMergeEnabled, isLastEvent, pathComponent, auxiliaryService, deflater); if (finalMergeEnabled || isLastEvent) { VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats, http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index b3d8a6f..f787c59 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -188,7 +189,7 @@ public class Shuffle implements ExceptionReporter { eventHandler= new ShuffleInputEventHandlerOrderedGrouped( inputContext, scheduler, - conf); + ShuffleUtils.isTezShuffleHandler(conf)); ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build()); http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index fda899f..116098f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -61,10 +61,10 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext, ShuffleScheduler scheduler, - Configuration conf) { + boolean compositeFetch) { this.inputContext = inputContext; this.scheduler = scheduler; - this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); + this.compositeFetch = compositeFetch; this.inflater = TezCommonUtils.newInflater(); } http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 5203851..755a131 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,6 +117,7 @@ public class PipelinedSorter extends ExternalSorter { private final int MIN_BLOCK_SIZE; private final boolean lazyAllocateMem; private final Deflater deflater; + private final String auxiliaryService; // TODO Set additional countesr - total bytes written, spills etc. @@ -155,7 +157,8 @@ public class PipelinedSorter extends ExternalSorter { .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle; - + auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); //sanity checks final long sortmb = this.availableMemoryMb; @@ -354,7 +357,7 @@ public class PipelinedSorter extends ExternalSorter { ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, - reportDetailedPartitionStats(), this.conf, deflater); + reportDetailedPartitionStats(), auxiliaryService, deflater); outputContext.sendEvents(events); LOG.info(outputContext.getDestinationVertexName() + ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); @@ -677,7 +680,7 @@ public class PipelinedSorter extends ExternalSorter { ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, i, indexCacheList.get(i), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, - reportDetailedPartitionStats(), this.conf, deflater); + reportDetailedPartitionStats(), auxiliaryService, deflater); LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); } outputContext.sendEvents(events); http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 8ff1c99..a6f7cf7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -32,6 +32,7 @@ import java.util.zip.Deflater; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,6 +116,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab volatile boolean spillThreadRunning = false; final SpillThread spillThread = new SpillThread(); private final Deflater deflater; + private final String auxiliaryService; final ArrayList indexCacheList = new ArrayList(); @@ -153,6 +155,8 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " does not work " + "with DefaultSorter. It is supported only with PipelinedSorter."); } + auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); // buffers and accounting int maxMemUsage = sortmb << 20; @@ -1137,7 +1141,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index); ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent, - partitionStats, reportDetailedPartitionStats(), this.conf, deflater); + partitionStats, reportDetailedPartitionStats(), auxiliaryService, deflater); LOG.info(outputContext.getDestinationVertexName() + ": " + "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index); http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 6b14f8d..98e14be 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -201,10 +201,12 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { List eventList = Lists.newLinkedList(); if (finalMergeEnabled && !pipelinedShuffle) { boolean isLastEvent = true; + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent, getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf), getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(), - sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), this.conf, deflater); + sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), auxiliaryService, deflater); } return eventList; } http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index ec19f67..4a28224 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -163,9 +163,12 @@ public class TestShuffleUtils { int spillId = 0; int physicalOutputs = 10; String pathComponent = "/attempt_x_y_0/file.out"; + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false, this.conf, TezCommonUtils.newBestCompressionDeflater()); + physicalOutputs, true, pathComponent, null, false, auxiliaryService, TezCommonUtils.newBestCompressionDeflater()); Assert.assertTrue(events.size() == 1); Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent); @@ -200,11 +203,13 @@ public class TestShuffleUtils { int spillId = 0; int physicalOutputs = 10; String pathComponent = "/attempt_x_y_0/file.out"; + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false, this.conf, TezCommonUtils.newBestCompressionDeflater()); + physicalOutputs, true, pathComponent, null, false, auxiliaryService, TezCommonUtils.newBestCompressionDeflater()); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); @@ -241,11 +246,13 @@ public class TestShuffleUtils { int spillId = 0; int physicalOutputs = 10; String pathComponent = "/attempt_x_y_0/file.out"; + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false, this.conf, TezCommonUtils.newBestCompressionDeflater()); + physicalOutputs, true, pathComponent, null, false, auxiliaryService, TezCommonUtils.newBestCompressionDeflater()); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 1d4afde..72cba80 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -18,6 +18,7 @@ import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.Before; import org.junit.Test; @@ -154,7 +155,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { 0, "src vertex"); scheduler = spy(realScheduler); - handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, config); + handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, ShuffleUtils.isTezShuffleHandler(config)); mergeManager = mock(MergeManager.class); } http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index c3f8dda..454cf22 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -100,7 +100,9 @@ public class TestPipelinedSorter { ApplicationId appId = ApplicationId.newInstance(10000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - this.outputContext = createMockOutputContext(counters, appId, uniqueId, getConf()); + String auxiliaryService = getConf().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); } public static Configuration getConf() { @@ -754,7 +756,7 @@ public class TestPipelinedSorter { } private static OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId, - String uniqueId, Configuration conf) throws IOException { + String uniqueId, String auxiliaryService) throws IOException { OutputContext outputContext = mock(OutputContext.class); ExecutionContext execContext = new ExecutionContextImpl("localhost"); @@ -762,9 +764,7 @@ public class TestPipelinedSorter { DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer(); serviceProviderMetaData.writeInt(80); doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext) - .getServiceProviderMetaData - (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, - TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); + .getServiceProviderMetaData(auxiliaryService); doReturn(execContext).when(outputContext).getExecutionContext(); doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter(); http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 031b44d..07feb20 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -162,7 +162,9 @@ public class TestUnorderedPartitionedKVWriter { ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); + String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); int maxSingleBufferSizeBytes = 2047; Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, @@ -259,7 +261,9 @@ public class TestUnorderedPartitionedKVWriter { TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); int dagId = 1; - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); + String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); Random random = new Random(); Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress, @@ -528,7 +532,9 @@ public class TestUnorderedPartitionedKVWriter { TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); int dagId = 1; - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); + String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, shouldCompress, -1); @@ -713,7 +719,9 @@ public class TestUnorderedPartitionedKVWriter { TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); int dagId = 1; - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); + String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, shouldCompress, -1); @@ -904,7 +912,7 @@ public class TestUnorderedPartitionedKVWriter { } private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId, - String uniqueId, Configuration conf) { + String uniqueId, String auxiliaryService) { OutputContext outputContext = mock(OutputContext.class); doReturn(counters).when(outputContext).getCounters(); doReturn(appId).when(outputContext).getApplicationId(); @@ -927,8 +935,7 @@ public class TestUnorderedPartitionedKVWriter { portBuffer.reset(); return portBuffer; } - }).when(outputContext).getServiceProviderMetaData(eq(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, - TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT))); + }).when(outputContext).getServiceProviderMetaData(eq(auxiliaryService)); Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId); String[] outDirs = new String[] { outDirBase.toString() };