Return-Path:
Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org
Received: (qmail 14739 invoked from network); 16 Oct 2009 07:47:35 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3)
by minotaur.apache.org with SMTP; 16 Oct 2009 07:47:35 -0000
Received: (qmail 71297 invoked by uid 500); 16 Oct 2009 07:47:35 -0000
Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org
Received: (qmail 71247 invoked by uid 500); 16 Oct 2009 07:47:35 -0000
Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
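List-Help: <mailto:mapreduce-commits-help@hadoop.apache.org>
List-Unsubscribe: <mailto:mapreduce-commits-unsubscribe@hadoop.apache.org>
List-Post: <mailto:mapreduce-commits@hadoop.apache.org>
List-Id: <mapreduce-commits.hadoop.apache.org>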
Reply-To: mapreduce-dev@hadoop.apache.org
Delivered-To: mailing list mapreduce-commits@hadoop.apache.org
Received: (qmail 71234 invoked by uid 99); 16 Oct 2009 07:47:35 -0000
Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136)
by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Oct 2009 07:47:35 +0000
X-ASF-Spam-Status: No, hits=-2.6 required=5.0
tests=BAYES_00
X-Spam-Check-By: apache.org
Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4)
by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Oct 2009 07:47:32 +0000
Received: by eris.apache.org (Postfix, from userid 65534)
id 69CF32388897; Fri, 16 Oct 2009 07:47:12 +0000 (UTC)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r825789 - in /hadoop/mapreduce/branches/branch-0.21: ./
src/examples/org/apache/hadoop/examples/pi/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
src/test/mapred/org/apache/hadoop/mapred/
Date: Fri, 16 Oct 2009 07:47:12 -0000
To: mapreduce-commits@hadoop.apache.org
From: sharad@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20091016074712.69CF32388897@eris.apache.org>
Author: sharad
Date: Fri Oct 16 07:47:11 2009
New Revision: 825789
URL: http://svn.apache.org/viewvc?rev=825789&view=rev
Log:
MAPREDUCE-1117. Fix ClusterMetrics to return info about slots instead of tasks. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
Modified:
hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=825789&r1=825788&r2=825789&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Fri Oct 16 07:47:11 2009
@@ -747,3 +747,6 @@
MAPREDUCE-769. Make findbugs and javac warnings to zero.
(Amareshwari Sriramadasu via sharad)
+ MAPREDUCE-1117. Fix ClusterMetrics to return info about slots instead of
+ tasks. (Amareshwari Sriramadasu via sharad)
+
Modified: hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=825789&r1=825788&r2=825789&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java Fri Oct 16 07:47:11 2009
@@ -38,9 +38,9 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -379,14 +379,14 @@
public static class MixMachine extends Machine {
private static final MixMachine INSTANCE = new MixMachine();
- private JobClient jobclient;
+ private Cluster cluster;
/** {@inheritDoc} */
@Override
public synchronized void init(Job job) throws IOException {
final Configuration conf = job.getConfiguration();
- if (jobclient == null)
- jobclient = new JobClient(JobTracker.getAddress(conf), conf);
+ if (cluster == null)
+ cluster = new Cluster(JobTracker.getAddress(conf), conf);
chooseMachine(conf).init(job);
}
@@ -398,9 +398,11 @@
try {
for(;; Thread.sleep(2000)) {
//get cluster status
- final ClusterStatus status = jobclient.getClusterStatus();
- final int m = status.getMaxMapTasks() - status.getMapTasks();
- final int r = status.getMaxReduceTasks() - status.getReduceTasks();
+ final ClusterMetrics status = cluster.getClusterStatus();
+ final int m =
+ status.getMapSlotCapacity() - status.getOccupiedMapSlots();
+ final int r =
+ status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
if (m >= parts || r >= parts) {
//favor ReduceSide machine
final Machine value = r >= parts?
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=825789&r1=825788&r2=825789&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Oct 16 07:47:11 2009
@@ -20,7 +20,6 @@
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
@@ -1206,6 +1205,8 @@
//
int totalMaps = 0;
int totalReduces = 0;
+ private int occupiedMapSlots = 0;
+ private int occupiedReduceSlots = 0;
private HashMap taskTrackers =
new HashMap();
Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -2400,6 +2401,8 @@
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
+ occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+ occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
int mapSlots = oldStatus.getMaxMapSlots();
totalMapTaskCapacity -= mapSlots;
@@ -2422,6 +2425,8 @@
if (status != null) {
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
+ occupiedMapSlots += status.countOccupiedMapSlots();
+ occupiedReduceSlots += status.countOccupiedReduceSlots();
if (!faultyTrackers.isBlacklisted(status.getHost())) {
int mapSlots = status.getMaxMapSlots();
totalMapTaskCapacity += mapSlots;
@@ -2927,8 +2932,8 @@
}
public synchronized ClusterMetrics getClusterMetrics() {
- return new ClusterMetrics(totalMaps, totalReduces, totalMapTaskCapacity,
- totalReduceTaskCapacity, taskTrackers.size() -
+ return new ClusterMetrics(occupiedMapSlots, occupiedReduceSlots,
+ totalMapTaskCapacity, totalReduceTaskCapacity, taskTrackers.size() -
getBlacklistedTrackerCount(),
getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
}
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=825789&r1=825788&r2=825789&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Fri Oct 16 07:47:11 2009
@@ -35,10 +35,10 @@
* Number of blacklisted and decommissioned trackers.
* </li>
* <li>
- * Task capacity of the cluster.
+ * Slot capacity of the cluster.
* </li>
* <li>
- * The number of currently running map & reduce tasks.
+ * The number of currently occupied map & reduce slots.
* </li>
* </ol></p>
*
@@ -48,24 +48,25 @@
* @see Cluster
*/
public class ClusterMetrics implements Writable {
- int runningMaps;
- int runningReduces;
- int mapSlots;
- int reduceSlots;
- int numTrackers;
- int numBlacklistedTrackers;
- int numDecommissionedTrackers;
+ private int occupiedMapSlots;
+ private int occupiedReduceSlots;
+ private int totalMapSlots;
+ private int totalReduceSlots;
+ private int numTrackers;
+ private int numBlacklistedTrackers;
+ private int numDecommissionedTrackers;
public ClusterMetrics() {
}
- public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots,
- int reduceSlots, int numTrackers, int numBlacklistedTrackers,
- int numDecommisionedNodes) {
- this.runningMaps = runningMaps;
- this.runningReduces = runningReduces;
- this.mapSlots = mapSlots;
- this.reduceSlots = reduceSlots;
+ public ClusterMetrics(int occupiedMapSlots, int occupiedReduceSlots,
+ int mapSlots, int reduceSlots,
+ int numTrackers, int numBlacklistedTrackers,
+ int numDecommisionedNodes) {
+ this.occupiedMapSlots = occupiedMapSlots;
+ this.occupiedReduceSlots = occupiedReduceSlots;
+ this.totalMapSlots = mapSlots;
+ this.totalReduceSlots = reduceSlots;
this.numTrackers = numTrackers;
this.numBlacklistedTrackers = numBlacklistedTrackers;
this.numDecommissionedTrackers = numDecommisionedNodes;
@@ -77,7 +78,7 @@
* @return occupied map slot count
*/
public int getOccupiedMapSlots() {
- return runningMaps;
+ return occupiedMapSlots;
}
/**
@@ -86,7 +87,7 @@
* @return occupied reduce slot count
*/
public int getOccupiedReduceSlots() {
- return runningReduces;
+ return occupiedReduceSlots;
}
/**
@@ -95,7 +96,7 @@
* @return map slot capacity
*/
public int getMapSlotCapacity() {
- return mapSlots;
+ return totalMapSlots;
}
/**
@@ -104,7 +105,7 @@
* @return reduce slot capacity
*/
public int getReduceSlotCapacity() {
- return reduceSlots;
+ return totalReduceSlots;
}
/**
@@ -136,10 +137,10 @@
@Override
public void readFields(DataInput in) throws IOException {
- runningMaps = in.readInt();
- runningReduces = in.readInt();
- mapSlots = in.readInt();
- reduceSlots = in.readInt();
+ occupiedMapSlots = in.readInt();
+ occupiedReduceSlots = in.readInt();
+ totalMapSlots = in.readInt();
+ totalReduceSlots = in.readInt();
numTrackers = in.readInt();
numBlacklistedTrackers = in.readInt();
numDecommissionedTrackers = in.readInt();
@@ -147,10 +148,10 @@
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(runningMaps);
- out.writeInt(runningReduces);
- out.writeInt(mapSlots);
- out.writeInt(reduceSlots);
+ out.writeInt(occupiedMapSlots);
+ out.writeInt(occupiedReduceSlots);
+ out.writeInt(totalMapSlots);
+ out.writeInt(totalReduceSlots);
out.writeInt(numTrackers);
out.writeInt(numBlacklistedTrackers);
out.writeInt(numDecommissionedTrackers);
Added: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java?rev=825789&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java Fri Oct 16 07:47:11 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.TaskType;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TestClusterStatus extends TestCase {
+
+ private static String[] trackers = new String[] { "tracker_tracker1:1000",
+ "tracker_tracker2:1000", "tracker_tracker3:1000" };
+ private static JobTracker jobTracker;
+ private static int mapSlotsPerTracker = 4;
+ private static int reduceSlotsPerTracker = 2;
+ private static MiniMRCluster mr;
+ private static Cluster cluster;
+ // heartbeat responseId. increment this after sending a heartbeat
+ private static short responseId = 1;
+
+ public static Test suite() {
+ TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
+ protected void setUp() throws Exception {
+
+ mr = new MiniMRCluster(0, "file:///", 1);
+ jobTracker = mr.getJobTrackerRunner().getJobTracker();
+ for (String tracker : trackers) {
+ FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+ }
+ cluster = new Cluster(mr.createJobConf());
+ }
+
+ protected void tearDown() throws Exception {
+ cluster.close();
+ mr.shutdown();
+ }
+ };
+ return setup;
+ }
+
+ private TaskTrackerStatus getTTStatus(String trackerName,
+ List taskStatuses) {
+ return new TaskTrackerStatus(trackerName,
+ JobInProgress.convertTrackerNameToHostName(trackerName), 0,
+ taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+ }
+
+ public void testClusterMetrics() throws IOException, InterruptedException {
+ assertEquals("tasktracker count doesn't match", trackers.length,
+ cluster.getClusterStatus().getTaskTrackerCount());
+
+ List list = new ArrayList();
+
+ // create a map task status, which uses 2 slots.
+ int mapSlotsPerTask = 2;
+ TaskStatus ts = TaskStatus.createTaskStatus(true,
+ new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask,
+ TaskStatus.State.RUNNING, "", "", trackers[0],
+ TaskStatus.Phase.MAP, null);
+ list.add(ts);
+
+ // create a reduce task status, which uses 1 slot.
+ int reduceSlotsPerTask = 1;
+ ts = TaskStatus.createTaskStatus(false,
+ new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
+ reduceSlotsPerTask,
+ TaskStatus.State.RUNNING, "", "", trackers[0],
+ TaskStatus.Phase.REDUCE, null);
+ list.add(ts);
+
+ // create TaskTrackerStatus and send heartbeats
+ TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+ status[0] = getTTStatus(trackers[0], list);
+ status[1] = getTTStatus(trackers[1], new ArrayList());
+ status[2] = getTTStatus(trackers[2], new ArrayList());
+ for (int i = 0; i< trackers.length; i++) {
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false,
+ trackers[i], responseId);
+ }
+ responseId++;
+ // assert ClusterMetrics
+ ClusterMetrics metrics = cluster.getClusterStatus();
+ assertEquals("occupied map slots do not match", mapSlotsPerTask,
+ metrics.getOccupiedMapSlots());
+ assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
+ metrics.getOccupiedReduceSlots());
+ assertEquals("map slot capacities do not match",
+ mapSlotsPerTracker * trackers.length,
+ metrics.getMapSlotCapacity());
+ assertEquals("reduce slot capacities do not match",
+ reduceSlotsPerTracker * trackers.length,
+ metrics.getReduceSlotCapacity());
+
+ // assert the values in ClusterStatus also
+ assertEquals("running map tasks do not match", 1,
+ jobTracker.getClusterStatus().getMapTasks());
+ assertEquals("running reduce tasks do not match", 1,
+ jobTracker.getClusterStatus().getReduceTasks());
+ assertEquals("map slot capacities do not match",
+ mapSlotsPerTracker * trackers.length,
+ jobTracker.getClusterStatus().getMaxMapTasks());
+ assertEquals("reduce slot capacities do not match",
+ reduceSlotsPerTracker * trackers.length,
+ jobTracker.getClusterStatus().getMaxReduceTasks());
+ cluster.close();
+ }
+}