tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1074. Reduce the frequency at which counters are sent from the task to the AM to reduce AM CPU usage. Contributed by Rajesh Balamohan.
Date Mon, 28 Apr 2014 17:52:46 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master b084c7f8d -> 083555aca


TEZ-1074. Reduce the frequency at which counters are sent from the task
to the AM to reduce AM CPU usage. Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/083555ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/083555ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/083555ac

Branch: refs/heads/master
Commit: 083555aca45aae6cf1f113d08b3e9e3c3726117c
Parents: b084c7f
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Apr 28 10:51:49 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Apr 28 10:51:49 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/api/TezConfiguration.java |  8 ++++++++
 .../apache/hadoop/mapred/YarnTezDagChild.java    | 19 ++++++++++++++++++-
 2 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/083555ac/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 2e027c0..ba735fa 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -175,6 +175,14 @@ public class TezConfiguration extends Configuration {
       + "am.heartbeat.interval-ms.max";
   public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 100;
 
+  /**
+   * Interval after which counters are sent to AM in heartbeat  
+   */
+  public static final String TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS = TEZ_TASK_PREFIX
+      + "am.heartbeat.counter.interval-ms.max";
+  public static final long TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT =
+      1000;
+
   public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
       + "max-events-per-heartbeat.max";
   public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 100;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/083555ac/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index e4f76f7..7c02077 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -64,6 +64,7 @@ import org.apache.tez.common.TezLocalResource;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -111,6 +112,7 @@ public class YarnTezDagChild {
       new LinkedBlockingQueue<TezEvent>();
   private static AtomicLong requestCounter = new AtomicLong(0);
   private static long amPollInterval;
+  private static long hbCounterInterval;
   private static TezTaskUmbilicalProtocol umbilical;
   private static ReentrantReadWriteLock taskLock = new ReentrantReadWriteLock();
   private static LogicalIOProcessorRuntimeTask currentTask = null;
@@ -131,6 +133,7 @@ public class YarnTezDagChild {
   private static final float LOG_COUNTER_BACKOFF = 1.3f;
   private static int taskNonOobHeartbeatCounter = 0;
   private static int nextHeartbeatNumToLog = 0;
+  private static long prevHeartbeatTimeStamp =  System.currentTimeMillis();
 
   private static Thread startHeartbeatThread() {
     Thread heartbeatThread = new Thread(new Runnable() {
@@ -213,8 +216,19 @@ public class YarnTezDagChild {
         eventCounter = currentTask.getEventCounter();
         eventsRange = maxEventsToGet;
         if (!currentTask.isTaskDone() && !currentTask.hadFatalError()) {
+          TezCounters counters = null;
+          /**
+           * Increasing the heartbeat interval can delay the delivery of events.
+           * Sending just updated records would save CPU in DAG AM, but certain
+           * counters are updated very frequently. Until real time decisions are made
+           * based on these counters, it can be sent once per second.
+           */
+          if ((System.currentTimeMillis() - prevHeartbeatTimeStamp) > hbCounterInterval) {
+            counters = currentTask.getCounters();
+            prevHeartbeatTimeStamp = System.currentTimeMillis();
+          }
           updateEvent = new TezEvent(new TaskStatusUpdateEvent(
-              currentTask.getCounters(), currentTask.getProgress()),
+              counters, currentTask.getProgress()),
                 new EventMetaData(EventProducerConsumerType.SYSTEM,
                     currentTask.getVertexName(), "", taskAttemptID));
           events.add(updateEvent);
@@ -344,6 +358,9 @@ public class YarnTezDagChild {
     amPollInterval = defaultConf.getLong(
         TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
         TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    hbCounterInterval = defaultConf.getLong(
+      TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
+      TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
     maxEventsToGet = defaultConf.getInt(
         TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
         TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);


Mime
View raw message