Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6A98D200C44 for ; Mon, 13 Mar 2017 04:56:32 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6938E160B87; Mon, 13 Mar 2017 03:56:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 19FF6160B77 for ; Mon, 13 Mar 2017 04:56:30 +0100 (CET) Received: (qmail 46698 invoked by uid 500); 13 Mar 2017 03:56:29 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 46689 invoked by uid 99); 13 Mar 2017 03:56:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Mar 2017 03:56:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A7147DFF13; Mon, 13 Mar 2017 03:56:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeagles@apache.org To: commits@tez.apache.org Message-Id: <956a8b565ae342c098c42210071d791a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-3650. Improve performance of FetchStatsLogger#logIndividualFetchComplete (jeagles) Date: Mon, 13 Mar 2017 03:56:29 +0000 (UTC) archived-at: Mon, 13 Mar 2017 03:56:32 -0000 Repository: tez Updated Branches: refs/heads/master c11810440 -> a9af6cfcc TEZ-3650. 
Improve performance of FetchStatsLogger#logIndividualFetchComplete (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a9af6cfc Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a9af6cfc Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a9af6cfc Branch: refs/heads/master Commit: a9af6cfccb8069afb01cbac3d373ffa0e2ecba93 Parents: c118104 Author: Jonathan Eagles Authored: Sun Mar 12 22:56:02 2017 -0500 Committer: Jonathan Eagles Committed: Sun Mar 12 22:56:02 2017 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/util/FastNumberFormat.java | 55 +++++++++++ .../org/apache/tez/util/TestNumberFormat.java | 39 ++++++++ .../runtime/library/common/shuffle/Fetcher.java | 4 +- .../library/common/shuffle/ShuffleUtils.java | 85 +++++++++++------ .../orderedgrouped/FetcherOrderedGrouped.java | 4 +- .../common/shuffle/TestShuffleUtils.java | 6 +- .../orderedgrouped/TestMergeManager.java | 98 ++++++++++---------- 8 files changed, 209 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 48ccb54..54b17b1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3650. Improve performance of FetchStatsLogger#logIndividualFetchComplete TEZ-3655. Specify netty version instead of inheriting from hadoop dependency. TEZ-3253. Remove special handling for last app attempt. TEZ-3648. 
IFile.Write#close has an extra output stream flush http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java b/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java new file mode 100644 index 0000000..f22fc64 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Lightweight formatter that renders {@code long} values as plain decimal
 * text (no grouping separators), optionally left-padded with zeros to a
 * minimum number of integer digits.
 *
 * <p>Digits are staged in a per-instance scratch buffer that every call to
 * {@link #format(long, StringBuilder)} overwrites, so a single instance must
 * not be used by several threads at the same time.
 */
public class FastNumberFormat {

  /** Number of decimal digits in the largest-magnitude {@code long} ({@code Long.MIN_VALUE}). */
  public static final int MAX_COUNT = 19;

  /** Scratch buffer, filled from the right with the digits of the value being formatted. */
  private final char[] digits = new char[MAX_COUNT];

  /** Minimum number of digits to emit; shorter values are left-padded with {@code '0'}. */
  private int minimumIntegerDigits;

  /**
   * Creates a new formatter with no minimum digit count.
   *
   * @return a fresh, independent formatter instance
   */
  public static FastNumberFormat getInstance() {
    return new FastNumberFormat();
  }

  /**
   * Sets the minimum number of digits written for each value.
   *
   * @param minimumIntegerDigits minimum digit count; values of zero or less
   *                             disable padding
   */
  public void setMinimumIntegerDigits(int minimumIntegerDigits) {
    this.minimumIntegerDigits = minimumIntegerDigits;
  }

  /**
   * Appends the decimal representation of {@code source} to {@code sb}.
   *
   * <p>A leading {@code '-'} is written for negative values, followed by any
   * zero padding needed to reach the configured minimum digit count, followed
   * by the digits themselves. At least one digit is always written, so zero is
   * rendered as {@code "0"} even when no minimum is configured.
   *
   * @param source value to format; the full {@code long} range is supported,
   *               including {@code Long.MIN_VALUE}
   * @param sb     builder that receives the formatted text
   * @return the same {@code sb} instance, to allow call chaining
   */
  public StringBuilder format(long source, StringBuilder sb) {
    // Work on the non-positive magnitude: every long has a representable
    // negative counterpart, whereas -Long.MIN_VALUE overflows back to itself.
    long remaining = source;
    if (remaining < 0) {
      sb.append('-');
    } else {
      remaining = -remaining;
    }

    int left = MAX_COUNT;
    // do/while guarantees a digit is produced for zero.
    do {
      // remaining % 10 lies in [-9, 0], so subtracting maps it onto '0'..'9'.
      digits[--left] = (char) ('0' - (remaining % 10));
      remaining /= 10;
    } while (remaining != 0);

    final int digitCount = MAX_COUNT - left;
    // Pad straight into the builder so the requested width is not limited by
    // the size of the scratch buffer.
    for (int written = digitCount; written < minimumIntegerDigits; written++) {
      sb.append('0');
    }
    sb.append(digits, left, digitCount);
    return sb;
  }

  /**
   * Formats {@code source} into a new string.
   *
   * @param source value to format
   * @return the decimal representation, zero-padded to the configured minimum
   */
  public String format(long source) {
    return format(source, new StringBuilder()).toString();
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.util; + +import org.junit.Assert; +import org.junit.Test; + +import java.text.NumberFormat; + +public class TestNumberFormat { + + @Test(timeout = 1000) + public void testLongWithPadding() throws Exception { + FastNumberFormat fastNumberFormat = FastNumberFormat.getInstance(); + fastNumberFormat.setMinimumIntegerDigits(6); + NumberFormat numberFormat = NumberFormat.getInstance(); + numberFormat.setGroupingUsed(false); + numberFormat.setMinimumIntegerDigits(6); + long[] testLongs = {1, 23, 456, 7890, 12345, 678901, 2345689, 0, -0, -1, -23, -456, -7890, -12345, -678901, -2345689}; + for (long l: testLongs) { + Assert.assertEquals("Number formats should be equal", numberFormat.format(l), fastNumberFormat.format(l)); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 6cbff94..9d1f42a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -814,11 +814,11 @@ public class Fetcher extends CallableWithNdc { ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(), input, (int) 
decompressedLength, (int) compressedLength, codec, ifileReadAhead, ifileReadAheadLength, LOG, - fetchedInput.getInputAttemptIdentifier().toString()); + fetchedInput.getInputAttemptIdentifier()); } else if (fetchedInput.getType() == Type.DISK) { ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), (host +":" +port), input, compressedLength, decompressedLength, LOG, - fetchedInput.getInputAttemptIdentifier().toString(), + fetchedInput.getInputAttemptIdentifier(), ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); } else { throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 82e844d..caddbc8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -48,6 +48,7 @@ import org.apache.tez.http.SSLFactory; import org.apache.tez.http.async.netty.AsyncHttpConnection; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB; +import org.apache.tez.util.FastNumberFormat; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +89,15 @@ public class ShuffleUtils { return new DecimalFormat("0.00"); } }; + static final ThreadLocal MBPS_FAST_FORMAT = + new ThreadLocal() { + @Override + protected FastNumberFormat initialValue() { + FastNumberFormat fmt = FastNumberFormat.getInstance(); + 
fmt.setMinimumIntegerDigits(2); + return fmt; + } + }; public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta) throws IOException { @@ -119,7 +129,7 @@ public class ShuffleUtils { public static void shuffleToMemory(byte[] shuffleData, InputStream input, int decompressedLength, int compressedLength, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, - Logger LOG, String identifier) throws IOException { + Logger LOG, InputAttemptIdentifier identifier) throws IOException { try { IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec, ifileReadAhead, ifileReadAheadLength); @@ -145,7 +155,7 @@ public class ShuffleUtils { } public static void shuffleToDisk(OutputStream output, String hostIdentifier, - InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier, + InputStream input, long compressedLength, long decompressedLength, Logger LOG, InputAttemptIdentifier identifier, boolean ifileReadAhead, int ifileReadAheadLength, boolean verifyChecksum) throws IOException { // Copy data to local-disk long bytesLeft = compressedLength; @@ -530,6 +540,20 @@ public class ShuffleUtils { this.aggregateLogger = aggregateLogger; } + + private static StringBuilder toShortString(InputAttemptIdentifier inputAttemptIdentifier, StringBuilder sb) { + sb.append("{"); + sb.append(inputAttemptIdentifier.getInputIdentifier()); + sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber()); + sb.append(", ").append(inputAttemptIdentifier.getPathComponent()); + if (inputAttemptIdentifier.getFetchTypeInfo() + != InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) { + sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal()); + sb.append(", ").append(inputAttemptIdentifier.getSpillEventId()); + } + sb.append("}"); + return sb; + } /** * Log individual fetch complete event. 
* This log information would be used by tez-tool/perf-analzyer/shuffle tools for mining @@ -545,19 +569,37 @@ public class ShuffleUtils { */ public void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) { - double rate = 0; - if (millis != 0) { - rate = bytesCompressed / ((double) millis / 1000); - rate = rate / (1024 * 1024); - } + if (activeLogger.isInfoEnabled()) { - activeLogger.info( - "Completed fetch for attempt: " - + toShortString(srcAttemptIdentifier) - +" to " + outputType + - ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed + - ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" + - MBPS_FORMAT.get().format(rate) + " MB/s"); + long wholeMBs = 0; + long partialMBs = 0; + if (millis != 0) { + // fast math is done using integer math to avoid double to string conversion + // calculate B/s * 100 to preserve MBs precision to two decimal places + // multiply numerator by 100000 (2^5 * 5^5) and divide denominator by MB (2^20) + // simply fraction to protect ourselves from overflow by factoring out 2^5 + wholeMBs = (bytesCompressed * 3125) / (millis * 32768); + partialMBs = wholeMBs % 100; + wholeMBs /= 100; + } + StringBuilder sb = new StringBuilder("Completed fetch for attempt: "); + toShortString(srcAttemptIdentifier, sb); + sb.append(" to "); + sb.append(outputType); + sb.append(", csize="); + sb.append(bytesCompressed); + sb.append(", dsize="); + sb.append(bytesDecompressed); + sb.append(", EndTime="); + sb.append(System.currentTimeMillis()); + sb.append(", TimeTaken="); + sb.append(millis); + sb.append(", Rate="); + sb.append(wholeMBs); + sb.append("."); + MBPS_FAST_FORMAT.get().format(partialMBs, sb); + sb.append(" MB/s"); + activeLogger.info(sb.toString()); } else { long currentCount, currentCompressedSize, currentDecompressedSize, currentTotalTime; synchronized (this) { @@ -583,21 +625,6 @@ public class 
ShuffleUtils { } } - private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) { - StringBuilder sb = new StringBuilder(); - sb.append("{"); - sb.append(inputAttemptIdentifier.getInputIdentifier()); - sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber()); - sb.append(", ").append(inputAttemptIdentifier.getPathComponent()); - if (inputAttemptIdentifier.getFetchTypeInfo() - != InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) { - sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal()); - sb.append(", ").append(inputAttemptIdentifier.getSpillEventId()); - } - sb.append("}"); - return sb.toString(); - } - /** * Build {@link org.apache.tez.http.HttpConnectionParams} from configuration * http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index bcb75d2..58ca1e2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -504,11 +504,11 @@ class FetcherOrderedGrouped extends CallableWithNdc { if (mapOutput.getType() == Type.MEMORY) { ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input, (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead, - ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString()); + ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier()); } else if (mapOutput.getType() == Type.DISK) { 
ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), input, compressedLength, decompressedLength, LOG, - mapOutput.getAttemptIdentifier().toString(), + mapOutput.getAttemptIdentifier(), ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); } else { throw new IOException("Unknown mapOutput type while fetching shuffle data:" + http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index f21da7c..b1ce716 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -284,7 +284,7 @@ public class TestShuffleUtils { byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1}; try { ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header), - 1024, 128, mockCodec, false, 0, mock(Logger.class), "identifier"); + 1024, 128, mockCodec, false, 0, mock(Logger.class), null); Assert.fail("shuffle was supposed to throw!"); } catch (IOException e) { Assert.assertTrue(e.getCause() instanceof InternalError); @@ -301,14 +301,14 @@ public class TestShuffleUtils { ByteArrayInputStream in = new ByteArrayInputStream(bogusData); ByteArrayOutputStream baos = new ByteArrayOutputStream(); ShuffleUtils.shuffleToDisk(baos, "somehost", in, - bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, false); + bogusData.length, 2000, mock(Logger.class), null, false, 0, false); Assert.assertArrayEquals(bogusData, baos.toByteArray()); // verify sending same stream of zeroes with 
validation generates an exception in.reset(); try { ShuffleUtils.shuffleToDisk(mock(OutputStream.class), "somehost", in, - bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, true); + bogusData.length, 2000, mock(Logger.class), null, false, 0, true); Assert.fail("shuffle was supposed to throw!"); } catch (IOException e) { } http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 9209ff4..a812728 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -235,8 +235,8 @@ public class TestMergeManager { assertEquals(0, mergeManager.getUsedMemory()); assertEquals(0, mergeManager.getCommitMemory()); - byte[] data1 = generateData(conf, 10); - byte[] data2 = generateData(conf, 20); + byte[] data1 = generateData(conf, 10, null); + byte[] data2 = generateData(conf, 20, null); MapOutput firstMapOutput = mergeManager.reserve(null, data1.length, data1.length, 0); MapOutput secondMapOutput = mergeManager.reserve(null, data2.length, data2.length, 0); assertEquals(MapOutput.Type.MEMORY, firstMapOutput.getType()); @@ -294,15 +294,19 @@ public class TestMergeManager { * - After 3 segment commits, it would trigger mem-to-mem merge. * - All of them can be merged in memory. 
*/ - byte[] data1 = generateDataBySize(conf, 10); - byte[] data2 = generateDataBySize(conf, 20); - byte[] data3 = generateDataBySize(conf, 200); - byte[] data4 = generateDataBySize(conf, 20000); - - MapOutput mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); - MapOutput mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); - MapOutput mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); - MapOutput mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + InputAttemptIdentifier inputAttemptIdentifier1 = new InputAttemptIdentifier(0,0); + InputAttemptIdentifier inputAttemptIdentifier2 = new InputAttemptIdentifier(1,0); + InputAttemptIdentifier inputAttemptIdentifier3 = new InputAttemptIdentifier(2,0); + InputAttemptIdentifier inputAttemptIdentifier4 = new InputAttemptIdentifier(3,0); + byte[] data1 = generateDataBySize(conf, 10, inputAttemptIdentifier1); + byte[] data2 = generateDataBySize(conf, 20, inputAttemptIdentifier2); + byte[] data3 = generateDataBySize(conf, 200, inputAttemptIdentifier3); + byte[] data4 = generateDataBySize(conf, 20000, inputAttemptIdentifier4); + + MapOutput mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0); + MapOutput mo2 = mergeManager.reserve(inputAttemptIdentifier1, data2.length, data2.length, 0); + MapOutput mo3 = mergeManager.reserve(inputAttemptIdentifier1, data3.length, data3.length, 0); + MapOutput mo4 = mergeManager.reserve(inputAttemptIdentifier1, data4.length, data4.length, 0); assertEquals(MapOutput.Type.MEMORY, mo1.getType()); assertEquals(MapOutput.Type.MEMORY, mo2.getType()); @@ -351,15 +355,15 @@ public class TestMergeManager { mergeManager.configureAndStart(); //Single shuffle limit is 25% of 2000000 - data1 = generateDataBySize(conf, 10); - data2 = generateDataBySize(conf, 400000); - data3 = generateDataBySize(conf, 400000); - data4 = 
generateDataBySize(conf, 400000); + data1 = generateDataBySize(conf, 10, inputAttemptIdentifier1); + data2 = generateDataBySize(conf, 400000, inputAttemptIdentifier2); + data3 = generateDataBySize(conf, 400000, inputAttemptIdentifier3); + data4 = generateDataBySize(conf, 400000, inputAttemptIdentifier4); - mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); - mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); - mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); - mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0); + mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0); + mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0); + mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0); assertEquals(MapOutput.Type.MEMORY, mo1.getType()); assertEquals(MapOutput.Type.MEMORY, mo2.getType()); @@ -409,15 +413,15 @@ public class TestMergeManager { mergeManager.configureAndStart(); //Single shuffle limit is 25% of 2000000 - data1 = generateDataBySize(conf, 400000); - data2 = generateDataBySize(conf, 400000); - data3 = generateDataBySize(conf, 400000); - data4 = generateDataBySize(conf, 400000); + data1 = generateDataBySize(conf, 400000, inputAttemptIdentifier1); + data2 = generateDataBySize(conf, 400000, inputAttemptIdentifier2); + data3 = generateDataBySize(conf, 400000, inputAttemptIdentifier3); + data4 = generateDataBySize(conf, 400000, inputAttemptIdentifier4); - mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); - mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); - mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); - mo4 
= mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0); + mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0); + mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0); + mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0); assertEquals(MapOutput.Type.MEMORY, mo1.getType()); assertEquals(MapOutput.Type.MEMORY, mo2.getType()); @@ -465,15 +469,15 @@ public class TestMergeManager { mergeManager.configureAndStart(); //Single shuffle limit is 25% of 2000000 - data1 = generateDataBySize(conf, 490000); - data2 = generateDataBySize(conf, 490000); - data3 = generateDataBySize(conf, 490000); - data4 = generateDataBySize(conf, 230000); + data1 = generateDataBySize(conf, 490000, inputAttemptIdentifier1); + data2 = generateDataBySize(conf, 490000, inputAttemptIdentifier2); + data3 = generateDataBySize(conf, 490000, inputAttemptIdentifier3); + data4 = generateDataBySize(conf, 230000, inputAttemptIdentifier4); - mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); - mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); - mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); - mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0); + mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0); + mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0); + mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0); assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000)); @@ -520,15 +524,15 @@ public class TestMergeManager { 
mergeManager.configureAndStart(); //Single shuffle limit is 25% of 2000000 - data1 = generateDataBySize(conf, 490000); - data2 = generateDataBySize(conf, 490000); - data3 = generateDataBySize(conf, 490000); - data4 = generateDataBySize(conf, 230000); + data1 = generateDataBySize(conf, 490000, inputAttemptIdentifier1); + data2 = generateDataBySize(conf, 490000, inputAttemptIdentifier2); + data3 = generateDataBySize(conf, 490000, inputAttemptIdentifier3); + data4 = generateDataBySize(conf, 230000, inputAttemptIdentifier4); - mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); - mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); - mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); - mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0); + mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0); + mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0); + mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0); assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000)); @@ -566,7 +570,7 @@ public class TestMergeManager { Assert.assertFalse(mergeManager.isMergeComplete()); } - private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException { + private byte[] generateDataBySize(Configuration conf, int rawLen, InputAttemptIdentifier inputAttemptIdentifier) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); FSDataOutputStream fsdos = new FSDataOutputStream(baos, null); IFile.Writer writer = @@ -584,11 +588,11 @@ public class TestMergeManager { int rawLength = (int)writer.getRawLength(); byte[] data = new byte[rawLength]; ShuffleUtils.shuffleToMemory(data, new 
ByteArrayInputStream(baos.toByteArray()), - rawLength, compressedLength, null, false, 0, LOG, "sometask"); + rawLength, compressedLength, null, false, 0, LOG, inputAttemptIdentifier); return data; } - private byte[] generateData(Configuration conf, int numEntries) throws IOException { + private byte[] generateData(Configuration conf, int numEntries, InputAttemptIdentifier inputAttemptIdentifier) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); FSDataOutputStream fsdos = new FSDataOutputStream(baos, null); IFile.Writer writer = @@ -601,7 +605,7 @@ public class TestMergeManager { int rawLength = (int)writer.getRawLength(); byte[] data = new byte[rawLength]; ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()), - rawLength, compressedLength, null, false, 0, LOG, "sometask"); + rawLength, compressedLength, null, false, 0, LOG, inputAttemptIdentifier); return data; }