Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B1FA7115B6 for ; Sat, 26 Jul 2014 19:54:02 +0000 (UTC) Received: (qmail 41372 invoked by uid 500); 26 Jul 2014 19:54:02 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 41300 invoked by uid 500); 26 Jul 2014 19:54:02 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 41287 invoked by uid 99); 26 Jul 2014 19:54:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 26 Jul 2014 19:54:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 26 Jul 2014 19:54:02 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DE5242388A67 for ; Sat, 26 Jul 2014 19:53:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1613720 - in /hadoop/common/branches/branch-1: ./ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ Date: Sat, 26 Jul 2014 19:53:36 -0000 To: common-commits@hadoop.apache.org From: kasha@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140726195336.DE5242388A67@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kasha Date: Sat Jul 26 19:53:36 2014 New Revision: 1613720 URL: http://svn.apache.org/r1613720 Log: MAPREDUCE-5966. MR1 FairScheduler use of custom weight adjuster is not thread safe for comparisons. (Anubhav Dhoot via kasha) Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1613720&r1=1613719&r2=1613720&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Sat Jul 26 19:53:36 2014 @@ -230,6 +230,9 @@ Release 1.3.0 - unreleased MAPREDUCE-5979. FairScheduler: zero weight can cause sort failures. (Anubhav Dhoot via kasha) + MAPREDUCE-5966. MR1 FairScheduler use of custom weight adjuster is not + thread safe for comparisons. (Anubhav Dhoot via kasha) + Release 1.2.2 - unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1613720&r1=1613719&r2=1613720&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Sat Jul 26 19:53:36 2014 @@ -701,11 +701,15 @@ public class FairScheduler extends TaskS updateRunnability(); // Set job runnability based on user/pool limits - // Update demands of jobs and pools + // Update demands and weights of jobs and pools for (Pool pool: poolMgr.getPools()) { pool.getMapSchedulable().updateDemand(); pool.getReduceSchedulable().updateDemand(); + + pool.getMapSchedulable().updateWeight(); + pool.getReduceSchedulable().updateWeight(); } + // Compute fair shares based on updated demands List mapScheds = getPoolSchedulables(TaskType.MAP); Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=1613720&r1=1613719&r2=1613720&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (original) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Sat Jul 26 19:53:36 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.mapred.FairScheduler.JobInfo; import org.apache.hadoop.mapreduce.TaskType; @@ -29,6 +30,7 @@ public class JobSchedulable extends Sche protected JobInProgress job; protected TaskType taskType; private int demand = 0; + private double weight = 1.0; public JobSchedulable(FairScheduler scheduler, JobInProgress job, TaskType taskType) { @@ -133,9 +135,14 @@ public class JobSchedulable extends Sche @Override public double getWeight() { - return scheduler.getJobWeight(job, taskType); + return weight; } - + + @Override + public void updateWeight() { + weight = scheduler.getJobWeight(job, taskType); + } + @Override public int getMinShare() { return 0; @@ -181,7 +188,6 @@ public class JobSchedulable extends Sche } } - @Override protected String getMetricsContextName() { return "jobs"; Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=1613720&r1=1613719&r2=1613720&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (original) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Sat Jul 26 19:53:36 2014 @@ -95,7 +95,18 @@ public class PoolSchedulable extends Sch + "; maxTasks is " + maxTasks); } } - + + /** + * Ask jobs in the pool to update weights. Its own weight is computed on + * the fly + */ + @Override + public void updateWeight() { + for (JobSchedulable sched: jobScheds) { + sched.updateWeight(); + } + } + /** * Distribute the pool's fair share among its jobs */ @@ -160,6 +171,9 @@ public class PoolSchedulable extends Sch } else { throw new RuntimeException("Unsupported pool scheduling mode " + mode); } + for (JobSchedulable sched: jobScheds) { + sched.updateWeight(); + } Collections.sort(jobScheds, comparator); for (JobSchedulable sched: jobScheds) { Task task = sched.assignTask(tts, currentTime, visited); Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java?rev=1613720&r1=1613719&r2=1613720&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java (original) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java Sat Jul 26 19:53:36 2014 @@ -95,7 +95,10 @@ abstract class Schedulable { /** Refresh the Schedulable's demand and those of its children if any. */ public abstract void updateDemand(); - + + /** Refresh the Schedulable's weight and those of its children if any */ + public abstract void updateWeight(); + /** * Distribute the fair share assigned to this Schedulable among its * children (used in pools where the internal scheduler is fair sharing). Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java?rev=1613720&r1=1613719&r2=1613720&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java (original) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java Sat Jul 26 19:53:36 2014 @@ -109,6 +109,9 @@ public class FakeSchedulable extends Sch public void updateDemand() {} @Override + public void updateWeight() {} + + @Override public TaskType getTaskType() { return TaskType.MAP; } Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1613720&r1=1613719&r2=1613720&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Sat Jul 26 19:53:36 2014 @@ -31,13 +31,14 @@ import java.util.IdentityHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.LinkedBlockingDeque; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.mapred.FairScheduler.JobInfo; import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException; import org.apache.hadoop.mapred.UtilsForTests.FakeClock; @@ -3092,8 +3093,6 @@ public class TestFairScheduler extends T assertNull(scheduler.assignTasks(tracker("tt2"))); } - - class TestJobSchedulableSort extends JobSchedulable { private final double testFairShare; @@ -3154,10 +3153,6 @@ public class TestFairScheduler extends T public void testFairShareComparator() { - List jobs = new ArrayList(); - final int iterations = 100; - int jobCount = 100; - Comparator comparator = new SchedulingAlgorithms.FairShareComparator(); @@ -3186,8 +3181,54 @@ public class TestFairScheduler extends T // s3 has a higher running task to weight ratio (infinity) assertTrue(comparator.compare(s1, s3) < 0); } - - + + /** + * This test verifies that sorting of JobSchedulables with a custom + * weightadjuster that returns different values when called does not break + * the sorting. If the weight changes during the sort, + * the sort would fail in jdk7 + */ + public void testJobSchedulableSortingWithCustomWeightAdjuster() throws + IOException, InterruptedException { + final int iterations = 100, jobCount = 100, racks = 100, nodesPerRack = 2; + final int totalTaskTrackers = nodesPerRack * racks; + + setUpCluster(racks, nodesPerRack, true); + + scheduler.weightAdjuster = new WeightAdjuster() { + Random r = new Random(); + + @Override + public double adjustWeight(JobInProgress job, TaskType taskType, double + curWeight) { + return curWeight * r.nextInt(100); + } + }; + + for (int j = 0; j < jobCount; j++) { + advanceTime(100); + submitJob(JobStatus.RUNNING, 2 * iterations, iterations); + scheduler.updateMetrics(); + } + + final LinkedBlockingDeque tasks = + new LinkedBlockingDeque(); + + final String taskTrackerNamePrefix = "tt"; + + Random r1 = new Random(); + for (int i = 0; i < iterations; i++) { + + int randomTaskTrackerId = r1.nextInt(totalTaskTrackers) + 1; + String taskTrackerName = taskTrackerNamePrefix + randomTaskTrackerId; + List assignedTasks = scheduler.assignTasks(tracker + (taskTrackerName)); + if (assignedTasks != null) { + tasks.addAll(assignedTasks); + } + } + } + /** * Ask scheduler to update metrics and then verify that they're all * correctly published to the metrics context