tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] tez git commit: TEZ-2830. Backport TEZ-2774 to branch-0.7. (sseth)
Date Wed, 16 Sep 2015 22:20:20 GMT
TEZ-2830. Backport TEZ-2774 to branch-0.7. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e15faa56
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e15faa56
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e15faa56

Branch: refs/heads/branch-0.7
Commit: e15faa56c5a1a9ab678fc15583189be8882228f1
Parents: fd7c2fc
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Sep 16 15:20:01 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Sep 16 15:20:01 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/AsyncDispatcher.java  |  14 +-
 .../tez/common/AsyncDispatcherConcurrent.java   |   9 +-
 .../org/apache/tez/common/TezUtilsInternal.java |  14 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  27 +++-
 .../app/dag/RootInputInitializerManager.java    |   2 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   2 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  14 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  34 ++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  99 ++++++------
 .../dag/app/launcher/ContainerLauncherImpl.java |   9 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  10 +-
 .../dag/app/rm/YarnTaskSchedulerService.java    | 155 +++++++++++--------
 .../app/rm/container/AMContainerHelpers.java    |  10 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  18 ++-
 .../tez/dag/app/rm/node/AMNodeTracker.java      |   2 +
 .../tez/dag/history/HistoryEventHandler.java    |   8 +-
 .../events/TaskAttemptFinishedEvent.java        |  12 +-
 .../history/events/TaskAttemptStartedEvent.java |   4 +-
 .../impl/SimpleHistoryLoggingService.java       |   4 +-
 .../dag/history/recovery/RecoveryService.java   |   8 +-
 .../resources/tez-container-log4j.properties    |   2 +-
 .../mapreduce/committer/MROutputCommitter.java  |   3 +-
 .../common/MRInputAMSplitGenerator.java         |  31 ++--
 .../common/MRInputSplitDistributor.java         |   7 +-
 .../tez/mapreduce/hadoop/MRInputHelpers.java    |  14 +-
 .../tez/mapreduce/partition/MRPartitioner.java  |  18 ++-
 .../logging/ats/ATSHistoryLoggingService.java   |   9 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  45 +++---
 .../runtime/api/impl/TezInputContextImpl.java   |   4 +-
 .../runtime/api/impl/TezOutputContextImpl.java  |   4 +-
 .../api/impl/TezProcessorContextImpl.java       |   4 +-
 .../common/resources/MemoryDistributor.java     |  76 +++++++--
 .../tez/runtime/metrics/TaskCounterUpdater.java |   4 +-
 .../tez/runtime/task/ContainerReporter.java     |   2 +-
 .../org/apache/tez/runtime/task/TezChild.java   |  36 ++---
 .../runtime/library/common/TezRuntimeUtils.java |   1 -
 .../WeightedScalingMemoryDistributor.java       |   8 +-
 .../tez/mapreduce/examples/RPCLoadGen.java      |   2 -
 39 files changed, 437 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c8bde4..3e56d6c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2830. Backport TEZ-2774 to branch-0.7. Improvements to logging in the AM and part of the runtime.
   TEZ-2829. Tez UI: minor fixes to in-progress update of UI from AM
   TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
   TEZ-2825. Report progress in terms of completed tasks to reduce load on AM for Tez UI

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 4319f4f..159ccd9 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -130,7 +130,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
   @Override
   protected void serviceStart() throws Exception {
     eventHandlingThread = new Thread(createThread());
-    eventHandlingThread.setName("Dispatcher thread: " + name);
+    eventHandlingThread.setName("Dispatcher thread {" + name + "}");
     eventHandlingThread.start();
     
     //start all the components
@@ -211,7 +211,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
 
   private void checkForExistingConcurrentDispatcher(Class<? extends Enum> eventType) {
     AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(eventType);
-    Preconditions.checkState(concurrentDispatcher == null, 
+    Preconditions.checkState(concurrentDispatcher == null,
         "Multiple concurrent dispatchers cannot be registered for: " + eventType.getName());
   }
   
@@ -259,7 +259,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+    LOG.info(
+          "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
     AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
     dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
@@ -272,7 +273,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
+    LOG.info(
+          "Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
     AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
     dispatcher.register(eventType, handler);
     concurrentEventDispatchers.put(eventType, dispatcher);
@@ -286,8 +288,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
-        + handler.getClass());
+    LOG.info("Registering " + eventType + " with existing concurrent dispatch using: "
+          + handler.getClass());
     dispatcher.register(eventType, handler);
     concurrentEventDispatchers.put(eventType, dispatcher);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
index d19bf9e..321ea8b 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
@@ -136,7 +136,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
   @Override
   protected void serviceStart() throws Exception {
     execService = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("Dispatcher [" + this.name + "] #%d").build());
+        .setNameFormat("Dispatcher {" + this.name + "} #%d").build());
     for (int i=0; i<numThreads; ++i) {
       eventQueues.add(new LinkedBlockingQueue<Event>());
     }
@@ -215,7 +215,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
 
   private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
     AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(eventType);
-    Preconditions.checkState(registeredDispatcher == null, 
+    Preconditions.checkState(registeredDispatcher == null,
         "Multiple dispatchers cannot be registered for: " + eventType.getName());
   }
 
@@ -263,7 +263,8 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+    LOG.info(
+          "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
     AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
     dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
@@ -278,7 +279,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
     LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
-        + handler.getClass());
+          + handler.getClass());
     dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 9c78377..55e1e6e 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -81,13 +81,10 @@ public class TezUtilsInternal {
 
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     byte[] compressed = compressBytesInflateDeflate(inBytes);
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
           + ", CompressTime: " + sw.elapsedMillis());
     }
@@ -95,13 +92,10 @@ public class TezUtilsInternal {
   }
 
   public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
           + ", UncompressTimeTaken: " + sw.elapsedMillis());
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 49ba802..f2194fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -435,7 +435,6 @@ public class DAGAppMaster extends AbstractService {
 
     // Prepare the TaskAttemptListener server for authentication of Containers
     // TaskAttemptListener gets the information via jobTokenSecretManager.
-    LOG.info("Adding session token to jobTokenSecretManager for application");
     jobTokenSecretManager.addTokenForJob(
         appAttemptID.getApplicationId().toString(), sessionToken);
 
@@ -461,8 +460,11 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
-    if (!conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
-        TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT)) {
+    boolean useConcurrentDispatcher =
+        conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
+            TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT);
+    LOG.info("Using concurrent dispatcher: " + useConcurrentDispatcher);
+    if (!useConcurrentDispatcher) {
       dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
       dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
     } else {
@@ -522,7 +524,7 @@ public class DAGAppMaster extends AbstractService {
     currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
         appAttemptID.getAttemptId());
     if (LOG.isDebugEnabled()) {
-      LOG.info("Stage directory information for AppAttemptId :" + this.appAttemptID
+      LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
           + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
           + " recoveryAttemptDir :" + currentRecoveryDataDir);
     }
@@ -888,7 +890,7 @@ public class DAGAppMaster extends AbstractService {
 
     try {
       if (LOG.isDebugEnabled()) {
-        LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString()
+        LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString()
             + ", json="
             + DAGUtils.generateSimpleJSONPlan(dagPB).toString());
       }
@@ -2027,6 +2029,7 @@ public class DAGAppMaster extends AbstractService {
   public static void main(String[] args) {
     try {
       Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+      final String pid = System.getenv().get("JVM_PID");
       String containerIdStr =
           System.getenv(Environment.CONTAINER_ID.name());
       String nodeHostString = System.getenv(Environment.NM_HOST.name());
@@ -2066,6 +2069,18 @@ public class DAGAppMaster extends AbstractService {
           false, "Run Tez Application Master in Session mode");
 
       CommandLine cliParser = new GnuParser().parse(opts, args);
+      boolean sessionModeCliOption = cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
+
+      LOG.info("Creating DAGAppMaster for "
+          + "applicationId=" + applicationAttemptId.getApplicationId()
+          + ", attemptNum=" + applicationAttemptId.getAttemptId()
+          + ", AMContainerId=" + containerId
+          + ", jvmPid=" + pid
+          + ", userFromEnv=" + jobUserName
+          + ", cliSessionOption=" + sessionModeCliOption
+          + ", pwd=" + System.getenv(Environment.PWD.name())
+          + ", localDirs=" + System.getenv(Environment.LOCAL_DIRS.name())
+          + ", logDirs=" + System.getenv(Environment.LOG_DIRS.name()));
 
       // TODO Does this really need to be a YarnConfiguration ?
       Configuration conf = new Configuration(new YarnConfiguration());
@@ -2077,7 +2092,7 @@ public class DAGAppMaster extends AbstractService {
           new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
               Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime,
-              cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION),
+              sessionModeCliOption,
               System.getenv(Environment.PWD.name()),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())),

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 4a8a286..13128f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -100,7 +100,7 @@ public class RootInputInitializerManager {
     this.vertex = vertex;
     this.eventHandler = appContext.getEventHandler();
     this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build());
+        .setDaemon(true).setNameFormat("InputInitializer {" + this.vertex.getName() + "} #%d").build());
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     this.dagUgi = dagUgi;
     this.entityStateTracker = stateTracker;

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index f03dcd1..88dcc8d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -181,4 +181,6 @@ public interface Vertex extends Comparable<Vertex> {
   public int getKilledTaskAttemptCount();
 
   public Configuration getConf();
+
+  public boolean isSpeculationEnabled();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index bdc0207..f63f461 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -785,10 +785,12 @@ public class TaskAttemptImpl implements TaskAttempt,
             );
       }
       if (oldState != getInternalState()) {
-          LOG.info(attemptId + " TaskAttempt Transitioned from "
-           + oldState + " to "
-           + getInternalState() + " due to event "
-           + event.getType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(attemptId + " TaskAttempt Transitioned from "
+              + oldState + " to "
+              + getInternalState() + " due to event "
+              + event.getType());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -1118,7 +1120,9 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskSpec remoteTaskSpec;
       try {
         remoteTaskSpec = ta.createRemoteTaskSpec();
-        LOG.info("remoteTaskSpec:" + remoteTaskSpec);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
+        }
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
         LOG.error(msg, e);

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e6027f5..46215d0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -501,7 +501,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         int toEventId = actualMax + fromEventId;
         events = new ArrayList<TezEvent>(tezEventsForTaskAttempts.subList(fromEventId, toEventId));
         LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
-            + "-" + toEventId + ")");
+            + "-" + toEventId + ").");
         // currently not modifying the events so that we dont have to create
         // copies of events. e.g. if we have to set taskAttemptId into the TezEvent
         // destination metadata then we will need to create a copy of the TezEvent
@@ -760,12 +760,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public boolean canCommit(TezTaskAttemptID taskAttemptID) {
     writeLock.lock();
     try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Commit go/no-go request from " + taskAttemptID);
+      }
       TaskState state = getState();
       if (state == TaskState.SCHEDULED) {
         // the actual running task ran and is done and asking for commit. we are still stuck 
         // in the scheduled state which indicates a backlog in event processing. lets wait for the
         // backlog to clear. returning false will make the attempt come back to us.
-        LOG.debug("Event processing delay. "
+        LOG.info(
+            "Event processing delay. "
             + "Attempt committing before state machine transitioned to running : Task {}", taskId);
         return false;
       }
@@ -796,7 +800,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         }
       } else {
         if (commitAttempt.equals(taskAttemptID)) {
-          LOG.info(taskAttemptID + " given a go for committing the task output.");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(taskAttemptID + " already given a go for committing the task output.");
+          }
           return true;
         }
         // Don't think this can be a pluggable decision, so simply raise an
@@ -804,9 +810,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // Wait for commit attempt to succeed. Dont kill this. If commit
         // attempt fails then choose a different committer. When commit attempt
         // succeeds then this and others will be killed
-        LOG.info(commitAttempt
-            + " is current committer. Commit waiting for:  "
-            + taskAttemptID);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(commitAttempt + " is current committer. Commit waiting for:  " + taskAttemptID);
+        }
         return false;
       }
 
@@ -814,7 +820,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       writeLock.unlock();
     }
   }
-  
+
   TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
@@ -899,9 +905,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         internalError(event.getType());
       }
       if (oldState != getInternalState()) {
-        LOG.info(taskId + " Task Transitioned from " + oldState + " to "
-            + getInternalState() + " due to event "
-            + event.getType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+              + getInternalState() + " due to event "
+              + event.getType());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -1112,7 +1120,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             //  other reasons.
             !attempt.isFinished()) {
           LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
-            task.successfulAttempt + " has succeeded");
+              task.successfulAttempt + " has succeeded");
           String diagnostics = null;
           TaskAttemptTerminationCause errCause = null;
           if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
@@ -1469,7 +1477,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       } else {
         // nothing to do
         LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
-                 task.successfulAttempt + " is already successful");
+            task.successfulAttempt + " is already successful");
         return TaskStateInternal.SUCCEEDED;
       }
     }
@@ -1512,7 +1520,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
     if (commitAttempt != null && commitAttempt.equals(attempt.getID())) {
-      LOG.info("Removing commit attempt: " + commitAttempt);
+      LOG.info("Unsetting commit attempt: " + commitAttempt + " since attempt is being killed");
       commitAttempt = null;
     }
     if (attempt != null && !attempt.isFinished()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index daeae3f..4a8309e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -877,6 +877,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     this.clock = clock;
     this.appContext = appContext;
     this.commitVertexOutputs = commitVertexOutputs;
+    this.logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
 
     this.taskAttemptListener = taskAttemptListener;
     this.taskHeartbeatHandler = thh;
@@ -935,6 +936,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     this.containerContext = new ContainerContext(this.localResources,
         appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
+    LOG.info("Default container context for " + logIdentifier + "=" + containerContext + ", Default Resources=" + this.taskResource);
 
     if (vertexPlan.getInputsCount() > 0) {
       setAdditionalInputs(vertexPlan.getInputsList());
@@ -957,7 +959,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       speculator = new LegacySpeculator(vertexConf, getAppContext(), this);
     }
     
-    logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
+
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
 
@@ -971,7 +973,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return vertexConf;
   }
 
-  private boolean isSpeculationEnabled() {
+  @Override
+  public boolean isSpeculationEnabled() {
     return isSpeculationEnabled;
   }
 
@@ -2005,29 +2008,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
+  private static String constructCheckTasksForCompletionLog(VertexImpl vertex) {
+    String logLine = vertex.logIdentifier
+        + ", tasks=" + vertex.numTasks
+        + ", failed=" + vertex.failedTaskCount
+        + ", killed=" + vertex.killedTaskCount
+        + ", success=" + vertex.succeededTaskCount
+        + ", completed=" + vertex.completedTaskCount
+        + ", commits=" + vertex.commitFutures.size()
+        + ", err=" + vertex.terminationCause;
+    return logLine;
+  }
+
   // triggered by task_complete
   static VertexState checkTasksForCompletion(final VertexImpl vertex) {
-
-    LOG.info("Checking tasks for vertex completion for "
-        + vertex.logIdentifier
-        + ", numTasks=" + vertex.numTasks
-        + ", failedTaskCount=" + vertex.failedTaskCount
-        + ", killedTaskCount=" + vertex.killedTaskCount
-        + ", successfulTaskCount=" + vertex.succeededTaskCount
-        + ", completedTaskCount=" + vertex.completedTaskCount
-        + ", commitInProgress=" + vertex.commitFutures.size()
-        + ", terminationCause=" + vertex.terminationCause);
-
+    // this log helps quickly count the completion count for a vertex.
+    // grepping and counting for attempts and handling re-tries is time consuming
+    LOG.info("Task Completion: " + constructCheckTasksForCompletionLog(vertex));
     //check for vertex failure first
     if (vertex.completedTaskCount > vertex.tasks.size()) {
       LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
-          + " for vertex " + vertex.logIdentifier
-          + ", numTasks=" + vertex.numTasks
-          + ", failedTaskCount=" + vertex.failedTaskCount
-          + ", killedTaskCount=" + vertex.killedTaskCount
-          + ", successfulTaskCount=" + vertex.succeededTaskCount
-          + ", completedTaskCount=" + vertex.completedTaskCount
-          + ", terminationCause=" + vertex.terminationCause);
+          + constructCheckTasksForCompletionLog(vertex));
     }
 
     if (vertex.completedTaskCount == vertex.tasks.size()) {
@@ -2036,7 +2037,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       
       //Only succeed if tasks complete successfully and no terminationCause is registered.
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
-        LOG.info("All tasks are succeeded, vertex:" + vertex.logIdentifier);
+        LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier);
         if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
           // start commit if there're commits or just finish if no commits
           return commitOrFinish(vertex);
@@ -2054,16 +2055,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   //triggered by commit_complete
   static VertexState checkCommitsForCompletion(final VertexImpl vertex) {
-    LOG.info("Checking commits for vertex completion for "
-        + vertex.logIdentifier
-        + ", numTasks=" + vertex.numTasks
-        + ", failedTaskCount=" + vertex.failedTaskCount
-        + ", killedTaskCount=" + vertex.killedTaskCount
-        + ", successfulTaskCount=" + vertex.succeededTaskCount
-        + ", completedTaskCount=" + vertex.completedTaskCount
-        + ", commitInProgress=" + vertex.commitFutures.size()
-        + ", terminationCause=" + vertex.terminationCause);
-
+    LOG.info("Commits completion: "
+            + constructCheckTasksForCompletionLog(vertex));
     // terminationCause is null mean commit is succeeded, otherwise terminationCause will be set.
     if (vertex.terminationCause == null) {
       Preconditions.checkState(vertex.getState() == VertexState.COMMITTING,
@@ -2184,20 +2177,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   private void initializeCommitters() throws Exception {
     if (!this.additionalOutputSpecs.isEmpty()) {
-      LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
+      LOG.info("Setting up committers for vertex " + logIdentifier + ", numAdditionalOutputs=" +
+          additionalOutputs.size());
       for (Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry:
           additionalOutputs.entrySet())  {
         final String outputName = entry.getKey();
         final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue();
         if (od.getControllerDescriptor() == null
             || od.getControllerDescriptor().getClassName() == null) {
-          LOG.info("Ignoring committer as none specified for output="
-              + outputName
-              + ", vertexId=" + logIdentifier);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring committer as none specified for output="
+                + outputName
+                + ", vertexId=" + logIdentifier);
+          }
           continue;
         }
         LOG.info("Instantiating committer for output=" + outputName
-            + ", vertexId=" + logIdentifier
+            + ", vertex=" + logIdentifier
             + ", committerClass=" + od.getControllerDescriptor().getClassName());
 
         dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -2214,12 +2210,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                 .createClazzInstance(od.getControllerDescriptor().getClassName(),
                     new Class[]{OutputCommitterContext.class},
                     new Object[]{outputCommitterContext});
-            LOG.info("Invoking committer init for output=" + outputName
-                + ", vertexId=" + logIdentifier);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Invoking committer init for output=" + outputName
+                  + ", vertex=" + logIdentifier);
+            }
             outputCommitter.initialize();
             outputCommitters.put(outputName, outputCommitter);
-            LOG.info("Invoking committer setup for output=" + outputName
-                + ", vertexId=" + logIdentifier);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Invoking committer setup for output=" + outputName
+                  + ", vertex=" + logIdentifier);
+            }
             outputCommitter.setupOutput();
             return null;
           }
@@ -3913,8 +3913,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       boolean forceTransitionToKillWait = false;
       vertex.completedTaskCount++;
-      LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : "
-          + vertex.completedTaskCount);
       VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
       Task task = vertex.tasks.get(taskEvent.getTaskID());
       if (taskEvent.getState() == TaskState.SUCCEEDED) {
@@ -4210,10 +4208,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           int numEventsSent = events.size() - numPreRoutedEvents;
           if (numEventsSent > 0) {
             StringBuilder builder = new StringBuilder();
-            builder.append("Sending ").append(attemptID).append(" numEvents: ").append(numEventsSent)
-            .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
-            .append(" out of ").append(currEventCount).append(" on-demand events in vertex: ")
-            .append(getLogIdentifier());
+            builder.append("Sending ").append(attemptID).append(" ")
+                .append(numEventsSent)
+                .append(" events [").append(fromEventId).append(",").append(nextFromEventId)
+                .append(") total ").append(currEventCount).append(" ")
+                .append(getLogIdentifier());
             LOG.info(builder.toString());
           }
         }
@@ -4478,9 +4477,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     for (String inputName : inputsWithInitializers) {
       inputList.add(rootInputDescriptors.get(inputName));
     }
-    LOG.info("Vertex will initialize via inputInitializers "
-        + logIdentifier + ". Starting root input initializers: "
-        + inputsWithInitializers.size());
+    LOG.info("Starting " + inputsWithInitializers.size() + " inputInitializers for vertex " +
+        logIdentifier);
     initWaitsForRootInitializers = true;
     rootInputInitializerManager.runInputInitializers(inputList);
     // Send pending rootInputInitializerEvents
@@ -4564,6 +4562,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
+    LOG.info("Setting " + inputs.size() + " additional inputs for vertex " + this.logIdentifier);
     this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size());
     for (RootInputLeafOutputProto input : inputs) {
       addIO(input.getName());
@@ -4608,7 +4607,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) {
-    LOG.info("setting additional outputs for vertex " + this.vertexName);
+    LOG.info("Setting " + outputs.size() + " additional outputs for vertex " + this.logIdentifier);
     this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
     this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size());
     for (RootInputLeafOutputProto output : outputs) {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 94889a1..da2200e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -147,7 +147,7 @@ public class ContainerLauncherImpl extends AbstractService implements
 
     @SuppressWarnings("unchecked")
     public synchronized void launch(NMCommunicatorLaunchRequestEvent event) {
-      LOG.info("Launching Container with Id: " + event.getContainerId());
+      LOG.info("Launching " + event.getContainerId());
       if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
         state = ContainerState.DONE;
         sendContainerLaunchFailedMsg(event.getContainerId(),
@@ -211,8 +211,7 @@ public class ContainerLauncherImpl extends AbstractService implements
       if(this.state == ContainerState.PREP) {
         this.state = ContainerState.KILLED_BEFORE_LAUNCH;
       } else {
-        LOG.info("Sending a stop request to the NM for ContainerId: "
-            + containerID);
+        LOG.info("Stopping " + containerID);
 
         ContainerManagementProtocolProxyData proxy = null;
         try {
@@ -378,7 +377,9 @@ public class ContainerLauncherImpl extends AbstractService implements
 
     @Override
     public void run() {
-      LOG.info("Processing the event " + event.toString());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing event: " + event.toString());
+      }
 
       // Load ContainerManager tokens before creating a connection.
       // TODO: Do it only once per NodeManager.

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 6d57737..785caf7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -144,7 +144,9 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   public synchronized void handleEvent(AMSchedulerEvent sEvent) {
-    LOG.info("Processing the event " + sEvent.toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing the event " + sEvent.toString());
+    }
     switch (sEvent.getType()) {
     case S_TA_LAUNCH_REQUEST:
       handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
@@ -160,7 +162,7 @@ public class TaskSchedulerEventHandler extends AbstractService
         handleTASucceeded(event);
         break;
       default:
-        throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
+        throw new TezUncheckedException("Unexpected TA_ENDED state: " + event.getState());
       }
       break;
     case S_CONTAINER_DEALLOCATE:
@@ -302,8 +304,8 @@ public class TaskSchedulerEventHandler extends AbstractService
               event);
           return;
         }
-        LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity 
-            + " but no locality information exists for it. Ignoring hint.");
+        LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt "
+            + taskAttempt.getID() + ". Ignoring.");
         // fall through with null hosts/racks
       } else {
         hosts = (locationHint.getHosts() != null) ? locationHint

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 73fcb3d..f35a45f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -348,7 +348,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         "Heartbeats between preemptions should be >=1");
 
     delayedContainerManager = new DelayedContainerManager();
-    LOG.info("TaskScheduler initialized with configuration: " +
+    LOG.info("YarnTaskScheduler initialized with configuration: " +
             "maxRMHeartbeatInterval: " + heartbeatIntervalMax +
             ", containerReuseEnabled: " + shouldReuseContainers +
             ", reuseRackLocal: " + reuseRackLocal +
@@ -428,8 +428,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   @Override
   public void onContainersCompleted(List<ContainerStatus> statuses) {
     if (isStopStarted.get()) {
-      for (ContainerStatus status : statuses) {
-        LOG.info("Container " + status.getContainerId() + " is completed");
+      if (LOG.isDebugEnabled()) {
+        for (ContainerStatus status : statuses) {
+          LOG.debug("Container " + status.getContainerId() + " is completed with ContainerStatus=" +
+              status);
+        }
       }
       return;
     }
@@ -450,8 +453,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           // being released
           // completion of a container we had released earlier
           // an allocated container completed. notify app
-          LOG.info("Released container completed:" + completedId +
-                   " last allocated to task: " + task);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Released container completed:" + completedId +
+                " last allocated to task: " + task);
+          }
           appContainerStatus.put(task, containerStatus);
           continue;
         }
@@ -467,9 +472,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         }
         if(task != null) {
           // completion of a container we have allocated currently
-          // an allocated container completed. notify app
-          LOG.info("Allocated container completed:" + completedId +
-                   " last allocated to task: " + task);
+          // an allocated container completed. notify app. This will cause attempt to get killed
+          LOG.info(
+              "Allocated container completed:" + completedId + " last allocated to task: " + task);
           appContainerStatus.put(task, containerStatus);
           continue;
         }
@@ -488,9 +493,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   @Override
   public void onContainersAllocated(List<Container> containers) {
     if (isStopStarted.get()) {
-      for (Container container : containers) {
-        LOG.info("Release container:" + container.getId() + ", because it is shutting down.");
-        releaseContainer(container.getId());
+      LOG.info("Ignoring container allocations because application is shutting down. Num " + 
+          containers.size());
+      for (Container container : containers) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Release container:" + container.getId() + ", because App is shutting down.");
+        }
+        releaseContainer(container.getId());
       }
       return;
     }
@@ -549,6 +558,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     }
 
     // Release any unassigned containers given by the RM
+    if (containers.iterator().hasNext()) {
+      LOG.info("Releasing newly assigned containers which could not be allocated");
+    }
     releaseUnassignedContainers(containers);
 
     return assignedContainers;
@@ -602,15 +614,15 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     boolean isNew = heldContainer.isNew();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Trying to assign a delayed container"
-        + ", containerId=" + heldContainer.getContainer().getId()
-        + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
-        + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
-        + ", AMState=" + state
-        + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
-        + ", taskRequestsCount=" + taskRequests.size()
-        + ", heldContainers=" + heldContainers.size()
-        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-        + ", isNew=" + isNew);
+          + ", containerId=" + heldContainer.getContainer().getId()
+          + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
+          + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
+          + ", AMState=" + state
+          + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
+          + ", taskRequestsCount=" + taskRequests.size()
+          + ", heldContainers=" + heldContainers.size()
+          + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+          + ", isNew=" + isNew);
     }
 
     if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
@@ -658,7 +670,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
             + ", isNew=" + isNew);
           releaseUnassignedContainers(
-              Lists.newArrayList(heldContainer.getContainer()));        
+              Collections.singletonList((heldContainer.getContainer())));        
       } else {
         // no outstanding work and container idle timeout not expired
         if (LOG.isDebugEnabled()) {
@@ -711,7 +723,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         assignReUsedContainerWithLocation(containerToAssign,
             NODE_LOCAL_ASSIGNER, assignedContainers, true);
         if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-          LOG.info("Failed to assign tasks to delayed container using node"
+          LOG.debug("Failed to assign tasks to delayed container using node"
             + ", containerId=" + heldContainer.getContainer().getId());
         }
       }
@@ -727,7 +739,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           assignReUsedContainerWithLocation(containerToAssign,
               RACK_LOCAL_ASSIGNER, assignedContainers, false);
           if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-            LOG.info("Failed to assign tasks to delayed container using rack"
+            LOG.debug("Failed to assign tasks to delayed container using rack"
               + ", containerId=" + heldContainer.getContainer().getId());
           }
         }
@@ -743,7 +755,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
          assignReUsedContainerWithLocation(containerToAssign,
               NON_LOCAL_ASSIGNER, assignedContainers, false);
           if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-            LOG.info("Failed to assign tasks to delayed container using non-local"
+            LOG.debug("Failed to assign tasks to delayed container using non-local"
                 + ", containerId=" + heldContainer.getContainer().getId());
           }
         }
@@ -765,10 +777,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
           && idleContainerTimeoutMin != -1) {
           LOG.info("Container's idle timeout expired. Releasing container"
-            + ", containerId=" + heldContainer.container.getId()
-            + ", containerExpiryTime="
-            + heldContainer.getContainerExpiryTime()
-            + ", idleTimeoutMin=" + idleContainerTimeoutMin);
+              + ", containerId=" + heldContainer.container.getId()
+              + ", containerExpiryTime="
+              + heldContainer.getContainerExpiryTime()
+              + ", idleTimeoutMin=" + idleContainerTimeoutMin);
           releaseUnassignedContainers(
             Lists.newArrayList(heldContainer.container));
         } else {
@@ -815,11 +827,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             if (safeToRelease && 
                 (!taskRequests.isEmpty() || !appContext.isSession())) {
               LOG.info("Releasing held container as either there are pending but "
-                + " unmatched requests or this is not a session"
-                + ", containerId=" + heldContainer.container.getId()
-                + ", pendingTasks=" + taskRequests.size()
-                + ", isSession=" + appContext.isSession()
-                + ". isNew=" + isNew);
+                  + " unmatched requests or this is not a session"
+                  + ", containerId=" + heldContainer.container.getId()
+                  + ", pendingTasks=" + taskRequests.size()
+                  + ", isSession=" + appContext.isSession()
+                  + ". isNew=" + isNew);
               releaseUnassignedContainers(
                 Lists.newArrayList(heldContainer.container));
             } else {
@@ -894,8 +906,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       // TODO this will not handle dynamic changes in resources
       totalResources = Resources.clone(getAvailableResources());
       LOG.info("App total resource memory: " + totalResources.getMemory() +
-               " cpu: " + totalResources.getVirtualCores() +
-               " taskAllocations: " + taskAllocations.size());
+          " cpu: " + totalResources.getVirtualCores() +
+          " taskAllocations: " + taskAllocations.size());
     }
 
     numHeartbeats++;
@@ -994,9 +1006,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     // See if any of the delayedContainers can be used for this task.
     delayedContainerManager.triggerScheduling(true);
     LOG.info("Allocation request for task: " + task +
-      " with request: " + request + 
-      " host: " + ((hosts!=null&&hosts.length>0)?hosts[0]:"null") +
-      " rack: " + ((racks!=null&&racks.length>0)?racks[0]:"null"));
+          " with request: " + request +
+          " host: " + ((hosts != null && hosts.length > 0) ? hosts[0] : "null") +
+          " rack: " + ((racks != null && racks.length > 0) ? racks[0] : "null"));
   }
 
   /**
@@ -1025,8 +1037,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         LOG.info("Ignoring removal of unknown task: " + task);
         return false;
       } else {
-        LOG.info("Deallocated task: " + task + " from container: "
-            + container.getId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Deallocated task: " + task + " from container: "
+              + container.getId());
+        }
 
         if (!taskSucceeded || !shouldReuseContainers) {
           if (LOG.isDebugEnabled()) {
@@ -1046,6 +1060,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             }
             assignedContainers = assignDelayedContainer(heldContainer);
           } else {
+            // this is a non standard situation
             LOG.info("Skipping container after task deallocate as container is"
                 + " no longer running, containerId=" + container.getId());
           }
@@ -1064,8 +1079,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   public synchronized Object deallocateContainer(ContainerId containerId) {
     Object task = unAssignContainer(containerId, true);
     if(task != null) {
+      // non-standard case for the app layer to deallocate container
       LOG.info("Deallocated container: " + containerId +
-        " from task: " + task);
+          " from task: " + task);
       return task;
     }
 
@@ -1075,9 +1091,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   @Override
   public synchronized void initiateStop() {
-    LOG.info("Initiate stop to YarnTaskScheduler");
+    LOG.info("Initiating stop of YarnTaskScheduler");
     // release held containers
-    LOG.info("Release held containers");
+    LOG.info("Releasing held containers");
     isStopStarted.set(true);
     // Create a new list for containerIds to iterate, otherwise it would cause ConcurrentModificationException
     // because method releaseContainer will change heldContainers.
@@ -1090,7 +1106,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     }
 
     // remove taskRequest from AMRMClient to avoid allocating new containers in the next heartbeat
-    LOG.info("Remove all the taskRequests");
+    LOG.info("Removing all pending taskRequests");
     // Create a new list for tasks to avoid ConcurrentModificationException
     List<Object> tasks = new ArrayList<Object>(taskRequests.size());
     for (Object task : taskRequests.keySet()) {
@@ -1117,6 +1133,14 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     return (int) Math.ceil((original * percent)/100.f);
   }
   
+  private String constructPreemptionPeriodicLog(Resource freeResource) {
+    return "Allocated: " + allocatedResources +
+      " Free: " + freeResource +
+      " pendingRequests: " + taskRequests.size() +
+      " delayedContainers: " + delayedContainerManager.delayedContainers.size() +
+      " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption;
+  }
+  
   void preemptIfNeeded() {
     if (preemptionPercentage == 0) {
       // turned off
@@ -1127,10 +1151,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     synchronized (this) {
       Resource freeResources = amRmClient.getAvailableResources();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
-          " cpu:" + allocatedResources.getVirtualCores() + 
-          " delayedContainers: " + delayedContainerManager.delayedContainers.size() +
-          " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption);
+        LOG.debug(constructPreemptionPeriodicLog(freeResources));
+      } else {
+        if (numHeartbeats % 50 == 1) {
+          LOG.info(constructPreemptionPeriodicLog(freeResources));
+        }
       }
       assert freeResources.getMemory() >= 0;
   
@@ -1156,8 +1181,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       
       if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Highest pri request: " + highestPriRequest + " fits in available resources "
-              + freeResources);
+          LOG.debug(highestPriRequest + " fits in free resources");
+        } else {
+          if (numHeartbeats % 50 == 1) {
+            LOG.info(highestPriRequest + " fits in free resources");
+          }
         }
         return;
       }
@@ -1651,8 +1679,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   private void releaseUnassignedContainers(Iterable<Container> containers) {
     for (Container container : containers) {
-      LOG.info("Releasing unused container: "
-          + container.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Releasing unused container: " + container.getId());
+      }
       releaseContainer(container.getId());
     }
   }
@@ -1721,19 +1750,17 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       Object task = getTask(assigned);
       assert task != null;
 
-      LOG.info("Assigning container to task"
-        + ", container=" + container
+      LOG.info("Assigning container to task: "
+        + "containerId=" + container.getId()
         + ", task=" + task
-        + ", containerHost=" + container.getNodeId().getHost()
+        + ", containerHost=" + container.getNodeId()
+        + ", containerPriority=" + container.getPriority()
+        + ", containerResources=" + container.getResource()
         + ", localityMatchType=" + locality
         + ", matchedLocation=" + matchedLocation
         + ", honorLocalityFlags=" + honorLocalityFlags
-        + ", reusedContainer="
-        + containerAssignments.containsKey(container.getId())
-        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-        + ", containerResourceMemory=" + container.getResource().getMemory()
-        + ", containerResourceVCores="
-        + container.getResource().getVirtualCores());
+        + ", reusedContainer=" + containerAssignments.containsKey(container.getId())
+        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size());
 
       assignContainer(task, container, assigned);
     }
@@ -1921,6 +1948,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
                 heldContainers.get(delayedContainer.getContainer().getId())) {
               assignedContainers = assignDelayedContainer(delayedContainer);
             } else {
+              // non standard scenario
               LOG.info("Skipping delayed container as container is no longer"
                   + " running, containerId="
                   + delayedContainer.getContainer().getId());
@@ -1975,9 +2003,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           HeldContainer delayedContainer = iter.next();
           if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
             // this container is no longer held by us
+            // non standard scenario
             LOG.info("AssignAll - Skipping delayed container as container is no longer"
-                + " running, containerId="
-                + delayedContainer.getContainer().getId());
+                  + " running, containerId="
+                  + delayedContainer.getContainer().getId());
             iter.remove();
           }
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 470fa56..11b5006 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -113,8 +113,10 @@ public class AMContainerHelpers {
       // correctly, even though they may not be used by all tasks which will run
       // on this container.
 
-      LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
-          + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding #" + credentials.numberOfTokens() + " tokens and #"
+            + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container in common CLC");
+      }
       containerCredentials.addAll(credentials);
 
       DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
@@ -123,7 +125,9 @@ public class AMContainerHelpers {
           containerTokens_dob.getLength());
 
       // Add shuffle token
-      LOG.info("Putting shuffle token in serviceData");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Putting shuffle token in serviceData in common CLC");
+      }
       serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
           TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9b90752..8b6e861 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -397,9 +397,11 @@ public class AMContainerImpl implements AMContainer {
         // TODO Can't set state to COMPLETED. Add a default error state.
       }
       if (oldState != getState()) {
-        LOG.info("AMContainer " + this.containerId + " transitioned from "
-            + oldState + " to " + getState()
-            + " via event " + event.getType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("AMContainer " + this.containerId + " transitioned from "
+              + oldState + " to " + getState()
+              + " via event " + event.getType());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -450,8 +452,10 @@ public class AMContainerImpl implements AMContainer {
       // task is not told to die since the TAL does not know about the container.
       container.registerWithTAListener();
       container.sendStartRequestToNM(clc);
-      LOG.info("Sending Launch Request for Container with id: " +
-          container.container.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending Launch Request for Container with id: " +
+            container.container.getId());
+      }
     }
   }
 
@@ -509,7 +513,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
       container.deAllocate();
-      LOG.info(
+      LOG.warn(
           "Unexpected event type: " + cEvent.getType() + " while in state: " +
               container.getState() + ". Event: " + cEvent);
 
@@ -573,8 +577,6 @@ public class AMContainerImpl implements AMContainer {
         }
       }
 
-      LOG.info("Assigned taskAttempt + [" + container.currentAttempt +
-          "] to container: [" + container.getContainerId() + "]");
       AMContainerTask amContainerTask = new AMContainerTask(
           event.getRemoteTaskSpec(), container.additionalLocalResources,
           container.credentialsChanged ? container.credentials : null, container.credentialsChanged,

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 102cbe9..a067cee 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -132,6 +132,8 @@ public class AMNodeTracker extends AbstractService implements
       AMNode amNode = nodeMap.get(nodeId);
       if (amNode == null) {
         LOG.info("Ignoring RM Health Update for unknown node: " + nodeId);
+        // This implies the node exists on the cluster, but is not running a container for
+        // this application
       } else {
         amNode.handle(rEvent);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 9e275a2..e17a4d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -47,8 +47,6 @@ public class HistoryEventHandler extends CompositeService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    LOG.info("Initializing HistoryEventHandler");
-
     this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
         TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
 
@@ -56,6 +54,10 @@ public class HistoryEventHandler extends CompositeService {
         TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
         TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
 
+    LOG.info("Initializing HistoryEventHandler with "
+        + "recoveryEnabled=" + recoveryEnabled
+        + ", historyServiceClassName=" + historyServiceClassName);
+
     historyLoggingService =
         ReflectionUtils.createClazzInstance(historyServiceClassName);
     historyLoggingService.setAppContext(context);
@@ -66,11 +68,11 @@ public class HistoryEventHandler extends CompositeService {
       addService(recoveryService);
     }
     super.serviceInit(conf);
+
   }
 
   @Override
   public void serviceStart() throws Exception {
-    LOG.info("Starting HistoryEventHandler");
     super.serviceStart();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 9f24151..7d83db2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -177,6 +177,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
 
   @Override
   public String toString() {
+    String counterStr = "";
+    if (state != TaskAttemptState.SUCCEEDED) {
+      counterStr = ", counters=" + ( tezCounters == null ? "null" :
+        tezCounters.toString()
+        .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+    }
     return "vertexName=" + vertexName
         + ", taskAttemptId=" + taskAttemptId
         + ", creationTime=" + creationTime
@@ -187,11 +193,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", status=" + state.name()
         + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
-        + ", lastDataEventSourceTA=" + 
-              ((dataEvents==null) ? 0:dataEvents.size())
-        + ", counters=" + (tezCounters == null ? "null" :
-          tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+        + counterStr;
   }
 
   public TezTaskAttemptID getTaskAttemptID() {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index a58b49e..71d4419 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -110,9 +110,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
         + ", taskAttemptId=" + taskAttemptId
         + ", startTime=" + launchTime
         + ", containerId=" + containerId
-        + ", nodeId=" + nodeId
-        + ", inProgressLogs=" + inProgressLogsUrl
-        + ", completedLogs=" + completedLogsUrl;
+        + ", nodeId=" + nodeId;
   }
 
   public TezTaskAttemptID getTaskAttemptID() {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 8852e02..4372d8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -148,7 +148,9 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
     if (loggingDisabled) {
       return;
     }
-    LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+    }
     try {
       try {
         JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent());

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index d870645..2fe0e6d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -106,7 +106,6 @@ public class RecoveryService extends AbstractService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    LOG.info("Initializing RecoveryService");
     recoveryPath = appContext.getCurrentRecoveryDir();
     recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
     bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
@@ -120,11 +119,16 @@ public class RecoveryService extends AbstractService {
     drainEventsFlag = conf.getBoolean(
         TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED,
         TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT);
+
+    LOG.info("RecoveryService initialized with "
+      + "recoveryPath=" + recoveryPath
+      + ", bufferSize(bytes)=" + bufferSize
+      + ", flushInterval(s)=" + flushInterval
+      + ", maxUnflushedEvents=" + maxUnflushedEvents);
   }
 
   @Override
   public void serviceStart() {
-    LOG.info("Starting RecoveryService");
     lastFlushTime = appContext.getClock().getTime();
     eventHandlingThread = new Thread(new Runnable() {
       @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/resources/tez-container-log4j.properties
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties
index 7a2aeab..c53994e 100644
--- a/tez-dag/src/main/resources/tez-container-log4j.properties
+++ b/tez-dag/src/main/resources/tez-container-log4j.properties
@@ -28,7 +28,7 @@ log4j.appender.CLA=org.apache.tez.common.TezContainerLogAppender
 log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
 
 log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}: %m%n
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n
 
 #
 # Event Counter Appender

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 69237d4..1b66c8e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -117,8 +117,9 @@ public class MROutputCommitter extends OutputCommitter {
     if (jobConf.getBoolean("mapred.reducer.new-api", false)
         || jobConf.getBoolean("mapred.mapper.new-api", false))  {
       newApiCommitter = true;
-      LOG.info("Using mapred newApiCommitter.");
     }
+    LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() +
+        " using " + (newApiCommitter ? "new" : "old") + " mapred API");
 
     if (newApiCommitter) {
       TaskAttemptID taskAttemptID = new TaskAttemptID(

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index dbc7748..b93e4ba 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -69,35 +69,30 @@ public class MRInputAMSplitGenerator extends InputInitializer {
 
   @Override
   public List<Event> initialize() throws Exception {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     MRInputUserPayloadProto userPayloadProto = MRInputHelpers
         .parseMRInputPayload(getContext().getInputUserPayload());
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "
           + sw.elapsedMillis());
     }
-    if (LOG.isDebugEnabled()) {
-      sw.reset().start();
-    }
+    sw.reset().start();
     Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
         .getConfigurationBytes());
     
     sendSerializedEvents = conf.getBoolean(
         MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD,
         MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD_DEFAULT);
-    LOG.info("Emitting serialized splits: " + sendSerializedEvents);
+
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
+      LOG.debug("Emitting serialized splits: " + sendSerializedEvents + " for input " +
+          getContext().getInputName());
       LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis());
     }
 
-    if (LOG.isDebugEnabled()) {
-      sw.reset().start();
-    }
+    sw.reset().start();
 
     int totalResource = getContext().getTotalAvailableResource().getMemory();
     int taskResource = getContext().getVertexTaskResource().getMemory();
@@ -107,24 +102,26 @@ public class MRInputAMSplitGenerator extends InputInitializer {
 
     int numTasks = (int)((totalResource*waves)/taskResource);
 
+
+
+    boolean groupSplits = userPayloadProto.getGroupingEnabled();
     LOG.info("Input " + getContext().getInputName() + " asking for " + numTasks
         + " tasks. Headroom: " + totalResource + " Task Resource: "
-        + taskResource + " waves: " + waves);
+        + taskResource + " waves: " + waves + ", groupingEnabled: " + groupSplits);
 
     // Read all credentials into the credentials instance stored in JobConf.
     JobConf jobConf = new JobConf(conf);
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
 
     InputSplitInfoMem inputSplitInfo = null;
-    boolean groupSplits = userPayloadProto.getGroupingEnabled();
+
     if (groupSplits) {
-      LOG.info("Grouping input splits");
       inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, true, numTasks);
     } else {
       inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
     }
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("Time to create splits to mem: " + sw.elapsedMillis());
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index e6b70d2..28d108e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -69,14 +69,11 @@ public class MRInputSplitDistributor extends InputInitializer {
 
   @Override
   public List<Event> initialize() throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     MRInputUserPayloadProto userPayloadProto = MRInputHelpers
         .parseMRInputPayload(getContext().getInputUserPayload());
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "
           + sw.elapsedMillis());  
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 7f5e0e3..30e4a8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -284,21 +284,27 @@ public class MRInputHelpers {
     InputSplitInfoMem splitInfoMem = null;
     JobConf jobConf = new JobConf(conf);
     if (jobConf.getUseNewMapper()) {
-      LOG.info("Generating mapreduce api input splits");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generating mapreduce api input splits");
+      }
       Job job = Job.getInstance(conf);
       org.apache.hadoop.mapreduce.InputSplit[] splits =
           generateNewSplits(job, groupSplits, targetTasks);
       splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
           splits.length, job.getCredentials(), job.getConfiguration());
     } else {
-      LOG.info("Generating mapred api input splits");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generating mapred api input splits");
+      }
       org.apache.hadoop.mapred.InputSplit[] splits =
           generateOldSplits(jobConf, groupSplits, targetTasks);
       splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
           splits.length, jobConf.getCredentials(), jobConf);
     }
-    LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
-        + splitInfoMem.getSplitsProto().getSerializedSize());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+          + splitInfoMem.getSplitsProto().getSerializedSize());
+    }
     return splitInfoMem;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index 720af50..80828d4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -51,11 +51,13 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
     if (useNewApi) {
       oldPartitioner = null;
       if (partitions > 1) {
+        Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>> clazz =
+            (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
+                .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
+                    org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
+        LOG.info("Using newApi, MRpartitionerClass=" + clazz.getName());
         newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
-            .newInstance(
-                (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
-                    .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
-                        org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class), conf);
+            .newInstance(clazz, conf);
       } else {
         newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
           @Override
@@ -67,10 +69,12 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
     } else {
       newPartitioner = null;
       if (partitions > 1) {
-        oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+        Class<? extends org.apache.hadoop.mapred.Partitioner> clazz =
             (Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
-                "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class),
-            new JobConf(conf));
+                "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class);
+        LOG.info("Using oldApi, MRpartitionerClass=" + clazz.getName());
+        oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+            clazz, new JobConf(conf));
       } else {
         oldPartitioner = new org.apache.hadoop.mapred.Partitioner() {
           @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index d0e935f..6ea21e2 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -95,7 +95,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
           + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED + " set to false");
       return;
     }
-    LOG.info("Initializing ATSService");
 
     if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
       YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
@@ -124,7 +123,12 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     }
     sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
 
-    LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+    LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() + " with "
+      + "maxEventsPerBatch=" + maxEventsPerBatch
+      + ", maxPollingTime(ms)=" + maxPollingTimeMillis
+      + ", waitTimeForShutdown(ms)=" + maxTimeToWaitOnShutdown
+      + ", TimelineACLManagerClass=" + atsHistoryACLManagerClassName);
+
     try {
       historyACLPolicyManager = ReflectionUtils.createClazzInstance(
           atsHistoryACLManagerClassName);
@@ -146,7 +150,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     if (!historyLoggingEnabled || timelineClient == null) {
       return;
     }
-    LOG.info("Starting ATSService");
     timelineClient.start();
 
     eventHandlingThread = new Thread(new Runnable() {


Mime
View raw message