From: omalley@apache.org
To: core-commits@hadoop.apache.org
Reply-To: core-dev@hadoop.apache.org
Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Subject: svn commit: r694415 [2/2] - in /hadoop/core/trunk: conf/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/ src/contrib/capacity-scheduler/src/java/ src/contrib/capacity-scheduler/src/java/org/ src/contrib/capacity-schedul...
Date: Thu, 11 Sep 2008 18:49:26 -0000
Message-Id: <20080911184928.9B80D2388A31@eris.apache.org>

Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,792 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestCapacityScheduler extends TestCase {
+
+  private static int jobCounter;
+
+  static class FakeJobInProgress extends JobInProgress {
+
+    private FakeTaskTrackerManager taskTrackerManager;
+    private int mapTaskCtr;
+    private int redTaskCtr;
+    private Set<TaskInProgress> mapTips =
+      new HashSet<TaskInProgress>();
+    private Set<TaskInProgress> reduceTips =
+      new HashSet<TaskInProgress>();
+
+    public FakeJobInProgress(JobID jId, JobConf jobConf,
+        FakeTaskTrackerManager taskTrackerManager, String user)
+        throws IOException {
+      super(jId, jobConf);
+      this.taskTrackerManager = taskTrackerManager;
+      this.startTime = System.currentTimeMillis();
+      this.status = new JobStatus();
+      this.status.setRunState(JobStatus.PREP);
+      if (null == jobConf.getQueueName()) {
+        this.profile = new JobProfile(user, jId,
+            null, null, null);
+      }
+      else {
+        this.profile = new JobProfile(user, jId,
+            null, null, null, jobConf.getQueueName());
+      }
+      mapTaskCtr = 0;
+      redTaskCtr = 0;
+    }
+
+    @Override
+    public synchronized void initTasks() throws IOException {
+      getStatus().setRunState(JobStatus.RUNNING);
+    }
+
+    @Override
+    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int ignored) throws IOException {
+      if (runningMapTasks == numMapTasks) return null;
+      TaskAttemptID attemptId = getTaskAttemptID(true);
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningMapTasks++;
+      // create a fake TIP and keep track of it
+      mapTips.add(new FakeTaskInProgress(getJobID(),
+          getJobConf(), task, true, this));
+      return task;
+    }
+
+    @Override
+    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
+        int clusterSize, int ignored) throws IOException {
+      if (runningReduceTasks == numReduceTasks) return null;
+      TaskAttemptID attemptId = getTaskAttemptID(false);
+      Task task = new ReduceTask("", attemptId, 0, 10) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningReduceTasks++;
+      // create a fake TIP and keep track of it
+      reduceTips.add(new FakeTaskInProgress(getJobID(),
+          getJobConf(), task, false, this));
+      return task;
+    }
+
+    public void mapTaskFinished() {
+      runningMapTasks--;
+      finishedMapTasks++;
+    }
+
+    public void reduceTaskFinished() {
+      runningReduceTasks--;
+      finishedReduceTasks++;
+    }
+
+    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+      JobID jobId = getJobID();
+      return new TaskAttemptID(jobId.getJtIdentifier(),
+          jobId.getId(), isMap, (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
+    }
+
+    @Override
+    Set<TaskInProgress> getNonLocalRunningMaps() {
+      return (Set<TaskInProgress>)mapTips;
+    }
+    @Override
+    Set<TaskInProgress> getRunningReduces() {
+      return (Set<TaskInProgress>)reduceTips;
+    }
+  }
+
+  static class FakeTaskInProgress extends TaskInProgress {
+    private boolean isMap;
+    private FakeJobInProgress fakeJob;
+    private TreeMap<TaskAttemptID, String> activeTasks;
+    private TaskStatus taskStatus;
+
+    FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
+        boolean isMap, FakeJobInProgress job) {
+      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
+      this.isMap = isMap;
+      this.fakeJob = job;
+      activeTasks = new TreeMap<TaskAttemptID, String>();
+      activeTasks.put(t.getTaskID(), "tt");
+      // create a fake status for a task that is running for a bit
+      this.taskStatus = TaskStatus.createTaskStatus(isMap);
+      taskStatus.setProgress(0.5f);
+      taskStatus.setRunState(TaskStatus.State.RUNNING);
+    }
+
+    @Override
+    TreeMap<TaskAttemptID, String> getActiveTasks() {
+      return activeTasks;
+    }
+    @Override
+    public TaskStatus getTaskStatus(TaskAttemptID taskid) {
+      // return a status for a task that has run a bit
+      return taskStatus;
+    }
+    @Override
+    boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
+      if (isMap) {
+        fakeJob.mapTaskFinished();
+      }
+      else {
+        fakeJob.reduceTaskFinished();
+      }
+      return true;
+    }
+  }
+
+  static class FakeQueueManager extends QueueManager {
+    private Set<String> queues = null;
+    FakeQueueManager() {
+      super(new Configuration());
+    }
+    void setQueues(Set<String> queues) {
+      this.queues = queues;
+    }
+    public synchronized Set<String> getQueues() {
+      return queues;
+    }
+  }
+
+  static class FakeTaskTrackerManager implements TaskTrackerManager {
+    int maps = 0;
+    int reduces = 0;
+    int maxMapTasksPerTracker = 2;
+    int maxReduceTasksPerTracker = 1;
+    List<JobInProgressListener> listeners =
+      new ArrayList<JobInProgressListener>();
+    FakeQueueManager qm = new FakeQueueManager();
+
+    private Map<String, TaskTrackerStatus> trackers =
+      new HashMap<String, TaskTrackerStatus>();
+    private Map<String, TaskStatus> taskStatuses =
+      new HashMap<String, TaskStatus>();
+
+    public FakeTaskTrackerManager() {
+      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+
+    public void addTaskTracker(String ttName) {
+      trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", 1,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+
+    public ClusterStatus getClusterStatus() {
+      int numTrackers = trackers.size();
+      return new ClusterStatus(numTrackers, maps, reduces,
+          numTrackers * maxMapTasksPerTracker,
+          numTrackers * maxReduceTasksPerTracker,
+          JobTracker.State.RUNNING);
+    }
+
+    public int getNumberOfUniqueHosts() {
+      return 0;
+    }
+
+    public int getNextHeartbeatInterval() {
+      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+    }
+
+    public Collection<TaskTrackerStatus> taskTrackers() {
+      return trackers.values();
+    }
+
+    public void addJobInProgressListener(JobInProgressListener listener) {
+      listeners.add(listener);
+    }
+
+    public void removeJobInProgressListener(JobInProgressListener listener) {
+      listeners.remove(listener);
+    }
+
+    public void submitJob(JobInProgress job) {
+      for (JobInProgressListener listener : listeners) {
+        listener.jobAdded(job);
+      }
+    }
+
+    public TaskTrackerStatus getTaskTracker(String trackerID) {
+      return trackers.get(trackerID);
+    }
+
+    public void startTask(String taskTrackerName, final Task t) {
+      if (t.isMapTask()) {
+        maps++;
+      } else {
+        reduces++;
+      }
+      TaskStatus status = new TaskStatus() {
+        @Override
+        public boolean getIsMap() {
+          return t.isMapTask();
+        }
+      };
+      taskStatuses.put(t.getTaskID().toString(), status);
+      status.setRunState(TaskStatus.State.RUNNING);
+      trackers.get(taskTrackerName).getTaskReports().add(status);
+    }
+
+    public void finishTask(String taskTrackerName, String tipId,
+        FakeJobInProgress j) {
+      TaskStatus status = taskStatuses.get(tipId);
+      if (status.getIsMap()) {
+        maps--;
+        j.mapTaskFinished();
+      } else {
+        reduces--;
+        j.reduceTaskFinished();
+      }
+      status.setRunState(TaskStatus.State.SUCCEEDED);
+    }
+
+    void addQueues(String[] arr) {
+      Set<String> queues = new HashSet<String>();
+      for (String s : arr) {
+        queues.add(s);
+      }
+      qm.setQueues(queues);
+    }
+
+    public QueueManager getQueueManager() {
+      return qm;
+    }
+  }
+
+  // represents a fake queue configuration info
+  static class FakeQueueInfo {
+    String queueName;
+    float gc;
+    int reclaimTimeLimit;
+    boolean supportsPrio;
+    int ulMin;
+
+    public FakeQueueInfo(String queueName, float gc,
+        int reclaimTimeLimit, boolean supportsPrio, int ulMin) {
+      this.queueName = queueName;
+      this.gc = gc;
+      this.reclaimTimeLimit = reclaimTimeLimit;
+      this.supportsPrio = supportsPrio;
+      this.ulMin = ulMin;
+    }
+  }
+
+  static class FakeResourceManagerConf extends CapacitySchedulerConf {
+
+    // map of queue names to queue info
+    private Map<String, FakeQueueInfo> queueMap =
+      new LinkedHashMap<String, FakeQueueInfo>();
+    String firstQueue;
+
+    void setFakeQueues(List<FakeQueueInfo> queues) {
+      for (FakeQueueInfo q : queues) {
+        queueMap.put(q.queueName, q);
+      }
+      firstQueue = queues.get(0).queueName;
+    }
+
+    public synchronized Set<String> getQueues() {
+      return queueMap.keySet();
+    }
+
+    public float getGuaranteedCapacity(String queue) {
+      return queueMap.get(queue).gc;
+    }
+
+    public int getReclaimTimeLimit(String queue) {
+      return queueMap.get(queue).reclaimTimeLimit;
+    }
+
+    public int getMinimumUserLimitPercent(String queue) {
+      return queueMap.get(queue).ulMin;
+    }
+
+    public boolean isPrioritySupported(String queue) {
+      return queueMap.get(queue).supportsPrio;
+    }
+  }
+
+  protected class FakeClock extends CapacityTaskScheduler.Clock {
+    private long time = 0;
+
+    public void advance(long millis) {
+      time += millis;
+    }
+
+    @Override
+    long getTime() {
+      return time;
+    }
+  }
+
+  protected JobConf conf;
+  protected CapacityTaskScheduler scheduler;
+  private FakeTaskTrackerManager taskTrackerManager;
+  private FakeResourceManagerConf resConf;
+  private FakeClock clock;
+
+  @Override
+  protected void setUp() throws Exception {
+    jobCounter = 0;
+    taskTrackerManager = new FakeTaskTrackerManager();
+    clock = new FakeClock();
+    scheduler = new CapacityTaskScheduler(clock);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+
+    conf = new JobConf();
+    // set interval to a large number so thread doesn't interfere with us
+    conf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval", 500);
+    scheduler.setConf(conf);
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (scheduler != null) {
+      scheduler.terminate();
+    }
+  }
+
+  private FakeJobInProgress submitJob(int state, int maps, int reduces,
+      String queue, String user) throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setNumMapTasks(maps);
+    jobConf.setNumReduceTasks(reduces);
+    if (queue != null)
+      jobConf.setQueueName(queue);
+    FakeJobInProgress job = new FakeJobInProgress(
+        new JobID("test", ++jobCounter), jobConf, taskTrackerManager, user);
+    job.getStatus().setRunState(state);
+    taskTrackerManager.submitJob(job);
+    return job;
+  }
+
+  // basic tests, should be able to submit to queues
+  public void testSubmitToQueues() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
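+    // FakeQueueInfo arguments: queue name, guaranteed capacity (percent),
+    // reclaim-time limit, whether priority is supported, and the minimum
+    // per-user limit (percent).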
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job with no queue specified. It should be accepted
+    // and given to the default queue.
+    JobInProgress j = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // when we ask for a task, we should get one, from the job submitted
+    Task t;
+    t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // submit another job, to a different queue
+    j = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // now when we get a task, it should be from the second job
+    t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+  }
+
+  // test capacity transfer
+  public void testCapacityTransfer() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the GC for maps is 2. Since we're the only user,
+    // we should get a task
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // I should get another map task.
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // Now we're at full capacity for maps. If I ask for another map task,
+    // I should get a map task from the default queue's capacity.
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    // and another
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+  }
+
+  // test user limits
+  public void testUserLimits() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the GC for maps is 2. Since we're the only user,
+    // we should get a task
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // Submit another job, from a different user
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    // Now if I ask for a map task, it should come from the second job
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Now we're at full capacity for maps. If I ask for another map task,
+    // I should get a map task from the default queue's capacity.
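+    // (Total map capacity is 4: two trackers with two map slots each, so
+    // each queue's 50% guarantee is 2 slots; q2 now borrows from default.)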
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); + // and another + checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2"); + } + + // test user limits when a 2nd job is submitted much after first job + public void testUserLimits2() throws Exception { + // set up some queues + String[] qs = {"default", "q2"}; + taskTrackerManager.addQueues(qs); + resConf = new FakeResourceManagerConf(); + ArrayList queues = new ArrayList(); + queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25)); + queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25)); + resConf.setFakeQueues(queues); + scheduler.setResourceManagerConf(resConf); + scheduler.start(); + + // submit a job + submitJob(JobStatus.PREP, 10, 10, "q2", "u1"); + // for queue 'q2', the GC for maps is 2. Since we're the only user, + // we should get a task + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); + // since we're the only job, we get another map + checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1"); + // Submit another job, from a different user + submitJob(JobStatus.PREP, 10, 10, "q2", "u2"); + // Now if I ask for a map task, it should come from the second job + checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); + // and another + checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2"); + } + + // test user limits when a 2nd job is submitted much after first job + // and we need to wait for first job's task to complete + public void testUserLimits3() throws Exception { + // set up some queues + String[] qs = {"default", "q2"}; + taskTrackerManager.addQueues(qs); + resConf = new FakeResourceManagerConf(); + ArrayList queues = new ArrayList(); + queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25)); + queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25)); + resConf.setFakeQueues(queues); + scheduler.setResourceManagerConf(resConf); + scheduler.start(); + + // submit a job + FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1"); + // for queue 'q2', the GC for maps is 2. Since we're the only user, + // we should get a task + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); + // since we're the only job, we get another map + checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1"); + // we get two more maps from 'default queue' + checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2"); + // Submit another job, from a different user + FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2"); + // one of the task finishes + taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1); + // Now if I ask for a map task, it should come from the second job + checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); + // another task from job1 finishes, another new task to job2 + taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1); + checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1"); + // now we have equal number of tasks from each job. 
+    // task finishes, that job gets a new task
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
+    checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+  }
+
+  // test user limits with many users, more slots
+  public void testUserLimits4() throws Exception {
+    // set up one queue, with 10 slots
+    String[] qs = {"default"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    // add some more TTs
+    taskTrackerManager.addTaskTracker("tt3");
+    taskTrackerManager.addTaskTracker("tt4");
+    taskTrackerManager.addTaskTracker("tt5");
+
+    // u1 submits job
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // it gets the first 5 slots
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+    // u2 submits job with 4 slots
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 4, 4, null, "u2");
+    // u2 should get next 4 slots
+    checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
+    checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
+    checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
+    // last slot should go to u1, since u2 has no more tasks
+    checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
+    // u1 finishes a task
+    taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
+    // u1 submits a few more jobs
+    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // u2 also submits a job
+    submitJob(JobStatus.PREP, 10, 10, null, "u2");
+    // now u3 submits a job
+    submitJob(JobStatus.PREP, 2, 2, null, "u3");
+    // next slot should go to u3, even though u2 has an earlier job, since
+    // user limits have changed and u1/u2 are over limits
+    checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
+    // some other task finishes and u3 gets it
+    taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
+    checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
+    // now, u2 finishes a task
+    taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
+    // next slot will go to u1, since u3 has nothing to run and u1's job is
+    // first in the queue
+    checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
+  }
+
+  // test code to reclaim capacity
+  public void testReclaimCapacity() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2", "q3"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 25.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 25.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // set up a situation where q2 is under capacity, and default & q3
+    // are at/over capacity
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    // now submit a job to q2
+    FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // update our structures
+    scheduler.updateQSIInfo();
+    // get scheduler to notice that q2 needs to reclaim
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
+    // we start reclaiming when 15 secs are left.
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // no tasks should have been killed yet
+    assertEquals(j1.runningMapTasks, 3);
+    assertEquals(j2.runningMapTasks, 1);
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    // task from j1 will be killed
+    assertEquals(j1.runningMapTasks, 2);
+    assertEquals(j2.runningMapTasks, 1);
+  }
+
+  // test code to reclaim multiple capacity
+  public void testReclaimCapacity2() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2", "q3", "q4"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 20.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 20.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q4", 10.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // add some more TTs so our total map capacity is 10
+    taskTrackerManager.addTaskTracker("tt3");
+    taskTrackerManager.addTaskTracker("tt4");
+    taskTrackerManager.addTaskTracker("tt5");
+
+    // q2 has nothing running, default is under cap, q3 and q4 are over cap
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 2, 2, null, "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q4", "u1");
+    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+    checkAssignment("tt4", "attempt_test_0003_m_000002_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_m_000004_0 on tt4");
+    checkAssignment("tt5", "attempt_test_0002_m_000005_0 on tt5");
+    checkAssignment("tt5", "attempt_test_0003_m_000003_0 on tt5");
+    // at this point, q3 is running 5 tasks (with a cap of 2), q4 is
+    // running 3 tasks (with a cap of 1).
+    // If we submit a job to 'default', we need to get 3 slots back.
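+    // (Map capacity is 10 across the five trackers; default's 50% guarantee
+    // is 5 slots, and j1 already holds 2 of them, so 3 must be reclaimed.)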
+    FakeJobInProgress j4 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // update our structures
+    scheduler.updateQSIInfo();
+    // get scheduler to notice that q2 needs to reclaim
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
+    // we start reclaiming when 15 secs are left.
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // nothing should have happened
+    assertEquals(j2.runningMapTasks, 5);
+    assertEquals(j3.runningMapTasks, 3);
+    // 3 tasks to kill, 5 running over cap. q3 should give up 3*3/5 = 2 slots.
+    // q4 should give up 2*3/5 = 1 slot.
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    assertEquals(j2.runningMapTasks, 3);
+    assertEquals(j3.runningMapTasks, 2);
+  }
+
+  // test code to reclaim capacity in steps
+  public void testReclaimCapacityInSteps() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // set up a situation where q2 is under capacity, and default is
+    // at/over capacity
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    // now submit a job to q2
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 1, 1, "q2", "u1");
+    // update our structures
+    scheduler.updateQSIInfo();
+    // get scheduler to notice that q2 needs to reclaim
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
+    // we start reclaiming when 15 secs are left.
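+    // (the fake clock counts milliseconds, so the advances below walk
+    // through the 1,000,000 ms, i.e. 1000 s, reclaim window)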
+    clock.advance(400000);
+    // submit another job to q2 which causes more capacity to be reclaimed
+    j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    // update our structures
+    scheduler.updateQSIInfo();
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    // one task from j1 will be killed
+    assertEquals(j1.runningMapTasks, 3);
+    clock.advance(300000);
+    scheduler.reclaimCapacity();
+    // timer for 2nd job hasn't fired, so nothing killed
+    assertEquals(j1.runningMapTasks, 3);
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // one task from j1 will be killed
+    assertEquals(j1.runningMapTasks, 2);
+  }
+
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    return taskTrackerManager.getTaskTracker(taskTrackerName);
+  }
+
+  protected Task checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+    return tasks.get(0);
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,245 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+
+public class TestCapacitySchedulerConf extends TestCase {
+
+  private static String testDataDir = System.getProperty("test.build.data");
+  private static String testConfFile;
+
+  private Map<String, String> defaultProperties;
+  private CapacitySchedulerConf testConf;
+  private PrintWriter writer;
+
+  static {
+    if (testDataDir == null) {
+      testDataDir = ".";
+    } else {
+      new File(testDataDir).mkdirs();
+    }
+    testConfFile = new File(testDataDir, "test-conf.xml").getAbsolutePath();
+  }
+
+  public TestCapacitySchedulerConf() {
+    defaultProperties = setupQueueProperties(
+        new String[] { "guaranteed-capacity",
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" },
+        new String[] { "100",
+                       "300",
+                       "false",
+                       "100" }
+        );
+  }
+
+  public void setUp() throws IOException {
+    openFile();
+  }
+
+  public void tearDown() throws IOException {
+    File confFile = new File(testConfFile);
+    if (confFile.exists()) {
+      confFile.delete();
+    }
+  }
+
+  public void testDefaults() {
+    testConf = new CapacitySchedulerConf();
+    Map<String, Map<String, String>> queueDetails
+      = new HashMap<String, Map<String, String>>();
+    queueDetails.put("default", defaultProperties);
+    checkQueueProperties(testConf, queueDetails);
+  }
+
+  public void testQueues() {
+
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity",
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" },
+        new String[] { "10",
+                       "600",
+                       "true",
+                       "25" }
+        );
+
+    Map<String, String> q2Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity",
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" },
+        new String[] { "100",
+                       "6000",
+                       "false",
+                       "50" }
+        );
+
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    writeQueueDetails("research", q2Props);
+    endConfig();
+
+    testConf = new CapacitySchedulerConf(new Path(testConfFile));
+
+    Map<String, Map<String, String>> queueDetails
+      = new HashMap<String, Map<String, String>>();
+    queueDetails.put("default", q1Props);
+    queueDetails.put("research", q2Props);
+    checkQueueProperties(testConf, queueDetails);
+  }
+
+  public void testQueueWithDefaultProperties() {
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity",
+                       "minimum-user-limit-percent" },
+        new String[] { "20",
+                       "75" }
+        );
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    endConfig();
+
+    testConf = new CapacitySchedulerConf(new Path(testConfFile));
+
+    Map<String, Map<String, String>> queueDetails
+      = new HashMap<String, Map<String, String>>();
+    Map<String, String> expProperties = new HashMap<String, String>();
+    for (String key : q1Props.keySet()) {
+      expProperties.put(key, q1Props.get(key));
+    }
+    expProperties.put("reclaim-time-limit", "300");
+    expProperties.put("supports-priority", "false");
+    queueDetails.put("default", expProperties);
+    checkQueueProperties(testConf, queueDetails);
+  }
+
+  public void testReload() throws IOException {
+    // use the setup in the test case testQueues as a base...
+    testQueues();
+
+    // write new values to the file...
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity",
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" },
+        new String[] { "20.5",
+                       "600",
+                       "true",
+                       "40" }
+        );
+
+    Map<String, String> q2Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity",
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" },
+        new String[] { "100",
+                       "3000",
+                       "false",
+                       "50" }
+        );
+
+    openFile();
+    startConfig();
+    writeQueueDetails("default", q1Props);
+    writeQueueDetails("production", q2Props);
+    endConfig();
+
+    testConf.reloadConfiguration();
+
+    Map<String, Map<String, String>> queueDetails
+      = new HashMap<String, Map<String, String>>();
+    queueDetails.put("default", q1Props);
+    queueDetails.put("production", q2Props);
+    checkQueueProperties(testConf, queueDetails);
+  }
+
+  private void checkQueueProperties(
+      CapacitySchedulerConf testConf,
+      Map<String, Map<String, String>> queueDetails) {
+    for (String queueName : queueDetails.keySet()) {
+      Map<String, String> map = queueDetails.get(queueName);
+      assertEquals(Float.parseFloat(map.get("guaranteed-capacity")),
+          testConf.getGuaranteedCapacity(queueName));
+      assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
+          testConf.getMinimumUserLimitPercent(queueName));
+      assertEquals(Integer.parseInt(map.get("reclaim-time-limit")),
+          testConf.getReclaimTimeLimit(queueName));
+      assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
+          testConf.isPrioritySupported(queueName));
+    }
+  }
+
+  private Map<String, String> setupQueueProperties(String[] keys,
+      String[] values) {
+    HashMap<String, String> map = new HashMap<String, String>();
+    for (int i = 0; i < keys.length; i++) {
+      map.put(keys[i], values[i]);
+    }
+    return map;
+  }
+
+  private void openFile() throws IOException {
+    FileWriter fw = new FileWriter(testConfFile);
+    BufferedWriter bw = new BufferedWriter(fw);
+    writer = new PrintWriter(bw);
+  }
+
+  private void startConfig() {
+    writer.println("<?xml version=\"1.0\"?>");
+    writer.println("<configuration>");
+  }
+
+  private void writeQueueDetails(String queue, Map<String, String> props) {
+    for (String key : props.keySet()) {
+      writer.println("<property>");
+      writer.println("<name>mapred.capacity-scheduler.queue."
+          + queue + "." + key + "</name>");
+      writer.println("<value>" + props.get(key) + "</value>");
+      writer.println("</property>");
+    }
+  }
+
+  private void endConfig() {
+    writer.println("</configuration>");
+    writer.close();
+  }
+
+}

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Thu Sep 11 11:49:22 2008
@@ -156,6 +156,11 @@
     listeners.remove(listener);
   }
 
+  @Override
+  public int getNextHeartbeatInterval() {
+    return MRConstants.HEARTBEAT_INTERVAL_MIN;
+  }
+
   // Test methods
   public void submitJob(JobInProgress job) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Sep 11 11:49:22 2008
@@ -72,6 +72,10 @@
   int finishedReduceTasks = 0;
   int failedMapTasks = 0;
   int failedReduceTasks = 0;
+  // runningMapTasks include speculative tasks, so we need to capture
+  // speculative tasks separately
+  int speculativeMapTasks = 0;
+  int speculativeReduceTasks = 0;
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;

@@ -441,6 +445,14 @@
   public synchronized int finishedReduces() {
     return finishedReduceTasks;
   }
+  public synchronized int pendingMaps() {
+    return numMapTasks - runningMapTasks - failedMapTasks -
+      finishedMapTasks + speculativeMapTasks;
+  }
+  public synchronized int pendingReduces() {
+    return numReduceTasks - runningReduceTasks - failedReduceTasks -
+      finishedReduceTasks + speculativeReduceTasks;
+  }
   public JobPriority getPriority() {
     return this.priority;
   }
@@ -484,7 +496,34 @@
   TaskInProgress[] getReduceTasks() {
     return reduces;
   }
-
+
+  /**
+   * Return the nonLocalRunningMaps
+   * @return the set of non-local running map TIPs
+   */
+  Set<TaskInProgress> getNonLocalRunningMaps() {
+    return nonLocalRunningMaps;
+  }
+
+  /**
+   * Return the runningMapCache
+   * @return a map from each Node to the set of map TIPs running on it
+   */
+  Map<Node, Set<TaskInProgress>> getRunningMapCache() {
+    return runningMapCache;
+  }
+
+  /**
+   * Return runningReduces
+   * @return the set of running reduce TIPs
+   */
+  Set<TaskInProgress> getRunningReduces() {
+    return runningReduces;
+  }
+
   /**
    * Get the job configuration
    * @return the job's configuration
@@ -738,6 +777,8 @@
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningMapTasks += 1;
+      if (maps[target].getActiveTasks().size() > 1)
+        speculativeMapTasks++;
       if (maps[target].isFirstAttempt(result.getTaskID())) {
         JobHistory.Task.logStarted(maps[target].getTIPId(), Values.MAP.name(),
             System.currentTimeMillis(),
@@ -849,6 +890,8 @@
     Task result = reduces[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningReduceTasks += 1;
+      if (reduces[target].getActiveTasks().size() > 1)
+        speculativeReduceTasks++;
       if (reduces[target].isFirstAttempt(result.getTaskID())) {
         JobHistory.Task.logStarted(reduces[target].getTIPId(),
             Values.REDUCE.name(), System.currentTimeMillis(), "");
@@ -1467,6 +1510,7 @@
       JobTrackerInstrumentation metrics) {
     TaskAttemptID taskid = status.getTaskID();
+    int oldNumAttempts = tip.getActiveTasks().size();
     // Sanity check: is the TIP already complete?
     // It _is_ safe to not decrement running{Map|Reduce}Tasks and
@@ -1514,10 +1558,14 @@
           status.getCounters());
     }
-    // Update the running/finished map/reduce counts
+    int newNumAttempts = tip.getActiveTasks().size();
     if (!tip.isCleanupTask()) {
       if (tip.isMapTask()) {
         runningMapTasks -= 1;
+        // check if this was a speculative task
+        if (oldNumAttempts > 1) {
+          speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
+        }
         finishedMapTasks += 1;
         metrics.completeMap(taskid);
         // remove the completed map from the resp running caches
@@ -1527,6 +1575,9 @@
       }
     } else {
       runningReduceTasks -= 1;
+      if (oldNumAttempts > 1) {
+        speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
+      }
      finishedReduceTasks += 1;
      metrics.completeReduce(taskid);
      // remove the completed reduces from the running reducers set
@@ -1620,7 +1671,7 @@
       jobKilled = true;
     }
   }
-
+
   /**
    * A task assigned to this JobInProgress has reported in as failed.
    * Most of the time, we'll just reschedule execution. However, after

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Sep 11 11:49:22 2008
@@ -1272,7 +1272,7 @@
    * Heartbeat interval is incremented 1 second for every 50 nodes.
    * @return next heartbeat interval.
    */
-  private int getNextHeartbeatInterval() {
+  public int getNextHeartbeatInterval() {
     // get the no of task trackers
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval = Math.max(

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Thu Sep 11 11:49:22 2008
@@ -63,4 +63,12 @@
    * @return the {@link QueueManager}
    */
   public QueueManager getQueueManager();
+
+  /**
+   * Return the current heartbeat interval that's used by {@link TaskTracker}s.
+   *
+   * @return the heartbeat interval used by {@link TaskTracker}s
+   */
+  public int getNextHeartbeatInterval();
+
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Thu Sep 11 11:49:22 2008
@@ -144,6 +144,11 @@
     return null;
   }
 
+  @Override
+  public int getNextHeartbeatInterval() {
+    return MRConstants.HEARTBEAT_INTERVAL_MIN;
+  }
+
   // Test methods
   public void submitJob(JobInProgress job) {
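
A minimal usage sketch of the configuration API exercised above, assuming the file written by testQueues(); the queue name "research" and the expected values come from that test, and the variable names are illustrative only, not part of this commit:

    CapacitySchedulerConf schedConf = new CapacitySchedulerConf(new Path(testConfFile));
    float gc      = schedConf.getGuaranteedCapacity("research");       // 100.0f
    int reclaim   = schedConf.getReclaimTimeLimit("research");         // 6000
    boolean prio  = schedConf.isPrioritySupported("research");         // false
    int userLimit = schedConf.getMinimumUserLimitPercent("research");  // 50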