tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [47/50] [abbrv] git commit: TEZ-175. Cleanup logs.
Date Tue, 04 Jun 2013 05:33:49 GMT
TEZ-175. Cleanup logs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9feab053
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9feab053
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9feab053

Branch: refs/heads/master
Commit: 9feab053fc0508f9ec6b791db8afdd4eebc11113
Parents: 9f040cf
Author: Hitesh Shah <hitesh@apache.org>
Authored: Mon Jun 3 16:42:28 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Mon Jun 3 16:46:40 2013 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/mapred/YarnTezDagChild.java  |   31 ++--
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |  102 +++++-----
 .../tez/dag/app/TaskAttemptListenerImpTezDag.java  |   60 +++---
 .../org/apache/tez/dag/app/dag/impl/DAGImpl.java   |   60 +++---
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      |  104 +++++-----
 .../org/apache/tez/dag/app/dag/impl/TaskImpl.java  |  164 ++++++++-------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |   36 ++--
 .../tez/dag/app/rm/container/AMContainerImpl.java  |   89 +++++----
 .../org/apache/tez/dag/app/rm/node/AMNodeImpl.java |   24 ++-
 .../engine/common/shuffle/impl/InMemoryWriter.java |   46 ++--
 .../engine/common/sort/impl/ExternalSorter.java    |   38 ++--
 .../engine/common/sort/impl/IFileOutputStream.java |   14 +-
 .../common/sort/impl/dflt/DefaultSorter.java       |   88 ++++----
 .../task/local/output/TezTaskOutputFiles.java      |   38 ++--
 .../engine/lib/output/LocalOnFileSorterOutput.java |   12 +-
 .../apache/tez/engine/runtime/RuntimeUtils.java    |   29 ++-
 .../org/apache/hadoop/mapred/LocalJobRunner.java   |  122 ++++++------
 .../mapreduce/split/SplitMetaInfoReaderTez.java    |   12 +-
 .../apache/tez/mapreduce/combine/MRCombiner.java   |    7 +-
 .../hadoop/MultiStageMRConfToTezTranslator.java    |   34 ++--
 .../org/apache/tez/mapreduce/processor/MRTask.java |  147 +++++++-------
 .../java/org/apache/tez/mapreduce/YARNRunner.java  |   64 ++++---
 22 files changed, 686 insertions(+), 635 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 6fd6eff..33e3972 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -146,9 +146,10 @@ public class YarnTezDagChild {
         }
         taskContext = (TezEngineTaskContext) containerTask
             .getTezEngineTaskContext();
-        LOG.info("DEBUG: New container task context:"
-                + taskContext.toString());
-
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("New container task context:"
+              + taskContext.toString());
+        }
         taskAttemptId = taskContext.getTaskAttemptId();
 
         final Task t = createAndConfigureTezTask(taskContext, umbilical,
@@ -200,25 +201,25 @@ public class YarnTezDagChild {
   /**
    * Configure mapred-local dirs. This config is used by the task for finding
    * out an output directory.
-   * @throws IOException 
+   * @throws IOException
    */
   /**
    * Configure tez-local-dirs, tez-localized-file-dir, etc. Also create these
    * dirs.
    */
-  
+
   private static void configureLocalDirs(Configuration conf) throws IOException {
     String[] localSysDirs = StringUtils.getTrimmedStrings(
         System.getenv(Environment.LOCAL_DIRS.name()));
     conf.setStrings(TezJobConfig.LOCAL_DIRS, localSysDirs);
     conf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
         System.getenv(Environment.PWD.name()));
-    
+
     LOG.info(TezJobConfig.LOCAL_DIRS + " for child: " +
         conf.get(TezJobConfig.LOCAL_DIRS));
     LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: "
         + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
-    
+
     LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
     Path workDir = null;
     // First, try to find the JOB_LOCAL_DIR on this host.
@@ -250,7 +251,7 @@ public class YarnTezDagChild {
     }
     conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
   }
-  
+
   private static Task createAndConfigureTezTask(
       TezEngineTaskContext taskContext, TezTaskUmbilicalProtocol master,
       Credentials cxredentials, Token<JobTokenIdentifier> jobToken,
@@ -260,10 +261,10 @@ public class YarnTezDagChild {
     // set tcp nodelay
     conf.setBoolean("ipc.client.tcpnodelay", true);
     conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
-    
+
     configureLocalDirs(conf);
-    
-    
+
+
     // FIXME need Input/Output vertices else we have this hack
     if (taskContext.getInputSpecList().isEmpty()) {
       taskContext.getInputSpecList().add(
@@ -281,18 +282,18 @@ public class YarnTezDagChild {
     // and processor then inits inputs and outputs
     return t;
   }
-  
+
   private static void runTezTask(
-      Task t, TezTaskUmbilicalProtocol master, Configuration conf) 
+      Task t, TezTaskUmbilicalProtocol master, Configuration conf)
   throws IOException, InterruptedException {
     // use job-specified working directory
     FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
-    
+
     // Run!
     t.run();
     t.close();
   }
-  
+
   private static Path getWorkingDirectory(Configuration conf) {
     String name = conf.get(JobContext.WORKING_DIR);
     if (name != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 821dd71..a4ca97e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -128,7 +128,7 @@ import org.apache.tez.engine.common.security.JobTokenSecretManager;
 
 @SuppressWarnings("rawtypes")
 public class DAGAppMaster extends CompositeService {
-  
+
   private static final Log LOG = LogFactory.getLog(DAGAppMaster.class);
 
   /**
@@ -137,7 +137,7 @@ public class DAGAppMaster extends CompositeService {
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   private Clock clock;
-  private final DAGPlan jobPlan;
+  private final DAGPlan dagPlan;
   private long dagsStartTime;
   private final long startTime;
   private final long appSubmitTime;
@@ -154,7 +154,7 @@ public class DAGAppMaster extends CompositeService {
   // TODO Recovery
   //private Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun;
   private AppContext context;
-  private TezConfiguration conf; 
+  private TezConfiguration conf;
   private Dispatcher dispatcher;
   // TODO Recovery
   //private Recovery recoveryServ;
@@ -176,7 +176,7 @@ public class DAGAppMaster extends CompositeService {
   private HistoryEventHandler historyEventHandler;
 
   private DAGAppMasterState state;
-  
+
   DAGClientServer clientRpcServer;
   private DAGClientHandler clientHandler;
 
@@ -195,7 +195,7 @@ public class DAGAppMaster extends CompositeService {
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       Clock clock, long appSubmitTime, DAGPlan dagPB) {
     super(DAGAppMaster.class.getName());
-    this.jobPlan = dagPB;
+    this.dagPlan = dagPB;
     this.clock = clock;
     this.startTime = clock.getTime();
     this.appSubmitTime = appSubmitTime;
@@ -214,9 +214,9 @@ public class DAGAppMaster extends CompositeService {
   public void init(final Configuration tezConf) {
 
     this.state = DAGAppMasterState.INITED;
-    
+
     assert tezConf instanceof TezConfiguration;
-    
+
     this.conf = (TezConfiguration) tezConf;
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
@@ -226,10 +226,10 @@ public class DAGAppMaster extends CompositeService {
 
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
-    appName = jobPlan.getName();
+    appName = dagPlan.getName();
 
     dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
-    
+
     clientHandler = new DAGClientHandler();
 
     // TODO Committer.
@@ -237,7 +237,7 @@ public class DAGAppMaster extends CompositeService {
 
     dispatcher = createDispatcher();
     addIfService(dispatcher);
-    
+
     clientRpcServer = new DAGClientServer(clientHandler);
     addIfService(clientRpcServer);
 
@@ -432,7 +432,7 @@ public class DAGAppMaster extends CompositeService {
       LOG.warn("No handler for event type: " + event.getType());
     }
   }
-  
+
   private class DAGAppMasterEventHandler implements
       EventHandler<DAGAppMasterEvent> {
     @Override
@@ -440,7 +440,7 @@ public class DAGAppMaster extends CompositeService {
       DAGAppMaster.this.handle(event);
     }
   }
-  
+
   private class DAGFinishEventHandler implements EventHandler<DAGFinishEvent> {
     @Override
     public void handle(DAGFinishEvent event) {
@@ -466,9 +466,9 @@ public class DAGAppMaster extends CompositeService {
     */
       // TODO:currently just wait for some time so clients can know the
       // final states. Will be removed once RM come on.
-     
+
       setStateOnDAGCompletion();
-      
+
       try {
         Thread.sleep(5000);
       } catch (InterruptedException e) {
@@ -660,19 +660,19 @@ public class DAGAppMaster extends CompositeService {
   public TaskAttemptListener getTaskAttemptListener() {
     return taskAttemptListener;
   }
-  
+
   public ContainerId getAppContainerId() {
     return containerID;
   }
-  
+
   public String getAppNMHost() {
     return nmHost;
   }
-  
+
   public int getAppNMPort() {
     return nmPort;
   }
-  
+
   public int getAppNMHttpPort() {
     return nmHttpPort;
   }
@@ -680,15 +680,15 @@ public class DAGAppMaster extends CompositeService {
   public DAGAppMasterState getState() {
     return state;
   }
-  
+
   public List<String> getDiagnostics() {
     return dag.getDiagnostics();
   }
-  
+
   public float getProgress() {
     return dag.getProgress();
   }
-  
+
   void setStateOnDAGCompletion() {
     DAGAppMasterState oldState = state;
     if(state == DAGAppMasterState.RUNNING) {
@@ -709,7 +709,7 @@ public class DAGAppMaster extends CompositeService {
     }
     LOG.info("On DAG completion. Old state: " + oldState + " new state: " + state);
   }
-  
+
   class DAGClientHandler implements DAGClient {
 
     @Override
@@ -718,22 +718,22 @@ public class DAGAppMaster extends CompositeService {
     }
 
     @Override
-    public DAGStatus getDAGStatus(String dagIdStr) 
+    public DAGStatus getDAGStatus(String dagIdStr)
                                       throws IOException, TezRemoteException {
       return getDAG(dagIdStr).getDAGStatus();
     }
 
     @Override
-    public VertexStatus getVertexStatus(String dagIdStr, String vertexName) 
+    public VertexStatus getVertexStatus(String dagIdStr, String vertexName)
         throws IOException, TezRemoteException{
       VertexStatus status = getDAG(dagIdStr).getVertexStatus(vertexName);
       if(status == null) {
         throw new TezRemoteException("Unknown vertexName: " + vertexName);
       }
-      
+
       return status;
     }
-    
+
     DAG getDAG(String dagIdStr) throws IOException, TezRemoteException {
       TezDAGID dagId = TezDAGID.fromString(dagIdStr);
       if(dagId == null) {
@@ -759,11 +759,11 @@ public class DAGAppMaster extends CompositeService {
       this.conf = config;
     }
 
-    @Override 
+    @Override
     public DAGAppMaster getAppMaster() {
       return DAGAppMaster.this;
     }
-    
+
     @Override
     public ApplicationAttemptId getApplicationAttemptId() {
       return appAttemptID;
@@ -859,7 +859,7 @@ public class DAGAppMaster extends CompositeService {
   public void start() {
 
     this.state = DAGAppMasterState.RUNNING;
-    
+
     // TODO Recovery
     // Pull completedTasks etc from recovery
     /*
@@ -868,9 +868,9 @@ public class DAGAppMaster extends CompositeService {
       amInfos = recoveryServ.getAMInfos();
     }
     */
-    
+
     // /////////////////// Create the job itself.
-    dag = createDAG(jobPlan);
+    dag = createDAG(dagPlan);
 
     // End of creating the job.
 
@@ -893,7 +893,7 @@ public class DAGAppMaster extends CompositeService {
         startTime, dagsStartTime, appSubmitTime);
     dispatcher.getEventHandler().handle(
         new DAGHistoryEvent(this.dagId, startEvent));
-    
+
     // All components have started, start the job.
     startDags();
   }
@@ -922,7 +922,7 @@ public class DAGAppMaster extends CompositeService {
       ((EventHandler<DAGEvent>)context.getDAG()).handle(event);
     }
   }
-  
+
   private class TaskEventDispatcher implements EventHandler<TaskEvent> {
     @SuppressWarnings("unchecked")
     @Override
@@ -959,7 +959,7 @@ public class DAGAppMaster extends CompositeService {
       ((EventHandler<VertexEvent>) vertex).handle(event);
     }
   }
-  
+
   private static void validateInputParam(String value, String param)
       throws IOException {
     if (value == null) {
@@ -998,22 +998,22 @@ public class DAGAppMaster extends CompositeService {
       // TODO change this once the client is ready.
       String type;
       TezConfiguration conf = new TezConfiguration(new YarnConfiguration());
-      
-      DAGPlan jobPlan = null;
+
+      DAGPlan dagPlan = null;
       if (cliParser.hasOption(OPT_PREDEFINED)) {
         LOG.info("Running with PreDefined configuration");
         type = cliParser.getOptionValue(OPT_PREDEFINED, "mr");
         LOG.info("Running job type: " + type);
 
         if (type.equals("mr")) {
-          jobPlan = MRRExampleHelper.createDAGConfigurationForMR();
+          dagPlan = MRRExampleHelper.createDAGConfigurationForMR();
         } else if (type.equals("mrr")) {
-          jobPlan = MRRExampleHelper.createDAGConfigurationForMRR();
+          dagPlan = MRRExampleHelper.createDAGConfigurationForMRR();
         }
-      } 
+      }
       else {
         // Read the protobuf DAG
-        DAGPlan.Builder dagPlanBuilder = DAGPlan.newBuilder(); 
+        DAGPlan.Builder dagPlanBuilder = DAGPlan.newBuilder();
         FileInputStream dagPBBinaryStream = null;
         try {
           dagPBBinaryStream = new FileInputStream(TezConfiguration.DAG_AM_PLAN_PB_BINARY);
@@ -1021,17 +1021,19 @@ public class DAGAppMaster extends CompositeService {
         }
         finally {
           if(dagPBBinaryStream != null){
-            dagPBBinaryStream.close();  
+            dagPBBinaryStream.close();
           }
         }
 
-        jobPlan = dagPlanBuilder.build();
+        dagPlan = dagPlanBuilder.build();
       }
 
-      LOG.info("XXXX Running a DAG with "
-          + jobPlan.getVertexCount() + " vertices ");
-      for (VertexPlan v : jobPlan.getVertexList()) {
-        LOG.info("XXXX DAG has vertex " + v.getName());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Running a DAG with "
+            + dagPlan.getVertexCount() + " vertices ");
+        for (VertexPlan v : dagPlan.getVertexList()) {
+          LOG.debug("DAG has vertex " + v.getName());
+        }
       }
 
       String jobUserName = System
@@ -1041,8 +1043,8 @@ public class DAGAppMaster extends CompositeService {
       // SIGTERM I have a chance to write out the job history. I'll be closing
       // the objects myself.
       conf.setBoolean("fs.automatic.close", false);
-      
-      Map<String, String> config = DagTypeConverters.createSettingsMapFromDAGPlan(jobPlan.getJobSettingList());
+
+      Map<String, String> config = DagTypeConverters.createSettingsMapFromDAGPlan(dagPlan.getJobSettingList());
       for(Entry<String, String> entry : config.entrySet()) {
         conf.set(entry.getKey(), entry.getValue());
       }
@@ -1050,7 +1052,7 @@ public class DAGAppMaster extends CompositeService {
       DAGAppMaster appMaster =
           new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
-              Integer.parseInt(nodeHttpPortString), appSubmitTime, jobPlan);
+              Integer.parseInt(nodeHttpPortString), appSubmitTime, dagPlan);
       ShutdownHookManager.get().addShutdownHook(
         new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
 
@@ -1117,7 +1119,7 @@ public class DAGAppMaster extends CompositeService {
       }
     });
   }
-  
+
   @SuppressWarnings("unchecked")
   private void sendEvent(Event<?> event) {
     dispatcher.getEventHandler().handle(event);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 7600db4..708b33e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -67,7 +67,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(
       null, true);
-  
+
   private static ProceedToCompletionResponse COMPLETION_RESPONSE_NO_WAIT =
       new ProceedToCompletionResponse(true, true);
 
@@ -86,10 +86,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   // TODO Use this to figure out whether an incoming ping is valid.
   private ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToContainerIdMap =
       new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
-  
+
   private Set<ContainerId> registeredContainers = Collections
       .newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
-  
+
   public TaskAttemptListenerImpTezDag(AppContext context,
       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
       JobTokenSecretManager jobTokenSecretManager) {
@@ -118,10 +118,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
               conf.getInt(TezConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT,
                   TezConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
           .setSecretManager(jobTokenSecretManager).build();
-      
+
       // Enable service authorization?
       if (conf.getBoolean(
-          CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
           false)) {
         refreshServiceAcls(conf, new MRAMPolicyProvider());
       }
@@ -133,7 +133,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
   }
 
-  void refreshServiceAcls(Configuration configuration, 
+  void refreshServiceAcls(Configuration configuration,
       PolicyProvider policyProvider) {
     this.server.refreshServiceAcl(configuration, policyProvider);
   }
@@ -153,7 +153,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   public InetSocketAddress getAddress() {
     return address;
   }
-  
+
   @Override
   public long getProtocolVersion(String protocol, long clientVersion)
       throws IOException {
@@ -171,13 +171,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
       int fromEventIdx, int maxEvents,
       TezTaskAttemptID taskAttemptID) {
-    
+
     LOG.info("Dependency Completion Events request from " + taskAttemptID
         + ". fromEventID " + fromEventIdx + " maxEvents " + maxEvents);
 
     // TODO: shouldReset is never used. See TT. Ask for Removal.
     boolean shouldReset = false;
-    TezDependentTaskCompletionEvent[] events = 
+    TezDependentTaskCompletionEvent[] events =
         context.getDAG().
             getVertex(taskAttemptID.getTaskID().getVertexID()).
                 getTaskAttemptCompletionEvents(fromEventIdx, maxEvents);
@@ -234,22 +234,28 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
         }
       }
     }
-    LOG.info("DEBUG: getTask returning task: " + task);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getTask returning task: " + task);
+    }
     return task;
   }
 
   @Override
   public boolean statusUpdate(TezTaskAttemptID taskAttemptId,
       TezTaskStatus taskStatus) throws IOException, InterruptedException {
-    LOG.info("DEBUG: " + "Status update from: " + taskAttemptId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Status update from: " + taskAttemptId);
+    }
     taskHeartbeatHandler.progressing(taskAttemptId);
     pingContainerHeartbeatHandler(taskAttemptId);
     TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
     taskAttemptStatus.id = taskAttemptId;
     // Task sends the updated progress to the TT.
     taskAttemptStatus.progress = taskStatus.getProgress();
-    LOG.info("DEBUG: " + "Progress of TaskAttempt " + taskAttemptId + " is : "
-        + taskStatus.getProgress());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Progress of TaskAttempt " + taskAttemptId + " is : "
+          + taskStatus.getProgress());
+    }
 
     // Task sends the updated state-string to the TT.
     taskAttemptStatus.stateString = taskStatus.getStateString();
@@ -267,7 +273,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // that is the primary storage format inside the AM to avoid multiple
     // conversions and unnecessary heap usage.
     taskAttemptStatus.counters = taskStatus.getCounters();
-    
+
 
     // Map Finish time set by the task (map only)
     // TODO CLEANMRXAM - maybe differentiate between map / reduce / types
@@ -366,7 +372,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   /**
    * TaskAttempt is reporting that it is in commit_pending and it is waiting for
    * the commit Response
-   * 
+   *
    * <br/>
    * Commit it a two-phased protocol. First the attempt informs the
    * ApplicationMaster that it is
@@ -386,14 +392,14 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     //Ignorable TaskStatus? - since a task will send a LastStatusUpdate
     context.getEventHandler().handle(
         new TaskAttemptEvent(
-            taskAttemptId, 
+            taskAttemptId,
             TaskAttemptEventType.TA_COMMIT_PENDING)
         );
   }
 
   /**
    * Child checking whether it can commit.
-   * 
+   *
    * <br/>
    * Commit is a two-phased protocol. First the attempt informs the
    * ApplicationMaster that it is
@@ -411,7 +417,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     pingContainerHeartbeatHandler(taskAttemptId);
 
     DAG job = context.getDAG();
-    Task task = 
+    Task task =
         job.getVertex(taskAttemptId.getTaskID().getVertexID()).
             getTask(taskAttemptId.getTaskID());
     return task.canCommit(taskAttemptId);
@@ -459,16 +465,16 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   @Override
   public ProceedToCompletionResponse
       proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException {
-    
+
     // The async nature of the processing combined with the 1 second interval
     // between polls (MRTask) implies tasks end up wasting upto 1 second doing
     // nothing. Similarly for CA_COMMIT.
-    
+
     DAG job = context.getDAG();
-    Task task = 
+    Task task =
         job.getVertex(taskAttemptId.getTaskID().getVertexID()).
             getTask(taskAttemptId.getTaskID());
-    
+
     // TODO In-Memory Shuffle
     /*
     if (task.needsWaitAfterOutputConsumable()) {
@@ -503,7 +509,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     */
     return COMPLETION_RESPONSE_NO_WAIT;
   }
-  
+
 
   @Override
   public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
@@ -539,11 +545,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     registeredContainers.remove(containerId);
   }
-  
+
   private void pingContainerHeartbeatHandler(ContainerId containerId) {
     containerHeartbeatHandler.pinged(containerId);
   }
-  
+
   private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
     ContainerId containerId = attemptToContainerIdMap.get(taskAttemptId);
     if (containerId != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index d2395ad..de43db0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -558,7 +558,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   protected void startRootVertices() {
     for (Vertex v : vertices.values()) {
       if (v.getInputVerticesCount() == 0) {
-        LOG.info("DEBUG: Starting root vertex " + v.getName());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Starting root vertex " + v.getName());
+        }
         eventHandler.handle(new VertexEvent(v.getVertexId(),
             VertexEventType.V_START));
       }
@@ -577,12 +579,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
    * The only entry point to change the DAG.
    */
   public void handle(DAGEvent event) {
-    LOG.info("DEBUG: Processing DAGEvent " + event.getDAGId() + " of type "
-        + event.getType() + " while in state " + getInternalState()
-        + ". Event: " + event);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing DAGEvent " + event.getDAGId() + " of type "
-          + event.getType() + " while in state " + getInternalState());
+          + event.getType() + " while in state " + getInternalState()
+          + ". Event: " + event);
     }
     try {
       writeLock.lock();
@@ -658,13 +658,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   static DAGState checkJobForCompletion(DAGImpl dag) {
-
-    LOG.info("ZZZZ: Checking dag completion"
-        + ", numCompletedVertices=" + dag.numCompletedVertices
-        + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
-        + ", numFailedVertices=" + dag.numFailedVertices
-        + ", numKilledVertices=" + dag.numKilledVertices
-        + ", numVertices=" + dag.numVertices);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checking dag completion"
+          + ", numCompletedVertices=" + dag.numCompletedVertices
+          + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
+          + ", numFailedVertices=" + dag.numFailedVertices
+          + ", numKilledVertices=" + dag.numKilledVertices
+          + ", numVertices=" + dag.numVertices);
+    }
 
     if (dag.numFailedVertices > 0) {
       dag.setFinishTime();
@@ -870,7 +871,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         for (Vertex v : dag.vertices.values()) {
           parseVertexEdges(dag, edgePlans, v);
         }
-        
+
         assignDAGScheduler(dag);
 
         // TODO Metrics
@@ -887,7 +888,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         return dag.finished(DAGState.FAILED);
       }
     }
-    
+
     private void assignDAGScheduler(DAGImpl dag) {
       boolean isMRR = true;
       for(Vertex vertex : dag.vertices.values()) {
@@ -895,15 +896,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         if(outVertices == null || outVertices.isEmpty()) {
           continue;
         }
-        if(outVertices.size() > 1 || 
-           outVertices.values().iterator().next().getConnectionPattern() != 
+        if(outVertices.size() > 1 ||
+           outVertices.values().iterator().next().getConnectionPattern() !=
            EdgeProperty.ConnectionPattern.BIPARTITE) {
           // more than 1 output OR single output is not bipartite
           isMRR = false;
           break;
-        }          
+        }
       }
-      
+
       if(isMRR) {
         LOG.info("Using MRR dag scheduler");
         dag.dagScheduler = new DAGSchedulerMRR(dag, dag.eventHandler);
@@ -1110,10 +1111,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     public DAGState transition(DAGImpl job, DAGEvent event) {
 
       DAGEventVertexCompleted vertexEvent = (DAGEventVertexCompleted) event;
-      LOG.info("DEBUG: Received a vertex completion event"
-          + ", vertexId=" + vertexEvent.getVertexId()
-          + ", vertexState=" + vertexEvent.getVertexState());
-
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received a vertex completion event"
+            + ", vertexId=" + vertexEvent.getVertexId()
+            + ", vertexState=" + vertexEvent.getVertexState());
+      }
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
       job.numCompletedVertices++;
       if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
@@ -1124,12 +1126,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         vertexKilled(job, vertex);
       }
 
-      LOG.info("ZZZZ: Vertex completed."
-          + ", numCompletedVertices=" + job.numCompletedVertices
-          + ", numSuccessfulVertices=" + job.numSuccessfulVertices
-          + ", numFailedVertices=" + job.numFailedVertices
-          + ", numKilledVertices=" + job.numKilledVertices
-          + ", numVertices=" + job.numVertices);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Vertex completed."
+            + ", numCompletedVertices=" + job.numCompletedVertices
+            + ", numSuccessfulVertices=" + job.numSuccessfulVertices
+            + ", numFailedVertices=" + job.numFailedVertices
+            + ", numKilledVertices=" + job.numKilledVertices
+            + ", numVertices=" + job.numVertices);
+      }
 
       job.dagScheduler.vertexCompleted(vertex);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index c14ba8a..94e5161 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -109,7 +109,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
   private static final String LINE_SEPARATOR = System
       .getProperty("line.separator");
-  
+
   static final TezCounters EMPTY_COUNTERS = new TezCounters();
 
   protected final TezConfiguration conf;
@@ -147,22 +147,22 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected final boolean isRescheduled;
 
   protected String processorName;
-  
+
   protected static final FailedTransitionHelper FAILED_HELPER =
       new FailedTransitionHelper();
-      
+
   protected static final KilledTransitionHelper KILLED_HELPER =
       new KilledTransitionHelper();
 
   private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
       DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION =
           new DiagnosticInformationUpdater();
-  
+
   private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
       STATUS_UPDATER = new StatusUpdaterTransition();
 
   private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
-  
+
   private static StateMachineFactory
   <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
   stateMachineFactory
@@ -174,7 +174,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.NEW, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminateTransition(FAILED_HELPER))
         .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminateTransition(KILLED_HELPER))
-        
+
         .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
         .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedBeforeRunningTransition(FAILED_HELPER))
@@ -182,7 +182,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new NodeFailedBeforeRunningTransition())
         .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new ContainerTerminatingBeforeRunningTransition())
         .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
-        
+
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, new OutputConsumableTransition()) //Optional, may not come in for all tasks.
@@ -195,7 +195,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition())
-        
+
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE) // Stuck RPC. The client retries in a loop.
@@ -227,17 +227,17 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
         .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
-        
+
         .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
         .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
-        
+
         .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
 
         .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
-        
+
         // How will duplicate history events be handled ?
         // TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -245,21 +245,21 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition(KILLED_HELPER))
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedAfterSuccessTransition(FAILED_HELPER))
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
-        
-        
+
+
         .installTopology();
-  
+
 
   // TODO Remove TaskAttemptListener from the constructor.
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
-      TaskAttemptListener tal, int partition, 
+      TaskAttemptListener tal, int partition,
       TezConfiguration conf,
       Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       String processorName, TaskLocationHint locationHint,
       Resource resource, Map<String, LocalResource> localResources,
-      Map<String, String> environment, 
+      Map<String, String> environment,
       String javaOpts, boolean isRescheduled) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
@@ -286,34 +286,34 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.javaOpts = javaOpts;
     this.isRescheduled = isRescheduled;
   }
-  
+
 
   @Override
   public TezTaskAttemptID getID() {
     return attemptId;
   }
-  
+
   @Override
   public TezVertexID getVertexID() {
     return attemptId.getTaskID().getVertexID();
   }
-  
+
   @Override
   public TezDAGID getDAGID() {
     return getVertexID().getDAGId();
   }
-  
+
   TezTaskContext createRemoteTask() {
     Vertex vertex = getTask().getVertex();
     DAG dag = vertex.getDAG();
 
     // TODO  TEZ-50 user and jobname
-    return new TezEngineTaskContext(getID(), dag.getUserName(), 
+    return new TezEngineTaskContext(getID(), dag.getUserName(),
         dag.getName(), getTask()
         .getVertex().getName(), processorName,
         vertex.getInputSpecList(), vertex.getOutputSpecList());
   }
-  
+
   @Override
   public TaskAttemptReport getReport() {
     TaskAttemptReport result = Records.newRecord(TaskAttemptReport.class);
@@ -322,7 +322,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       result.setTaskAttemptId(attemptId);
       //take the LOCAL state of attempt
       //DO NOT take from reportedStatus
-      
+
       result.setTaskAttemptState(getState());
       result.setProgress(reportedStatus.progress);
       result.setStartTime(launchTime);
@@ -522,11 +522,9 @@ public class TaskAttemptImpl implements TaskAttempt,
   public void handle(TaskAttemptEvent event) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing TaskAttemptEvent " + event.getTaskAttemptID()
-          + " of type " + event.getType());
+          + " of type " + event.getType() + " while in state "
+          + getInternalState() + ". Event: " + event);
     }
-    LOG.info("DEBUG: Processing TaskAttemptEvent " + event.getTaskAttemptID()
-        + " of type " + event.getType() + " while in state "
-        + getInternalState() + ". Event: " + event);
     writeLock.lock();
     try {
       final TaskAttemptStateInternal oldState = getInternalState();
@@ -536,8 +534,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         LOG.error("Can't handle this event at current state for "
             + this.attemptId, e);
         eventHandler.handle(new DAGEventDiagnosticsUpdate(
-            this.attemptId.getTaskID().getVertexID().getDAGId(), 
-            "Invalid event " + event.getType() + 
+            this.attemptId.getTaskID().getVertexID().getDAGId(),
+            "Invalid event " + event.getType() +
             " on TaskAttempt " + this.attemptId));
         eventHandler.handle(
             new DAGEvent(
@@ -546,7 +544,7 @@ public class TaskAttemptImpl implements TaskAttempt,
             );
       }
       if (oldState != getInternalState()) {
-          LOG.info(attemptId + " TaskAttempt Transitioned from " 
+          LOG.info(attemptId + " TaskAttempt Transitioned from "
            + oldState + " to "
            + getInternalState());
       }
@@ -554,7 +552,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       writeLock.unlock();
     }
   }
-  
+
   @VisibleForTesting
   public TaskAttemptStateInternal getInternalState() {
     readLock.lock();
@@ -562,7 +560,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       return stateMachine.getCurrentState();
     } finally {
       readLock.unlock();
-    }    
+    }
   }
 
   private static TaskAttemptState getExternalState(
@@ -590,7 +588,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           + smState);
     }
   }
-  
+
   @Override
   public boolean getIsRescheduled() {
     return isRescheduled;
@@ -612,7 +610,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   // TOOD Merge some of these JobCounter events.
   private static DAGEventCounterUpdate createJobCounterUpdateEventTALaunched(
       TaskAttemptImpl ta) {
-    DAGEventCounterUpdate jce = 
+    DAGEventCounterUpdate jce =
         new DAGEventCounterUpdate(
             ta.getDAGID()
             );
@@ -622,8 +620,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   private static DAGEventCounterUpdate createJobCounterUpdateEventSlotMillis(
       TaskAttemptImpl ta) {
-    DAGEventCounterUpdate jce = 
-        new DAGEventCounterUpdate(            
+    DAGEventCounterUpdate jce =
+        new DAGEventCounterUpdate(
             ta.getDAGID()
             );
 
@@ -635,7 +633,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   private static DAGEventCounterUpdate createJobCounterUpdateEventTATerminated(
       TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted,
       TaskAttemptStateInternal taState) {
-    DAGEventCounterUpdate jce = 
+    DAGEventCounterUpdate jce =
         new DAGEventCounterUpdate(
             taskAttempt.getDAGID());
 
@@ -687,16 +685,16 @@ public class TaskAttemptImpl implements TaskAttempt,
         taskAttempt.finishTime,
         taskAttempt.containerNodeId == null ? "UNKNOWN"
             : taskAttempt.containerNodeId.getHost(),
-        taskAttempt.containerNodeId == null ? -1 
-            : taskAttempt.containerNodeId.getPort(),    
-        taskAttempt.nodeRackName == null ? "UNKNOWN" 
+        taskAttempt.containerNodeId == null ? -1
+            : taskAttempt.containerNodeId.getPort(),
+        taskAttempt.nodeRackName == null ? "UNKNOWN"
             : taskAttempt.nodeRackName,
         StringUtils.join(
             LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
             .getProgressSplitBlock().burst());
     return tauce;
   }
-  
+
   // TODO Incorporate MAPREDUCE-4838
   private JobHistoryEvent createTaskAttemptStartedEvent() {
     TaskAttemptStartedEvent tase = new TaskAttemptStartedEvent(
@@ -707,7 +705,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   }
   */
-  
+
 //  private WrappedProgressSplitsBlock getProgressSplitBlock() {
 //    return null;
 //    // TODO
@@ -725,7 +723,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 //    }
 //    */
 //  }
-  
+
   private void updateProgressSplits() {
 //    double newProgress = reportedStatus.progress;
 //    newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
@@ -737,7 +735,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 //    if (splitsBlock != null) {
 //      long now = clock.getTime();
 //      long start = getLaunchTime();
-//      
+//
 //      if (start == 0)
 //        return;
 //
@@ -767,7 +765,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 //      }
 //    }
   }
-  
+
 //  private void maybeSendSpeculatorContainerRequired() {
 //    if (!speculatorContainerRequestSent) {
 //      sendEvent(new SpeculatorEvent(getID().getTaskID(), +1));
@@ -781,10 +779,10 @@ public class TaskAttemptImpl implements TaskAttempt,
 //      speculatorContainerRequestSent = false;
 //    }
 //  }
-  
+
   private void sendTaskAttemptCleanupEvent() {
-//    TaskAttemptContext taContext = 
-//        new TaskAttemptContextImpl(this.conf, 
+//    TaskAttemptContext taContext =
+//        new TaskAttemptContextImpl(this.conf,
 //            TezMRTypeConverter.fromTez(this.attemptId));
 //    sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
   }
@@ -825,7 +823,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) {
     //Log finished events only if an attempt started.
     if (getLaunchTime() == 0) return;
-    
+
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getTask().getVertex().getName(),
         getFinishTime(), TaskAttemptState.SUCCEEDED, "",
@@ -901,7 +899,7 @@ public class TaskAttemptImpl implements TaskAttempt,
               ta.localResources, remoteTaskContext, ta,
               ta.credentials, ta.jobToken, hostArray,
               rackArray,
-              scheduleEvent.getPriority(), ta.environment, //ta.javaOpts, 
+              scheduleEvent.getPriority(), ta.environment, //ta.javaOpts,
               ta.conf);
       ta.sendEvent(launchRequestEvent);
     }
@@ -1269,7 +1267,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       return TaskEventType.T_ATTEMPT_KILLED;
     }
   }
-  
+
   @Override
   public String toString() {
     return getID().toString();
@@ -1284,10 +1282,10 @@ public class TaskAttemptImpl implements TaskAttempt,
   public Map<String, String> getEnvironment() {
     return this.environment;
   }
-  
+
   @Override
   public String getJavaOpts() {
-	return this.javaOpts;  
+	return this.javaOpts;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 575f32c..f295caf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -107,68 +107,68 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   protected Resource taskResource;
   protected Map<String, LocalResource> localResources;
   protected Map<String, String> environment;
-  
+
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
   private int numberUncompletedAttempts = 0;
 
   private boolean historyTaskStartGenerated = false;
-  
-  private static final SingleArcTransition<TaskImpl, TaskEvent> 
+
+  private static final SingleArcTransition<TaskImpl, TaskEvent>
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
-  private static final SingleArcTransition<TaskImpl, TaskEvent> 
+  private static final SingleArcTransition<TaskImpl, TaskEvent>
      KILL_TRANSITION = new KillTransition();
 
   private static final StateMachineFactory
-               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent> 
-            stateMachineFactory 
+               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
+            stateMachineFactory
            = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
                (TaskStateInternal.NEW)
 
     // define the state machine of Task
 
     // Transitions from NEW state
-    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, 
+    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
-    .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, 
+    .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
         TaskEventType.T_KILL, new KillNewTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
-     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING,
          TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
-     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
          TaskEventType.T_KILL, KILL_TRANSITION)
-     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
          TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
-     .addTransition(TaskStateInternal.SCHEDULED, 
-        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED), 
-        TaskEventType.T_ATTEMPT_FAILED, 
+     .addTransition(TaskStateInternal.SCHEDULED,
+        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
+        TaskEventType.T_ATTEMPT_FAILED,
         new AttemptFailedTransition())
- 
+
     // Transitions from RUNNING state
-    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
-    // This is an optional event. 
+    // This is an optional event.
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
         new AttemptProcessingCompleteTransition())
-    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ATTEMPT_COMMIT_PENDING,
         new AttemptCommitPendingTransition())
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
-    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
         TaskEventType.T_ATTEMPT_SUCCEEDED,
         new AttemptSucceededTransition())
-    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ATTEMPT_KILLED,
         ATTEMPT_KILLED_TRANSITION)
-    .addTransition(TaskStateInternal.RUNNING, 
-        EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED), 
+    .addTransition(TaskStateInternal.RUNNING,
+        EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_FAILED,
         new AttemptFailedTransition())
-    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
         TaskEventType.T_KILL, KILL_TRANSITION)
 
     // Transitions from KILL_WAIT state
@@ -203,7 +203,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_ATTEMPT_LAUNCHED))
 
-    // Transitions from FAILED state        
+    // Transitions from FAILED state
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(TaskEventType.T_KILL,
                    TaskEventType.T_ADD_SPEC_ATTEMPT))
@@ -221,7 +221,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   // TODO: Recovery
   /*
-  // By default, the next TaskAttempt number is zero. Changes during recovery  
+  // By default, the next TaskAttempt number is zero. Changes during recovery
   protected int nextAttemptNumber = 0;
   private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
       new ArrayList<TaskAttemptInfo>();
@@ -239,10 +239,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       new RecoverdAttemptsComparator();
 
    */
-  
+
   private TezTaskAttemptID outputConsumableAttempt;
   private boolean outputConsumableAttemptSuccessSent = false;
-  
+
   //should be set to one which comes first
   //saying COMMIT_PENDING
   private TezTaskAttemptID commitAttempt;
@@ -272,10 +272,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
       // TODO Recovery
-      //Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun, 
+      //Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun,
       //int startCount,
       // TODO Metrics
-      //MRAppMetrics metrics, 
+      //MRAppMetrics metrics,
       TaskHeartbeatHandler thh, AppContext appContext,
       String processorName,
       boolean leafVertex, TaskLocationHint locationHint, Resource resource,
@@ -289,7 +289,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
     // TODO TEZ-47 get from conf or API
-    maxAttempts = this.conf.getInt(TezConfiguration.DAG_MAX_TASK_ATTEMPTS, 
+    maxAttempts = this.conf.getInt(TezConfiguration.DAG_MAX_TASK_ATTEMPTS,
                               TezConfiguration.DAG_MAX_TASK_ATTEMPTS_DEFAULT);
     taskId = new TezTaskID(vertexId, partition);
     this.partition = partition;
@@ -360,7 +360,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       if (attempts.size() <= 1) {
         return attempts;
       }
-      
+
       Map<TezTaskAttemptID, TaskAttempt> result
           = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
       result.putAll(attempts);
@@ -385,7 +385,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public Vertex getVertex() {
     return appContext.getDAG().getVertex(taskId.getVertexID());
   }
-  
+
   @Override
   public TezTaskID getTaskId() {
     return taskId;
@@ -424,12 +424,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       }
 
       report.setSuccessfulAttempt(successfulAttempt);
-      
+
       for (TaskAttempt att : attempts.values()) {
         String prefix = "AttemptID:" + att.getID() + " Info:";
         for (CharSequence cs : att.getDiagnostics()) {
           report.addDiagnostics(prefix + cs);
-          
+
         }
       }
 
@@ -482,7 +482,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       return stateMachine.getCurrentState();
     } finally {
       readLock.unlock();
-    }    
+    }
   }
 
   private static TaskState getExternalState(TaskStateInternal smState) {
@@ -490,7 +490,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       return TaskState.KILLED;
     } else {
       return TaskState.valueOf(smState.name());
-    }    
+    }
   }
 
   //this is always called in read/write lock
@@ -530,7 +530,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
     return finishTime;
   }
-  
+
   private TaskStateInternal finished(TaskStateInternal finalState) {
     if (getInternalState() == TaskStateInternal.RUNNING) {
       // TODO Metrics
@@ -552,7 +552,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       case KILLED:
         continue;
       default:
-      }      
+      }
       if (result == null) {
         result = at; //The first time around
       }
@@ -589,8 +589,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       return false;
     }
   }
-  
-  
+
+
   @Override
   public TezTaskAttemptID getOutputConsumableAttempt() {
     readLock.lock();
@@ -608,7 +608,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         appContext, processorName, locationHint, taskResource,
         localResources, environment, javaOpts, (failedAttempts>0));
   }
-  
+
   protected TaskAttempt getSuccessfulAttempt() {
     readLock.lock();
     try {
@@ -631,7 +631,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       case 0:
         attempts = Collections.singletonMap(attempt.getID(), attempt);
         break;
-        
+
       case 1:
         Map<TezTaskAttemptID, TaskAttempt> newAttempts
             = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(maxAttempts);
@@ -663,17 +663,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     // picture in mind
     eventHandler.handle(new DAGEventSchedulerUpdate(
         DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, attempt));
-    
+
   }
 
   @Override
   public void handle(TaskEvent event) {
-    LOG.info("DEBUG: Processing TaskEvent " + event.getTaskID() + " of type "
-        + event.getType() + " while in state " + getInternalState()
-        + ". Event: " + event);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing TaskEvent " + event.getTaskID() + " of type "
-          + event.getType());
+          + event.getType() + " while in state " + getInternalState()
+          + ". Event: " + event);
     }
     try {
       writeLock.lock();
@@ -698,7 +696,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   protected void internalError(TaskEventType type) {
     LOG.error("Invalid event " + type + " on Task " + this.taskId);
     eventHandler.handle(new DAGEventDiagnosticsUpdate(
-        this.taskId.getVertexID().getDAGId(), "Invalid event " + type + 
+        this.taskId.getVertexID().getDAGId(), "Invalid event " + type +
         " on Task " + this.taskId));
     eventHandler.handle(new DAGEvent(this.taskId.getVertexID().getDAGId(),
         DAGEventType.INTERNAL_ERROR));
@@ -719,19 +717,19 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         }
       }
     }
-    if (attempt.getNodeHttpAddress() != null) {      
-      
+    if (attempt.getNodeHttpAddress() != null) {
+
       String scheme = (encryptedShuffle) ? "https://" : "http://";
       String url = scheme
           + attempt.getNodeHttpAddress().split(":")[0] + ":"
           + attempt.getShufflePort();
-      
-      
-      
+
+
+
       int runTime = 0;
       if (attempt.getFinishTime() != 0 && attempt.getLaunchTime() != 0)
         runTime = (int) (attempt.getFinishTime() - attempt.getLaunchTime());
-      
+
       TezDependentTaskCompletionEvent tce = new TezDependentTaskCompletionEvent(
           -1, attemptId, status, url, runTime);
 
@@ -740,7 +738,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       eventHandler.handle(new VertexEventTaskAttemptCompleted(tce));
     }
   }
-  
+
   // always called inside a transition, in turn inside the Write Lock
   private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId,
       TezDependentTaskCompletionEvent.Status status) {
@@ -759,7 +757,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         task.getCounters());
     return tfe;
   }
-  
+
   private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TezTaskAttemptID taId) {
     StringBuilder errorSb = new StringBuilder();
     if (diag != null) {
@@ -778,7 +776,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     return taskFailedEvent;
   }
   */
-  
+
   private static void unSucceed(TaskImpl task) {
     task.commitAttempt = null;
     task.successfulAttempt = null;
@@ -801,7 +799,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.eventHandler.handle(new DAGHistoryEvent(
         taskId.getVertexID().getDAGId(), startEvt));
   }
-  
+
   protected void logJobHistoryTaskFinishedEvent() {
     // FIXME need to handle getting finish time as this function
     // is called from within a transition
@@ -811,7 +809,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.eventHandler.handle(new DAGHistoryEvent(
         taskId.getVertexID().getDAGId(), finishEvt));
   }
-  
+
   protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
         getVertex().getName(), clock.getTime(), finalState, getCounters());
@@ -859,15 +857,19 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             TezDependentTaskCompletionEvent.Status.SUCCEEDED);
         task.outputConsumableAttempt = attemptId;
         task.outputConsumableAttemptSuccessSent = true;
-        LOG.info("DEBUG: TezTaskAttemptID: " + attemptId
-            + " set as the OUTPUT_READY attempt");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("TezTaskAttemptID: " + attemptId
+              + " set as the OUTPUT_READY attempt");
+        }
       } else {
         // Nothing to do. This task will eventually be told to die, or will be
         // killed.
-        LOG.info("DEBUG: TezTaskAttemptID: "
-            + attemptId
-            + " reporting OUTPUT_READY. Will be asked to die since another attempt "
-            + task.outputConsumableAttempt + " already has output ready");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("TezTaskAttemptID: "
+              + attemptId + " reporting OUTPUT_READY."
+              + " Will be asked to die since another attempt "
+              + task.outputConsumableAttempt + " already has output ready");
+        }
         task.eventHandler.handle(new TaskAttemptEventKillRequest(attemptId,
             "Alternate attemptId already serving output"));
       }
@@ -899,12 +901,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private static class AttemptSucceededTransition 
+  private static class AttemptSucceededTransition
       implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
       task.handleTaskAttemptCompletion(
-          ((TaskEventTAUpdate) event).getTaskAttemptID(), 
+          ((TaskEventTAUpdate) event).getTaskAttemptID(),
           TezDependentTaskCompletionEvent.Status.SUCCEEDED);
       task.finishedAttempts++;
       --task.numberUncompletedAttempts;
@@ -937,7 +939,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
       task.handleTaskAttemptCompletion(
-          ((TaskEventTAUpdate) event).getTaskAttemptID(), 
+          ((TaskEventTAUpdate) event).getTaskAttemptID(),
           TezDependentTaskCompletionEvent.Status.KILLED);
       task.finishedAttempts++;
       --task.numberUncompletedAttempts;
@@ -956,7 +958,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.handleTaskAttemptCompletion(
-          ((TaskEventTAUpdate) event).getTaskAttemptID(), 
+          ((TaskEventTAUpdate) event).getTaskAttemptID(),
           TezDependentTaskCompletionEvent.Status.KILLED);
       task.finishedAttempts++;
       // check whether all attempts are finished
@@ -997,7 +999,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.finishedAttempts++;
       if (task.failedAttempts < task.maxAttempts) {
         task.handleTaskAttemptCompletion(
-            ((TaskEventTAUpdate) event).getTaskAttemptID(), 
+            ((TaskEventTAUpdate) event).getTaskAttemptID(),
             TezDependentTaskCompletionEvent.Status.FAILED);
         // we don't need a new event if we already have a spare
         if (--task.numberUncompletedAttempts == 0
@@ -1006,9 +1008,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         }
       } else {
         task.handleTaskAttemptCompletion(
-            ((TaskEventTAUpdate) event).getTaskAttemptID(), 
+            ((TaskEventTAUpdate) event).getTaskAttemptID(),
             TezDependentTaskCompletionEvent.Status.TIPFAILED);
-        
+
         if (task.historyTaskStartGenerated) {
           task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
         } else {
@@ -1046,7 +1048,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         LOG.error("Unexpected event for task of leaf vertex " + event.getType());
         task.internalError(event.getType());
       }
-      
+
       // tell the job about the rescheduling
       task.eventHandler.handle(
           new VertexEventTaskReschedule(task.taskId));
@@ -1085,16 +1087,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // tell the job about the rescheduling
         unSucceed(task);
         task.handleTaskAttemptCompletion(
-            attemptId, 
+            attemptId,
             TezDependentTaskCompletionEvent.Status.KILLED);
         task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
-        // typically we are here because this map task was run on a bad node and 
+        // typically we are here because this map task was run on a bad node and
         // we want to reschedule it on a different node.
-        // Depending on whether there are previous failed attempts or not this 
+        // Depending on whether there are previous failed attempts or not this
         // can SCHEDULE or RESCHEDULE the container allocate request. If this
         // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
-        // from the map splitInfo. So the bad node might be sent as a location 
-        // to the RM. But the RM would ignore that just like it would ignore 
+        // from the map splitInfo. So the bad node might be sent as a location
+        // to the RM. But the RM would ignore that just like it would ignore
         // currently pending container requests affinitized to bad nodes.
         task.addAndScheduleAttempt();
         return TaskStateInternal.SCHEDULED;
@@ -1105,11 +1107,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private static class KillNewTransition 
+  private static class KillNewTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
-      
+
       if (task.historyTaskStartGenerated) {
         task.logJobHistoryTaskFailedEvent(TaskState.KILLED);
       } else {
@@ -1131,7 +1133,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private static class KillTransition 
+  private static class KillTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b06e264..c8ee314 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -599,12 +599,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
    * The only entry point to change the Vertex.
    */
   public void handle(VertexEvent event) {
-    LOG.info("DEBUG: Processing VertexEvent " + event.getVertexId()
-        + " of type " + event.getType() + " while in state "
-        + getInternalState() + ". Event: " + event);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing VertexEvent " + event.getVertexId() + " of type "
-          + event.getType() + " while in state " + getInternalState());
+      LOG.debug("Processing VertexEvent " + event.getVertexId()
+          + " of type " + event.getType() + " while in state "
+          + getInternalState() + ". Event: " + event);
     }
     try {
       writeLock.lock();
@@ -631,9 +629,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     finally {
       writeLock.unlock();
     }
-    LOG.info("DEBUG: Finished processing VertexEvent " + event.getVertexId()
-        + " of type " + event.getType() + " while in state "
-        + getInternalState() + ". Event: " + event);
   }
 
   private VertexState getInternalState() {
@@ -699,12 +694,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   static VertexState checkVertexForCompletion(VertexImpl vertex) {
     //check for vertex failure first
-
-    LOG.info("ZZZZ: checking for vertex completion"
-        + ", failedTaskCount=" + vertex.failedTaskCount
-        + ", killedTaskCount=" + vertex.killedTaskCount
-        + ", successfulTaskCount=" + vertex.succeededTaskCount
-        + ", completedTaskCount=" + vertex.completedTaskCount);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checking for vertex completion"
+          + ", failedTaskCount=" + vertex.failedTaskCount
+          + ", killedTaskCount=" + vertex.killedTaskCount
+          + ", successfulTaskCount=" + vertex.succeededTaskCount
+          + ", completedTaskCount=" + vertex.completedTaskCount);
+    }
 
     if (vertex.failedTaskCount > 0) {
       vertex.setFinishTime();
@@ -1287,8 +1283,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       InputSpec inputSpec = new InputSpec(entry.getKey().getName(),
           entry.getKey().getTotalTasks(),
           entry.getValue().getInputClass());
-      LOG.info("DEBUG: For vertex : " + this.getName()
-          + ", Using InputSpec : " + inputSpec);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("For vertex : " + this.getName()
+            + ", Using InputSpec : " + inputSpec);
+      }
       // TODO DAGAM This should be based on the edge type.
       inputSpecList.add(inputSpec);
     }
@@ -1304,8 +1302,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         OutputSpec outputSpec = new OutputSpec(entry.getKey().getName(),
             entry.getKey().getTotalTasks(),
             entry.getValue().getOutputClass());
-        LOG.info("DEBUG: For vertex : " + this.getName()
-            + ", Using OutputSpec : " + outputSpec);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("For vertex : " + this.getName()
+              + ", Using OutputSpec : " + outputSpec);
+        }
         // TODO DAGAM This should be based on the edge type.
         outputSpecList.add(outputSpec);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9feab053/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 58e9d50..bab02da 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -59,7 +59,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 public class AMContainerImpl implements AMContainer {
 
   private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
-  
+
   private final ReadLock readLock;
   private final WriteLock writeLock;
   private final ContainerId containerId;
@@ -75,10 +75,10 @@ public class AMContainerImpl implements AMContainer {
   // TODO Maybe this should be pulled from the TaskAttempt.s
   private final Map<TezTaskAttemptID, TezTaskContext> remoteTaskMap =
       new HashMap<TezTaskAttemptID, TezTaskContext>();
-  
+
   // TODO ?? Convert to list and hash.
-  
-  private int shufflePort; 
+
+  private int shufflePort;
   private long idleTimeBetweenTasks = 0;
   private long lastTaskFinishTime;
 
@@ -92,16 +92,16 @@ public class AMContainerImpl implements AMContainer {
   private TezTaskAttemptID runningAttempt;
   private List<TezTaskAttemptID> failedAssignments;
   private TezTaskAttemptID pullAttempt;
-  
+
   private AMContainerTask noAllocationContainerTask;
-  
+
   private static final AMContainerTask NO_MORE_TASKS = new AMContainerTask(
       true, null);
   private static final AMContainerTask WAIT_TASK = new AMContainerTask(false,
       null);
-  
+
   private boolean inError = false;
-  
+
   private ContainerLaunchContext clc;
 
   // TODO Consider registering with the TAL, instead of the TAL pulling.
@@ -115,8 +115,8 @@ public class AMContainerImpl implements AMContainer {
 
   private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine;
   private static final StateMachineFactory
-      <AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent> 
-      stateMachineFactory = 
+      <AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>
+      stateMachineFactory =
       new StateMachineFactory<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>(
       AMContainerState.ALLOCATED)
 
@@ -144,7 +144,7 @@ public class AMContainerImpl implements AMContainer {
         .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
         .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
         .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtIdleTransition())
-        
+
         .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtRunningTransition())
         .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
         .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, new TASucceededAtRunningTransition())
@@ -162,7 +162,7 @@ public class AMContainerImpl implements AMContainer {
         .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtNMStopRequestedTransition())
         .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
         .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtNMStopRequestedTransition())
-        
+
         .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
         .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
         // TODO This transition is wrong. Should be a noop / error.
@@ -196,10 +196,10 @@ public class AMContainerImpl implements AMContainer {
     this.taskAttemptListener = tal;
     this.failedAssignments = new LinkedList<TezTaskAttemptID>();
 
-    this.noAllocationContainerTask = WAIT_TASK; 
+    this.noAllocationContainerTask = WAIT_TASK;
     this.stateMachine = stateMachineFactory.make(this);
   }
-  
+
   @Override
   public AMContainerState getState() {
     readLock.lock();
@@ -214,7 +214,7 @@ public class AMContainerImpl implements AMContainer {
   public ContainerId getContainerId() {
     return this.containerId;
   }
-  
+
   @Override
   public Container getContainer() {
     return this.container;
@@ -262,7 +262,7 @@ public class AMContainerImpl implements AMContainer {
       readLock.unlock();
     }
   }
-  
+
   @Override
   public int getShufflePort() {
     readLock.lock();
@@ -272,17 +272,19 @@ public class AMContainerImpl implements AMContainer {
       readLock.unlock();
     }
   }
-  
+
   public boolean isInErrorState() {
     return inError;
   }
 
   @Override
   public void handle(AMContainerEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing AMContainerEvent " + event.getContainerId()
+          + " of type " + event.getType() + " while in state: " + getState()
+          + ". Event: " + event);
+    }
     this.writeLock.lock();
-    LOG.info("DEBUG: Processing AMContainerEvent " + event.getContainerId()
-        + " of type " + event.getType() + " while in state: " + getState()
-        + ". Event: " + event);
     try {
       final AMContainerState oldState = getState();
       try {
@@ -302,7 +304,7 @@ public class AMContainerImpl implements AMContainer {
       writeLock.unlock();
     }
   }
-  
+
   @SuppressWarnings("unchecked")
   private void sendEvent(Event<?> event) {
     this.eventHandler.handle(event);
@@ -348,14 +350,14 @@ public class AMContainerImpl implements AMContainer {
           container.taskAttemptListener, event.getCredentials(),
           event.shouldProfile(), container.appContext);
 
-      // Registering now, so that in case of delayed NM response, the child 
+      // Registering now, so that in case of delayed NM response, the child
       // task is not told to die since the TAL does not know about the container.
       container.registerWithTAListener();
       container.sendStartRequestToNM();
       LOG.info("Sending Launch Request for Container with id: " +
           container.container.getId());
       // Forget about the clc to save resources. At some point, part of the clc
-      // info may need to be exposed to the scheduler to figure out whether a 
+      // info may need to be exposed to the scheduler to figure out whether a
       // container can be used for a specific TaskAttempt.
       container.clc = null;
     }
@@ -451,7 +453,9 @@ public class AMContainerImpl implements AMContainer {
         return AMContainerState.STOP_REQUESTED;
       }
       container.pendingAttempt = event.getTaskAttemptId();
-      LOG.info("DEBUG: AssignTA: attempt: " + event.getRemoteTaskContext());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AssignTA: attempt: " + event.getRemoteTaskContext());
+      }
       container.remoteTaskMap
           .put(event.getTaskAttemptId(), event.getRemoteTaskContext());
       return container.getState();
@@ -580,7 +584,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
       if (container.pendingAttempt != null) {
-        container.sendTerminatingToTaskAttempt(container.pendingAttempt, 
+        container.sendTerminatingToTaskAttempt(container.pendingAttempt,
             "Container " + container.getContainerId() +
                 " hit an invalid transition - " + cEvent.getType() + " at " +
                 container.getState());
@@ -595,8 +599,10 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public AMContainerState transition(
         AMContainerImpl container, AMContainerEvent cEvent) {
-      LOG.info("DEBUG: AssignTAAtIdle: attempt: " +
-          ((AMContainerEventAssignTA) cEvent).getRemoteTaskContext());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AssignTAAtIdle: attempt: " +
+            ((AMContainerEventAssignTA) cEvent).getRemoteTaskContext());
+      }
       return super.transition(container, cEvent);
     }
   }
@@ -617,12 +623,13 @@ public class AMContainerImpl implements AMContainer {
         if (container.lastTaskFinishTime != 0) {
           long idleTimeDiff =
               System.currentTimeMillis() - container.lastTaskFinishTime;
-          LOG.info("DEBUG: Computing idle time for container: " +
-              container.getContainerId() + ", lastFinishTime: " +
-              container.lastTaskFinishTime + ", Incremented by: " +
-              idleTimeDiff);
-          container.idleTimeBetweenTasks +=
-              System.currentTimeMillis() - container.lastTaskFinishTime;
+          container.idleTimeBetweenTasks += idleTimeDiff;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Computing idle time for container: " +
+                container.getContainerId() + ", lastFinishTime: " +
+                container.lastTaskFinishTime + ", Incremented by: " +
+                idleTimeDiff);
+          }
         }
         LOG.info("Assigned taskAttempt + [" + container.runningAttempt +
             "] to container: [" + container.getContainerId() + "]");
@@ -656,7 +663,9 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      LOG.info("DEBUG: IdleTimeBetweenTasks: " + container.idleTimeBetweenTasks);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("IdleTimeBetweenTasks: " + container.idleTimeBetweenTasks);
+      }
       container.unregisterFromContainerListener();
     }
   }
@@ -785,7 +794,7 @@ public class AMContainerImpl implements AMContainer {
       container.registerFailedAttempt(event.getTaskAttemptId());
     }
   }
-  
+
   // Hack to some extent. This allocation should be done while entering one of
   // the post-running states, insetad of being a transition on the post stop
   // states.
@@ -907,11 +916,11 @@ public class AMContainerImpl implements AMContainer {
   protected void registerFailedAttempt(TezTaskAttemptID taId) {
     failedAssignments.add(taId);
   }
-  
+
   protected void deAllocate() {
     sendEvent(new AMSchedulerEventDeallocateContainer(containerId));
   }
-  
+
   protected void sendCompletedToScheduler() {
     sendEvent(new AMSchedulerEventContainerCompleted(containerId));
   }
@@ -944,7 +953,7 @@ public class AMContainerImpl implements AMContainer {
     sendEvent(new NMCommunicatorStopRequestEvent(containerId,
         container.getNodeId(), container.getContainerToken()));
   }
-  
+
   protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
     taskAttemptListener.unregisterTaskAttempt(attemptId);
   }
@@ -965,7 +974,7 @@ public class AMContainerImpl implements AMContainer {
   protected void unregisterFromContainerListener() {
     this.containerHeartbeatHandler.unregister(this.containerId);
   }
-  
 
-  
+
+
 }


Mime
View raw message