tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject svn commit: r1469642 [16/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...
Date Thu, 18 Apr 2013 23:54:28 GMT
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientAsync.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientAsync.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientAsync.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,376 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.app.rm.AMRMClient.ContainerRequest;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>AMRMClientAsync</code> handles communication with the ResourceManager
+ * and provides asynchronous updates on events such as container allocations and
+ * completions.  It contains a thread that sends periodic heartbeats to the
+ * ResourceManager.
+ * 
+ * It should be used by implementing a CallbackHandler:
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ *   public void onContainersAllocated(List<Container> containers) {
+ *     [run tasks on the containers]
+ *   }
+ *   
+ *   public void onContainersCompleted(List<ContainerStatus> statuses) {
+ *     [update progress, check whether app is done]
+ *   }
+ *   
+ *   public void onNodesUpdated(List<NodeReport> updated) {}
+ *   
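+ *   public void onRebootRequest() {}
+ *   
+ *   public float getProgress() { return 0.0f; }
+ *   
+ *   public void onError(Exception e) {}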
+ * }
+ * }
+ * </pre>
+ * 
+ * The client's lifecycle should be managed similarly to the following:
+ * 
+ * <pre>
+ * {@code
+ * AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackHandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * RegisterApplicationMasterResponse response = asyncClient
+ *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ *       appMasterTrackingUrl);
+ * asyncClient.addContainerRequest(containerRequest);
+ * [... wait for application to complete]
+ * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Unstable
+@Evolving
+public class AMRMClientAsync<T> extends AbstractService {
+  
+  private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
+  
+  private final AMRMClientImpl<T> client;
+  private final int intervalMs;
+  private final HeartbeatThread heartbeatThread;
+  private final CallbackHandlerThread handlerThread;
+  private final CallbackHandler handler;
+
+  private final BlockingQueue<AllocateResponse> responseQueue;
+  
+  private volatile boolean keepRunning;
+  private volatile float progress;
+  
+  private volatile Exception savedException;
+  
+  public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
+      CallbackHandler callbackHandler) {
+    this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
+  }
+  
+  @Private
+  @VisibleForTesting
+  public AMRMClientAsync(AMRMClientImpl<T> client, int intervalMs,
+      CallbackHandler callbackHandler) {
+    super(AMRMClientAsync.class.getName());
+    this.client = client;
+    this.intervalMs = intervalMs;
+    handler = callbackHandler;
+    heartbeatThread = new HeartbeatThread();
+    handlerThread = new CallbackHandlerThread();
+    responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+    keepRunning = true;
+    savedException = null;
+  }
+    
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    client.init(conf);
+  }  
+  
+  /**
+   * Starts the underlying client and then the callback handler thread.  The
+   * heartbeat thread is not started here; it is started by
+   * {@link #registerApplicationMaster(String, int, String)}.
+   */
+  @Override
+  public void start() {
+    // Start the client first: if it fails to start, no handler thread is left
+    // running behind a service that never finished starting.
+    client.start();
+    handlerThread.start();
+    super.start();
+  }
+  
+  /**
+   * Tells the heartbeat and handler threads to stop and waits for them to
+   * terminate.  Calling this method from the callback handler thread would cause
+   * deadlock, and thus should be avoided.
+   */
+  @Override
+  public void stop() {
+    if (Thread.currentThread() == handlerThread) {
+      throw new YarnException("Cannot call stop from callback handler thread!");
+    }
+    keepRunning = false;
+    try {
+      heartbeatThread.join();
+    } catch (InterruptedException ex) {
+      LOG.error("Error joining with heartbeat thread", ex);
+    }
+    client.stop();
+    try {
+      handlerThread.interrupt();
+      handlerThread.join();
+    } catch (InterruptedException ex) {
+      LOG.error("Error joining with hander thread", ex);
+    }
+    super.stop();
+  }
+  
+  public Collection<ContainerRequest<T>> getMatchingRequests(
+      Priority priority, 
+      String resourceName, 
+      Resource capability) {
+    return client.getMatchingRequests(priority, resourceName, capability);
+  }
+  
+  /**
+   * Registers this application master with the resource manager. On successful
+   * registration, starts the heartbeating thread.
+   */
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnRemoteException {
+    RegisterApplicationMasterResponse response =
+        client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
+    heartbeatThread.start();
+    return response;
+  }
+
+  /**
+   * Unregister the application master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnRemoteException
+   */
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+      String appMessage, String appTrackingUrl) throws YarnRemoteException {
+    synchronized (client) {
+      keepRunning = false;
+      client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
+    }
+  }
+
+  /**
+   * Request containers for resources before calling <code>allocate</code>
+   * @param req Resource request
+   */
+  public void addContainerRequest(AMRMClient.ContainerRequest<T> req) {
+    client.addContainerRequest(req);
+  }
+
+  /**
+   * Remove previous container request. The previous container request may have 
+   * already been sent to the ResourceManager, so even after the remove request 
+   * the app must be prepared to receive an allocation for it.
+   * @param req Resource request
+   */
+  public void removeContainerRequest(AMRMClient.ContainerRequest<T> req) {
+    client.removeContainerRequest(req);
+  }
+
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give up the container then it can release them.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it. eg. it released non-local resources
+   * @param containerId
+   */
+  public void releaseAssignedContainer(ContainerId containerId) {
+    client.releaseAssignedContainer(containerId);
+  }
+
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Currently available resources
+   */
+  public Resource getClusterAvailableResources() {
+    return client.getClusterAvailableResources();
+  }
+
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Current number of nodes in the cluster
+   */
+  public int getClusterNodeCount() {
+    return client.getClusterNodeCount();
+  }
+  
+  /**
+   * Thread that repeatedly calls allocate on the wrapped client and hands each
+   * response to the callback handler thread through responseQueue.
+   */
+  private class HeartbeatThread extends Thread {
+    public HeartbeatThread() {
+      super("AMRM Heartbeater thread");
+    }
+    
+    /**
+     * Loops until keepRunning is cleared: one allocate call per iteration,
+     * followed by a sleep of intervalMs milliseconds.
+     */
+    public void run() {
+      while (true) {
+        AllocateResponse response = null;
+        // synchronization ensures we don't send heartbeats after unregistering
+        synchronized (client) {
+          if (!keepRunning) {
+            break;
+          }
+            
+          try {
+            response = client.allocate(progress);
+            // a successful heartbeat discards any error not yet delivered
+            savedException = null;
+          } catch (YarnRemoteException ex) {
+            LOG.error("Failed to heartbeat", ex);
+            // park the failure and wake the handler thread so that it can
+            // report it through CallbackHandler.onError
+            savedException = ex;
+            handlerThread.interrupt();
+          }
+        }
+        if (response != null) {
+          // retry until the response is queued, so that an interrupt does not
+          // cause it to be dropped
+          while (true) {
+            try {
+              responseQueue.put(response);
+              break;
+            } catch (InterruptedException ex) {
+              LOG.warn("Interrupted while waiting to put on response queue", ex);
+            }
+          }
+        }
+        
+        try {
+          Thread.sleep(intervalMs);
+        } catch (InterruptedException ex) {
+          // only logged; the loop re-checks keepRunning on the next iteration
+          LOG.warn("Heartbeater interrupted", ex);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Thread that takes heartbeat responses off responseQueue and invokes the
+   * user's CallbackHandler for them.  It also delivers heartbeat failures
+   * saved by the heartbeat thread.
+   */
+  private class CallbackHandlerThread extends Thread {
+    public CallbackHandlerThread() {
+      super("AMRM Callback Handler Thread");
+    }
+    
+    /**
+     * Passes the saved heartbeat exception, if any, to the handler's onError
+     * and clears it.
+     */
+    void handleError() {
+      // NOTE(review): the read and the clear are two separate volatile
+      // accesses; an exception stored by the heartbeat thread in between
+      // would be overwritten with null and never reported.
+      Exception e = savedException;
+      savedException = null;
+      if(e != null) {
+        handler.onError(e);
+      }
+    }
+    
+    /**
+     * Runs until keepRunning is cleared.  Each iteration reports a pending
+     * error, waits for the next response and dispatches it in the order:
+     * reboot request, updated nodes, completed containers, allocated
+     * containers.
+     */
+    public void run() {
+      while (keepRunning) {
+        AllocateResponse response;
+        try {
+          handleError();
+          response = responseQueue.take();
+        } catch (InterruptedException ex) {
+          // interrupts come from the heartbeat thread (after a failed
+          // heartbeat) and from stop(); go around the loop to re-check
+          // keepRunning and to pick up any saved exception
+          LOG.info("Interrupted while waiting for queue", ex);
+          continue;
+        }
+
+        if (response.getReboot()) {
+          handler.onRebootRequest();
+          /* TODO dont callback after this */
+        }
+        List<NodeReport> updatedNodes = response.getUpdatedNodes();
+        if (!updatedNodes.isEmpty()) {
+          handler.onNodesUpdated(updatedNodes);
+        }
+        
+        List<ContainerStatus> completed =
+            response.getCompletedContainersStatuses();
+        if (!completed.isEmpty()) {
+          handler.onContainersCompleted(completed);
+        }
+
+        List<Container> allocated = response.getAllocatedContainers();
+        if (!allocated.isEmpty()) {
+          handler.onContainersAllocated(allocated);
+        }
+        
+        // sampled after each response; the heartbeat thread sends this value
+        // with its next allocate call
+        progress = handler.getProgress();
+      }
+    }
+  }
+  
+  /**
+   * Callbacks through which the application is told about heartbeat results.
+   * All of them are invoked from the callback handler thread.
+   */
+  public interface CallbackHandler {
+    
+    /**
+     * Called when the ResourceManager responds to a heartbeat with completed
+     * containers. If the response contains both completed containers and
+     * allocated containers, this will be called before
+     * onContainersAllocated.
+     */
+    public void onContainersCompleted(List<ContainerStatus> statuses);
+    
+    /**
+     * Called when the ResourceManager responds to a heartbeat with allocated
+     * containers. If the response contains both completed containers and
+     * allocated containers, this will be called after
+     * onContainersCompleted.
+     */
+    public void onContainersAllocated(List<Container> containers);
+    
+    /**
+     * Called when the ResourceManager wants the ApplicationMaster to reboot
+     * for being out of sync.
+     */
+    public void onRebootRequest();
+    
+    /**
+     * Called when nodes tracked by the ResourceManager have changed in health,
+     * availability etc.
+     */
+    public void onNodesUpdated(List<NodeReport> updatedNodes);
+    
+    /**
+     * Called after each heartbeat response has been dispatched. The returned
+     * value is sent to the ResourceManager with the next heartbeat.
+     */
+    public float getProgress();
+    
+    /**
+     * Called with the exception of a heartbeat that failed with a
+     * YarnRemoteException. A failure may go unreported if a later heartbeat
+     * succeeds before the handler thread has picked it up.
+     */
+    public void onError(Exception e);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientAsync.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,458 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+// TODO check inputs for null etc.
+
+@Unstable
+public class AMRMClientImpl<T> extends AbstractService implements AMRMClient<T> {
+
+  private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+  
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  
+  private int lastResponseId = 0;
+
+  protected AMRMProtocol rmClient;
+  protected final ApplicationAttemptId appAttemptId;  
+  protected Resource clusterAvailableResources;
+  protected int clusterNodeCount;
+  
+  /**
+   * Pairs the aggregated ResourceRequest kept for one (priority, resource
+   * name, capability) combination with the ContainerRequests that were added
+   * for that combination.
+   */
+  class ResourceRequestInfo {
+    ResourceRequest remoteRequest;
+    HashSet<ContainerRequest<T>> containerRequests;
+    
+    ResourceRequestInfo(Priority priority, String resourceName, Resource capability) {
+      this.containerRequests = new HashSet<ContainerRequest<T>>();
+      // created with a container count of zero
+      this.remoteRequest =
+          BuilderUtils.newResourceRequest(priority, resourceName, capability, 0);
+    }
+  }
+  
+  //Key -> Priority
+  //Value -> Map
+  //Key->ResourceName (e.g., hostname, rackname, *)
+  //Value->Map
+  //Key->Resource Capability
+  //Value->ResourceRequestInfo (the ResourceRequest and its ContainerRequests)
+  protected final 
+  Map<Priority, Map<String, Map<Resource, ResourceRequestInfo>>>
+    remoteRequestsTable =
+    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequestInfo>>>();
+
+  protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+      new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
+  protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+  
+  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
+    super(AMRMClientImpl.class.getName());
+    this.appAttemptId = appAttemptId;
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  /**
+   * Creates the RPC proxy to the ResourceManager scheduler address taken from
+   * the service configuration.  When security is enabled, the application
+   * master token is read from the environment variable named by
+   * {@link ApplicationConstants#APPLICATION_MASTER_TOKEN_ENV_NAME} and added
+   * to the current user before the proxy is created.
+   *
+   * @throws YarnException if the current user cannot be determined, or if
+   *         security is enabled and the token is missing or cannot be decoded
+   */
+  @Override
+  public synchronized void start() {
+    final YarnConfiguration conf = new YarnConfiguration(getConfig());
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress rmAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      String tokenURLEncodedStr = System.getenv().get(
+          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      if (tokenURLEncodedStr == null) {
+        // fail with a clear message rather than while decoding a null string
+        throw new YarnException("Security is enabled but environment variable "
+            + ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME
+            + " is not set");
+      }
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      SecurityUtil.setTokenService(token, rmAddress);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + token);
+      }
+      currentUser.addToken(token);
+    }
+
+    // the proxy is created as the current user so that the token added above
+    // is used for the connection
+    rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
+            conf);
+      }
+    });
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to ResourceManager at " + rmAddress);
+    }
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (this.rmClient != null) {
+      RPC.stopProxy(this.rmClient);
+    }
+    super.stop();
+  }
+  
+  /**
+   * Builds a registration request for this application attempt and sends it
+   * to the ResourceManager.  The tracking URL is only set when one is given.
+   *
+   * @return the ResourceManager's response to the registration
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnRemoteException {
+    // do this only once ???
+    final RegisterApplicationMasterRequest registration =
+        recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
+    synchronized (this) {
+      registration.setApplicationAttemptId(appAttemptId);
+    }
+    registration.setHost(appHostName);
+    registration.setRpcPort(appHostPort);
+    if (appTrackingUrl != null) {
+      registration.setTrackingUrl(appTrackingUrl);
+    }
+    return rmClient.registerApplicationMaster(registration);
+  }
+
+  /**
+   * Sends the pending asks and releases to the ResourceManager in a single
+   * allocate call.  The pending collections are cleared before the call; if
+   * the call does not produce a response they are restored so that the next
+   * call sends them again.  On success the cached cluster node count, response
+   * id and available resources are updated from the response.
+   *
+   * @param progressIndicator progress value placed in the allocate request
+   * @return the response received from the ResourceManager
+   * @throws YarnRemoteException if the allocate call fails
+   */
+  @Override
+  public AllocateResponse allocate(float progressIndicator) 
+      throws YarnRemoteException {
+    AllocateResponse allocateResponse = null;
+    ArrayList<ResourceRequest> askList = null;
+    ArrayList<ContainerId> releaseList = null;
+    AllocateRequest allocateRequest = null;
+    
+    try {
+      synchronized (this) {
+        askList = new ArrayList<ResourceRequest>(ask);
+        releaseList = new ArrayList<ContainerId>(release);
+        // optimistically clear this collection assuming no RPC failure
+        ask.clear();
+        release.clear();
+        allocateRequest = BuilderUtils
+            .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
+                askList, releaseList);
+      }
+
+      // the remote call is made without holding this object's monitor
+      allocateResponse = rmClient.allocate(allocateRequest);
+
+      synchronized (this) {
+        // update these on successful RPC
+        clusterNodeCount = allocateResponse.getNumClusterNodes();
+        lastResponseId = allocateResponse.getResponseId();
+        clusterAvailableResources = allocateResponse.getAvailableResources();
+      }
+    } finally {
+      // TODO how to differentiate remote yarn exception vs error in rpc
+      if(allocateResponse == null) {
+        // we hit an exception in allocate()
+        // preserve ask and release for next call to allocate()
+        synchronized (this) {
+          release.addAll(releaseList);
+          // requests could have been added or deleted during call to allocate
+          // If requests were added/removed then there is nothing to do since
+          // the ResourceRequest object in ask would have the actual new value.
+          // If ask does not have this ResourceRequest then it was unchanged and
+          // so we can add the value back safely.
+          // This assumes that there will be no concurrent calls to allocate()
+          // and so we don't have to worry about ask being changed in the
+          // synchronized block at the beginning of this method.
+          for(ResourceRequest oldAsk : askList) {
+            if(!ask.contains(oldAsk)) {
+              ask.add(oldAsk);
+            }
+          }
+        }
+      }
+    }
+    return allocateResponse;
+  }
+
+  /**
+   * Builds a finish request for this application attempt and sends it to the
+   * ResourceManager.  Diagnostics and tracking URL are only set when given.
+   */
+  @Override
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+      String appMessage, String appTrackingUrl) throws YarnRemoteException {
+    final FinishApplicationMasterRequest finishRequest =
+        recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
+    finishRequest.setAppAttemptId(appAttemptId);
+    finishRequest.setFinishApplicationStatus(appStatus);
+    if (appMessage != null) {
+      finishRequest.setDiagnostics(appMessage);
+    }
+    if (appTrackingUrl != null) {
+      finishRequest.setTrackingUrl(appTrackingUrl);
+    }
+    rmClient.finishApplicationMaster(finishRequest);
+  }
+  
+  /**
+   * Registers the request once for every host and rack it names, and once
+   * more for the off-switch location {@link ResourceRequest#ANY}.
+   */
+  @Override
+  public synchronized void addContainerRequest(ContainerRequest<T> req) {
+    // Create resource requests
+    // add check for dup locations
+    final Priority priority = req.priority;
+    final Resource capability = req.capability;
+    final int count = req.containerCount;
+
+    if (req.hosts != null) {
+      for (String hostName : req.hosts) {
+        addResourceRequest(priority, hostName, capability, count, req);
+      }
+    }
+
+    if (req.racks != null) {
+      for (String rackName : req.racks) {
+        addResourceRequest(priority, rackName, capability, count, req);
+      }
+    }
+
+    // Off-switch
+    addResourceRequest(priority, ResourceRequest.ANY, capability, count, req);
+  }
+
+  /**
+   * Decrements the request for every host and rack it names, and for the
+   * off-switch location {@link ResourceRequest#ANY}.
+   */
+  @Override
+  public synchronized void removeContainerRequest(ContainerRequest<T> req) {
+    // Update resource requests
+    final Priority priority = req.priority;
+    final Resource capability = req.capability;
+    final int count = req.containerCount;
+
+    if (req.hosts != null) {
+      for (String host : req.hosts) {
+        decResourceRequest(priority, host, capability, count, req);
+      }
+    }
+
+    if (req.racks != null) {
+      for (String rackName : req.racks) {
+        decResourceRequest(priority, rackName, capability, count, req);
+      }
+    }
+
+    decResourceRequest(priority, ResourceRequest.ANY, capability, count, req);
+  }
+
+  @Override
+  public synchronized void releaseAssignedContainer(ContainerId containerId) {
+    release.add(containerId);
+  }
+  
+  @Override
+  public synchronized Resource getClusterAvailableResources() {
+    return clusterAvailableResources;
+  }
+  
+  @Override
+  public synchronized int getClusterNodeCount() {
+    return clusterNodeCount;
+  }
+  
+  /**
+   * Returns the container requests currently registered for the given
+   * priority, resource name and capability.
+   *
+   * @param priority priority of the requests
+   * @param resourceName host name, rack name or {@link ResourceRequest#ANY}
+   * @param capability resource capability of the requests
+   * @return a snapshot of the matching requests, or <code>null</code> if
+   *         nothing is registered for that combination
+   */
+  public synchronized Collection<ContainerRequest<T>> getMatchingRequests(
+                                          Priority priority, 
+                                          String resourceName, 
+                                          Resource capability) {
+    Map<String, Map<Resource, ResourceRequestInfo>> remoteRequests = 
+        this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      return null;
+    }
+    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests
+        .get(resourceName);
+    if (reqMap == null) {
+      return null;
+    }
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+    if (resourceRequestInfo == null) {
+      return null;
+    }
+    
+    // Hand out a copy: the backing set is only safe to touch while holding
+    // this object's monitor and is modified by addContainerRequest and
+    // removeContainerRequest, so a caller iterating over it after this method
+    // returns could hit a ConcurrentModificationException.
+    return new ArrayList<ContainerRequest<T>>(
+        resourceRequestInfo.containerRequests);
+  }
+  
+  /**
+   * Puts the given request into 'ask', replacing any entry that the set's
+   * comparator treats as the same request.
+   */
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // The remove-then-add is deliberate. Scenario: a ResourceRequest is
+    // removed from the remoteRequestTable and a 0 container request is put
+    // into 'ask' to tell the RM it is no longer needed. Before the next call
+    // to allocate, the user asks for more containers at the same location.
+    // The set comparator is container count agnostic, so it would consider
+    // the new request equal to the 0 size one and a plain add would keep the
+    // old entry. Removing the "same" request first makes sure the new one is
+    // the one stored.
+    // This should happen only rarely but we do need to guard against it.
+    ask.remove(remoteRequest);
+    ask.add(remoteRequest);
+  }
+
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability, int containerCount, ContainerRequest<T> req) {
+    Map<String, Map<Resource, ResourceRequestInfo>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      remoteRequests = new HashMap<String, Map<Resource, ResourceRequestInfo>>();
+      this.remoteRequestsTable.put(priority, remoteRequests);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      reqMap = new HashMap<Resource, ResourceRequestInfo>();
+      remoteRequests.put(resourceName, reqMap);
+    }
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+    if (resourceRequestInfo == null) {
+      resourceRequestInfo =
+          new ResourceRequestInfo(priority, resourceName, capability);
+      reqMap.put(capability, resourceRequestInfo);
+    }
+    
+    resourceRequestInfo.remoteRequest.setNumContainers(
+         resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
+    resourceRequestInfo.containerRequests.add(req);
+
+    // Note this down for next interaction with ResourceManager
+    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers() 
+          + " #asks=" + ask.size());
+    }
+  }
+
+  /**
+   * Subtracts containerCount containers from the ResourceRequest kept for the
+   * given priority, resource name and capability and queues the updated
+   * request for the next allocate call.  The count never goes below zero.
+   * When it reaches zero the entry, and any map left empty by that, is removed
+   * from the request table.  If no entry exists for the given combination the
+   * call does nothing.
+   */
+  private void decResourceRequest(Priority priority, 
+                                   String resourceName,
+                                   Resource capability, 
+                                   int containerCount, 
+                                   ContainerRequest<T> req) {
+    Map<String, Map<Resource, ResourceRequestInfo>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    
+    if(remoteRequests == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as priority " + priority 
+            + " is not present in request table");
+      }
+      return;
+    }
+    
+    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+            + " is not present in request table");
+      }
+      return;
+    }
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+    if (resourceRequestInfo == null) {
+      // same treatment as an unknown priority or resource name; without this
+      // check a spurious removal would fail with a NullPointerException
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as capability " + capability
+            + " is not present in request table");
+      }
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers() 
+          + " #asks=" + ask.size());
+    }
+
+    // clamp at zero to guard against spurious removals
+    resourceRequestInfo.remoteRequest.setNumContainers(Math.max(0,
+        resourceRequestInfo.remoteRequest.getNumContainers() - containerCount));
+    resourceRequestInfo.containerRequests.remove(req);
+    // send the ResourceRequest to RM even if is 0 because it needs to override
+    // a previously sent value. If ResourceRequest was not sent previously then
+    // sending 0 ought to be a no-op on RM
+    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+
+    // delete entries from map if no longer needed
+    if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // logged at debug level to match the guard and the BEFORE message
+      LOG.debug("AFTER decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers() 
+          + " #asks=" + ask.size());
+    }
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+
+/**
+ * Base class of the scheduler events in this package; it carries nothing
+ * beyond its {@link AMSchedulerEventType}.
+ */
+public class AMSchedulerEvent extends AbstractEvent<AMSchedulerEventType> {
+
+  // TODO Not a very useful class...
+
+  /**
+   * @param type the kind of scheduler event being created
+   */
+  public AMSchedulerEvent(final AMSchedulerEventType type) {
+    super(type);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainerCompleted.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainerCompleted.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainerCompleted.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainerCompleted.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Scheduler event of type {@link AMSchedulerEventType#S_CONTAINER_COMPLETED}
+ * that identifies the container it refers to.
+ */
+public class AMSchedulerEventContainerCompleted extends AMSchedulerEvent {
+
+  private final ContainerId containerId;
+
+  /**
+   * @param containerId id of the container this event refers to
+   */
+  public AMSchedulerEventContainerCompleted(ContainerId containerId) {
+    super(AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    this.containerId = containerId;
+  }
+
+  /**
+   * @return the container id supplied at construction
+   */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainerCompleted.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainersAllocated.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainersAllocated.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainersAllocated.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainersAllocated.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMSchedulerEventContainersAllocated extends AMSchedulerEvent {
+
+  private final List<ContainerId> containerIds;
+  private final boolean headRoomChanged;
+
+  // TODO Maybe distinguish between newly allocated containers and 
+  // existing containers being re-used.
+  // headRoomChanged is a strange API - making an assumption about how the
+  // scheduler will use this info.
+  public AMSchedulerEventContainersAllocated(List<ContainerId> containerIds,
+      boolean headRoomChanged) {
+    super(AMSchedulerEventType.S_CONTAINERS_ALLOCATED);
+    this.containerIds = containerIds;
+    this.headRoomChanged = headRoomChanged;
+  }
+
+  public List<ContainerId> getContainerIds() {
+    return this.containerIds;
+  }
+
+  public boolean didHeadroomChange() {
+    return headRoomChanged;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventContainersAllocated.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Scheduler event of type {@link AMSchedulerEventType#S_CONTAINER_DEALLOCATE}
+ * that names a single container by id.
+ */
+public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent {
+
+  private final ContainerId containerId;
+
+  /**
+   * @param containerId id of the container this event refers to
+   */
+  public AMSchedulerEventDeallocateContainer(final ContainerId containerId) {
+    super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+    this.containerId = containerId;
+  }
+
+  /**
+   * @return the container id supplied at construction
+   */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+}
+

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Scheduler event of type {@link AMSchedulerEventType#S_NODE_BLACKLISTED}
+ * that names the affected node by id.
+ */
+public class AMSchedulerEventNodeBlacklisted extends AMSchedulerEvent {
+
+  private final NodeId nodeId;
+
+  /**
+   * @param nodeId id of the node this event refers to
+   */
+  public AMSchedulerEventNodeBlacklisted(final NodeId nodeId) {
+    super(AMSchedulerEventType.S_NODE_BLACKLISTED);
+    this.nodeId = nodeId;
+  }
+
+  /**
+   * @return the node id supplied at construction
+   */
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * Scheduler event of type {@link AMSchedulerEventType#S_TA_ENDED}. It carries
+ * the task attempt, the id of the container associated with it, and the
+ * attempt state supplied by the producer.
+ *
+ * <p>All fields are set once in the constructor and never reassigned.
+ */
+public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
+
+  private final TaskAttempt attempt;
+  private final ContainerId containerId;
+  private final TaskAttemptState state;
+
+  /**
+   * @param attempt the task attempt this event is about
+   * @param containerId id later returned by {@link #getUsedContainerId()}
+   * @param state the attempt state to report with this event
+   */
+  public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
+      TaskAttemptState state) {
+    super(AMSchedulerEventType.S_TA_ENDED);
+    this.attempt = attempt;
+    this.containerId = containerId;
+    this.state = state;
+  }
+
+  /**
+   * @return the id of the task attempt, obtained from {@code attempt.getID()}
+   */
+  public TezTaskAttemptID getAttemptID() {
+    return this.attempt.getID();
+  }
+
+  /**
+   * @return the task attempt supplied at construction
+   */
+  public TaskAttempt getAttempt() {
+    return this.attempt;
+  }
+
+  /**
+   * @return the attempt state supplied at construction
+   */
+  public TaskAttemptState getState() {
+    return this.state;
+  }
+
+  /**
+   * @return the container id supplied at construction
+   */
+  public ContainerId getUsedContainerId() {
+    return this.containerId;
+  }
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.Map;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
+
+  // TODO Get rid of remoteTask from here. Can be forgottent after it has been assigned.
+  //.... Maybe have the Container talk to the TaskAttempt to pull in the remote task.
+  
+  private final TezTaskAttemptID attemptId;
+  private final Resource capability;
+  private final Map<String, LocalResource> localResources;
+  private final TezTask remoteTaskContext;
+  private final TaskAttempt taskAttempt;
+  private final Credentials credentials;
+  private Token<JobTokenIdentifier> jobToken;
+  private final String[] hosts;
+  private final String[] racks;
+  private final Priority priority;
+  private final Map<String, String> environment;
+  
+  
+  public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
+      Resource capability,
+      Map<String, LocalResource> localResources,
+      TezTask remoteTaskContext, TaskAttempt ta,
+      Credentials credentials, Token<JobTokenIdentifier> jobToken,
+      String[] hosts, String[] racks, Priority priority,
+      Map<String, String> environment) {
+    super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
+    this.attemptId = attemptId;
+    this.capability = capability;
+    this.localResources = localResources;
+    this.remoteTaskContext = remoteTaskContext;
+    this.taskAttempt = ta;
+    this.credentials = credentials;
+    this.jobToken = jobToken;
+    this.hosts = hosts;
+    this.racks = racks;
+    this.priority = priority;
+    this.environment = environment;
+  }
+
+  public TezTaskAttemptID getAttemptID() {
+    return this.attemptId;
+  }
+
+  public Resource getCapability() {
+    return capability;
+  }
+
+  public String[] getHosts() {
+    return hosts;
+  }
+  
+  public String[] getRacks() {
+    return racks;
+  }
+  
+  public Priority getPriority() {
+    return priority;
+  }
+  
+  public TezTask getRemoteTaskContext() {
+    return remoteTaskContext;
+  }
+  
+  public TaskAttempt getTaskAttempt() {
+    return this.taskAttempt;
+  }
+  
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
+  
+  public Token<JobTokenIdentifier> getJobToken() {
+    return this.jobToken;
+  }
+
+  public Map<String, LocalResource> getLocalResources() {
+    return this.localResources;
+  }
+  
+  public Map<String, String> getEnvironment() {
+    return this.environment;
+  }
+
+  // Parameter replacement: @taskid@ will not be usable
+  // ProfileTaskRange not available along with ContainerReUse
+
+  /*Requirements to determine a container request.
+   * + Data-local + Rack-local hosts.
+   * + Resource capability
+   * + Env - mapreduce.map.env / mapreduce.reduce.env can change. M/R log level. 
+   * - JobConf and JobJar file - same location.
+   * - Distributed Cache - identical for map / reduce tasks at the moment.
+   * - Credentials, tokens etc are identical.
+   * + Command - dependent on map / reduce java.opts
+   */
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+/**
+ * Event types accepted by the AM scheduler. The constants are grouped by the
+ * component that produces them, as noted in the comments below.
+ */
+public enum AMSchedulerEventType {
+  //Producer: TaskAttempt
+  S_TA_LAUNCH_REQUEST,
+  S_TA_ENDED, // Annotated with FAILED/KILLED/SUCCEEDED.
+
+  //Producer: RMCommunicator
+  S_CONTAINERS_ALLOCATED,
+
+  //Producer: Container. (Maybe RMCommunicator)
+  S_CONTAINER_COMPLETED,
+
+  //Producer: Node
+  S_NODE_BLACKLISTED,
+  S_NODE_UNHEALTHY,
+  S_NODE_HEALTHY,
+  // The scheduler should have a way of knowing about unusable nodes. Acting on
+  // this information to change requests etc is scheduler specific.
+  
+  // Producer : AMContainer
+  S_CONTAINER_DEALLOCATE
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerAllocator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerAllocator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerAllocator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerAllocator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+// TODO XXX Rename to AMScheduler.
+/**
+ * Handler for {@link AMSchedulerEvent}s. The interface declares no methods of
+ * its own beyond those inherited from {@link EventHandler}.
+ */
+public interface ContainerAllocator extends EventHandler<AMSchedulerEvent>{
+
+//  enum EventType {
+//
+//    CONTAINER_REQ,
+//    CONTAINER_DEALLOCATE,
+//    CONTAINER_FAILED
+//  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerAllocator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerRequestor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerRequestor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerRequestor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.rm.RMContainerRequestor.ContainerRequest;
+
+/**
+ * Handler for {@link RMCommunicatorEvent}s that additionally exposes container
+ * request bookkeeping, the available resources and the application ACLs.
+ */
+public interface ContainerRequestor extends EventHandler<RMCommunicatorEvent> {
+
+  /**
+   * @return the available resources, as a {@link Resource}
+   */
+  Resource getAvailableResources();
+
+  /**
+   * Adds a container request.
+   *
+   * @param req the request to add
+   */
+  void addContainerReq(ContainerRequest req);
+
+  /**
+   * Counterpart of {@link #addContainerReq(ContainerRequest)}; presumably
+   * decrements or withdraws a previously added request (confirm against the
+   * implementation).
+   *
+   * @param req the request to decrement
+   */
+  void decContainerReq(ContainerRequest req);
+
+  /**
+   * @return the application ACLs, keyed by access type
+   */
+  Map<ApplicationAccessType, String> getApplicationACLs();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerRequestor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event typed by {@link NMCommunicatorEventType} that identifies a container
+ * by its id, the id of its node, and its container token.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} are based on those three
+ * fields (plus the runtime class in {@code equals}); the event type itself is
+ * not compared.
+ */
+public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType> {
+
+  private final ContainerId containerId;
+  private final NodeId nodeId;
+  private final ContainerToken containerToken;
+
+  /**
+   * @param containerId id of the container this event refers to
+   * @param nodeId id of the node associated with the container
+   * @param containerToken token associated with the container
+   * @param type the kind of NM communicator event
+   */
+  public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken, NMCommunicatorEventType type) {
+    super(type);
+    this.containerId = containerId;
+    this.nodeId = nodeId;
+    this.containerToken = containerToken;
+  }
+
+  /** @return the container id supplied at construction */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  /** @return the node id supplied at construction */
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+
+  /** @return the container token supplied at construction */
+  public ContainerToken getContainerToken() {
+    return this.containerToken;
+  }
+
+  /**
+   * Returns the superclass description followed by the container id and node
+   * id. The container token is deliberately left out, as in the original
+   * (misspelled) method.
+   */
+  @Override
+  public String toString() {
+    return describe();
+  }
+
+  /**
+   * Misspelled predecessor of {@link #toString()}, kept so existing callers
+   * keep compiling.
+   *
+   * @return the same text this method has always returned
+   * @deprecated use {@link #toString()}
+   */
+  @Deprecated
+  public String toSrting() {
+    return describe();
+  }
+
+  // Shared by toString() and the legacy toSrting() so both yield the same text.
+  private String describe() {
+    return super.toString() + " for container " + containerId + ", nodeId: "
+        + nodeId;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((containerId == null) ? 0 : containerId.hashCode());
+    result = prime * result
+        + ((containerToken == null) ? 0 : containerToken.hashCode());
+    result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    // Exact class match: an instance of a subclass never equals a base instance.
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    NMCommunicatorEvent other = (NMCommunicatorEvent) obj;
+    if (containerId == null) {
+      if (other.containerId != null) {
+        return false;
+      }
+    } else if (!containerId.equals(other.containerId)) {
+      return false;
+    }
+    if (containerToken == null) {
+      if (other.containerToken != null) {
+        return false;
+      }
+    } else if (!containerToken.equals(other.containerToken)) {
+      return false;
+    }
+    if (nodeId == null) {
+      if (other.nodeId != null) {
+        return false;
+      }
+    } else if (!nodeId.equals(other.nodeId)) {
+      return false;
+    }
+    return true;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+// TODO - Re-use the events in ContainerLauncher..
+/**
+ * Types of {@link NMCommunicatorEvent}: a request to launch a container and a
+ * request to stop one.
+ */
+public enum NMCommunicatorEventType {
+  CONTAINER_LAUNCH_REQUEST,
+  CONTAINER_STOP_REQUEST
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+
+/**
+ * Event of type {@link NMCommunicatorEventType#CONTAINER_LAUNCH_REQUEST}.
+ * In addition to the identifiers held by {@link NMCommunicatorEvent}, it
+ * carries the {@link Container} itself and the {@link ContainerLaunchContext}
+ * supplied by the creator of the event.
+ */
+public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
+
+  private final Container container;
+  private final ContainerLaunchContext launchContext;
+
+  /**
+   * @param clc the launch context to expose through
+   *          {@link #getContainerLaunchContext()}
+   * @param container the container; its id, node id and container token are
+   *          handed to the base event, so it must not be null
+   */
+  public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
+      Container container) {
+    super(
+        container.getId(),
+        container.getNodeId(),
+        container.getContainerToken(),
+        NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+    this.container = container;
+    this.launchContext = clc;
+  }
+
+  /** @return the launch context given at construction time */
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return launchContext;
+  }
+
+  /** @return the container given at construction time */
+  public Container getContainer() {
+    return this.container;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Event of type {@link NMCommunicatorEventType#CONTAINER_STOP_REQUEST}.
+ * It adds no state of its own; everything it carries is stored by
+ * {@link NMCommunicatorEvent}.
+ */
+public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
+
+  /**
+   * @param containerId id forwarded unchanged to the base event
+   * @param nodeId node id forwarded unchanged to the base event
+   * @param containerToken token forwarded unchanged to the base event
+   */
+  public NMCommunicatorStopRequestEvent(
+      ContainerId containerId,
+      NodeId nodeId,
+      ContainerToken containerToken) {
+    super(
+        containerId,
+        nodeId,
+        containerToken,
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,293 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.client.ClientService;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Registers/unregisters to RM and sends heartbeats to RM.
+ *
+ * <p>{@link #start()} creates the scheduler proxy, registers the application
+ * attempt and launches a thread that calls {@link #heartbeat()} after every
+ * {@code rmPollInterval} milliseconds. {@link #stop()} interrupts and joins
+ * that thread and then unregisters, reporting a final status derived from the
+ * DAG state. Subclasses supply the actual {@link #heartbeat()} logic.
+ */
+public abstract class RMCommunicator extends AbstractService  {
+  private static final Log LOG = LogFactory.getLog(RMCommunicator.class);
+  private int rmPollInterval;//millis
+  protected ApplicationId applicationId;
+  protected ApplicationAttemptId applicationAttemptId;
+  // Set exactly once by stop(); also polled by the heartbeat loop.
+  private final AtomicBoolean stopped;
+  // Null until startAllocatorThread() has run.
+  protected Thread allocatorThread;
+  @SuppressWarnings("rawtypes")
+  protected EventHandler eventHandler;
+  protected AMRMProtocol scheduler;
+  private final ClientService clientService;
+  protected int lastResponseID;
+  private Resource minContainerCapability;
+  private Resource maxContainerCapability;
+  protected Map<ApplicationAccessType, String> applicationACLs;
+
+  protected final AppContext context;
+  // Assigned in start(), before the heartbeat thread is launched.
+  private DAG job;
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+
+  /**
+   * @param clientService source of the bind address and HTTP port reported to
+   *          the RM at registration
+   * @param context application context; the event handler, application id and
+   *          application attempt id are read from it here
+   */
+  public RMCommunicator(ClientService clientService, AppContext context) {
+    super(RMCommunicator.class.getSimpleName());
+    this.clientService = clientService;
+    this.context = context;
+    this.eventHandler = context.getEventHandler();
+    this.applicationId = context.getApplicationID();
+    this.applicationAttemptId = context.getApplicationAttemptId();
+    this.stopped = new AtomicBoolean(false);
+  }
+
+  /**
+   * Reads the heartbeat interval (milliseconds) from
+   * {@link MRJobConfig#MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS}.
+   */
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    rmPollInterval =
+        conf.getInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS,
+            MRJobConfig.DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS);
+  }
+
+  /**
+   * Creates the scheduler proxy, registers with the RM and starts the
+   * heartbeat thread.
+   */
+  @Override
+  public void start() {
+    scheduler= createSchedulerProxy();
+    register();
+    // Publish the DAG before the heartbeat thread exists: Thread.start()
+    // guarantees the new thread sees every write made before it, whereas a
+    // write to the non-volatile 'job' field made afterwards carries no such
+    // guarantee.
+    job = context.getDAG();
+    startAllocatorThread();
+    super.start();
+  }
+
+  protected AppContext getContext() {
+    return context;
+  }
+
+  /**
+   * @return the DAG captured in {@link #start()}, or null before that
+   */
+  protected DAG getJob() {
+    return job;
+  }
+
+  /**
+   * Get the appProgress. Can be used only after this component is started.
+   * @return the appProgress.
+   */
+  protected float getApplicationProgress() {
+    // For now just a single job. In future when we have a DAG, we need an
+    // aggregate progress.
+    return this.job.getProgress();
+  }
+
+  // TODO EVENTUALLY. get rid of this, when RMComm and RMContainerRequestor are
+  // collapsed.
+  /**
+   * @return the ACLs returned by the RM at registration, or null before
+   *         {@link #register()} has completed
+   */
+  public Map<ApplicationAccessType, String> getApplicationAcls() {
+    return this.applicationACLs;
+  }
+
+  // TODO (After 3902): Get rid of the dependencies on the ClientService.
+  /**
+   * Registers the application attempt with the RM and records the min/max
+   * container capabilities and application ACLs from the response.
+   *
+   * @throws YarnException wrapping any exception raised while registering
+   */
+  protected void register() {
+    //Register
+    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    try {
+      RegisterApplicationMasterRequest request = Records
+          .newRecord(RegisterApplicationMasterRequest.class);
+      request.setApplicationAttemptId(applicationAttemptId);
+      request.setHost(serviceAddr.getHostName());
+      request.setRpcPort(serviceAddr.getPort());
+      request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+      RegisterApplicationMasterResponse response =
+        scheduler.registerApplicationMaster(request);
+      minContainerCapability = response.getMinimumResourceCapability();
+      maxContainerCapability = response.getMaximumResourceCapability();
+      this.context.getClusterInfo().setMinContainerCapability(
+          minContainerCapability);
+      this.context.getClusterInfo().setMaxContainerCapability(
+          maxContainerCapability);
+      this.applicationACLs = response.getApplicationACLs();
+      LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
+      LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
+    } catch (Exception are) {
+      LOG.error("Exception while registering", are);
+      throw new YarnException(are);
+    }
+  }
+
+  /**
+   * Tells the RM that this attempt is finished, sending the final status, the
+   * DAG diagnostics and the history URL. Any failure is logged and not
+   * propagated, so stopping the service can proceed.
+   */
+  protected void unregister() {
+    try {
+      // Read the state once so that every branch tests the same snapshot.
+      DAGState jobState = job.getState();
+      FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+      if (jobState == DAGState.SUCCEEDED) {
+        finishState = FinalApplicationStatus.SUCCEEDED;
+      } else if (jobState == DAGState.KILLED
+          || (jobState == DAGState.RUNNING && isSignalled)) {
+        // A still-running DAG that received a signal is reported as killed.
+        finishState = FinalApplicationStatus.KILLED;
+      } else if (jobState == DAGState.FAILED
+          || jobState == DAGState.ERROR) {
+        finishState = FinalApplicationStatus.FAILED;
+      }
+      // Method-local buffer, so the unsynchronized StringBuilder suffices.
+      StringBuilder sb = new StringBuilder();
+      for (String s : job.getDiagnostics()) {
+        sb.append(s).append("\n");
+      }
+      LOG.info("Setting job diagnostics to " + sb.toString());
+
+      String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
+          context.getApplicationID());
+      LOG.info("History url is " + historyUrl);
+
+      FinishApplicationMasterRequest request = Records
+          .newRecord(FinishApplicationMasterRequest.class);
+      request.setAppAttemptId(this.applicationAttemptId);
+      request.setFinishApplicationStatus(finishState);
+      request.setDiagnostics(sb.toString());
+      request.setTrackingUrl(historyUrl);
+      scheduler.finishApplicationMaster(request);
+    } catch(Exception are) {
+      LOG.error("Exception while unregistering ", are);
+    }
+  }
+
+  protected Resource getMinContainerCapability() {
+    return minContainerCapability;
+  }
+
+  protected Resource getMaxContainerCapability() {
+    return maxContainerCapability;
+  }
+
+  /**
+   * Stops the heartbeat thread, unregisters from the RM and stops the
+   * service. Only the first call has any effect.
+   */
+  @Override
+  public void stop() {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
+    boolean interrupted = false;
+    // allocatorThread is still null when stop() is reached without a
+    // successful start() (e.g. register() threw), so guard against an NPE.
+    if (allocatorThread != null) {
+      allocatorThread.interrupt();
+      try {
+        allocatorThread.join();
+      } catch (InterruptedException ie) {
+        LOG.warn("InterruptedException while stopping", ie);
+        interrupted = true;
+      }
+    }
+    try {
+      unregister();
+      super.stop();
+    } finally {
+      // Restore the caller's interrupt status only now, so that the
+      // unregister call above still runs with the flag cleared.
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Creates and starts the thread named "RMCommunicator" that sleeps for
+   * {@code rmPollInterval} milliseconds and then calls {@link #heartbeat()},
+   * until the service is stopped or the thread is interrupted.
+   */
+  protected void startAllocatorThread() {
+    allocatorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(rmPollInterval);
+            try {
+              heartbeat();
+            } catch (YarnException e) {
+              // A YarnException ends the heartbeat loop for good.
+              LOG.error("Error communicating with RM: " + e.getMessage() , e);
+              return;
+            } catch (Exception e) {
+              // Any other failure is logged and the loop keeps going.
+              LOG.error("ERROR IN CONTACTING RM. ", e);
+              // TODO: for other exceptions
+            }
+          } catch (InterruptedException e) {
+            // Raised by sleep(); stop() interrupts this thread to end it.
+            LOG.warn("Allocated thread interrupted. Returning.");
+            return;
+          }
+        }
+      }
+    });
+    allocatorThread.setName("RMCommunicator");
+    allocatorThread.start();
+  }
+
+  /**
+   * Builds the RPC proxy for the scheduler address configured under
+   * {@link YarnConfiguration#RM_SCHEDULER_ADDRESS}. When security is enabled,
+   * the token found in the environment variable
+   * {@link ApplicationConstants#APPLICATION_MASTER_TOKEN_ENV_NAME} is decoded
+   * and added to the current user before the proxy is created as that user.
+   *
+   * @return the scheduler proxy
+   * @throws YarnException if the current user cannot be determined or the
+   *           token cannot be decoded
+   */
+  protected AMRMProtocol createSchedulerProxy() {
+    final Configuration conf = getConfig();
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress serviceAddr = conf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      String tokenURLEncodedStr = System.getenv().get(
+          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      SecurityUtil.setTokenService(token, serviceAddr);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + token);
+      }
+      currentUser.addToken(token);
+    }
+
+    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+            serviceAddr, conf);
+      }
+    });
+  }
+
+  /**
+   * Called by the heartbeat thread once per poll interval. A
+   * {@link YarnException} thrown from here terminates that thread; any other
+   * exception is logged and the next heartbeat is attempted.
+   */
+  protected abstract void heartbeat() throws Exception;
+
+  /**
+   * Records whether a signal has been issued; {@link #unregister()} reports a
+   * still-running DAG as killed when this is true.
+   */
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class RMCommunicatorEvent extends AbstractEvent<RMCommunicatorEventType> {
+
+  public RMCommunicatorEvent(RMCommunicatorEventType type) {
+    super(type);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventContainerDeAllocateRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventContainerDeAllocateRequest.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventContainerDeAllocateRequest.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventContainerDeAllocateRequest.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class RMCommunicatorEventContainerDeAllocateRequest extends
+    RMCommunicatorEvent {
+
+  private final ContainerId containerId;
+  
+  public RMCommunicatorEventContainerDeAllocateRequest(ContainerId containerId) {
+    super(RMCommunicatorEventType.CONTAINER_DEALLOCATE);
+    this.containerId = containerId;
+  }
+  
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventContainerDeAllocateRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+public enum RMCommunicatorEventType {
+  CONTAINER_DEALLOCATE,
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventType.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message