hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject svn commit: r1503526 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ hadoop-yarn/hado...
Date Tue, 16 Jul 2013 00:23:18 GMT
Author: bikas
Date: Tue Jul 16 00:23:18 2013
New Revision: 1503526

URL: http://svn.apache.org/r1503526
Log:
YARN-521. Augment AM - RM client module to be able to request containers only at specific
locations (Sandy Ryza via bikas)

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/InvalidContainerRequestException.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1503526&r1=1503525&r2=1503526&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jul 16 00:23:18 2013
@@ -476,6 +476,9 @@ Release 2.1.0-beta - 2013-07-02
     YARN-569. Add support for requesting and enforcing preemption requests via
     a capacity monitor. (Carlo Curino, cdouglas)
 
+    YARN-521. Augment AM - RM client module to be able to request containers
+    only at specific locations (Sandy Ryza via bikas)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1503526&r1=1503525&r2=1503526&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
Tue Jul 16 00:23:18 2013
@@ -69,24 +69,32 @@ public abstract class AMRMClient<T exten
   }
 
   /**
-   * Object to represent container request for resources. Scheduler
+   * Object to represent a container request for resources. Scheduler
    * documentation should be consulted for the specifics of how the parameters
    * are honored.
-   * All getters return immutable values.
    * 
-   * @param capability
-   *    The {@link Resource} to be requested for each container.
-   * @param nodes
-   *    Any hosts to request that the containers are placed on.
-   * @param racks
-   *    Any racks to request that the containers are placed on. The racks
-   *    corresponding to any hosts requested will be automatically added to
-   *    this list.
-   * @param priority
-   *    The priority at which to request the containers. Higher priorities have
-   *    lower numerical values.
-   * @param containerCount
-   *    The number of containers to request.
+   * By default, YARN schedulers try to allocate containers at the requested
+   * locations but they may relax the constraints in order to expedite meeting
+   * allocations limits. They first relax the constraint to the same rack as the
+   * requested node and then to anywhere in the cluster. The relaxLocality flag
+   * may be used to disable locality relaxation and request containers at only 
+   * specific locations. The following conditions apply.
+   * <ul>
+   * <li>Within a priority, all container requests must have the same value for
+   * locality relaxation. Either enabled or disabled.</li>
+   * <li>If locality relaxation is disabled, then across requests, locations at
+   * different network levels may not be specified. E.g. its invalid to make a
+   * request for a specific node and another request for a specific rack.</li>
+   * <li>If locality relaxation is disabled, then only within the same request,  
+   * a node and its rack may be specified together. This allows for a specific   
+   * rack with a preference for a specific node within that rack.</li>
+   * <li></li>
+   * </ul>
+   * To re-enable locality relaxation at a given priority, all pending requests 
+   * with locality relaxation disabled must be first removed. Then they can be 
+   * added back with locality relaxation enabled.
+   * 
+   * All getters return immutable values.
    */
   public static class ContainerRequest {
     final Resource capability;
@@ -94,9 +102,55 @@ public abstract class AMRMClient<T exten
     final List<String> racks;
     final Priority priority;
     final int containerCount;
-        
+    final boolean relaxLocality;
+    
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints and
+     * locality relaxation enabled.
+     * 
+     * @param capability
+     *          The {@link Resource} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     * @param containerCount
+     *          The number of containers to request.
+     */
     public ContainerRequest(Resource capability, String[] nodes,
         String[] racks, Priority priority, int containerCount) {
+      this(capability, nodes, racks, priority, containerCount, true);
+    }
+          
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints.
+     * 
+     * @param capability
+     *          The {@link Resource} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     * @param containerCount
+     *          The number of containers to request.
+     * @param relaxLocality
+     *          If true, containers for this request may be assigned on hosts
+     *          and racks other than the ones explicitly requested.
+     */
+    public ContainerRequest(Resource capability, String[] nodes,
+        String[] racks, Priority priority, int containerCount,
+        boolean relaxLocality) {
+      // Validate request
       Preconditions.checkArgument(capability != null,
           "The Resource to be requested for each container " +
               "should not be null ");
@@ -104,11 +158,17 @@ public abstract class AMRMClient<T exten
           "The priority at which to request containers should not be null ");
       Preconditions.checkArgument(containerCount > 0,
           "The number of containers to request should larger than 0");
+      Preconditions.checkArgument(
+              (!relaxLocality && (racks == null || racks.length == 0) 
+                  && (nodes == null || nodes.length == 0)),
+              "Can't turn off locality relaxation on a " + 
+              "request with no location constraints");
       this.capability = capability;
       this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
       this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
       this.priority = priority;
       this.containerCount = containerCount;
+      this.relaxLocality = relaxLocality;
     }
     
     public Resource getCapability() {
@@ -131,6 +191,10 @@ public abstract class AMRMClient<T exten
       return containerCount;
     }
     
+    public boolean getRelaxLocality() {
+      return relaxLocality;
+    }
+    
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("Capability[").append(capability).append("]");
@@ -154,6 +218,11 @@ public abstract class AMRMClient<T exten
         String[] racks, Priority priority) {
       super(capability, nodes, racks, priority, 1);
     }
+    
+    public StoredContainerRequest(Resource capability, String[] nodes,
+        String[] racks, Priority priority, boolean relaxLocality) {
+      super(capability, nodes, racks, priority, 1, relaxLocality);
+    }
   }
   
   /**

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/InvalidContainerRequestException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/InvalidContainerRequestException.java?rev=1503526&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/InvalidContainerRequestException.java
(added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/InvalidContainerRequestException.java
Tue Jul 16 00:23:18 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.client.api;
+
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * Thrown when an arguments are combined to construct a
+ * <code>AMRMClient.ContainerRequest</code> in an invalid way.
+ */
+public class InvalidContainerRequestException extends YarnRuntimeException {
+  public InvalidContainerRequestException(Throwable cause) {
+    super(cause);
+  }
+
+  public InvalidContainerRequestException(String message) {
+    super(message);
+  }
+
+  public InvalidContainerRequestException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1503526&r1=1503525&r2=1503526&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
Tue Jul 16 00:23:18 2013
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -75,6 +77,8 @@ import com.google.common.base.Preconditi
 public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T>
{
 
   private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+  private static final List<String> ANY_LIST =
+      Collections.singletonList(ResourceRequest.ANY);
   
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -91,9 +95,10 @@ public class AMRMClientImpl<T extends Co
     LinkedHashSet<T> containerRequests;
     
     ResourceRequestInfo(Priority priority, String resourceName,
-        Resource capability) {
+        Resource capability, boolean relaxLocality) {
       remoteRequest = ResourceRequest.newInstance(priority, resourceName,
           capability, 0);
+      remoteRequest.setRelaxLocality(relaxLocality);
       containerRequests = new LinkedHashSet<T>();
     }
   }
@@ -226,7 +231,7 @@ public class AMRMClientImpl<T extends Co
   @Override
   public AllocateResponse allocate(float progressIndicator) 
       throws YarnException, IOException {
-    Preconditions.checkArgument(progressIndicator > 0,
+    Preconditions.checkArgument(progressIndicator >= 0,
         "Progress indicator should not be negative");
     AllocateResponse allocateResponse = null;
     ArrayList<ResourceRequest> askList = null;
@@ -326,17 +331,30 @@ public class AMRMClientImpl<T extends Co
   public synchronized void addContainerRequest(T req) {
     Preconditions.checkArgument(req != null,
         "Resource request can not be null.");
-    Set<String> allRacks = new HashSet<String>();
+    Set<String> dedupedRacks = new HashSet<String>();
     if (req.getRacks() != null) {
-      allRacks.addAll(req.getRacks());
-      if(req.getRacks().size() != allRacks.size()) {
+      dedupedRacks.addAll(req.getRacks());
+      if(req.getRacks().size() != dedupedRacks.size()) {
         Joiner joiner = Joiner.on(',');
         LOG.warn("ContainerRequest has duplicate racks: "
             + joiner.join(req.getRacks()));
       }
     }
-    allRacks.addAll(resolveRacks(req.getNodes()));
-    
+    Set<String> inferredRacks = resolveRacks(req.getNodes());
+    inferredRacks.removeAll(dedupedRacks);
+
+    // check that specific and non-specific requests cannot be mixed within a
+    // priority
+    checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST,
+        req.getRelaxLocality());
+    // check that specific rack cannot be mixed with specific node within a 
+    // priority. If node and its rack are both specified then they must be 
+    // in the same request.
+    // For explicitly requested racks, we set locality relaxation to true
+    checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true);
+    checkLocalityRelaxationConflict(req.getPriority(), inferredRacks,
+        req.getRelaxLocality());
+
     if (req.getNodes() != null) {
       HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
       if(dedupedNodes.size() != req.getNodes().size()) {
@@ -345,21 +363,26 @@ public class AMRMClientImpl<T extends Co
             + joiner.join(req.getNodes()));        
       }
       for (String node : dedupedNodes) {
-        // Ensure node requests are accompanied by requests for
-        // corresponding rack
         addResourceRequest(req.getPriority(), node, req.getCapability(),
-            req.getContainerCount(), req);
+            req.getContainerCount(), req, true);
       }
     }
 
-    for (String rack : allRacks) {
+    for (String rack : dedupedRacks) {
       addResourceRequest(req.getPriority(), rack, req.getCapability(),
-          req.getContainerCount(), req);
+          req.getContainerCount(), req, true);
+    }
+
+    // Ensure node requests are accompanied by requests for
+    // corresponding rack
+    for (String rack : inferredRacks) {
+      addResourceRequest(req.getPriority(), rack, req.getCapability(),
+          req.getContainerCount(), req, req.getRelaxLocality());
     }
 
     // Off-switch
     addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
-        req.getContainerCount(), req);
+        req.getContainerCount(), req, req.getRelaxLocality());
   }
 
   @Override
@@ -428,7 +451,8 @@ public class AMRMClientImpl<T extends Co
     }
 
     ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-    if (resourceRequestInfo != null) {
+    if (resourceRequestInfo != null &&
+        !resourceRequestInfo.containerRequests.isEmpty()) {
       list.add(resourceRequestInfo.containerRequests);
       return list;
     }
@@ -438,7 +462,8 @@ public class AMRMClientImpl<T extends Co
     SortedMap<Resource, ResourceRequestInfo> tailMap = 
                                                   reqMap.tailMap(capability);
     for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
-      if(canFit(entry.getKey(), capability)) {
+      if (canFit(entry.getKey(), capability) &&
+          !entry.getValue().containerRequests.isEmpty()) {
         // match found that fits in the larger resource
         list.add(entry.getValue().containerRequests);
       }
@@ -466,6 +491,33 @@ public class AMRMClientImpl<T extends Co
     return racks;
   }
   
+  /**
+   * ContainerRequests with locality relaxation cannot be made at the same
+   * priority as ContainerRequests without locality relaxation.
+   */
+  private void checkLocalityRelaxationConflict(Priority priority,
+      Collection<String> locations, boolean relaxLocality) {
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
+        this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      return;
+    }
+    // Locality relaxation will be set to relaxLocality for all implicitly
+    // requested racks. Make sure that existing rack requests match this.
+    for (String location : locations) {
+        TreeMap<Resource, ResourceRequestInfo> reqs =
+            remoteRequests.get(location);
+        if (reqs != null && !reqs.isEmpty()
+            && reqs.values().iterator().next().remoteRequest.getRelaxLocality()
+            != relaxLocality) {
+          throw new InvalidContainerRequestException("Cannot submit a "
+              + "ContainerRequest asking for location " + location
+              + " with locality relaxation " + relaxLocality + " when it has "
+              + "already been requested with locality relaxation " + relaxLocality);
+        }
+      }
+  }
+  
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
     // This code looks weird but is needed because of the following scenario.
     // A ResourceRequest is removed from the remoteRequestTable. A 0 container 
@@ -484,7 +536,7 @@ public class AMRMClientImpl<T extends Co
   }
 
   private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability, int containerCount, T req) {
+      Resource capability, int containerCount, T req, boolean relaxLocality) {
     Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
@@ -506,14 +558,15 @@ public class AMRMClientImpl<T extends Co
     ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
     if (resourceRequestInfo == null) {
       resourceRequestInfo =
-          new ResourceRequestInfo(priority, resourceName, capability);
+          new ResourceRequestInfo(priority, resourceName, capability,
+              relaxLocality);
       reqMap.put(capability, resourceRequestInfo);
     }
     
     resourceRequestInfo.remoteRequest.setNumContainers(
          resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
 
-    if(req instanceof StoredContainerRequest) {
+    if (req instanceof StoredContainerRequest && relaxLocality) {
       resourceRequestInfo.containerRequests.add(req);
     }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1503526&r1=1503525&r2=1503526&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
Tue Jul 16 00:23:18 2013
@@ -83,6 +83,7 @@ public class TestAMRMClient {
   
   static Resource capability;
   static Priority priority;
+  static Priority priority2;
   static String node;
   static String rack;
   static String[] nodes;
@@ -105,6 +106,7 @@ public class TestAMRMClient {
     nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
     
     priority = Priority.newInstance(1);
+    priority2 = Priority.newInstance(2);
     capability = Resource.newInstance(1024, 1);
 
     node = nodeReports.get(0).getNodeId().getHost();
@@ -181,6 +183,7 @@ public class TestAMRMClient {
       Resource capability4 = Resource.newInstance(2000, 1);
       Resource capability5 = Resource.newInstance(1000, 3);
       Resource capability6 = Resource.newInstance(2000, 1);
+      Resource capability7 = Resource.newInstance(2000, 1);
 
       StoredContainerRequest storedContainer1 = 
           new StoredContainerRequest(capability1, nodes, racks, priority);
@@ -194,12 +197,15 @@ public class TestAMRMClient {
           new StoredContainerRequest(capability5, nodes, racks, priority);
       StoredContainerRequest storedContainer6 = 
           new StoredContainerRequest(capability6, nodes, racks, priority);
+      StoredContainerRequest storedContainer7 = 
+          new StoredContainerRequest(capability7, nodes, racks, priority2, false);
       amClient.addContainerRequest(storedContainer1);
       amClient.addContainerRequest(storedContainer2);
       amClient.addContainerRequest(storedContainer3);
       amClient.addContainerRequest(storedContainer4);
       amClient.addContainerRequest(storedContainer5);
       amClient.addContainerRequest(storedContainer6);
+      amClient.addContainerRequest(storedContainer7);
       
       // test matching of containers
       List<? extends Collection<StoredContainerRequest>> matches;
@@ -249,6 +255,15 @@ public class TestAMRMClient {
       matches = amClient.getMatchingRequests(priority, node, testCapability5);
       assert(matches.size() == 0);
       
+      // verify requests without relaxed locality are only returned at specific
+      // locations
+      Resource testCapability7 = Resource.newInstance(2000, 1);
+      matches = amClient.getMatchingRequests(priority2, ResourceRequest.ANY,
+          testCapability7);
+      assert(matches.size() == 0);
+      matches = amClient.getMatchingRequests(priority2, node, testCapability7);
+      assert(matches.size() == 1);
+      
       amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
           null, null);
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java?rev=1503526&r1=1503525&r2=1503526&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
Tue Jul 16 00:23:18 2013
@@ -24,12 +24,15 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.junit.Test;
 
@@ -52,11 +55,159 @@ public class TestAMRMClientContainerRequ
         new ContainerRequest(capability, new String[] {"host1", "host2"},
             new String[] {"/rack2"}, Priority.newInstance(1), 4);
     client.addContainerRequest(request);
-    verifyResourceRequestLocation(client, request, "host1");
-    verifyResourceRequestLocation(client, request, "host2");
-    verifyResourceRequestLocation(client, request, "/rack1");
-    verifyResourceRequestLocation(client, request, "/rack2");
-    verifyResourceRequestLocation(client, request, ResourceRequest.ANY);
+    verifyResourceRequest(client, request, "host1", true);
+    verifyResourceRequest(client, request, "host2", true);
+    verifyResourceRequest(client, request, "/rack1", true);
+    verifyResourceRequest(client, request, "/rack2", true);
+    verifyResourceRequest(client, request, ResourceRequest.ANY, true);
+  }
+  
+  @Test
+  public void testDisableLocalityRelaxation() {
+    AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+    
+    Resource capability = Resource.newInstance(1024, 1);
+    ContainerRequest nodeLevelRequest =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            null, Priority.newInstance(1), 4, false);
+    client.addContainerRequest(nodeLevelRequest);
+
+    verifyResourceRequest(client, nodeLevelRequest, ResourceRequest.ANY, false);
+    verifyResourceRequest(client, nodeLevelRequest, "/rack1", false);
+    verifyResourceRequest(client, nodeLevelRequest, "host1", true);
+    verifyResourceRequest(client, nodeLevelRequest, "host2", true);
+    
+    // Make sure we don't get any errors with two node-level requests at the
+    // same priority
+    ContainerRequest nodeLevelRequest2 =
+        new ContainerRequest(capability, new String[] {"host2", "host3"},
+            null, Priority.newInstance(1), 4, false);
+    client.addContainerRequest(nodeLevelRequest2);
+    
+    AMRMClient.ContainerRequest rackLevelRequest =
+        new AMRMClient.ContainerRequest(capability, null,
+            new String[] {"/rack3", "/rack4"}, Priority.newInstance(2), 3, false);
+    client.addContainerRequest(rackLevelRequest);
+    
+    verifyResourceRequest(client, rackLevelRequest, ResourceRequest.ANY, false);
+    verifyResourceRequest(client, rackLevelRequest, "/rack3", true);
+    verifyResourceRequest(client, rackLevelRequest, "/rack4", true);
+    
+    // Make sure we don't get any errors with two rack-level requests at the
+    // same priority
+    AMRMClient.ContainerRequest rackLevelRequest2 =
+        new AMRMClient.ContainerRequest(capability, null,
+            new String[] {"/rack4", "/rack5"}, Priority.newInstance(2), 3, false);
+    client.addContainerRequest(rackLevelRequest2);
+    
+    ContainerRequest bothLevelRequest =
+        new ContainerRequest(capability, new String[] {"host3", "host4"},
+            new String[] {"rack1", "/otherrack"},
+            Priority.newInstance(3), 4, false);
+    client.addContainerRequest(bothLevelRequest);
+
+    verifyResourceRequest(client, bothLevelRequest, ResourceRequest.ANY, false);
+    verifyResourceRequest(client, bothLevelRequest, "rack1",
+        true);
+    verifyResourceRequest(client, bothLevelRequest, "/otherrack",
+        true);
+    verifyResourceRequest(client, bothLevelRequest, "host3", true);
+    verifyResourceRequest(client, bothLevelRequest, "host4", true);
+    
+    // Make sure we don't get any errors with two both-level requests at the
+    // same priority
+    ContainerRequest bothLevelRequest2 =
+        new ContainerRequest(capability, new String[] {"host4", "host5"},
+            new String[] {"rack1", "/otherrack2"},
+            Priority.newInstance(3), 4, false);
+    client.addContainerRequest(bothLevelRequest2);
+  }
+  
+  @Test (expected = InvalidContainerRequestException.class)
+  public void testDifferentLocalityRelaxationSamePriority() {
+    AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+    
+    Resource capability = Resource.newInstance(1024, 1);
+    ContainerRequest request1 =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            null, Priority.newInstance(1), 4, false);
+    client.addContainerRequest(request1);
+    ContainerRequest request2 =
+        new ContainerRequest(capability, new String[] {"host3"},
+            null, Priority.newInstance(1), 4, true);
+    client.addContainerRequest(request2);
+  }
+  
+  @Test
+  public void testInvalidValidWhenOldRemoved() {
+    AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+    
+    Resource capability = Resource.newInstance(1024, 1);
+    ContainerRequest request1 =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            null, Priority.newInstance(1), 4, false);
+    client.addContainerRequest(request1);
+    
+    client.removeContainerRequest(request1);
+
+    ContainerRequest request2 =
+        new ContainerRequest(capability, new String[] {"host3"},
+            null, Priority.newInstance(1), 4, true);
+    client.addContainerRequest(request2);
+    
+    client.removeContainerRequest(request2);
+    
+    ContainerRequest request3 =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            null, Priority.newInstance(1), 4, false);
+    client.addContainerRequest(request3);
+    
+    client.removeContainerRequest(request3);
+    
+    ContainerRequest request4 =
+        new ContainerRequest(capability, null,
+            new String[] {"rack1"}, Priority.newInstance(1), 4, true);
+    client.addContainerRequest(request4);
+
+  }
+  
+  @Test (expected = InvalidContainerRequestException.class)
+  public void testLocalityRelaxationDifferentLevels() {
+    AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+    
+    Resource capability = Resource.newInstance(1024, 1);
+    ContainerRequest request1 =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            null, Priority.newInstance(1), 4, false);
+    client.addContainerRequest(request1);
+    ContainerRequest request2 =
+        new ContainerRequest(capability, null,
+            new String[] {"rack1"}, Priority.newInstance(1), 4, true);
+    client.addContainerRequest(request2);
   }
   
   private static class MyResolver implements DNSToSwitchMapping {
@@ -70,12 +221,13 @@ public class TestAMRMClientContainerRequ
     public void reloadCachedMappings() {}
   }
   
-  private void verifyResourceRequestLocation(
+  private void verifyResourceRequest(
       AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
-      String location) {
+      String location, boolean expectedRelaxLocality) {
     ResourceRequest ask =  client.remoteRequestsTable.get(request.getPriority())
         .get(location).get(request.getCapability()).remoteRequest;
     assertEquals(location, ask.getResourceName());
     assertEquals(request.getContainerCount(), ask.getNumContainers());
+    assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
   }
 }



Mime
View raw message