hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject svn commit: r1618542 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server...
Date Mon, 18 Aug 2014 06:08:45 GMT
Author: jianhe
Date: Mon Aug 18 06:08:45 2014
New Revision: 1618542

URL: http://svn.apache.org/r1618542
Log:
YARN-2411. Support simple user and group mappings to queues. Contributed by Ram Venkatesh

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1618542&r1=1618541&r2=1618542&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Aug 18 06:08:45 2014
@@ -47,6 +47,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2378. Added support for moving applications across queues in
     CapacityScheduler. (Subramaniam Venkatraman Krishnan via jianhe)
 
+    YARN-2411. Support simple user and group mappings to queues. (Ram Venkatesh
+    via jianhe)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml?rev=1618542&r1=1618541&r2=1618542&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
Mon Aug 18 06:08:45 2014
@@ -108,4 +108,27 @@
     </description>
   </property>
 
+  <property>
+    <name>yarn.scheduler.capacity.queue-mappings</name>
+    <value></value>
+    <description>
+      A list of mappings that will be used to assign jobs to queues.
+      The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*
+      Typically this list will be used to map users to queues,
+      for example, u:%user:%user maps all users to queues with the same name
+      as the user.
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
+    <value>false</value>
+    <description>
+      If a queue mapping is present, will it override the value specified
+      by the user? This can be used by administrators to place jobs in queues
+      that are different than the one specified by the user.
+      The default is false.
+    </description>
+  </property>
+
 </configuration>

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1618542&r1=1618541&r2=1618542&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
Mon Aug 18 06:08:45 2014
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -41,6 +39,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -59,10 +58,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -77,6 +73,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -94,6 +92,7 @@ import org.apache.hadoop.yarn.util.resou
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 @LimitedPrivate("yarn")
 @Evolving
@@ -199,6 +198,16 @@ public class CapacityScheduler extends
           + ".scheduling-interval-ms";
   private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
   
+  private boolean overrideWithQueueMappings = false;
+  private List<QueueMapping> mappings = new ArrayList<QueueMapping>();
+  private Groups groups;
+
+  @VisibleForTesting
+  public synchronized String getMappedQueueForTest(String user)
+      throws IOException {
+    return getMappedQueue(user);
+  }
+
   public CapacityScheduler() {
     super(CapacityScheduler.class.getName());
   }
@@ -263,7 +272,6 @@ public class CapacityScheduler extends
     this.applications =
         new ConcurrentHashMap<ApplicationId,
             SchedulerApplication<FiCaSchedulerApp>>();
-
     initializeQueues(this.conf);
 
     scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -402,7 +410,32 @@ public class CapacityScheduler extends
     }
   }
   private static final QueueHook noop = new QueueHook();
-  
+
+  private void initializeQueueMappings() throws IOException {
+    overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+    LOG.info("Initialized queue mappings, override: "
+        + overrideWithQueueMappings);
+    // Get new user/group mappings
+    List<QueueMapping> newMappings = conf.getQueueMappings();
+    //check if mappings refer to valid queues
+    for (QueueMapping mapping : newMappings) {
+      if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
+          !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+        CSQueue queue = queues.get(mapping.queue);
+        if (queue == null || !(queue instanceof LeafQueue)) {
+          throw new IOException(
+              "mapping contains invalid or non-leaf queue " + mapping.queue);
+        }
+      }
+    }
+    //apply the new mappings since they are valid
+    mappings = newMappings;
+    // initialize groups if mappings are present
+    if (mappings.size() > 0) {
+      groups = new Groups(conf);
+    }
+  }
+
   @Lock(CapacityScheduler.class)
   private void initializeQueues(CapacitySchedulerConfiguration conf)
     throws IOException {
@@ -410,7 +443,9 @@ public class CapacityScheduler extends
     root = 
         parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
             queues, queues, noop);
+
     LOG.info("Initialized root queue " + root);
+    initializeQueueMappings();
   }
 
   @Lock(CapacityScheduler.class)
@@ -430,6 +465,7 @@ public class CapacityScheduler extends
     
     // Re-configure queues
     root.reinitialize(newRoot, clusterResource);
+    initializeQueueMappings();
   }
 
   /**
@@ -517,12 +553,73 @@ public class CapacityScheduler extends
   }
 
   synchronized CSQueue getQueue(String queueName) {
+    if (queueName == null) {
+      return null;
+    }
     return queues.get(queueName);
   }
 
+  private static final String CURRENT_USER_MAPPING = "%user";
+
+  private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+  private String getMappedQueue(String user) throws IOException {
+    for (QueueMapping mapping : mappings) {
+      if (mapping.type == MappingType.USER) {
+        if (mapping.source.equals(CURRENT_USER_MAPPING)) {
+          if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
+            return user;
+          }
+          else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+            return groups.getGroups(user).get(0);
+          }
+          else {
+            return mapping.queue;
+          }
+        }
+        if (user.equals(mapping.source)) {
+          return mapping.queue;
+        }
+      }
+      if (mapping.type == MappingType.GROUP) {
+        for (String userGroups : groups.getGroups(user)) {
+          if (userGroups.equals(mapping.source)) {
+            return mapping.queue;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
   private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user, boolean isAppRecovering) {
-    // santiy checks.
+    String queueName, String user, boolean isAppRecovering) {
+
+    if (mappings != null && mappings.size() > 0) {
+      try {
+        String mappedQueue = getMappedQueue(user);
+        if (mappedQueue != null) {
+          // We have a mapping, should we use it?
+          if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+              || overrideWithQueueMappings) {
+            LOG.info("Application " + applicationId + " user " + user
+                + " mapping [" + queueName + "] to [" + mappedQueue
+                + "] override " + overrideWithQueueMappings);
+            queueName = mappedQueue;
+            RMApp rmApp = rmContext.getRMApps().get(applicationId);
+            rmApp.setQueue(queueName);
+          }
+        }
+      } catch (IOException ioex) {
+        String message = "Failed to submit application " + applicationId +
+            " submitted by user " + user + " reason: " + ioex.getMessage();
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMAppRejectedEvent(applicationId, message));
+        return;
+      }
+    }
+
+    // sanity checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
       String message = "Application " + applicationId + 
@@ -902,8 +999,8 @@ public class CapacityScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser(),
-        appAddedEvent.getIsAppRecovering());
+        appAddedEvent.getQueue(),
+        appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1618542&r1=1618541&r2=1618542&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
Mon Aug 18 06:08:45 2014
@@ -18,8 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -145,6 +144,44 @@ public class CapacitySchedulerConfigurat
 
   @Private
   public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
+
+  @Private
+  public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
+
+  @Private
+  public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
+
+  @Private
+  public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
+
+  @Private
+  public static class QueueMapping {
+
+    public enum MappingType {
+
+      USER("u"),
+      GROUP("g");
+      private final String type;
+      private MappingType(String type) {
+        this.type = type;
+      }
+
+      public String toString() {
+        return type;
+      }
+
+    };
+
+    MappingType type;
+    String source;
+    String queue;
+
+    public QueueMapping(MappingType type, String source, String queue) {
+      this.type = type;
+      this.source = source;
+      this.queue = queue;
+    }
+  }
   
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
@@ -378,4 +415,82 @@ public class CapacitySchedulerConfigurat
     setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
   }
 
+  public boolean getOverrideWithQueueMappings() {
+    return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
+        DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
+  }
+
+  /**
+   * Returns a collection of strings, trimming leading and trailing whitespace
+   * on each value
+   *
+   * @param str
+   *          String to parse
+   * @param delim
+   *          delimiter to separate the values
+   * @return Collection of parsed elements.
+   */
+  private static Collection<String> getTrimmedStringCollection(String str,
+      String delim) {
+    List<String> values = new ArrayList<String>();
+    if (str == null)
+      return values;
+    StringTokenizer tokenizer = new StringTokenizer(str, delim);
+    while (tokenizer.hasMoreTokens()) {
+      String next = tokenizer.nextToken();
+      if (next == null || next.trim().isEmpty()) {
+        continue;
+      }
+      values.add(next.trim());
+    }
+    return values;
+  }
+
+  /**
+   * Get user/group mappings to queues.
+   *
+   * @return user/group mappings; throws IllegalArgumentException if illegal
+   */
+  public List<QueueMapping> getQueueMappings() {
+    List<QueueMapping> mappings =
+        new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
+    Collection<String> mappingsString =
+        getTrimmedStringCollection(QUEUE_MAPPING);
+    for (String mappingValue : mappingsString) {
+      String[] mapping =
+          getTrimmedStringCollection(mappingValue, ":")
+              .toArray(new String[] {});
+      if (mapping.length != 3 || mapping[1].length() == 0
+          || mapping[2].length() == 0) {
+        throw new IllegalArgumentException(
+            "Illegal queue mapping " + mappingValue);
+      }
+
+      QueueMapping m;
+      try {
+        QueueMapping.MappingType mappingType;
+        if (mapping[0].equals("u")) {
+          mappingType = QueueMapping.MappingType.USER;
+        } else if (mapping[0].equals("g")) {
+          mappingType = QueueMapping.MappingType.GROUP;
+        } else {
+          throw new IllegalArgumentException(
+              "unknown mapping prefix " + mapping[0]);
+        }
+        m = new QueueMapping(
+                mappingType,
+                mapping[1],
+                mapping[2]);
+      } catch (Throwable t) {
+        throw new IllegalArgumentException(
+            "Illegal queue mapping " + mappingValue);
+      }
+
+      if (m != null) {
+        mappings.add(m);
+      }
+    }
+
+    return mappings;
+  }
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java?rev=1618542&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
(added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
Mon Aug 18 06:08:45 2014
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestQueueMappings {
+
+  private static final Log LOG = LogFactory.getLog(TestQueueMappings.class);
+
+  private static final String Q1 = "q1";
+  private static final String Q2 = "q2";
+
+  private final static String Q1_PATH =
+      CapacitySchedulerConfiguration.ROOT + "." + Q1;
+  private final static String Q2_PATH =
+      CapacitySchedulerConfiguration.ROOT + "." + Q2;
+
+  private MockRM resourceManager;
+
+  @After
+  public void tearDown() throws Exception {
+    if (resourceManager != null) {
+      LOG.info("Stopping the resource manager");
+      resourceManager.stop();
+    }
+  }
+
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { Q1, Q2 });
+
+    conf.setCapacity(Q1_PATH, 10);
+    conf.setCapacity(Q2_PATH, 90);
+
+    LOG.info("Setup top-level queues q1 and q2");
+  }
+
+  @Test (timeout = 60000)
+  public void testQueueMapping() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    CapacityScheduler cs = new CapacityScheduler();
+
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    cs.setConf(conf);
+    cs.setRMContext(rmContext);
+    cs.init(conf);
+    cs.start();
+
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+    conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
+        "true");
+
+    // configuration parsing tests - negative test cases
+    checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
+    checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
+    checkInvalidQMapping(conf, cs, "g:a", "no queue specified");
+    checkInvalidQMapping(conf, cs, "u:a:b,g:a",
+        "multiple mappings with invalid mapping");
+    checkInvalidQMapping(conf, cs, "u:a:b,g:a:d:e", "too many path segments");
+    checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
+    checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
+    checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");
+
+    // simple base case for mapping user to queue
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1);
+    cs.reinitialize(conf, null);
+    checkQMapping("a", Q1, cs);
+
+    // group mapping test
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1);
+    cs.reinitialize(conf, null);
+    checkQMapping("a", Q1, cs);
+
+    // %user tests
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2);
+    cs.reinitialize(conf, null);
+    checkQMapping("a", Q2, cs);
+
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user");
+    cs.reinitialize(conf, null);
+    checkQMapping("a", "a", cs);
+
+    // %primary_group tests
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+        "u:%user:%primary_group");
+    cs.reinitialize(conf, null);
+    checkQMapping("a", "agroup", cs);
+
+    // non-primary group mapping
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+        "g:asubgroup1:" + Q1);
+    cs.reinitialize(conf, null);
+    checkQMapping("a", Q1, cs);
+
+    // space trimming
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "    u : a : " + Q1);
+    cs.reinitialize(conf, null);
+    checkQMapping("a", Q1, cs);
+
+    csConf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    conf = new YarnConfiguration(csConf);
+
+    resourceManager = new MockRM(csConf);
+    resourceManager.start();
+
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+    conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
+        "true");
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
+    resourceManager.getResourceScheduler().reinitialize(conf, null);
+
+    // ensure that if the user specifies a Q, it is still overridden
+    checkAppQueue(resourceManager, "user", Q2, Q1);
+
+    // toggle admin override and retry
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
+        false);
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
+    setupQueueConfiguration(csConf);
+    resourceManager.getResourceScheduler().reinitialize(conf, null);
+
+    checkAppQueue(resourceManager, "user", Q2, Q2);
+
+    // ensure that if a user does not specify a Q, the user mapping is used
+    checkAppQueue(resourceManager, "user", null, Q1);
+
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2);
+    setupQueueConfiguration(csConf);
+    resourceManager.getResourceScheduler().reinitialize(conf, null);
+
+    // ensure that if a user does not specify a Q, the group mapping is used
+    checkAppQueue(resourceManager, "user", null, Q2);
+
+    // if the mapping specifies a queue that does not exist, reinitialize fails
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+        "u:user:non_existent_queue");
+    setupQueueConfiguration(csConf);
+
+    boolean fail = false;
+    try {
+      resourceManager.getResourceScheduler().reinitialize(conf, null);
+    }
+    catch (IOException ioex) {
+      fail = true;
+    }
+    Assert.assertTrue("queue initialization failed for non-existent q", fail);
+    resourceManager.stop();
+  }
+
+  private void checkAppQueue(MockRM resourceManager, String user,
+      String submissionQueue, String expected)
+      throws Exception {
+    RMApp app = resourceManager.submitApp(200, "name", user,
+        new HashMap<ApplicationAccessType, String>(), false, submissionQueue, -1,
+        null, "MAPREDUCE", false);
+    RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED
+        : RMAppState.ACCEPTED;
+    resourceManager.waitForState(app.getApplicationId(), expectedState);
+    // get scheduler app
+    CapacityScheduler cs = (CapacityScheduler)
+        resourceManager.getResourceScheduler();
+    SchedulerApplication schedulerApp =
+        cs.getSchedulerApplications().get(app.getApplicationId());
+    String queue = "";
+    if (schedulerApp != null) {
+      queue = schedulerApp.getQueue().getQueueName();
+    }
+    Assert.assertTrue("expected " + expected + " actual " + queue,
+        expected.equals(queue));
+    Assert.assertEquals(expected, app.getQueue());
+  }
+
+  private void checkInvalidQMapping(YarnConfiguration conf,
+      CapacityScheduler cs,
+      String mapping, String reason)
+      throws IOException {
+    boolean fail = false;
+    try {
+      conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, mapping);
+      cs.reinitialize(conf, null);
+    } catch (IOException ex) {
+      fail = true;
+    }
+    Assert.assertTrue("invalid mapping did not throw exception for " + reason,
+        fail);
+  }
+
+  private void checkQMapping(String user, String expected, CapacityScheduler cs)
+          throws IOException {
+    String actual = cs.getMappedQueueForTest(user);
+    Assert.assertTrue("expected " + expected + " actual " + actual,
+        expected.equals(actual));
+  }
+}



Mime
View raw message