Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8921E1083C for ; Thu, 26 Mar 2015 07:57:07 +0000 (UTC) Received: (qmail 58760 invoked by uid 500); 26 Mar 2015 07:57:07 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 58725 invoked by uid 500); 26 Mar 2015 07:57:07 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 58714 invoked by uid 99); 26 Mar 2015 07:57:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Mar 2015 07:57:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 366B8E0850; Thu, 26 Mar 2015 07:57:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rbalamohan@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2196. Consider reusing UnorderedPartitionedKVWriter with single output in UnorderedKVOutput (rbalamohan) Date: Thu, 26 Mar 2015 07:57:07 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 2fe2d6352 -> 1ba1f927c TEZ-2196. Consider reusing UnorderedPartitionedKVWriter with single output in UnorderedKVOutput (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1ba1f927 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1ba1f927 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1ba1f927 Branch: refs/heads/master Commit: 1ba1f927c16a1d7c273b6cd1a8553e5269d1541a Parents: 2fe2d63 Author: Rajesh Balamohan Authored: Thu Mar 26 13:25:57 2015 +0530 Committer: Rajesh Balamohan Committed: Thu Mar 26 13:25:57 2015 +0530 ---------------------------------------------------------------------- .../library/api/TezRuntimeConfiguration.java | 14 -- .../broadcast/output/FileBasedKVWriter.java | 190 ------------------- .../library/common/shuffle/ShuffleUtils.java | 5 +- .../impl/ShuffleInputEventHandlerImpl.java | 35 +--- .../common/shuffle/impl/ShuffleManager.java | 43 ----- .../writers/UnorderedPartitionedKVWriter.java | 70 ++++++- .../library/output/UnorderedKVOutput.java | 117 +++++------- .../WeightedScalingMemoryDistributor.java | 8 +- .../src/main/proto/ShufflePayloads.proto | 1 - .../TestWeightedScalingMemoryDistributor.java | 4 +- .../TestUnorderedPartitionedKVWriter.java | 15 +- .../output/TestOnFileUnorderedKVOutput.java | 152 +++++++++++---- 12 files changed, 238 insertions(+), 416 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 565b47a..9f11e4d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -258,18 +258,6 @@ public class TezRuntimeConfiguration { "empty.partitions.info-via-events.enabled"; public static final boolean TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT = true; - @Private - public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED = - TEZ_RUNTIME_PREFIX + "transfer.data-via-events.enabled"; - @Private - public static final boolean TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT = false; - - @Private - public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE = - TEZ_RUNTIME_PREFIX + "transfer.data-via-events.max-size"; - @Private - public static final int TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 200 << 10; // 200KB - /** * If the shuffle input is on the local host bypass the http fetch and access the files directly */ @@ -369,8 +357,6 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED); tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED); tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT); - tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED); - tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE); tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS); tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH); tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH); http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java deleted file mode 100644 index 95cea82..0000000 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java +++ /dev/null @@ -1,190 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.runtime.library.broadcast.output; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.tez.common.counters.TaskCounter; -import org.apache.tez.common.counters.TezCounter; -import org.apache.tez.runtime.api.OutputContext; -import org.apache.tez.runtime.library.api.KeyValuesWriter; -import org.apache.tez.runtime.library.common.ConfigUtils; -import org.apache.tez.runtime.library.common.TezRuntimeUtils; -import org.apache.tez.runtime.library.common.sort.impl.IFile; -import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; -import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; -import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput; - -import com.google.common.base.Preconditions; - -@Private -public class FileBasedKVWriter extends KeyValuesWriter { - - private static final Logger LOG = LoggerFactory.getLogger(FileBasedKVWriter.class); - - public static final int INDEX_RECORD_LENGTH = 24; - - private final Configuration conf; - private int numRecords = 0; - - @SuppressWarnings("rawtypes") - private final Class keyClass; - @SuppressWarnings("rawtypes") - private final Class valClass; - private final CompressionCodec codec; - private final FileSystem rfs; - private final IFile.Writer writer; - - private final Path outputPath; - private Path indexPath; - - private final TezTaskOutput ouputFileManager; - private boolean closed = false; - - // Number of output key-value pairs - private final TezCounter outputRecordsCounter; - // Number of bytes of actual output - uncompressed. - private final TezCounter outputBytesCounter; - // Size of the data with additional meta-data - private final TezCounter outputBytesCounterWithOverhead; - // Actual physical size of the data on disk. - private final TezCounter outputMaterializedBytesCounter; - - - // TODO NEWTEZ Define Counters - // Number of records - // Time waiting for a write to complete, if that's possible. - // Size of key-value pairs written. - - public FileBasedKVWriter(OutputContext outputContext, Configuration conf) throws IOException { - this.conf = conf; - - this.outputRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS); - this.outputBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES); - this.outputBytesCounterWithOverhead = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); - this.outputMaterializedBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); - - this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw(); - - // Setup serialization - keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf); - valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf); - - // Setup compression - if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) { - Class codecClass = ConfigUtils - .getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class); - codec = ReflectionUtils.newInstance(codecClass, this.conf); - } else { - codec = null; - } - - this.ouputFileManager = TezRuntimeUtils.instantiateTaskOutputManager(conf, - outputContext); - LOG.info("Created KVWriter -> " + "compressionCodec: " + (codec == null ? "NoCompressionCodec" - : codec.getClass().getName())); - - this.outputPath = ouputFileManager.getOutputFileForWrite(); - LOG.info("Writing data file: " + outputPath); - - // TODO NEWTEZ Consider making the buffer size configurable. Also consider - // setting up an in-memory buffer which is occasionally flushed to disk so - // that the output does not block. - - // TODO NEWTEZ maybe use appropriate counter - this.writer = new IFile.Writer(conf, rfs, outputPath, keyClass, valClass, - codec, null, outputBytesCounter); - } - - /** - * @return true if any output was generated. false otherwise - * @throws IOException - */ - public boolean close() throws IOException { - this.closed = true; - this.writer.close(); - long rawLen = writer.getRawLength(); - long compLen = writer.getCompressedLength(); - outputBytesCounterWithOverhead.increment(rawLen); - outputMaterializedBytesCounter.increment(compLen); - TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen); - TezSpillRecord sr = new TezSpillRecord(1); - sr.putIndex(rec, 0); - - this.indexPath = ouputFileManager - .getOutputIndexFileForWrite(INDEX_RECORD_LENGTH); - LOG.info("Writing index file: " + indexPath); - sr.writeToFile(indexPath, conf); - return numRecords > 0; - } - - @Override - public void write(Object key, Object value) throws IOException { - this.writer.append(key, value); - this.outputRecordsCounter.increment(1); - numRecords++; - } - - @Override - public void write(Object key, Iterable values) throws IOException { - writer.appendKeyValues(key, values.iterator()); - this.outputRecordsCounter.increment(1); - numRecords++; - } - - public long getRawLength() { - Preconditions.checkState(closed, "Only available after the Writer has been closed"); - return this.writer.getRawLength(); - } - - public long getCompressedLength() { - Preconditions.checkState(closed, "Only available after the Writer has been closed"); - return this.writer.getCompressedLength(); - } - - public byte[] getData() throws IOException { - Preconditions.checkState(closed, - "Only available after the Writer has been closed"); - FSDataInputStream inStream = null; - byte[] buf = null; - try { - inStream = rfs.open(outputPath); - buf = new byte[(int) getCompressedLength()]; - IOUtils.readFully(inStream, buf, 0, (int) getCompressedLength()); - } finally { - if (inStream != null) { - inStream.close(); - } - } - return buf; - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index a9e34f1..9a8b6b5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -262,10 +262,7 @@ public class ShuffleUtils { sb.append("host: " + dmProto.getHost()).append(", "); sb.append("port: " + dmProto.getPort()).append(", "); sb.append("pathComponent: " + dmProto.getPathComponent()).append(", "); - sb.append("runDuration: " + dmProto.getRunDuration()).append(", "); - if (dmProto.hasData()) { - sb.append(", ").append("hasDataInEvent: " + dmProto.hasData()); - } + sb.append("runDuration: " + dmProto.getRunDuration()); sb.append("]"); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index 1486848..61b3e3a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -104,7 +104,6 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } int srcIndex = dme.getSourceIndex(); - String hostIdentifier = shufflePayload.getHost() + ":" + shufflePayload.getPort(); LOG.info("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex() + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils .stringify(shufflePayload)); @@ -128,40 +127,10 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme, shufflePayload, (useSharedInputs && srcIndex == 0)); - if (shufflePayload.hasData()) { - DataProto dataProto = shufflePayload.getData(); - FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), - dataProto.getCompressedLength(), srcAttemptIdentifier); - moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier); - shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput); - } else { - shuffleManager.addKnownInput(shufflePayload.getHost(), - shufflePayload.getPort(), srcAttemptIdentifier, srcIndex); - } - + shuffleManager.addKnownInput(shufflePayload.getHost(), + shufflePayload.getPort(), srcAttemptIdentifier, srcIndex); } - private void moveDataToFetchedInput(DataProto dataProto, - FetchedInput fetchedInput, String hostIdentifier) throws IOException { - switch (fetchedInput.getType()) { - case DISK: - ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), - hostIdentifier, dataProto.getData().newInput(), dataProto.getCompressedLength(), LOG, - fetchedInput.getInputAttemptIdentifier().toString()); - break; - case MEMORY: - ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(), - dataProto.getData().newInput(), dataProto.getRawLength(), dataProto.getCompressedLength(), - codec, ifileReadAhead, ifileReadAheadLength, LOG, - fetchedInput.getInputAttemptIdentifier().toString()); - break; - case WAIT: - default: - throw new TezUncheckedException("Unexpected type: " - + fetchedInput.getType()); - } - } - private void processInputFailedEvent(InputFailedEvent ife) { InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion()); shuffleManager.obsoleteKnownInput(srcAttemptIdentifier); http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 9c53414..749143a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -508,49 +508,6 @@ public class ShuffleManager implements FetcherCallback { lastEventReceived.setValue(relativeTime); } - public void addCompletedInputWithData( - InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput) - throws IOException { - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); - - LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to " - + fetchedInput.getType()); - // Count irrespective of whether this is a copy of an already fetched input - lock.lock(); - try { - lastProgressTime = System.currentTimeMillis(); - } finally { - lock.unlock(); - } - - boolean committed = false; - if (!completedInputSet.contains(inputIdentifier)) { - synchronized (completedInputSet) { - if (!completedInputSet.contains(inputIdentifier)) { - fetchedInput.commit(); - committed = true; - if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { - registerCompletedInput(fetchedInput); - } else { - registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); - } - } - } - } - if (!committed) { - fetchedInput.abort(); // If this fails, the fetcher may attempt another - // abort. - } else { - lock.lock(); - try { - // Signal the wakeLoop to check for termination. - wakeLoop.signal(); - } finally { - lock.unlock(); - } - } - } - public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) { obsoletedInputs.add(srcAttemptIdentifier); // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction. http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 19f6130..7aac6f9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -133,6 +133,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit @VisibleForTesting Path finalOutPath; + //for single partition cases (e.g UnorderedKVOutput) + private final IFile.Writer writer; + private final boolean skipBuffers; + private final ReentrantLock spillLock = new ReentrantLock(); private final Condition spillInProgress = spillLock.newCondition(); @@ -143,7 +147,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf, int numOutputs, long availableMemoryBytes) throws IOException { super(outputContext, conf, numOutputs); - Preconditions.checkArgument(availableMemoryBytes > 0, "availableMemory should not be > 0 bytes"); + Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes"); + + //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in + // this case. Add it later if needed. + pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); + + if (availableMemoryBytes == 0) { + Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory " + + "can be set to 0 only when numPartitions=1 and " + TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is disabled. current numPartitions=" + + numPartitions + ", " + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + "=" + + pipelinedShuffle); + } + // Ideally, should be significantly larger. availableMemory = availableMemoryBytes; @@ -179,15 +198,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit outputLargeRecordsCounter = outputContext.getCounters().findCounter( TaskCounter.OUTPUT_LARGE_RECORDS); - //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in - // this case. Add it later if needed. - pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration - .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration - .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH; - LOG.info("pipelinedShuffle=" + pipelinedShuffle); + + if (numPartitions == 1 && !pipelinedShuffle) { + //special case, where in only one partition is available. + finalOutPath = outputFileHandler.getOutputFileForWrite(); + finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate); + skipBuffers = true; + writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass, + codec, outputRecordsCounter, outputRecordBytesCounter); + } else { + skipBuffers = false; + writer = null; + } + LOG.info("pipelinedShuffle=" + pipelinedShuffle + ", skipBuffers=" + skipBuffers); } private void computeNumBuffersAndSize(int bufferLimit) { @@ -210,8 +236,13 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // Already reported as a fatalError - report to the user code throw new IOException("Exception during spill", new IOException(spillException)); } - int partition = partitioner.getPartition(key, value, numPartitions); - write(key, value, partition); + if (skipBuffers) { + //special case, where we have only one partition and pipeliing is disabled. + writer.append(key, value); + } else { + int partition = partitioner.getPartition(key, value, numPartitions); + write(key, value, partition); + } } @SuppressWarnings("unchecked") @@ -451,6 +482,27 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit List events = Lists.newLinkedList(); if (!pipelinedShuffle) { + if (skipBuffers) { + writer.close(); + long rawLen = writer.getRawLength(); + long compLen = writer.getCompressedLength(); + TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen); + TezSpillRecord sr = new TezSpillRecord(1); + sr.putIndex(rec, 0); + sr.writeToFile(finalIndexPath, conf); + + BitSet emptyPartitions = new BitSet(); + if (outputRecordsCounter.getValue() == 0) { + emptyPartitions.set(0); + } + cleanupCurrentBuffer(); + + outputBytesWithOverheadCounter.increment(rawLen); + fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate); + return Collections.singletonList(generateDMEvent(false, -1, false, outputContext + .getUniqueIdentifier(), emptyPartitions)); + } + //Regular code path. if (numSpills.get() > 0) { mergeAll(); http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index f455ffb..9914735 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -18,12 +18,11 @@ package org.apache.tez.runtime.library.output; -import java.nio.ByteBuffer; -import java.util.BitSet; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,26 +31,21 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezUtils; -import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; -import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.OutputContext; -import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.library.api.KeyValuesWriter; +import org.apache.tez.runtime.library.api.Partitioner; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter; -import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; -import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; -import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto; +import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; +import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; /** * {@link UnorderedKVOutput} is a {@link LogicalOutput} that writes key @@ -63,12 +57,13 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { private static final Logger LOG = LoggerFactory.getLogger(UnorderedKVOutput.class); - private FileBasedKVWriter kvWriter; + @VisibleForTesting + UnorderedPartitionedKVWriter kvWriter; private Configuration conf; - private boolean dataViaEventsEnabled; - private int dataViaEventsMaxSize; + private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler; + private final AtomicBoolean isStarted = new AtomicBoolean(false); public UnorderedKVOutput(OutputContext outputContext, int numPhysicalOutputs) { super(outputContext, numPhysicalOutputs); @@ -82,25 +77,34 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs()); - getContext().requestInitialMemory(0l, null); // mandatory call + this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, CustomPartitioner.class + .getName()); - this.dataViaEventsEnabled = conf.getBoolean( - TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED, - TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT); - this.dataViaEventsMaxSize = conf.getInt( - TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, - TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT); - - LOG.info(this.getClass().getSimpleName() + " running with params -> " - + "dataViaEventsEnabled: " + dataViaEventsEnabled - + ", dataViaEventsMaxSize: " + dataViaEventsMaxSize); + this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler(); + + boolean pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); + + long memRequestSize = (pipelinedShuffle) ? + UnorderedPartitionedKVWriter.getInitialMemoryRequirement(conf, getContext() + .getTotalMemoryAvailableToTask()) : 0; + getContext().requestInitialMemory(memRequestSize, memoryUpdateCallbackHandler); - this.kvWriter = new FileBasedKVWriter(getContext(), conf); return Collections.emptyList(); } @Override - public synchronized void start() { + public synchronized void start() throws Exception { + if (!isStarted.get()) { + memoryUpdateCallbackHandler.validateUpdateReceived(); + //This would have just a single partition + this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, 1, + memoryUpdateCallbackHandler.getMemoryAssigned()); + isStarted.set(true); + LOG.info(this.getClass().getSimpleName() + " started. MemoryAssigned=" + + memoryUpdateCallbackHandler.getMemoryAssigned()); + } } @Override @@ -116,49 +120,12 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { @Override public synchronized List close() throws Exception { - boolean outputGenerated = this.kvWriter.close(); - - DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto - .newBuilder(); - - LOG.info("Closing KVOutput: RawLength: " + this.kvWriter.getRawLength() - + ", CompressedLength: " + this.kvWriter.getCompressedLength()); - - if (dataViaEventsEnabled && outputGenerated && this.kvWriter.getCompressedLength() <= dataViaEventsMaxSize) { - LOG.info("Serialzing actual data into DataMovementEvent, dataSize: " + this.kvWriter.getCompressedLength()); - byte[] data = this.kvWriter.getData(); - DataProto.Builder dataProtoBuilder = DataProto.newBuilder(); - dataProtoBuilder.setData(ByteString.copyFrom(data)); - dataProtoBuilder.setRawLength((int)this.kvWriter.getRawLength()); - dataProtoBuilder.setCompressedLength((int)this.kvWriter.getCompressedLength()); - payloadBuilder.setData(dataProtoBuilder.build()); - } - - // Set the list of empty partitions - single partition on this case. - if (!outputGenerated) { - LOG.info("No output was generated"); - BitSet emptyPartitions = new BitSet(); - emptyPartitions.set(0); - ByteString emptyPartitionsBytesString = - TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions)); - payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString); - } - if (outputGenerated) { - String host = getHost(); - ByteBuffer shuffleMetadata = getContext() - .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); - int shufflePort = ShuffleUtils - .deserializeShuffleProviderMetaData(shuffleMetadata); - payloadBuilder.setHost(host); - payloadBuilder.setPort(shufflePort); - payloadBuilder.setPathComponent(getContext().getUniqueIdentifier()); + if (isStarted.get()) { + //TODO: Do we need to support sending payloads via events? + return kvWriter.close(); + } else { + return Collections.emptyList(); } - DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); - - DataMovementEvent dmEvent = DataMovementEvent.create(0, payloadProto.toByteString().asReadOnlyByteBuffer()); - List events = Lists.newArrayListWithCapacity(1); - events.add(dmEvent); - return events; } @VisibleForTesting @@ -173,12 +140,15 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX); confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); @@ -192,4 +162,13 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { public static Set getConfigurationKeySet() { return Collections.unmodifiableSet(confKeys); } + + @Private + public static class CustomPartitioner implements Partitioner { + + @Override + public int getPartition(Object key, Object value, int numPartitions) { + return 0; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java index a0f834e..b70c9d7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java @@ -64,7 +64,7 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator static final double MAX_ADDITIONAL_RESERVATION_FRACTION_PER_IO = 0.1d; static final double RESERVATION_FRACTION_PER_IO = 0.015d; static final String[] DEFAULT_TASK_MEMORY_WEIGHTED_RATIOS = - generateWeightStrings(1, 1, 12, 12, 1, 1); + generateWeightStrings(1, 1, 1, 12, 12, 1, 1); private Configuration conf; @@ -265,11 +265,11 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator return reserveFraction; } - public static String[] generateWeightStrings(int unsortedPartitioned, int broadcastIn, - int sortedOut, int scatterGatherShuffleIn, int proc, int other) { + public static String[] generateWeightStrings(int unsortedPartitioned, int unsorted, + int broadcastIn, int sortedOut, int scatterGatherShuffleIn, int proc, int other) { String[] weights = new String[RequestType.values().length]; weights[0] = RequestType.PARTITIONED_UNSORTED_OUTPUT.name() + ":" + unsortedPartitioned; - weights[1] = RequestType.UNSORTED_OUTPUT.name() + ":" + 0; + weights[1] = RequestType.UNSORTED_OUTPUT.name() + ":" + unsorted; weights[2] = RequestType.UNSORTED_INPUT.name() + ":" + broadcastIn; weights[3] = RequestType.SORTED_OUTPUT.name() + ":" + sortedOut; weights[4] = RequestType.SORTED_MERGED_INPUT.name() + ":" + scatterGatherShuffleIn; http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/proto/ShufflePayloads.proto ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto index 6192381..f7b482d 100644 --- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto +++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto @@ -26,7 +26,6 @@ message DataMovementEventPayloadProto { optional int32 port = 3; optional string path_component = 4; optional int32 run_duration = 5; - optional DataProto data = 6; optional bool pipelined = 7; // Related to pipelined shuffle optional bool last_event = 8; // Related to pipelined shuffle optional int32 spill_id = 9; // Related to pipelined shuffle. http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java index 04d3594..b34accd 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java @@ -54,7 +54,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor public void testSimpleWeightedScaling() { Configuration conf = new Configuration(this.conf); conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS, - WeightedScalingMemoryDistributor.generateWeightStrings(0, 1, 2, 3, 1, 1)); + WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3, 1, 1)); System.err.println(Joiner.on(",").join(conf.getStringCollection( TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS))); @@ -101,7 +101,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor public void testAdditionalReserveFractionWeightedScaling() { Configuration conf = new Configuration(this.conf); conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS, - WeightedScalingMemoryDistributor.generateWeightStrings(0, 2, 3, 6, 1, 1)); + WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 2, 3, 6, 1, 1)); conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO, 0.025d); conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX, 0.2d); http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 9b1318c..1a10eb8 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -197,6 +197,12 @@ public class TestUnorderedPartitionedKVWriter { } @Test(timeout = 10000) + public void testNoSpill_SinglPartition() throws IOException, InterruptedException { + baseTest(10, 1, null, shouldCompress); + } + + + @Test(timeout = 10000) public void testRandomText() throws IOException, InterruptedException { textTest(100, 10, 2048, 0, 0, 0, false); } @@ -339,7 +345,6 @@ public class TestUnorderedPartitionedKVWriter { DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom( ByteString.copyFrom(cdme .getUserPayload())); - assertFalse(eventProto.hasData()); BitSet emptyPartitionBits = null; if (partitionsWithData.cardinality() != numPartitions) { assertTrue(eventProto.hasEmptyPartitions()); @@ -549,7 +554,6 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(numOutputs, cdme.getCount()); DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload())); - assertFalse(eventProto.hasData()); //Ensure that this is the last event assertTrue(eventProto.getLastEvent()); if (eventProto.hasEmptyPartitions()) { @@ -671,9 +675,11 @@ public class TestUnorderedPartitionedKVWriter { TezCounter numAdditionalSpillsCounter = counters .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT); assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue()); + if (numPartitions > 1) { + assertEquals(numRecordsWritten * sizePerRecordWithOverhead, outputBytesWithOverheadCounter.getValue()); + } assertEquals(numRecordsWritten, outputRecordsCounter.getValue()); - assertEquals(numRecordsWritten * sizePerRecordWithOverhead, - outputBytesWithOverheadCounter.getValue()); + long fileOutputBytes = fileOutputBytesCounter.getValue(); if (numRecordsWritten > 0) { assertTrue(fileOutputBytes > 0); @@ -710,7 +716,6 @@ public class TestUnorderedPartitionedKVWriter { DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom( cdme.getUserPayload())); - assertFalse(eventProto.hasData()); if (skippedPartitions == null && numRecordsWritten > 0) { assertFalse(eventProto.hasEmptyPartitions()); emptyPartitionBits = new BitSet(numPartitions); http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 4d9001d..509c23d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -18,7 +18,13 @@ package org.apache.tez.runtime.library.output; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -31,6 +37,7 @@ import java.util.List; import java.util.Map; import com.google.protobuf.ByteString; +import org.apache.commons.lang.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -49,14 +56,16 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.RuntimeTask; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputContext; -import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.TezOutputContextImpl; import org.apache.tez.runtime.api.impl.TezUmbilical; import org.apache.tez.runtime.common.resources.MemoryDistributor; import org.apache.tez.runtime.library.api.KeyValueWriter; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; import org.apache.tez.runtime.library.testutils.KVDataGen; @@ -64,6 +73,9 @@ import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestOnFileUnorderedKVOutput { @@ -72,6 +84,7 @@ public class TestOnFileUnorderedKVOutput { private static Configuration defaultConf = new Configuration(); private static FileSystem localFs = null; private static Path workDir = null; + private static final int shufflePort = 2112; static { defaultConf.set("fs.defaultFS", "file:///"); @@ -87,6 +100,10 @@ public class TestOnFileUnorderedKVOutput { } @Before + public void setup() throws Exception { + localFs.mkdirs(workDir); + } + @After public void cleanup() throws Exception { localFs.delete(workDir, true); @@ -98,41 +115,13 @@ public class TestOnFileUnorderedKVOutput { conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); - int appAttemptNumber = 1; - TezUmbilical tezUmbilical = null; - String dagName = "currentDAG"; - String taskVertexName = "currentVertex"; - String destinationVertexName = "destinationVertex"; - TezDAGID dagID = TezDAGID.getInstance("2000", 1, 1); - TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); - TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); - TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1); - TezCounters counters = new TezCounters(); - UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); - RuntimeTask runtimeTask = mock(RuntimeTask.class); - - int shufflePort = 2112; - Map auxEnv = new HashMap(); - ByteBuffer bb = ByteBuffer.allocate(4); - bb.putInt(shufflePort); - bb.position(0); - AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, bb, auxEnv); - - - OutputDescriptor outputDescriptor = mock(OutputDescriptor.class); - when(outputDescriptor.getClassName()).thenReturn("OutputDescriptor"); - - OutputContext outputContext = new TezOutputContextImpl(conf, new String[] {workDir.toString()}, - appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName, - -1, taskAttemptID, counters, 0, userPayload, runtimeTask, - null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null, - new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory()); + OutputContext outputContext = createOutputContext(conf); - UnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest(outputContext, 1); + UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1); List events = null; - events = kvOutput.initialize(); + kvOutput.start(); assertTrue(events != null && events.size() == 0); KeyValueWriter kvWriter = kvOutput.getWriter(); @@ -143,9 +132,9 @@ public class TestOnFileUnorderedKVOutput { events = kvOutput.close(); assertTrue(events != null && events.size() == 1); - DataMovementEvent dmEvent = (DataMovementEvent)events.get(0); + CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0); - assertEquals("Invalid source index", 0, dmEvent.getSourceIndex()); + assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart()); DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto .parseFrom(ByteString.copyFrom(dmEvent.getUserPayload())); @@ -153,18 +142,97 @@ public class TestOnFileUnorderedKVOutput { assertFalse(shufflePayload.hasEmptyPartitions()); assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent()); assertEquals(shufflePort, shufflePayload.getPort()); - assertEquals("host", shufflePayload.getHost()); + assertEquals("localhost", shufflePayload.getHost()); } - private static class OnFileUnorderedKVOutputForTest extends UnorderedKVOutput { + @Test(timeout = 30000) + @SuppressWarnings("unchecked") + public void testWithPipelinedShuffle() throws Exception { + Configuration conf = new Configuration(); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); + + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 1); - public OnFileUnorderedKVOutputForTest(OutputContext outputContext, int numPhysicalOutputs) { - super(outputContext, numPhysicalOutputs); - } + OutputContext outputContext = createOutputContext(conf); + + UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1); + + List events = null; + events = kvOutput.initialize(); + kvOutput.start(); + assertTrue(events != null && events.size() == 0); - @Override - String getHost() { - return "host"; + KeyValueWriter kvWriter = kvOutput.getWriter(); + for (int i = 0; i < 500; i++) { + kvWriter.write(new Text(RandomStringUtils.randomAscii(10000)), new IntWritable(i)); } + + events = kvOutput.close(); + //When pipelining is on, all events are sent out within writer itself. + assertTrue(events != null && events.size() == 0); + + //ensure that data is sent via outputContext. + ArgumentCaptor eventsCaptor = ArgumentCaptor.forClass(List.class); + verify(outputContext, atLeast(1)).sendEvents(eventsCaptor.capture()); + events = eventsCaptor.getValue(); + + + CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0); + assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart()); + + DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto + .parseFrom(ByteString.copyFrom(dmEvent.getUserPayload())); + + assertTrue(shufflePayload.hasLastEvent()); + + assertFalse(shufflePayload.hasEmptyPartitions()); + assertEquals(shufflePort, shufflePayload.getPort()); + assertEquals("localhost", shufflePayload.getHost()); + } + + private OutputContext createOutputContext(Configuration conf) throws IOException { + int appAttemptNumber = 1; + TezUmbilical tezUmbilical = mock(TezUmbilical.class); + String dagName = "currentDAG"; + String taskVertexName = "currentVertex"; + String destinationVertexName = "destinationVertex"; + TezDAGID dagID = TezDAGID.getInstance("2000", 1, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1); + TezCounters counters = new TezCounters(); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + RuntimeTask runtimeTask = mock(RuntimeTask.class); + + + Map auxEnv = new HashMap(); + ByteBuffer bb = ByteBuffer.allocate(4); + bb.putInt(shufflePort); + bb.position(0); + AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, bb, auxEnv); + + + OutputDescriptor outputDescriptor = mock(OutputDescriptor.class); + when(outputDescriptor.getClassName()).thenReturn("OutputDescriptor"); + + OutputContext realOutputContext = new TezOutputContextImpl(conf, new String[] {workDir.toString()}, + appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName, + -1, taskAttemptID, counters, 0, userPayload, runtimeTask, + null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null, + new ExecutionContextImpl("localhost"), 2048); + OutputContext outputContext = spy(realOutputContext); + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock invocation) throws Throwable { + long requestedSize = (Long) invocation.getArguments()[0]; + MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation + .getArguments()[1]; + callback.memoryAssigned(requestedSize); + return null; + } + }).when(outputContext).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class)); + + return outputContext; } }