tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/2] TEZ-346. AM should not use TezConfiguration instances. Contributed by Hitesh Shah.
Date Tue, 13 Aug 2013 21:54:33 GMT
Updated Branches:
  refs/heads/master 8343da947 -> a9b6ab16b


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
index a065218..b1dd93d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
@@ -22,51 +22,51 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Vector;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.YarnTezDagChild;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.dag.api.TezConfiguration;
 
 public class TezEngineChildJVM {
 
-    // FIXME 
+    // FIXME
   public static enum LogName {
     /** Log on the stdout of the task. */
     STDOUT ("stdout"),
 
     /** Log on the stderr of the task. */
     STDERR ("stderr"),
-    
+
     /** Log on the map-reduce system logs of the task. */
     SYSLOG ("syslog"),
-    
+
     /** The java profiler information. */
     PROFILE ("profile.out"),
-    
+
     /** Log the debug script's stdout  */
     DEBUGOUT ("debugout");
-        
+
     private String prefix;
-    
+
     private LogName(String prefix) {
       this.prefix = prefix;
     }
-    
+
     @Override
     public String toString() {
       return prefix;
     }
   }
-  
+
   private static String getTaskLogFile(LogName filter) {
-    return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + 
+    return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR +
         filter.toString();
   }
-  
+
   public static List<String> getVMCommand(
-      InetSocketAddress taskAttemptListenerAddr, TezConfiguration conf, 
+      InetSocketAddress taskAttemptListenerAddr, Configuration conf,
       String containerIdentifier,
       String tokenIdentifier,
       int applicationAttemptNumber,
@@ -78,18 +78,18 @@ public class TezEngineChildJVM {
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 
     //set custom javaOpts
-    vargs.add(javaOpts); 
-    
+    vargs.add(javaOpts);
+
 //[Debug Task] Current simplest way to attach debugger to  Tez Child Task
 // Uncomment the following, then launch a regular job
-// Works best on one-box configured with a single container (hence one task at a time). 
+// Works best on one-box configured with a single container (hence one task at a time).
 //    LOG.error(" !!!!!!!!! Launching Child-Task in debug/suspend mode.  Attach to port 8003 !!!!!!!!");
 //    vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8003,server=y,suspend=y");
-    
+
     Path childTmpDir = new Path(Environment.PWD.$(),
         YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
     vargs.add("-Djava.io.tmpdir=" + childTmpDir);
-    
+
     // FIXME Setup the log4j properties
 
     // Decision to profile needs to be made in the scheduler.
@@ -97,11 +97,11 @@ public class TezEngineChildJVM {
       // FIXME add support for profiling
     }
 
-    // Add main class and its arguments 
+    // Add main class and its arguments
     vargs.add(YarnTezDagChild.class.getName());  // main of Child
 
     // pass TaskAttemptListener's address
-    vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress()); 
+    vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
     vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
     vargs.add(containerIdentifier);
     vargs.add(tokenIdentifier);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 342ea0f..d3568a5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -80,7 +81,7 @@ public class TestDAGImpl {
   private static final Log LOG = LogFactory.getLog(TestDAGImpl.class);
   private DAGPlan dagPlan;
   private TezDAGID dagId;
-  private TezConfiguration conf;
+  private Configuration conf;
   private DrainDispatcher dispatcher;
   private Credentials fsTokens;
   private AppContext appContext;
@@ -136,7 +137,7 @@ public class TestDAGImpl {
       ((EventHandler<TaskEvent>)task).handle(event);
     }
   }
-  
+
   private class VertexEventDispatcher
       implements EventHandler<VertexEvent> {
 
@@ -475,7 +476,9 @@ public class TestDAGImpl {
 
   @Before
   public void setup() {
-    conf = new TezConfiguration();
+    conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING, false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     appAttemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(100, 1), 1);
     dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
@@ -712,8 +715,8 @@ public class TestDAGImpl {
   }
 
   // a dag.kill() on an active DAG races with vertices all succeeding.
-  // if a JOB_KILL is processed while dag is in running state, it should end in KILLED, 
-  // regardless of whether all vertices complete 
+  // if a JOB_KILL is processed while dag is in running state, it should end in KILLED,
+  // regardless of whether all vertices complete
   //
   // Final state:
   //   DAG is in KILLED state, with killTrigger = USER_KILL

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index bc8b523..d9210d9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -58,7 +59,6 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
@@ -134,7 +134,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = new TezTaskID(
         new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        mock(TaskAttemptListener.class), 1, new TezConfiguration(),
+        mock(TaskAttemptListener.class), 1, new Configuration(),
         mock(Token.class), new Credentials(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
         MAP_PROCESSOR_DESC, locationHint, Resource.newInstance(1024, 1),
@@ -180,7 +180,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = new TezTaskID(
         new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        mock(TaskAttemptListener.class), 1, new TezConfiguration(),
+        mock(TaskAttemptListener.class), 1, new Configuration(),
         mock(Token.class), new Credentials(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
         MAP_PROCESSOR_DESC, locationHint, Resource.newInstance(1024, 1),
@@ -316,9 +316,9 @@ public class TestTaskAttempt {
     when(taListener.getAddress()).thenReturn(
         new InetSocketAddress("localhost", 0));
 
-    TezConfiguration tezConf = new TezConfiguration();
-    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
@@ -331,7 +331,7 @@ public class TestTaskAttempt {
     doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        taListener, 1, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext,
         MAP_PROCESSOR_DESC, locationHint, resource, localResources,
         environment, javaOpts, false);
@@ -370,9 +370,9 @@ public class TestTaskAttempt {
     when(taListener.getAddress()).thenReturn(
         new InetSocketAddress("localhost", 0));
 
-    TezConfiguration tezConf = new TezConfiguration();
-    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
@@ -398,7 +398,7 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        taListener, 1, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
         MAP_PROCESSOR_DESC, locationHint, resource, localResources,
         environment, javaOpts, false);
@@ -465,9 +465,9 @@ public class TestTaskAttempt {
     when(taListener.getAddress()).thenReturn(
         new InetSocketAddress("localhost", 0));
 
-    TezConfiguration tezConf = new TezConfiguration();
-    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
@@ -493,7 +493,7 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        taListener, 1, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
         MAP_PROCESSOR_DESC, locationHint, resource, localResources,
         environment, javaOpts, false);
@@ -530,9 +530,9 @@ public class TestTaskAttempt {
     when(taListener.getAddress()).thenReturn(
         new InetSocketAddress("localhost", 0));
 
-    TezConfiguration tezConf = new TezConfiguration();
-    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
@@ -558,7 +558,7 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        taListener, 1, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
         MAP_PROCESSOR_DESC, locationHint, resource, localResources,
         environment, javaOpts, false);
@@ -597,9 +597,9 @@ public class TestTaskAttempt {
     when(taListener.getAddress()).thenReturn(
         new InetSocketAddress("localhost", 0));
 
-    TezConfiguration tezConf = new TezConfiguration();
-    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
@@ -625,7 +625,7 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        taListener, 1, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
         MAP_PROCESSOR_DESC, locationHint, resource, localResources,
         environment, javaOpts, false);
@@ -690,9 +690,9 @@ public class TestTaskAttempt {
     when(taListener.getAddress()).thenReturn(
         new InetSocketAddress("localhost", 0));
 
-    TezConfiguration tezConf = new TezConfiguration();
-    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
@@ -719,7 +719,7 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        taListener, 1, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
         MAP_PROCESSOR_DESC, locationHint, resource, localResources,
         environment, javaOpts, false);
@@ -791,7 +791,7 @@ public class TestTaskAttempt {
 
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
         EventHandler eventHandler, TaskAttemptListener tal, int partition,
-        TezConfiguration conf, Token<JobTokenIdentifier> jobToken,
+        Configuration conf, Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
         TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
         ProcessorDescriptor processorDesc, TaskLocationHint locationHint,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 8a48c6c..922c0a6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -31,6 +31,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
@@ -71,7 +71,7 @@ public class TestTaskImpl {
 
   private InlineDispatcher dispatcher;
 
-  private TezConfiguration conf;
+  private Configuration conf;
   private TaskAttemptListener taskAttemptListener;
   private TaskHeartbeatHandler taskHeartbeatHandler;
   private Token<JobTokenIdentifier> jobToken;
@@ -96,7 +96,7 @@ public class TestTaskImpl {
   @Before
   public void setup() {
     dispatcher = new InlineDispatcher();
-    conf = new TezConfiguration();
+    conf = new Configuration();
     taskAttemptListener = mock(TaskAttemptListener.class);
     taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
@@ -267,7 +267,7 @@ public class TestTaskImpl {
   @Test
   /**
    * Kill running attempt
-   * {@link TaskState#RUNNING}->{@link TaskState#RUNNING} 
+   * {@link TaskState#RUNNING}->{@link TaskState#RUNNING}
    */
   public void testKillRunningTaskAttempt() {
     LOG.info("--- START: testKillRunningTaskAttempt ---");
@@ -368,7 +368,7 @@ public class TestTaskImpl {
     assertTaskSucceededState();
 
   }
-  
+
   // TODO Add test to validate the correct commit attempt.
 
   @SuppressWarnings("rawtypes")
@@ -377,7 +377,7 @@ public class TestTaskImpl {
     private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
 
     public MockTaskImpl(TezVertexID vertexId, int partition,
-        EventHandler eventHandler, TezConfiguration conf,
+        EventHandler eventHandler, Configuration conf,
         TaskAttemptListener taskAttemptListener,
         Token<JobTokenIdentifier> jobToken, Credentials credentials,
         Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
@@ -434,7 +434,7 @@ public class TestTaskImpl {
 
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
         EventHandler eventHandler, TaskAttemptListener tal, int partition,
-        TezConfiguration conf, Token<JobTokenIdentifier> jobToken,
+        Configuration conf, Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock, TaskHeartbeatHandler thh,
         AppContext appContext, ProcessorDescriptor processorDesc,
         TaskLocationHint locationHing, Resource resource,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 15c8a9b..23d5b82 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.MRVertexOutputCommitter;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -105,7 +106,7 @@ public class TestVertexImpl {
   private TaskHeartbeatHandler thh;
   private AppContext appContext;
   private VertexLocationHint vertexLocationHint;
-  private TezConfiguration conf = new TezConfiguration();
+  private Configuration conf;
   private Map<String, EdgeProperty> edges;
 
   private TaskEventDispatcher taskEventDispatcher;
@@ -169,7 +170,7 @@ public class TestVertexImpl {
       ((EventHandler<TaskEvent>)task).handle(event);
     }
   }
-  
+
   private class DagEventDispatcher implements EventHandler<DAGEvent> {
     public Map<DAGEventType, Integer> eventCount =
         new HashMap<DAGEventType, Integer>();
@@ -229,7 +230,7 @@ public class TestVertexImpl {
         .build();
     return dag;
   }
- 
+
 
   private DAGPlan createTestDAGPlan() {
     LOG.info("Setting up dag plan");
@@ -502,6 +503,9 @@ public class TestVertexImpl {
 
   @Before
   public void setup() {
+    conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING, false);
     appAttemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(100, 1), 1);
     dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
@@ -979,7 +983,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
     Assert.assertEquals(4, v6.successSourceAttemptCompletionEventNoMap.size());
     Assert.assertEquals(6, v6.getTaskAttemptCompletionEvents(0, 100).length);
-    
+
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 7bd1f8e..056239b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -16,6 +16,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -58,10 +59,10 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Test;
 
 public class TestContainerReuse {
-  
+
   @Test
   public void test() throws IOException {
-    TezConfiguration tezConf = new TezConfiguration(new YarnConfiguration());
+    Configuration tezConf = new Configuration(new YarnConfiguration());
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
     RackResolver.init(tezConf);
@@ -69,16 +70,16 @@ public class TestContainerReuse {
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = new TezDAGID("0", 0, 0);
-    
+
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     AMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
     String appUrl = "url";
     String appMsg = "success";
-    AppFinalStatus finalStatus = 
+    AppFinalStatus finalStatus =
         new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-    
+
     doReturn(finalStatus).when(mockApp).getFinalAppStatus();
-    
+
     AppContext appContext = mock(AppContext.class);
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), appContext);
     AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
@@ -90,18 +91,18 @@ public class TestContainerReuse {
     TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
     taskSchedulerEventHandler.init(tezConf);
     taskSchedulerEventHandler.start();
-    
+
     TaskScheduler taskScheduler = ((TaskSchedulerEventHandlerForTest)taskSchedulerEventHandler).getSpyTaskScheduler();
 
     Resource resource1 = Resource.newInstance(1024, 1);
     String[] host1 = {"host1"};
     String[] host2 = {"host2"};
-    
+
     String []racks = {"/default-rack"};
     Priority priority1 = Priority.newInstance(1);
-    
+
     TezVertexID vertexID1 = new TezVertexID(dagID, 1);
-    
+
     //Vertex 1, Task 1, Attempt 1, host1
     TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID1, 1), 1);
     TaskAttempt ta11 = mock(TaskAttempt.class);
@@ -121,14 +122,14 @@ public class TestContainerReuse {
     TezTaskAttemptID taID14 = new TezTaskAttemptID(new TezTaskID(vertexID1, 4), 1);
     TaskAttempt ta14 = mock(TaskAttempt.class);
     AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(taID14, ta14, resource1, host2, racks, priority1, tezConf);
-    
+
     taskSchedulerEventHandler.handleEvent(lrEvent1);
     taskSchedulerEventHandler.handleEvent(lrEvent2);
     taskSchedulerEventHandler.handleEvent(lrEvent3);
     taskSchedulerEventHandler.handleEvent(lrEvent4);
 
     Container container1 = createContainer(1, "host1", resource1, priority1);
-    
+
     // One container allocated.
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
@@ -140,7 +141,7 @@ public class TestContainerReuse {
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
-    
+
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED));
@@ -159,23 +160,23 @@ public class TestContainerReuse {
     eventHandler.reset();
 
     Container container2 = createContainer(2, "host2", resource1, priority1);
-    
+
     // Second container allocated. Should be allocated to the last task.
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
-    
+
     // Task assigned to container completed successfully. No pending requests. Container should be released.
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED));
     verify(taskScheduler).deallocateTask(eq(ta14), eq(true));
     verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
     eventHandler.reset();
-    
+
 
     taskScheduler.close();
     taskSchedulerEventHandler.close();
   }
-  
+
   private Container createContainer(int id, String host, Resource resource, Priority priority) {
     ContainerId containerID = ContainerId.newInstance(
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
@@ -185,10 +186,10 @@ public class TestContainerReuse {
         resource, priority, null);
     return container;
   }
-  
+
   private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(
       TezTaskAttemptID taID, TaskAttempt ta, Resource capability, String[] hosts,
-      String[] racks, Priority priority, TezConfiguration conf) {
+      String[] racks, Priority priority, Configuration conf) {
     AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(
         taID, capability, new HashMap<String, LocalResource>(),
         new TezEngineTaskContext(taID, "user", "jobName", "vertexName",
@@ -200,7 +201,7 @@ public class TestContainerReuse {
         new HashMap<String, String>(), conf);
     return lr;
   }
-  
+
   // Mocking AMRMClientImpl to make use of getMatchingRequest
   public static class AMRMClientForTest extends AMRMClientImpl<CookieContainerRequest> {
 
@@ -213,7 +214,7 @@ public class TestContainerReuse {
     }
   }
 
-  
+
   // Mocking AMRMClientAsyncImpl to make use of getMatchingRequest
   public static class AMRMClientAsyncForTest extends
       AMRMClientAsyncImpl<CookieContainerRequest> {
@@ -244,24 +245,24 @@ public class TestContainerReuse {
     protected void serviceStop() {
     }
   }
-  
+
   // Overrides start / stop. Will be controlled without the extra event handling thread.
   public static class TaskSchedulerEventHandlerForTest extends TaskSchedulerEventHandler {
 
     private AMRMClientAsync<CookieContainerRequest> amrmClientAsync;
-    
+
     @SuppressWarnings("rawtypes")
     public TaskSchedulerEventHandlerForTest(AppContext appContext,
         EventHandler eventHandler, AMRMClientAsync<CookieContainerRequest> amrmClientAsync) {
       super(appContext, null, eventHandler);
       this.amrmClientAsync = amrmClientAsync;
     }
-    
+
     @Override
     public TaskScheduler createTaskScheduler(String host, int port, String trackingUrl) {
       return new TaskScheduler(this, host, port, trackingUrl, amrmClientAsync);
     }
-    
+
     public TaskScheduler getSpyTaskScheduler() {
       return this.taskScheduler;
     }
@@ -297,7 +298,7 @@ public class TestContainerReuse {
         assertFalse(e.getClass().getName().equals(eventClass.getName()));
       }
     }
-    
+
     public void verifyInvocation(Class<? extends Event> eventClass) {
       for (Event e : events) {
         if (e.getClass().getName().equals(eventClass.getName())) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 284d1ef..1928f98 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -72,67 +72,69 @@ import org.mockito.stubbing.Answer;
 
 
 public class TestTaskScheduler {
-  
+
   RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
-    
+
   @SuppressWarnings({ "unchecked" })
   @Test
   public void testTaskScheduler() throws Exception {
     RackResolver.init(new YarnConfiguration());
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-    
-    AMRMClientAsync<CookieContainerRequest> mockRMClient = 
+
+    AMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(AMRMClientAsync.class);
-    
+
     String appHost = "host";
     int appPort = 0;
     String appUrl = "url";
-    TaskScheduler scheduler = new TaskScheduler(mockApp, appHost, 
+    TaskScheduler scheduler = new TaskScheduler(mockApp, appHost,
                                                 appPort, appUrl, mockRMClient);
-    
-    Configuration conf = new Configuration(); 
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING, false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     int interval = 100;
     conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, interval);
     scheduler.init(conf);
     verify(mockRMClient).init(conf);
     verify(mockRMClient).setHeartbeatInterval(interval);
-    
-    RegisterApplicationMasterResponse mockRegResponse = 
+
+    RegisterApplicationMasterResponse mockRegResponse =
                                 mock(RegisterApplicationMasterResponse.class);
     Resource mockMaxResource = mock(Resource.class);
     Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
     when(mockRegResponse.getMaximumResourceCapability()).
                                                    thenReturn(mockMaxResource);
-    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);    
+    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
     when(mockRMClient.
           registerApplicationMaster(anyString(), anyInt(), anyString())).
                                                    thenReturn(mockRegResponse);
     scheduler.start();
     verify(mockRMClient).start();
     verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
-    verify(mockApp).setApplicationRegistrationData(mockMaxResource, 
+    verify(mockApp).setApplicationRegistrationData(mockMaxResource,
                                                    mockAcls);
-    
+
     when(mockRMClient.getClusterNodeCount()).thenReturn(5);
     Assert.assertEquals(5, scheduler.getClusterNodeCount());
-    
+
     Resource mockClusterResource = mock(Resource.class);
     when(mockRMClient.getAvailableResources()).
                                               thenReturn(mockClusterResource);
-    Assert.assertEquals(mockClusterResource, 
+    Assert.assertEquals(mockClusterResource,
                         mockRMClient.getAvailableResources());
-    
+
     Object mockTask1 = mock(Object.class);
     Object mockCookie1 = mock(Object.class);
     Resource mockCapability = mock(Resource.class);
     String[] hosts = {"host1", "host5"};
     String[] racks = {"/default-rack", "/default-rack"};
     Priority mockPriority = mock(Priority.class);
-    ArgumentCaptor<CookieContainerRequest> requestCaptor = 
+    ArgumentCaptor<CookieContainerRequest> requestCaptor =
                         ArgumentCaptor.forClass(CookieContainerRequest.class);
     // allocate task
-    scheduler.allocateTask(mockTask1, mockCapability, hosts, 
+    scheduler.allocateTask(mockTask1, mockCapability, hosts,
                            racks, mockPriority, mockCookie1);
     verify(mockRMClient, times(1)).
                            addContainerRequest((CookieContainerRequest) any());
@@ -143,7 +145,7 @@ public class TestTaskScheduler {
                         removeContainerRequest((CookieContainerRequest) any());
     verify(mockRMClient, times(0)).
                                  releaseAssignedContainer((ContainerId) any());
-    
+
     // deallocating unknown task
     Assert.assertNull(scheduler.deallocateTask(mockTask1, true));
     verify(mockRMClient, times(1)).
@@ -156,22 +158,22 @@ public class TestTaskScheduler {
     Object mockCookie2 = mock(Object.class);
     Object mockTask3 = mock(Object.class);
     Object mockCookie3 = mock(Object.class);
-    scheduler.allocateTask(mockTask1, mockCapability, hosts, 
+    scheduler.allocateTask(mockTask1, mockCapability, hosts,
         racks, mockPriority, mockCookie1);
     verify(mockRMClient, times(2)).
                                 addContainerRequest(requestCaptor.capture());
     CookieContainerRequest request1 = requestCaptor.getValue();
-    scheduler.allocateTask(mockTask2, mockCapability, hosts, 
+    scheduler.allocateTask(mockTask2, mockCapability, hosts,
         racks, mockPriority, mockCookie2);
     verify(mockRMClient, times(3)).
                                 addContainerRequest(requestCaptor.capture());
     CookieContainerRequest request2 = requestCaptor.getValue();
-    scheduler.allocateTask(mockTask3, mockCapability, hosts, 
+    scheduler.allocateTask(mockTask3, mockCapability, hosts,
         racks, mockPriority, mockCookie3);
     verify(mockRMClient, times(4)).
                                 addContainerRequest(requestCaptor.capture());
     CookieContainerRequest request3 = requestCaptor.getValue();
-    
+
     List<Container> containers = new ArrayList<Container>();
     Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
@@ -193,29 +195,29 @@ public class TestTaskScheduler {
     ContainerId mockCId4 = mock(ContainerId.class);
     when(mockContainer4.getId()).thenReturn(mockCId4);
     containers.add(mockContainer4);
-    ArrayList<CookieContainerRequest> hostContainers = 
+    ArrayList<CookieContainerRequest> hostContainers =
                              new ArrayList<CookieContainerRequest>();
     hostContainers.add(request1);
     hostContainers.add(request2);
     hostContainers.add(request3);
-    ArrayList<CookieContainerRequest> rackContainers = 
+    ArrayList<CookieContainerRequest> rackContainers =
                              new ArrayList<CookieContainerRequest>();
     rackContainers.add(request2);
     rackContainers.add(request3);
-    ArrayList<CookieContainerRequest> anyContainers = 
+    ArrayList<CookieContainerRequest> anyContainers =
                              new ArrayList<CookieContainerRequest>();
     anyContainers.add(request3);
 
-    final List<ArrayList<CookieContainerRequest>> hostList = 
+    final List<ArrayList<CookieContainerRequest>> hostList =
                         new LinkedList<ArrayList<CookieContainerRequest>>();
     hostList.add(hostContainers);
-    final List<ArrayList<CookieContainerRequest>> rackList = 
+    final List<ArrayList<CookieContainerRequest>> rackList =
                         new LinkedList<ArrayList<CookieContainerRequest>>();
     rackList.add(rackContainers);
-    final List<ArrayList<CookieContainerRequest>> anyList = 
+    final List<ArrayList<CookieContainerRequest>> anyList =
                         new LinkedList<ArrayList<CookieContainerRequest>>();
     anyList.add(anyContainers);
-    final List<ArrayList<CookieContainerRequest>> emptyList = 
+    final List<ArrayList<CookieContainerRequest>> emptyList =
                         new LinkedList<ArrayList<CookieContainerRequest>>();
     // return all requests for host1
     when(
@@ -283,7 +285,7 @@ public class TestTaskScheduler {
     verify(mockRMClient).removeContainerRequest(request3);
     // verify unwanted container released
     verify(mockRMClient).releaseAssignedContainer(mockCId4);
-    
+
     // deallocate allocated task
     Assert.assertEquals(mockContainer1, scheduler.deallocateTask(mockTask1, true));
     verify(mockRMClient).releaseAssignedContainer(mockCId1);
@@ -291,7 +293,7 @@ public class TestTaskScheduler {
     Assert.assertEquals(mockTask2, scheduler.deallocateContainer(mockCId2));
     verify(mockRMClient).releaseAssignedContainer(mockCId2);
     verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
-    
+
     List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
     ContainerStatus mockStatus1 = mock(ContainerStatus.class);
     when(mockStatus1.getContainerId()).thenReturn(mockCId1);
@@ -305,7 +307,7 @@ public class TestTaskScheduler {
     ContainerStatus mockStatus4 = mock(ContainerStatus.class);
     when(mockStatus4.getContainerId()).thenReturn(mockCId4);
     statuses.add(mockStatus4);
-    
+
     scheduler.onContainersCompleted(statuses);
     // released container status returned
     verify(mockApp).containerCompleted(mockTask1, mockStatus1);
@@ -315,54 +317,54 @@ public class TestTaskScheduler {
     // no other statuses returned
     verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
     verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
-    
-    
+
+
     float progress = 0.5f;
     when(mockApp.getProgress()).thenReturn(progress);
     Assert.assertEquals(progress, scheduler.getProgress(), 0);
-    
+
     List<NodeReport> mockUpdatedNodes = mock(List.class);
     scheduler.onNodesUpdated(mockUpdatedNodes);
     verify(mockApp).nodesUpdated(mockUpdatedNodes);
-    
+
     Exception mockException = mock(Exception.class);
     scheduler.onError(mockException);
     verify(mockApp).onError(mockException);
-    
+
     scheduler.onShutdownRequest();
     verify(mockApp).appShutdownRequested();
-    
+
     String appMsg = "success";
-    AppFinalStatus finalStatus = 
+    AppFinalStatus finalStatus =
         new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
     scheduler.stop();
     verify(mockRMClient).
-                  unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
+                  unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
                                               appMsg, appUrl);
     verify(mockRMClient).stop();
     scheduler.close();
   }
-  
+
   @SuppressWarnings("unchecked")
   @Test
   public void testTaskSchedulerPreemption() throws Exception {
     RackResolver.init(new YarnConfiguration());
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-    
-    AMRMClientAsync<CookieContainerRequest> mockRMClient = 
+
+    AMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(AMRMClientAsync.class);
-    
+
     String appHost = "host";
     int appPort = 0;
     String appUrl = "url";
-    TaskScheduler scheduler = new TaskScheduler(mockApp, appHost, 
+    TaskScheduler scheduler = new TaskScheduler(mockApp, appHost,
                                                 appPort, appUrl, mockRMClient);
-    
-    Configuration conf = new Configuration(); 
+
+    Configuration conf = new Configuration();
     scheduler.init(conf);
-    
-    RegisterApplicationMasterResponse mockRegResponse = 
+
+    RegisterApplicationMasterResponse mockRegResponse =
                        mock(RegisterApplicationMasterResponse.class);
     when(
         mockRMClient.registerApplicationMaster(anyString(), anyInt(),
@@ -371,12 +373,12 @@ public class TestTaskScheduler {
     scheduler.start();
     Resource totalResource = Resource.newInstance(4000, 4);
     when(mockRMClient.getAvailableResources()).thenReturn(totalResource);
-    
+
     // no preemption
     scheduler.getProgress();
     Assert.assertEquals(totalResource, scheduler.getTotalResources());
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
-    
+
     // allocate task
     Object mockTaskPri1 = mock(Object.class);
     Object mockTaskPri2 = mock(Object.class);
@@ -386,38 +388,38 @@ public class TestTaskScheduler {
     Priority pri1 = Priority.newInstance(1);
     Priority pri2 = Priority.newInstance(2);
     Priority pri3 = Priority.newInstance(3);
-    
-    ArgumentCaptor<CookieContainerRequest> requestCaptor = 
+
+    ArgumentCaptor<CookieContainerRequest> requestCaptor =
         ArgumentCaptor.forClass(CookieContainerRequest.class);
-    final ArrayList<CookieContainerRequest> anyContainers = 
+    final ArrayList<CookieContainerRequest> anyContainers =
         new ArrayList<CookieContainerRequest>();
-    
+
     Resource taskAsk = Resource.newInstance(1024, 1);
-    scheduler.allocateTask(mockTaskPri1, taskAsk, null, 
+    scheduler.allocateTask(mockTaskPri1, taskAsk, null,
                            null, pri1, null);
     verify(mockRMClient, times(1)).
         addContainerRequest(requestCaptor.capture());
     anyContainers.add(requestCaptor.getValue());
-    scheduler.allocateTask(mockTaskPri3, taskAsk, null, 
+    scheduler.allocateTask(mockTaskPri3, taskAsk, null,
                            null, pri3, null);
     verify(mockRMClient, times(2)).
     addContainerRequest(requestCaptor.capture());
     anyContainers.add(requestCaptor.getValue());
-    scheduler.allocateTask(mockTaskPri3Kill, taskAsk, null, 
+    scheduler.allocateTask(mockTaskPri3Kill, taskAsk, null,
                            null, pri3, null);
     verify(mockRMClient, times(3)).
     addContainerRequest(requestCaptor.capture());
     anyContainers.add(requestCaptor.getValue());
-    
+
     Resource freeResource = Resource.newInstance(500, 0);
     when(mockRMClient.getAvailableResources()).thenReturn(freeResource);
     scheduler.getProgress();
     Assert.assertEquals(totalResource, scheduler.getTotalResources());
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
-    
-    final List<ArrayList<CookieContainerRequest>> anyList = 
+
+    final List<ArrayList<CookieContainerRequest>> anyList =
         new LinkedList<ArrayList<CookieContainerRequest>>();
-    final List<ArrayList<CookieContainerRequest>> emptyList = 
+    final List<ArrayList<CookieContainerRequest>> emptyList =
         new LinkedList<ArrayList<CookieContainerRequest>>();
 
     anyList.add(anyContainers);
@@ -484,43 +486,43 @@ public class TestTaskScheduler {
         });
     scheduler.onContainersAllocated(containers);
     Assert.assertEquals(3072, scheduler.allocatedResources.getMemory());
-    Assert.assertEquals(mockCId1, 
+    Assert.assertEquals(mockCId1,
         scheduler.taskAllocations.get(mockTaskPri1).getId());
-    Assert.assertEquals(mockCId2, 
+    Assert.assertEquals(mockCId2,
         scheduler.taskAllocations.get(mockTaskPri3).getId());
-    Assert.assertEquals(mockCId3, 
+    Assert.assertEquals(mockCId3,
         scheduler.taskAllocations.get(mockTaskPri3Kill).getId());
 
     // no preemption
     scheduler.getProgress();
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
 
-    scheduler.allocateTask(mockTaskPri3Wait, taskAsk, null, 
+    scheduler.allocateTask(mockTaskPri3Wait, taskAsk, null,
                            null, pri3, null);
     // no preemption
     scheduler.getProgress();
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
-    
-    scheduler.allocateTask(mockTaskPri2, taskAsk, null, 
+
+    scheduler.allocateTask(mockTaskPri2, taskAsk, null,
                            null, pri2, null);
 
     // mockTaskPri3Kill gets preempted
     scheduler.getProgress();
     verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
-    verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3);    
-    
-    AppFinalStatus finalStatus = 
+    verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3);
+
+    AppFinalStatus finalStatus =
         new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
     scheduler.stop();
     scheduler.close();
   }
-  
+
   @SuppressWarnings("unchecked")
   @Test
   public void testLocalityMatching() throws IOException {
 
-    RackResolver.init(new TezConfiguration());
+    RackResolver.init(new Configuration());
     TaskSchedulerAppCallback appClient = mock(TaskSchedulerAppCallback.class);
     AMRMClientAsync<CookieContainerRequest> amrmClient = mock(AMRMClientAsync.class);
     TaskScheduler taskScheduler = new TaskScheduler(appClient, "host", 0, "",

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index ba573b0..b9e907a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -37,6 +37,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -53,7 +54,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.TezTaskContext;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskAttemptListener;
@@ -72,12 +72,12 @@ import org.mockito.ArgumentCaptor;
 
 public class TestAMContainer {
 
-  
+
   @Test
   // Assign before launch.
   public void tetSingleSuccessfulTaskFlow() {
     WrappedContainer wc = new WrappedContainer();
-    
+
     wc.verifyState(AMContainerState.ALLOCATED);
 
     // Launch request.
@@ -85,14 +85,14 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.LAUNCHING);
     // 1 Launch request.
     wc.verifyCountAndGetOutgoingEvents(1);
-    
+
     // Assign task.
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.verifyState(AMContainerState.LAUNCHING);
     wc.verifyNoOutgoingEvents();
     assertEquals(wc.taskAttemptID, wc.amContainer.getQueuedTaskAttempts()
         .get(0));
-    
+
     // Container Launched
     wc.containerLaunched();
     wc.verifyState(AMContainerState.IDLE);
@@ -102,7 +102,7 @@ public class TestAMContainer {
     assertNull(wc.amContainer.getRunningTaskAttempt());
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.chh).register(wc.containerID);
-    
+
     // Pull TA
     AMContainerTask pulledTask = wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
@@ -129,12 +129,12 @@ public class TestAMContainer {
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
     assertFalse(wc.amContainer.isInErrorState());
   }
-  
+
   @Test
   // Assign after launch.
   public void testSingleSuccessfulTaskFlow2() {
     WrappedContainer wc = new WrappedContainer();
-    
+
     wc.verifyState(AMContainerState.ALLOCATED);
 
     // Launch request.
@@ -142,7 +142,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.LAUNCHING);
     // 1 Launch request.
     wc.verifyCountAndGetOutgoingEvents(1);
-    
+
     // Container Launched
     wc.containerLaunched();
     wc.verifyState(AMContainerState.IDLE);
@@ -150,7 +150,7 @@ public class TestAMContainer {
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.chh).register(wc.containerID);
-    
+
     // Assign task.
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.verifyState(AMContainerState.IDLE);
@@ -181,11 +181,11 @@ public class TestAMContainer {
     wc.verifyCountAndGetOutgoingEvents(1);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
-    
+
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
     assertFalse(wc.amContainer.isInErrorState());
   }
-  
+
   @Test
   public void testSingleSuccessfulTaskFlowStopRequest() {
     WrappedContainer wc = new WrappedContainer();
@@ -204,7 +204,7 @@ public class TestAMContainer {
     wc.verifyCountAndGetOutgoingEvents(1);
     assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
         NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
-    
+
     wc.nmStopSent();
     wc.verifyState(AMContainerState.STOPPING);
     wc.verifyNoOutgoingEvents();
@@ -221,7 +221,7 @@ public class TestAMContainer {
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
     assertFalse(wc.amContainer.isInErrorState());
   }
-  
+
   @Test
   public void testSingleSuccessfulTaskFlowFailedNMStopRequest() {
     WrappedContainer wc = new WrappedContainer();
@@ -240,14 +240,14 @@ public class TestAMContainer {
     wc.verifyCountAndGetOutgoingEvents(1);
     assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
         NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
-    
+
     wc.nmStopFailed();
     wc.verifyState(AMContainerState.STOPPING);
     // Event to ask a RM container release.
     wc.verifyCountAndGetOutgoingEvents(1);
     assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
         AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
-    
+
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     // 1 Scheduler completed event.
@@ -260,21 +260,21 @@ public class TestAMContainer {
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
     assertFalse(wc.amContainer.isInErrorState());
   }
-  
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testMultipleAllocationsAtIdle() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
-    
+
     wc.launchContainer();
     wc.containerLaunched();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.verifyState(AMContainerState.IDLE);
-    
+
     TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
-    
+
     wc.verifyState(AMContainerState.STOP_REQUESTED);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
@@ -285,7 +285,7 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING);
     assertTrue(wc.amContainer.isInErrorState());
-    
+
     wc.nmStopSent();
     wc.containerCompleted();
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
@@ -294,12 +294,12 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
-    
+
     assertNull(wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
     assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
-  
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testAllocationAtRunning() {
@@ -339,7 +339,7 @@ public class TestAMContainer {
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
     assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
-  
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testMultipleAllocationsAtLaunching() {
@@ -469,8 +469,8 @@ public class TestAMContainer {
     List<Event> outgoingEvents;
 
     wc.launchContainer();
-    
-    
+
+
     wc.assignTaskAttempt(wc.taskAttemptID);
 
     wc.containerCompleted();
@@ -482,13 +482,13 @@ public class TestAMContainer {
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-    
+
     assertFalse(wc.amContainer.isInErrorState());
-    
+
     // Container launched generated by NM call.
     wc.containerLaunched();
     wc.verifyNoOutgoingEvents();
-    
+
     assertFalse(wc.amContainer.isInErrorState());
   }
 
@@ -499,7 +499,7 @@ public class TestAMContainer {
     List<Event> outgoingEvents;
 
     wc.launchContainer();
-    
+
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.containerLaunched();
     wc.verifyState(AMContainerState.IDLE);
@@ -515,15 +515,15 @@ public class TestAMContainer {
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-    
+
     assertFalse(wc.amContainer.isInErrorState());
-    
+
     // Pending pull request. (Ideally, container should be dead at this point
     // and this event should not be generated. Network timeout on NM-RM heartbeat
     // can cause it to be genreated)
     wc.pullTaskToRun();
     wc.verifyNoOutgoingEvents();
-    
+
     assertFalse(wc.amContainer.isInErrorState());
   }
 
@@ -534,7 +534,7 @@ public class TestAMContainer {
     List<Event> outgoingEvents;
 
     wc.launchContainer();
-    
+
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.containerLaunched();
     wc.pullTaskToRun();
@@ -551,15 +551,15 @@ public class TestAMContainer {
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-    
+
     assertFalse(wc.amContainer.isInErrorState());
-    
+
     // Pending task complete. (Ideally, container should be dead at this point
     // and this event should not be generated. Network timeout on NM-RM heartbeat
     // can cause it to be genreated)
     wc.taskAttemptSucceeded(wc.taskAttemptID);
     wc.verifyNoOutgoingEvents();
-    
+
     assertFalse(wc.amContainer.isInErrorState());
   }
 
@@ -568,27 +568,27 @@ public class TestAMContainer {
   public void testTaskAssignedToCompletedContainer() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
-    
+
     wc.launchContainer();
     wc.containerLaunched();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(wc.taskAttemptID);
-    
+
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
-    
+
     TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
-    
+
     wc.assignTaskAttempt(taID2);
-    
+
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-    TaskAttemptEventContainerTerminated ctEvent = 
+    TaskAttemptEventContainerTerminated ctEvent =
         (TaskAttemptEventContainerTerminated) outgoingEvents.get(0);
     assertEquals(taID2, ctEvent.getTaskAttemptID());
-  
+
     // Allocation to a completed Container is considered an error.
     // TODO Is this valid ?
     assertTrue(wc.amContainer.isInErrorState());
@@ -597,7 +597,7 @@ public class TestAMContainer {
   @Test
   public void testTaskPullAtLaunching() {
     WrappedContainer wc = new WrappedContainer();
-    
+
     wc.launchContainer();
     AMContainerTask pulledTask = wc.pullTaskToRun();
     wc.verifyState(AMContainerState.LAUNCHING);
@@ -605,18 +605,18 @@ public class TestAMContainer {
     assertFalse(pulledTask.shouldDie());
     assertNull(pulledTask.getTask());
   }
-  
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testNodeFailedAtIdle() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
-    
+
     wc.launchContainer();
     wc.containerLaunched();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.verifyState(AMContainerState.IDLE);
-    
+
     wc.nodeFailed();
     // Expecting a complete event from the RM
     wc.verifyState(AMContainerState.STOPPING);
@@ -625,7 +625,7 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_NODE_FAILED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
         AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
-    
+
     for (Event event : outgoingEvents) {
       if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
         TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
@@ -641,7 +641,7 @@ public class TestAMContainer {
 
     assertFalse(wc.amContainer.isInErrorState());
   }
-  
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testNodeFailedAtIdleMultipleAttempts() {
@@ -683,12 +683,12 @@ public class TestAMContainer {
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
-    
+
     assertNull(wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
     assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
-  
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testNodeFailedAtRunningMultipleAttempts() {
@@ -740,13 +740,13 @@ public class TestAMContainer {
   public void testNodeFailedAtCompletedMultipleSuccessfulTAs() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
-    
+
     wc.launchContainer();
     wc.containerLaunched();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(wc.taskAttemptID);
-    
+
     TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
     wc.pullTaskToRun();
@@ -755,13 +755,13 @@ public class TestAMContainer {
     wc.nmStopSent();
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
-    
+
     wc.nodeFailed();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_NODE_FAILED,
         TaskAttemptEventType.TA_NODE_FAILED);
-    
+
     assertNull(wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
     assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
@@ -772,13 +772,13 @@ public class TestAMContainer {
   public void testDuplicateCompletedEvents() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
-    
+
     wc.launchContainer();
     wc.containerLaunched();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(wc.taskAttemptID);
-    
+
     TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
     wc.pullTaskToRun();
@@ -787,20 +787,20 @@ public class TestAMContainer {
     wc.nmStopSent();
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
-    
+
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
-   
+
     wc.containerCompleted();
     wc.verifyNoOutgoingEvents();
   }
-  
-  
+
+
   // TODO Verify diagnostics in most of the tests.
-  
+
   private static class WrappedContainer {
-    
+
     long rmIdentifier = 2000;
     ApplicationId applicationID;
     ApplicationAttemptId appAttemptID;
@@ -812,7 +812,7 @@ public class TestAMContainer {
     Container container;
     ContainerHeartbeatHandler chh;
     TaskAttemptListener tal;
-    
+
     @SuppressWarnings("rawtypes")
     EventHandler eventHandler;
 
@@ -822,13 +822,13 @@ public class TestAMContainer {
     TezVertexID vertexID;
     TezTaskID taskID;
     TezTaskAttemptID taskAttemptID;
-    
+
     TezTaskContext tezTaskContext;
 
     Token<JobTokenIdentifier> jobToken;
-    
+
     public AMContainerImpl amContainer;
-    
+
     @SuppressWarnings("unchecked")
     public WrappedContainer() {
       applicationID = ApplicationId.newInstance(rmIdentifier, 1);
@@ -842,13 +842,13 @@ public class TestAMContainer {
           nodeHttpAddress, resource, priority, null);
 
       chh = mock(ContainerHeartbeatHandler.class);
-      
+
       InetSocketAddress addr = new InetSocketAddress("localhost", 0);
       tal = mock(TaskAttemptListener.class);
       doReturn(addr).when(tal).getAddress();
-      
+
       eventHandler = mock(EventHandler.class);
-      
+
       appContext = mock(AppContext.class);
       doReturn(new HashMap<ApplicationAccessType, String>()).when(appContext)
       .getApplicationACLs();
@@ -860,13 +860,13 @@ public class TestAMContainer {
       vertexID = new TezVertexID(dagID, 1);
       taskID = new TezTaskID(vertexID, 1);
       taskAttemptID = new TezTaskAttemptID(taskID, 1);
-      
+
       tezTaskContext = mock(TezTaskContext.class);
       doReturn(taskAttemptID).when(tezTaskContext).getTaskAttemptId();
 
-      
+
       jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
-      
+
       amContainer = new AMContainerImpl(container, chh, tal,
           appContext);
     }
@@ -879,12 +879,12 @@ public class TestAMContainer {
     public void verifyNoOutgoingEvents() {
       verify(eventHandler, never()).handle(any(Event.class));
     }
-    
+
     /**
      * Returns a list of outgoing events generated by the last incoming event to
-     * the AMContainer. 
+     * the AMContainer.
      * @param invocations number of expected invocations.
-     * 
+     *
      * @return a list of outgoing events from the AMContainer.
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -897,17 +897,17 @@ public class TestAMContainer {
     public void launchContainer() {
       reset(eventHandler);
       amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
-          jobToken, new Credentials(), false, new TezConfiguration(),
+          jobToken, new Credentials(), false, new Configuration(),
           new HashMap<String, LocalResource>(), new HashMap<String, String>(),
           null));
     }
-    
+
     public void assignTaskAttempt(TezTaskAttemptID taID) {
       reset(eventHandler);
       amContainer.handle(new AMContainerEventAssignTA(containerID, taID,
-          tezTaskContext)); 
+          tezTaskContext));
     }
-    
+
     public AMContainerTask pullTaskToRun() {
       reset(eventHandler);
       return amContainer.pullTaskContext();
@@ -922,49 +922,49 @@ public class TestAMContainer {
       reset(eventHandler);
       amContainer.handle(new AMContainerEventTASucceeded(containerID, taID));
     }
-    
+
     public void stopRequest() {
       reset(eventHandler);
       amContainer.handle(new AMContainerEventStopRequest(containerID));
     }
-    
+
     public void nmStopSent() {
       reset(eventHandler);
       amContainer.handle(new AMContainerEvent(containerID,
           AMContainerEventType.C_NM_STOP_SENT));
     }
-    
+
     public void nmStopFailed() {
       reset(eventHandler);
       amContainer.handle(new AMContainerEvent(containerID,
           AMContainerEventType.C_NM_STOP_FAILED));
     }
-    
+
     public void containerCompleted() {
       reset(eventHandler);
       ContainerStatus cStatus = ContainerStatus.newInstance(containerID,
           ContainerState.COMPLETE, "", 100);
       amContainer.handle(new AMContainerEventCompleted(cStatus));
     }
-    
+
     public void containerTimedOut() {
       reset(eventHandler);
       amContainer.handle(new AMContainerEvent(containerID,
           AMContainerEventType.C_TIMED_OUT));
     }
-    
+
     public void launchFailed() {
       reset(eventHandler);
       amContainer.handle(new AMContainerEventLaunchFailed(containerID,
           "launchFailed"));
     }
-    
+
     public void nodeFailed() {
       reset(eventHandler);
       amContainer.handle(new AMContainerEventNodeFailed(containerID,
           "nodeFailed"));
     }
-    
+
     public void verifyState(AMContainerState state) {
       assertEquals(
           "Expected state: " + state + ", but found: " + amContainer.getState(),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 94c26b5..62cc96c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -476,7 +476,8 @@ public class MRRSleepJob extends Configured implements Tool {
       finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
           NullOutputFormat.class.getName());
 
-      if (iReduceStagesCount != 0) {
+      if (iReduceStagesCount > 0
+          && numIReducer > 0) {
         MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
             intermediateReduceStageConfs[iReduceStagesCount-1]);
       } else {
@@ -639,6 +640,7 @@ public class MRRSleepJob extends Configured implements Tool {
           MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
               "mapreduce.job.reduces"), numIReducer);
     }
+
     Job job = Job.getInstance(conf, "sleep");
     job.setNumReduceTasks(numReducer);
     job.setJarByClass(MRRSleepJob.class);


Mime
View raw message