tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] git commit: TEZ-346. AM should not use TezConfiguration instances. Contributed by Hitesh Shah.
Date Tue, 13 Aug 2013 21:54:34 GMT
TEZ-346. AM should not use TezConfiguration instances. Contributed by
Hitesh Shah.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a9b6ab16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a9b6ab16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a9b6ab16

Branch: refs/heads/master
Commit: a9b6ab16b8a7bb527881479886bc80750ba01f97
Parents: 8343da9
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Aug 13 14:53:58 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Aug 13 14:53:58 2013 -0700

----------------------------------------------------------------------
 docs/pom.xml                                    |  34 ++--
 pom.xml                                         |   3 +-
 .../apache/tez/dag/api/TezConfiguration.java    |  26 +--
 .../tez/dag/api/client/DAGClientServer.java     |  35 ++--
 .../java/org/apache/tez/dag/app/AppContext.java |  22 +--
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 136 ++++++++------
 .../java/org/apache/tez/dag/app/dag/DAG.java    |  16 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  43 ++---
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  22 +--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  11 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   7 +-
 .../app/rm/AMSchedulerEventTALaunchRequest.java |  34 ++--
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 180 +++++++++----------
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  74 ++++----
 .../AMContainerEventLaunchRequest.java          |  14 +-
 .../app/rm/container/AMContainerHelpers.java    |  16 +-
 .../java/org/apache/tez/dag/utils/DAGApps.java  |  33 ----
 .../apache/tez/dag/utils/TezEngineChildJVM.java |  38 ++--
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  13 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  56 +++---
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  14 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  12 +-
 .../tez/dag/app/rm/TestContainerReuse.java      |  53 +++---
 .../tez/dag/app/rm/TestTaskScheduler.java       | 150 ++++++++--------
 .../dag/app/rm/container/TestAMContainer.java   | 178 +++++++++---------
 .../tez/mapreduce/examples/MRRSleepJob.java     |   4 +-
 26 files changed, 616 insertions(+), 608 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/docs/pom.xml
----------------------------------------------------------------------
diff --git a/docs/pom.xml b/docs/pom.xml
index 8f18d74..59b2764 100644
--- a/docs/pom.xml
+++ b/docs/pom.xml
@@ -26,7 +26,7 @@
 
     <groupId>org.apache.tez</groupId>
     <version>0.2.0-SNAPSHOT</version>
-    <artifactId>tez</artifactId>
+    <artifactId>tez-docs</artifactId>
     <packaging>pom</packaging>
 
     <licenses>
@@ -82,9 +82,9 @@
            <role>PMC</role>
         </roles>
         <organization>
-           Hortonworks                
-        </organization>            
-      </developer>          
+           Hortonworks
+        </organization>
+      </developer>
       <developer>
         <id>bikas</id>
         <name>Bikas Saha</name>
@@ -94,9 +94,9 @@
            <role>PMC</role>
         </roles>
         <organization>
-           Hortonworks                
-        </organization>            
-      </developer>          
+           Hortonworks
+        </organization>
+      </developer>
       <developer>
         <id>hitesh</id>
         <name>Hitesh Shah</name>
@@ -106,9 +106,9 @@
            <role>PMC</role>
         </roles>
         <organization>
-           Hortonworks                
-        </organization>            
-      </developer>          
+           Hortonworks
+        </organization>
+      </developer>
       <developer>
         <id>mliddell</id>
         <name>Mike Liddell</name>
@@ -118,9 +118,9 @@
            <role>PMC</role>
         </roles>
         <organization>
-           Microsoft                
-        </organization>            
-      </developer>          
+           Microsoft
+        </organization>
+      </developer>
       <developer>
         <id>sseth</id>
         <name>Siddharth Seth</name>
@@ -130,9 +130,9 @@
            <role>PMC</role>
         </roles>
         <organization>
-           Hortonworks                
-        </organization>            
-      </developer>          
+           Hortonworks
+        </organization>
+      </developer>
     </developers>
 
     <organization>
@@ -189,7 +189,7 @@
         </plugin>
       </plugins>
     </reporting>
-    
+
   <distributionManagement>
     <site>
       <id>apache-website</id>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 847b1d8..04c6793 100644
--- a/pom.xml
+++ b/pom.xml
@@ -226,7 +226,7 @@
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
-        <version>1.5.3</version>
+        <version>1.7.4</version>
       </dependency>
       <dependency>
        <groupId>com.google.protobuf</groupId>
@@ -247,6 +247,7 @@
     <module>tez-mapreduce-tests</module>
     <module>tez-dag</module>
     <module>tez-dist</module>
+    <module>docs</module>
   </modules>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index fc6219c..63c3f35 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -41,7 +41,7 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_PREFIX = TEZ_PREFIX + "task.";
 
   public static final String TEZ_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir";
-  public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/hadoop-yarn/staging";
+  public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/tez/staging";
 
   // TODO Should not be required once all tokens are handled via AppSubmissionContext
   public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir";
@@ -113,16 +113,16 @@ public class TezConfiguration extends Configuration {
           + "slowstart-vertex-scheduler.max-src-fraction";
   public static final float
           TEZ_AM_SLOWSTART_VERTEX_SCHEDULER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
-  
-  public static final String 
+
+  public static final String
           TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION = TEZ_AM_PREFIX
           + "slowstart-dag-scheduler.min-resource-fraction";
-  public static final float 
+  public static final float
           TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION_DEFAULT = 0.5f;
-  
+
   public static final String TEZ_AM_AGGRESSIVE_SCHEDULING = TEZ_AM_PREFIX +
       "aggressive.scheduling";
-  public static boolean TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT = false;
+  public static boolean TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT = true;
 
   /**
    * The complete path to the serialized dag plan file
@@ -132,21 +132,21 @@ public class TezConfiguration extends Configuration {
    */
   public static final String TEZ_AM_PLAN_REMOTE_PATH = TEZ_AM_PREFIX
       + "dag-am-plan.remote.path";
-  
+
   public static final String TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX = TEZ_AM_PREFIX
       + "am-rm.heartbeat.interval-ms.max";
   public static final int TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT = 1000;
-  
+
   public static final String TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX = TEZ_TASK_PREFIX
       + "get-task.sleep.interval-ms.max";
   public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 500;
-  
+
   /**
    * Configuration to specify whether container should be reused.
    */
   public static final String TEZ_AM_CONTAINER_REUSE_ENABLED = TEZ_AM_PREFIX
       + "container.reuse.enabled";
-  public static final boolean TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT = false;
+  public static final boolean TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT = true;
 
   /**
    * Whether to reuse containers for rack local tasks. Active only if reuse is
@@ -177,12 +177,12 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_CONTAINER_LOG_FILE_NAME = "syslog";
   public static final String TEZ_CONTAINER_ERR_FILE_NAME = "stderr";
   public static final String TEZ_CONTAINER_OUT_FILE_NAME = "stdout";
-      
-  
+
+
   public static final String TEZ_LIB_URIS =
       TEZ_PREFIX + "lib.uris";
 
   public static final String TEZ_APPLICATION_TYPE = "TEZ-MR*";
-  
+
   public static final String LOCAL_FRAMEWORK_NAME = "local-tez";
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
index 9511871..2b5fb6d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
@@ -40,7 +40,7 @@ import com.google.protobuf.BlockingService;
 
 public class DAGClientServer extends AbstractService {
   static final Log LOG = LogFactory.getLog(DAGClientServer.class);
-      
+
   DAGClientHandler realInstance;
   Server server;
   InetSocketAddress bindAddress;
@@ -49,26 +49,25 @@ public class DAGClientServer extends AbstractService {
     super("DAGClientRPCServer");
     this.realInstance = realInstance;
   }
-  
+
   @Override
   public void serviceStart() {
     try {
-      assert getConfig() instanceof TezConfiguration;
-      TezConfiguration conf = (TezConfiguration) getConfig();
+      Configuration conf = getConfig();
       InetSocketAddress addr = new InetSocketAddress(0);
-      
-      DAGClientAMProtocolBlockingPBServerImpl service = 
+
+      DAGClientAMProtocolBlockingPBServerImpl service =
           new DAGClientAMProtocolBlockingPBServerImpl(realInstance);
-      
-      BlockingService blockingService = 
+
+      BlockingService blockingService =
                 DAGClientAMProtocol.newReflectiveBlockingService(service);
-      
-      int numHandlers = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_THREAD_COUNT, 
+
+      int numHandlers = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_THREAD_COUNT,
                           TezConfiguration.TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT);
-      
+
       String portRange = conf.get(TezConfiguration.TEZ_AM_CLIENT_AM_PORT_RANGE);
-      
-      server = createServer(DAGClientAMProtocolBlockingPB.class, addr, conf, 
+
+      server = createServer(DAGClientAMProtocolBlockingPB.class, addr, conf,
                             numHandlers, blockingService, portRange);
       server.start();
       bindAddress = NetUtils.getConnectAddress(server);
@@ -78,20 +77,20 @@ public class DAGClientServer extends AbstractService {
       throw new TezUncheckedException(e);
     }
   }
-  
+
   @Override
   public void serviceStop() {
     if(server != null) {
       server.stop();
     }
   }
-  
+
   public InetSocketAddress getBindAddress() {
     return bindAddress;
   }
-  
-  private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, 
-      int numHandlers, 
+
+  private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
+      int numHandlers,
       BlockingService blockingService, String portRangeConfig) throws IOException {
     RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
     RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 8f18993..5f25998 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -21,12 +21,12 @@ package org.apache.tez.dag.app;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
@@ -39,11 +39,11 @@ import org.apache.tez.dag.records.TezDAGID;
  */
 @InterfaceAudience.Private
 public interface AppContext {
-  
+
   DAGAppMaster getAppMaster();
-  
-  TezConfiguration getConf();
-  
+
+  Configuration getConf();
+
   ApplicationId getApplicationID();
 
   TezDAGID getDAGID();
@@ -51,7 +51,7 @@ public interface AppContext {
   ApplicationAttemptId getApplicationAttemptId();
 
   String getApplicationName();
-  
+
   Map<ApplicationAccessType, String> getApplicationACLs();
 
   long getStartTime();
@@ -66,12 +66,12 @@ public interface AppContext {
   EventHandler getEventHandler();
 
   Clock getClock();
-  
+
   ClusterInfo getClusterInfo();
-  
+
   AMContainerMap getAllContainers();
-  
+
   AMNodeMap getAllNodes();
-  
-  TaskSchedulerEventHandler getTaskScheduler();  
+
+  TaskSchedulerEventHandler getTaskScheduler();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 58f2e9d..4697d38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -150,7 +150,7 @@ public class DAGAppMaster extends AbstractService {
   private AMContainerMap containers;
   private AMNodeMap nodes;
   private AppContext context;
-  private TezConfiguration conf;
+  private Configuration conf;
   private Dispatcher dispatcher;
   private ContainerLauncher containerLauncher;
   private TaskCleaner taskCleaner;
@@ -163,8 +163,8 @@ public class DAGAppMaster extends AbstractService {
   private VertexEventDispatcher vertexEventDispatcher;
   private TaskSchedulerEventHandler taskSchedulerEventHandler;
   private HistoryEventHandler historyEventHandler;
-  
-  private DAGAppMasterShutdownHandler shutdownHandler = 
+
+  private DAGAppMasterShutdownHandler shutdownHandler =
       new DAGAppMasterShutdownHandler();
 
   private DAGAppMasterState state;
@@ -175,9 +175,9 @@ public class DAGAppMaster extends AbstractService {
   private DAG dag;
   private Credentials fsTokens = new Credentials(); // Filled during init
   private UserGroupInformation currentUser; // Will be setup during init
-  
+
   // must be LinkedHashMap to preserve order of service addition
-  Map<Service, ServiceWithDependency> services = 
+  Map<Service, ServiceWithDependency> services =
       new LinkedHashMap<Service, ServiceWithDependency>();
 
 
@@ -207,13 +207,11 @@ public class DAGAppMaster extends AbstractService {
   }
 
   @Override
-  public void serviceInit(final Configuration tezConf) throws Exception {
+  public void serviceInit(final Configuration conf) throws Exception {
 
     this.state = DAGAppMasterState.INITED;
 
-    assert tezConf instanceof TezConfiguration;
-
-    this.conf = (TezConfiguration) tezConf;
+    this.conf = conf;
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     downloadTokensAndSetupUGI(conf);
@@ -308,8 +306,8 @@ public class DAGAppMaster extends AbstractService {
       }
       break;
     case DAG_FINISHED:
-      setStateOnDAGCompletion();      
-      LOG.info("Shutting down on completion of dag:" + 
+      setStateOnDAGCompletion();
+      LOG.info("Shutting down on completion of dag:" +
               ((DAGAppMasterEventDAGFinished)event).getDAGId().toString());
       shutdownHandler.shutdown();
       break;
@@ -335,19 +333,19 @@ public class DAGAppMaster extends AbstractService {
         LOG.info("Ignoring multiple shutdown events");
         return;
       }
-      
+
       LOG.info("Handling DAGAppMaster shutdown");
-      
+
       AMShutdownRunnable r = new AMShutdownRunnable();
       Thread t = new Thread(r, "AMShutdownThread");
       t.start();
     }
-    
+
     private class AMShutdownRunnable implements Runnable {
       @Override
       public void run() {
         // TODO:currently just wait for some time so clients can know the
-        // final states. Will be removed once RM come on. TEZ-160.        
+        // final states. Will be removed once RM come on. TEZ-160.
         try {
           Thread.sleep(5000);
         } catch (InterruptedException e) {
@@ -391,7 +389,7 @@ public class DAGAppMaster extends AbstractService {
    * Obtain the tokens needed by the job and put them in the UGI
    * @param conf
    */
-  protected void downloadTokensAndSetupUGI(TezConfiguration conf) {
+  protected void downloadTokensAndSetupUGI(Configuration conf) {
     // TODO remove - TEZ-71
     try {
       this.currentUser = UserGroupInformation.getCurrentUser();
@@ -424,20 +422,20 @@ public class DAGAppMaster extends AbstractService {
 
   protected void addIfService(Object object, boolean addDispatcher) {
     if (object instanceof Service) {
-      Service service = (Service) object; 
-      ServiceWithDependency sd = new ServiceWithDependency(service); 
+      Service service = (Service) object;
+      ServiceWithDependency sd = new ServiceWithDependency(service);
       services.put(service, sd);
       if(addDispatcher) {
         addIfServiceDependency(service, dispatcher);
       }
     }
   }
-  
+
   protected void addIfServiceDependency(Object object, Object dependency) {
     if (object instanceof Service && dependency instanceof Service) {
       Service service = (Service) object;
       Service dependencyService = (Service) dependency;
-      ServiceWithDependency sd = services.get(service); 
+      ServiceWithDependency sd = services.get(service);
       sd.dependencies.add(dependencyService);
       dependencyService.registerServiceListener(sd);
     }
@@ -451,18 +449,19 @@ public class DAGAppMaster extends AbstractService {
   }
 
   protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
-      TezConfiguration conf) {
+      Configuration conf) {
     TaskHeartbeatHandler thh = new TaskHeartbeatHandler(context, conf.getInt(
         TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
         TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
     return thh;
   }
 
-  protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
-      TezConfiguration conf) {
-    ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context, conf.getInt(
-        TezConfiguration.TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT,
-        TezConfiguration.TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT));
+  protected ContainerHeartbeatHandler createContainerHeartbeatHandler(
+      AppContext context, Configuration conf) {
+    ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context,
+        conf.getInt(
+            TezConfiguration.TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT,
+            TezConfiguration.TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT));
     return chh;
   }
 
@@ -595,10 +594,10 @@ public class DAGAppMaster extends AbstractService {
       if(!dagId.equals(dag.getID())) {
         throw new TezException("Unknown dagId: " + dagIdStr);
       }
-      
+
       return dag;
     }
-    
+
     public void tryKillDAG(String dagIdStr)
         throws TezException {
       DAG dag = getDAG(dagIdStr);
@@ -611,22 +610,22 @@ public class DAGAppMaster extends AbstractService {
   private class RunningAppContext implements AppContext {
 
     private DAG dag;
-    private final TezConfiguration conf;
+    private final Configuration conf;
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock rLock = rwLock.readLock();
     private final Lock wLock = rwLock.writeLock();
-    public RunningAppContext(TezConfiguration config) {
+    public RunningAppContext(Configuration config) {
       this.conf = config;
     }
-    
+
     @Override
     public DAGAppMaster getAppMaster() {
       return DAGAppMaster.this;
     }
-    
+
     @Override
-    public TezConfiguration getConf() {
+    public Configuration getConf() {
       return conf;
     }
 
@@ -689,7 +688,7 @@ public class DAGAppMaster extends AbstractService {
     public AMNodeMap getAllNodes() {
       return nodes;
     }
-    
+
     @Override
     public TaskSchedulerEventHandler getTaskScheduler() {
       return taskSchedulerEventHandler;
@@ -703,7 +702,7 @@ public class DAGAppMaster extends AbstractService {
       }
       return taskSchedulerEventHandler.getApplicationAcls();
     }
-    
+
     @Override
     public TezDAGID getDAGID() {
       try {
@@ -726,7 +725,7 @@ public class DAGAppMaster extends AbstractService {
         wLock.unlock();
       }
     }
-    
+
   }
 
   private class ServiceWithDependency implements ServiceStateChangeListener {
@@ -737,14 +736,15 @@ public class DAGAppMaster extends AbstractService {
     List<Service> dependencies = new ArrayList<Service>();
     AtomicInteger dependenciesStarted = new AtomicInteger(0);
     volatile boolean canStart = false;
+    volatile boolean dependenciesFailed = false;
 
     @Override
     public void stateChanged(Service dependency) {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Service dependency: " + dependency.getName() + " notify" + 
+        LOG.debug("Service dependency: " + dependency.getName() + " notify" +
                   " for service: " + service.getName());
       }
-      if(dependency.isInState(Service.STATE.STARTED)) {
+      if (dependency.isInState(Service.STATE.STARTED)) {
         if(dependenciesStarted.incrementAndGet() == dependencies.size()) {
           synchronized(this) {
             if(LOG.isDebugEnabled()) {
@@ -754,14 +754,30 @@ public class DAGAppMaster extends AbstractService {
             this.notifyAll();
           }
         }
+      } else if (!service.isInState(Service.STATE.STARTED)
+          && dependency.getFailureState() != null) {
+        synchronized(this) {
+          dependenciesFailed = true;
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Service: " + service.getName() + " will fail to start"
+                + " as dependent service " + dependency.getName()
+                + " failed to start");
+          }
+          this.notifyAll();
+        }
       }
     }
-    
+
     void start() throws InterruptedException {
       if(dependencies.size() > 0) {
         synchronized(this) {
           while(!canStart) {
             this.wait(1000*60*3L);
+            if (dependenciesFailed) {
+              throw new TezUncheckedException("Skipping service start for "
+                  + service.getName()
+                  + " as dependencies failed to start");
+            }
           }
         }
       }
@@ -770,8 +786,8 @@ public class DAGAppMaster extends AbstractService {
       }
       for(Service dependency : dependencies) {
         if(!dependency.isInState(Service.STATE.STARTED)){
-          LOG.info("Service: " + service.getName() + " not started because " 
-                   + " service: " + dependency.getName() + 
+          LOG.info("Service: " + service.getName() + " not started because "
+                   + " service: " + dependency.getName() +
                    " is in state: " + dependency.getServiceState());
           return;
         }
@@ -779,7 +795,7 @@ public class DAGAppMaster extends AbstractService {
       service.start();
     }
   }
-  
+
   private class ServiceThread extends Thread {
     final ServiceWithDependency serviceWithDependency;
     Throwable error = null;
@@ -787,10 +803,10 @@ public class DAGAppMaster extends AbstractService {
       this.serviceWithDependency = serviceWithDependency;
       this.setName("ServiceThread:" + serviceWithDependency.service.getName());
     }
-    
+
     public void run() {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Starting thread " + serviceWithDependency.service.getName()); 
+        LOG.debug("Starting thread " + serviceWithDependency.service.getName());
       }
       long start = System.currentTimeMillis();
       try {
@@ -799,10 +815,14 @@ public class DAGAppMaster extends AbstractService {
         error = t;
       } finally {
         if(LOG.isDebugEnabled()) {
-          LOG.debug("Service: " + serviceWithDependency.service.getName() + 
+          LOG.debug("Service: " + serviceWithDependency.service.getName() +
               " started in " + (System.currentTimeMillis() - start) + "ms");
         }
       }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Service thread completed for "
+            + serviceWithDependency.service.getName());
+      }
     }
   }
 
@@ -819,16 +839,20 @@ public class DAGAppMaster extends AbstractService {
         ServiceThread st = new ServiceThread(sd);
         threads.add(st);
       }
+
       for(ServiceThread st : threads) {
         st.start();
       }
       for(ServiceThread st : threads) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Waiting for service thread to join for " + st.getName());
+        }
         st.join();
         if(st.error != null && firstError == null) {
-            firstError = st.error;
+          firstError = st.error;
         }
       }
-      
+
       if(firstError != null) {
         throw ServiceStateException.convert(firstError);
       }
@@ -839,8 +863,8 @@ public class DAGAppMaster extends AbstractService {
       e.printStackTrace();
     }
   }
-  
-  void initServices(TezConfiguration conf) {
+
+  void initServices(Configuration conf) {
     for (ServiceWithDependency sd : services.values()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Initing service : " + sd.service);
@@ -848,7 +872,7 @@ public class DAGAppMaster extends AbstractService {
       sd.service.init(conf);
     }
   }
-  
+
   void stopServices() {
     // stop in reverse order of start
     List<Service> serviceList = new ArrayList<Service>(services.size());
@@ -871,7 +895,7 @@ public class DAGAppMaster extends AbstractService {
       throw ServiceStateException.convert(firstException);
     }
   }
-  
+
   @SuppressWarnings("unchecked")
   @Override
   public void serviceStart() throws Exception {
@@ -892,7 +916,7 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.getEventHandler().handle(
         new DAGHistoryEvent(startEvent));
   }
-  
+
   @Override
   public void serviceStop() throws Exception {
     stopServices();
@@ -974,9 +998,9 @@ public class DAGAppMaster extends AbstractService {
 
       long appSubmitTime = Long.parseLong(appSubmitTimeStr);
 
-      TezConfiguration conf = new TezConfiguration(new YarnConfiguration());
+      Configuration conf = new Configuration(new YarnConfiguration());
       TezUtils.addUserSpecifiedTezConfiguration(conf);
-      
+
       String jobUserName = System
           .getenv(ApplicationConstants.Environment.USER.name());
 
@@ -1031,7 +1055,7 @@ public class DAGAppMaster extends AbstractService {
       appMaster.stop();
     }
   }
-  
+
   private void startDAG() throws IOException {
     FileInputStream dagPBBinaryStream = null;
     try {
@@ -1090,7 +1114,7 @@ public class DAGAppMaster extends AbstractService {
 
   // TODO XXX Does this really need to be a YarnConfiguration ?
   protected static void initAndStartAppMaster(final DAGAppMaster appMaster,
-      final TezConfiguration conf, String jobUserName) throws IOException,
+      final Configuration conf, String jobUserName) throws IOException,
       InterruptedException {
     Credentials credentials =
         UserGroupInformation.getCurrentUser().getCredentials();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 1d3f6a9..9fd0115 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -21,18 +21,18 @@ package org.apache.tez.dag.app.dag;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 
 /**
- * Main interface to interact with the job.  
+ * Main interface to interact with the job.
  */
 public interface DAG {
 
@@ -44,7 +44,7 @@ public interface DAG {
   /**
    * Get all the counters of this DAG. This includes job-counters aggregated
    * together with the counters of each task. This creates a clone of the
-   * Counters, so use this judiciously.  
+   * Counters, so use this judiciously.
    * @return job-counters and aggregate task-counters
    */
   TezCounters getAllCounters();
@@ -58,15 +58,15 @@ public interface DAG {
   boolean isUber();
   String getUserName();
   String getQueueName();
-  
-  TezConfiguration getConf();
-  
+
+  Configuration getConf();
+
   DAGPlan getJobPlan();
   DAGStatusBuilder getDAGStatus();
   VertexStatusBuilder getVertexStatus(String vertexName);
-  
+
   /**
-   * @return the ACLs for this job for each type of JobACL given. 
+   * @return the ACLs for this job for each type of JobACL given.
    */
   Map<ApplicationAccessType, String> getJobACLs();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 4580260..b9c40fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -135,7 +136,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private Object fullCountersLock = new Object();
   private TezCounters fullCounters = null;
 
-  public final TezConfiguration conf;
+  public final Configuration conf;
   private final DAGPlan jobPlan;
 
   private final List<String> diagnostics = new ArrayList<String>();
@@ -225,10 +226,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               DAGState.TERMINATING,
               DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-          
+
               // Ignore-able events
           .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
-              EnumSet.of(DAGEventType.DAG_KILL, 
+              EnumSet.of(DAGEventType.DAG_KILL,
                          DAGEventType.DAG_SCHEDULER_UPDATE))
 
           // Transitions from SUCCEEDED state
@@ -311,7 +312,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private long finishTime;
 
   public DAGImpl(TezDAGID dagId,
-      TezConfiguration conf,
+      Configuration conf,
       DAGPlan jobPlan,
       EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
@@ -358,7 +359,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   // TODO maybe removed after TEZ-74
   @Override
-  public TezConfiguration getConf() {
+  public Configuration getConf() {
     return conf;
   }
 
@@ -478,7 +479,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       this.readLock.unlock();
     }
   }
-  
+
   @Override
   public Map<TezVertexID, Vertex> getVertices() {
     synchronized (tasksSyncHandle) {
@@ -652,7 +653,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           + ", numVertices=" + dag.numVertices
           );
     }
-    
+
     if (dag.numCompletedVertices == dag.numVertices) {
       //Only succeed if vertices complete successfully and no terminationCause is registered.
       if(dag.numSuccessfulVertices == dag.numVertices && dag.terminationCause == null) {
@@ -757,10 +758,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       readLock.unlock();
     }
   }
-  
+
   /**
    * Set the terminationCause if it had not yet been set.
-   * 
+   *
    * @param trigger The trigger
    * @return true if setting the value succeeded.
    */
@@ -771,7 +772,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
     return false;
   }
-  
+
   DAGTerminationCause getTerminationCause() {
     readLock.lock();
     try {
@@ -790,7 +791,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     // TODO ApplicationACLs
     return null;
   }
-  
+
   // TODO Recovery
   /*
   @Override
@@ -1037,15 +1038,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       this.fullCounters.incrAllCounters(v.getAllCounters());
     }
   }
-  
+
   /**
    * Set the terminationCause and send a kill-message to all vertices.
-   * The vertex-kill messages are only sent once. 
+   * The vertex-kill messages are only sent once.
    * @param the trigger that is causing the DAG to transition to KILLED/FAILED
    * @param event The type of kill event to send to the vertices.
    */
   void enactKill(DAGTerminationCause dagTerminationCause, VertexTerminationCause vertexTerminationCause) {
-    
+
     if(trySetTerminationCause(dagTerminationCause)){
       for (Vertex v : vertices.values()) {
         eventHandler.handle(
@@ -1089,7 +1090,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       //job.metrics.endRunningJob(job);
     }
 
-   
+
   }
 
   private static class VertexCompletedTransition implements
@@ -1098,7 +1099,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     @Override
     public DAGState transition(DAGImpl job, DAGEvent event) {
       boolean forceTransitionToKillWait = false;
-      
+
       DAGEventVertexCompleted vertexEvent = (DAGEventVertexCompleted) event;
       if (LOG.isDebugEnabled()) {
         LOG.debug("Received a vertex completion event"
@@ -1110,12 +1111,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
         vertexSucceeded(job, vertex);
         job.dagScheduler.vertexCompleted(vertex);
-      } 
+      }
       else if (vertexEvent.getVertexState() == VertexState.FAILED) {
         job.enactKill(DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
         vertexFailed(job, vertex);
         forceTransitionToKillWait = true;
-      } 
+      }
       else if (vertexEvent.getVertexState() == VertexState.KILLED) {
         vertexKilled(job, vertex);
         forceTransitionToKillWait = true;
@@ -1165,8 +1166,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     diagnostics.add(diag);
   }
 
-  
- 
+
+
 
   private static class DiagnosticsUpdateTransition implements
       SingleArcTransition<DAGImpl, DAGEvent> {
@@ -1200,7 +1201,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.dagScheduler.scheduleTask(sEvent);
         break;
       case TA_SCHEDULED:
-        DAGEventSchedulerUpdateTAAssigned taEvent = 
+        DAGEventSchedulerUpdateTAAssigned taEvent =
                               (DAGEventSchedulerUpdateTAAssigned) sEvent;
         dag.dagScheduler.taskScheduled(taEvent);
         break;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index d9eef01..471ca26 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -55,7 +56,6 @@ import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
@@ -113,7 +113,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   static final TezCounters EMPTY_COUNTERS = new TezCounters();
 
-  protected final TezConfiguration conf;
+  protected final Configuration conf;
   protected final int partition;
   @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
@@ -140,7 +140,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   private TaskAttemptStatus reportedStatus;
   private DAGCounter localityCounter;
-  
+
   // Used to store locality information when
   Set<String> taskHosts = new HashSet<String>();
   Set<String> taskRacks = new HashSet<String>();
@@ -260,7 +260,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
       TaskAttemptListener tal, int partition,
-      TezConfiguration conf,
+      Configuration conf,
       Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       ProcessorDescriptor processorDescriptor, TaskLocationHint locationHint,
@@ -298,8 +298,8 @@ public class TaskAttemptImpl implements TaskAttempt,
   public TezTaskAttemptID getID() {
     return attemptId;
   }
-  
-  @Override 
+
+  @Override
   public TezTaskID getTaskID() {
     return attemptId.getTaskID();
   }
@@ -654,7 +654,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     } else if (taState == TaskAttemptStateInternal.KILLED) {
       jce.addCounterUpdate(DAGCounter.NUM_KILLED_TASKS, 1);
     }
-    
+
 //    long slotMillisIncrement = computeSlotMillis(taskAttempt);
 //    if (!taskAlreadyCompleted) {
 //      // dont double count the elapsed time
@@ -893,10 +893,10 @@ public class TaskAttemptImpl implements TaskAttempt,
         }
       }
       requestRacks = racks.toArray(new String[racks.size()]);
-      
+
       ta.taskHosts.addAll(Arrays.asList(requestHosts));
       ta.taskRacks = racks;
-      
+
       // Ask for hosts / racks only if not a re-scheduled task.
       if (ta.isRescheduled) {
         requestHosts = new String[0];
@@ -1012,8 +1012,8 @@ public class TaskAttemptImpl implements TaskAttempt,
           ta.localityCounter = DAGCounter.OTHER_LOCAL_TASKS;
         }
       }
-      
-      
+
+
       // Inform the speculator about the container assignment.
       //ta.maybeSendSpeculatorContainerNoLongerRequired();
       // Inform speculator about startTime

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index f3efa56..240c7bf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -84,7 +85,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
 
-  protected final TezConfiguration conf;
+  protected final Configuration conf;
   protected final int partition;
   protected final TaskAttemptListener taskAttemptListener;
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
@@ -132,7 +133,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
-            TaskEventType.T_TERMINATE, 
+            TaskEventType.T_TERMINATE,
             new KillNewTransition())
 
     // Transitions from SCHEDULED state
@@ -172,7 +173,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         TaskEventType.T_ATTEMPT_FAILED,
         new AttemptFailedTransition())
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
-        TaskEventType.T_TERMINATE, 
+        TaskEventType.T_TERMINATE,
         KILL_TRANSITION)
 
     // Transitions from KILL_WAIT state
@@ -274,7 +275,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   public TaskImpl(TezVertexID vertexId, int partition,
-      EventHandler eventHandler, TezConfiguration conf,
+      EventHandler eventHandler, Configuration conf,
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
@@ -708,7 +709,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     eventHandler.handle(new DAGEvent(this.taskId.getVertexID().getDAGId(),
         DAGEventType.INTERNAL_ERROR));
   }
-  
+
   private void sendTaskAttemptCompletionEvent(TezTaskAttemptID attemptId,
       TezDependentTaskCompletionEvent.Status status) {
     TaskAttempt attempt = attempts.get(attemptId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index cad79e6..41db1fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.MRVertexOutputCommitter;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -150,7 +151,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private TezCounters fullCounters = null;
   private Resource taskResource;
 
-  private TezConfiguration conf;
+  private Configuration conf;
 
   //fields initialized in init
 
@@ -355,7 +356,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private VertexTerminationCause terminationCause;
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
-      String vertexName, TezConfiguration conf, EventHandler eventHandler,
+      String vertexName, Configuration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials fsTokenCredentials, Clock clock,
@@ -951,7 +952,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
 
     private void createTasks(VertexImpl vertex) {
-      TezConfiguration conf = vertex.conf;
+      Configuration conf = vertex.conf;
       boolean useNullLocationHint = true;
       if (vertex.vertexLocationHint != null
           && vertex.vertexLocationHint.getTaskLocationHints() != null

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index 313ee7d..21c6035 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -19,13 +19,13 @@ package org.apache.tez.dag.app.rm;
 
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.TezTaskContext;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
@@ -34,7 +34,7 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
 
   // TODO Get rid of remoteTask from here. Can be forgottent after it has been assigned.
   //.... Maybe have the Container talk to the TaskAttempt to pull in the remote task.
-  
+
   private final TezTaskAttemptID attemptId;
   private final Resource capability;
   private final Map<String, LocalResource> localResources;
@@ -46,15 +46,15 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
   private final String[] racks;
   private final Priority priority;
   private final Map<String, String> environment;
-  private final TezConfiguration conf;
-  
+  private final Configuration conf;
+
   public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
       Resource capability,
       Map<String, LocalResource> localResources,
       TezTaskContext remoteTaskContext, TaskAttempt ta,
       Credentials credentials, Token<JobTokenIdentifier> jobToken,
       String[] hosts, String[] racks, Priority priority,
-      Map<String, String> environment, TezConfiguration conf) {
+      Map<String, String> environment, Configuration conf) {
     super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
     this.attemptId = attemptId;
     this.capability = capability;
@@ -81,27 +81,27 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
   public String[] getHosts() {
     return hosts;
   }
-  
+
   public String[] getRacks() {
     return racks;
   }
-  
+
   public Priority getPriority() {
     return priority;
   }
-  
+
   public TezTaskContext getRemoteTaskContext() {
     return remoteTaskContext;
   }
-  
+
   public TaskAttempt getTaskAttempt() {
     return this.taskAttempt;
   }
-  
+
   public Credentials getCredentials() {
     return this.credentials;
   }
-  
+
   public Token<JobTokenIdentifier> getJobToken() {
     return this.jobToken;
   }
@@ -109,12 +109,12 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
   public Map<String, LocalResource> getLocalResources() {
     return this.localResources;
   }
-  
+
   public Map<String, String> getEnvironment() {
     return this.environment;
   }
-  
-  public TezConfiguration getConf() {
+
+  public Configuration getConf() {
     return this.conf;
   }
 
@@ -124,7 +124,7 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
   /*Requirements to determine a container request.
    * + Data-local + Rack-local hosts.
    * + Resource capability
-   * + Env - mapreduce.map.env / mapreduce.reduce.env can change. M/R log level. 
+   * + Env - mapreduce.map.env / mapreduce.reduce.env can change. M/R log level.
    * - JobConf and JobJar file - same location.
    * - Distributed Cache - identical for map / reduce tasks at the moment.
    * - Credentials, tokens etc are identical.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 4f977cf..7b2ca86 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -64,10 +64,10 @@ import com.google.common.collect.Lists;
       eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
     }
  */
-public class TaskScheduler extends AbstractService 
+public class TaskScheduler extends AbstractService
                              implements AMRMClientAsync.CallbackHandler {
   private static final Log LOG = LogFactory.getLog(TaskScheduler.class);
-  
+
   public interface TaskSchedulerAppCallback {
     public class AppFinalStatus {
       public final FinalApplicationStatus exitStatus;
@@ -82,13 +82,13 @@ public class TaskScheduler extends AbstractService
       }
     }
     // upcall to app must be outside locks
-    public void taskAllocated(Object task, 
-                               Object appCookie, 
+    public void taskAllocated(Object task,
+                               Object appCookie,
                                Container container);
     // this may end up being called for a task+container pair that the app
     // has not heard about. this can happen because of a race between
     // taskAllocated() upcall and deallocateTask() downcall
-    public void containerCompleted(Object taskLastAllocated, 
+    public void containerCompleted(Object taskLastAllocated,
                                     ContainerStatus containerStatus);
     public void nodesUpdated(List<NodeReport> updatedNodes);
     public void appShutdownRequested();
@@ -100,7 +100,7 @@ public class TaskScheduler extends AbstractService
     public float getProgress();
     public AppFinalStatus getFinalAppStatus();
   }
-  
+
   final AMRMClientAsync<CookieContainerRequest> amRmClient;
   final TaskSchedulerAppCallback appClient;
 
@@ -108,26 +108,26 @@ public class TaskScheduler extends AbstractService
   private boolean shouldReuseContainers;
   private boolean reuseRackLocal;
   private boolean reuseNonLocal;
-  
-  Map<Object, CookieContainerRequest> taskRequests =  
+
+  Map<Object, CookieContainerRequest> taskRequests =
                   new HashMap<Object, CookieContainerRequest>();
   // LinkedHashMap is need in getProgress()
-  LinkedHashMap<Object, Container> taskAllocations = 
+  LinkedHashMap<Object, Container> taskAllocations =
                   new LinkedHashMap<Object, Container>();
-  Map<ContainerId, Object> containerAssigments = 
+  Map<ContainerId, Object> containerAssigments =
                   new HashMap<ContainerId, Object>();
-  HashMap<ContainerId, Object> releasedContainers = 
+  HashMap<ContainerId, Object> releasedContainers =
                   new HashMap<ContainerId, Object>();
-  
+
   Resource totalResources = Resource.newInstance(0, 0);
   Resource allocatedResources = Resource.newInstance(0, 0);
-  
+
   final String appHostName;
   final int appHostPort;
   final String appTrackingUrl;
-  
-  boolean isStopped = false; 
-  
+
+  boolean isStopped = false;
+
   private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
   private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
   private ContainerAssigner NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
@@ -135,16 +135,16 @@ public class TaskScheduler extends AbstractService
   class CRCookie {
     Object task;
     Object appCookie;
-    
+
     Object getTask() {
       return task;
     }
-    
+
     Object getAppCookie() {
       return appCookie;
     }
   }
-  
+
   class CookieContainerRequest extends ContainerRequest {
     CRCookie cookie;
     public CookieContainerRequest(Resource capability, String[] hosts,
@@ -152,14 +152,14 @@ public class TaskScheduler extends AbstractService
       super(capability, hosts, racks, priority);
       this.cookie = cookie;
     }
-    
+
     CRCookie getCookie() {
       return cookie;
     }
   }
-  
+
   public TaskScheduler(TaskSchedulerAppCallback appClient,
-                        String appHostName, 
+                        String appHostName,
                         int appHostPort,
                         String appTrackingUrl) {
     super(TaskScheduler.class.getName());
@@ -169,11 +169,11 @@ public class TaskScheduler extends AbstractService
     this.appHostPort = appHostPort;
     this.appTrackingUrl = appTrackingUrl;
   }
-  
+
   @Private
   @VisibleForTesting
   TaskScheduler(TaskSchedulerAppCallback appClient,
-      String appHostName, 
+      String appHostName,
       int appHostPort,
       String appTrackingUrl,
       AMRMClientAsync<CookieContainerRequest> client) {
@@ -184,15 +184,15 @@ public class TaskScheduler extends AbstractService
     this.appHostPort = appHostPort;
     this.appTrackingUrl = appTrackingUrl;
   }
-  
+
   public Resource getAvailableResources() {
     return amRmClient.getAvailableResources();
   }
-  
+
   public int getClusterNodeCount() {
     return amRmClient.getClusterNodeCount();
   }
-  
+
   // AbstractService methods
   @Override
   public synchronized void serviceInit(Configuration conf) {
@@ -202,7 +202,7 @@ public class TaskScheduler extends AbstractService
         TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX,
         TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT);
     amRmClient.setHeartbeatInterval(heartbeatIntervalMax);
-    
+
     shouldReuseContainers = conf.getBoolean(
         TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED,
         TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT);
@@ -215,19 +215,19 @@ public class TaskScheduler extends AbstractService
             TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT);
     LOG.info("TaskScheduler initialized with configuration: " +
             "maxRMHeartbeatInterval: " + heartbeatIntervalMax +
-            ", containerReuseEnabled: " + shouldReuseContainers + 
-            ", reuseRackLocal: " + reuseRackLocal + 
+            ", containerReuseEnabled: " + shouldReuseContainers +
+            ", reuseRackLocal: " + reuseRackLocal +
             ", reuseNonLocal: " + reuseNonLocal);
   }
-  
+
   @Override
   public void serviceStart() {
     try {
       RegisterApplicationMasterResponse response = null;
       synchronized (this) {
         amRmClient.start();
-        response = amRmClient.registerApplicationMaster(appHostName, 
-                                                        appHostPort, 
+        response = amRmClient.registerApplicationMaster(appHostName,
+                                                        appHostPort,
                                                         appTrackingUrl);
       }
       // upcall to app outside locks
@@ -242,7 +242,7 @@ public class TaskScheduler extends AbstractService
       throw new TezUncheckedException(e);
     }
   }
-  
+
   @Override
   public void serviceStop() {
     // upcall to app outside of locks
@@ -251,13 +251,13 @@ public class TaskScheduler extends AbstractService
       // TODO TEZ-36 dont unregister automatically after reboot sent by RM
       synchronized (this) {
         isStopped = true;
-        amRmClient.unregisterApplicationMaster(status.exitStatus, 
+        amRmClient.unregisterApplicationMaster(status.exitStatus,
                                                status.exitMessage,
                                                status.postCompletionTrackingUrl);
       }
-      
+
       // call client.stop() without lock client will attempt to stop the callback
-      // operation and at the same time the callback operation might be trying 
+      // operation and at the same time the callback operation might be trying
       // to get our lock.
       amRmClient.stop();
     } catch (YarnException e) {
@@ -275,7 +275,7 @@ public class TaskScheduler extends AbstractService
     if(isStopped) {
       return;
     }
-    Map<Object, ContainerStatus> appContainerStatus = 
+    Map<Object, ContainerStatus> appContainerStatus =
                         new HashMap<Object, ContainerStatus>(statuses.size());
     synchronized (this) {
       for(ContainerStatus containerStatus : statuses) {
@@ -287,29 +287,29 @@ public class TaskScheduler extends AbstractService
           // being released
           // completion of a container we had released earlier
           // an allocated container completed. notify app
-          LOG.info("Released container completed:" + completedId + 
+          LOG.info("Released container completed:" + completedId +
                    " last allocated to task: " + task);
           appContainerStatus.put(task, containerStatus);
           continue;
         }
-        
+
         // not found in released containers. check currently allocated containers
         // no need to release this container as the RM has already completed it
         task = unAssignContainer(completedId, false);
         if(task != null) {
           // completion of a container we have allocated currently
           // an allocated container completed. notify app
-          LOG.info("Allocated container completed:" + completedId + 
+          LOG.info("Allocated container completed:" + completedId +
                    " last allocated to task: " + task);
           appContainerStatus.put(task, containerStatus);
           continue;
         }
-        
+
         // container neither allocated nor released
-        LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());        
+        LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
       }
     }
-    
+
     // upcall to app must be outside locks
     for(Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
       appClient.containerCompleted(entry.getKey(), entry.getValue());
@@ -333,7 +333,7 @@ public class TaskScheduler extends AbstractService
       informAppAboutAssignments(entry.getKey(), entry.getValue());
     }
   }
-  
+
   /*
    * Separate calls should be made for contianers being reused and new
    * containers.
@@ -378,19 +378,19 @@ public class TaskScheduler extends AbstractService
     if(isStopped) {
       return 1;
     }
-    
+
     if(totalResources.getMemory() == 0) {
       // assume this is the first allocate callback. nothing is allocated.
       // available resource = totalResource
       // TODO this will not handle dynamic changes in resources
       totalResources = Resources.clone(getAvailableResources());
-      LOG.info("App total resource memory: " + totalResources.getMemory() + 
+      LOG.info("App total resource memory: " + totalResources.getMemory() +
                " cpu: " + totalResources.getVirtualCores() +
                " taskAllocations: " + taskAllocations.size());
     }
-    
+
     preemptIfNeeded();
-    
+
     return appClient.getProgress();
   }
 
@@ -401,12 +401,12 @@ public class TaskScheduler extends AbstractService
     }
     appClient.onError(t);
   }
-  
+
   public synchronized Resource getTotalResources() {
     return totalResources;
   }
-  
-  public synchronized void allocateTask(Object task, 
+
+  public synchronized void allocateTask(Object task,
                                            Resource capability,
                                            String[] hosts,
                                            String[] racks,
@@ -417,15 +417,15 @@ public class TaskScheduler extends AbstractService
     CRCookie cookie = new CRCookie();
     cookie.task = task;
     cookie.appCookie = clientCookie;
-    CookieContainerRequest request = 
-             new CookieContainerRequest(capability, 
-                                         hosts, 
-                                         racks, 
+    CookieContainerRequest request =
+             new CookieContainerRequest(capability,
+                                         hosts,
+                                         racks,
                                          priority,
                                          cookie);
 
     addTaskRequest(task, request);
-    LOG.info("Allocation request for task: " + task + 
+    LOG.info("Allocation request for task: " + task +
              " with request: " + request);
   }
 
@@ -484,43 +484,43 @@ public class TaskScheduler extends AbstractService
     if(task != null) {
       LOG.info("Deallocated container: " + containerId +
                " from task: " + task);
-      return task;      
+      return task;
     }
-    
+
     LOG.info("Ignoring dealloction of unknown container: " + containerId);
     return null;
   }
-  
+
   synchronized void preemptIfNeeded() {
     Resource freeResources = Resources.subtract(totalResources,
         allocatedResources);
-    LOG.info("Allocated resource memory: " + allocatedResources.getMemory() + 
+    LOG.info("Allocated resource memory: " + allocatedResources.getMemory() +
              " cpu:" + allocatedResources.getVirtualCores());
     assert freeResources.getMemory() >= 0;
-    
+
     CookieContainerRequest highestPriRequest = null;
     for(CookieContainerRequest request : taskRequests.values()) {
       if(highestPriRequest == null) {
         highestPriRequest = request;
-      } else if(isHigherPriority(request.getPriority(), 
+      } else if(isHigherPriority(request.getPriority(),
                                    highestPriRequest.getPriority())){
         highestPriRequest = request;
       }
     }
-    if(highestPriRequest != null && 
+    if(highestPriRequest != null &&
        !fitsIn(highestPriRequest.getCapability(), freeResources)) {
       // highest priority request will not fit in existing free resources
       // free up some more
       // TODO this is subject to error wrt RM resource normalization
       Map.Entry<Object, Container> preemptedEntry = null;
       for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
-        if(!isHigherPriority(highestPriRequest.getPriority(), 
+        if(!isHigherPriority(highestPriRequest.getPriority(),
                              entry.getValue().getPriority())) {
           // higher or same priority
           continue;
         }
         if(preemptedEntry == null ||
-           !isHigherPriority(entry.getValue().getPriority(), 
+           !isHigherPriority(entry.getValue().getPriority(),
                              preemptedEntry.getValue().getPriority())) {
           // keep the lower priority or the one added later
           preemptedEntry = entry;
@@ -528,7 +528,7 @@ public class TaskScheduler extends AbstractService
       }
       if(preemptedEntry != null) {
         // found something to preempt
-        LOG.info("Preempting task: " + preemptedEntry.getKey() + 
+        LOG.info("Preempting task: " + preemptedEntry.getKey() +
             " to free resource for request: " + highestPriRequest +
             " . Current free resources: " + freeResources);
         deallocateContainer(preemptedEntry.getValue().getId());
@@ -537,13 +537,13 @@ public class TaskScheduler extends AbstractService
       }
     }
   }
-  
+
   private boolean fitsIn(Resource toFit, Resource resource) {
     // YARN-893 prevents using correct library code
     //return Resources.fitsIn(toFit, resource);
     return resource.getMemory() >= toFit.getMemory();
   }
-  
+
   private CookieContainerRequest getMatchingRequest(
                                       Container container, String location) {
     Priority priority = container.getPriority();
@@ -551,7 +551,7 @@ public class TaskScheduler extends AbstractService
     CookieContainerRequest assigned = null;
     List<? extends Collection<CookieContainerRequest>> requestsList =
         amRmClient.getMatchingRequests(priority, location, capability);
-    
+
     if(requestsList.size() > 0) {
       // pick first one
       for(Collection<CookieContainerRequest> requests : requestsList) {
@@ -561,24 +561,24 @@ public class TaskScheduler extends AbstractService
         }
       }
     }
-    
+
     return assigned;
   }
-  
+
   private Object getTask(CookieContainerRequest request) {
     return request.getCookie().getTask();
   }
-  
+
   private void releaseContainer(ContainerId containerId, Object task) {
     amRmClient.releaseAssignedContainer(containerId);
     if(task != null) {
       releasedContainers.put(containerId, task);
     }
   }
-  
-  private void assignContainer(Object task, 
-                                Container container, 
-                                CookieContainerRequest assigned) {
+
+  private void assignContainer(Object task,
+      Container container,
+      CookieContainerRequest assigned) {
     CookieContainerRequest request = removeTaskRequest(task);
     assert request != null;
     //assert assigned.equals(request);
@@ -586,10 +586,10 @@ public class TaskScheduler extends AbstractService
     Container result = taskAllocations.put(task, container);
     assert result == null;
     containerAssigments.put(container.getId(), task);
-    
+
     Resources.addTo(allocatedResources, container.getResource());
   }
-  
+
   private CookieContainerRequest removeTaskRequest(Object task) {
     CookieContainerRequest request = taskRequests.remove(task);
     if(request != null) {
@@ -598,14 +598,14 @@ public class TaskScheduler extends AbstractService
     }
     return request;
   }
-  
-  private void addTaskRequest(Object task, 
+
+  private void addTaskRequest(Object task,
                                 CookieContainerRequest request) {
     // TODO TEZ-37 fix duplicate handling
     taskRequests.put(task, request);
     amRmClient.addContainerRequest(request);
   }
-  
+
   private Container unAssignContainer(Object task) {
     Container container = taskAllocations.remove(task);
     if(container == null) {
@@ -616,8 +616,8 @@ public class TaskScheduler extends AbstractService
     containerAssigments.remove(container.getId());
     return container;
   }
-  
-  private Object unAssignContainer(ContainerId containerId, 
+
+  private Object unAssignContainer(ContainerId containerId,
                                     boolean releaseIfFound) {
     Object task = containerAssigments.remove(containerId);
     if(task == null) {
@@ -640,10 +640,10 @@ public class TaskScheduler extends AbstractService
   /**
    * Assigns allocated containers using the specified assigner. The list of
    * allocated containers is removed from the specified container list.
-   * 
+   *
    * Separate calls should be made for contianers being reused and new
    * containers.
-   * 
+   *
    * @param containers
    *          containers to be assigned.
    * @param isBeingReused
@@ -651,14 +651,14 @@ public class TaskScheduler extends AbstractService
    *          containers are being reused or not.
    * @param assigner
    *          the assigner to use - nodeLocal, rackLocal, nonLocal
-   * @param assignedContainers container assignments are populated into this map.         
+   * @param assignedContainers container assignments are populated into this map.
    * @return
    */
   private synchronized void assignContainersWithLocation(
       List<Container> containers, boolean isBeingReused,
       ContainerAssigner assigner,
       Map<CookieContainerRequest, Container> assignedContainers) {
-    
+
     Iterator<Container> containerIterator = containers.iterator();
     while (containerIterator.hasNext()) {
       Container container = containerIterator.next();
@@ -670,7 +670,7 @@ public class TaskScheduler extends AbstractService
       }
     }
   }
-  
+
   private void releaseUnassignedContainers(List<Container> containers) {
     for (Container container : containers) {
       releaseContainer(container.getId(), null);
@@ -683,7 +683,7 @@ public class TaskScheduler extends AbstractService
     appClient.taskAllocated(getTask(assigned), assigned.getCookie().appCookie,
         container);
   }
-  
+
   private abstract class ContainerAssigner {
     public abstract CookieContainerRequest assignAllocatedContainer(
         Container container, boolean isBeingReused);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 21af22c..99c4b03 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -59,7 +59,7 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
 
 public class TaskSchedulerEventHandler extends AbstractService
-                                         implements TaskSchedulerAppCallback, 
+                                         implements TaskSchedulerAppCallback,
                                                EventHandler<AMSchedulerEvent> {
   static final Log LOG = LogFactory.getLog(TaskSchedulerEventHandler.class);
 
@@ -86,24 +86,24 @@ public class TaskSchedulerEventHandler extends AbstractService
     this.eventHandler = eventHandler;
     this.clientService = clientService;
   }
-  
+
   public Map<ApplicationAccessType, String> getApplicationAcls() {
     return appAcls;
   }
-  
+
   public void setSignalled(boolean isSignalled) {
     this.isSignalled = isSignalled;
     LOG.info("TaskScheduler notified that iSignalled was : " + isSignalled);
   }
-    
+
   public Resource getAvailableResources() {
     return taskScheduler.getAvailableResources();
   }
-  
+
   public Resource getTotalResources() {
     return taskScheduler.getTotalResources();
   }
-  
+
   public synchronized void handleEvent(AMSchedulerEvent sEvent) {
     LOG.info("Processing the event " + sEvent.toString());
     switch (sEvent.getType()) {
@@ -121,7 +121,7 @@ public class TaskSchedulerEventHandler extends AbstractService
         handleTASucceeded(event);
         break;
       default:
-        throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState()); 
+        throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
       }
       break;
     case S_CONTAINER_DEALLOCATE:
@@ -139,7 +139,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       break;
     }
   }
-  
+
   @Override
   public void handle(AMSchedulerEvent event) {
     int qSize = eventQueue.size();
@@ -162,19 +162,19 @@ public class TaskSchedulerEventHandler extends AbstractService
   private void sendEvent(Event<?> event) {
     eventHandler.handle(event);
   }
-  
-  
+
+
   private void handleContainerDeallocate(
                                   AMSchedulerEventDeallocateContainer event) {
     ContainerId containerId = event.getContainerId();
     // TODO what happens to the task that was connected to this container?
     // current assumption is that it will eventually call handleTaStopRequest
-    //TaskAttempt taskAttempt = (TaskAttempt) 
+    //TaskAttempt taskAttempt = (TaskAttempt)
     taskScheduler.deallocateContainer(containerId);
     // TODO does this container need to be stopped via C_STOP_REQUEST
     sendEvent(new AMContainerEventStopRequest(containerId));
   }
-  
+
   private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
     /*MRxTaskAttemptID aId = event.getAttemptID();
     attemptToLaunchRequestMap.remove(aId);
@@ -204,14 +204,14 @@ public class TaskSchedulerEventHandler extends AbstractService
         }
       }
     }*/
-    
+
     TaskAttempt attempt = event.getAttempt();
     Container container = taskScheduler.deallocateTask(attempt, false);
     // use stored value of container id in case the scheduler has removed this
-    // assignment because the task has been deallocated earlier. 
+    // assignment because the task has been deallocated earlier.
     // retroactive case
     ContainerId attemptContainerId = attempt.getAssignedContainerID();
-    
+
     if(container != null) {
       // use scheduler container since it exists
       ContainerId containerId = container.getId();
@@ -223,17 +223,17 @@ public class TaskSchedulerEventHandler extends AbstractService
     }
 
     if (attemptContainerId != null) {
-      // TODO either ways send the necessary events 
+      // TODO either ways send the necessary events
       // Ask the container to stop.
       sendEvent(new AMContainerEventStopRequest(attemptContainerId));
       // Inform the Node - the task has asked to be STOPPED / has already
       // stopped.
       sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
-          get(attemptContainerId).getContainer().getNodeId(), attemptContainerId,  
+          get(attemptContainerId).getContainer().getNodeId(), attemptContainerId,
           attempt.getID(), event.getState() == TaskAttemptState.FAILED));
     }
   }
-  
+
   private void handleTASucceeded(AMSchedulerEventTAEnded event) {
     /*
     // TODO XXX Remember the assigned containerId even after task success.
@@ -251,7 +251,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       LOG.warn("Received TaskAttemptSucceededEvent for unmapped TaskAttempt: "
           + event.getAttemptID() + ". Full event: " + event);
     }*/
-    
+
     TaskAttempt attempt = event.getAttempt();
     ContainerId usedContainerId = event.getUsedContainerId();
 
@@ -264,7 +264,7 @@ public class TaskSchedulerEventHandler extends AbstractService
           get(usedContainerId).getContainer().getNodeId(), usedContainerId,
           event.getAttemptID()));
     }
-    
+
     Container container = taskScheduler.deallocateTask(attempt, true);
     if(container != null) {
       ContainerId containerId = container.getId();
@@ -306,14 +306,14 @@ public class TaskSchedulerEventHandler extends AbstractService
         TaskType.MAP, mapResourceReqt);
     event.getCapability().setMemory(mapResourceReqt);*/
     TaskAttempt taskAttempt = event.getTaskAttempt();
-    taskScheduler.allocateTask(taskAttempt, 
-                               event.getCapability(), 
-                               event.getHosts(), 
-                               event.getRacks(), 
+    taskScheduler.allocateTask(taskAttempt,
+                               event.getCapability(),
+                               event.getHosts(),
+                               event.getRacks(),
                                event.getPriority(),
                                event);
   }
-  
+
 
   protected TaskScheduler createTaskScheduler(String host, int port,
       String trackingUrl) {
@@ -366,29 +366,31 @@ public class TaskSchedulerEventHandler extends AbstractService
     this.stopEventHandling = true;
     if (eventHandlingThread != null)
       eventHandlingThread.interrupt();
-    taskScheduler.stop();
+    if (taskScheduler != null) {
+      taskScheduler.stop();
+    }
   }
-  
+
   // TaskSchedulerAppCallback methods
   @Override
-  public synchronized void taskAllocated(Object task, 
-                                           Object appCookie, 
-                                           Container container) {    
+  public synchronized void taskAllocated(Object task,
+                                           Object appCookie,
+                                           Container container) {
     ContainerId containerId = container.getId();
     if (appContext.getAllContainers().addContainerIfNew(container)) {
       appContext.getAllNodes().nodeSeen(container.getNodeId());
       sendEvent(new AMNodeEventContainerAllocated(container
           .getNodeId(), container.getId()));
     }
-    
-    AMSchedulerEventTALaunchRequest event = 
+
+    AMSchedulerEventTALaunchRequest event =
                          (AMSchedulerEventTALaunchRequest) appCookie;
     TaskAttempt taskAttempt = event.getTaskAttempt();
     // TODO - perhaps check if the task still needs this container
-    // because the deallocateTask downcall may have raced with the 
+    // because the deallocateTask downcall may have raced with the
     // taskAllocated() upcall
     assert task.equals(taskAttempt);
-    if (appContext.getAllContainers().get(containerId).getState() 
+    if (appContext.getAllContainers().get(containerId).getState()
         == AMContainerState.ALLOCATED) {
 
       sendEvent(new AMContainerEventLaunchRequest(
@@ -411,7 +413,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     // Inform the Containers about completion.
     sendEvent(new AMContainerEventCompleted(containerStatus));
   }
-  
+
   @SuppressWarnings("unchecked")
   @Override
   public synchronized void nodesUpdated(List<NodeReport> updatedNodes) {
@@ -473,7 +475,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     /*String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
         appContext.getApplicationID());
     LOG.info("History url is " + historyUrl);*/
-    
+
     return new AppFinalStatus(finishState, sb.toString(), historyUrl);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
index 4e199da..293339c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
@@ -20,11 +20,11 @@ package org.apache.tez.dag.app.rm.container;
 
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
@@ -35,15 +35,15 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
   private final Token<JobTokenIdentifier> jobToken;
   private final Credentials credentials;
   private final boolean shouldProfile;
-  private final TezConfiguration conf;
+  private final Configuration conf;
   private final Map<String, LocalResource> localResources;
   private final Map<String, String> environment;
   private final String javaOpts;
 
-  public AMContainerEventLaunchRequest(ContainerId containerId, 
+  public AMContainerEventLaunchRequest(ContainerId containerId,
       TezVertexID vertexId,
       Token<JobTokenIdentifier> jobToken,
-      Credentials credentials, boolean shouldProfile, TezConfiguration conf,
+      Credentials credentials, boolean shouldProfile, Configuration conf,
       Map<String, LocalResource> localResources,
       Map<String, String> environment, String javaOpts) {
     super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
@@ -64,7 +64,7 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
   public TezVertexID getVertexId() {
     return this.vertexId;
   }
-  
+
   public Token<JobTokenIdentifier> getJobToken() {
     return this.jobToken;
   }
@@ -77,7 +77,7 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
     return this.shouldProfile;
   }
 
-  public TezConfiguration getConf() {
+  public Configuration getConf() {
     return this.conf;
   }
 
@@ -88,7 +88,7 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
   public Map<String, String> getEnvironment() {
     return environment;
   }
-  
+
   public String getJavaOpts() {
 	  return javaOpts;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 470fee2..5651d48 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.records.TezVertexID;
@@ -78,14 +78,14 @@ public class AMContainerHelpers {
     return LocalResource.newInstance(resourceURL, type, visibility,
         resourceSize, resourceModificationTime);
   }
-  
+
   /**
    * Create the common {@link ContainerLaunchContext} for all attempts.
-   * 
+   *
    * @param applicationACLs
    */
   private static ContainerLaunchContext createCommonContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs, TezConfiguration conf,
+      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
       Token<JobTokenIdentifier> jobToken,
       TezVertexID vertexId, Credentials credentials, AppContext appContext) {
 
@@ -99,8 +99,8 @@ public class AMContainerHelpers {
     // Service data
     Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
 
-    
-    
+
+
     // Tokens
     ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[] {});
     try {
@@ -146,7 +146,7 @@ public class AMContainerHelpers {
   @VisibleForTesting
   public static ContainerLaunchContext createContainerLaunchContext(
       Map<ApplicationAccessType, String> acls,
-      ContainerId containerId, TezConfiguration conf, TezVertexID vertexId,
+      ContainerId containerId, Configuration conf, TezVertexID vertexId,
       Token<JobTokenIdentifier> jobToken,
       Resource assignedCapability, Map<String, LocalResource> localResources,
       Map<String, String> vertexEnv,
@@ -178,7 +178,7 @@ public class AMContainerHelpers {
     List<String> commands = TezEngineChildJVM.getVMCommand(
         taskAttemptListener.getAddress(), conf, containerId.toString(),
         appContext.getApplicationID().toString(),
-        appContext.getApplicationAttemptId().getAttemptId(), 
+        appContext.getApplicationAttemptId().getAttemptId(),
         shouldProfile, javaOpts);
 
     // Duplicate the ByteBuffers for access by multiple containers.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9b6ab16/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java
deleted file mode 100644
index 0f8acc1..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.utils;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.dag.api.TezConfiguration;
-
-public class DAGApps {
-
-  private static final String STAGING_CONSTANT = ".staging";
-  public static Path getStagingAreaDir(TezConfiguration conf, String user) {
-    return new Path(conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
-        TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT)
-        + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
-  }
-  
-}


Mime
View raw message