hadoop-common-commits mailing list archives

From ste...@apache.org
Subject svn commit: r783059 [1/2] - in /hadoop/core/trunk/src/contrib: ./ dynamic-scheduler/ dynamic-scheduler/ivy/ dynamic-scheduler/src/ dynamic-scheduler/src/java/ dynamic-scheduler/src/java/org/ dynamic-scheduler/src/java/org/apache/ dynamic-scheduler/src/...
Date Tue, 09 Jun 2009 16:16:34 GMT
Author: stevel
Date: Tue Jun  9 16:16:33 2009
New Revision: 783059

URL: http://svn.apache.org/viewvc?rev=783059&view=rev
Log:
HADOOP-4768 Dynamic Priority Scheduler that allows queue shares to be controlled dynamically by a currency

Added:
    hadoop/core/trunk/src/contrib/dynamic-scheduler/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/README
    hadoop/core/trunk/src/contrib/dynamic-scheduler/build.xml
    hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy.xml
    hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy/libraries.properties
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/AllocationStore.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/BudgetQueue.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityServlet.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/FileAllocationStore.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityAuthorization.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PrioritySchedulerOptions.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocation.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocator.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueTaskScheduler.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestDynamicScheduler.java
    hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestPriorityScheduler.java
Modified:
    hadoop/core/trunk/src/contrib/build.xml

Modified: hadoop/core/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/build.xml?rev=783059&r1=783058&r2=783059&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/build.xml (original)
+++ hadoop/core/trunk/src/contrib/build.xml Tue Jun  9 16:16:33 2009
@@ -55,6 +55,7 @@
       <fileset dir="." includes="fairscheduler/build.xml"/>
       <fileset dir="." includes="capacity-scheduler/build.xml"/>
       <fileset dir="." includes="mrunit/build.xml"/>
+      <fileset dir="." includes="dynamic-scheduler/build.xml"/>
     </subant>
     <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
     <fail if="testsfailed">Tests failed!</fail>

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/README
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/README?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/README (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/README Tue Jun  9 16:16:33 2009
@@ -0,0 +1,166 @@
+# Copyright 2008 The Apache Software Foundation Licensed under the
+# Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License.  You may obtain a copy
+# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.  See the License for the specific language governing
+# permissions and limitations under the License.
+
+This package implements dynamic priority scheduling for MapReduce jobs.
+
+Overview
+--------
+The purpose of this scheduler is to allow users to increase and decrease
+their queue priorities continuously to meet the requirements of their
+current workloads. The scheduler is aware of the current demand and makes
+it more expensive to boost the priority under peak usage times. Thus
+users who move their workload to low usage times are rewarded with
+discounts. Priorities can only be boosted within a limited quota.
+All users are given a quota or a budget which is deducted periodically
+in configurable accounting intervals. How much of the budget is
+deducted is determined by a per-user spending rate, which may
+be modified at any time directly by the user. The cluster slot
+share allocated to a particular user is computed as that user's
+spending rate divided by the sum of all spending rates in the same
+accounting period.
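+
+For illustration, the share computation amounts to the following Python
+sketch (queue names and spending rates here are made up):
+
+rates = {"alice": 0.05, "bob": 0.03, "carol": 0.02}  # per-queue spending rates
+total = sum(rates.values())   # aggregate spending, i.e. the cluster price
+shares = {}
+for queue, rate in rates.items():
+    shares[queue] = rate / total   # alice gets 0.5, bob 0.3, carol 0.2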
+
+Configuration
+-------------
+This scheduler comprises two components: an accounting or resource-allocation
+part that manages and bills for queue shares, and a scheduler that enforces
+the queue shares in the form of map and reduce slots of running jobs.
+
+Hadoop Configuration (e.g. hadoop-site.xml):
+mapred.jobtracker.taskScheduler      
+    This needs to be set to 
+    org.apache.hadoop.mapred.DynamicPriorityScheduler
+    to use the dynamic scheduler.
+Scheduler Configuration:
+mapred.dynamic-scheduler.scheduler
+    The Java class name of the MapReduce scheduler that should
+    enforce the allocated shares.
+    It has been tested with (and defaults to):
+    org.apache.hadoop.mapred.PriorityScheduler
+mapred.priority-scheduler.acl-file
+    Full path of ACL with syntax:
+      <user> <role> <secret key>
+    separated by line feeds
+mapred.dynamic-scheduler.budget-file  
+    The full OS path of the file from which the
+    budgets are read and stored. The syntax of this file is:
+    <queue name> <budget> <spending rate>
+    separated by newlines, where the budget can be specified
+    as a Java float. While the server is running, the file
+    should not be edited directly; use the servlet API instead
+    to ensure proper synchronization.
+
+mapred.dynamic-scheduler.alloc-interval       
+    Allocation interval, when the scheduler rereads the
+    spending rates and recalculates the cluster shares.
+    Specified as seconds between allocations.
+    Default is 20 seconds.
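+
+Putting these together, a hadoop-site.xml fragment enabling the scheduler
+might look like the following (the values shown are only illustrative):
+
+<property>
+  <name>mapred.jobtracker.taskScheduler</name>
+  <value>org.apache.hadoop.mapred.DynamicPriorityScheduler</value>
+</property>
+<property>
+  <name>mapred.dynamic-scheduler.budget-file</name>
+  <value>/etc/hadoop.budget</value>
+</property>
+<property>
+  <name>mapred.dynamic-scheduler.alloc-interval</name>
+  <value>20</value>
+</property>
+
+A budget file with a single queue might then contain a line such as
+(queue name and numbers are illustrative):
+
+myqueue 100.0 1.0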
+
+Servlet API
+-----------
+Queues should be managed through the servlet REST API
+while the jobtracker server is running.
+
+It is installed at
+[job tracker URL]/scheduler
+operations supported: 
+  price
+    get the current price of the cluster 
+    (aggregate spending rates of queues with running or pending jobs)
+  time        
+    get start time of server and current time in epoch units 
+  info=queue_to_query 
+    get info about queue (requires user or admin privilege)
+  infos 
+    get info about all queues (requires admin privilege)
+  addBudget=budget_to_add,queue=queue_to_change 
+    add budget to queue (requires admin privilege)
+  setSpending=spending_to_set,queue=queue_to_change 
+    set spending rate of queue (requires user or admin privilege)
+  addQueue=queue_to_add 
+    add new queue  (requires admin privilege)
+  removeQueue=queue_to_remove 
+    remove queue (requires admin privilege)
+
+Example:
+  http://myhost:50030/scheduler?setSpending=0.01&queue=myqueue
+  The signature is passed in the Authorization header.
+
+The signature is created akin to the AWS Query Authentication scheme
+HMAC_SHA1("<query path>&user=<user>&timestamp=<timestamp>", key)
+For the servlet operations, the query path is everything that comes after /scheduler?
+in the URL. For job submission, the query path is just the empty string "".
+Job submissions also need to set the following job properties:
+-Dmapred.job.timestamp=<ms epoch time> -Dmapred.job.signature=<signature as above> -Dmapred.job.queue.name=<queue>
+Note that the queue name must match the name of the user submitting the job.
+
+Example python query 
+--------------------------------
+import base64
+import hmac
+import sha
+import httplib, urllib
+import sys
+import time
+from popen2 import popen3
+import os
+
+def hmac_sha1(data, key):
+    return urllib.quote(base64.encodestring(hmac.new(key, data, sha).digest()).strip())
+
+stdout, stdin, stderr = popen3("id -un")
+USER = stdout.read().strip()
+f = open(os.path.expanduser("~/.ssh/hadoop_key"))
+KEY = f.read().strip()
+f.close()
+f = open(os.path.expanduser("/etc/hadoop_server"))
+SERVER = f.read().strip()
+f.close()
+URL = "/scheduler"
+conn = httplib.HTTPConnection(SERVER)
+params = sys.argv[1]
+params = params + "&user=%s&timestamp=%d" % (USER,long(time.time()*1000))
+print params
+headers = {"Authorization": hmac_sha1(params, KEY)}
+print headers
+conn.request("GET",URL + "?" + params,None, headers)
+response = conn.getresponse()
+print response.status, response.reason
+data =  response.read()
+conn.close()
+print data
+
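+Assuming the script above is saved as query.py (the name is hypothetical),
+setting the spending rate of myqueue would look like:
+
+  python query.py "setSpending=0.01&queue=myqueue"
+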
+Example python job submission parameter generation
+--------------------------------------------------
+import base64
+import hmac
+import sha
+import httplib, urllib
+import sys
+import time
+import os
+from popen2 import popen3
+
+def hmac_sha1(data, key):
+    return urllib.quote(base64.encodestring(hmac.new(key, data, sha).digest()).strip())
+
+stdout, stdin, stderr = popen3("id -un")
+USER = stdout.read().strip()
+f = open(os.path.expanduser("~/.ssh/hadoop_key"))
+KEY = f.read().strip()
+f.close()
+if len(sys.argv) > 1:
+  params = sys.argv[1]
+else:
+  params = ""
+timestamp = long(time.time()*1000)
+params = params + "&user=%s&timestamp=%d" % (USER,timestamp)
+print "-Dmapred.job.timestamp=%d -Dmapred.job.signature=%s -Dmapred.job.queue.name=%s" % (timestamp, hmac_sha1(params, KEY), USER)
+
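+The printed -D options can be passed on the job submission command line,
+for example (jar, class and script names are placeholders, and the job is
+assumed to parse generic options via ToolRunner):
+
+  hadoop jar myjob.jar MyJob `python submit.py` input output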
+

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/build.xml?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/build.xml (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/build.xml Tue Jun  9 16:16:33 2009
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!-- 
+Before you can run these subtargets directly, you need 
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="dynamic-priority" default="jar">
+  <import file="../build-contrib.xml"/>
+
+</project>

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy.xml?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy.xml (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy.xml Tue Jun  9 16:16:33 2009
@@ -0,0 +1,223 @@
+<?xml version="1.0" ?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.hadoop" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
+    <description>
+        Apache Hadoop contrib
+    </description>
+  </info>
+  <configurations defaultconfmapping="default">
+    <!--these match the Maven configurations-->
+    <conf name="default" extends="master,runtime"/>
+    <conf name="master" description="contains the artifact but no dependencies"/>
+    <conf name="runtime" description="runtime but not the artifact"
+      extends="client,server,s3-server,kfs"/>
+
+    <conf name="mandatory" description="contains the critical  dependencies"
+      extends="commons-logging,log4j"/>
+
+    <!--
+    These public configurations contain the core dependencies for running hadoop client or server.
+    The server is effectively a superset of the client.
+    -->
+    <conf name="client" description="client-side dependencies"
+      extends="mandatory,httpclient"/>
+    <conf name="server" description="server-side dependencies"
+      extends="client"/>
+    <conf name="s3-client" description="dependencies for working with S3/EC2 infrastructure"
+      extends="client"/>
+    <conf name="s3-server" description="dependencies for running on S3/EC2 infrastructure"
+      extends="s3-client,server"/>
+    <conf name="kfs" description="dependencies for KFS file system support"/>
+    <conf name="ftp" description="dependencies for workign with FTP filesytems"
+              extends="mandatory"/>
+   <conf name="jetty" description="Jetty provides the in-VM HTTP daemon" extends="commons-logging"/>
+
+    <!--Private configurations. -->
+
+    <conf name="common" visibility="private" extends="runtime,mandatory,httpclient,ftp,jetty"
+		      description="common artifacts"/>
+    <conf name="javadoc" visibility="private" description="artiracts required while performing doc generation"
+      extends="common,mandatory,jetty,lucene"/>
+    <!--Testing pulls in everything-->
+    <conf name="test" extends="common,default,s3-server,kfs" visibility="private"
+      description="the classpath needed to run tests"/>
+    <conf name="releaseaudit" visibility="private"
+	description="Artifacts required for releaseaudit target"/>
+     
+    <conf name="commons-logging" visibility="private"/>
+    <conf name="httpclient" visibility="private" extends="commons-logging"/>
+    <conf name="log4j" visibility="private"/>
+    <conf name="lucene" visibility="private"/>
+    <conf name="jdiff" visibility="private" extends="log4j,s3-client,jetty,server"/>
+    <conf name="checkstyle" visibility="private"/>
+
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+  <dependencies>
+
+ <!--used client side-->
+
+    <dependency org="checkstyle"
+      name="checkstyle"
+      rev="${checkstyle.version}"
+      conf="checkstyle->default"/>
+    <dependency org="jdiff"
+      name="jdiff"
+      rev="${jdiff.version}"
+      conf="jdiff->default"/>
+    <dependency org="xerces"
+      name="xerces"
+      rev="${xerces.version}"
+      conf="jdiff->default">
+    </dependency>
+
+    <dependency org="xmlenc"
+      name="xmlenc"
+      rev="${xmlenc.version}"
+      conf="server->default"/>
+
+    <!--Configuration: httpclient-->
+
+    <!--
+    commons-httpclient asks for too many files.
+    All it needs is commons-codec and commons-logging JARs
+    -->
+    <dependency org="commons-httpclient"
+      name="commons-httpclient"
+      rev="${commons-httpclient.version}"
+      conf="httpclient->master">
+    </dependency>
+
+    <dependency org="commons-codec"
+      name="commons-codec"
+      rev="${commons-codec.version}"
+      conf="httpclient->default"/>
+
+    <dependency org="commons-net"
+      name="commons-net"
+      rev="${commons-net.version}"
+      conf="ftp->default"/>
+
+    <!--Configuration: Jetty -->
+
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="jetty->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="jetty->master"/>
+
+    <dependency org="tomcat"
+      name="jasper-runtime"
+      rev="${jasper.version}"
+      conf="jetty->master"/>
+    <dependency org="tomcat"
+      name="jasper-compiler"
+      rev="${jasper.version}"
+      conf="jetty->master"/>
+    <dependency org="commons-el"
+      name="commons-el"
+      rev="${commons-el.version}"
+      conf="jetty->master"/>
+
+
+    <!--Configuration: commons-logging -->
+
+    <!--it is essential that only the master JAR of commons logging
+    is pulled in, as its dependencies are usually a mess, including things
+    like out of date servlet APIs, bits of Avalon, etc.
+    -->
+    <dependency org="commons-logging"
+      name="commons-logging"
+      rev="${commons-logging.version}"
+      conf="commons-logging->master"/>
+
+
+    <!--Configuration: log4j -->
+
+    <!--log4J is not optional until commons-logging.properties is stripped out of the JAR -->
+    <dependency org="log4j"
+      name="log4j"
+      rev="${log4j.version}"
+      conf="log4j->master"/>
+
+    <!--Configuration: s3-client -->
+    <!--there are two jets3t projects in the repository; this one goes up to 0.6 and
+    is assumed to be the live one-->
+    <dependency org="net.java.dev.jets3t"
+      name="jets3t"
+      rev="${jets3t.version}"
+      conf="s3-client->master"/>
+    <dependency org="commons-net"
+      name="commons-net"
+      rev="${commons-net.version}"
+      conf="s3-client->master"/> 
+    <dependency org="org.mortbay.jetty"
+      name="servlet-api-2.5"
+      rev="${servlet-api-2.5.version}"
+      conf="s3-client->master"/>
+
+    <!--Configuration: test -->
+
+    <!--artifacts needed for testing -->
+    <dependency org="junit"
+      name="junit"
+      rev="${junit.version}"
+      conf="common->default"/>
+    <dependency org="com.google.code.p.arat"
+      name="rat-lib"
+      rev="${rats-lib.version}"
+      conf="releaseaudit->default"/>
+    <dependency org="commons-lang"
+      name="commons-lang"
+      rev="${commons-lang.version}"
+      conf="releaseaudit->default"/>
+    <dependency org="commons-collections"
+      name="commons-collections"
+      rev="${commons-collections.version}"
+      conf="releaseaudit->default"/>
+    <dependency org="org.apache.lucene"
+      name="lucene-core"
+      rev="${lucene-core.version}"
+      conf="javadoc->default"/> 
+    <dependency org="commons-logging"
+      name="commons-logging-api"
+      rev="${commons-logging-api.version}"
+      conf="common->default"/>
+    <dependency org="org.slf4j"
+      name="slf4j-api"
+      rev="${slf4j-api.version}"
+      conf="common->master"/>
+    <dependency org="org.slf4j"
+      name="slf4j-log4j12"
+      rev="${slf4j-log4j12.version}"
+      conf="common->master">
+    </dependency>
+    </dependencies>
+  
+</ivy-module>

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy/libraries.properties?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy/libraries.properties (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/ivy/libraries.properties Tue Jun  9 16:16:33 2009
@@ -0,0 +1,17 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+#This properties file lists the versions of the various artifacts used by the dynamic scheduler.
+#It drives ivy and the generation of a maven POM
+
+#Please list the dependencies name with version if they are different from the ones 
+#listed in the global libraries.properties file (in alphabetical order)

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/AllocationStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/AllocationStore.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/AllocationStore.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/AllocationStore.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+
+/**
+ * Abstract class for implementing a persistent store
+ * of allocation information.
+ */
+public abstract class AllocationStore {
+  Map<String,BudgetQueue> queueCache = new HashMap<String,BudgetQueue>();
+
+  /**
+   * Initializes configuration
+   * @param conf MapReduce configuration
+   */ 
+  public abstract void init(Configuration conf);
+
+  /**
+   * Loads allocations from persistent store
+   */ 
+  public abstract void load();
+
+  /**
+   * Saves allocations to persistent store
+   */ 
+  public abstract void save();
+
+  /**
+   * Gets current remaining budget associated with queue.
+   * @param queue name of queue
+   * @return budget in credits
+   */ 
+  public float getBudget(String queue) {
+    float budget = 0.0f;
+    BudgetQueue budgetQueue = queueCache.get(queue);
+    if (budgetQueue != null) {
+      budget = budgetQueue.budget;
+    }
+    return budget;
+  }
+
+  /**
+   * Gets current spending rate associated with queue.
+   * @param queue name of queue
+   * @return spending rate in credits per allocation interval to be
+   * deducted from budget
+   */ 
+  public float getSpending(String queue) {
+    float spending = 0;
+    BudgetQueue budgetQueue = queueCache.get(queue);
+    if (budgetQueue != null) {
+      spending = budgetQueue.spending;
+    }
+    return spending;
+  }
+
+  /**
+   * Adds budget to queue.
+   * @param queue name of queue
+   * @param budget in credits to be added to queue
+   */
+  public synchronized void addBudget(String queue, float budget) {
+    BudgetQueue budgetQueue = queueCache.get(queue);
+    if (budgetQueue == null) {
+        return;
+    }
+    budgetQueue.addBudget(budget);
+  }
+
+
+  /**
+   * Adds new queue.
+   * @param queue name of queue
+   */
+  public synchronized void addQueue(String queue) {
+    queueCache.put(queue, new BudgetQueue(queue,0.0f,0.0f));
+  }
+
+  /**
+   * Gets queue info.
+   * @param queue name of queue
+   * @return xml representation of queue info as a string
+   */
+  public String getQueueInfo(String queue) {
+    BudgetQueue budgetQueue = queueCache.get(queue);
+    if (budgetQueue == null) {
+        return "";
+    }
+    return "<budget>" + Float.toString(budgetQueue.budget) + "</budget>\n" +
+        "<spending>" + Float.toString(budgetQueue.spending) + "</spending>\n" +
+        "<used>" + Integer.toString(budgetQueue.used) + "</used>\n" +
+        "<pending>" + budgetQueue.pending + "</pending>\n";
+  }
+
+  /**
+   * Remove queue.
+   * @param queue name of queue
+   */
+  public synchronized void removeQueue(String queue) {
+    queueCache.remove(queue);
+  }
+
+  /**
+   * Sets spending rate for queue.
+   * @param queue name of queue
+   * @param spending spending rate in credits per allocation interval to be
+   * deducted from budget
+   */ 
+  public synchronized void setSpending(String queue, float spending) {
+    BudgetQueue budgetQueue = queueCache.get(queue);
+    if (budgetQueue == null) {
+        return;
+    }
+    budgetQueue.spending = spending;
+  }
+
+  /**
+   * Sets queue usage for accounting
+   * @param queue name of queue
+   * @param used slots currently in use
+   * @param pending pending tasks
+   */ 
+  public synchronized void setUsage(String queue, int used, int pending) {
+    BudgetQueue budgetQueue = queueCache.get(queue);
+    if (budgetQueue == null) {
+        return;
+    }
+    budgetQueue.used = used;
+    budgetQueue.pending = pending;
+  }
+
+  /**
+   * Gets queue status (budget, spending, usage).
+   * @return collection of queue status objects
+   */
+  public Collection<BudgetQueue> getQueues() {
+    return queueCache.values();
+  }
+}

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/BudgetQueue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/BudgetQueue.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/BudgetQueue.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/BudgetQueue.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * Class to hold accounting info about a queue
+ * such as remaining budget, spending rate,
+ * and queue usage.
+ */
+public class BudgetQueue {
+  String name;
+  volatile float budget;
+  volatile float spending;
+  volatile int used;
+  volatile int pending;
+  /**
+   * @param name queue name
+   * @param budget queue budget in credits
+   * @param spending queue spending rate in credits per allocation interval
+   * to deduct from budget
+   */
+  public BudgetQueue(String name, float budget, float spending) {
+      this.name = name;
+      this.budget = budget;
+      this.spending = spending;
+      this.used = 0;
+      this.pending = 0;
+  }
+  /**
+   * Thread safe addition of budget
+   * @param newBudget budget to add
+   */
+  public synchronized void addBudget(float newBudget) {
+    budget += newBudget;
+  }
+}

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.http.HttpServer;
+
+
+/**
+ * A {@link TaskScheduler} whose purpose
+ * is to allow users to increase and decrease
+ * their queue priorities continuously to meet the requirements of their
+ * current workloads. The scheduler is aware of the current demand and makes
+ * it more expensive to boost the priority under peak usage times. Thus
+ * users who move their workload to low usage times are rewarded with
+ * discounts. Priorities can only be boosted within a limited quota.
+ * All users are given a quota or a budget which is deducted periodically
+ * in configurable accounting intervals. How much of the budget is
+ * deducted is determined by a per-user spending rate, which may
+ * be modified at any time directly by the user. The cluster slot
+ * share allocated to a particular user is computed as that user's
+ * spending rate divided by the sum of all spending rates in the same
+ * accounting period.
+ *
+ * This scheduler has been designed as a meta-scheduler on top of 
+ * existing MapReduce schedulers, which are responsible for enforcing
+ * shares computed by the dynamic scheduler in the cluster. 
+ */
+class DynamicPriorityScheduler extends TaskScheduler {
+  /**
+   * This class periodically checks spending rates for queues and
+   * updates queue capacity shares and budgets
+   */
+  static class Allocations extends TimerTask implements QueueAllocator {
+    Map<String,QueueAllocation> allocation = 
+        new HashMap<String,QueueAllocation>();
+    Configuration conf;
+    HashMap<String,String> queueInfo = new HashMap<String,String>();
+    float totalSpending;
+    Set<String> infoQueues;
+    QueueManager queueManager;
+    AllocationStore store;
+    Allocations(Configuration conf, QueueManager queueManager) {
+      this.conf = conf;
+      this.queueManager = queueManager;
+      this.infoQueues = queueManager.getQueues();
+      
+      this.store = ReflectionUtils.newInstance(
+          conf.getClass(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_STORE,
+              FileAllocationStore.class, AllocationStore.class), conf);
+      this.store.init(this.conf);
+      this.store.load();
+    }
+    void addBudget(String queue, float budget) {
+      store.addBudget(queue, budget);
+    }
+    void addQueue(String queue) {
+      store.addQueue(queue);
+    }
+    synchronized float getPrice() {
+      return totalSpending;
+    }
+    void removeQueue(String queue) {
+      store.removeQueue(queue);
+      queueManager.setSchedulerInfo(queue, "");
+    }
+    void setSpending(String queue, float spending) {
+      store.setSpending(queue, spending);
+    }
+    String getInfo(String queue) {
+      return store.getQueueInfo(queue);
+    }
+    String getQueueInfos() {
+      String info = "<price>" + Float.toString(totalSpending) + "</price>\n";
+      for (BudgetQueue queue: store.getQueues()) {
+        info += "<queue name=\"" + queue.name + "\">" + 
+            queueInfo.get(queue.name) + "</queue>\n";
+      }
+      return info;
+    }
+    private synchronized void updateAllocation() {
+      String queueList = "";
+      totalSpending = 0.0f;
+      for (BudgetQueue queue: store.getQueues()) {
+        if (!infoQueues.contains(queue.name)) {
+          infoQueues.add(queue.name);
+          QueueInfo newQueueInfo = new QueueInfo(queue.name, null, this); 
+          queueManager.setSchedulerInfo(queue.name, newQueueInfo);
+        }
+        if (!queueList.equals("")) {
+          queueList += ",";
+        }
+        queueList += queue.name;
+        // Only queues that can afford their spending rate and have
+        // running or pending work contribute to the published price.
+        if (queue.spending <= queue.budget && 
+            (queue.used != 0 || queue.pending != 0)) {
+          totalSpending += queue.spending;
+        } 
+      } 
+      conf.set(PrioritySchedulerOptions.MAPRED_QUEUE_NAMES, queueList);
+      setShares();
+    }
+    // Calculates shares in proportion to spending rates
+    // and sets the appropriate configuration parameter
+    // for schedulers to read
+    private synchronized void setShares() {
+      Map<String,QueueAllocation> shares = new HashMap<String,QueueAllocation>();
+      for (BudgetQueue queue: store.getQueues()) {
+        float spending = queue.spending;
+        if (queue.budget < (queue.spending * queue.used) || 
+            (queue.used == 0 && queue.pending == 0)) {
+          spending = 0.0f;
+        } else {
+          queue.addBudget(-(queue.spending*queue.used));
+        }
+        float queueShare = 0.0f;
+        if (totalSpending > 0.0f) {
+          queueShare = (spending/totalSpending);
+        }
+        queueInfo.put(queue.name, "<budget>" + Float.toString(queue.budget) + 
+            "</budget>\n<spending>" + Float.toString(spending) + "</spending>\n<share>" + 
+            Float.toString(queueShare) + "</share>\n<used>" + 
+            Integer.toString(queue.used) + "</used>\n<pending>" +
+            Integer.toString(queue.pending) + "</pending>\n"); 
+        shares.put(queue.name,new QueueAllocation(queue.name,queueShare));
+      }
+      setAllocation(shares);
+    }
+    private synchronized void setAllocation(Map<String,QueueAllocation> shares) {
+      allocation = shares;
+    }
+    /** {@inheritDoc} */
+    public synchronized Map<String,QueueAllocation> getAllocation() {
+      return allocation;
+    }
+    /** {@inheritDoc} */
+    public synchronized void setUsage(String queue, int used, int pending) {
+      store.setUsage(queue, used, pending);
+    }
+    // Used to expose the QueueInfo in the JobTracker web UI 
+    synchronized String getQueueInfo(String queue) {
+      return queueInfo.get(queue);
+    }
+    // run once in each allocation interval to 
+    // calculate new shares based on updated
+    // budgets and spending rates
+    @Override
+    public void run() {
+      store.load();
+      updateAllocation();
+      store.save();
+    }
+  }
+  /**
+   * This class merges the queue info from the underlying
+   * MapReduce scheduler and the dynamic scheduler
+   * to be displayed in the JobTracker web UI
+   */
+  private static class QueueInfo {
+    String queue;
+    Object info; 
+    Allocations allocations;
+    QueueInfo(String queue, Object info, Allocations allocations) {
+      this.queue = queue;
+      this.info = info;  
+      this.allocations = allocations;
+    }
+    public String toString() {
+      String buffer = "";
+      if (info != null) {
+        buffer += info.toString();
+      }
+      String queueInfo = allocations.getQueueInfo(queue);
+      buffer += queueInfo;
+      return buffer;
+    }
+  } 
+
+  // this is the actual scheduler that picks
+  // the jobs to run, e.g. PriorityScheduler
+  protected QueueTaskScheduler scheduler;
+  private Timer timer = new Timer(true);
+  protected Allocations allocations;
+  private static final Log LOG = LogFactory.getLog(DynamicPriorityScheduler.class);
+
+  // Used for testing in discrete time
+  void setTimer(Timer timer) {
+    this.timer = timer;
+  }
+
+  @Override
+  public void start() throws IOException {
+    Configuration conf = getConf();
+    QueueManager queueManager = taskTrackerManager.getQueueManager();
+    allocations = new Allocations(conf,queueManager);
+    scheduler = ReflectionUtils.newInstance(
+        conf.getClass(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_SCHEDULER,
+        PriorityScheduler.class, QueueTaskScheduler.class), conf);
+    scheduler.setAllocator(allocations);
+    scheduler.setConf(conf);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    scheduler.start();
+    long interval = conf.getLong(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_ALLOC_INTERVAL,20)*1000;
+     
+    timer.scheduleAtFixedRate(allocations, interval, interval);   
+    for (String queue: queueManager.getQueues()) {
+      Object info = queueManager.getSchedulerInfo(queue);
+      QueueInfo queueInfo = new QueueInfo(queue, info, allocations); 
+      queueManager.setSchedulerInfo(queue, queueInfo);
+    }
+    if (taskTrackerManager instanceof JobTracker) {
+      JobTracker jobTracker = (JobTracker) taskTrackerManager;
+      HttpServer infoServer = jobTracker.infoServer;
+      infoServer.setAttribute("scheduler", this);
+      infoServer.addServlet("scheduler", "/scheduler",
+          DynamicPriorityServlet.class);
+    }
+  }
+
+  @Override
+  public void terminate() throws IOException {
+    scheduler.terminate();
+  }
+
+  @Override
+  public List<Task> assignTasks(TaskTrackerStatus taskTracker)
+      throws IOException {
+    return scheduler.assignTasks(taskTracker);  
+  }
+
+  @Override
+  public Collection<JobInProgress> getJobs(String queueName) {
+    return scheduler.getJobs(queueName);
+  }
+}

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityServlet.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityServlet.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityServlet.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityServlet.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Servlet for controlling queue allocations, installed at
+ * [job tracker URL]/scheduler when the {@link DynamicPriorityScheduler} 
+ * is in use.
+ * operations supported: <br>
+ * price <br>
+ * time <br>
+ * info=queue_to_query (requires user or admin privilege)<br>
+ * infos (requires admin privilege) <br> 
+ * addBudget=budget_to_add,queue=queue_to_change 
+ * (requires admin privilege) <br>
+ * setSpending=spending_to_set,queue=queue_to_change 
+ * (requires user or admin privilege) <br>
+ * addQueue=queue_to_add (requires admin privilege) <br>
+ * removeQueue=queue_to_remove (requires admin privilege) <br>
+ */
+public class DynamicPriorityServlet extends HttpServlet {
+
+  private DynamicPriorityScheduler scheduler;
+  private JobTracker jobTracker;
+  private PriorityAuthorization auth;
+  @Override
+  public void init() throws ServletException {
+    super.init();
+    ServletContext servletContext = getServletContext();
+    scheduler = 
+        (DynamicPriorityScheduler) servletContext.getAttribute("scheduler");
+    jobTracker = (JobTracker) scheduler.taskTrackerManager;
+    auth = new PriorityAuthorization();
+    auth.init(scheduler.conf);
+  }
+
+  @Override
+  protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    doGet(req, resp); // Same handler for both GET and POST
+  }
+
+  private void checkAdmin(int role, String query) throws IOException {
+    if (role != PriorityAuthorization.ADMIN) {
+      throw new IOException("ACCESS DENIED: " + query);
+    }
+  }
+  private void checkUser(int role, HttpServletRequest request, 
+      String queue, String query) throws IOException {
+    if (role == PriorityAuthorization.ADMIN) {
+      return;
+    } 
+    if (role == PriorityAuthorization.USER &&
+        request.getParameter("user").equals(queue)) {
+      return;
+    }
+    throw new IOException("ACCESS DENIED: " + query);
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, 
+      HttpServletResponse response) throws ServletException, IOException {
+    String query = request.getQueryString();
+    int role = auth.authorize(query,
+        request.getHeader("Authorization"),
+        request.getParameter("user"),
+        request.getParameter("timestamp"));
+
+    String queue = request.getParameter("queue");
+    String info = "";
+    // admin
+    if (request.getParameter("addQueue") != null) {
+      checkAdmin(role, query);
+      queue = request.getParameter("addQueue");
+      scheduler.allocations.addQueue(queue);
+      info = scheduler.allocations.getInfo(queue); 
+    }
+    // admin
+    if (request.getParameter("removeQueue") != null) {
+      checkAdmin(role, query);
+      queue = request.getParameter("removeQueue");
+      scheduler.allocations.removeQueue(queue);
+      info = scheduler.allocations.getInfo(queue); 
+    }
+    // admin
+    if (request.getParameter("addBudget") != null) {
+      checkAdmin(role, query);
+      float budget = Float.parseFloat(request.getParameter("addBudget"));
+      scheduler.allocations.addBudget(queue, budget);
+      info = scheduler.allocations.getInfo(queue); 
+    }
+    // user
+    if (request.getParameter("setSpending") != null) {
+      checkUser(role, request, queue, query);
+      float spending = Float.parseFloat(request.getParameter("setSpending"));
+      scheduler.allocations.setSpending(queue, spending);
+      info = scheduler.allocations.getInfo(queue); 
+    }
+    // user
+    if (request.getParameter("info") != null) {
+      queue = request.getParameter("info");
+      checkUser(role, request, queue, query);
+      info = scheduler.allocations.getQueueInfo(queue);
+    }
+    // admin
+    if (request.getParameter("infos") != null) {
+      checkAdmin(role, query);
+      info = scheduler.allocations.getQueueInfos();
+    }
+
+    // all
+    if (request.getParameter("price") != null) {
+      info = Float.toString(scheduler.allocations.getPrice());
+      info = "<price>" + info + "</price>\n";
+    }
+    // all
+    if (request.getParameter("time") != null) {
+      info = "<start>" + Long.toString(PriorityAuthorization.START_TIME) + 
+          "</start>\n";
+      info += "<time>" + Long.toString(System.currentTimeMillis()) + 
+          "</time>\n";
+    }
+    if (info == null) {
+      info = "";
+    }
+    response.setContentType("text/xml");
+    PrintWriter out = new PrintWriter(response.getOutputStream());
+    String hostname = StringUtils.simpleHostname(
+        jobTracker.getJobTrackerMachine());
+    out.print("<QueueInfo>");
+    out.printf("<host>%s</host>\n", hostname);
+    out.printf("%s", info);
+    out.print("</QueueInfo>\n");
+    out.close();
+  }
+}

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/FileAllocationStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/FileAllocationStore.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/FileAllocationStore.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/FileAllocationStore.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.FileReader;
+import java.io.BufferedReader;
+import java.io.PrintWriter;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.Closeable;
+
+/**
+ * Implements persistent storage for queue budget and spending
+ * information in a file.
+ */
+public class FileAllocationStore extends AllocationStore {
+  private static final Log LOG = LogFactory.getLog(FileAllocationStore.class);
+  private String fileName = "";
+  private boolean loaded = false;
+
+  /** {@inheritDoc} */
+  public void init(Configuration conf) {
+    fileName = conf.get(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_BUDGET_FILE,
+        "/etc/hadoop.budget");
+  }
+
+  /** {@inheritDoc} */
+  public void save() {
+    PrintWriter out = null; 
+    try {
+      out = new PrintWriter(new BufferedWriter(new FileWriter(fileName)));
+      for (BudgetQueue queue: getQueues()) {
+        out.printf("%s %.20f %.20f\n", queue.name, queue.budget, 
+            queue.spending);
+      }
+    } catch (Exception e) {
+      LOG.error("Error writing to file: " + fileName, e);
+    } finally {
+      close(out);
+    }
+  }
+
+  private void close(Closeable closeable) {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (Exception ce) {
+        LOG.error("Error closing file: " + fileName, ce);
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void load() {
+    if (loaded) {
+      return;
+    }
+    BufferedReader in = null;
+    try {
+      in = new BufferedReader(new FileReader(fileName));
+      String line;
+      while ((line = in.readLine()) != null) {
+        String[] nameValue = line.split(" ");
+        if (nameValue.length != 3) {
+          // skip malformed lines; reading at the loop head avoids an
+          // endless loop on a bad line
+          continue;
+        }
+        queueCache.put(nameValue[0], new BudgetQueue(nameValue[0],
+            Float.parseFloat(nameValue[1]), Float.parseFloat(nameValue[2])));
+      }
+      loaded = true;
+    } catch (Exception e) {
+      LOG.error("Error reading file: " + fileName, e);
+    } finally {
+       close(in);
+    }
+  }
+}
+

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityAuthorization.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityAuthorization.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityAuthorization.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.security.SignatureException;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.File;
+import java.net.URLDecoder;
+
+
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.codec.binary.Base64;
+
+
+/**
+ * This class implements symmetric key HMAC/SHA1 signature
+ * based authorization of users and admins.
+ */
+public class PriorityAuthorization {
+  public static final int USER = 0;
+  public static final int ADMIN = 1;
+  public static final int NO_ACCESS = 2;
+  private HashMap<String,UserACL> acl = new HashMap<String,UserACL>();
+  private long lastSuccessfulReload = 0;
+  public static final long START_TIME = System.currentTimeMillis();
+  private String aclFile;
+  private static final Log LOG = LogFactory.getLog(PriorityAuthorization.class);
+  private static final boolean debug = LOG.isDebugEnabled();
+  private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1";
+
+  /**
+   * Initializes authorization configuration
+   * @param conf MapReduce configuration handle 
+   */
+  public void init(Configuration conf) {
+    aclFile = conf.get("mapred.priority-scheduler.acl-file","/etc/hadoop.acl");
+  }
+
+  /**
+   * Adapted from AWS Query Authentication cookbook:
+   * Computes RFC 2104-compliant HMAC signature.
+   *
+   * @param data
+   *     The data to be signed.
+   * @param key
+   *     The signing key.
+   * @return
+   *     The base64-encoded RFC 2104-compliant HMAC signature.
+   * @throws
+   *     java.security.SignatureException when signature generation fails
+   */
+  public static String hmac(String data, String key)
+    throws java.security.SignatureException {
+    String result;
+    try {
+      // get an hmac_sha1 key from the raw key bytes
+      SecretKeySpec signingKey = new SecretKeySpec(key.getBytes(), 
+          HMAC_SHA1_ALGORITHM);
+           
+      // get an hmac_sha1 Mac instance and initialize with the signing key
+      Mac mac = Mac.getInstance(HMAC_SHA1_ALGORITHM);
+      mac.init(signingKey);
+            
+      // compute the hmac on input data bytes
+      byte[] rawHmac = mac.doFinal(data.getBytes());
+            
+      // base64-encode the hmac
+      result = new String(Base64.encodeBase64(rawHmac));
+    } 
+    catch (Exception e) {
+      throw new SignatureException("Failed to generate HMAC : " + e, e);
+    }
+    return result;
+  }
+
+  class UserACL {
+    String user;
+    String role;
+    String key;
+    // for replay detection
+    long lastTimestamp = START_TIME;
+    UserACL(String user, String role, String key) {
+      this.user = user;
+      this.role = role;
+      this.key = key;
+    }
+  }
+
+  private void reloadACL() {
+    BufferedReader in = null;
+    try {
+      in = new BufferedReader(new FileReader(aclFile));
+      String line;
+      while ((line = in.readLine()) != null) {
+        String[] nameValue = line.split(" ");
+        if (nameValue.length != 3) {
+          // skip malformed ACL entries; reading at the loop head avoids
+          // spinning forever on a bad line
+          continue;
+        }
+        acl.put(nameValue[0], new UserACL(nameValue[0], nameValue[1], nameValue[2]));
+        if (debug) {
+          LOG.debug("Loading " + line);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (Exception e) {
+          LOG.error(e);
+        }
+      }
+    }
+  }
+
+  private void loadACL() {
+    long time = System.currentTimeMillis();
+    try {
+      File file = new File(aclFile);
+      long lastModified = file.lastModified();
+      if (lastModified > lastSuccessfulReload) {
+        reloadACL();
+        lastSuccessfulReload = time;
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to reload acl file", e);
+    }
+  }
+
+  private boolean isReplay(String timestamp, String signature, UserACL userACL) {
+    long signatureTime = Long.parseLong(timestamp);
+    if (debug) {
+      LOG.debug("signaturetime: " + Long.toString(signatureTime));
+      LOG.debug("lasttime: " + Long.toString(userACL.lastTimestamp));
+    }
+    if (signatureTime <= userACL.lastTimestamp) {
+      return true;
+    }
+    userACL.lastTimestamp = signatureTime;   
+    return false;
+  }
+
+  /**
+   * Returns authorized role for user.
+   * Checks whether signature obtained by user was made by key stored in local acl.
+   * Also checks for replay attacks.
+   * @param data data that was signed by the user
+   * @param signature user-provided signature
+   * @param user name of the user to authorize
+   * @param timestamp user-provided nonce/timestamp of the signature
+   * @return the authorized role of the user:
+   *   ADMIN, USER or NO_ACCESS
+   */
+  public int authorize(String data, String signature, String user, String timestamp) {
+    try {
+      signature = URLDecoder.decode(signature, "UTF-8");
+    } catch (Exception e) {
+      LOG.error("Authorization exception:",e);
+      return NO_ACCESS;
+    }
+    if (debug) {
+      LOG.debug(data + " sig: " + signature + " user: " + user + " time: " + timestamp);
+    }
+    try {
+      loadACL();
+      UserACL userACL = acl.get(user);  
+      if (userACL == null) {
+        return NO_ACCESS;
+      }
+      String signatureTest = hmac(data, userACL.key);
+      if (debug) {
+        LOG.debug("SignatureTest " + signatureTest);
+        LOG.debug("Signature " + signature);
+      }
+      if (signatureTest.equals(signature) && !isReplay(timestamp, signature, userACL)) {
+        return (userACL.role.equals("admin")) ? ADMIN : USER; 
+      }
+    } catch (Exception e) {
+      LOG.error("Athorization exception:", e);
+    }
+    return NO_ACCESS;
+  }
+}
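
For reference, a minimal sketch of how a client might produce a signature this class accepts. It assumes the commons-codec Base64 used above; the user name and key are hypothetical stand-ins for an entry in the ACL file, and the two -D properties match the conf keys read by PriorityScheduler below.

    import java.net.URLEncoder;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import org.apache.commons.codec.binary.Base64;

    public class SignJobSubmission {
      public static void main(String[] args) throws Exception {
        String user = "alice";          // hypothetical user (= queue name)
        String key = "alice-secret";    // hypothetical key from the ACL file
        // strictly increasing nonce: authorize() rejects replayed timestamps
        String timestamp = Long.toString(System.currentTimeMillis());
        String data = "&user=" + user + "&timestamp=" + timestamp;

        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(new SecretKeySpec(key.getBytes(), "HmacSHA1"));
        String signature =
            new String(Base64.encodeBase64(mac.doFinal(data.getBytes())));

        // authorize() URL-decodes the signature, so encode it for transport
        System.out.println("-Dmapred.job.timestamp=" + timestamp);
        System.out.println("-Dmapred.job.signature="
            + URLEncoder.encode(signature, "UTF-8"));
      }
    }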

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * A {@link TaskScheduler} that 
+ * provides the following features: 
+ * (1) allows continuous enforcement of user controlled dynamic queue shares,
+ * (2) preempts tasks exceeding their queue shares instantaneously when new 
+ * jobs arrive,
+ * (3) is work conserving,
+ * (4) tracks queue usage to only charge when jobs are pending or running,
+ * (5) authorizes queue submissions based on symmetric private key HMAC/SHA1 
+ * signatures.
+ */
+class PriorityScheduler extends QueueTaskScheduler {
+
+  private class InitThread extends Thread {
+    JobInProgress job;
+
+    InitThread(JobInProgress job) {
+      this.job = job;
+    }
+
+    @Override
+    public void run() {
+      try {
+        job.initTasks();
+      } catch (IOException io) {
+        LOG.error(io);
+      }
+    }
+  }
+
+  private class JobListener extends JobInProgressListener {
+    @Override
+    public void jobAdded(JobInProgress job) {
+      new InitThread(job).start();
+      synchronized (PriorityScheduler.this) {
+        String queue = authorize(job);
+        if (queue.equals("")) {
+            job.kill(); 
+            return;
+        }
+        jobQueue.add(job);
+        QueueJobs jobs = queueJobs.get(queue);
+        if (jobs == null) {
+          jobs = new QueueJobs(queue);
+          queueJobs.put(queue,jobs);
+        }
+        jobs.jobs.add(job);
+        if (debug) {
+          LOG.debug("Add job " + job.getProfile().getJobID());
+        }
+      }
+    }
+    @Override
+    public void jobRemoved(JobInProgress job) {
+      synchronized (PriorityScheduler.this) {
+        jobQueue.remove(job);
+        String queue = getQueue(job);
+        QueueJobs jobs = queueJobs.get(queue);
+        // jobs may be null if the job was killed before being queued
+        if (jobs != null) {
+          jobs.jobs.remove(job);
+        }
+      }
+    }
+    @Override
+    public void jobUpdated(JobChangeEvent event) {
+    }
+  }
+
+
+  static final Comparator<TaskInProgress> TASK_COMPARATOR
+    = new Comparator<TaskInProgress>() {
+    public int compare(TaskInProgress o1, TaskInProgress o2) {
+      int res = 0;
+      if (o1.getProgress() < o2.getProgress()) {
+        res = -1;
+      } else {
+        res = (o1.getProgress() == o2.getProgress() ? 0 : 1);
+      }
+      if (res == 0) {
+        if (o1.getExecStartTime() > o2.getExecStartTime()) {
+          res = -1;
+        } else {
+          res = (o1.getExecStartTime() == o2.getExecStartTime() ? 0 : 1);
+        }
+      }
+      return res;
+    }
+  };
+
+  static final Comparator<KillQueue> QUEUE_COMPARATOR
+    = new Comparator<KillQueue>() {
+    public int compare(KillQueue o1, KillQueue o2) {
+      if (o1.startTime < o2.startTime) {
+        return 1;
+      }
+      if (o1.startTime > o2.startTime) {
+        return -1;
+      }
+      return 0;
+    }
+  };
+
+  class QueueJobs {
+    String name;
+    LinkedList<JobInProgress> jobs = new LinkedList<JobInProgress>();
+    QueueJobs(String name) {
+      this.name = name;
+    }
+  }
+
+  class QueueQuota {
+    int quota;
+    int map_used;
+    int reduce_used;
+    int map_pending;
+    int reduce_pending;
+    int mappers;
+    int reducers;
+    String name;
+    QueueQuota(String name) {
+      this.name = name;
+    }
+  }
+
+  private QueueAllocator allocator;
+
+  private static final Log LOG = 
+    LogFactory.getLog(PriorityScheduler.class);
+
+  static final boolean MAP = true;
+  static final boolean REDUCE = false;
+  private static final boolean FILL = true;
+  private static final boolean NO_FILL = false;
+
+  private JobListener jobListener = new JobListener();
+  private static final boolean debug = LOG.isDebugEnabled();
+  private boolean sortTasks = true;
+  private long lastKill = 0;
+  private long killInterval = 0;
+  private PriorityAuthorization auth = new PriorityAuthorization();
+
+  private LinkedList<JobInProgress> jobQueue = 
+    new LinkedList<JobInProgress>();
+  private HashMap<String,QueueJobs> queueJobs = 
+    new HashMap<String,QueueJobs>();
+
+  @Override
+  public void start() throws IOException {
+    taskTrackerManager.addJobInProgressListener(jobListener);
+    sortTasks = conf.getBoolean("mapred.priority-scheduler.sort-tasks", true);
+    killInterval = conf.getLong("mapred.priority-scheduler.kill-interval", 0);
+    auth.init(conf);
+  }
+
+  @Override
+  public void terminate() throws IOException {
+  }
+
+  @Override
+  public void setAllocator(QueueAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  private boolean assignMapRedTask(JobInProgress job, 
+      TaskTrackerStatus taskTracker, int numTrackers, List<Task> assignedTasks,
+      Map<String,QueueQuota> queueQuota, boolean fill, boolean map) 
+      throws IOException {
+    String queue = getQueue(job);
+    QueueQuota quota = queueQuota.get(queue);
+    if (quota == null) {
+      LOG.error("Queue " + queue + " not configured properly");
+      return false;
+    }
+    if (quota.quota < 1 && !fill) {
+      return false;
+    }
+    Task t = null;
+    if (map) {
+      t = job.obtainNewLocalMapTask(taskTracker, numTrackers,
+                                 taskTrackerManager.getNumberOfUniqueHosts());
+      if (t != null) {
+        if (debug) {
+          LOG.debug("assigned local task for job " + job.getProfile().getJobID() + 
+              " " + taskType(map) );
+        }
+        assignedTasks.add(t);
+        quota.map_used++; // only map tasks are assigned in this branch
+        quota.quota--;
+        return true;
+      }
+      t = job.obtainNewNonLocalMapTask(taskTracker, numTrackers,
+          taskTrackerManager.getNumberOfUniqueHosts());
+    } else {
+      t = job.obtainNewReduceTask(taskTracker, numTrackers,
+          taskTrackerManager.getNumberOfUniqueHosts());
+    }
+    if (t != null) {
+      if (debug) {
+        LOG.debug("assigned remote task for job " + job.getProfile().getJobID() + 
+            " " + taskType(map));
+      }
+      assignedTasks.add(t);
+      if (map) {
+        quota.map_used++;
+      } else {
+        quota.reduce_used++;
+      }
+      quota.quota--;
+      return true;
+    }
+    return false;
+  }
+
+  Map<String,QueueQuota> getQueueQuota(int maxMapTasks, int maxReduceTasks, 
+      boolean map) {
+    if (debug) {
+      LOG.debug("max map tasks " + Integer.toString(maxMapTasks) + " "  + 
+          taskType(map));
+      LOG.debug("max reduce tasks " + Integer.toString(maxReduceTasks) + " "  + 
+          taskType(map));
+    }
+    Map<String,QueueAllocation> shares = allocator.getAllocation();
+    Map<String,QueueQuota> quotaMap = new HashMap<String,QueueQuota>();
+    for (QueueAllocation share: shares.values()) {
+      QueueQuota quota = new QueueQuota(share.getName());
+      quota.mappers = Math.round(share.getShare() * maxMapTasks);
+      quota.reducers = Math.round(share.getShare() * maxReduceTasks);
+      quota.quota = (map) ? quota.mappers : quota.reducers;
+
+      if (debug) {
+        LOG.debug("queue " + quota.name + " initial quota " + 
+            Integer.toString(quota.quota) + " "  + taskType(map));
+      }
+      quota.map_used = 0;
+      quota.reduce_used = 0;
+      quota.map_pending = 0;
+      quota.reduce_pending = 0;
+      Collection<JobInProgress> jobs = getJobs(quota.name);
+      for (JobInProgress job : jobs) {
+        quota.map_pending += job.pendingMaps();
+        quota.reduce_pending += job.pendingReduces();
+        int running = (map) ? job.runningMapTasks : job.runningReduceTasks;
+        quota.quota -= running;
+        quota.map_used += job.runningMapTasks;
+        quota.reduce_used += job.runningReduceTasks;
+      }
+      if (debug) {
+        LOG.debug("queue " + quota.name + " quota " + 
+            Integer.toString(quota.quota) + " "  + taskType(map));
+      }
+      quotaMap.put(quota.name,quota);
+    }
+    return quotaMap;
+  }
+
+  private void scheduleJobs(int availableSlots, boolean map, boolean fill, 
+      TaskTrackerStatus taskTracker, int numTrackers, List<Task> assignedTasks, 
+      Map<String,QueueQuota> queueQuota) throws IOException {
+    for (int i = 0; i < availableSlots; i++) {
+      for (JobInProgress job : jobQueue) {
+        if ((job.getStatus().getRunState() != JobStatus.RUNNING) ||
+            (!map && job.numReduceTasks == 0)) {
+          continue;
+        }
+        if (assignMapRedTask(job, taskTracker, numTrackers, assignedTasks, 
+            queueQuota, fill, map)) {
+          break;
+        }
+      }
+    }
+  } 
+
+  private int countTasksToKill(Map<String,QueueQuota> queueQuota, boolean map) {
+    int killTasks = 0;
+    for (QueueQuota quota : queueQuota.values()) {
+      killTasks += Math.min((map) ? quota.map_pending : quota.reduce_pending,
+          Math.max(quota.quota,0));
+    } 
+    return killTasks;
+  }
+   
+  protected void markIdle(Map<String, QueueQuota> queueQuota) {
+    for (QueueQuota quota: queueQuota.values()) {
+      allocator.setUsage(quota.name, Math.min(quota.map_used, quota.mappers) + 
+          Math.min(quota.reduce_used, quota.reducers), 
+          (quota.map_pending + quota.reduce_pending));
+    }  
+  }
+
+  private synchronized void assignMapRedTasks(List<Task> assignedTasks, 
+      TaskTrackerStatus taskTracker, int numTrackers, boolean map)
+      throws IOException {
+    int taskOffset = assignedTasks.size();
+    int maxTasks = (map) ? taskTracker.getMaxMapTasks() : 
+        taskTracker.getMaxReduceTasks();
+    int countTasks =  (map) ? taskTracker.countMapTasks() : 
+        taskTracker.countReduceTasks();
+    int availableSlots = maxTasks - countTasks;
+    int map_capacity = 0;
+    int reduce_capacity = 0;
+    ClusterStatus status = taskTrackerManager.getClusterStatus();
+    if (status != null) {
+      map_capacity = status.getMaxMapTasks();
+      reduce_capacity = status.getMaxReduceTasks();
+    }
+    Map<String,QueueQuota> queueQuota = getQueueQuota(map_capacity, 
+        reduce_capacity,map);
+    if (debug) {
+      LOG.debug("available slots " + Integer.toString(availableSlots) + " " + 
+          taskType(map));
+      LOG.debug("queue size " + Integer.toString(jobQueue.size()));
+      LOG.debug("map capacity " + Integer.toString(map_capacity) + " " + 
+           taskType(map));
+      LOG.debug("reduce capacity " + Integer.toString(reduce_capacity) + " " + 
+           taskType(map));
+    }
+    scheduleJobs(availableSlots, map, NO_FILL, taskTracker, numTrackers, 
+        assignedTasks, queueQuota);
+    // subtract only the tasks assigned in the NO_FILL pass above
+    availableSlots -= assignedTasks.size() - taskOffset;
+    scheduleJobs(availableSlots, map, FILL, taskTracker, numTrackers, 
+        assignedTasks, queueQuota);
+    if (map) {
+      markIdle(queueQuota);
+    }
+
+    long currentTime = System.currentTimeMillis()/1000;
+    if ((killInterval > 0) && (currentTime - lastKill > killInterval)) {
+      lastKill = currentTime;
+    } else {
+      return;
+    }
+ 
+    int killTasks = countTasksToKill(queueQuota, map);
+    if (debug) {
+      LOG.debug("trying to kill " + Integer.toString(killTasks) + " tasks " + 
+          taskType(map));
+    }
+    killMapRedTasks(killTasks, queueQuota, map);
+  }
+
+  class KillQueue {
+    String name;
+    long startTime;
+    QueueQuota quota;
+  }
+
+  private Collection<KillQueue> getKillQueues(
+      Map<String, QueueQuota> queueQuota) {
+    TreeMap<KillQueue, KillQueue> killQueues =
+      new TreeMap<KillQueue, KillQueue>(QUEUE_COMPARATOR);
+    for (QueueJobs queueJob : queueJobs.values()) {
+      QueueQuota quota = queueQuota.get(queueJob.name); 
+      if (quota.quota >= 0) {
+        continue;
+      }
+      for (JobInProgress job : queueJob.jobs) {
+        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+          KillQueue killQueue = new KillQueue();
+          killQueue.name = queueJob.name;
+          killQueue.startTime = job.getStartTime();
+          killQueue.quota = quota;
+          killQueues.put(killQueue, killQueue);
+        }
+      }
+    }
+    return killQueues.values();
+  }
+
+  private void killMapRedTasks(int killTasks,
+      Map<String, QueueQuota> queueQuota, boolean map) {
+    int killed = 0;
+    // sort queues exceeding their quota in reverse order of running-job
+    // start time, so tasks are reclaimed from the youngest jobs first
+    Collection<KillQueue> killQueues = getKillQueues(queueQuota);
+    for (KillQueue killQueue : killQueues) {
+      if (killed == killTasks) {
+        return;
+      }
+      QueueQuota quota = killQueue.quota;
+      // kill no more than needed and no more than the quota excess
+      int toKill = Math.min(killTasks - killed, -quota.quota);
+      killQueueTasks(quota.name, toKill, map);
+      killed += toKill;
+    }  
+  }
+
+  private String taskType(boolean map) {
+    return (map) ? "MAP" : "REDUCE";
+  }
+
+  private void killQueueTasks(String queue, int killTasks, boolean map) {
+    if (killTasks == 0) {
+      return;
+    }
+    if (debug) {
+      LOG.debug("trying to kill " + Integer.toString(killTasks) + 
+          " tasks from queue " + queue + " " + taskType(map));
+    }
+    int killed = 0;
+    Collection<JobInProgress> jobs = getJobs(queue); 
+    if (debug) {
+      LOG.debug("total jobs to kill from " + Integer.toString(jobs.size()) + 
+          " " + taskType(map));
+    }
+    for (JobInProgress job : jobs) {
+      TaskInProgress tasks[] = (map) ? job.maps.clone() : 
+                                       job.reduces.clone();
+      if (sortTasks) {
+        Arrays.sort(tasks, TASK_COMPARATOR);
+      }
+      if (debug) {
+        LOG.debug("total tasks to kill from " + 
+            Integer.toString(tasks.length) + " " + taskType(map));
+      }
+      for (int i=0; i < tasks.length; i++) {
+        if (debug) {
+          LOG.debug("total active tasks to kill from " + 
+              Integer.toString(tasks[i].getActiveTasks().keySet().size()) + 
+              " " + taskType(map));
+        }
+        for (TaskAttemptID id: tasks[i].getActiveTasks().keySet()) {
+          if (tasks[i].isCommitPending(id)) {
+            continue;
+          }
+          tasks[i].killTask(id, false);
+          if (debug) {
+            LOG.debug("killed task " + id + " progress " + 
+                Double.toString(tasks[i].getProgress()) +
+                " start time " + Long.toString(tasks[i].getExecStartTime()) + 
+                " " +  taskType(map));
+          }
+          killed += 1;
+          if (killed == killTasks) {
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<Task> assignTasks(TaskTrackerStatus taskTracker)
+    throws IOException {
+    long millis = 0;
+    if (debug) {
+      millis = System.currentTimeMillis();
+    } 
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+    int numTrackers = clusterStatus.getTaskTrackers();
+ 
+    List<Task> assignedTasks = new ArrayList<Task>();
+
+    assignMapRedTasks(assignedTasks, taskTracker, numTrackers, MAP);
+    assignMapRedTasks(assignedTasks, taskTracker, numTrackers, REDUCE);
+    if (debug) {
+      long elapsed = System.currentTimeMillis() - millis;
+      LOG.debug("assigned total tasks: " + 
+          Integer.toString(assignedTasks.size()) + " in " + 
+          Long.toString(elapsed) + " ms");
+    }
+    return assignedTasks;
+  }
+
+  @Override
+  public Collection<JobInProgress> getJobs(String queueName) {
+    QueueJobs jobs = queueJobs.get(queueName);
+    if (jobs == null) {
+      return new ArrayList<JobInProgress>();
+    }
+    return jobs.jobs;
+  }
+
+
+  private String getQueue(JobInProgress job) {
+    JobConf conf = job.getJobConf();
+    return conf.getQueueName();
+  }
+
+  private String getUser(JobInProgress job) {
+    JobConf conf = job.getJobConf();
+    return conf.getUser();
+  }
+
+  private String authorize(JobInProgress job) {
+    JobConf conf = job.getJobConf();
+    String user = conf.getUser();
+    String queue = conf.getQueueName();
+    if (!user.equals(queue)) {
+      return "";
+    }
+    String timestamp = conf.get("mapred.job.timestamp");
+    String signature = conf.get("mapred.job.signature");
+    int role = auth.authorize("&user=" + user + "&timestamp=" + timestamp, 
+        signature, user, timestamp);
+    if (role != PriorityAuthorization.NO_ACCESS) {
+      return queue;
+    }
+    return "";
+  }
+}
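
To make the quota arithmetic in getQueueQuota() and the two scheduling passes concrete, a worked example with assumed numbers:

    // Hypothetical cluster: 100 map slots in total, queue share 0.30,
    // 25 map tasks already running in the queue.
    int maxMapTasks = 100;
    float share = 0.30f;
    int running = 25;
    int quota = Math.round(share * maxMapTasks) - running;  // 30 - 25 = 5
    // NO_FILL pass: the queue may claim up to 5 more map slots.
    // FILL pass: leftover slots go to any queue with runnable tasks,
    // keeping the scheduler work conserving; queues pushed below a
    // quota of 0 are trimmed back by killMapRedTasks() at the next
    // kill interval.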

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PrioritySchedulerOptions.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PrioritySchedulerOptions.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PrioritySchedulerOptions.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PrioritySchedulerOptions.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * Configuration options used in the priority schedulers,
+ * kept in one place for ease of reference in code.
+ */
+public class PrioritySchedulerOptions {
+  /** {@value} */
+  public static final String DYNAMIC_SCHEDULER_BUDGET_FILE = "mapred.dynamic-scheduler.budget-file";
+  /** {@value} */
+  public static final String DYNAMIC_SCHEDULER_STORE = "mapred.dynamic-scheduler.store";
+  /** {@value} */
+  public static final String MAPRED_QUEUE_NAMES = "mapred.queue.names";
+  /** {@value} */
+  public static final String DYNAMIC_SCHEDULER_SCHEDULER = "mapred.dynamic-scheduler.scheduler";
+  /** {@value} */
+  public static final String DYNAMIC_SCHEDULER_ALLOC_INTERVAL = "mapred.dynamic-scheduler.alloc-interval";
+}
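
As a usage sketch, wiring the dynamic scheduler together through these keys might look as follows; the scheduler class name matches this commit, but the budget file path and interval value are hypothetical:

    Configuration conf = new Configuration();
    conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_SCHEDULER,
        "org.apache.hadoop.mapred.PriorityScheduler");
    conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_BUDGET_FILE,
        "/etc/hadoop.budget");  // hypothetical path
    conf.setLong(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_ALLOC_INTERVAL, 20);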

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocation.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocation.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocation.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * Class to hold queue share information communicated
+ * between the scheduler and the queue share manager.
+ */
+public class QueueAllocation {
+  private String name;
+  private float share;
+  /**
+   * @param name queue name
+   * @param share queue share of total capacity (0..1)
+   */
+  public QueueAllocation(String name, float share) {
+    this.name = name;
+    this.share = share;
+  }
+  /**
+   * Gets queue share
+   * @return queue share of total capacity (0..1)
+   */
+  public float getShare() {
+    return this.share;
+  }
+  /**
+   * Gets queue name
+   * @return queue name
+   */
+  public String getName() {
+    return this.name;
+  }
+}

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocator.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocator.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueAllocator.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Map;
+/**
+ * This interface allows schedulers to communicate with the
+ * queue share management implementation. Schedulers can poll
+ * this interface periodically to obtain the latest queue
+ * allocations.
+ */
+public interface QueueAllocator {
+  /**
+   * Used by schedulers to obtain queue allocations periodically
+   * @return map of queue names to their allocations (shares)
+   */
+  Map<String,QueueAllocation> getAllocation();
+  /**
+   * Used by schedulers to push queue usage info for
+   * accounting purposes.
+   * @param queue the queue name
+   * @param used number of slots currently used
+   * @param pending number of tasks pending
+   */
+  void setUsage(String queue, int used, int pending);
+}
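
A minimal sketch of an implementation with fixed shares, where setUsage is a no-op; the queue names and share values are hypothetical:

    package org.apache.hadoop.mapred;

    import java.util.HashMap;
    import java.util.Map;

    /** Hypothetical allocator that returns fixed shares and ignores usage. */
    public class StaticQueueAllocator implements QueueAllocator {
      private final Map<String, QueueAllocation> shares =
          new HashMap<String, QueueAllocation>();

      public StaticQueueAllocator() {
        shares.put("research", new QueueAllocation("research", 0.7f));
        shares.put("production", new QueueAllocation("production", 0.3f));
      }

      public Map<String, QueueAllocation> getAllocation() {
        return shares;
      }

      public void setUsage(String queue, int used, int pending) {
        // a budget-backed implementation would charge the queue here
      }
    }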

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueTaskScheduler.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueTaskScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/QueueTaskScheduler.java Tue Jun  9 16:16:33 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/** 
+ * This class allows the scheduler to retrieve periodic
+ * queue allocation info from the queue share manager.
+ */ 
+abstract public class QueueTaskScheduler extends TaskScheduler {
+  /**
+   * Sets the queue share manager of a scheduler
+   * @param allocator the queue share manager of this scheduler
+   */
+  public abstract void setAllocator(QueueAllocator allocator);
+}


