Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 87AA5172D7 for ; Thu, 5 Feb 2015 21:00:46 +0000 (UTC) Received: (qmail 92074 invoked by uid 500); 5 Feb 2015 21:00:46 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 91987 invoked by uid 500); 5 Feb 2015 21:00:46 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 91780 invoked by uid 99); 5 Feb 2015 21:00:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2015 21:00:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2D050E05E0; Thu, 5 Feb 2015 21:00:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 05 Feb 2015 21:00:50 -0000 Message-Id: <771c778ff6e0402ea47dd785afe3c698@git.apache.org> In-Reply-To: <95f9600993c243ebbcba5227a05620b4@git.apache.org> References: <95f9600993c243ebbcba5227a05620b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/8] tez git commit: TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers (rbalamohan) TEZ-2023. 
Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5cf9105f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5cf9105f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5cf9105f Branch: refs/heads/TEZ-2003 Commit: 5cf9105fe47bb07aa42f5b3132ba13e81fe205a8 Parents: 7096d8a Author: Rajesh Balamohan Authored: Thu Feb 5 08:25:24 2015 +0530 Committer: Rajesh Balamohan Committed: Thu Feb 5 08:25:24 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/common/shuffle/HttpConnection.java | 1 + .../library/common/shuffle/ShuffleUtils.java | 39 ++++++++++++++++++++ .../common/shuffle/impl/ShuffleManager.java | 19 +--------- .../orderedgrouped/ShuffleScheduler.java | 21 ++--------- 5 files changed, 47 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 39d7f81..6a494ca 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers. TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo. TEZ-1999. IndexOutOfBoundsException during merge. TEZ-2000. Source vertex exists error during DAG submission. 
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java index 4732a5a..1a5de41 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java @@ -237,6 +237,7 @@ public class HttpConnection { } // verify that replyHash is HMac of encHash SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr); + //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host LOG.info("for url=" + url + " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms"); } http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 629bab8..af02f9e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.net.MalformedURLException; import java.net.URL; import java.nio.ByteBuffer; +import java.text.DecimalFormat; import java.util.List; import javax.crypto.SecretKey; @@ -50,6 +51,14 @@ public class 
ShuffleUtils { private static final Log LOG = LogFactory.getLog(ShuffleUtils.class); public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle"; + static final ThreadLocal<DecimalFormat> MBPS_FORMAT = + new ThreadLocal<DecimalFormat>() { + @Override + protected DecimalFormat initialValue() { + return new DecimalFormat("0.00"); + } + }; + public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta) throws IOException { DataInputByteBuffer in = new DataInputByteBuffer(); @@ -233,5 +242,35 @@ public class ShuffleUtils { sb.append("]"); return sb.toString(); } + + /** + * Log individual fetch complete event. + * This log information would be used by tez-tool/perf-analyzer/shuffle tools for mining + * - amount of data transferred between source to destination machine + * - time taken to transfer data between source to destination machine + * - details on DISK/DISK_DIRECT/MEMORY based shuffles + * + * @param log + * @param millis + * @param bytesCompressed + * @param bytesDecompressed + * @param outputType + * @param srcAttemptIdentifier + */ + public static void logIndividualFetchComplete(Log log, long millis, long + bytesCompressed, + long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) { + double rate = 0; + if (millis != 0) { + rate = bytesCompressed / ((double) millis / 1000); + rate = rate / (1024 * 1024); + } + log.info( + "Completed fetch for attempt: " + + srcAttemptIdentifier + " to " + outputType + + ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed + + ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" + + MBPS_FORMAT.get().format(rate) + " MB/s"); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 13296c7..3dc8156 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -519,8 +519,8 @@ public class ShuffleManager implements FetcherCallback { if (!completedInputSet.contains(inputIdentifier)) { fetchedInput.commit(); committed = true; - logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput, - srcAttemptIdentifier); + ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration, + fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier); // Processing counters for completed and commit fetches only. Need // additional counters for excessive fetches - which primarily comes @@ -731,22 +731,7 @@ public class ShuffleManager implements FetcherCallback { + mbpsFormat.format(transferRate) + " MB/s)"); } - private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength, - FetchedInput fetchedInput, - InputAttemptIdentifier srcAttemptIdentifier) { - double rate = 0; - if (millis != 0) { - rate = fetchedBytes / ((double) millis / 1000); - rate = rate / (1024 * 1024); - } - LOG.info( - "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType() + - ", CompressedSize=" + fetchedBytes + ", DecompressedSize=" + decompressedLength + - ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" + - mbpsFormat.format(rate) + " MB/s"); - } - private class SchedulerFutureCallback implements FutureCallback { @Override 
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 066b94a..57e904b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -47,6 +47,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; import com.google.common.collect.Lists; @@ -103,7 +104,7 @@ class ShuffleScheduler { private long totalBytesShuffledTillNow = 0; private DecimalFormat mbpsFormat = new DecimalFormat("0.00"); - + public ShuffleScheduler(InputContext inputContext, Configuration conf, int numberOfInputs, @@ -187,8 +188,8 @@ class ShuffleScheduler { } output.commit(); - logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, output, - srcAttemptIdentifier); + ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed, + bytesDecompressed, output.getType().toString(), srcAttemptIdentifier); if (output.getType() == Type.DISK) { bytesShuffledToDisk.increment(bytesCompressed); } else if (output.getType() == Type.DISK_DIRECT) { @@ -234,20 +235,6 @@ class ShuffleScheduler { // TODO NEWTEZ 
Should this be releasing the output, if not committed ? Possible memory leak in case of speculation. } - private void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed, - MapOutput output, - InputAttemptIdentifier srcAttemptIdentifier) { - double rate = 0; - if (millis != 0) { - rate = bytesCompressed / ((double) millis / 1000); - rate = rate / (1024 * 1024); - } - LOG.info( - "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + output.getType() + - ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed + - ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" + - mbpsFormat.format(rate) + " MB/s"); - } private void logProgress() { double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);