hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject svn commit: r783059 [2/2] - in /hadoop/core/trunk/src/contrib: ./ dynamic-scheduler/ dynamic-scheduler/ivy/ dynamic-scheduler/src/ dynamic-scheduler/src/java/ dynamic-scheduler/src/java/org/ dynamic-scheduler/src/java/org/apache/ dynamic-scheduler/src/...
Date Tue, 09 Jun 2009 16:16:34 GMT
Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
(added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
Tue Jun  9 16:16:33 2009
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Base class for various scheduler tests
+ */
+public class BaseSchedulerTest extends TestCase {
+  final static String[] QUEUES = new String[] {"queue1","queue2"};
+  protected FakeDynamicTimer timer = new FakeDynamicTimer();
+  protected FakeTaskTrackerManager taskTracker = new FakeTaskTrackerManager();
+  protected String budgetFile;
+  protected Configuration conf;
+   /**
+   * Create the test budget file
+   * @throws Exception
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+     String pathname = System.getProperty("test.build.data",
+                                          "build/contrib/dynamic-scheduler/test/data");
+     String testDir = new File(pathname).getAbsolutePath();
+    budgetFile = new File(testDir, "test-budget").getAbsolutePath();
+    new File(testDir).mkdirs();
+    new File(budgetFile).createNewFile();
+         conf = new Configuration();
+    conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_ALLOC_INTERVAL, "2");
+    conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_BUDGET_FILE, budgetFile);
+  }
+
+  /**
+    * deletes the test budget file
+    * @throws Exception
+    */
+   @Override
+   protected void tearDown() throws Exception {
+     new File(budgetFile).delete();
+   }
+
+  static class FakeTaskTrackerManager implements TaskTrackerManager {
+    FakeQueueManager qm = new FakeQueueManager();
+    public FakeTaskTrackerManager() {
+    }
+    public void addTaskTracker(String ttName) {
+    }
+    public ClusterStatus getClusterStatus() {
+      return null;
+    }
+    public int getNumberOfUniqueHosts() {
+      return 0;
+    }
+    public int getNextHeartbeatInterval() {
+      return 0;
+    }
+    public Collection<TaskTrackerStatus> taskTrackers() {
+      return null;
+    }
+    public void addJobInProgressListener(JobInProgressListener listener) {
+    }
+    public void removeJobInProgressListener(JobInProgressListener listener) {
+    }
+    public void submitJob(JobInProgress job) {
+    }
+    public TaskTrackerStatus getTaskTracker(String trackerID) {
+      return null;
+    }
+    public void killJob(JobID jobid) throws IOException {
+    }
+    public JobInProgress getJob(JobID jobid) {
+      return null;
+    }
+    public void startTask(String taskTrackerName, final Task t) {
+    }
+    void addQueues(String[] arr) {
+      Set<String> queues = new HashSet<String>();
+      queues.addAll(Arrays.asList(arr));
+      qm.setQueues(queues);
+    }
+    public QueueManager getQueueManager() {
+      return qm;
+    }
+  }
+
+
+
+
+  static class FakeDynamicTimer extends Timer {
+    private TimerTask task;
+    public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
+      this.task = task;
+    }
+    public void runTask() {
+      task.run();
+    }
+  }
+
+  static class FakeQueueManager extends QueueManager {
+    private Set<String> queues = null;
+    FakeQueueManager() {
+      super(new Configuration());
+    }
+    void setQueues(Set<String> queues) {
+      this.queues = queues;
+    }
+    public synchronized Set<String> getQueues() {
+      return queues;
+    }
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java
(added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java
Tue Jun  9 16:16:33 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Collection;
+
+/**
+ * Mock queue scheduler for testing only.
+ * Every operation is a no-op; the query methods return null.
+ */
+public class FakeDynamicScheduler extends QueueTaskScheduler {
+
+  /** Does nothing. */
+  public void start() throws IOException {
+    // intentionally empty
+  }
+
+  /** Does nothing. */
+  public void terminate() throws IOException {
+    // intentionally empty
+  }
+
+  /**
+   * Never assigns any task.
+   * @param taskTracker ignored
+   * @return always null
+   */
+  public List<Task> assignTasks(TaskTrackerStatus taskTracker)
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Reports no jobs.
+   * @param queueName ignored
+   * @return always null
+   */
+  public Collection<JobInProgress> getJobs(String queueName) {
+    return null;
+  }
+
+  /**
+   * Ignores the allocator.
+   * @param allocator ignored
+   */
+  public void setAllocator(QueueAllocator allocator) {
+    // intentionally empty
+  }
+}
+

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestDynamicScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestDynamicScheduler.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestDynamicScheduler.java
(added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestDynamicScheduler.java
Tue Jun  9 16:16:33 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ Test the dynamic scheduler.
+ Use the System Property test.build.data to drive the test run
+ */
+public class TestDynamicScheduler extends BaseSchedulerTest {
+
+  private DynamicPriorityScheduler scheduler;
+
+
+  /**
+   * Create the test queues
+   * @throws Exception
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_SCHEDULER,
+             "org.apache.hadoop.mapred.FakeDynamicScheduler");
+    scheduler = new DynamicPriorityScheduler();
+    scheduler.setTimer(timer); 
+    scheduler.setConf(conf);
+    scheduler.setTaskTrackerManager(taskTracker);
+    taskTracker.addQueues(QUEUES);
+    scheduler.start();
+  }
+
+  /**
+   * Remove the queues
+   * @throws Exception
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    removeQueues(QUEUES);
+  }
+  
+
+  private void setSpending(String queue, float spending) throws IOException {
+    scheduler.allocations.setSpending(queue, spending);
+  }
+
+  private void setBudgets(String[] queue, float[] budget) throws IOException {
+    for (int i = 0; i < queue.length; i++) {
+      scheduler.allocations.addBudget(queue[i], budget[i]);
+    }
+  }
+
+  private void addQueues(String[] queue) throws IOException {
+    for (String aQueue : queue) {
+      scheduler.allocations.addQueue(aQueue);
+    }
+  }
+  private void removeQueues(String[] queue) throws IOException {
+    for (String aQueue : queue) {
+      scheduler.allocations.removeQueue(aQueue);
+    }
+  }
+
+
+  public void testAllocation() throws IOException {
+    addQueues(QUEUES);
+    setSpending("queue1", 1.0f);
+    setSpending("queue2", 2.0f);
+    setBudgets(QUEUES, new float[] {100.0f, 100.0f});
+    scheduler.allocations.setUsage("queue1",2,0);
+    scheduler.allocations.setUsage("queue2",3,0);
+    timer.runTask();
+    assertNotNull(scheduler.allocations);
+    assertNotNull(scheduler.allocations.allocation);
+    assertNotNull(scheduler.allocations.allocation.get("queue1"));
+    assertNotNull(scheduler.allocations.allocation.get("queue2"));
+    Collection<BudgetQueue> budgetQueues =
+      scheduler.allocations.store.getQueues();
+    assertNotNull(budgetQueues);
+    assertEquals(2, budgetQueues.size());
+    BudgetQueue queue1Budget = null;
+    BudgetQueue queue2Budget = null;
+    for (BudgetQueue queue: budgetQueues) {
+      if (queue.name.equals("queue1")) {
+        queue1Budget = queue;
+      } else {
+        queue2Budget = queue;
+      }
+    }
+    assertNotNull(queue1Budget);
+    assertNotNull(queue2Budget);
+
+    assertEquals(98.0f, queue1Budget.budget, 0.1f);
+    assertEquals(94.0f, queue2Budget.budget, 0.1f);
+    assertEquals(1.0f, queue1Budget.spending, 0.1f);
+    assertEquals(2.0f, queue2Budget.spending, 0.1f);
+
+    Map<String,QueueAllocation> shares = scheduler.allocations.getAllocation();
+    assertNotNull(shares);
+    assertEquals(2, shares.size());
+    assertNotNull(shares.get("queue1")); 
+    assertNotNull(shares.get("queue2")); 
+    assertEquals(1.0f/3.0f, shares.get("queue1").getShare(), 0.1f);
+    assertEquals(2.0f/3.0f, shares.get("queue2").getShare(), 0.1f);
+  }
+
+  public void testBudgetUpdate() throws IOException {
+    addQueues(QUEUES);
+    setSpending("queue1", 1.0f);
+    setSpending("queue2", 2.0f);
+    setBudgets(QUEUES, new float[] {100.0f, 200.0f});
+    timer.runTask();
+    Collection<BudgetQueue> budgetQueues =
+      scheduler.allocations.store.getQueues();
+    BudgetQueue queue1Budget = null;
+    BudgetQueue queue2Budget = null;
+    for (BudgetQueue queue: budgetQueues) {
+      if (queue.name.equals("queue1")) {
+        queue1Budget = queue;
+      } else {
+        queue2Budget = queue;
+      }
+    }
+    assertNotNull(queue1Budget);
+    assertNotNull(queue2Budget);
+    assertEquals(100.0f, queue1Budget.budget, 0.1f);
+    assertEquals(200.0f, queue2Budget.budget, 0.1f);
+    setBudgets(QUEUES, new float[] {200.0f, 300.0f});
+    timer.runTask();
+    budgetQueues =  scheduler.allocations.store.getQueues();
+    for (BudgetQueue queue: budgetQueues) {
+      if (queue.name.equals("queue1")) {
+        queue1Budget = queue;
+      } else {
+        queue2Budget = queue;
+      }
+    }
+    assertEquals(300.0f, queue1Budget.budget, 0.1f);
+    assertEquals(500.0f, queue2Budget.budget, 0.1f);
+    removeQueues(QUEUES);
+  }
+
+  public void testSpendingUpdate() throws IOException {
+    addQueues(QUEUES);
+    setSpending("queue1", 1.0f);
+    setSpending("queue2", 2.0f);
+    setBudgets(QUEUES, new float[] {100.0f, 100.0f});
+    scheduler.allocations.setUsage("queue1", 1, 0);
+    scheduler.allocations.setUsage("queue2", 1, 0);
+    timer.runTask();
+    Map<String,QueueAllocation> shares =
+      scheduler.allocations.getAllocation();
+    assertEquals(1.0f/3.0f, shares.get("queue1").getShare(), 0.1f);
+    assertEquals(2.0f/3.0f, shares.get("queue2").getShare(), 0.1f);
+    setSpending("queue1", 5.0f);
+    setSpending("queue2", 1.0f);
+    timer.runTask();
+    shares = scheduler.allocations.getAllocation();
+    assertEquals(5.0f/6.0f, shares.get("queue1").getShare(), 0.1f);
+    assertEquals(1.0f/6.0f, shares.get("queue2").getShare(), 0.1f);
+  }
+}

Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestPriorityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestPriorityScheduler.java?rev=783059&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestPriorityScheduler.java
(added)
+++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestPriorityScheduler.java
Tue Jun  9 16:16:33 2009
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Test the dynamic scheduler, configured to delegate to
+ * org.apache.hadoop.mapred.PriorityScheduler.
+ */
+public class TestPriorityScheduler extends BaseSchedulerTest {
+
+  private DynamicPriorityScheduler scheduler;
+
+  /**
+   * Create and start the scheduler under test and register the test queues
+   * with the fake task tracker manager.
+   * @throws Exception
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_SCHEDULER,
+      "org.apache.hadoop.mapred.PriorityScheduler");
+    scheduler = new DynamicPriorityScheduler();
+    scheduler.setTimer(timer);
+    scheduler.setConf(conf);
+    scheduler.setTaskTrackerManager(taskTracker);
+    taskTracker.addQueues(QUEUES);
+    scheduler.start();
+  }
+
+  /**
+   * Remove the queues, then let the base class delete the budget file.
+   * @throws Exception
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      // Either reference is still null when setUp() failed part-way.
+      if (scheduler != null && scheduler.allocations != null) {
+        removeQueues(QUEUES);
+      }
+    } finally {
+      super.tearDown();
+    }
+  }
+
+  private void setSpending(String queue, float spending) throws IOException {
+    scheduler.allocations.setSpending(queue, spending);
+  }
+
+  /**
+   * Add budget[i] to queue[i] for every queue.
+   */
+  private void setBudgets(String[] queue, float[] budget) throws IOException {
+    for (int i = 0; i < queue.length; i++) {
+      scheduler.allocations.addBudget(queue[i], budget[i]);
+    }
+  }
+
+  private void addQueues(String[] queue) throws IOException {
+    for (String aQueue : queue) {
+      scheduler.allocations.addQueue(aQueue);
+    }
+  }
+
+  private void removeQueues(String[] queue) throws IOException {
+    for (String aQueue : queue) {
+      scheduler.allocations.removeQueue(aQueue);
+    }
+  }
+
+  /**
+   * @return the scheduler that the dynamic scheduler delegates to, cast to
+   * the PriorityScheduler configured in setUp()
+   */
+  private PriorityScheduler priorityScheduler() {
+    return (PriorityScheduler) scheduler.scheduler;
+  }
+
+  /**
+   * Look up a budget queue by name.
+   * @param budgetQueues the queues to search
+   * @param name the queue name to match
+   * @return the matching queue, or null if there is none
+   */
+  private static BudgetQueue findBudgetQueue(
+      Collection<BudgetQueue> budgetQueues, String name) {
+    for (BudgetQueue queue : budgetQueues) {
+      if (name.equals(queue.name)) {
+        return queue;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * The map and reduce quotas handed out for 100 map and 10 reduce slots
+   * must be split 1/3 : 2/3 between the two queues.
+   */
+  public void testQueueAllocation() throws IOException {
+    addQueues(QUEUES);
+    setSpending("queue1", 1.0f);
+    setSpending("queue2", 2.0f);
+    setBudgets(QUEUES, new float[] {100.0f, 100.0f});
+    scheduler.allocations.setUsage("queue1", 2, 0);
+    scheduler.allocations.setUsage("queue2", 3, 0);
+    timer.runTask();
+
+    Map<String,PriorityScheduler.QueueQuota> queueQuota =
+      priorityScheduler().getQueueQuota(100, 10, PriorityScheduler.MAP);
+    assertEquals(2, queueQuota.size());
+    for (PriorityScheduler.QueueQuota quota: queueQuota.values()) {
+      if (quota.name.equals("queue1")) {
+        assertEquals(Math.round(100 * 1.0f/3.0f), quota.quota, 0.1f);
+      } else {
+        assertEquals(Math.round(100 * 2.0f/3.0f), quota.quota, 0.1f);
+      }
+      assertTrue(quota.mappers == quota.quota);
+    }
+
+    queueQuota =
+      priorityScheduler().getQueueQuota(100, 10, PriorityScheduler.REDUCE);
+    assertEquals(2, queueQuota.size());
+    for (PriorityScheduler.QueueQuota quota: queueQuota.values()) {
+      if (quota.name.equals("queue1")) {
+        assertEquals(Math.round(10 * 1.0f/3.0f), quota.quota, 0.1f);
+      } else {
+        assertEquals(Math.round(10 * 2.0f/3.0f), quota.quota, 0.1f);
+      }
+      assertTrue(quota.reducers == quota.quota);
+    }
+  }
+
+  /**
+   * Map usage recorded on the quotas and reported through markIdle() must be
+   * reflected in the stored budgets after the next allocation run.
+   */
+  public void testUsage() throws IOException {
+    addQueues(QUEUES);
+    setSpending("queue1", 1.0f);
+    setSpending("queue2", 2.0f);
+    setBudgets(QUEUES, new float[] {1000.0f, 1000.0f});
+    scheduler.allocations.setUsage("queue1", 0, 1);
+    scheduler.allocations.setUsage("queue2", 0, 1);
+    timer.runTask();
+
+    Map<String,PriorityScheduler.QueueQuota> queueQuota =
+      priorityScheduler().getQueueQuota(100, 10, PriorityScheduler.MAP);
+    PriorityScheduler.QueueQuota quota1 = queueQuota.get("queue1");
+    PriorityScheduler.QueueQuota quota2 = queueQuota.get("queue2");
+    quota1.map_used = 10;
+    quota2.map_used = 90;
+    priorityScheduler().markIdle(queueQuota);
+    timer.runTask();
+
+    Collection<BudgetQueue> budgetQueues =
+      scheduler.allocations.store.getQueues();
+    assertNotNull(budgetQueues);
+    assertEquals(2, budgetQueues.size());
+    BudgetQueue queue1Budget = findBudgetQueue(budgetQueues, "queue1");
+    BudgetQueue queue2Budget = findBudgetQueue(budgetQueues, "queue2");
+    assertNotNull(queue1Budget);
+    assertNotNull(queue2Budget);
+    assertEquals("Budget incorrect", 990.0f, queue1Budget.budget, 0.1f);
+    assertEquals("Budget incorrect", 866.0f, queue2Budget.budget, 0.1f);
+  }
+}



Mime
View raw message