Modified: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Jun 19 05:42:53 2009
@@ -40,7 +40,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil.AccessControlList;
-
public class TestCapacityScheduler extends TestCase {
static final Log LOG =
@@ -146,17 +145,24 @@
}
private ControlledInitializationPoller controlledInitializationPoller;
-
+ /*
+ * Fake job-in-progress object used for testing the scheduler's scheduling
+ * decisions. The FakeJobInProgress object creates and tracks
+ * FakeTaskInProgress objects when tasks are assigned. If speculative maps
+ * and reduces are configured, then it returns exactly one speculative
+ * map task and one speculative reduce task.
+ */
static class FakeJobInProgress extends JobInProgress {
- private FakeTaskTrackerManager taskTrackerManager;
+ protected FakeTaskTrackerManager taskTrackerManager;
private int mapTaskCtr;
private int redTaskCtr;
private Set<TaskInProgress> mapTips =
new HashSet<TaskInProgress>();
private Set<TaskInProgress> reduceTips =
new HashSet<TaskInProgress>();
-
+ private int speculativeMapTaskCounter = 0;
+ private int speculativeReduceTaskCounter = 0;
public FakeJobInProgress(JobID jId, JobConf jobConf,
FakeTaskTrackerManager taskTrackerManager, String user) {
super(jId, jobConf, null);
@@ -175,8 +181,6 @@
}
mapTaskCtr = 0;
redTaskCtr = 0;
- super.setMaxVirtualMemoryForTask(jobConf.getMaxVirtualMemoryForTask());
- super.setMaxPhysicalMemoryForTask(jobConf.getMaxPhysicalMemoryForTask());
}
@Override
@@ -187,8 +191,14 @@
@Override
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
- if (mapTaskCtr == numMapTasks) return null;
- TaskAttemptID attemptId = getTaskAttemptID(true);
+ boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
+ if (areAllMapsRunning){
+ if(!getJobConf().getMapSpeculativeExecution() ||
+ speculativeMapTasks > 0) {
+ return null;
+ }
+ }
+ TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
@Override
public String toString() {
@@ -198,16 +208,39 @@
taskTrackerManager.startTask(tts.getTrackerName(), task);
runningMapTasks++;
// create a fake TIP and keep track of it
- mapTips.add(new FakeTaskInProgress(getJobID(),
- getJobConf(), task, true, this));
+ FakeTaskInProgress mapTip = new FakeTaskInProgress(getJobID(),
+ getJobConf(), task, true, this);
+ mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ if(areAllMapsRunning) {
+ speculativeMapTasks++;
+ //A speculative map has been scheduled. Now mark every tip in
+ //mapTips as no longer having a speculative map.
+ for(TaskInProgress t : mapTips) {
+ if (t instanceof FakeTaskInProgress) {
+ FakeTaskInProgress mt = (FakeTaskInProgress) t;
+ mt.hasSpeculativeMap = false;
+ }
+ }
+ } else {
+ //add only non-speculative tips.
+ mapTips.add(mapTip);
+ //add the tips to the JobInProgress TIPS
+ maps = mapTips.toArray(new TaskInProgress[mapTips.size()]);
+ }
return task;
}
-
+
@Override
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
int clusterSize, int ignored) throws IOException {
- if (redTaskCtr == numReduceTasks) return null;
- TaskAttemptID attemptId = getTaskAttemptID(false);
+ boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
+ if (areAllReducesRunning){
+ if(!getJobConf().getReduceSpeculativeExecution() ||
+ speculativeReduceTasks > 0) {
+ return null;
+ }
+ }
+ TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
Task task = new ReduceTask("", attemptId, 0, 10) {
@Override
public String toString() {
@@ -217,8 +250,25 @@
taskTrackerManager.startTask(tts.getTrackerName(), task);
runningReduceTasks++;
// create a fake TIP and keep track of it
- reduceTips.add(new FakeTaskInProgress(getJobID(),
- getJobConf(), task, false, this));
+ FakeTaskInProgress reduceTip = new FakeTaskInProgress(getJobID(),
+ getJobConf(), task, false, this);
+ reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ if(areAllReducesRunning) {
+ speculativeReduceTasks++;
+ //A speculative reduce has been scheduled. Now mark every tip in
+ //reduceTips as no longer having a speculative reduce.
+ for(TaskInProgress t : reduceTips) {
+ if (t instanceof FakeTaskInProgress) {
+ FakeTaskInProgress rt = (FakeTaskInProgress) t;
+ rt.hasSpeculativeReduce = false;
+ }
+ }
+ } else {
+ //add only non-speculative tips.
+ reduceTips.add(reduceTip);
+ //add the tips to the JobInProgress TIPS
+ reduces = reduceTips.toArray(new TaskInProgress[reduceTips.size()]);
+ }
return task;
}
@@ -232,14 +282,19 @@
finishedReduceTasks++;
}
- private TaskAttemptID getTaskAttemptID(boolean isMap) {
+ private TaskAttemptID getTaskAttemptID(boolean isMap, boolean isSpeculative) {
JobID jobId = getJobID();
TaskType t = TaskType.REDUCE;
if (isMap) {
t = TaskType.MAP;
}
- return new TaskAttemptID(jobId.getJtIdentifier(),
- jobId.getId(), t, (isMap)?++mapTaskCtr: ++redTaskCtr, 0);
+ if (!isSpeculative) {
+ return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
+ (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
+ } else {
+ return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
+ (isMap) ? mapTaskCtr : redTaskCtr, 1);
+ }
}
@Override
@@ -270,12 +325,15 @@
this.status.setRunState(JobStatus.FAILED);
}
}
-
+
static class FakeTaskInProgress extends TaskInProgress {
private boolean isMap;
private FakeJobInProgress fakeJob;
private TreeMap<TaskAttemptID, String> activeTasks;
private TaskStatus taskStatus;
+ boolean hasSpeculativeMap;
+ boolean hasSpeculativeReduce;
+
FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
boolean isMap, FakeJobInProgress job) {
super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
@@ -287,6 +345,16 @@
this.taskStatus = TaskStatus.createTaskStatus(isMap);
taskStatus.setProgress(0.5f);
taskStatus.setRunState(TaskStatus.State.RUNNING);
+ if (jobConf.getMapSpeculativeExecution()) {
+ //resetting of the hasSpeculativeMap is done
+ //when speculative map is scheduled by the job.
+ hasSpeculativeMap = true;
+ }
+ if (jobConf.getReduceSpeculativeExecution()) {
+ //resetting of the hasSpeculativeReduce is done
+ //when speculative reduce is scheduled by the job.
+ hasSpeculativeReduce = true;
+ }
}
@Override
@@ -561,13 +629,13 @@
// represents a fake queue configuration info
static class FakeQueueInfo {
String queueName;
- float gc;
+ float capacity;
boolean supportsPrio;
int ulMin;
- public FakeQueueInfo(String queueName, float gc, boolean supportsPrio, int ulMin) {
+ public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
this.queueName = queueName;
- this.gc = gc;
+ this.capacity = capacity;
this.supportsPrio = supportsPrio;
this.ulMin = ulMin;
}
@@ -597,10 +665,10 @@
}*/
public float getCapacity(String queue) {
- if(queueMap.get(queue).gc == -1) {
+ if(queueMap.get(queue).capacity == -1) {
return super.getCapacity(queue);
}
- return queueMap.get(queue).gc;
+ return queueMap.get(queue).capacity;
}
public int getMinimumUserLimitPercent(String queue) {
@@ -666,6 +734,9 @@
resConf.getQueues());
scheduler.setInitializationPoller(controlledInitializationPoller);
scheduler.setConf(conf);
+ //by default disable speculative execution.
+ conf.setMapSpeculativeExecution(false);
+ conf.setReduceSpeculativeExecution(false);
}
@Override
@@ -678,7 +749,7 @@
private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
FakeJobInProgress job =
new FakeJobInProgress(new JobID("test", ++jobCounter),
- (jobConf == null ? new JobConf() : jobConf), taskTrackerManager,
+ (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
jobConf.getUser());
job.getStatus().setRunState(state);
taskTrackerManager.submitJob(job);
@@ -852,13 +923,6 @@
return queue.toArray(new JobInProgress[0]);
}
- /*protected void submitJobs(int number, int state, int maps, int reduces)
- throws IOException {
- for (int i = 0; i < number; i++) {
- submitJob(state, maps, reduces);
- }
- }*/
-
// tests if tasks can be assinged when there are multiple jobs from a same
// user
public void testJobFinished() throws Exception {
@@ -995,7 +1059,7 @@
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- // set the gc % as 10%, so that gc will be zero initially as
+ // set the capacity % as 10%, so that capacity will be zero initially as
// the cluster capacity increase slowly.
queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
@@ -1029,7 +1093,7 @@
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt5");
// now job from default should run, as it is furthest away
- // in terms of runningMaps / gc.
+ // in terms of runningMaps / capacity.
checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
verifyCapacity("1", "default");
verifyCapacity("9", "q2");
@@ -1040,7 +1104,7 @@
String schedInfo = taskTrackerManager.getQueueManager().
getSchedulerInfo(queue).toString();
assertTrue(schedInfo.contains("Map tasks\nCapacity: "
- + expectedCapacity));
+ + expectedCapacity + " slots"));
}
// test capacity transfer
@@ -1221,7 +1285,82 @@
// first in the queue
checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
}
-
+
+ /**
+ * Test to verify that high memory jobs hit user limits faster than any normal
+ * job.
+ *
+ * @throws IOException
+ */
+ public void testUserLimitsForHighMemoryJobs()
+ throws IOException {
+ taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ String[] qs = { "default" };
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+ resConf.setFakeQueues(queues);
+ // enabled memory-based scheduling
+ // Normal job in the cluster would be 1GB maps/reduces
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // Submit one normal job (1GB maps, 1GB reduces) to the default queue.
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(6);
+ jConf.setNumReduceTasks(6);
+ jConf.setUser("u1");
+ jConf.setQueueName("default");
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+ + "6 map and 6 reduce tasks");
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(6);
+ jConf.setNumReduceTasks(6);
+ jConf.setQueueName("default");
+ jConf.setUser("u2");
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // Verify that the normal job gets 5 maps and 5 reduces before hitting user limits
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
+ // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
+ // are hit. So u2 should get slots
+
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+
+ // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+ // slots. Because of high memory tasks, giving u2 another task would
+ // overflow limits. So, no more tasks should be given to anyone.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ }
+
/*
* Following is the testing strategy for testing scheduling information.
* - start capacity scheduler with two queues.
@@ -1271,21 +1410,35 @@
scheduler.assignTasks(tracker("tt1")); // heartbeat
scheduler.assignTasks(tracker("tt2")); // heartbeat
int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
- int totalReduces = taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+ int totalReduces =
+ taskTrackerManager.getClusterStatus().getMaxReduceTasks();
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
- String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
- String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
+ String schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ String schedulingInfo2 =
+ queueManager.getJobQueueInfo("q2").getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[1] , "Capacity Percentage: 50.0%");
- assertEquals(infoStrings[6] , "Capacity: " + totalMaps * 50/100);
- assertEquals(infoStrings[10] , "Capacity: " + totalReduces * 50/100);
- assertEquals(infoStrings[2] , "User Limit: 25%");
- assertEquals(infoStrings[3] , "Priority Supported: YES");
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 0");
- assertEquals(infoStrings[15] , "Number of users who have submitted jobs: 0");
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[0], "Queue configuration");
+ assertEquals(infoStrings[1], "Capacity Percentage: 50.0%");
+ assertEquals(infoStrings[2], "User Limit: 25%");
+ assertEquals(infoStrings[3], "Priority Supported: YES");
+ assertEquals(infoStrings[4], "-------------");
+ assertEquals(infoStrings[5], "Map tasks");
+ assertEquals(infoStrings[6], "Capacity: " + totalMaps * 50 / 100
+ + " slots");
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[9], "-------------");
+ assertEquals(infoStrings[10], "Reduce tasks");
+ assertEquals(infoStrings[11], "Capacity: " + totalReduces * 50 / 100
+ + " slots");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[14], "-------------");
+ assertEquals(infoStrings[15], "Job info");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 0");
+ assertEquals(infoStrings[17], "Number of users who have submitted jobs: 0");
assertEquals(schedulingInfo, schedulingInfo2);
//Testing with actual job submission.
@@ -1296,10 +1449,13 @@
infoStrings = schedulingInfo.split("\n");
//waiting job should be equal to number of jobs submitted.
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
+ assertEquals(infoStrings[17], "Number of users who have submitted jobs: 1");
//Initalize the jobs but don't raise events
controlledInitializationPoller.selectJobsToInitialize();
@@ -1307,12 +1463,14 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
+ assertEquals(infoStrings.length, 18);
//should be previous value as nothing is scheduled because no events
//has been raised after initialization.
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
//Raise status change event so that jobs can move to running queue.
raiseStatusChangeEvents(scheduler.jobQueuesManager);
@@ -1329,10 +1487,14 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 18);
- assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
- assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[16] , "Number of Waiting Jobs: 4");
+ assertEquals(infoStrings.length, 20);
+ assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 1");
+ assertEquals(infoStrings[9], "Active users:");
+ assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[15], "Running tasks: 0");
+ assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
//assign a reduce task
Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
@@ -1341,10 +1503,16 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 20);
- assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
- assertEquals(infoStrings[13],"Running tasks: 100.0% of Capacity");
- assertEquals(infoStrings[18] , "Number of Waiting Jobs: 4");
+ assertEquals(infoStrings.length, 22);
+ assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 1");
+ assertEquals(infoStrings[9], "Active users:");
+ assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[15], "Running tasks: 1");
+ assertEquals(infoStrings[16], "Active users:");
+ assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
//Complete the job and check the running tasks count
FakeJobInProgress u1j1 = userJobs.get(0);
@@ -1357,10 +1525,12 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 4");
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 4");
//Fail a job which is initialized but not scheduled and check the count.
FakeJobInProgress u1j2 = userJobs.get(1);
@@ -1374,10 +1544,14 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 3");
+ assertEquals(infoStrings.length, 18);
+ //should be the previous value, as nothing is scheduled because no events
+ //have been raised after initialization.
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 3");
//Fail a job which is not initialized but is in the waiting queue.
FakeJobInProgress u1j5 = userJobs.get(4);
@@ -1392,10 +1566,14 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 2");
+ assertEquals(infoStrings.length, 18);
+ //should be the previous value, as nothing is scheduled because no events
+ //have been raised after initialization.
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 2");
//Raise status change events as none of the intialized jobs would be
//in running queue as we just failed the second job which was initialized
@@ -1418,10 +1596,12 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 18);
- assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
- assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[16] , "Number of Waiting Jobs: 1");
+ assertEquals(infoStrings.length, 20);
+ assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 1");
+ assertEquals(infoStrings[9], "Active users:");
+ assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -1431,11 +1611,10 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 1");
-
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 1");
}
/**
@@ -1449,12 +1628,6 @@
LOG.debug("Starting the scheduler.");
taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
- // Limited TT - 1GB vmem and 512MB pmem
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
- .setTotalVirtualMemory(1 * 1024 * 1024 * 1024L);
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
- .setTotalPhysicalMemory(512 * 1024 * 1024L);
-
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1464,11 +1637,11 @@
// memory-based scheduling disabled by default.
scheduler.start();
- LOG.debug("Submit one high memory(3GB vmem, 1GBpmem) job of 1 map task "
- + "and 1 reduce task.");
+ LOG.debug("Submit one high memory job of 1 3GB map task "
+ + "and 1 1GB reduce task.");
JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
- jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
+ jConf.setMemoryForMapTask(3 * 1024L); // 3GB
+ jConf.setMemoryForReduceTask(1 * 1024L); // 1 GB
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
@@ -1483,193 +1656,59 @@
}
/**
- * Test to verify that highPmemJobs are scheduled like all other jobs when
- * physical-memory based scheduling is not enabled.
- * @throws IOException
- */
- public void testDisabledPmemBasedScheduling()
- throws IOException {
-
- LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
- // Limited TT - 100GB vmem and 500MB pmem
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(100 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(500 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
-
- taskTrackerManager.addQueues(new String[] { "default" });
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
- scheduler.setTaskTrackerManager(taskTrackerManager);
- // enable vmem-based scheduling. pmem based scheduling disabled by default.
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 1536 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 3 * 1024 * 1024 * 1024L);
- scheduler.start();
-
- LOG.debug("Submit one high pmem(3GB vmem, 1GBpmem) job of 1 map task "
- + "and 1 reduce task.");
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
- jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- submitJobAndInit(JobStatus.RUNNING, jConf);
-
- // assert that all tasks are launched even though they transgress the
- // scheduling limits.
-
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- }
-
- /**
- * Test HighMemoryJobs.
- * @throws IOException
- */
- public void testHighMemoryJobs()
- throws IOException {
-
- LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
- // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
-
- taskTrackerManager.addQueues(new String[] { "default" });
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setTaskTrackerManager(taskTrackerManager);
- // enabled memory-based scheduling
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 1536 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 3 * 1024 * 1024 * 1024L);
- resConf.setDefaultPercentOfPmemInVmem(33.3f);
- resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- LOG.debug("Submit one high memory(1600MB vmem, 400MB pmem) job of "
- + "1 map task and 1 reduce task.");
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(1600 * 1024 * 1024L); // 1.6GB vmem
- jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
- // No more tasks of this job can run on the TT because of lack of vmem
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- // Let attempt_test_0001_m_000001_0 finish, task assignment should succeed.
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-
- LOG.debug("Submit another high memory(1200MB vmem, 800MB pmem) job of "
- + "1 map task and 0 reduces.");
- jConf.setMaxVirtualMemoryForTask(1200 * 1024 * 1024L);
- jConf.setMaxPhysicalMemoryForTask(800 * 1024 * 1024L);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(0);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- submitJobAndInit(JobStatus.PREP, jConf); // job2
-
- // This job shouldn't run the TT now because of lack of pmem
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- // Let attempt_test_0001_m_000002_0 finish, task assignment should succeed.
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-
- LOG.debug("Submit a normal memory(200MB vmem, 100MB pmem) job of "
- + "0 maps and 1 reduce task.");
- jConf.setMaxVirtualMemoryForTask(200 * 1024 * 1024L);
- jConf.setMaxPhysicalMemoryForTask(100 * 1024 * 1024L);
- jConf.setNumMapTasks(0);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- submitJobAndInit(JobStatus.PREP, jConf); // job3
-
- checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
- }
-
- /**
- * Test HADOOP-4979.
- * Bug fix for making sure we always return null to TT if there is a
- * high-mem job, and not look at reduce jobs (if map tasks are high-mem)
- * or vice-versa.
+ * Test reverting HADOOP-4979. If there is a high-mem job, we should now look
+ * at reduce jobs (if map tasks are high-mem) or vice-versa.
+ *
* @throws IOException
*/
- public void testHighMemoryBlocking()
+ public void testHighMemoryBlockingAcrossTaskTypes()
throws IOException {
// 2 map and 1 reduce slots
taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
- // Normal job on this TT would be 1GB vmem, 0.5GB pmem
-
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 1 * 1024 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 3 * 1024 * 1024 * 1024L);
- resConf.setDefaultPercentOfPmemInVmem(33.3f);
- resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+ // Normal job in the cluster would be 1GB maps/reduces
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- // We need a situation where the scheduler needs to run a map task,
- // but the available one has a high-mem requirement. There should
- // be another job whose maps or reduces can run, but they shouldn't
- // be scheduled.
+ // The situation : Two jobs in the queue. First job with only maps and no
+ // reduces and is a high memory job. Second job is a normal job with both
+ // maps and reduces.
+ // First job cannot run for want of memory for maps. In this case, second
+ // job's reduces should run.
- LOG.debug("Submit one high memory(2GB vmem, 400MB pmem) job of "
+ LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+ "2 map tasks");
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
- jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(0);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- LOG.debug("Submit another regular memory(900MB vmem, 200MB pmem) job of "
+
+ LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+ "2 map/red tasks");
- jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(900 * 1024 * 1024L); // 900MB vmem
- jConf.setMaxPhysicalMemoryForTask(200 * 1024 * 1024L); // 200MB pmem
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
@@ -1678,76 +1717,17 @@
// first, a map from j1 will run
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // at this point, the scheduler tries to schedule another map from j1.
- // there isn't enough space. There is space to run the second job's
- // map or reduce task, but they shouldn't be scheduled
- assertNull(scheduler.assignTasks(tracker("tt1")));
- }
-
- /**
- * test invalid highMemoryJobs
- * @throws IOException
- */
- public void testHighMemoryJobWithInvalidRequirements()
- throws IOException {
- LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
- ttStatus.setReservedPhysicalMemory(0);
+ // Total 2 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- taskTrackerManager.addQueues(new String[] { "default" });
- resConf.setFakeQueues(queues);
- scheduler.setTaskTrackerManager(taskTrackerManager);
- // enabled memory-based scheduling
- long vmemUpperLimit = 1 * 1024 * 1024 * 1024L;
- long vmemDefault = 1536 * 1024 * 1024L;
- long pmemUpperLimit = vmemUpperLimit;
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- vmemDefault);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- vmemUpperLimit);
- resConf.setDefaultPercentOfPmemInVmem(33.3f);
- resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
- + "1 map, 0 reduce tasks.");
- long jobMaxVmem = 5 * 1024 * 1024 * 1024L;
- long jobMaxPmem = 3 * 1024 * 1024 * 1024L;
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(jobMaxVmem);
- jConf.setMaxPhysicalMemoryForTask(jobMaxPmem);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(0);
- jConf.setQueueName("default");
- jConf.setUser("u1");
-
- boolean throwsException = false;
- String msg = null;
- FakeJobInProgress job;
- try {
- job = submitJob(JobStatus.PREP, jConf);
- } catch (IOException ioe) {
- // job has to fail
- throwsException = true;
- msg = ioe.getMessage();
- }
-
- assertTrue(throwsException);
- job = (FakeJobInProgress) taskTrackerManager.getJobs().toArray()[0];
- assertTrue(msg.matches(job.getJobID() + " \\(" + jobMaxVmem + "vmem, "
- + jobMaxPmem + "pmem\\) exceeds the cluster's max-memory-limits \\("
- + vmemUpperLimit + "vmem, " + pmemUpperLimit
- + "pmem\\). Cannot run in this cluster, so killing it."));
- // For job, no cleanup task needed so gets killed immediately.
- assertTrue(job.getStatus().getRunState() == JobStatus.KILLED);
+ // at this point, the scheduler tries to schedule another map from j1.
+ // there isn't enough space. The second job's reduce should be scheduled.
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ // Total 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
}
/**
@@ -1758,13 +1738,7 @@
throws IOException {
LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1772,68 +1746,103 @@
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 1536 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 4 * 1024 * 1024 * 1024L);
- resConf.setDefaultPercentOfPmemInVmem(33.3f);
- resConf.setLimitMaxPmemForTasks(2 * 1024 * 1024 * 1024L);
+ // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- LOG.debug("Submit one high memory(4GB vmem, 512MB pmem) job of "
- + "1 map, 0 reduce tasks.");
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(4 * 1024 * 1024 * 1024L);
- jConf.setMaxPhysicalMemoryForTask(512 * 1024 * 1024L);
+ LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+ + "1 map, 1 reduce tasks.");
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(0);
+ jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- // TTs should not run these jobs i.e. cluster blocked because of lack of
- // vmem
- assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- // Job should still be alive
- assertTrue(job1.getStatus().getRunState() == JobStatus.RUNNING);
- LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
- // Use cluster-wide defaults
- jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
- jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+ // Fill the second tt with this job.
+ checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+ // Total 1 map slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0),
+ (String) job1.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
+ checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+ // Total 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ 25.0f);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1),
+ (String) job1.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+
+ LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+ + "2 map, 2 reduce tasks.");
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
- // cluster should still be blocked for job1 and so even job2 should not run
- // even though it is a normal job
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- scheduler.taskTrackerManager.killJob(job2.getJobID());
- scheduler.taskTrackerManager.killJob(job1.getJobID());
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ // Total 3 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0),
+ (String) job2.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ // Total 3 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+ 75.0f);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+ (String) job2.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
- LOG.debug("Submit one high memory(2GB vmem, 2GB pmem) job of "
+ LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+ "1 map, 0 reduce tasks.");
- jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L);
- jConf.setMaxPhysicalMemoryForTask(2 * 1024 * 1024 * 1024L);
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
- // TTs should not run these jobs i.e. cluster blocked because of lack of
- // pmem now.
- assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- // Job should still be alive
- assertTrue(job3.getStatus().getRunState() == JobStatus.RUNNING);
-
- LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
- // Use cluster-wide defaults
- jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
- jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
- submitJobAndInit(JobStatus.PREP, jConf); // job4
- // cluster should still be blocked for job3 and so even job4 should not run
- // even though it is a normal job
+ // Job2 cannot fit on tt2 or tt1. Blocking. Job3 also will not run.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt2")));
assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+ 75.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+ (String) job2.getSchedulingInfo());
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0),
+ (String) job3.getSchedulingInfo());
}
/**
@@ -1847,13 +1856,6 @@
// create a cluster with a single node.
LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- LOG.debug("Assume TT has 4 GB virtual mem and 2 GB RAM");
- ttStatus.setTotalVirtualMemory(4 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(2 * 1024 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
// create scheduler
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1862,14 +1864,17 @@
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
- LOG.debug("By default, jobs get 0.5 GB per task vmem" +
- " and 2 GB max vmem, with 50% of it for RAM");
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 512 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 2 * 1024 * 1024 * 1024L);
- resConf.setDefaultPercentOfPmemInVmem(50.0f);
- resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
+ LOG.debug("Assume TT has 2GB for maps and 2GB for reduces");
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 512);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 2 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 512);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1878,15 +1883,31 @@
JobConf jConf = new JobConf();
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
+ jConf.setMemoryForMapTask(512);
+ jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
// 1st cycle - 1 map gets assigned.
Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // Total 1 map slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 512L, 0L);
+
+ // 1st cycle of reduces - 1 reduce gets assigned.
+ Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ // Total 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 512L, 512L);
// kill this job !
taskTrackerManager.killJob(job1.getJobID());
+ // No more map/reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f);
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0,
+ 0.0f);
// retire the job
taskTrackerManager.removeJob(job1.getJobID());
@@ -1896,6 +1917,8 @@
jConf = new JobConf();
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(1);
+ jConf.setMemoryForMapTask(512);
+ jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
@@ -1903,30 +1926,30 @@
// 2nd cycle - nothing should get assigned. Memory matching code
// will see the job is missing and fail memory requirements.
assertNull(scheduler.assignTasks(tracker("tt1")));
+ checkMemReservedForTasksOnTT("tt1", null, null);
+
// calling again should not make a difference, as the task is still running
assertNull(scheduler.assignTasks(tracker("tt1")));
+ checkMemReservedForTasksOnTT("tt1", null, null);
- // finish the task on the tracker.
+ // finish the tasks on the tracker.
taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
+
// now a new task can be assigned.
t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ // Total 1 map slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 512L, 0L);
+
// reduce can be assigned.
t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ // Total 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 512L, 512L);
}
-
- protected TaskTrackerStatus tracker(String taskTrackerName) {
- return taskTrackerManager.getTaskTracker(taskTrackerName);
- }
-
- protected Task checkAssignment(String taskTrackerName,
- String expectedTaskString) throws IOException {
- List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
- assertNotNull(expectedTaskString, tasks);
- assertEquals(expectedTaskString, 1, tasks.size());
- assertEquals(expectedTaskString, tasks.get(0).toString());
- return tasks.get(0);
- }
-
+
/*
* Test cases for Job Initialization poller.
*/
@@ -2133,10 +2156,9 @@
checkFailedInitializedJobMovement();
// Check failed waiting job movement
- checkFailedWaitingJobMovement();
-
+ checkFailedWaitingJobMovement();
}
-
+
public void testStartWithoutDefaultQueueConfigured() throws Exception {
//configure a single queue which is not default queue
String[] qs = {"q1"};
@@ -2188,6 +2210,382 @@
assertFalse("Waiting job contains submitted job",
mgr.getRunningJobQueue("default").contains(job));
}
+
+ /**
+ * Test case for normal jobs which have speculative maps and reduces.
+ * The following steps are executed:
+ * <ol>
+ * <li>Submit one job with speculative maps and reduce.</li>
+ * <li>Submit another job with no speculative execution.</li>
+ * <li>Observe that all tasks from first job get scheduled, speculative
+ * and normal tasks</li>
+ * <li>Finish all of the first job's tasks; the second job's tasks then get scheduled.</li>
+ * </ol>
+ * @throws IOException
+ */
+ public void testSpeculativeTaskScheduling() throws IOException {
+ String[] qs = {"default"};
+ taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
+ JobConf conf = new JobConf();
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(1);
+ conf.setMapSpeculativeExecution(true);
+ conf.setReduceSpeculativeExecution(true);
+ //Submit a job which would have one speculative map and one speculative
+ //reduce.
+ FakeJobInProgress fjob1 = submitJob(JobStatus.PREP, conf);
+
+ conf = new JobConf();
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(1);
+ //Submit a job which has no speculative map or reduce.
+ FakeJobInProgress fjob2 = submitJob(JobStatus.PREP, conf);
+
+ //Ask the poller to initialize all the submitted jobs and raise status
+ //change events.
+ controlledInitializationPoller.selectJobsToInitialize();
+ raiseStatusChangeEvents(mgr);
+
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ assertTrue("Pending maps of job1 greater than zero",
+ (fjob1.pendingMaps() == 0));
+ checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ assertTrue("Pending reduces of job2 greater than zero",
+ (fjob1.pendingReduces() == 0));
+ checkAssignment("tt2", "attempt_test_0001_r_000001_1 on tt2");
+
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", fjob1);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", fjob1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", fjob1);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_1", fjob1);
+ taskTrackerManager.finalizeJob(fjob1);
+
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
+ taskTrackerManager.finalizeJob(fjob2);
+ }
+
+ /**
+ * Test case to test scheduling of
+ * <ol>
+ * <li>High ram job with speculative map execution.
+ * <ul>
+ * <li>Submit one high ram job which has speculative map.</li>
+ * <li>Submit a normal job which has no speculative map.</li>
+ * <li>Scheduler should schedule first all map tasks from first job and block
+ * the cluster till both maps from first job get completed.</li>
+ * </ul>
+ * </li>
+ * <li>High ram job with speculative reduce execution.
+ * <ul>
+ * <li>Submit one high ram job which has speculative reduce.</li>
+ * <li>Submit a normal job which has no speculative reduce.</li>
+ * <li>Scheduler should schedule first all reduce tasks from first job and
+ * block the cluster till both reduces are completed.</li>
+ * </ul>
+ * </li>
+ * </ol>
+ * @throws IOException
+ */
+ public void testHighRamJobWithSpeculativeExecution() throws IOException {
+ // 2 TTs, 3 map and 3 reduce slots on each TT
+ taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
+
+ taskTrackerManager.addQueues(new String[] { "default" });
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ // enabled memory-based scheduling
+ // 1GB for each map, 1GB for each reduce
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 3 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 3 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024L);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ //Submit a high memory job with speculative tasks.
+ JobConf jConf = new JobConf();
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(0);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(0);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(true);
+ jConf.setReduceSpeculativeExecution(false);
+ FakeJobInProgress job1 =
+ new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+ taskTrackerManager, "u1");
+ taskTrackerManager.submitJob(job1);
+
+ //Submit normal job
+ jConf = new JobConf();
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(0);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(0);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(false);
+ jConf.setReduceSpeculativeExecution(false);
+ FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
+
+ controlledInitializationPoller.selectJobsToInitialize();
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
+
+ // first, a map from j1 will run
+ // at this point, there is a speculative task for the same job to be
+ //scheduled. This task would be scheduled. Until all tasks from job1 have
+ //been scheduled, none of the tasks from other jobs would be scheduled.
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+ assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
+ // Total 2 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+
+ //make same tracker get back, check if you are blocking. Your job
+ //has speculative map task so tracker should be blocked even tho' it
+ //can run job2's map.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ // Total 2 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
+ //TT2 now gets speculative map of the job1
+ checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+ // Total 4 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f);
+ checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
+
+ // Now since the first job has no more speculative maps, it can schedule
+ // the second job.
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ // Total 5 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f);
+ checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
+
+ //finish everything
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0",
+ job1);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1",
+ job1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0",
+ job2);
+ taskTrackerManager.finalizeJob(job1);
+ taskTrackerManager.finalizeJob(job2);
+
+ //Now submit high ram job with speculative reduce and check.
+ jConf = new JobConf();
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024L);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(false);
+ jConf.setReduceSpeculativeExecution(true);
+ FakeJobInProgress job3 =
+ new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+ taskTrackerManager, "u1");
+ taskTrackerManager.submitJob(job3);
+
+ //Submit normal job w.r.t reduces
+ jConf = new JobConf();
+ jConf.setMemoryForMapTask(1 * 1024L);
+ jConf.setMemoryForReduceTask(1 * 1024L);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(false);
+ jConf.setReduceSpeculativeExecution(false);
+ FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
+
+ controlledInitializationPoller.selectJobsToInitialize();
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
+
+ // Finish up the map scheduling
+ checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+ // Total 2 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
+ checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
+ // Total 3 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f);
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
+
+ // first, a reduce from j3 will run
+ // at this point, there is a speculative task for the same job to be
+ //scheduled. This task would be scheduled. Until all tasks from job3 have
+ //been scheduled, none of the tasks from other jobs would be scheduled.
+ checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
+ assertEquals("pending reduces greater than zero ", job3.pendingReduces(),
+ 0);
+ // Total 2 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+ 33.3f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
+
+ //make same tracker get back, check if you are blocking. Your job
+ //has speculative reduce task so tracker should be blocked even tho' it
+ //can run job4's reduce.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ // Total 2 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+ 33.3f);
+
+ //TT2 now gets speculative reduce of the job3
+ checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+ // Total 4 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4,
+ 66.7f);
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L);
+
+ // Now since j3 has no more speculative reduces, the scheduler can
+ // schedule j4.
+ checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+ // Total 5 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5,
+ 83.3f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
+ }
+
+ /**
+ * Test to verify that queue ordering is based on the number of slots occupied
+ * and hence to verify that presence of high memory jobs is reflected properly
+ * while determining used capacities of queues and hence the queue ordering.
+ *
+ * @throws IOException
+ */
+ public void testQueueOrdering()
+ throws IOException {
+ taskTrackerManager = new FakeTaskTrackerManager(2, 6, 6);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ String[] qs = { "default", "q1" };
+ String[] reversedQs = { qs[1], qs[0] };
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 100));
+ queues.add(new FakeQueueInfo("q1", 50.0f, true, 100));
+ resConf.setFakeQueues(queues);
+ // enabled memory-based scheduling
+ // Normal job in the cluster would be 1GB maps/reduces
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+ + "6 map and 6 reduce tasks");
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(6);
+ jConf.setNumReduceTasks(6);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // Submit a normal job to the other queue.
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(6);
+ jConf.setNumReduceTasks(6);
+ jConf.setUser("u1");
+ jConf.setQueueName("q1");
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // Map 1 of high memory job
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkQueuesOrder(qs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 1 of high memory job
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ checkQueuesOrder(qs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Map 1 of normal job
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 1 of normal job
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Map 2 of normal job
+ checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 2 of normal job
+ checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Now both the queues are equally served. But the comparator doesn't change
+ // the order if queues are equally served.
+
+ // Map 3 of normal job
+ checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 3 of normal job
+ checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Map 2 of high memory job
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ checkQueuesOrder(qs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 2 of high memory job
+ checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+ checkQueuesOrder(qs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Map 4 of normal job
+ checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 4 of normal job
+ checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+ }
private void checkRunningJobMovementAndCompletion() throws IOException {
@@ -2332,6 +2730,89 @@
userJobs.put(user, jips);
}
return userJobs;
+ }
+
+
+ /**
+  * Looks up the status object of a task tracker by name, delegating to this
+  * test's taskTrackerManager.
+  *
+  * @param taskTrackerName name of the tracker to look up
+  * @return whatever taskTrackerManager.getTaskTracker yields for that name
+  */
+ protected TaskTrackerStatus tracker(String taskTrackerName) {
+   TaskTrackerStatus status =
+       taskTrackerManager.getTaskTracker(taskTrackerName);
+   return status;
+ }
+
+ /**
+  * Asks the scheduler to assign tasks to the named tracker and verifies that
+  * exactly one task was handed out and that its string form is the expected
+  * one.
+  *
+  * @param taskTrackerName tracker to request an assignment for
+  * @param expectedTaskString expected toString() of the single assigned
+  *          task; also used as the message of the preliminary assertions
+  * @return the single task that was assigned
+  * @throws IOException propagated from the scheduler's assignTasks call
+  */
+ protected Task checkAssignment(String taskTrackerName,
+     String expectedTaskString) throws IOException {
+   TaskTrackerStatus status = tracker(taskTrackerName);
+   List<Task> assigned = scheduler.assignTasks(status);
+
+   // The expected string doubles as the failure message so that a failing
+   // run shows which assignment was being checked.
+   assertNotNull(expectedTaskString, assigned);
+   assertEquals(expectedTaskString, 1, assigned.size());
+
+   Task onlyTask = assigned.get(0);
+   assertEquals(expectedTaskString, onlyTask.toString());
+   return onlyTask;
+ }
+ /**
+  * Get the amount of memory that is reserved for tasks on the taskTracker and
+  * verify that it matches what is expected.
+  *
+  * A null expectation means the scheduler's memory matcher is expected to
+  * report null for that task type.
+  *
+  * @param taskTracker name of the task tracker to inspect
+  * @param expectedMemForMapsOnTT expected memory reserved for map tasks, or
+  *          null
+  * @param expectedMemForReducesOnTT expected memory reserved for reduce
+  *          tasks, or null
+  */
+ private void checkMemReservedForTasksOnTT(String taskTracker,
+     Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+   Long observedMemForMapsOnTT =
+       scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+           CapacityTaskScheduler.TYPE.MAP);
+   Long observedMemForReducesOnTT =
+       scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+           CapacityTaskScheduler.TYPE.REDUCE);
+
+   // assertEquals(String, Object, Object) is null-safe on both sides, so an
+   // unexpected null is reported as an assertion failure (with both values
+   // in the message) instead of surfacing as a NullPointerException.
+   assertEquals("Memory reserved for map tasks on " + taskTracker,
+       expectedMemForMapsOnTT, observedMemForMapsOnTT);
+   assertEquals("Memory reserved for reduce tasks on " + taskTracker,
+       expectedMemForReducesOnTT, observedMemForReducesOnTT);
+ }
+
+ /**
+  * Verify the number of occupied slots of type 'type' reported for the queue
+  * 'queue', by inspecting one line of the queue's scheduling-info string.
+  *
+  * @param queue name of the queue whose scheduling info is checked
+  * @param type task type (MAP or REDUCE) whose used capacity is checked
+  * @param numActiveUsers number of users active in the queue at present;
+  *          used to locate the reduce line in the scheduling info
+  * @param expectedOccupiedSlots expected number of occupied slots
+  * @param expectedOccupiedSlotsPercent expected occupied slots as a
+  *          percentage of the capacity
+  */
+ private void checkOccupiedSlots(String queue,
+     CapacityTaskScheduler.TYPE type, int numActiveUsers,
+     int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
+   scheduler.updateQSIInfoForTests();
+   QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
+   String schedulingInfo =
+       queueManager.getJobQueueInfo(queue).getSchedulingInfo();
+   String[] infoStrings = schedulingInfo.split("\n");
+
+   // Line positions of the "Used capacity" entries within the scheduling
+   // info. NOTE(review): these look tied to the exact layout of the text the
+   // scheduler produces -- confirm whenever that format changes.
+   int index = -1;
+   if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+     index = 7;
+   } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+     index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
+   }
+
+   // Fail with a readable message rather than an
+   // ArrayIndexOutOfBoundsException when the type is not handled above or
+   // the scheduling info has fewer lines than expected.
+   assertTrue("No scheduling info line at index " + index + " for type "
+       + type + " of queue " + queue + "; scheduling info was:\n"
+       + schedulingInfo, index >= 0 && index < infoStrings.length);
+
+   LOG.info(infoStrings[index]);
+   assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
+       expectedOccupiedSlots, expectedOccupiedSlotsPercent),
+       infoStrings[index]);
+ }
+
+ /**
+  * Verifies that the observed queue names match the expected ones, element
+  * by element and in the same order.
+  *
+  * @param expectedOrder queue names in the order they are expected
+  * @param observedOrder queue names in the order actually observed
+  */
+ private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
+   boolean sameLength = (expectedOrder.length == observedOrder.length);
+   assertTrue("Observed and expected queues are not of same length.",
+       sameLength);
+
+   for (int pos = 0; pos < expectedOrder.length; pos++) {
+     String wanted = expectedOrder[pos];
+     String seen = observedOrder[pos];
+     String mismatch =
+         "Observed and expected queues are not in the same order. "
+             + "Differ at index " + pos + ". Got " + seen
+             + " instead of " + wanted;
+     assertTrue(mismatch, wanted.equals(seen));
+   }
}
}
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
+/hadoop/core/trunk/src/contrib/data_join:776175-786373
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
-/hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-785643
+/hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-786373
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
+/hadoop/core/trunk/src/contrib/fairscheduler:776175-786373
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
+/hadoop/core/trunk/src/contrib/index:776175-786373
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
+/hadoop/core/trunk/src/contrib/mrunit:776175-786373
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
-/hadoop/core/trunk/src/contrib/sqoop:784975-785643
+/hadoop/core/trunk/src/contrib/sqoop:784975-786373
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/streaming/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/streaming:713112
-/hadoop/core/trunk/src/contrib/streaming:776175-785643
+/hadoop/core/trunk/src/contrib/streaming:776175-786373
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/vaidya/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/vaidya:713112
-/hadoop/core/trunk/src/contrib/vaidya:776175-785643
+/hadoop/core/trunk/src/contrib/vaidya:776175-786373
|