Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 14739 invoked from network); 16 Oct 2009 07:47:35 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 16 Oct 2009 07:47:35 -0000 Received: (qmail 71297 invoked by uid 500); 16 Oct 2009 07:47:35 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 71247 invoked by uid 500); 16 Oct 2009 07:47:35 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 71234 invoked by uid 99); 16 Oct 2009 07:47:35 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Oct 2009 07:47:35 +0000 X-ASF-Spam-Status: No, hits=-2.6 required=5.0 tests=BAYES_00 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Oct 2009 07:47:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 69CF32388897; Fri, 16 Oct 2009 07:47:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r825789 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/examples/org/apache/hadoop/examples/pi/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/test/mapred/org/apache/hadoop/mapred/ Date: Fri, 16 Oct 2009 07:47:12 -0000 To: mapreduce-commits@hadoop.apache.org From: sharad@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091016074712.69CF32388897@eris.apache.org> Author: sharad Date: Fri Oct 16 07:47:11 2009 New Revision: 825789 URL: http://svn.apache.org/viewvc?rev=825789&view=rev Log: MAPREDUCE-1117. Fix ClusterMetrics to return info about slots instead of tasks. Contributed by Amareshwari Sriramadasu. Added: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=825789&r1=825788&r2=825789&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original) +++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Fri Oct 16 07:47:11 2009 @@ -747,3 +747,6 @@ MAPREDUCE-769. Make findbugs and javac warnings to zero. (Amareshwari Sriramadasu via sharad) + MAPREDUCE-1117. Fix ClusterMetrics to return info about slots instead of + tasks. (Amareshwari Sriramadasu via sharad) + Modified: hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=825789&r1=825788&r2=825789&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original) +++ hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java Fri Oct 16 07:47:11 2009 @@ -38,9 +38,9 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.ClusterStatus; -import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobTracker; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -379,14 +379,14 @@ public static class MixMachine extends Machine { private static final MixMachine INSTANCE = new MixMachine(); - private JobClient jobclient; + private Cluster cluster; /** {@inheritDoc} */ @Override public synchronized void init(Job job) throws IOException { final Configuration conf = job.getConfiguration(); - if (jobclient == null) - jobclient = new JobClient(JobTracker.getAddress(conf), conf); + if (cluster == null) + cluster = new Cluster(JobTracker.getAddress(conf), conf); chooseMachine(conf).init(job); } @@ -398,9 +398,11 @@ try { for(;; Thread.sleep(2000)) { //get cluster status - final ClusterStatus status = jobclient.getClusterStatus(); - final int m = status.getMaxMapTasks() - status.getMapTasks(); - final int r = status.getMaxReduceTasks() - status.getReduceTasks(); + final ClusterMetrics status = cluster.getClusterStatus(); + final int m = + status.getMapSlotCapacity() - status.getOccupiedMapSlots(); + final int r = + status.getReduceSlotCapacity() - status.getOccupiedReduceSlots(); if (m >= parts || r >= parts) { //favor ReduceSide machine final Machine value = r >= parts? Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=825789&r1=825788&r2=825789&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Oct 16 07:47:11 2009 @@ -20,7 +20,6 @@ import java.io.File; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; @@ -1206,6 +1205,8 @@ // int totalMaps = 0; int totalReduces = 0; + private int occupiedMapSlots = 0; + private int occupiedReduceSlots = 0; private HashMap taskTrackers = new HashMap(); MapuniqueHostsMap = new ConcurrentHashMap(); @@ -2400,6 +2401,8 @@ if (oldStatus != null) { totalMaps -= oldStatus.countMapTasks(); totalReduces -= oldStatus.countReduceTasks(); + occupiedMapSlots -= oldStatus.countOccupiedMapSlots(); + occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots(); if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) { int mapSlots = oldStatus.getMaxMapSlots(); totalMapTaskCapacity -= mapSlots; @@ -2422,6 +2425,8 @@ if (status != null) { totalMaps += status.countMapTasks(); totalReduces += status.countReduceTasks(); + occupiedMapSlots += status.countOccupiedMapSlots(); + occupiedReduceSlots += status.countOccupiedReduceSlots(); if (!faultyTrackers.isBlacklisted(status.getHost())) { int mapSlots = status.getMaxMapSlots(); totalMapTaskCapacity += mapSlots; @@ -2927,8 +2932,8 @@ } public synchronized ClusterMetrics getClusterMetrics() { - return new ClusterMetrics(totalMaps, totalReduces, totalMapTaskCapacity, - totalReduceTaskCapacity, taskTrackers.size() - + return new ClusterMetrics(occupiedMapSlots, occupiedReduceSlots, + totalMapTaskCapacity, totalReduceTaskCapacity, taskTrackers.size() - getBlacklistedTrackerCount(), getBlacklistedTrackerCount(), getExcludedNodes().size()) ; } Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=825789&r1=825788&r2=825789&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (original) +++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Fri Oct 16 07:47:11 2009 @@ -35,10 +35,10 @@ * Number of blacklisted and decommissioned trackers. * *
  • - * Task capacity of the cluster. + * Slot capacity of the cluster. *
  • *
  • - * The number of currently running map & reduce tasks. + * The number of currently occupied map & reduce slots. *
  • *

    * @@ -48,24 +48,25 @@ * @see Cluster */ public class ClusterMetrics implements Writable { - int runningMaps; - int runningReduces; - int mapSlots; - int reduceSlots; - int numTrackers; - int numBlacklistedTrackers; - int numDecommissionedTrackers; + private int occupiedMapSlots; + private int occupiedReduceSlots; + private int totalMapSlots; + private int totalReduceSlots; + private int numTrackers; + private int numBlacklistedTrackers; + private int numDecommissionedTrackers; public ClusterMetrics() { } - public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots, - int reduceSlots, int numTrackers, int numBlacklistedTrackers, - int numDecommisionedNodes) { - this.runningMaps = runningMaps; - this.runningReduces = runningReduces; - this.mapSlots = mapSlots; - this.reduceSlots = reduceSlots; + public ClusterMetrics(int occupiedMapSlots, int occupiedReduceSlots, + int mapSlots, int reduceSlots, + int numTrackers, int numBlacklistedTrackers, + int numDecommisionedNodes) { + this.occupiedMapSlots = occupiedMapSlots; + this.occupiedReduceSlots = occupiedReduceSlots; + this.totalMapSlots = mapSlots; + this.totalReduceSlots = reduceSlots; this.numTrackers = numTrackers; this.numBlacklistedTrackers = numBlacklistedTrackers; this.numDecommissionedTrackers = numDecommisionedNodes; @@ -77,7 +78,7 @@ * @return occupied map slot count */ public int getOccupiedMapSlots() { - return runningMaps; + return occupiedMapSlots; } /** @@ -86,7 +87,7 @@ * @return occupied reduce slot count */ public int getOccupiedReduceSlots() { - return runningReduces; + return occupiedReduceSlots; } /** @@ -95,7 +96,7 @@ * @return map slot capacity */ public int getMapSlotCapacity() { - return mapSlots; + return totalMapSlots; } /** @@ -104,7 +105,7 @@ * @return reduce slot capacity */ public int getReduceSlotCapacity() { - return reduceSlots; + return totalReduceSlots; } /** @@ -136,10 +137,10 @@ @Override public void readFields(DataInput in) throws IOException { - runningMaps = in.readInt(); - runningReduces = in.readInt(); - mapSlots = in.readInt(); - reduceSlots = in.readInt(); + occupiedMapSlots = in.readInt(); + occupiedReduceSlots = in.readInt(); + totalMapSlots = in.readInt(); + totalReduceSlots = in.readInt(); numTrackers = in.readInt(); numBlacklistedTrackers = in.readInt(); numDecommissionedTrackers = in.readInt(); @@ -147,10 +148,10 @@ @Override public void write(DataOutput out) throws IOException { - out.writeInt(runningMaps); - out.writeInt(runningReduces); - out.writeInt(mapSlots); - out.writeInt(reduceSlots); + out.writeInt(occupiedMapSlots); + out.writeInt(occupiedReduceSlots); + out.writeInt(totalMapSlots); + out.writeInt(totalReduceSlots); out.writeInt(numTrackers); out.writeInt(numBlacklistedTrackers); out.writeInt(numDecommissionedTrackers); Added: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java?rev=825789&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java Fri Oct 16 07:47:11 2009 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.TaskType; + +import junit.extensions.TestSetup; +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +public class TestClusterStatus extends TestCase { + + private static String[] trackers = new String[] { "tracker_tracker1:1000", + "tracker_tracker2:1000", "tracker_tracker3:1000" }; + private static JobTracker jobTracker; + private static int mapSlotsPerTracker = 4; + private static int reduceSlotsPerTracker = 2; + private static MiniMRCluster mr; + private static Cluster cluster; + // heartbeat responseId. increment this after sending a heartbeat + private static short responseId = 1; + + public static Test suite() { + TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) { + protected void setUp() throws Exception { + + mr = new MiniMRCluster(0, "file:///", 1); + jobTracker = mr.getJobTrackerRunner().getJobTracker(); + for (String tracker : trackers) { + FakeObjectUtilities.establishFirstContact(jobTracker, tracker); + } + cluster = new Cluster(mr.createJobConf()); + } + + protected void tearDown() throws Exception { + cluster.close(); + mr.shutdown(); + } + }; + return setup; + } + + private TaskTrackerStatus getTTStatus(String trackerName, + List taskStatuses) { + return new TaskTrackerStatus(trackerName, + JobInProgress.convertTrackerNameToHostName(trackerName), 0, + taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker); + } + + public void testClusterMetrics() throws IOException, InterruptedException { + assertEquals("tasktracker count doesn't match", trackers.length, + cluster.getClusterStatus().getTaskTrackerCount()); + + List list = new ArrayList(); + + // create a map task status, which uses 2 slots. + int mapSlotsPerTask = 2; + TaskStatus ts = TaskStatus.createTaskStatus(true, + new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask, + TaskStatus.State.RUNNING, "", "", trackers[0], + TaskStatus.Phase.MAP, null); + list.add(ts); + + // create a reduce task status, which uses 1 slot. + int reduceSlotsPerTask = 1; + ts = TaskStatus.createTaskStatus(false, + new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f, + reduceSlotsPerTask, + TaskStatus.State.RUNNING, "", "", trackers[0], + TaskStatus.Phase.REDUCE, null); + list.add(ts); + + // create TaskTrackerStatus and send heartbeats + TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length]; + status[0] = getTTStatus(trackers[0], list); + status[1] = getTTStatus(trackers[1], new ArrayList()); + status[2] = getTTStatus(trackers[2], new ArrayList()); + for (int i = 0; i< trackers.length; i++) { + FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false, + trackers[i], responseId); + } + responseId++; + // assert ClusterMetrics + ClusterMetrics metrics = cluster.getClusterStatus(); + assertEquals("occupied map slots do not match", mapSlotsPerTask, + metrics.getOccupiedMapSlots()); + assertEquals("occupied reduce slots do not match", reduceSlotsPerTask, + metrics.getOccupiedReduceSlots()); + assertEquals("map slot capacities do not match", + mapSlotsPerTracker * trackers.length, + metrics.getMapSlotCapacity()); + assertEquals("reduce slot capacities do not match", + reduceSlotsPerTracker * trackers.length, + metrics.getReduceSlotCapacity()); + + // assert the values in ClusterStatus also + assertEquals("running map tasks do not match", 1, + jobTracker.getClusterStatus().getMapTasks()); + assertEquals("running reduce tasks do not match", 1, + jobTracker.getClusterStatus().getReduceTasks()); + assertEquals("map slot capacities do not match", + mapSlotsPerTracker * trackers.length, + jobTracker.getClusterStatus().getMaxMapTasks()); + assertEquals("reduce slot capacities do not match", + reduceSlotsPerTracker * trackers.length, + jobTracker.getClusterStatus().getMaxReduceTasks()); + cluster.close(); + } +}