hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1534894 - in /hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn...
Date Wed, 23 Oct 2013 02:34:45 GMT
Author: arp
Date: Wed Oct 23 02:34:44 2013
New Revision: 1534894

URL: http://svn.apache.org/r1534894
Log:
Merging r1534707 through r1534893 from trunk to branch HDFS-2832

Modified:
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Wed Oct 23 02:34:44 2013
@@ -82,6 +82,12 @@ Release 2.3.0 - UNRELEASED
     YARN-1300. SLS tests fail because conf puts YARN properties in
     fair-scheduler.xml (Ted Yu via Sandy Ryza)
 
+    YARN-1183. MiniYARNCluster shutdown takes several minutes intermittently
+    (Andrey Klochkov via jeagles)
+
+    YARN-1305. RMHAProtocolService#serviceInit should handle HAUtil's
+    IllegalArgumentException (Tsuyoshi Ozawa via bikas)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -130,6 +136,9 @@ Release 2.2.1 - UNRELEASED
     YARN-1331. yarn.cmd exits with NoClassDefFoundError trying to run rmadmin or
     logs. (cnauroth)
 
+    YARN-1330. Fair Scheduler: defaultQueueSchedulingPolicy does not take effect
+    (Sandy Ryza)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java Wed Oct 23 02:34:44 2013
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import java.util.Arrays;
@@ -42,10 +43,13 @@ public class HAUtil {
           YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
           YarnConfiguration.RM_WEBAPP_ADDRESS));
 
+  public static final String BAD_CONFIG_MESSAGE_PREFIX =
+    "Invalid configuration! ";
+
   private HAUtil() { /* Hidden constructor */ }
 
   private static void throwBadConfigurationException(String msg) {
-    throw new YarnRuntimeException("Invalid configuration! " + msg);
+    throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
   }
 
   /**
@@ -59,29 +63,137 @@ public class HAUtil {
         YarnConfiguration.DEFAULT_RM_HA_ENABLED);
   }
 
+  /**
+   * Verify configuration for Resource Manager HA.
+   * @param conf Configuration
+   * @throws YarnRuntimeException
+   */
+  public static void verifyAndSetConfiguration(Configuration conf)
+    throws YarnRuntimeException {
+    verifyAndSetRMHAIds(conf);
+    verifyAndSetRMHAId(conf);
+    verifyAndSetAllRpcAddresses(conf);
+  }
+
+
+  private static void verifyAndSetRMHAIds(Configuration conf) {
+    Collection<String> ids =
+      conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
+    if (ids.size() <= 0) {
+      throwBadConfigurationException(
+        getInvalidValueMessage(YarnConfiguration.RM_HA_IDS,
+          conf.get(YarnConfiguration.RM_HA_IDS)));
+    } else if (ids.size() == 1) {
+      LOG.warn(getRMHAIdsWarningMessage(ids.toString()));
+    }
+
+    StringBuilder setValue = new StringBuilder();
+    for (String id: ids) {
+      setValue.append(id);
+      setValue.append(",");
+    }
+    conf.set(YarnConfiguration.RM_HA_IDS,
+      setValue.substring(0, setValue.length() - 1));
+  }
+
+  private static void verifyAndSetRMHAId(Configuration conf) {
+    String rmId = conf.getTrimmed(YarnConfiguration.RM_HA_ID);
+    if (rmId == null) {
+      throwBadConfigurationException(
+        getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID));
+    } else {
+      Collection<String> ids = getRMHAIds(conf);
+      if (!ids.contains(rmId)) {
+        throwBadConfigurationException(
+          getRMHAIdNeedToBeIncludedMessage(ids.toString(), rmId));
+      }
+    }
+    conf.set(YarnConfiguration.RM_HA_ID, rmId);
+  }
+
+  private static void verifyAndSetConfValue(String prefix, Configuration conf) {
+    String confKey = null;
+    String confValue = null;
+    try {
+      confKey = getConfKeyForRMInstance(prefix, conf);
+      confValue = getConfValueForRMInstance(prefix, conf);
+      conf.set(prefix, confValue);
+    } catch (YarnRuntimeException yre) {
+      // Error at getRMHAId()
+      throw yre;
+    } catch (IllegalArgumentException iae) {
+      String errmsg;
+      if (confKey == null) {
+        // Error at addSuffix
+        errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
+          getRMHAId(conf));
+      } else {
+        // Error at Configuration#set.
+        errmsg = getNeedToSetValueMessage(confKey);
+      }
+      throwBadConfigurationException(errmsg);
+    }
+  }
+
+  public static void verifyAndSetAllRpcAddresses(Configuration conf) {
+    for (String confKey : RPC_ADDRESS_CONF_KEYS) {
+     verifyAndSetConfValue(confKey, conf);
+    }
+  }
+
+  /**
+   * @param conf Configuration. Please use getRMHAIds to check.
+   * @return RM Ids on success
+   */
   public static Collection<String> getRMHAIds(Configuration conf) {
-    return conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
+    return  conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
   }
 
   /**
-   * @param conf Configuration
+   * @param conf Configuration. Please use verifyAndSetRMHAId to check.
    * @return RM Id on success
-   * @throws YarnRuntimeException for configurations without a node id
    */
   @VisibleForTesting
-  public static String getRMHAId(Configuration conf) {
-    String rmId = conf.get(YarnConfiguration.RM_HA_ID);
-    if (rmId == null) {
-      throwBadConfigurationException(YarnConfiguration.RM_HA_ID +
-          " needs to be set in a HA configuration");
-    }
-    return rmId;
+  static String getRMHAId(Configuration conf) {
+    return conf.get(YarnConfiguration.RM_HA_ID);
+  }
+
+  @VisibleForTesting
+  static String getNeedToSetValueMessage(String confKey) {
+    return confKey + " needs to be set in a HA configuration.";
+  }
+
+  @VisibleForTesting
+  static String getInvalidValueMessage(String confKey,
+                                              String invalidValue){
+    return "Invalid value of "  + confKey +". "
+      + "Current value is " + invalidValue;
+  }
+
+  @VisibleForTesting
+  static String getRMHAIdNeedToBeIncludedMessage(String ids,
+                                                        String rmId) {
+    return YarnConfiguration.RM_HA_IDS + "("
+      + ids +  ") need to contain " + YarnConfiguration.RM_HA_ID + "("
+      + rmId + ") in a HA configuration.";
+  }
+
+  @VisibleForTesting
+  static String getRMHAIdsWarningMessage(String ids) {
+    return  "Resource Manager HA is enabled, but " +
+      YarnConfiguration.RM_HA_IDS + " has only one id(" +
+      ids.toString() + ")";
+  }
+
+  private static String getConfKeyForRMInstance(String prefix,
+                                                Configuration conf) {
+    return addSuffix(prefix, getRMHAId(conf));
   }
 
   private static String getConfValueForRMInstance(String prefix,
                                                   Configuration conf) {
-    String confKey = addSuffix(prefix, getRMHAId(conf));
-    String retVal = conf.get(confKey);
+    String confKey = getConfKeyForRMInstance(prefix, conf);
+    String retVal = conf.getTrimmed(confKey);
     if (LOG.isTraceEnabled()) {
       LOG.trace("getConfValueForRMInstance: prefix = " + prefix +
           "; confKey being looked up = " + confKey +
@@ -96,16 +208,6 @@ public class HAUtil {
     return (value == null) ? defaultValue : value;
   }
 
-  private static void setConfValue(String prefix, Configuration conf) {
-    conf.set(prefix, getConfValueForRMInstance(prefix, conf));
-  }
-
-  public static void setAllRpcAddresses(Configuration conf) {
-    for (String confKey : RPC_ADDRESS_CONF_KEYS) {
-      setConfValue(confKey, conf);
-    }
-  }
-
   /** Add non empty and non null suffix to a key */
   @VisibleForTesting
   public static String addSuffix(String key, String suffix) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java Wed Oct 23 02:34:44 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.conf;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.junit.Before;
 import org.junit.Test;
@@ -27,53 +28,134 @@ import org.junit.Test;
 import java.util.Collection;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 public class TestHAUtil {
   private Configuration conf;
 
-  private static final String RM1_ADDRESS = "1.2.3.4:8021";
+  private static final String RM1_ADDRESS_UNTRIMMED = "  \t\t\n 1.2.3.4:8021  \n\t ";
+  private static final String RM1_ADDRESS = RM1_ADDRESS_UNTRIMMED.trim();
   private static final String RM2_ADDRESS = "localhost:8022";
-  private static final String RM1_NODE_ID = "rm1";
+  private static final String RM1_NODE_ID_UNTRIMMED = "rm1 ";
+  private static final String RM1_NODE_ID = RM1_NODE_ID_UNTRIMMED.trim();
   private static final String RM2_NODE_ID = "rm2";
+  private static final String RM3_NODE_ID = "rm3";
+  private static final String RM_INVALID_NODE_ID = ".rm";
+  private static final String RM_NODE_IDS_UNTRIMMED = RM1_NODE_ID_UNTRIMMED + "," + RM2_NODE_ID;
+  private static final String RM_NODE_IDS = RM1_NODE_ID + "," + RM2_NODE_ID;
 
   @Before
   public void setUp() {
     conf = new Configuration();
-    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
-    conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
+    conf.set(YarnConfiguration.RM_HA_IDS, RM_NODE_IDS_UNTRIMMED);
+    conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
 
     for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
-      conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
+      // configuration key itself cannot contains space/tab/return chars.
+      conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
       conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
     }
   }
 
   @Test
   public void testGetRMServiceId() throws Exception {
+    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
     Collection<String> rmhaIds = HAUtil.getRMHAIds(conf);
     assertEquals(2, rmhaIds.size());
+
+    String[] ids = rmhaIds.toArray(new String[0]);
+    assertEquals(RM1_NODE_ID, ids[0]);
+    assertEquals(RM2_NODE_ID, ids[1]);
   }
 
   @Test
   public void testGetRMId() throws Exception {
+    conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
     assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID,
-        RM1_NODE_ID, HAUtil.getRMHAId(conf));
-    conf = new YarnConfiguration();
-    try {
-      HAUtil.getRMHAId(conf);
-      fail("getRMHAId() fails to throw an exception when RM_HA_ID is not set");
-    } catch (YarnRuntimeException yre) {
-      // do nothing
-    }
+      RM1_NODE_ID, HAUtil.getRMHAId(conf));
+
+    conf.clear();
+    assertNull("Return null when " + YarnConfiguration.RM_HA_ID
+        + " is not set", HAUtil.getRMHAId(conf));
   }
 
   @Test
-  public void testSetGetRpcAddresses() throws Exception {
-    HAUtil.setAllRpcAddresses(conf);
+  public void testVerifyAndSetConfiguration() throws Exception {
+    try {
+      HAUtil.verifyAndSetConfiguration(conf);
+    } catch (YarnRuntimeException e) {
+      fail("Should not throw any exceptions.");
+    }
+
+    assertEquals("Should be saved as Trimmed collection",
+      StringUtils.getStringCollection(RM_NODE_IDS), HAUtil.getRMHAIds(conf));
+    assertEquals("Should be saved as Trimmed string",
+      RM1_NODE_ID, HAUtil.getRMHAId(conf));
     for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
       assertEquals("RPC address not set for " + confKey,
-          RM1_ADDRESS, conf.get(confKey));
+        RM1_ADDRESS, conf.get(confKey));
+    }
+
+    conf.clear();
+    conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
+    try {
+      HAUtil.verifyAndSetConfiguration(conf);
+    } catch (YarnRuntimeException e) {
+      assertEquals("YarnRuntimeException by getRMId()",
+        HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
+          HAUtil.getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID),
+        e.getMessage());
+    }
+
+    conf.clear();
+    conf.set(YarnConfiguration.RM_HA_ID, RM_INVALID_NODE_ID);
+    conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
+    for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
+      // simulate xml with invalid node id
+      conf.set(confKey + RM_INVALID_NODE_ID, RM_INVALID_NODE_ID);
+    }
+    try {
+      HAUtil.verifyAndSetConfiguration(conf);
+    } catch (YarnRuntimeException e) {
+      assertEquals("YarnRuntimeException by addSuffix()",
+        HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
+          HAUtil.getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
+            RM_INVALID_NODE_ID),
+        e.getMessage());
+    }
+
+    conf.clear();
+    // simulate the case HAUtil.RPC_ADDRESS_CONF_KEYS are not set
+    conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
+    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
+    try {
+      HAUtil.verifyAndSetConfiguration(conf);
+      fail("Should throw YarnRuntimeException. by Configuration#set()");
+    } catch (YarnRuntimeException e) {
+      String confKey =
+        HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID);
+      assertEquals("YarnRuntimeException by Configuration#set()",
+        HAUtil.BAD_CONFIG_MESSAGE_PREFIX + HAUtil.getNeedToSetValueMessage(confKey),
+        e.getMessage());
+    }
+
+    // simulate the case YarnConfiguration.RM_HA_IDS doesn't contain
+    // the value of YarnConfiguration.RM_HA_ID
+    conf.clear();
+    conf.set(YarnConfiguration.RM_HA_IDS, RM2_NODE_ID + "," + RM3_NODE_ID);
+    conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
+    for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
+      conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
+      conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
+    }
+    try {
+      HAUtil.verifyAndSetConfiguration(conf);
+    } catch (YarnRuntimeException e) {
+      assertEquals("YarnRuntimeException by getRMId()'s validation",
+        HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
+        HAUtil.getRMHAIdNeedToBeIncludedMessage("[rm2, rm3]", RM1_NODE_ID),
+        e.getMessage());
     }
   }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java Wed Oct 23 02:34:44 2013
@@ -57,7 +57,7 @@ public class RMHAProtocolService extends
     this.conf = conf;
     haEnabled = HAUtil.isHAEnabled(this.conf);
     if (haEnabled) {
-      HAUtil.setAllRpcAddresses(this.conf);
+      HAUtil.verifyAndSetConfiguration(conf);
       rm.setConf(this.conf);
     }
     rm.createAndInitActiveServices();

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Wed Oct 23 02:34:44 2013
@@ -378,22 +378,24 @@ public class QueueManager {
           queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
           queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
       
-      // Update metrics
+      // Make sure all queues exist
+      for (String name: queueNamesInAllocFile) {
+        getLeafQueue(name, true);
+      }
+      
       for (FSQueue queue : queues.values()) {
+        // Update queue metrics
         FSQueueMetrics queueMetrics = queue.getMetrics();
         queueMetrics.setMinShare(queue.getMinShare());
         queueMetrics.setMaxShare(queue.getMaxShare());
+        // Set scheduling policies
+        if (queuePolicies.containsKey(queue.getName())) {
+          queue.setPolicy(queuePolicies.get(queue.getName()));
+        } else {
+          queue.setPolicy(SchedulingPolicy.getDefault());
+        }
       }
  
-      // Create all queus
-      for (String name: queueNamesInAllocFile) {
-        getLeafQueue(name, true);
-      }
-      
-      // Set custom policies as specified
-      for (Map.Entry<String, SchedulingPolicy> entry : queuePolicies.entrySet()) {
-        queues.get(entry.getKey()).setPolicy(entry.getValue());
-      }
     }
   }
   

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Wed Oct 23 02:34:44 2013
@@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
@@ -807,6 +808,7 @@ public class TestFairScheduler {
     out.println("<queue name=\"queueB\">");
     out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
+    out.println("<schedulingPolicy>fair</schedulingPolicy>");
     out.println("</queue>");
     // Give queue C no minimum
     out.println("<queue name=\"queueC\">");
@@ -833,6 +835,8 @@ public class TestFairScheduler {
         + "</defaultMinSharePreemptionTimeout>");
     // Set fair share preemption timeout to 5 minutes
     out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
+    // Set default scheduling policy to DRF
+    out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
     out.println("</allocations>");
     out.close();
 
@@ -894,6 +898,18 @@ public class TestFairScheduler {
     assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
     assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
     assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
+    
+    // Verify existing queues have default scheduling policy
+    assertEquals(DominantResourceFairnessPolicy.NAME,
+        queueManager.getQueue("root").getPolicy().getName());
+    assertEquals(DominantResourceFairnessPolicy.NAME,
+        queueManager.getQueue("root.queueA").getPolicy().getName());
+    // Verify default is overriden if specified explicitly
+    assertEquals(FairSharePolicy.NAME,
+        queueManager.getQueue("root.queueB").getPolicy().getName());
+    // Verify new queue gets default scheduling policy
+    assertEquals(DominantResourceFairnessPolicy.NAME,
+        queueManager.getLeafQueue("root.newqueue", true).getPolicy().getName());
   }
 
   @Test

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Wed Oct 23 02:34:44 2013
@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,8 +36,10 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -52,6 +56,10 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 /**
@@ -83,6 +91,9 @@ public class MiniYARNCluster extends Com
 
   private ResourceManagerWrapper resourceManagerWrapper;
   
+  private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
+      new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
+  
   private File testWorkDir;
 
   // Number of nm-local-dirs per nodemanager
@@ -210,6 +221,16 @@ public class MiniYARNCluster extends Com
         };
       };
       resourceManager.init(conf);
+      resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class,
+          new EventHandler<RMAppAttemptEvent>() {
+            public void handle(RMAppAttemptEvent event) {
+              if (event instanceof RMAppAttemptRegistrationEvent) {
+                appMasters.put(event.getApplicationAttemptId(), event.getTimestamp());
+              } else if (event instanceof RMAppAttemptUnregistrationEvent) {
+                appMasters.remove(event.getApplicationAttemptId());
+              }
+            }
+          });
       super.serviceInit(conf);
     }
 
@@ -243,9 +264,22 @@ public class MiniYARNCluster extends Com
                WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
     }
 
+    private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException {
+      long started = System.currentTimeMillis();
+      synchronized (appMasters) {
+        while (!appMasters.isEmpty() && System.currentTimeMillis() - started < timeoutMillis) {
+          appMasters.wait(1000);
+        }
+      }
+      if (!appMasters.isEmpty()) {
+        LOG.warn("Stopping RM while some app masters are still alive");
+      }
+    }
+    
     @Override
     protected synchronized void serviceStop() throws Exception {
       if (resourceManager != null) {
+        waitForAppMastersToFinish(5000);
         resourceManager.stop();
       }
       super.serviceStop();



Mime
View raw message