tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-23. TaskScheduler should depend on official YARN AMRMClient* (bikas)
Date Sat, 01 Jun 2013 09:23:41 GMT
Updated Branches:
  refs/heads/TEZ-1 a3e2c2f57 -> eeaaf232f


TEZ-23. TaskScheduler should depend on official YARN AMRMClient* (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/eeaaf232
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/eeaaf232
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/eeaaf232

Branch: refs/heads/TEZ-1
Commit: eeaaf232f33a0b650ded7540c711d9d694e3157f
Parents: a3e2c2f
Author: Bikas Saha <bikas@apache.org>
Authored: Sat Jun 1 02:20:30 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Sat Jun 1 02:20:30 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/dag/app/rm/AMRMClient.java |  182 ------
 .../org/apache/tez/dag/app/rm/AMRMClientAsync.java |  385 -------------
 .../org/apache/tez/dag/app/rm/AMRMClientImpl.java  |  447 ---------------
 .../org/apache/tez/dag/app/rm/TaskScheduler.java   |   91 ++--
 .../tez/dag/app/rm/TaskSchedulerEventHandler.java  |    5 +-
 .../apache/tez/dag/app/rm/TestTaskScheduler.java   |  103 +++-
 6 files changed, 131 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eeaaf232/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClient.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClient.java
deleted file mode 100644
index 7e098f1..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClient.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.service.Service;
-
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
-
-  /**
-   * Object to represent container request for resources.
-   * Resources may be localized to nodes and racks.
-   * Resources may be assigned priorities.
-   * Can ask for multiple containers of a given type.
-   */
-  public static class ContainerRequest {
-    Resource capability;
-    String[] hosts;
-    String[] racks;
-    Priority priority;
-    int containerCount;
-        
-    public ContainerRequest(Resource capability, String[] hosts,
-        String[] racks, Priority priority, int containerCount) {
-      this.capability = capability;
-      this.hosts = (hosts != null ? hosts.clone() : null);
-      this.racks = (racks != null ? racks.clone() : null);
-      this.priority = priority;
-      this.containerCount = containerCount;
-    }
-    
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("Capability[").append(capability).append("]");
-      sb.append("Priority[").append(priority).append("]");
-      sb.append("ContainerCount[").append(containerCount).append("]");
-      return sb.toString();
-    }
-  }
-  
-  public static class StoredContainerRequest <T> extends ContainerRequest {
-    T cookie;
-    
-    public StoredContainerRequest(Resource capability, String[] hosts,
-        String[] racks, Priority priority) {
-      super(capability, hosts, racks, priority, 1);
-    }
-    
-    void setCookie(T cookie) {
-      this.cookie = cookie;
-    }
-    
-    T getCookie() {
-      return cookie;
-    }    
-  }
-  
-  /**
-   * Register the application master. This must be called before any 
-   * other interaction
-   * @param appHostName Name of the host on which master is running
-   * @param appHostPort Port master is listening on
-   * @param appTrackingUrl URL at which the master info can be seen
-   * @return <code>RegisterApplicationMasterResponse</code>
-   * @throws YarnRemoteException
-   * @throws IOException
-   */
-  public RegisterApplicationMasterResponse 
-               registerApplicationMaster(String appHostName,
-                                         int appHostPort,
-                                         String appTrackingUrl) 
-               throws YarnRemoteException, IOException;
-  
-  /**
-   * Request additional containers and receive new container allocations.
-   * Requests made via <code>addContainerRequest</code> are sent to the 
-   * <code>ResourceManager</code>. New containers assigned to the master are 
-   * retrieved. Status of completed containers and node health updates are 
-   * also retrieved.
-   * This also doubles up as a heartbeat to the ResourceManager and must be 
-   * made periodically.
-   * The call may not always return any new allocations of containers.
-   * App should not make concurrent allocate requests. May cause request loss.
-   * @param progressIndicator Indicates progress made by the master
-   * @return the response of the allocate request
-   * @throws YarnRemoteException
-   * @throws IOException
-   */
-  public AllocateResponse allocate(float progressIndicator) 
-                           throws YarnRemoteException, IOException;
-  
-  /**
-   * Unregister the application master. This must be called in the end.
-   * @param appStatus Success/Failure status of the master
-   * @param appMessage Diagnostics message on failure
-   * @param appTrackingUrl New URL to get master info
-   * @throws YarnRemoteException
-   * @throws IOException
-   */
-  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-                                           String appMessage,
-                                           String appTrackingUrl) 
-               throws YarnRemoteException, IOException;
-  
-  /**
-   * Request containers for resources before calling <code>allocate</code>
-   * @param req Resource request
-   */
-  public void addContainerRequest(T req);
-  
-  /**
-   * Remove previous container request. The previous container request may have
-   * already been sent to the ResourceManager, so the app must still be prepared
-   * to receive an allocation for the previous request even after the remove
-   * request has been made.
-   * @param req Resource request
-   */
-  public void removeContainerRequest(T req);
-  
-  /**
-   * Release containers assigned by the Resource Manager. If the app cannot use
-   * the container or wants to give up the container then it can release them.
-   * The app needs to make new requests for the released resource capability if
-   * it still needs it. eg. it released non-local resources
-   * @param containerId
-   */
-  public void releaseAssignedContainer(ContainerId containerId);
-  
-  /**
-   * Get the currently available resources in the cluster.
-   * A valid value is available after a call to allocate has been made
-   * @return Currently available resources
-   */
-  public Resource getClusterAvailableResources();
-  
-  /**
-   * Get the current number of nodes in the cluster.
-   * A valid value is available after a call to allocate has been made
-   * @return Current number of nodes in the cluster
-   */
-  public int getClusterNodeCount();
-
-  /**
-   * Get outstanding <code>StoredContainerRequest</code>s matching the given 
-   * parameters. These StoredContainerRequests should have been added via
-   * <code>addContainerRequest</code> earlier in the lifecycle.
-   */
-  public Collection<T> getMatchingRequests(
-                                     Priority priority, 
-                                     String resourceName, 
-                                     Resource capability);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eeaaf232/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientAsync.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientAsync.java
deleted file mode 100644
index b38183c..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientAsync.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tez.dag.app.rm.AMRMClient.ContainerRequest;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * <code>AMRMClientAsync</code> handles communication with the ResourceManager
- * and provides asynchronous updates on events such as container allocations and
- * completions.  It contains a thread that sends periodic heartbeats to the
- * ResourceManager.
- * 
- * It should be used by implementing a CallbackHandler:
- * <pre>
- * {@code
- * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
- *   public void onContainersAllocated(List<Container> containers) {
- *     [run tasks on the containers]
- *   }
- *   
- *   public void onContainersCompleted(List<ContainerStatus> statuses) {
- *     [update progress, check whether app is done]
- *   }
- *   
- *   public void onNodesUpdated(List<NodeReport> updated) {}
- *   
- *   public void onRebootRequest() {}
- * }
- * }
- * </pre>
- * 
- * The client's lifecycle should be managed similarly to the following:
- * 
- * <pre>
- * {@code
- * AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackHandler());
- * asyncClient.init(conf);
- * asyncClient.start();
- * RegisterApplicationMasterResponse response = asyncClient
- *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
- *       appMasterTrackingUrl);
- * asyncClient.addContainerRequest(containerRequest);
- * [... wait for application to complete]
- * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
- * asyncClient.stop();
- * }
- * </pre>
- */
-@Unstable
-@Evolving
-public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService {
-  
-  private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
-  
-  private final AMRMClient<T> client;
-  private final int intervalMs;
-  private final HeartbeatThread heartbeatThread;
-  private final CallbackHandlerThread handlerThread;
-  private final CallbackHandler handler;
-
-  private final BlockingQueue<AllocateResponse> responseQueue;
-  
-  private volatile boolean keepRunning;
-  private volatile float progress;
-  
-  private volatile Exception savedException;
-  
-  public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
-      CallbackHandler callbackHandler) {
-    this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
-  }
-  
-  @Private
-  @VisibleForTesting
-  public AMRMClientAsync(AMRMClient<T> client, int intervalMs,
-      CallbackHandler callbackHandler) {
-    super(AMRMClientAsync.class.getName());
-    this.client = client;
-    this.intervalMs = intervalMs;
-    handler = callbackHandler;
-    heartbeatThread = new HeartbeatThread();
-    handlerThread = new CallbackHandlerThread();
-    responseQueue = new LinkedBlockingQueue<AllocateResponse>();
-    keepRunning = true;
-    savedException = null;
-  }
-    
-  @Override
-  public void init(Configuration conf) {
-    super.init(conf);
-    client.init(conf);
-  }  
-  
-  @Override
-  public void start() {
-    handlerThread.start();
-    client.start();
-    super.start();
-  }
-  
-  /**
-   * Tells the heartbeat and handler threads to stop and waits for them to
-   * terminate.  Calling this method from the callback handler thread would cause
-   * deadlock, and thus should be avoided.
-   */
-  @Override
-  public void stop() {
-    if (Thread.currentThread() == handlerThread) {
-      throw new YarnException("Cannot call stop from callback handler thread!");
-    }
-    keepRunning = false;
-    try {
-      heartbeatThread.join();
-    } catch (InterruptedException ex) {
-      LOG.error("Error joining with heartbeat thread", ex);
-    }
-    client.stop();
-    try {
-      handlerThread.interrupt();
-      handlerThread.join();
-    } catch (InterruptedException ex) {
-      LOG.error("Error joining with hander thread", ex);
-    }
-    super.stop();
-  }
-  
-  public Collection<T> getMatchingRequests(
-                                     Priority priority, 
-                                     String resourceName, 
-                                     Resource capability) {
-    return client.getMatchingRequests(priority, resourceName, capability);
-  }
-  
-  /**
-   * Registers this application master with the resource manager. On successful
-   * registration, starts the heartbeating thread.
-   * @throws YarnRemoteException
-   * @throws IOException
-   */
-  public RegisterApplicationMasterResponse registerApplicationMaster(
-      String appHostName, int appHostPort, String appTrackingUrl)
-      throws YarnRemoteException, IOException {
-    RegisterApplicationMasterResponse response =
-        client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
-    heartbeatThread.start();
-    return response;
-  }
-
-  /**
-   * Unregister the application master. This must be called in the end.
-   * @param appStatus Success/Failure status of the master
-   * @param appMessage Diagnostics message on failure
-   * @param appTrackingUrl New URL to get master info
-   * @throws YarnRemoteException
-   * @throws IOException
-   */
-  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-      String appMessage, String appTrackingUrl) throws YarnRemoteException, IOException {
-    synchronized (client) {
-      keepRunning = false;
-      client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
-    }
-  }
-
-  /**
-   * Request containers for resources before calling <code>allocate</code>
-   * @param req Resource request
-   */
-  public void addContainerRequest(T req) {
-    client.addContainerRequest(req);
-  }
-
-  /**
-   * Remove previous container request. The previous container request may have
-   * already been sent to the ResourceManager, so the app must still be prepared
-   * to receive an allocation for the previous request even after the remove
-   * request has been made.
-   * @param req Resource request
-   */
-  public void removeContainerRequest(T req) {
-    client.removeContainerRequest(req);
-  }
-
-  /**
-   * Release containers assigned by the Resource Manager. If the app cannot use
-   * the container or wants to give up the container then it can release them.
-   * The app needs to make new requests for the released resource capability if
-   * it still needs it. eg. it released non-local resources
-   * @param containerId
-   */
-  public void releaseAssignedContainer(ContainerId containerId) {
-    client.releaseAssignedContainer(containerId);
-  }
-
-  /**
-   * Get the currently available resources in the cluster.
-   * A valid value is available after a call to allocate has been made
-   * @return Currently available resources
-   */
-  public Resource getClusterAvailableResources() {
-    return client.getClusterAvailableResources();
-  }
-
-  /**
-   * Get the current number of nodes in the cluster.
-   * A valid value is available after a call to allocate has been made
-   * @return Current number of nodes in the cluster
-   */
-  public int getClusterNodeCount() {
-    return client.getClusterNodeCount();
-  }
-  
-  private class HeartbeatThread extends Thread {
-    public HeartbeatThread() {
-      super("AMRM Heartbeater thread");
-    }
-    
-    public void run() {
-      while (true) {
-        AllocateResponse response = null;
-        // synchronization ensures we don't send heartbeats after unregistering
-        synchronized (client) {
-          if (!keepRunning) {
-            break;
-          }
-            
-          try {
-            response = client.allocate(progress);
-          } catch (YarnRemoteException ex) {
-            LOG.error("Yarn exception on heartbeat", ex);
-            savedException = ex;
-            // interrupt handler thread in case it is waiting on the queue
-            handlerThread.interrupt();
-            break;
-          } catch (IOException e) {
-            LOG.error("IO exception on heartbeat", e);
-            savedException = e;
-            // interrupt handler thread in case it is waiting on the queue
-            handlerThread.interrupt();
-            break;
-          }
-        }
-        if (response != null) {
-          while (true) {
-            try {
-              responseQueue.put(response);
-              break;
-            } catch (InterruptedException ex) {
-              LOG.warn("Interrupted while waiting to put on response queue", ex);
-            }
-          }
-        }
-        
-        try {
-          Thread.sleep(intervalMs);
-        } catch (InterruptedException ex) {
-          LOG.warn("Heartbeater interrupted", ex);
-        }
-      }
-    }
-  }
-  
-  private class CallbackHandlerThread extends Thread {
-    public CallbackHandlerThread() {
-      super("AMRM Callback Handler Thread");
-    }
-    
-    public void run() {
-      while (keepRunning) {
-        AllocateResponse response;
-        try {
-          if(savedException != null) {
-            LOG.error("Stopping callback due to: ", savedException);
-            handler.onError(savedException);
-            break;
-          }
-          response = responseQueue.take();
-        } catch (InterruptedException ex) {
-          LOG.info("Interrupted while waiting for queue", ex);
-          continue;
-        }
-
-        if (response.getReboot()) {
-          handler.onRebootRequest();
-          LOG.info("Reboot requested. Stopping callback.");
-          break;
-        }
-        List<NodeReport> updatedNodes = response.getUpdatedNodes();
-        if (!updatedNodes.isEmpty()) {
-          handler.onNodesUpdated(updatedNodes);
-        }
-        
-        List<ContainerStatus> completed =
-            response.getCompletedContainersStatuses();
-        if (!completed.isEmpty()) {
-          handler.onContainersCompleted(completed);
-        }
-
-        List<Container> allocated = response.getAllocatedContainers();
-        if (!allocated.isEmpty()) {
-          handler.onContainersAllocated(allocated);
-        }
-        
-        progress = handler.getProgress();
-      }
-    }
-  }
-  
-  public interface CallbackHandler {
-    
-    /**
-     * Called when the ResourceManager responds to a heartbeat with completed
-     * containers. If the response contains both completed containers and
-     * allocated containers, this will be called before containersAllocated.
-     */
-    public void onContainersCompleted(List<ContainerStatus> statuses);
-    
-    /**
-     * Called when the ResourceManager responds to a heartbeat with allocated
-     * containers. If the response contains both completed containers and
-     * allocated containers, this will be called after containersCompleted.
-     */
-    public void onContainersAllocated(List<Container> containers);
-    
-    /**
-     * Called when the ResourceManager wants the ApplicationMaster to reboot
-     * for being out of sync. The ApplicationMaster should not unregister with 
-     * the RM unless the ApplicationMaster wants to be the last attempt.
-     */
-    public void onRebootRequest();
-    
-    /**
-     * Called when nodes tracked by the ResourceManager have changed in health,
-     * availability etc.
-     */
-    public void onNodesUpdated(List<NodeReport> updatedNodes);
-    
-    public float getProgress();
-    
-    public void onError(Exception e);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eeaaf232/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientImpl.java
deleted file mode 100644
index 75b9ad2..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClientImpl.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-
-// TODO check inputs for null etc. YARN-654
-
-@Unstable
-public class AMRMClientImpl<T extends org.apache.tez.dag.app.rm.AMRMClient.ContainerRequest> 
-                          extends AbstractService implements AMRMClient<T> {
-
-  private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
-  
-  private final RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
-  
-  private int lastResponseId = 0;
-
-  protected AMRMProtocol rmClient;
-  protected final ApplicationAttemptId appAttemptId;  
-  protected Resource clusterAvailableResources;
-  protected int clusterNodeCount;
-  
-  class ResourceRequestInfo {
-    ResourceRequest remoteRequest;
-    HashSet<T> containerRequests;
-    
-    ResourceRequestInfo(Priority priority, String resourceName, Resource capability) {
-      remoteRequest = BuilderUtils.
-          newResourceRequest(priority, resourceName, capability, 0);
-      containerRequests = new HashSet<T>();
-    }
-  }
-  
-  //Key -> Priority
-  //Value -> Map
-  //Key->ResourceName (e.g., hostname, rackname, *)
-  //Value->Map
-  //Key->Resource Capability
-  //Value->ResourceRequest
-  protected final 
-  Map<Priority, Map<String, Map<Resource, ResourceRequestInfo>>>
-    remoteRequestsTable =
-    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequestInfo>>>();
-
-  protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
-      new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
-  protected final Set<ContainerId> release = new TreeSet<ContainerId>();
-  
-  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
-    super(AMRMClientImpl.class.getName());
-    this.appAttemptId = appAttemptId;
-  }
-
-  @Override
-  public synchronized void init(Configuration conf) {
-    super.init(conf);
-  }
-
-  @Override
-  public synchronized void start() {
-    final YarnConfiguration conf = new YarnConfiguration(getConfig());
-    final YarnRPC rpc = YarnRPC.create(conf);
-    final InetSocketAddress rmAddress = conf.getSocketAddr(
-        YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
-    UserGroupInformation currentUser;
-    try {
-      currentUser = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-
-    // CurrentUser should already have AMToken loaded.
-    rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
-      @Override
-      public AMRMProtocol run() {
-        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
-            conf);
-      }
-    });
-    LOG.debug("Connecting to ResourceManager at " + rmAddress);
-    super.start();
-  }
-
-  @Override
-  public synchronized void stop() {
-    if (this.rmClient != null) {
-      RPC.stopProxy(this.rmClient);
-    }
-    super.stop();
-  }
-  
-  @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster(
-      String appHostName, int appHostPort, String appTrackingUrl)
-      throws YarnRemoteException, IOException {
-    // do this only once ???
-    RegisterApplicationMasterRequest request = recordFactory
-        .newRecordInstance(RegisterApplicationMasterRequest.class);
-    synchronized (this) {
-      request.setApplicationAttemptId(appAttemptId);      
-    }
-    request.setHost(appHostName);
-    request.setRpcPort(appHostPort);
-    if(appTrackingUrl != null) {
-      request.setTrackingUrl(appTrackingUrl);
-    }
-    RegisterApplicationMasterResponse response = rmClient
-        .registerApplicationMaster(request);
-    return response;
-  }
-
-  @Override
-  public AllocateResponse allocate(float progressIndicator) 
-      throws YarnRemoteException, IOException {
-    AllocateResponse allocateResponse = null;
-    ArrayList<ResourceRequest> askList = null;
-    ArrayList<ContainerId> releaseList = null;
-    AllocateRequest allocateRequest = null;
-    
-    try {
-      synchronized (this) {
-        askList = new ArrayList<ResourceRequest>(ask);
-        releaseList = new ArrayList<ContainerId>(release);
-        // optimistically clear this collection assuming no RPC failure
-        ask.clear();
-        release.clear();
-        allocateRequest = BuilderUtils
-            .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
-                askList, releaseList);
-      }
-
-      allocateResponse = rmClient.allocate(allocateRequest);
-
-      synchronized (this) {
-        // update these on successful RPC
-        clusterNodeCount = allocateResponse.getNumClusterNodes();
-        lastResponseId = allocateResponse.getResponseId();
-        clusterAvailableResources = allocateResponse.getAvailableResources();
-      }
-    } finally {
-      // TODO how to differentiate remote yarn exception vs error in rpc
-      if(allocateResponse == null) {
-        // we hit an exception in allocate()
-        // preserve ask and release for next call to allocate()
-        synchronized (this) {
-          release.addAll(releaseList);
-          // requests could have been added or deleted during call to allocate
-          // If requests were added/removed then there is nothing to do since
-          // the ResourceRequest object in ask would have the actual new value.
-          // If ask does not have this ResourceRequest then it was unchanged and
-          // so we can add the value back safely.
-          // This assumes that there will be no concurrent calls to allocate() and
-          // so we don't have to worry about ask being changed in the
-          // synchronized block at the beginning of this method.
-          for(ResourceRequest oldAsk : askList) {
-            if(!ask.contains(oldAsk)) {
-              ask.add(oldAsk);
-            }
-          }
-        }
-      }
-    }
-    return allocateResponse;
-  }
-
-  @Override
-  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-      String appMessage, String appTrackingUrl) throws YarnRemoteException,
-      IOException {
-    FinishApplicationMasterRequest request = recordFactory
-                  .newRecordInstance(FinishApplicationMasterRequest.class);
-    request.setAppAttemptId(appAttemptId);
-    request.setFinishApplicationStatus(appStatus);
-    if(appMessage != null) {
-      request.setDiagnostics(appMessage);
-    }
-    if(appTrackingUrl != null) {
-      request.setTrackingUrl(appTrackingUrl);
-    }
-    rmClient.finishApplicationMaster(request);
-  }
-  
-  @Override
-  public synchronized void addContainerRequest(T req) {
-    // Create resource requests
-    // add check for dup locations
-    if(req.hosts != null) {
-      for (String host : req.hosts) {
-        addResourceRequest(req.priority, host, req.capability, req.containerCount, req);
-      }
-    }
-
-    if(req.racks != null) {
-      for (String rack : req.racks) {
-        addResourceRequest(req.priority, rack, req.capability, req.containerCount, req);
-      }
-    }
-
-    // Off-switch
-    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, req.containerCount, req); 
-  }
-
-  @Override
-  public synchronized void removeContainerRequest(T req) {
-    // Update resource requests
-    if(req.hosts != null) {
-      for (String hostName : req.hosts) {
-        decResourceRequest(req.priority, hostName, req.capability, req.containerCount, req);
-      }
-    }
-    
-    if(req.racks != null) {
-      for (String rack : req.racks) {
-        decResourceRequest(req.priority, rack, req.capability, req.containerCount, req);
-      }
-    }
-   
-    decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, req.containerCount, req);
-    
-  }
-
-  @Override
-  public synchronized void releaseAssignedContainer(ContainerId containerId) {
-    release.add(containerId);
-  }
-  
-  @Override
-  public synchronized Resource getClusterAvailableResources() {
-    return clusterAvailableResources;
-  }
-  
-  @Override
-  public synchronized int getClusterNodeCount() {
-    return clusterNodeCount;
-  }
-  
-  @Override
-  public synchronized Collection<T> getMatchingRequests(
-                                          Priority priority, 
-                                          String resourceName, 
-                                          Resource capability) {
-    Map<String, Map<Resource, ResourceRequestInfo>> remoteRequests = 
-        this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      return null;
-    }
-    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests
-        .get(resourceName);
-    if (reqMap == null) {
-      return null;
-    }
-    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-    if (resourceRequestInfo == null) {
-      return null;
-    }
-    
-    return resourceRequestInfo.containerRequests;
-  }
-  
-  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
-    // This code looks weird but is needed because of the following scenario.
-    // A ResourceRequest is removed from the remoteRequestTable. A 0 container 
-    // request is added to 'ask' to notify the RM about not needing it any more.
-    // Before the call to allocate, the user now requests more containers. If 
-    // the locations of the 0 size request and the new request are the same
-    // (with the difference being only container count), then the set comparator
-    // will consider both to be the same and not add the new request to ask. So 
-    // we need to check for the "same" request being present and remove it and 
-    // then add it back. The comparator is container count agnostic.
-    // This should happen only rarely but we do need to guard against it.
-    if(ask.contains(remoteRequest)) {
-      ask.remove(remoteRequest);
-    }
-    ask.add(remoteRequest);
-  }
-
-  private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability, int containerCount, T req) {
-    Map<String, Map<Resource, ResourceRequestInfo>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      remoteRequests = new HashMap<String, Map<Resource, ResourceRequestInfo>>();
-      this.remoteRequestsTable.put(priority, remoteRequests);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Added priority=" + priority);
-      }
-    }
-    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      reqMap = new HashMap<Resource, ResourceRequestInfo>();
-      remoteRequests.put(resourceName, reqMap);
-    }
-    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-    if (resourceRequestInfo == null) {
-      resourceRequestInfo =
-          new ResourceRequestInfo(priority, resourceName, capability);
-      reqMap.put(capability, resourceRequestInfo);
-    }
-    
-    resourceRequestInfo.remoteRequest.setNumContainers(
-         resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
-
-    if(req instanceof StoredContainerRequest<?>) {
-      resourceRequestInfo.containerRequests.add(req);
-    }
-
-    // Note this down for next interaction with ResourceManager
-    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("addResourceRequest:" + " applicationId="
-          + appAttemptId + " priority=" + priority.getPriority()
-          + " resourceName=" + resourceName + " numContainers="
-          + resourceRequestInfo.remoteRequest.getNumContainers() 
-          + " #asks=" + ask.size());
-    }
-  }
-
-  private void decResourceRequest(Priority priority, 
-                                   String resourceName,
-                                   Resource capability, 
-                                   int containerCount, 
-                                   T req) {
-    Map<String, Map<Resource, ResourceRequestInfo>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    
-    if(remoteRequests == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as priority " + priority 
-            + " is not present in request table");
-      }
-      return;
-    }
-    
-    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as " + resourceName
-            + " is not present in request table");
-      }
-      return;
-    }
-    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
-          + appAttemptId + " priority=" + priority.getPriority()
-          + " resourceName=" + resourceName + " numContainers="
-          + resourceRequestInfo.remoteRequest.getNumContainers() 
-          + " #asks=" + ask.size());
-    }
-
-    resourceRequestInfo.remoteRequest.setNumContainers(
-        resourceRequestInfo.remoteRequest.getNumContainers() - containerCount);
-
-    if(req instanceof StoredContainerRequest<?>) {
-      resourceRequestInfo.containerRequests.remove(req);
-    }
-    
-    if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
-      // guard against spurious removals
-      resourceRequestInfo.remoteRequest.setNumContainers(0);
-    }
-    // send the ResourceRequest to RM even if it is 0 because it needs to override
-    // a previously sent value. If ResourceRequest was not sent previously then
-    // sending 0 ought to be a no-op on RM
-    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
-
-    // delete entries from map if no longer needed
-    if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
-      reqMap.remove(capability);
-      if (reqMap.size() == 0) {
-        remoteRequests.remove(resourceName);
-      }
-      if (remoteRequests.size() == 0) {
-        remoteRequestsTable.remove(priority);
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.info("AFTER decResourceRequest:" + " applicationId="
-          + appAttemptId + " priority=" + priority.getPriority()
-          + " resourceName=" + resourceName + " numContainers="
-          + resourceRequestInfo.remoteRequest.getNumContainers() 
-          + " #asks=" + ask.size());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eeaaf232/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 14188b0..d081d2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -42,11 +42,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
+import org.apache.hadoop.yarn.client.AMRMClientAsync;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.app.rm.AMRMClient.StoredContainerRequest;
 import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -98,11 +99,11 @@ public class TaskScheduler extends AbstractService
     public AppFinalStatus getFinalAppStatus();
   }
   
-  final AMRMClientAsync<StoredContainerRequest<CRCookie>> amRmClient;
+  final AMRMClientAsync<CookieContainerRequest> amRmClient;
   final TaskSchedulerAppCallback appClient;
   
-  Map<Object, StoredContainerRequest<CRCookie>> taskRequests =  
-                  new HashMap<Object, StoredContainerRequest<CRCookie>>();
+  Map<Object, CookieContainerRequest> taskRequests =  
+                  new HashMap<Object, CookieContainerRequest>();
   Map<Object, Container> taskAllocations = 
                   new HashMap<Object, Container>();
   Map<ContainerId, Object> containerAssigments = 
@@ -121,6 +122,19 @@ public class TaskScheduler extends AbstractService
     Object appCookie;
   }
   
+  class CookieContainerRequest extends StoredContainerRequest {
+    CRCookie cookie;
+    public CookieContainerRequest(Resource capability, String[] hosts,
+        String[] racks, Priority priority, CRCookie cookie) {
+      super(capability, hosts, racks, priority);
+      this.cookie = cookie;
+    }
+    
+    CRCookie getCookie() {
+      return cookie;
+    }
+  }
+  
   public TaskScheduler(ApplicationAttemptId id, 
                         TaskSchedulerAppCallback appClient,
                         String appHostName, 
@@ -129,7 +143,7 @@ public class TaskScheduler extends AbstractService
     super(TaskScheduler.class.getName());
     this.appClient = appClient;
     this.amRmClient = 
-        new AMRMClientAsync<StoredContainerRequest<CRCookie>>(id, 1000, this);
+        new AMRMClientAsync<CookieContainerRequest>(id, 1000, this);
     this.appHostName = appHostName;
     this.appHostPort = appHostPort;
     this.appTrackingUrl = appTrackingUrl;
@@ -142,7 +156,7 @@ public class TaskScheduler extends AbstractService
       String appHostName, 
       int appHostPort,
       String appTrackingUrl,
-      AMRMClientAsync<StoredContainerRequest<CRCookie>> client) {
+      AMRMClientAsync<CookieContainerRequest> client) {
     super(TaskScheduler.class.getName());
     this.appClient = appClient;
     this.amRmClient = client;
@@ -270,13 +284,12 @@ public class TaskScheduler extends AbstractService
     if(isStopped) {
       return;
     }
-    Map<StoredContainerRequest<CRCookie>, Container> appContainers = 
-        new HashMap<StoredContainerRequest<CRCookie>, Container>
-                                                        (containers.size());
+    Map<CookieContainerRequest, Container> appContainers = 
+        new HashMap<CookieContainerRequest, Container>(containers.size());
     synchronized (this) {
       for(Container container : containers) {
         String location = container.getNodeId().getHost();
-        StoredContainerRequest<CRCookie> assigned = getMatchingRequest(container, location);
+        CookieContainerRequest assigned = getMatchingRequest(container, location);
         if(assigned == null) {
           location = RackResolver.resolve(location).getNetworkLocation();
           assigned = getMatchingRequest(container, location);
@@ -307,10 +320,10 @@ public class TaskScheduler extends AbstractService
     }
     
     // upcall to app must be outside locks
-    for (Entry<StoredContainerRequest<CRCookie>, Container> entry : 
-              appContainers.entrySet()) {
-      StoredContainerRequest<CRCookie> assigned = entry.getKey();
-      appClient.taskAllocated(getTask(assigned), getAppCookie(assigned),
+    for (Entry<CookieContainerRequest, Container> entry : 
+                                        appContainers.entrySet()) {
+      CookieContainerRequest assigned = entry.getKey();
+      appClient.taskAllocated(getTask(assigned), assigned.getCookie().appCookie,
           entry.getValue());
     }   
   }
@@ -357,16 +370,16 @@ public class TaskScheduler extends AbstractService
                                            Priority priority,
                                            Object clientCookie) {
     // TODO check for nulls etc
-    StoredContainerRequest<CRCookie> request = 
-             new StoredContainerRequest<CRCookie>(capability, 
-                                                   hosts, 
-                                                   racks, 
-                                                   priority);
     // TODO extra memory allocation
     CRCookie cookie = new CRCookie();
     cookie.task = task;
     cookie.appCookie = clientCookie;
-    request.setCookie(cookie);
+    CookieContainerRequest request = 
+             new CookieContainerRequest(capability, 
+                                         hosts, 
+                                         racks, 
+                                         priority,
+                                         cookie);
 
     addTaskRequest(task, request);
     LOG.info("Allocation request for task: " + task + 
@@ -374,7 +387,7 @@ public class TaskScheduler extends AbstractService
   }
   
   public synchronized Container deallocateTask(Object task) {
-    StoredContainerRequest<CRCookie> request = removeTaskRequest(task);
+    CookieContainerRequest request = removeTaskRequest(task);
     if(request != null) {
       // task not allocated yet
       LOG.info("Deallocating task: " + task + " before allocation");
@@ -406,31 +419,29 @@ public class TaskScheduler extends AbstractService
     return null;
   }
   
-  private StoredContainerRequest<CRCookie> getMatchingRequest(
+  private CookieContainerRequest getMatchingRequest(
                                       Container container, String location) {
     Priority priority = container.getPriority();
     Resource capability = container.getResource();
-    StoredContainerRequest<CRCookie> assigned = null;
-    Collection<StoredContainerRequest<CRCookie>> requests =
+    CookieContainerRequest assigned = null;
+    List<? extends Collection<CookieContainerRequest>> requestsList =
         amRmClient.getMatchingRequests(priority, location, capability);
     
-    if(requests != null) {
-      // TODO maybe do FIFO
-      Iterator<StoredContainerRequest<CRCookie>> iterator = requests.iterator();
-      if(iterator.hasNext()) {
-        assigned = requests.iterator().next();
+    if(requestsList.size() > 0) {
+      // pick first one
+      for(Collection<CookieContainerRequest> requests : requestsList) {
+        Iterator<CookieContainerRequest> iterator = requests.iterator();
+        if(iterator.hasNext()) {
+          assigned = requests.iterator().next();
+        }
       }
     }
     
     return assigned;
   }
   
-  private Object getTask(StoredContainerRequest<CRCookie> request) {
-    return ((CRCookie)request.getCookie()).task;
-  }
-  
-  private Object getAppCookie(StoredContainerRequest<CRCookie> request) {
-    return ((CRCookie)request.getCookie()).appCookie;
+  private Object getTask(CookieContainerRequest request) {
+    return request.getCookie().task;
   }
   
   private void releaseContainer(ContainerId containerId, Object task) {
@@ -442,8 +453,8 @@ public class TaskScheduler extends AbstractService
   
   private void assignContainer(Object task, 
                                 Container container, 
-                                StoredContainerRequest<CRCookie> assigned) {
-    StoredContainerRequest<CRCookie> request = removeTaskRequest(task);
+                                CookieContainerRequest assigned) {
+    CookieContainerRequest request = removeTaskRequest(task);
     assert request != null;
     //assert assigned.equals(request);
 
@@ -453,8 +464,8 @@ public class TaskScheduler extends AbstractService
     
   }
   
-  private StoredContainerRequest<CRCookie> removeTaskRequest(Object task) {
-    StoredContainerRequest<CRCookie> request = taskRequests.remove(task);
+  private CookieContainerRequest removeTaskRequest(Object task) {
+    CookieContainerRequest request = taskRequests.remove(task);
     if(request != null) {
       // remove all references of the request from AMRMClient
       amRmClient.removeContainerRequest(request);
@@ -463,7 +474,7 @@ public class TaskScheduler extends AbstractService
   }
   
   private void addTaskRequest(Object task, 
-                                StoredContainerRequest<CRCookie> request) {
+                                CookieContainerRequest request) {
     // TODO TEZ-37 fix duplicate handling
     taskRequests.put(task, request);
     amRmClient.addContainerRequest(request);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eeaaf232/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 8b98c68..e29db1c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -496,8 +496,6 @@ public class TaskSchedulerEventHandler extends AbstractService
     // TODO TEZ-34 change event to reboot and send to app master
     sendEvent(new DAGEvent(appContext.getDAGID(),
                            DAGEventType.INTERNAL_ERROR));
-    throw new YarnException("ResourceManager requests reboot for: "
-                             + appContext.getApplicationID());  
   }
 
   @Override
@@ -552,7 +550,8 @@ public class TaskSchedulerEventHandler extends AbstractService
 
   @Override
   public void onError(Exception e) {
-    // TODO TEZ-35 handle error
+    sendEvent(new DAGEvent(appContext.getDAGID(),
+        DAGEventType.INTERNAL_ERROR));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eeaaf232/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 4fecd3a..8d52e9a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -19,6 +19,8 @@
 package org.apache.tez.dag.app.rm;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -34,18 +36,20 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.dag.app.rm.AMRMClient.StoredContainerRequest;
-import org.apache.tez.dag.app.rm.TaskScheduler.CRCookie;
+import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
 import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
 import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.mockito.Mockito.*;
 
@@ -54,13 +58,13 @@ public class TestTaskScheduler {
   RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
     
-  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @SuppressWarnings({ "unchecked" })
   @Test
   public void testTaskScheduler() throws Exception {
     RackResolver.init(new YarnConfiguration());
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
     
-    AMRMClientAsync<StoredContainerRequest<CRCookie>> mockRMClient = 
+    AMRMClientAsync<CookieContainerRequest> mockRMClient = 
                                                   mock(AMRMClientAsync.class);
     
     ApplicationAttemptId attemptId = 
@@ -111,25 +115,25 @@ public class TestTaskScheduler {
     String[] hosts = {"host1", "host5"};
     String[] racks = {"/default-rack", "/default-rack"};
     Priority mockPriority = mock(Priority.class);
-    ArgumentCaptor<StoredContainerRequest> requestCaptor = 
-                        ArgumentCaptor.forClass(StoredContainerRequest.class);
+    ArgumentCaptor<CookieContainerRequest> requestCaptor = 
+                        ArgumentCaptor.forClass(CookieContainerRequest.class);
     // allocate task
     scheduler.allocateTask(mockTask1, mockCapability, hosts, 
                            racks, mockPriority, mockCookie1);
     verify(mockRMClient, times(1)).
-                           addContainerRequest((StoredContainerRequest) any());
+                           addContainerRequest((CookieContainerRequest) any());
 
     // returned from task requests before allocation happens
     Assert.assertNull(scheduler.deallocateTask(mockTask1));
     verify(mockRMClient, times(1)).
-                        removeContainerRequest((StoredContainerRequest) any());
+                        removeContainerRequest((CookieContainerRequest) any());
     verify(mockRMClient, times(0)).
                                  releaseAssignedContainer((ContainerId) any());
     
     // deallocating unknown task
     Assert.assertNull(scheduler.deallocateTask(mockTask1));
     verify(mockRMClient, times(1)).
-                        removeContainerRequest((StoredContainerRequest) any());
+                        removeContainerRequest((CookieContainerRequest) any());
     verify(mockRMClient, times(0)).
                                  releaseAssignedContainer((ContainerId) any());
 
@@ -142,17 +146,17 @@ public class TestTaskScheduler {
         racks, mockPriority, mockCookie1);
     verify(mockRMClient, times(2)).
                                 addContainerRequest(requestCaptor.capture());
-    StoredContainerRequest<CRCookie> request1 = requestCaptor.getValue();
+    CookieContainerRequest request1 = requestCaptor.getValue();
     scheduler.allocateTask(mockTask2, mockCapability, hosts, 
         racks, mockPriority, mockCookie2);
     verify(mockRMClient, times(3)).
                                 addContainerRequest(requestCaptor.capture());
-    StoredContainerRequest<CRCookie> request2 = requestCaptor.getValue();
+    CookieContainerRequest request2 = requestCaptor.getValue();
     scheduler.allocateTask(mockTask3, mockCapability, hosts, 
         racks, mockPriority, mockCookie3);
     verify(mockRMClient, times(4)).
                                 addContainerRequest(requestCaptor.capture());
-    StoredContainerRequest<CRCookie> request3 = requestCaptor.getValue();
+    CookieContainerRequest request3 = requestCaptor.getValue();
     
     List<Container> containers = new ArrayList<Container>();
     Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
@@ -175,35 +179,84 @@ public class TestTaskScheduler {
     ContainerId mockCId4 = mock(ContainerId.class);
     when(mockContainer4.getId()).thenReturn(mockCId4);
     containers.add(mockContainer4);
-    ArrayList<StoredContainerRequest<CRCookie>> hostContainers = 
-                             new ArrayList<StoredContainerRequest<CRCookie>>();
+    ArrayList<CookieContainerRequest> hostContainers = 
+                             new ArrayList<CookieContainerRequest>();
     hostContainers.add(request1);
     hostContainers.add(request2);
     hostContainers.add(request3);
-    ArrayList<StoredContainerRequest<CRCookie>> rackContainers = 
-                             new ArrayList<StoredContainerRequest<CRCookie>>();
+    ArrayList<CookieContainerRequest> rackContainers = 
+                             new ArrayList<CookieContainerRequest>();
     rackContainers.add(request2);
     rackContainers.add(request3);
-    ArrayList<StoredContainerRequest<CRCookie>> anyContainers = 
-                             new ArrayList<StoredContainerRequest<CRCookie>>();
+    ArrayList<CookieContainerRequest> anyContainers = 
+                             new ArrayList<CookieContainerRequest>();
     anyContainers.add(request3);
 
+    final List<ArrayList<CookieContainerRequest>> hostList = 
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    hostList.add(hostContainers);
+    final List<ArrayList<CookieContainerRequest>> rackList = 
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    rackList.add(rackContainers);
+    final List<ArrayList<CookieContainerRequest>> anyList = 
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    anyList.add(anyContainers);
+    final List<ArrayList<CookieContainerRequest>> emptyList = 
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
     // return all requests for host1
     when(
         mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
-            (Resource) any())).thenReturn(hostContainers);
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return hostList;
+          }
+
+        });
     // first request matched by host
-    // second request matched to rack. RackResolver by default puts hosts in 
+    // second request matched to rack. RackResolver by default puts hosts in
     // /default-rack. We need to workaround by returning rack matches only once
     when(
-        mockRMClient.getMatchingRequests((Priority) any(),
-            eq("/default-rack"), (Resource) any())).thenReturn(
-        rackContainers).thenReturn(null);    
+        mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"),
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return rackList;
+          }
+
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+
+        });
     // third request matched to ANY
     when(
         mockRMClient.getMatchingRequests((Priority) any(),
-            eq(ResourceRequest.ANY), (Resource) any())).thenReturn(
-        anyContainers).thenReturn(null);
+            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return anyList;
+          }
+
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+
+        });
     scheduler.onContainersAllocated(containers);
     // first container allocated
     verify(mockApp).taskAllocated(mockTask1, mockCookie1, mockContainer1);


Mime
View raw message