Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5EEA910779 for ; Sat, 27 Jul 2013 03:44:59 +0000 (UTC) Received: (qmail 71856 invoked by uid 500); 27 Jul 2013 03:44:55 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 71415 invoked by uid 500); 27 Jul 2013 03:44:47 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 71408 invoked by uid 99); 27 Jul 2013 03:44:45 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 27 Jul 2013 03:44:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 27 Jul 2013 03:44:41 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A823E23888E4 for ; Sat, 27 Jul 2013 03:44:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1507569 - in /hadoop/common/branches/branch-1: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ Date: Sat, 27 Jul 2013 03:44:19 -0000 To: common-commits@hadoop.apache.org From: tucu@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130727034419.A823E23888E4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tucu Date: Sat Jul 27 03:44:18 2013 New Revision: 1507569 URL: http://svn.apache.org/r1507569 Log: MAPREDUCE-4366. mapred metrics shows negative count of waiting maps and reduces. (sandyr via tucu) Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1507569&r1=1507568&r2=1507569&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Sat Jul 27 03:44:18 2013 @@ -105,6 +105,9 @@ Release 1.3.0 - unreleased HADOOP-9507. LocalFileSystem rename() is broken in some cases when destination exists. (cnauroth) + MAPREDUCE-4366. mapred metrics shows negative count of waiting maps and + reduces. (sandyr via tucu) + Release 1.2.1 - 2013.07.06 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1507569&r1=1507568&r2=1507569&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Sat Jul 27 03:44:18 2013 @@ -115,6 +115,9 @@ public class JobInProgress { // runningMapTasks include speculative tasks, so we need to capture // speculative tasks separately + // if a task is incomplete, running attempts over one per task are counted + // in these variables. if a task is complete, all its running attempts are + // included int speculativeMapTasks = 0; int speculativeReduceTasks = 0; @@ -1738,6 +1741,7 @@ public class JobInProgress { String name; String splits = ""; Enum counter = null; + boolean speculative = tip.getActiveTasks().size() > 1; if (tip.isJobSetupTask()) { launchedSetup = true; name = Values.SETUP.name(); @@ -1749,18 +1753,18 @@ public class JobInProgress { name = Values.MAP.name(); counter = Counter.TOTAL_LAUNCHED_MAPS; splits = tip.getSplitNodes(); - if (tip.getActiveTasks().size() > 1) + if (speculative) speculativeMapTasks++; - metrics.launchMap(id); - this.queueMetrics.launchMap(id); + metrics.launchMap(id, speculative); + this.queueMetrics.launchMap(id, speculative); } else { ++runningReduceTasks; name = Values.REDUCE.name(); counter = Counter.TOTAL_LAUNCHED_REDUCES; - if (tip.getActiveTasks().size() > 1) + if (speculative) speculativeReduceTasks++; - metrics.launchReduce(id); - this.queueMetrics.launchReduce(id); + metrics.launchReduce(id, speculative); + this.queueMetrics.launchReduce(id, speculative); } // Note that the logs are for the scheduled tasks only. Tasks that join on // restart has already their logs in place. @@ -2692,10 +2696,6 @@ public class JobInProgress { jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid); } else if (tip.isMapTask()) { runningMapTasks -= 1; - // check if this was a sepculative task - if (oldNumAttempts > 1) { - speculativeMapTasks -= (oldNumAttempts - newNumAttempts); - } finishedMapTasks += 1; metrics.completeMap(taskid); this.queueMetrics.completeMap(taskid); @@ -2709,9 +2709,6 @@ public class JobInProgress { } } else { runningReduceTasks -= 1; - if (oldNumAttempts > 1) { - speculativeReduceTasks -= (oldNumAttempts - newNumAttempts); - } finishedReduceTasks += 1; metrics.completeReduce(taskid); this.queueMetrics.completeReduce(taskid); @@ -3003,7 +3000,7 @@ public class JobInProgress { tip.incompleteSubTask(taskid, this.status); boolean isRunning = tip.isRunning(); - boolean isComplete = tip.isComplete(); + boolean tipIsComplete = tip.isComplete(); boolean metricsDone = isComplete(); // job metrics garbage collected if (wasAttemptRunning) { @@ -3018,14 +3015,24 @@ public class JobInProgress { // hence we are decrementing the same set. // Except after garbageCollect in a different thread. if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) { + boolean incWaiting = !tipIsComplete && !metricsDone && + tip.getActiveTasks().isEmpty(); + boolean wasSpeculative = wasComplete || !tip.getActiveTasks().isEmpty(); + if (tip.isMapTask() && !metricsDone) { runningMapTasks -= 1; - metrics.failedMap(taskid); - this.queueMetrics.failedMap(taskid); + metrics.failedMap(taskid, incWaiting); + this.queueMetrics.failedMap(taskid, incWaiting); + if (wasSpeculative) { + speculativeMapTasks--; + } } else if (!metricsDone) { runningReduceTasks -= 1; - metrics.failedReduce(taskid); - this.queueMetrics.failedReduce(taskid); + metrics.failedReduce(taskid, incWaiting); + this.queueMetrics.failedReduce(taskid, incWaiting); + if (wasSpeculative) { + speculativeReduceTasks--; + } } } @@ -3042,14 +3049,14 @@ public class JobInProgress { } else if (tip.isMapTask()) { // remove from the running queue and put it in the non-running cache // if the tip is not complete i.e if the tip still needs to be run - if (!isComplete) { + if (!tipIsComplete) { retireMap(tip); failMap(tip); } } else { // remove from the running queue and put in the failed queue if the tip // is not complete - if (!isComplete) { + if (!tipIsComplete) { retireReduce(tip); failReduce(tip); } @@ -3058,7 +3065,7 @@ public class JobInProgress { // The case when the map was complete but the task tracker went down. // However, we don't need to do any metering here... - if (wasComplete && !isComplete) { + if (wasComplete && !tipIsComplete) { if (tip.isMapTask()) { // Put the task back in the cache. This will help locality for cases // where we have a different TaskTracker from the same rack/switch Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=1507569&r1=1507568&r2=1507569&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Sat Jul 27 03:44:18 2013 @@ -32,22 +32,22 @@ class JobTrackerInstrumentation { tracker = jt; } - public void launchMap(TaskAttemptID taskAttemptID) + public void launchMap(TaskAttemptID taskAttemptID, boolean speculative) { } public void completeMap(TaskAttemptID taskAttemptID) { } - public void failedMap(TaskAttemptID taskAttemptID) + public void failedMap(TaskAttemptID taskAttemptID, boolean taskNowWaiting) { } - public void launchReduce(TaskAttemptID taskAttemptID) + public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative) { } public void completeReduce(TaskAttemptID taskAttemptID) { } - public void failedReduce(TaskAttemptID taskAttemptID) + public void failedReduce(TaskAttemptID taskAttemptID, boolean taskNowWaiting) { } public void submitJob(JobConf conf, JobID id) Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java?rev=1507569&r1=1507568&r2=1507569&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java Sat Jul 27 03:44:18 2013 @@ -111,9 +111,11 @@ class JobTrackerMetricsSource extends Jo } @Override - public void launchMap(TaskAttemptID taskAttemptID) { + public void launchMap(TaskAttemptID taskAttemptID, boolean speculative) { mapsLaunched.incr(); - decWaitingMaps(taskAttemptID.getJobID(), 1); + if (!speculative) { + decWaitingMaps(taskAttemptID.getJobID(), 1); + } } @Override @@ -122,15 +124,19 @@ class JobTrackerMetricsSource extends Jo } @Override - public void failedMap(TaskAttemptID taskAttemptID) { + public void failedMap(TaskAttemptID taskAttemptID, boolean incWaiting) { mapsFailed.incr(); - addWaitingMaps(taskAttemptID.getJobID(), 1); + if (incWaiting) { + addWaitingMaps(taskAttemptID.getJobID(), 1); + } } @Override - public void launchReduce(TaskAttemptID taskAttemptID) { + public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative) { redsLaunched.incr(); - decWaitingReduces(taskAttemptID.getJobID(), 1); + if (!speculative) { + decWaitingReduces(taskAttemptID.getJobID(), 1); + } } @Override @@ -139,9 +145,11 @@ class JobTrackerMetricsSource extends Jo } @Override - public void failedReduce(TaskAttemptID taskAttemptID) { + public void failedReduce(TaskAttemptID taskAttemptID, boolean incWaiting) { redsFailed.incr(); - addWaitingReduces(taskAttemptID.getJobID(), 1); + if (incWaiting) { + addWaitingReduces(taskAttemptID.getJobID(), 1); + } } @Override Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1507569&r1=1507568&r2=1507569&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Sat Jul 27 03:44:18 2013 @@ -219,7 +219,7 @@ public class LocalJobRunner implements J map.setConf(localConf); try { map_tasks.getAndIncrement(); - myMetrics.launchMap(mapId); + myMetrics.launchMap(mapId, false); map.run(localConf, Job.this); myMetrics.completeMap(mapId); } finally { @@ -393,8 +393,8 @@ public class LocalJobRunner implements J reduce.localizeConfiguration(localConf); reduce.setConf(localConf); reduce_tasks += 1; - myMetrics.launchReduce(reduce.getTaskID()); - queueMetrics.launchReduce(reduce.getTaskID()); + myMetrics.launchReduce(reduce.getTaskID(), false); + queueMetrics.launchReduce(reduce.getTaskID(), false); reduce.run(localConf, this); myMetrics.completeReduce(reduce.getTaskID()); queueMetrics.completeReduce(reduce.getTaskID()); Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java?rev=1507569&r1=1507568&r2=1507569&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java Sat Jul 27 03:44:18 2013 @@ -136,32 +136,40 @@ class QueueMetrics implements MetricsSou registry.snapshot(builder.addRecord(registry.name()), all); } - public void launchMap(TaskAttemptID taskAttemptID) { + public void launchMap(TaskAttemptID taskAttemptID, boolean speculative) { mapsLaunched.incr(); - decWaitingMaps(taskAttemptID.getJobID(), 1); + if (!speculative) { + decWaitingMaps(taskAttemptID.getJobID(), 1); + } } public void completeMap(TaskAttemptID taskAttemptID) { mapsCompleted.incr(); } - public void failedMap(TaskAttemptID taskAttemptID) { + public void failedMap(TaskAttemptID taskAttemptID, boolean incWaiting) { mapsFailed.incr(); - addWaitingMaps(taskAttemptID.getJobID(), 1); + if (incWaiting) { + addWaitingMaps(taskAttemptID.getJobID(), 1); + } } - public void launchReduce(TaskAttemptID taskAttemptID) { + public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative) { redsLaunched.incr(); - decWaitingReduces(taskAttemptID.getJobID(), 1); + if (!speculative) { + decWaitingReduces(taskAttemptID.getJobID(), 1); + } } public void completeReduce(TaskAttemptID taskAttemptID) { redsCompleted.incr(); } - public void failedReduce(TaskAttemptID taskAttemptID) { + public void failedReduce(TaskAttemptID taskAttemptID, boolean incWaiting) { redsFailed.incr(); - addWaitingReduces(taskAttemptID.getJobID(), 1); + if (incWaiting) { + addWaitingReduces(taskAttemptID.getJobID(), 1); + } } public void submitJob(JobConf conf, JobID id) { Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java?rev=1507569&r1=1507568&r2=1507569&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java Sat Jul 27 03:44:18 2013 @@ -54,21 +54,21 @@ public class TestQueueMetrics extends Te QueueMetrics metrics = QueueMetrics.create(queueName, new Configuration()); assertEquals(metrics.getQueueName(), "single"); - metrics.launchMap(taskAttemptID); + metrics.launchMap(taskAttemptID, false); checkMaps(metrics, 1, 0, 0, 0, -1, 0); metrics.addWaitingMaps(taskAttemptID.getJobID(), 5); - metrics.launchMap(taskAttemptID); + metrics.launchMap(taskAttemptID, false); checkMaps(metrics, 2, 0, 0, 0, 3, 0); checkReduces(metrics, 0, 0, 0, 0, 0, 0); metrics.completeMap(taskAttemptID); - metrics.failedMap(taskAttemptID); + metrics.failedMap(taskAttemptID, true); checkMaps(metrics, 2, 1, 1, 0, 4, 0); checkReduces(metrics, 0, 0, 0, 0, 0, 0); - metrics.launchReduce(taskAttemptID); + metrics.launchReduce(taskAttemptID, false); metrics.completeReduce(taskAttemptID); - metrics.failedReduce(taskAttemptID); + metrics.failedReduce(taskAttemptID, true); checkMaps(metrics, 2, 1, 1, 0, 4, 0); checkReduces(metrics, 1, 1, 1, 0, 0, 0);