Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 81905 invoked from network); 20 Oct 2008 23:53:47 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Oct 2008 23:53:47 -0000 Received: (qmail 86363 invoked by uid 500); 20 Oct 2008 23:53:49 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 86331 invoked by uid 500); 20 Oct 2008 23:53:49 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 86322 invoked by uid 99); 20 Oct 2008 23:53:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Oct 2008 16:53:49 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Oct 2008 23:52:39 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E5254238887D; Mon, 20 Oct 2008 16:53:17 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r706462 - in /hadoop/core/branches/branch-0.19: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/... Date: Mon, 20 Oct 2008 23:53:17 -0000 To: core-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081020235317.E5254238887D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Mon Oct 20 16:53:17 2008 New Revision: 706462 URL: http://svn.apache.org/viewvc?rev=706462&view=rev Log: HADOOP-4149. Fix handling of updates to the job priority, by changing the list of jobs to be keyed by the priority, submit time, and job tracker id. (Amar Kamat via omalley) Merge of -r 706460:706461 from trunk to branch 0.19. Modified: hadoop/core/branches/branch-0.19/CHANGES.txt hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Modified: hadoop/core/branches/branch-0.19/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=706462&r1=706461&r2=706462&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/CHANGES.txt (original) +++ hadoop/core/branches/branch-0.19/CHANGES.txt Mon Oct 20 16:53:17 2008 @@ -930,6 +930,10 @@ HADOOP-4404. saveFSImage() removes files from a storage directory that do not correspond to its type. (shv) + HADOOP-4149. Fix handling of updates to the job priority, by changing the + list of jobs to be keyed by the priority, submit time, and job tracker id. + (Amar Kamat via omalley) + Release 0.18.2 - Unreleased BUG FIXES Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=706462&r1=706461&r2=706462&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original) +++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Mon Oct 20 16:53:17 2008 @@ -20,13 +20,13 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.Map; -import java.util.TreeSet; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo; import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType; /** @@ -45,35 +45,17 @@ * is ahead in the queue, so insertion should be at the tail. */ - // comparator for jobs in queues that support priorities - private static final Comparator PRIORITY_JOB_COMPARATOR - = new Comparator() { - public int compare(JobInProgress o1, JobInProgress o2) { - // Look at priority. - int res = o1.getPriority().compareTo(o2.getPriority()); - if (res == 0) { - // the job that started earlier wins - if (o1.getStartTime() < o2.getStartTime()) { - res = -1; - } else { - res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1); - } - } - if (res == 0) { - res = o1.getJobID().compareTo(o2.getJobID()); - } - return res; - } - }; // comparator for jobs in queues that don't support priorities - private static final Comparator STARTTIME_JOB_COMPARATOR - = new Comparator() { - public int compare(JobInProgress o1, JobInProgress o2) { + private static final Comparator STARTTIME_JOB_COMPARATOR + = new Comparator() { + public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) { // the job that started earlier wins if (o1.getStartTime() < o2.getStartTime()) { return -1; } else { - return (o1.getStartTime() == o2.getStartTime() ? 0 : 1); + return (o1.getStartTime() == o2.getStartTime() + ? o1.getJobID().compareTo(o2.getJobID()) + : 1); } } }; @@ -83,40 +65,27 @@ // whether the queue supports priorities boolean supportsPriorities; - // maintain separate collections of running & waiting jobs. This we do + // maintain separate structures for running & waiting jobs. This we do // mainly because when a new job is added, it cannot superceede a running // job, even though the latter may be a lower priority. If this is ever // changed, we may get by with one collection. - Collection waitingJobs; + Map waitingJobs; Collection runningJobs; QueueInfo(boolean prio) { this.supportsPriorities = prio; if (supportsPriorities) { - this.waitingJobs = new TreeSet(PRIORITY_JOB_COMPARATOR); + // use the default priority-aware comparator + this.waitingJobs = + new TreeMap( + JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR); } else { - this.waitingJobs = new TreeSet(STARTTIME_JOB_COMPARATOR); + this.waitingJobs = + new TreeMap(STARTTIME_JOB_COMPARATOR); } this.runningJobs = new LinkedList(); } - - /** - * we need to delete an object from our TreeSet based on referential - * equality, rather than value equality that the TreeSet uses. - * Another way to do this is to extend the TreeSet and override remove(). - */ - static private boolean removeOb(Collection c, Object o) { - Iterator i = c.iterator(); - while (i.hasNext()) { - if (i.next() == o) { - i.remove(); - return true; - } - } - return false; - } - } // we maintain a hashmap of queue-names to queue info @@ -150,7 +119,7 @@ * Returns the queue of waiting jobs associated with the name */ public Collection getWaitingJobQueue(String queueName) { - return jobQueues.get(queueName).waitingJobs; + return jobQueues.get(queueName).waitingJobs.values(); } @Override @@ -167,19 +136,18 @@ } // add job to waiting queue. It will end up in the right place, // based on priority. - // We use our own version of removing objects based on referential - // equality, since the 'job' object has already been changed. - qi.waitingJobs.add(job); + qi.waitingJobs.put(new JobSchedulingInfo(job), job); // let scheduler know. scheduler.jobAdded(job); } - private void jobCompleted(JobInProgress job, QueueInfo qi) { + private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, + QueueInfo qi) { LOG.info("Job " + job.getJobID().toString() + " submitted to queue " + job.getProfile().getQueueName() + " has completed"); // job could be in running or waiting queue if (!qi.runningJobs.remove(job)) { - QueueInfo.removeOb(qi.waitingJobs, job); + qi.waitingJobs.remove(oldInfo); } // let scheduler know scheduler.jobCompleted(job); @@ -191,23 +159,24 @@ // This is used to reposition a job in the queue. A job can get repositioned // because of the change in the job priority or job start-time. - private void reorderJobs(JobInProgress job, QueueInfo qi) { - Collection queue = qi.waitingJobs; + private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, + QueueInfo qi) { - // Remove from the waiting queue - if (!QueueInfo.removeOb(queue, job)) { - queue = qi.runningJobs; - QueueInfo.removeOb(queue, job); + if (qi.waitingJobs.remove(oldInfo) == null) { + qi.runningJobs.remove(job); + // Add back to the running queue + qi.runningJobs.add(job); + } else { + // Add back to the waiting queue + qi.waitingJobs.put(new JobSchedulingInfo(job), job); } - - // Add back to the queue - queue.add(job); } // This is used to move a job from the waiting queue to the running queue. - private void makeJobRunning(JobInProgress job, QueueInfo qi) { + private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, + QueueInfo qi) { // Remove from the waiting queue - QueueInfo.removeOb(qi.waitingJobs, job); + qi.waitingJobs.remove(oldInfo); // Add the job to the running queue qi.runningJobs.add(job); @@ -216,21 +185,23 @@ // Update the scheduler as job's state has changed private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) { JobInProgress job = event.getJobInProgress(); + JobSchedulingInfo oldJobStateInfo = + new JobSchedulingInfo(event.getOldStatus()); // Check if the ordering of the job has changed // For now priority and start-time can change the job ordering if (event.getEventType() == EventType.PRIORITY_CHANGED || event.getEventType() == EventType.START_TIME_CHANGED) { // Make a priority change - reorderJobs(job, qi); + reorderJobs(job, oldJobStateInfo, qi); } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) { // Check if the job is complete int runState = job.getStatus().getRunState(); if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED) { - jobCompleted(job, qi); + jobCompleted(job, oldJobStateInfo, qi); } else if (runState == JobStatus.RUNNING) { - makeJobRunning(job, qi); + makeJobRunning(job, oldJobStateInfo, qi); } } } Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=706462&r1=706461&r2=706462&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original) +++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Mon Oct 20 16:53:17 2008 @@ -57,8 +57,9 @@ super(jId, jobConf); this.taskTrackerManager = taskTrackerManager; this.startTime = System.currentTimeMillis(); - this.status = new JobStatus(); - this.status.setRunState(JobStatus.PREP); + this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP); + this.status.setJobPriority(JobPriority.NORMAL); + this.status.setStartTime(startTime); if (null == jobConf.getQueueName()) { this.profile = new JobProfile(user, jId, null, null, null); @@ -316,6 +317,23 @@ } } + public void setStartTime(FakeJobInProgress fjob, long start) { + // take a snapshot of the status before changing it + JobStatus oldStatus = (JobStatus)fjob.getStatus().clone(); + + fjob.startTime = start; // change the start time of the job + fjob.status.setStartTime(start); // change the start time of the jobstatus + + JobStatus newStatus = (JobStatus)fjob.getStatus().clone(); + + JobStatusChangeEvent event = + new JobStatusChangeEvent (fjob, EventType.START_TIME_CHANGED, oldStatus, + newStatus); + for (JobInProgressListener listener : listeners) { + listener.jobUpdated(event); + } + } + void addQueues(String[] arr) { Set queues = new HashSet(); for (String s: arr) { @@ -481,18 +499,44 @@ submitJob(JobStatus.PREP, 1, 0, "default", "user"); // check if the job is in the waiting queue - assertTrue("Waiting queue doesnt contain queued job", - scheduler.jobQueuesManager.getWaitingJobQueue("default") - .contains(fjob1)); - - // change the job priority - taskTrackerManager.setPriority(fjob2, JobPriority.HIGH); + JobInProgress[] jobs = + scheduler.jobQueuesManager.getWaitingJobQueue("default") + .toArray(new JobInProgress[0]); + assertTrue("Waiting queue doesnt contain queued job #1 in right order", + jobs[0].getJobID().equals(fjob1.getJobID())); + assertTrue("Waiting queue doesnt contain queued job #2 in right order", + jobs[1].getJobID().equals(fjob2.getJobID())); + + // I. Check the start-time change + // Change job2 start-time and check if job2 bumps up in the queue + taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1); + + jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default") + .toArray(new JobInProgress[0]); + assertTrue("Start time change didnt not work as expected for job #2", + jobs[0].getJobID().equals(fjob2.getJobID())); + assertTrue("Start time change didnt not work as expected for job #1", + jobs[1].getJobID().equals(fjob1.getJobID())); + + // check if the queue is fine + assertEquals("Start-time change garbled the waiting queue", + 2, scheduler.getJobs("default").size()); + + // II. Change job priority change + // Bump up job1's priority and make sure job1 bumps up in the queue + taskTrackerManager.setPriority(fjob1, JobPriority.HIGH); // Check if the priority changes are reflected - JobInProgress firstJob = - scheduler.getJobs("default").toArray(new JobInProgress[0])[0]; - assertTrue("Priority change didnt not work as expected", - firstJob.getJobID().equals(fjob2.getJobID())); + jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default") + .toArray(new JobInProgress[0]); + assertTrue("Priority change didnt not work as expected for job #1", + jobs[0].getJobID().equals(fjob1.getJobID())); + assertTrue("Priority change didnt not work as expected for job #2", + jobs[1].getJobID().equals(fjob2.getJobID())); + + // check if the queue is fine + assertEquals("Priority change has garbled the waiting queue", + 2, scheduler.getJobs("default").size()); // Create an event JobChangeEvent event = initTasksAndReportEvent(fjob1); @@ -500,10 +544,26 @@ // inform the scheduler scheduler.jobQueuesManager.jobUpdated(event); + // waiting queue + Collection wqueue = + scheduler.jobQueuesManager.getWaitingJobQueue("default"); + + // check if the job is not in the waiting queue + assertFalse("Waiting queue contains running/inited job", + wqueue.contains(fjob1)); + + // check if the waiting queue is fine + assertEquals("Waiting queue is garbled on job init", 1, wqueue.size()); + + Collection rqueue = + scheduler.jobQueuesManager.getRunningJobQueue("default"); + // check if the job is in the running queue assertTrue("Running queue doesnt contain running/inited job", - scheduler.jobQueuesManager.getRunningJobQueue("default") - .contains(fjob1)); + rqueue.contains(fjob1)); + + // check if the running queue is fine + assertEquals("Running queue is garbled upon init", 1, rqueue.size()); // schedule a task List tasks = scheduler.assignTasks(tracker("tt1")); @@ -515,9 +575,16 @@ // mark the job as complete taskTrackerManager.finalizeJob(fjob1); + rqueue = scheduler.jobQueuesManager.getRunningJobQueue("default"); + // check if the job is removed from the scheduler assertFalse("Scheduler contains completed job", - scheduler.getJobs("default").contains(fjob1)); + rqueue.contains(fjob1)); + + // check if the running queue size is correct + assertEquals("Job finish garbles the queue", + 0, rqueue.size()); + } /*protected void submitJobs(int number, int state, int maps, int reduces) Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=706462&r1=706461&r2=706462&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Oct 20 16:53:17 2008 @@ -198,6 +198,7 @@ this.jobtracker = jobtracker; this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP); this.startTime = System.currentTimeMillis(); + status.setStartTime(startTime); this.localFs = FileSystem.getLocal(default_conf); JobConf default_job_conf = new JobConf(default_conf); Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java?rev=706462&r1=706461&r2=706462&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java Mon Oct 20 16:53:17 2008 @@ -20,7 +20,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.TreeSet; +import java.util.Map; +import java.util.TreeMap; import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType; @@ -32,9 +33,32 @@ */ class JobQueueJobInProgressListener extends JobInProgressListener { - private static final Comparator FIFO_JOB_QUEUE_COMPARATOR - = new Comparator() { - public int compare(JobInProgress o1, JobInProgress o2) { + /** A class that groups all the information from a {@link JobInProgress} that + * is necessary for scheduling a job. + */ + static class JobSchedulingInfo { + private JobPriority priority; + private long startTime; + private JobID id; + + public JobSchedulingInfo(JobInProgress jip) { + this(jip.getStatus()); + } + + public JobSchedulingInfo(JobStatus status) { + priority = status.getJobPriority(); + startTime = status.getStartTime(); + id = status.getJobID(); + } + + JobPriority getPriority() {return priority;} + long getStartTime() {return startTime;} + JobID getJobID() {return id;} + } + + static final Comparator FIFO_JOB_QUEUE_COMPARATOR + = new Comparator() { + public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) { int res = o1.getPriority().compareTo(o2.getPriority()); if (res == 0) { if (o1.getStartTime() < o2.getStartTime()) { @@ -50,38 +74,40 @@ } }; - private Collection jobQueue; + private Map jobQueue; public JobQueueJobInProgressListener() { - this(new TreeSet(FIFO_JOB_QUEUE_COMPARATOR)); + this(new TreeMap(FIFO_JOB_QUEUE_COMPARATOR)); } /** * For clients that want to provide their own job priorities. * @param jobQueue A collection whose iterator returns jobs in priority order. */ - protected JobQueueJobInProgressListener(Collection jobQueue) { - this.jobQueue = Collections.synchronizedCollection(jobQueue); + protected JobQueueJobInProgressListener(Map jobQueue) { + this.jobQueue = Collections.synchronizedMap(jobQueue); } /** * Returns a synchronized view of the the job queue. */ public Collection getJobQueue() { - return jobQueue; + return jobQueue.values(); } @Override public void jobAdded(JobInProgress job) { - jobQueue.add(job); + jobQueue.put(new JobSchedulingInfo(job.getStatus()), job); } // Job will be removed once the job completes @Override public void jobRemoved(JobInProgress job) {} - private void jobCompleted(JobInProgress job) { - jobQueue.remove(job); + private void jobCompleted(JobSchedulingInfo oldInfo) { + jobQueue.remove(oldInfo); } @Override @@ -91,26 +117,28 @@ // Check if the ordering of the job has changed // For now priority and start-time can change the job ordering JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event; + JobSchedulingInfo oldInfo = + new JobSchedulingInfo(statusEvent.getOldStatus()); if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED || statusEvent.getEventType() == EventType.START_TIME_CHANGED) { // Make a priority change - reorderJobs(job); + reorderJobs(job, oldInfo); } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) { // Check if the job is complete int runState = statusEvent.getNewStatus().getRunState(); if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED) { - jobCompleted(job); + jobCompleted(oldInfo); } } } } - private void reorderJobs(JobInProgress job) { + private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo) { synchronized (jobQueue) { - jobQueue.remove(job); - jobQueue.add(job); + jobQueue.remove(oldInfo); + jobQueue.put(new JobSchedulingInfo(job), job); } } Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=706462&r1=706461&r2=706462&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java (original) +++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java Mon Oct 20 16:53:17 2008 @@ -19,8 +19,12 @@ package org.apache.hadoop.mapred; import java.util.ArrayList; +import java.io.IOException; import java.util.List; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,6 +37,171 @@ public class TestJobInProgressListener extends TestCase { private static final Log LOG = LogFactory.getLog(TestJobInProgressListener.class); + private final Path testDir = new Path("test-jip-listener-update"); + + private JobConf configureJob(JobConf conf, int m, int r, + Path inDir, Path outputDir, + String mapSignalFile, String redSignalFile) + throws IOException { + TestJobTrackerRestart.configureWaitingJobConf(conf, inDir, outputDir, + m, r, "job-listener-test", + mapSignalFile, redSignalFile); + return conf; + } + + /** + * This test case tests if external updates to JIP do not result into + * undesirable effects + * Test is as follows + * - submit 2 jobs of normal priority. job1 is a waiting job which waits and + * blocks the cluster + * - change one parameter of job2 such that the job bumps up in the queue + * - check if the queue looks ok + * + */ + public void testJobQueueChanges() throws IOException { + LOG.info("Testing job queue changes"); + JobConf conf = new JobConf(); + MiniDFSCluster dfs = new MiniDFSCluster(conf, 1, true, null, null); + dfs.waitActive(); + FileSystem fileSys = dfs.getFileSystem(); + + dfs.startDataNodes(conf, 1, true, null, null, null, null); + dfs.waitActive(); + + String namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + + (dfs.getFileSystem()).getUri().getPort(); + MiniMRCluster mr = new MiniMRCluster(1, namenode, 1); + JobClient jobClient = new JobClient(mr.createJobConf()); + + // clean up + fileSys.delete(testDir, true); + + if (!fileSys.mkdirs(testDir)) { + throw new IOException("Mkdirs failed to create " + testDir.toString()); + } + + // Write the input file + Path inDir = new Path(testDir, "input"); + Path shareDir = new Path(testDir, "share"); + String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir); + String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir); + TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, + new Path(inDir + "/file"), + (short)1); + + JobQueueJobInProgressListener myListener = + new JobQueueJobInProgressListener(); + + // add the listener + mr.getJobTrackerRunner().getJobTracker() + .addJobInProgressListener(myListener); + + // big blocking job + Path outputDir = new Path(testDir, "output"); + Path newOutputDir = outputDir.suffix("0"); + JobConf job1 = configureJob(mr.createJobConf(), 10, 0, inDir, newOutputDir, + mapSignalFile, redSignalFile); + + // short blocked job + newOutputDir = outputDir.suffix("1"); + JobConf job2 = configureJob(mr.createJobConf(), 1, 0, inDir, newOutputDir, + mapSignalFile, redSignalFile); + + RunningJob rJob1 = jobClient.submitJob(job1); + LOG.info("Running job " + rJob1.getID().toString()); + + RunningJob rJob2 = jobClient.submitJob(job2); + LOG.info("Running job " + rJob2.getID().toString()); + + // I. Check job-priority change + LOG.info("Testing job priority changes"); + + // bump up job2's priority + LOG.info("Increasing job2's priority to HIGH"); + rJob2.setJobPriority("HIGH"); + + // check if the queue is sane + assertTrue("Priority change garbles the queue", + myListener.getJobQueue().size() == 2); + + JobInProgress[] queue = + myListener.getJobQueue().toArray(new JobInProgress[0]); + + // check if the bump has happened + assertTrue("Priority change failed to bump up job2 in the queue", + queue[0].getJobID().equals(rJob2.getID())); + + assertTrue("Priority change failed to bump down job1 in the queue", + queue[1].getJobID().equals(rJob1.getID())); + + assertEquals("Priority change has garbled the queue", + 2, queue.length); + + // II. Check start-time change + LOG.info("Testing job start-time changes"); + + // reset the priority which will make the order as + // - job1 + // - job2 + // this will help in bumping job2 on start-time change + LOG.info("Increasing job2's priority to NORMAL"); + rJob2.setJobPriority("NORMAL"); + + // create the change event + JobInProgress jip2 = mr.getJobTrackerRunner().getJobTracker() + .getJob(rJob2.getID()); + JobInProgress jip1 = mr.getJobTrackerRunner().getJobTracker() + .getJob(rJob1.getID()); + + JobStatus prevStatus = (JobStatus)jip2.getStatus().clone(); + + // change job2's start-time and the status + jip2.startTime = jip1.startTime - 1; + jip2.status.setStartTime(jip2.startTime); + + + JobStatus newStatus = (JobStatus)jip2.getStatus().clone(); + + // inform the listener + LOG.info("Updating the listener about job2's start-time change"); + JobStatusChangeEvent event = + new JobStatusChangeEvent(jip2, EventType.START_TIME_CHANGED, + prevStatus, newStatus); + myListener.jobUpdated(event); + + // check if the queue is sane + assertTrue("Start time change garbles the queue", + myListener.getJobQueue().size() == 2); + + queue = myListener.getJobQueue().toArray(new JobInProgress[0]); + + // check if the bump has happened + assertTrue("Start time change failed to bump up job2 in the queue", + queue[0].getJobID().equals(rJob2.getID())); + + assertTrue("Start time change failed to bump down job1 in the queue", + queue[1].getJobID().equals(rJob1.getID())); + + assertEquals("Start time change has garbled the queue", + 2, queue.length); + + // signal the maps to complete + TestJobTrackerRestart.signalTasks(dfs, fileSys, true, + mapSignalFile, redSignalFile); + + // check if job completion leaves the queue sane + while (rJob2.getJobState() != JobStatus.SUCCEEDED) { + TestJobTrackerRestart.waitFor(10); + } + + while (rJob1.getJobState() != JobStatus.SUCCEEDED) { + TestJobTrackerRestart.waitFor(10); + } + + assertTrue("Job completion garbles the queue", + myListener.getJobQueue().size() == 0); + } // A listener that inits the tasks one at a time and also listens to the // events Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=706462&r1=706461&r2=706462&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original) +++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Mon Oct 20 16:53:17 2008 @@ -42,8 +42,9 @@ super(new JobID("test", ++jobCounter), jobConf); this.taskTrackerManager = taskTrackerManager; this.startTime = System.currentTimeMillis(); - this.status = new JobStatus(); - this.status.setRunState(JobStatus.PREP); + this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP); + this.status.setJobPriority(JobPriority.NORMAL); + this.status.setStartTime(startTime); } @Override