tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-1610. Add additional task counters for fetchers, merger. Contributed by Rajesh Balamohan.
Date Mon, 15 Dec 2014 20:20:23 GMT
Repository: tez
Updated Branches:
  refs/heads/master 53b38e870 -> 125620a45


TEZ-1610. Add additional task counters for fetchers, merger. Contributed
by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/125620a4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/125620a4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/125620a4

Branch: refs/heads/master
Commit: 125620a4577a34f8f165ff86350a9dc89ae96b05
Parents: 53b38e8
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Dec 15 12:19:29 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Dec 15 12:19:29 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/common/counters/TaskCounter.java | 30 ++++++++++++++++++++
 .../impl/ShuffleInputEventHandlerImpl.java      |  1 +
 .../common/shuffle/impl/ShuffleManager.java     | 20 ++++++++++++-
 .../common/shuffle/orderedgrouped/Shuffle.java  | 22 ++++++++++----
 .../ShuffleInputEventHandlerOrderedGrouped.java |  3 +-
 .../orderedgrouped/ShuffleScheduler.java        | 18 ++++++++++--
 7 files changed, 85 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/125620a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1fdf360..3b92f9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1610. Add additional task counters for fetchers, merger.
   TEZ-1775. Allow setting log level per logger.
   TEZ-1847. Fix package name for MiniTezClusterWithTimeline.
   TEZ-1846. Build fails with package org.apache.tez.dag.history.logging.ats does not exist.

http://git-wip-us.apache.org/repos/asf/tez/blob/125620a4/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 94cae5f..128b067 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -175,4 +175,34 @@ public enum TaskCounter {
    * Number of disk to disk merges performed during the sort-merge
    */
   NUM_DISK_TO_DISK_MERGES,
+
+  /**
+   * Time taken to shuffle data. This includes the time taken to fetch the data
+   * and to merge it in parallel with fetching when needed.  This also includes any
+   * waiting time related to event delays from the source.
+   *
+   * Represented in milliseconds.
+   */
+  SHUFFLE_PHASE_TIME,
+
+  /**
+   * Time taken to merge data retrieved during shuffle.
+   *
+   * Relative to task start time and expressed in milliseconds.
+   */
+  MERGE_PHASE_TIME,
+
+  /**
+   * First event received from source relative to task start time.
+   *
+   * Represented in milliseconds.
+   */
+  FIRST_EVENT_RECEIVED,
+
+  /**
+   * Last event received from source relative to task start time.
+   *
+   * Represented in milliseconds.
+   */
+  LAST_EVENT_RECEIVED
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/125620a4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 6736282..ffcd128 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -85,6 +85,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   private void handleEvent(Event event) throws IOException {
     if (event instanceof DataMovementEvent) {
       processDataMovementEvent((DataMovementEvent)event);
+      shuffleManager.updateEventReceivedTime();
     } else if (event instanceof InputFailedEvent) {
       processInputFailedEvent((InputFailedEvent)event);
     } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/125620a4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 69c015e..b7067e5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -69,7 +69,6 @@ import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.common.shuffle.Fetcher;
 import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
 import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
 import org.apache.tez.runtime.library.common.shuffle.InputHost;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -157,6 +156,10 @@ public class ShuffleManager implements FetcherCallback {
   private final Path[] localDisks;
   private final static String localhostName = NetUtils.getHostname();
 
+  private final TezCounter shufflePhaseTime;
+  private final TezCounter firstEventReceived;
+  private final TezCounter lastEventReceived;
+
   // TODO More counters - FetchErrors, speed?
   
   public ShuffleManager(InputContext inputContext, Configuration conf, int numInputs,
@@ -182,6 +185,10 @@ public class ShuffleManager implements FetcherCallback {
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
     this.sharedFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
+
+    this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
+    this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
+    this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
     
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
   
@@ -324,6 +331,7 @@ public class ShuffleManager implements FetcherCallback {
           }
         }
       }
+      shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
       LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
       // TODO NEWTEZ Maybe clean up inputs.
       if (!fetcherExecutor.isShutdown()) {
@@ -433,6 +441,16 @@ public class ShuffleManager implements FetcherCallback {
     }
   }
 
+  protected synchronized  void updateEventReceivedTime() {
+    long relativeTime = System.currentTimeMillis() - startTime;
+    if (firstEventReceived.getValue() == 0) {
+      firstEventReceived.setValue(relativeTime);
+      lastEventReceived.setValue(relativeTime);
+      return;
+    }
+    lastEventReceived.setValue(relativeTime);
+  }
+
   public void addCompletedInputWithData(
       InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/125620a4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 61e2dfa..fccb181 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -55,7 +55,6 @@ import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
 import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 
@@ -113,6 +112,10 @@ public class Shuffle implements ExceptionReporter {
   private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
   private AtomicBoolean mergerClosed = new AtomicBoolean(false);
 
+  private final long startTime;
+  private final TezCounter mergePhaseTime;
+  private final TezCounter shufflePhaseTime;
+
   public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
       long initialMemoryAvailable) throws IOException {
     this.inputContext = inputContext;
@@ -182,6 +185,7 @@ public class Shuffle implements ExceptionReporter {
 
     boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
       TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
+    startTime = System.currentTimeMillis();
     scheduler = new ShuffleScheduler(
           this.inputContext,
           this.conf,
@@ -193,7 +197,10 @@ public class Shuffle implements ExceptionReporter {
           failedShuffleCounter,
           bytesShuffedToDisk,
           bytesShuffedToDiskDirect,
-          bytesShuffedToMem);
+          bytesShuffedToMem,
+          startTime);
+    this.mergePhaseTime = inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME);
+    this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
 
     merger = new MergeManager(
           this.conf,
@@ -220,12 +227,13 @@ public class Shuffle implements ExceptionReporter {
 
     int configuredNumFetchers = 
         conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
     numFetchers = Math.min(configuredNumFetchers, numInputs);
     LOG.info("Num fetchers being started: " + numFetchers);
     fetchers = Lists.newArrayListWithCapacity(numFetchers);
-    localDiskFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+    localDiskFetchEnabled = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
 
     executor = MoreExecutors.listeningDecorator(rawExecutor);
@@ -340,7 +348,8 @@ public class Shuffle implements ExceptionReporter {
           }
         }
       }
-      
+      shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
+
       // Stop the map-output fetcher threads
       cleanupFetchers(false);
       
@@ -354,7 +363,8 @@ public class Shuffle implements ExceptionReporter {
       } catch (Throwable e) {
         throw new ShuffleError("Error while doing final merge " , e);
       }
-      
+      mergePhaseTime.setValue(System.currentTimeMillis() - startTime);
+
       // Sanity check
       synchronized (Shuffle.this) {
         if (throwable != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/125620a4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 2feeaed..bc1b059 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -66,7 +66,8 @@ public class ShuffleInputEventHandlerOrderedGrouped {
   
   private void handleEvent(Event event) throws IOException {
     if (event instanceof DataMovementEvent) {
-      processDataMovementEvent((DataMovementEvent) event);      
+      processDataMovementEvent((DataMovementEvent) event);
+      scheduler.updateEventReceivedTime();
     } else if (event instanceof InputFailedEvent) {
       processTaskFailedEvent((InputFailedEvent) event);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/125620a4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 9cd8c64..52e7334 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -88,6 +88,8 @@ class ShuffleScheduler {
   private final TezCounter bytesShuffledToDisk;
   private final TezCounter bytesShuffledToDiskDirect;
   private final TezCounter bytesShuffledToMem;
+  private final TezCounter firstEventReceived;
+  private final TezCounter lastEventReceived;
 
   private final long startTime;
   private long lastProgressTime;
@@ -112,7 +114,7 @@ class ShuffleScheduler {
                           TezCounter failedShuffleCounter,
                           TezCounter bytesShuffledToDisk,
                           TezCounter bytesShuffledToDiskDirect,
-                          TezCounter bytesShuffledToMem) {
+                          TezCounter bytesShuffledToMem, long startTime) {
     this.inputContext = inputContext;
     this.numInputs = numberOfInputs;
     abortFailureLimit = Math.max(30, numberOfInputs / 10);
@@ -127,7 +129,7 @@ class ShuffleScheduler {
     this.bytesShuffledToDisk = bytesShuffledToDisk;
     this.bytesShuffledToDiskDirect = bytesShuffledToDiskDirect;
     this.bytesShuffledToMem = bytesShuffledToMem;
-    this.startTime = System.currentTimeMillis();
+    this.startTime = startTime;
     this.lastProgressTime = startTime;
 
     this.maxFailedUniqueFetches = Math.min(numberOfInputs,
@@ -146,6 +148,8 @@ class ShuffleScheduler {
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT));
     
     this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
+    this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
+    this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
 
     LOG.info("ShuffleScheduler running for sourceVertex: "
         + inputContext.getSourceVertexName() + " with configuration: "
@@ -156,6 +160,16 @@ class ShuffleScheduler {
         + ", maxMapRuntime=" + maxMapRuntime);
   }
 
+  protected synchronized  void updateEventReceivedTime() {
+    long relativeTime = System.currentTimeMillis() - startTime;
+    if (firstEventReceived.getValue() == 0) {
+      firstEventReceived.setValue(relativeTime);
+      lastEventReceived.setValue(relativeTime);
+      return;
+    }
+    lastEventReceived.setValue(relativeTime);
+  }
+
   public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier, 
                                          MapHost host,
                                          long bytesCompressed,


Mime
View raw message