hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r692248 - in /hadoop/core/trunk: ./ conf/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/core/org/apache/hadoop/security/ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/ha...
Date Thu, 04 Sep 2008 21:14:15 GMT
Author: omalley
Date: Thu Sep  4 14:14:15 2008
New Revision: 692248

URL: http://svn.apache.org/viewvc?rev=692248&view=rev
Log:
HADOOP-3698. Add access control to control who is allowed to submit or 
modify jobs in the JobTracker. (Hemanth Yamijala via omalley)

Added:
    hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlIOException.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/QueueManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Sep  4 14:14:15 2008
@@ -123,6 +123,9 @@
     HADOOP-3866. Added sort and multi-job updates in the JobTracker web ui.
     (Craig Weisenfluh via omalley)
 
+    HADOOP-3698. Add access control to control who is allowed to submit or 
+    modify jobs in the JobTracker. (Hemanth Yamijala via omalley)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Thu Sep  4 14:14:15 2008
@@ -1380,4 +1380,65 @@
   </description>  
 </property>
 
+<property>
+  <name>mapred.queue.names</name>
+  <value>default</value>
+  <description> Comma separated list of queues configured for this jobtracker.
+    Jobs are added to queues and schedulers can configure different 
+    scheduling properties for the various queues. To configure a property 
+    for a queue, the name of the queue must match the name specified in this 
+    value. Queue properties that are common to all schedulers are configured 
+    here with the naming convention, mapred.queue.$QUEUE-NAME.$PROPERTY-NAME,
+    for e.g. mapred.queue.default.submit-job-acl.
+    The number of queues configured in this parameter could depend on the
+    type of scheduler being used, as specified in 
+    mapred.jobtracker.taskScheduler. For example, the JobQueueTaskScheduler
+    supports only a single queue, which is the default configured here.
+    Before adding more queues, ensure that the scheduler you've configured
+    supports multiple queues.
+  </description>
+</property>
+
+<property>
+  <name>mapred.acls.enabled</name>
+  <value>false</value>
+  <description> Specifies whether ACLs are enabled, and should be checked
+    for various operations.
+  </description>
+</property>
+
+<property>
+  <name>mapred.queue.default.acl-submit-job</name>
+  <value>*</value>
+  <description> Comma separated list of user and group names that are allowed
+    to submit jobs to the 'default' queue. The user list and the group list
+    are separated by a blank. For e.g. alice,bob group1,group2. 
+    If set to the special value '*', it means all users are allowed to 
+    submit jobs. 
+  </description>
+</property>
+
+<property>
+  <name>mapred.queue.default.acl-administer-jobs</name>
+  <value>*</value>
+  <description> Comma separated list of user and group names that are allowed
+    to delete jobs or modify job's priority for jobs not owned by the current
+    user in the 'default' queue. The user list and the group list
+    are separated by a blank. For e.g. alice,bob group1,group2. 
+    If set to the special value '*', it means all users are allowed to do 
+    this operation.
+  </description>
+</property>
+
+<property>
+  <name>queue.name</name>
+  <value>default</value>
+  <description> Queue to which a job is submitted. This must match one of the
+    queues defined in mapred.queue.names for the system. Also, the ACL setup
+    for the queue must allow the current user to submit a job to the queue.
+    Before specifying a queue, ensure that the system is configured with 
+    the queue, and access is allowed for submitting jobs to the queue.
+  </description>
+</property>
+
 </configuration>

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
(original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Thu Sep  4 14:14:15 2008
@@ -131,6 +131,11 @@
     }
 
     @Override
+    public QueueManager getQueueManager() {
+      return null;
+    }
+    
+    @Override
     public int getNumberOfUniqueHosts() {
       return 0;
     }

Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlIOException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlIOException.java?rev=692248&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlIOException.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlIOException.java Thu
Sep  4 14:14:15 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+
+/**
+ * An exception indicating access control violations.  
+ */
+public class AccessControlIOException extends IOException {
+
+  private static final long serialVersionUID = -1874018786480045420L;
+  
+  /**
+   * Default constructor is needed for unwrapping from 
+   * {@link org.apache.hadoop.ipc.RemoteException}.
+   */
+  public AccessControlIOException() {
+    super("Permission denied.");
+  }
+
+  /**
+   * Constructs an {@link AccessControlIOException}
+   * with the specified detail message.
+   * @param s the detail message.
+   */
+  public AccessControlIOException(String s) {
+    super(s);
+  }
+}

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java Thu Sep  4 14:14:15
2008
@@ -169,6 +169,15 @@
   public int run(int numMapper, int numReducer, long mapSleepTime,
       int mapSleepCount, long reduceSleepTime,
       int reduceSleepCount) throws IOException {
+    JobConf job = setupJobConf(numMapper, numReducer, mapSleepTime, 
+                  mapSleepCount, reduceSleepTime, reduceSleepCount);
+    JobClient.runJob(job);
+    return 0;
+  }
+
+  public JobConf setupJobConf(int numMapper, int numReducer, 
+                                long mapSleepTime, int mapSleepCount, 
+                                long reduceSleepTime, int reduceSleepCount) {
     JobConf job = new JobConf(getConf(), SleepJob.class);
     job.setNumMapTasks(numMapper);
     job.setNumReduceTasks(numReducer);
@@ -186,9 +195,7 @@
     job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
     job.setInt("sleep.job.map.sleep.count", mapSleepCount);
     job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
-
-    JobClient.runJob(job);
-    return 0;
+    return job;
   }
 
   public int run(String[] args) throws Exception {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Sep  4 14:14:15
2008
@@ -61,6 +61,7 @@
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -396,7 +397,7 @@
   private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
       Configuration conf) throws IOException {
     return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-        JobSubmissionProtocol.versionID, addr, conf,
+        JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
         NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
   }
 
@@ -552,13 +553,7 @@
      * set this user's id in job configuration, so later job files can be
      * accessed using this user's id
      */
-    UnixUserGroupInformation ugi = null;
-    try {
-      ugi = UnixUserGroupInformation.login(job, true);
-    } catch (LoginException e) {
-      throw (IOException)(new IOException(
-          "Failed to get the current user's information.").initCause(e));
-    }
+    UnixUserGroupInformation ugi = getUGI(job);
       
     //
     // Figure out what fs the JobTracker is using.  Copy the
@@ -677,6 +672,17 @@
     }
 
   }
+
+  private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
+    UnixUserGroupInformation ugi = null;
+    try {
+      ugi = UnixUserGroupInformation.login(job, true);
+    } catch (LoginException e) {
+      throw (IOException)(new IOException(
+          "Failed to get the current user's information.").initCause(e));
+    }
+    return ugi;
+  }
   
   /**
    * Submit a job to the MR system.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Thu Sep  4 14:14:15
2008
@@ -111,6 +111,12 @@
   public static final long DISABLED_VIRTUAL_MEMORY_LIMIT = -1L;
   
   /**
+   * Name of the queue to which jobs will be submitted, if no queue
+   * name is mentioned.
+   */
+  public static final String DEFAULT_QUEUE_NAME = "default";
+  
+  /**
    * Construct a map/reduce job configuration.
    */
   public JobConf() {}
@@ -1351,7 +1357,26 @@
   public void setMaxVirtualMemoryForTask(long vmem) {
     setLong("mapred.task.maxmemory", vmem);
   }
-    
+  
+  /**
+   * Return the name of the queue to which this job is submitted.
+   * Defaults to 'default'.
+   * 
+   * @return name of the queue
+   */
+  public String getQueueName() {
+    return get("queue.name", DEFAULT_QUEUE_NAME);
+  }
+  
+  /**
+   * Set the name of the queue to which this job should be submitted.
+   * 
+   * @param queueName Name of the queue
+   */
+  public void setQueueName(String queueName) {
+    set("queue.name", queueName);
+  }
+  
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Sep  4 14:14:15
2008
@@ -193,7 +193,8 @@
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
     this.profile = new JobProfile(conf.getUser(), jobid, 
-                                  jobFile.toString(), url, conf.getJobName());
+                                  jobFile.toString(), url, conf.getJobName(),
+                                  conf.getQueueName());
     String jarFile = conf.getJar();
     if (jarFile != null) {
       fs.copyToLocalFile(new Path(jarFile), localJarFile);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java Thu Sep  4 14:14:15
2008
@@ -47,7 +47,8 @@
   String jobFile;
   String url;
   String name;
-
+  String queueName;
+  
   /**
    * Construct an empty {@link JobProfile}.
    */
@@ -66,13 +67,30 @@
    */
   public JobProfile(String user, JobID jobid, String jobFile, String url,
                     String name) {
+    this(user, jobid, jobFile, url, name, JobConf.DEFAULT_QUEUE_NAME);
+  }
+
+  /**
+   * Construct a {@link JobProfile} the userid, jobid, 
+   * job config-file, job-details url and job name. 
+   * 
+   * @param user userid of the person who submitted the job.
+   * @param jobid id of the job.
+   * @param jobFile job configuration file. 
+   * @param url link to the web-ui for details of the job.
+   * @param name user-specified job name.
+   * @param queueName name of the queue to which the job is submitted
+   */
+  public JobProfile(String user, JobID jobid, String jobFile, String url,
+                      String name, String queueName) {
     this.user = user;
     this.jobid = jobid;
     this.jobFile = jobFile;
     this.url = url;
     this.name = name;
+    this.queueName = queueName;
   }
-
+  
   /**
    * @deprecated use JobProfile(String, JobID, String, String, String) instead
    */
@@ -128,7 +146,15 @@
   public String getJobName() {
     return name;
   }
-    
+  
+  /**
+   * Get the name of the queue to which the job is submitted.
+   * @return name of the queue.
+   */
+  public String getQueueName() {
+    return queueName;
+  }
+  
   ///////////////////////////////////////
   // Writable
   ///////////////////////////////////////
@@ -138,6 +164,7 @@
     Text.writeString(out, url);
     Text.writeString(out, user);
     Text.writeString(out, name);
+    Text.writeString(out, queueName);
   }
   public void readFields(DataInput in) throws IOException {
     this.jobid = JobID.read(in);
@@ -145,6 +172,7 @@
     this.url = Text.readString(in);
     this.user = Text.readString(in);
     this.name = Text.readString(in);
+    this.queueName = Text.readString(in);
   }
 }
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Thu Sep
 4 14:14:15 2008
@@ -40,8 +40,9 @@
    * Version 8: change {job|task}id's to use corresponding objects rather that strings.
    * Version 9: change the counter representation for HADOOP-1915
    * Version 10: added getSystemDir for HADOOP-3135
+   * Version 11: changed JobProfile to include the queue name for HADOOP-3698
    */
-  public static final long versionID = 10L;
+  public static final long versionID = 11L;
 
   /**
    * Allocate a name for the job.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Sep  4 14:14:15
2008
@@ -60,6 +60,9 @@
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.security.AccessControlIOException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -511,6 +514,8 @@
 
   private Thread taskCommitThread;
   
+  private QueueManager queueManager;
+
   /**
    * Start the JobTracker process, listen on the indicated port
    */
@@ -533,6 +538,8 @@
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
                                            conf.get("mapred.hosts.exclude", ""));
     
+    queueManager = new QueueManager(this.conf);
+    
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
       = conf.getClass("mapred.jobtracker.taskScheduler",
@@ -1134,6 +1141,13 @@
     jobInProgressListeners.remove(listener);
   }
   
+  /**
+   * Return the {@link QueueManager} associated with the JobTracker.
+   */
+  public QueueManager getQueueManager() {
+    return queueManager;
+  }
+  
   ////////////////////////////////////////////////////
   // InterTrackerProtocol
   ////////////////////////////////////////////////////
@@ -1490,6 +1504,8 @@
     
     totalSubmissions++;
     JobInProgress job = new JobInProgress(jobId, this, this.conf);
+    checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+
     synchronized (jobs) {
       synchronized (taskScheduler) {
         jobs.put(job.getProfile().getJobID(), job);
@@ -1502,6 +1518,26 @@
     return job.getStatus();
   }
 
+  // Check whether the specified operation can be performed
+  // related to the job. If ownerAllowed is true, then an owner
+  // of the job can perform the operation irrespective of
+  // access control.
+  private void checkAccess(JobInProgress job, 
+                                QueueManager.QueueOperation oper) 
+                                  throws IOException {
+    // get the user group info
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+
+    // get the queue
+    String queue = job.getProfile().getQueueName();
+    if (!queueManager.hasAccess(queue, job, oper, ugi)) {
+      throw new AccessControlIOException("User " 
+                            + ugi.getUserName() 
+                            + " cannot perform "
+                            + "operation " + oper + " on queue " + queue);
+    }
+  }
+
   public synchronized ClusterStatus getClusterStatus() {
     synchronized (taskTrackers) {
       return new ClusterStatus(taskTrackers.size(),
@@ -1513,8 +1549,9 @@
     }
   }
     
-  public synchronized void killJob(JobID jobid) {
+  public synchronized void killJob(JobID jobid) throws IOException {
     JobInProgress job = jobs.get(jobid);
+    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
     if (job.inited()) {
       job.kill();
     }
@@ -1674,6 +1711,7 @@
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
+      checkAccess(tip.getJob(), QueueManager.QueueOperation.ADMINISTER_JOBS);
       return tip.killTask(taskid, shouldFail);
     }
     else {

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/QueueManager.java?rev=692248&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/QueueManager.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/QueueManager.java Thu Sep  4 14:14:15
2008
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Class that exposes information about queues maintained by the Hadoop
+ * Map/Reduce framework.
+ * 
+ * The Map/Reduce framework can be configured with one or more queues,
+ * depending on the scheduler it is configured with. While some 
+ * schedulers work only with one queue, some schedulers support multiple 
+ * queues.
+ *  
+ * Queues can be configured with various properties. Some of these
+ * properties are common to all schedulers, and those are handled by this
+ * class. Schedulers might also associate several custom properties with 
+ * queues. Where such a case exists, the queue name must be used to link 
+ * the common properties with the scheduler specific ones.  
+ */
+public class QueueManager {
+  
+  private static final Log LOG = LogFactory.getLog(QueueManager.class);
+  
+  // Prefix in configuration for queue related keys
+  private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX 
+                                                        = "mapred.queue.";
+  // Indicates an ACL string that represents access to all users
+  private static final String ALL_ALLOWED_ACL_VALUE = "*";
+  // Configured queues
+  private Set<String> queueNames;
+  // Map of a queue and ACL property name with an ACL
+  private HashMap<String, ACL> aclsMap;
+  // Map of a queue name to any generic object that represents 
+  // scheduler information 
+  private HashMap<String, Object> schedulerInfoObjects;
+  // Whether ACLs are enabled in the system or not.
+  private boolean aclsEnabled;
+  
+  /**
+   * Enum representing an operation that can be performed on a queue.
+   */
+  static enum QueueOperation {
+    SUBMIT_JOB ("acl-submit-job", false),
+    ADMINISTER_JOBS ("acl-administer-jobs", true);
+    // TODO: Add ACL for LIST_JOBS when we have ability to authenticate 
+    //       users in UI
+    // TODO: Add ACL for CHANGE_ACL when we have an admin tool for 
+    //       configuring queues.
+    
+    private final String aclName;
+    private final boolean jobOwnerAllowed;
+    
+    QueueOperation(String aclName, boolean jobOwnerAllowed) {
+      this.aclName = aclName;
+      this.jobOwnerAllowed = jobOwnerAllowed;
+    }
+
+    final String getAclName() {
+      return aclName;
+    }
+    
+    final boolean isJobOwnerAllowed() {
+      return jobOwnerAllowed;
+    }
+  }
+  
+  /**
+   * Class representing an access control that is configured.
+   */
+  private static class ACL {
+    
+    // Set of users who are granted access.
+    private Set<String> users;
+    // Set of groups which are granted access
+    private Set<String> groups;
+    // Whether all users are granted access.
+    private boolean allAllowed;
+    
+    /**
+     * Construct a new ACL from a String representation of the same.
+     * 
+     * The String is a a comma separated list of users and groups.
+     * The user list comes first and is separated by a space followed 
+     * by the group list. For e.g. "user1,user2 group1,group2"
+     * 
+     * @param aclString String representation of the ACL
+     */
+    ACL (String aclString) {
+      users = new TreeSet<String>();
+      groups = new TreeSet<String>();
+      if (aclString.equals(ALL_ALLOWED_ACL_VALUE)) {
+        allAllowed = true;
+      } else {
+        String[] userGroupStrings = aclString.split(" ", 2);
+        
+        if (userGroupStrings.length >= 1) {
+          String[] usersStr = userGroupStrings[0].split(",");
+          if (usersStr.length >= 1) {
+            addToSet(users, usersStr);
+          }
+        }
+        
+        if (userGroupStrings.length == 2) {
+          String[] groupsStr = userGroupStrings[1].split(",");
+          if (groupsStr.length >= 1) {
+            addToSet(groups, groupsStr);
+          }
+        }
+      }
+    }
+    
+    boolean allUsersAllowed() {
+      return allAllowed;
+    }
+    
+    boolean isUserAllowed(String user) {
+      return users.contains(user);
+    }
+    
+    boolean isAnyGroupAllowed(String[] otherGroups) {
+      for (String g : otherGroups) {
+        if (groups.contains(g)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+  
+  /**
+   * Construct a new QueueManager using configuration specified in the passed
+   * in {@link org.apache.hadoop.conf.Configuration} object.
+   * 
+   * @param conf Configuration object where queue configuration is specified.
+   */
+  public QueueManager(Configuration conf) {
+    queueNames = new TreeSet<String>();
+    aclsMap = new HashMap<String, ACL>();
+    schedulerInfoObjects = new HashMap<String, Object>();
+    initialize(conf);
+  }
+  
+  /**
+   * Return the set of queues configured in the system.
+   * 
+   * The number of queues configured should be dependent on the Scheduler 
+   * configured. Note that some schedulers work with only one queue, whereas
+   * others can support multiple queues.
+   *  
+   * @return Set of queue names.
+   */
+  public synchronized Set<String> getQueues() {
+    return queueNames;
+  }
+  
+  /**
+   * Return true if the given {@link QueueManager.QueueOperation} can be 
+   * performed by the specified user on the given queue.
+   * 
+   * An operation is allowed if all users are provided access for this
+   * operation, or if either the user or any of the groups specified is
+   * provided access.
+   * 
+   * @param queueName Queue on which the operation needs to be performed. 
+   * @param oper The operation to perform
+   * @param ugi The user and groups who wish to perform the operation.
+   * 
+   * @return true if the operation is allowed, false otherwise.
+   */
+  public synchronized boolean hasAccess(String queueName, QueueOperation oper,
+                                UserGroupInformation ugi) {
+    return hasAccess(queueName, null, oper, ugi);
+  }
+  
+  /**
+   * Return true if the given {@link QueueManager.QueueOperation} can be 
+   * performed by the specified user on the specified job in the given queue.
+   * 
+   * An operation is allowed either if the owner of the job is the user 
+   * performing the task, all users are provided access for this
+   * operation, or if either the user or any of the groups specified is
+   * provided access.
+   * 
+   * If the {@link QueueManager.QueueOperation} is not job specific then the 
+   * job parameter is ignored.
+   * 
+   * @param queueName Queue on which the operation needs to be performed.
+   * @param job The {@link JobInProgress} on which the operation is being
+   *            performed. 
+   * @param oper The operation to perform
+   * @param ugi The user and groups who wish to perform the operation.
+   * 
+   * @return true if the operation is allowed, false otherwise.
+   */
+  public synchronized boolean hasAccess(String queueName, JobInProgress job, 
+                                QueueOperation oper, 
+                                UserGroupInformation ugi) {
+    if (!aclsEnabled) {
+      return true;
+    }
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checking access for : " + toFullPropertyName(queueName, 
+                                            oper.getAclName()));      
+    }
+    
+    if (oper.isJobOwnerAllowed()) {
+      if (job.getJobConf().getUser().equals(ugi.getUserName())) {
+        return true;
+      }
+    }
+    
+    ACL acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));
+    if (acl == null) {
+      return false;
+    }
+    return ((acl.allUsersAllowed()) ||
+              (acl.isUserAllowed(ugi.getUserName())) ||
+              (acl.isAnyGroupAllowed(ugi.getGroupNames())));    
+  }
+  
+  /**
+   * Set a generic Object that represents scheduling information relevant
+   * to a queue.
+   * 
+   * A string representation of this Object will be used by the framework
+   * to display in user facing applications like the JobTracker web UI and
+   * the hadoop CLI.
+   * 
+   * @param queueName queue for which the scheduling information is to be set. 
+   * @param queueInfo scheduling information for this queue.
+   */
+  public synchronized void setSchedulerInfo(String queueName, 
+                                              Object queueInfo) {
+    schedulerInfoObjects.put(queueName, queueInfo);
+  }
+  
+  /**
+   * Return the scheduler information configured for this queue.
+   * 
+   * @param queueName queue for which the scheduling information is required.
+   * @return The scheduling information for this queue.
+   * 
+   * @see #setSchedulerInfo(String, Object)
+   */
+  public synchronized Object getSchedulerInfo(String queueName) {
+    return schedulerInfoObjects.get(queueName);
+  }
+  
+  /**
+   * Refresh information configured for queues in the system by reading
+   * it from the passed in {@link org.apache.hadoop.conf.Configuration}.
+   *
+   * Previously stored information about queues is removed and new
+   * information populated from the configuration.
+   * 
+   * @param conf New configuration for the queues. 
+   */
+  public synchronized void refresh(Configuration conf) {
+    queueNames.clear();
+    aclsMap.clear();
+    initialize(conf);
+  }
+  
+  private void initialize(Configuration conf) {
+    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
+    String[] queues = conf.getStrings("mapred.queue.names", 
+                                  new String[] {JobConf.DEFAULT_QUEUE_NAME});
+    addToSet(queueNames, queues);
+    
+    // for every queue, and every operation, get the ACL
+    // if any is specified and store in aclsMap.
+    for (String queue : queues) {
+      for (QueueOperation oper : QueueOperation.values()) {
+        String key = toFullPropertyName(queue, oper.getAclName());
+        String aclString = conf.get(key, "*");
+        aclsMap.put(key, new ACL(aclString));
+      }
+    }
+  }
+  
+  private static final String toFullPropertyName(String queue, 
+      String property) {
+    return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
+  }
+  
+  private static final void addToSet(Set<String> set, String[] elems) {
+    for (String elem : elems) {
+      set.add(elem);
+    }
+  }
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Thu Sep
 4 14:14:15 2008
@@ -56,4 +56,11 @@
    */
   public void removeJobInProgressListener(JobInProgressListener listener);
 
+  /**
+   * Return the {@link QueueManager} which manages the queues in this
+   * {@link TaskTrackerManager}.
+   *
+   * @return the {@link QueueManager}
+   */
+  public QueueManager getQueueManager();
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=692248&r1=692247&r2=692248&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Thu
Sep  4 14:14:15 2008
@@ -139,6 +139,11 @@
       listeners.remove(listener);
     }
     
+    @Override
+    public QueueManager getQueueManager() {
+      return null;
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) {

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=692248&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Thu Sep  4 14:14:15
2008
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import javax.security.auth.login.LoginException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+
+public class TestQueueManager extends TestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestQueueManager.class);
+  
+  private MiniDFSCluster miniDFSCluster;
+  private MiniMRCluster miniMRCluster;
+
+  public void testDefaultQueueConfiguration() {
+    JobConf conf = new JobConf();
+    QueueManager qMgr = new QueueManager(conf);
+    Set<String> expQueues = new TreeSet<String>();
+    expQueues.add("default");
+    verifyQueues(expQueues, qMgr.getQueues());
+    // pass true so it will fail if the key is not found.
+    assertFalse(conf.getBoolean("mapred.acls.enabled", true));
+  }
+  
+  public void testMultipleQueues() {
+    JobConf conf = new JobConf();
+    conf.set("mapred.queue.names", "q1,q2,Q3");
+    QueueManager qMgr = new QueueManager(conf);
+    Set<String> expQueues = new TreeSet<String>();
+    expQueues.add("q1");
+    expQueues.add("q2");
+    expQueues.add("Q3");
+    verifyQueues(expQueues, qMgr.getQueues());
+  }
+  
+  public void testSchedulerInfo() {
+    JobConf conf = new JobConf();
+    conf.set("mapred.queue.names", "qq1,qq2");
+    QueueManager qMgr = new QueueManager(conf);
+    qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
+    qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
+    assertEquals(qMgr.getSchedulerInfo("qq2"), "queueInfoForqq2");
+    assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
+  }
+  
+  public void testAllEnabledACLForJobSubmission() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
+    verifyJobSubmission(conf, true);
+  }
+  
+  public void testAllDisabledACLForJobSubmission() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
+    verifyJobSubmission(conf, false);
+  }
+  
+  public void testUserDisabledACLForJobSubmission() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", 
+                                "3698-non-existent-user");
+    verifyJobSubmission(conf, false);
+  }
+  
+  public void testDisabledACLForNonDefaultQueue() throws IOException {
+    // allow everyone in default queue
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
+    // setup a different queue
+    conf.set("mapred.queue.names", "default,q1");
+    // setup a different acl for this queue.
+    conf.set("mapred.queue.q1.acl-submit-job", "dummy-user");
+    // verify job submission to other queue fails.
+    verifyJobSubmission(conf, false, "q1");
+  }
+  
+  public void testEnabledACLForNonDefaultQueue() throws IOException,
+                                                          LoginException {
+    // login as self...
+    UserGroupInformation ugi = UnixUserGroupInformation.login();
+    String userName = ugi.getUserName();
+    // allow everyone in default queue
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
+    // setup a different queue
+    conf.set("mapred.queue.names", "default,q2");
+    // setup a different acl for this queue.
+    conf.set("mapred.queue.q2.acl-submit-job", userName);
+    // verify job submission to other queue fails.
+    verifyJobSubmission(conf, true, "q2");
+  }
+  
+  public void testUserEnabledACLForJobSubmission() 
+                                    throws IOException, LoginException {
+    // login as self...
+    UserGroupInformation ugi = UnixUserGroupInformation.login();
+    String userName = ugi.getUserName();
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
+                                  "3698-junk-user," + userName 
+                                    + " 3698-junk-group1,3698-junk-group2");
+    verifyJobSubmission(conf, true);
+  }
+  
+  public void testGroupsEnabledACLForJobSubmission() 
+                                    throws IOException, LoginException {
+    // login as self, get one group, and add in allowed list.
+    UserGroupInformation ugi = UnixUserGroupInformation.login();
+    String[] groups = ugi.getGroupNames();
+    assertTrue(groups.length > 0);
+    JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
+                                "3698-junk-user1,3698-junk-user2 " 
+                                  + groups[groups.length-1] 
+                                           + ",3698-junk-group");
+    verifyJobSubmission(conf, true);
+  }
+  
+  public void testAllEnabledACLForJobKill() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
+    verifyJobKill(conf, true);
+  }
+
+  public void testAllDisabledACLForJobKill() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
+    verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
+  }
+  
+  public void testOwnerAllowedForJobKill() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
+                                              "junk-user");
+    verifyJobKill(conf, true);
+  }
+  
+  public void testUserDisabledACLForJobKill() throws IOException {
+    //setup a cluster allowing a user to submit
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
+                                              "dummy-user");
+    verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
+  }
+  
+  public void testUserEnabledACLForJobKill() throws IOException, 
+                                                    LoginException {
+    // login as self...
+    UserGroupInformation ugi = UnixUserGroupInformation.login();
+    String userName = ugi.getUserName();
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
+                                              "dummy-user,"+userName);
+    verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
+  }
+  
+  private JobConf setupConf(String aclName, String aclValue) {
+    JobConf conf = new JobConf();
+    conf.setBoolean("mapred.acls.enabled", true);
+    conf.set(aclName, aclValue);
+    return conf;
+  }
+  
+  private void verifyQueues(Set<String> expectedQueues, 
+                                          Set<String> actualQueues) {
+    assertEquals(expectedQueues.size(), actualQueues.size());
+    for (String queue : expectedQueues) {
+      assertTrue(actualQueues.contains(queue));
+    }
+  }
+  
+  private void verifyJobSubmission(JobConf conf, boolean shouldSucceed) 
+                                              throws IOException {
+    verifyJobSubmission(conf, shouldSucceed, "default");
+  }
+
+  private void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
+                                    String queue) throws IOException {
+    setUpCluster(conf);
+    try {
+      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
+      if (shouldSucceed) {
+        assertTrue(rjob.isSuccessful());
+      } else {
+        fail("Job submission should have failed.");
+      }
+    } catch (IOException ioe) {
+      if (shouldSucceed) {
+        throw ioe;
+      } else {
+        LOG.info("exception while submitting job: " + ioe.getMessage());
+        assertTrue(ioe.getMessage().
+            contains("cannot perform operation " +
+            "SUBMIT_JOB on queue " + queue));
+      }
+    } finally {
+      tearDownCluster();
+    }
+}
+
+  private void verifyJobKill(JobConf conf, boolean shouldSucceed) 
+                                      throws IOException {
+    setUpCluster(conf);
+    try {
+      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
+      assertFalse(rjob.isComplete());
+      while(rjob.mapProgress() == 0.0f) {
+        try {
+          Thread.sleep(10);  
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
+      rjob.killJob();
+      if (shouldSucceed) {
+        assertTrue(rjob.isComplete());
+      } else {
+        fail("Job kill should have failed.");
+      }
+    } catch (IOException ioe) {
+      if (shouldSucceed) {
+        throw ioe;
+      } else {
+        LOG.info("exception while submitting job: " + ioe.getMessage());
+        assertTrue(ioe.getMessage().
+                        contains("cannot perform operation " +
+                                    "ADMINISTER_JOBS on queue default"));
+      }
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
+                                        String otherUserInfo) 
+                        throws IOException {
+    setUpCluster(conf);
+    try {
+      // submit a job as another user.
+      String userInfo = otherUserInfo;
+      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
+      assertFalse(rjob.isComplete());
+
+      //try to kill as self
+      try {
+        rjob.killJob();
+        if (!shouldSucceed) {
+          fail("should fail kill operation");  
+        }
+      } catch (IOException ioe) {
+        if (shouldSucceed) {
+          throw ioe;
+        }
+        //verify it fails
+        LOG.info("exception while submitting job: " + ioe.getMessage());
+        assertTrue(ioe.getMessage().
+                        contains("cannot perform operation " +
+                                    "ADMINISTER_JOBS on queue default"));
+      }
+      //wait for job to complete on its own
+      while (!rjob.isComplete()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
+    } finally {
+      tearDownCluster();
+    }
+  }
+  
+  private void setUpCluster(JobConf conf) throws IOException {
+    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fileSys = miniDFSCluster.getFileSystem();
+    String namenode = fileSys.getUri().toString();
+    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
+                      null, null, conf);
+  }
+  
+  private void tearDownCluster() throws IOException {
+    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
+    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+  }
+  
+  private RunningJob submitSleepJob(int numMappers, int numReducers, 
+                            long mapSleepTime, long reduceSleepTime,
+                            boolean shouldComplete) 
+                              throws IOException {
+    return submitSleepJob(numMappers, numReducers, mapSleepTime,
+                          reduceSleepTime, shouldComplete, null);
+  }
+  
+  private RunningJob submitSleepJob(int numMappers, int numReducers, 
+                                      long mapSleepTime, long reduceSleepTime,
+                                      boolean shouldComplete, String userInfo) 
+                                            throws IOException {
+    return submitSleepJob(numMappers, numReducers, mapSleepTime, 
+                          reduceSleepTime, shouldComplete, userInfo, null);
+  }
+
+  private RunningJob submitSleepJob(int numMappers, int numReducers, 
+                                    long mapSleepTime, long reduceSleepTime,
+                                    boolean shouldComplete, String userInfo,
+                                    String queueName) 
+                                      throws IOException {
+    JobConf clientConf = new JobConf();
+    clientConf.set("mapred.job.tracker", "localhost:"
+        + miniMRCluster.getJobTrackerPort());
+    SleepJob job = new SleepJob();
+    job.setConf(clientConf);
+    clientConf = job.setupJobConf(numMappers, numReducers, 
+        mapSleepTime, (int)mapSleepTime/100,
+        reduceSleepTime, (int)reduceSleepTime/100);
+    if (queueName != null) {
+      clientConf.setQueueName(queueName);
+    }
+    RunningJob rJob = null;
+    if (shouldComplete) {
+      rJob = JobClient.runJob(clientConf);  
+    } else {
+      JobConf jc = new JobConf(clientConf);
+      if (userInfo != null) {
+        jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+      }
+      rJob = new JobClient(clientConf).submitJob(jc);
+    }
+    return rJob;
+  }
+
+}



Mime
View raw message