hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r933252 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/
Date Mon, 12 Apr 2010 14:19:31 GMT
Author: vinodkv
Date: Mon Apr 12 14:19:31 2010
New Revision: 933252

URL: http://svn.apache.org/viewvc?rev=933252&view=rev
Log:
MAPREDUCE-1635. ResourceEstimator does not work after MAPREDUCE-842. Contributed by Amareshwari
Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskOutputSize.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=933252&r1=933251&r2=933252&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Apr 12 14:19:31 2010
@@ -522,6 +522,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1523. Sometimes rumen trace generator fails to extract the job
     finish time. (dick king via mahadev)
 
+    MAPREDUCE-1635. ResourceEstimator does not work after MAPREDUCE-842.
+    (Amareshwari Sriramadasu via vinodkv)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=933252&r1=933251&r2=933252&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Mon Apr 12 14:19:31 2010
@@ -27,10 +27,12 @@ import org.apache.hadoop.mapreduce.MRCon
 
 /**
  * Manipulate the working area for the transient store for maps and reduces.
+ * 
  * This class is used by map and reduce tasks to identify the directories that
  * they need to write to/read from for intermediate files. The callers of 
  * these methods are from child space and see mapreduce.cluster.local.dir as 
  * taskTracker/jobCache/jobId/attemptId
+ * This class should not be used from TaskTracker space.
  * 
  * <FRAMEWORK-USE-ONLY>
  * This method is intended only for use by the Map/Reduce framework and not

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=933252&r1=933251&r2=933252&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Apr 12 14:19:31 2010
@@ -816,8 +816,12 @@ abstract public class Task implements Wr
     }
   }
   
+  /**
+   * Sends last status update before sending umbilical.done(); 
+   */
   private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
   throws IOException {
+    taskStatus.setOutputSize(calculateOutputSize());
     // send a final status report
     taskStatus.statusUpdate(taskProgress.get(),
                             taskProgress.toString(), 
@@ -825,6 +829,28 @@ abstract public class Task implements Wr
     statusUpdate(umbilical);
   }
 
+  /**
+   * Calculates the size of output for this task.
+   * 
+   * @return -1 if it can't be found.
+   */
+   private long calculateOutputSize() throws IOException {
+    if (!isMapOrReduce()) {
+      return -1;
+    }
+
+    if (isMapTask() && conf.getNumReduceTasks() > 0) {
+      try {
+        Path mapOutput =  mapOutputFile.getOutputFile();
+        FileSystem localFS = FileSystem.getLocal(conf);
+        return localFS.getFileStatus(mapOutput).getLen();
+      } catch (IOException e) {
+        LOG.warn ("Could not find output size " , e);
+      }
+    }
+    return -1;
+  }
+
   private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
     int retries = MAX_RETRIES;
     while (true) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=933252&r1=933251&r2=933252&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Mon Apr 12 14:19:31 2010
@@ -54,7 +54,7 @@ public abstract class TaskStatus impleme
     
   private long startTime; //in ms
   private long finishTime; 
-  private long outputSize;
+  private long outputSize = -1L;
     
   private volatile Phase phase = Phase.STARTING; 
   private Counters counters;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=933252&r1=933251&r2=933252&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Apr 12 14:19:31 2010
@@ -2049,44 +2049,6 @@ public class TaskTracker 
     return biggestSeenSoFar;
   }
     
-  /**
-   * Try to get the size of output for this task.
-   * Returns -1 if it can't be found.
-   * @return
-   */
-  long tryToGetOutputSize(TaskAttemptID taskId, JobConf conf) {
-    
-    try{
-      TaskInProgress tip;
-      synchronized(this) {
-        tip = tasks.get(taskId);
-      }
-      if(tip == null)
-         return -1;
-      
-      if (!tip.getTask().isMapTask() || 
-          tip.getRunState() != TaskStatus.State.SUCCEEDED) {
-        return -1;
-      }
-      
-      MapOutputFile mapOutputFile = new MapOutputFile();
-      mapOutputFile.setConf(conf);
-      
-      Path tmp_output =  mapOutputFile.getOutputFile();
-      if(tmp_output == null)
-        return 0;
-      FileSystem localFS = FileSystem.getLocal(conf);
-      FileStatus stat = localFS.getFileStatus(tmp_output);
-      if(stat == null)
-        return 0;
-      else
-        return stat.getLen();
-    } catch(IOException e) {
-      LOG.info(e);
-      return -1;
-    }
-  }
-
   private TaskLauncher mapLauncher;
   private TaskLauncher reduceLauncher;
   public JvmManager getJvmManagerInstance() {
@@ -3341,7 +3303,6 @@ public class TaskTracker 
     for(TaskInProgress tip: runningTasks.values()) {
       TaskStatus status = tip.getStatus();
       status.setIncludeCounters(sendCounters);
-      status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf));
       // send counters for finished or failed tasks and commit pending tasks
       if (status.getRunState() != TaskStatus.State.RUNNING) {
         status.setIncludeCounters(true);

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskOutputSize.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskOutputSize.java?rev=933252&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskOutputSize.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskOutputSize.java Mon Apr 12 14:19:31 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.junit.After;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskOutputSize {
+  private static Path rootDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), "test");
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtil.fullyDelete(new File(rootDir.toString()));
+  }
+
+  @Test
+  public void testTaskOutputSize() throws Exception {
+    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
+    Path inDir = new Path(rootDir, "input");
+    Path outDir = new Path(rootDir, "output");
+    Job job = MapReduceTestUtil.createJob(mr.createJobConf(), inDir, outDir, 1, 1);
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0, 100)) {
+      TaskStatus ts = jt.getTaskStatus(TaskAttemptID.downgrade(tce
+          .getTaskAttemptId()));
+      if (tce.isMapTask()) {
+        assertTrue(
+            "map output size is not found for " + tce.getTaskAttemptId(), ts
+                .getOutputSize() > 0);
+      } else {
+        assertEquals("task output size not expected for "
+            + tce.getTaskAttemptId(), -1, ts.getOutputSize());
+      }
+    }
+
+    // test output sizes for job with no reduces
+    job = MapReduceTestUtil.createJob(mr.createJobConf(), inDir, outDir, 1, 0);
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+    for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0, 100)) {
+      TaskStatus ts = jt.getTaskStatus(TaskAttemptID.downgrade(tce
+          .getTaskAttemptId()));
+      assertEquals("task output size not expected for "
+          + tce.getTaskAttemptId(), -1, ts.getOutputSize());
+    }
+
+    // test output sizes for failed job
+    job = MapReduceTestUtil.createFailJob(mr.createJobConf(), outDir, inDir);
+    job.waitForCompletion(true);
+    assertFalse("Job not failed", job.isSuccessful());
+    for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0, 100)) {
+      TaskStatus ts = jt.getTaskStatus(TaskAttemptID.downgrade(tce
+          .getTaskAttemptId()));
+      assertEquals("task output size not expected for "
+          + tce.getTaskAttemptId(), -1, ts.getOutputSize());
+    }
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=933252&r1=933251&r2=933252&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Mon Apr 12 14:19:31 2010
@@ -158,7 +158,10 @@ public class MapReduceTestUtil {
    */
   public static Job createFailJob(Configuration conf, Path outdir, 
       Path... indirs) throws Exception {
-
+    FileSystem fs = outdir.getFileSystem(conf);
+    if (fs.exists(outdir)) {
+      fs.delete(outdir, true);
+    }
     conf.setInt(JobContext.MAP_MAX_ATTEMPTS, 2);
     Job theJob = new Job(conf);
     theJob.setJobName("Fail-Job");



Mime
View raw message