hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r644763 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/mapred/
Date Fri, 04 Apr 2008 16:31:50 GMT
Author: ddas
Date: Fri Apr  4 09:31:43 2008
New Revision: 644763

URL: http://svn.apache.org/viewvc?rev=644763&view=rev
Log:
HADOOP-3140. Don't add a task to the commit queue if the task didn't generate any output.
Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=644763&r1=644762&r2=644763&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Apr  4 09:31:43 2008
@@ -246,6 +246,9 @@
                              hold key/value indexes (default 5%)
     (cdouglas via omalley)
 
+    HADOOP-3140. Doesn't add a task in the commit queue if the task hadn't
+    generated any output. (Amar Kamat via ddas)
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=644763&r1=644762&r2=644763&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Fri Apr  4 09:31:43 2008
@@ -44,7 +44,7 @@
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public void done(String taskid) throws IOException {
+    public void done(String taskid, boolean shouldPromote) throws IOException {
       LOG.info("Task " + taskid + " reporting done.");
     }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=644763&r1=644762&r2=644763&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Apr  4 09:31:43 2008
@@ -466,6 +466,13 @@
     double oldProgress = tip.getProgress();   // save old progress
     boolean wasRunning = tip.isRunning();
     boolean wasComplete = tip.isComplete();
+    // If the TIP is already completed and the task reports as SUCCEEDED then 
+    // mark the task as KILLED.
+    // In case of task with no promotion the task tracker will mark the task 
+    // as SUCCEEDED.
+    if (wasComplete && (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
+      status.setRunState(TaskStatus.State.KILLED);
+    }
     boolean change = tip.updateStatus(status);
     if (change) {
       TaskStatus.State state = status.getRunState();
@@ -473,9 +480,7 @@
         this.jobtracker.getTaskTracker(status.getTaskTracker());
       String httpTaskLogLocation = null; 
 
-      if (state == TaskStatus.State.COMMIT_PENDING ||
-          state == TaskStatus.State.FAILED ||
-          state == TaskStatus.State.KILLED) {
+      if (state == TaskStatus.State.COMMIT_PENDING) {
         JobWithTaskContext j = new JobWithTaskContext(this, tip, 
                                                       status.getTaskId(),
                                                       metrics);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=644763&r1=644762&r2=644763&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Apr  4 09:31:43 2008
@@ -2273,19 +2273,6 @@
               }
             }
           }
-
-          for(int i = 0; i < jobList.size(); ++i) {
-            if (states[i] == TaskStatus.State.FAILED
-                || states[i] == TaskStatus.State.KILLED) {
-              try {
-                tasks[i].discardTaskOutput();
-              } catch (IOException ioe) {
-                LOG.info("Failed to discard the output of task " 
-                         + status[i].getTaskId() + " with: " 
-                         + StringUtils.stringifyException(ioe));
-              }
-            }
-          }
         } catch (InterruptedException ie) {
           break;
         }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=644763&r1=644762&r2=644763&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Apr  4 09:31:43 2008
@@ -287,7 +287,7 @@
       return true;
     }
 
-    public void done(String taskId) throws IOException {
+    public void done(String taskId, boolean shouldPromote) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
         status.setMapProgress(1.0f);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=644763&r1=644762&r2=644763&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Apr  4 09:31:43 2008
@@ -31,6 +31,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.dfs.DistributedFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -472,7 +473,29 @@
             Thread.currentThread().interrupt();       // interrupt ourself
           }
         }
-        umbilical.done(taskId);
+        // Check whether there is any task output
+        boolean shouldBePromoted = false;
+        try {
+          if (taskOutputPath != null) {
+            // Get the file-system for the task output directory
+            FileSystem fs = taskOutputPath.getFileSystem(conf);
+            if (fs.exists(taskOutputPath)) {
+              // Get the summary for the folder
+              ContentSummary summary = fs.getContentSummary(taskOutputPath);
+              // Check if the directory contains data to be promoted
+              // i.e total-files + total-folders - 1(itself)
+              if (summary != null 
+                  && (summary.getFileCount() + summary.getDirectoryCount() - 1)
+                      > 0) {
+                shouldBePromoted = true;
+              }
+            }
+          }
+        } catch (IOException ioe) {
+          // To be safe in case of an exception
+          shouldBePromoted = true;
+        }
+        umbilical.done(taskId, shouldBePromoted);
         LOG.info("Task '" + getTaskId() + "' done.");
         return;
       } catch (IOException ie) {
@@ -576,22 +599,4 @@
       }
     }
   }
-  
-  /**
-   * Discard the task's output on failure.
-   * 
-   * @throws IOException
-   */
-  void discardTaskOutput() throws IOException {
-    if (taskOutputPath != null) {
-      FileSystem fs = taskOutputPath.getFileSystem(conf);
-      if (fs.exists(taskOutputPath)) {
-        // Delete the temporary task-specific output directory
-        FileUtil.fullyDelete(fs, taskOutputPath);
-        LOG.info("Discarded output of task '" + getTaskId() + "' - " 
-                + taskOutputPath);
-      }
-    }
-  }
-
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=644763&r1=644762&r2=644763&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Apr  4 09:31:43 2008
@@ -1360,6 +1360,7 @@
     private TaskStatus taskStatus; 
     private long taskTimeout;
     private String debugCommand;
+    private boolean shouldPromoteOutput = false;
         
     /**
      */
@@ -1569,8 +1570,15 @@
     /**
      * The task is reporting that it's done running
      */
-    public synchronized void reportDone() {
-      this.taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
+    public synchronized void reportDone(boolean shouldPromote) {
+      TaskStatus.State state = null;
+      this.shouldPromoteOutput = shouldPromote;
+      if (shouldPromote) {
+        state = TaskStatus.State.COMMIT_PENDING;
+      } else {
+        state = TaskStatus.State.SUCCEEDED;
+      }
+      this.taskStatus.setRunState(state);
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
@@ -1602,7 +1610,11 @@
       boolean needCleanup = false;
       synchronized (this) {
         if (done) {
-          taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
+          if (shouldPromoteOutput) {
+            taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
+          } else {
+            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+          }
         } else {
           if (!wasKilled) {
             failures += 1;
@@ -1953,10 +1965,11 @@
   /**
    * The task is done.
    */
-  public synchronized void done(String taskid) throws IOException {
+  public synchronized void done(String taskid, boolean shouldPromote) 
+  throws IOException {
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
-      tip.reportDone();
+      tip.reportDone(shouldPromote);
     } else {
       LOG.warn("Unknown child task done: "+taskid+". Ignored.");
     }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=644763&r1=644762&r2=644763&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Apr  4 09:31:43 2008
@@ -38,8 +38,10 @@
    *         with {@link #statusUpdate(String, TaskStatus)}
    * Version 5 changed counters representation for HADOOP-2248
    * Version 6 changes the TaskStatus representation for HADOOP-2208
+   * Version 7 changes the done api (via HADOOP-3140). It now expects whether
+   *           or not the task's output needs to be promoted.
    * */
-  public static final long versionID = 6L;
+  public static final long versionID = 7L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(String taskid) throws IOException;
@@ -69,8 +71,11 @@
   boolean ping(String taskid) throws IOException;
 
   /** Report that the task is successfully completed.  Failure is assumed if
-   * the task process exits without calling this. */
-  void done(String taskid) throws IOException;
+   * the task process exits without calling this.
+   * @param taskid task's id
+   * @param shouldBePromoted whether to promote the task's output or not 
+   */
+  void done(String taskid, boolean shouldBePromoted) throws IOException;
 
   /** Report that a reduce-task couldn't shuffle map-outputs.*/
   void shuffleError(String taskId, String message) throws IOException;



Mime
View raw message