Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 80193 invoked from network); 4 Mar 2011 03:24:04 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:24:04 -0000 Received: (qmail 84930 invoked by uid 500); 4 Mar 2011 03:24:03 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 84742 invoked by uid 500); 4 Mar 2011 03:24:03 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 84642 invoked by uid 99); 4 Mar 2011 03:24:03 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:24:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:24:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1FF1C2388AAA; Fri, 4 Mar 2011 03:23:39 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1076933 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/ Date: Fri, 04 Mar 2011 03:23:39 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304032339.1FF1C2388AAA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Fri Mar 4 03:23:38 2011 New Revision: 1076933 URL: http://svn.apache.org/viewvc?rev=1076933&view=rev Log: commit b3b11c4936a501eda5991fbfa4498fc1f336cf9d Author: Lee Tucker Date: Thu Jul 30 17:40:17 2009 -0700 Applying patch 2709938.5739.patch Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java?rev=1076933&r1=1076932&r2=1076933&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java Fri Mar 4 03:23:38 2011 @@ -197,6 +197,13 @@ public class ClusterWithCapacitySchedule } } + /** + * @return the mrCluster + */ + public MiniMRCluster getMrCluster() { + return mrCluster; + } + static class MyClassLoader extends ClassLoader { @Override public URL getResource(String name) { Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java?rev=1076933&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java Fri Mar 4 03:23:38 2011 @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.util.Properties; +import org.apache.hadoop.mapred.ControlledMapReduceJob.ControlledMapReduceJobRunner; + + +public class TestJobTrackerRestartWithCS extends ClusterWithCapacityScheduler { + + /** + * Test single queue. + * + *

+ * + * Submit a job with more M/R tasks than total capacity. Full queue capacity + * should be utilized and remaining M/R tasks should wait for slots to be + * available. + * + * @throws Exception + */ + public void testJobTrackerRestartWithCS() + throws Exception { + try { + Properties schedulerProps = new Properties(); + schedulerProps.put( + "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100"); + Properties clusterProps = new Properties(); + clusterProps.put("mapred.tasktracker.map.tasks.maximum", String.valueOf(2)); + clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String.valueOf(0)); + + // cluster capacity 2 maps, 0 reduces + startCluster(1, clusterProps, schedulerProps); + + ControlledMapReduceJobRunner jobRunner = + ControlledMapReduceJobRunner.getControlledMapReduceJobRunner( + getJobConf(), 4, 0); + jobRunner.start(); + ControlledMapReduceJob controlledJob = jobRunner.getJob(); + JobID myJobID = jobRunner.getJobID(); + JobInProgress myJob = getJobTracker().getJob(myJobID); + ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 2); + + LOG.info("Trying to finish 2 maps"); + controlledJob.finishNTasks(true, 2); + ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2); + assertTrue("Number of maps finished", myJob.finishedMaps() == 2); + + JobClient jobClient = new JobClient(getMrCluster().createJobConf()); + getMrCluster().stopJobTracker(); + + getMrCluster().getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", + true); + getMrCluster().startJobTracker(); + + UtilsForTests.waitForJobTracker(jobClient); + ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 1); + + controlledJob.finishNTasks(true, 2); + ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2); + } catch (Exception e) { + e.printStackTrace(); + } finally { + tearDown(); + } + } +} Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076933&r1=1076932&r2=1076933&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:23:38 2011 @@ -739,7 +739,7 @@ public class JobTracker implements MRCon TaskID id = TaskID.forName(taskId); TaskInProgress tip = getTip(id); - + updateTip(tip, task); } @@ -752,7 +752,10 @@ public class JobTracker implements MRCon // Check if the transaction for this attempt can be committed String taskStatus = attempt.get(Keys.TASK_STATUS); - + TaskAttemptID taskID = TaskAttemptID.forName(taskAttemptId); + JobInProgress jip = getJob(taskID.getJobID()); + JobStatus prevStatus = (JobStatus)jip.getStatus().clone(); + if (taskStatus.length() > 0) { // This means this is an update event if (taskStatus.equals(Values.SUCCESS.name())) { @@ -766,6 +769,16 @@ public class JobTracker implements MRCon } else { createTaskAttempt(jip, id, attempt); } + + JobStatus newStatus = (JobStatus)jip.getStatus().clone(); + if (prevStatus.getRunState() != newStatus.getRunState()) { + if(LOG.isDebugEnabled()) + LOG.debug("Status changed hence informing prevStatus" + prevStatus + " currentStatus "+ newStatus); + JobStatusChangeEvent event = + new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED, + prevStatus, newStatus); + updateJobInProgressListeners(event); + } } public void handle(JobHistory.RecordTypes recType, Map