hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r387279 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred: MapOutputFile.java TaskTracker.java
Date Mon, 20 Mar 2006 19:08:08 GMT
Author: cutting
Date: Mon Mar 20 11:08:07 2006
New Revision: 387279

URL: http://svn.apache.org/viewcvs?rev=387279&view=rev
Log:
Fix for HADOOP-86.  Errors while reading map output now cause map task to fail and be re-executed.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=387279&r1=387278&r2=387279&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Mon Mar 20 11:08:07 2006
@@ -23,6 +23,8 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskTracker.MapOutputServer;
 
 /** A local file to be transferred via the {@link MapOutputProtocol}. */ 
 class MapOutputFile implements Writable, Configurable {
@@ -106,16 +108,15 @@
     UTF8.writeString(out, reduceTaskId);
     out.writeInt(partition);
     
-    // write the length-prefixed file content to the wire
     File file = getOutputFile(mapTaskId, partition);
-    out.writeLong(file.length());
-
     FSDataInputStream in = null;
     try {
+      // write the length-prefixed file content to the wire
+      out.writeLong(file.length());
       in = FileSystem.getNamed("local", this.jobConf).open(file);
-    } catch (IOException e) {
-      // log a SEVERE exception in order to cause TaskTracker to exit
+    } catch (FileNotFoundException e) {
       TaskTracker.LOG.log(Level.SEVERE, "Can't open map output:" + file, e);
+      ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId);
       throw e;
     }
     try {
@@ -127,8 +128,8 @@
         try {
           l = in.read(buffer);
         } catch (IOException e) {
-          // log a SEVERE exception in order to cause TaskTracker to exit
           TaskTracker.LOG.log(Level.SEVERE,"Can't read map output:" + file, e);
+          ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId);
           throw e;
         }
       }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=387279&r1=387278&r2=387279&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Mar 20 11:08:07 2006
@@ -70,6 +70,15 @@
 
     private int maxCurrentTasks;
 
+    class MapOutputServer extends RPC.Server {
+      private MapOutputServer(int port, int threads) {
+        super(TaskTracker.this, fConf, port, threads, false);
+      }
+      public TaskTracker getTaskTracker() {
+        return TaskTracker.this;
+      }
+    }
+
     /**
      * Start with the local machine name, and the default JobTracker
      */
@@ -127,7 +136,7 @@
         }
         while (true) {
             try {
-                this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, maxCurrentTasks, false, this.fConf);
+                this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks);
                 this.mapOutputServer.start();
                 break;
             } catch (BindException e) {
@@ -305,11 +314,6 @@
               }
             }
             lastHeartbeat = now;
-
-            if (LogFormatter.hasLoggedSevere()) {
-              LOG.info("Severe problem detected.  TaskTracker exiting.");
-              return STALE_STATE;
-            }
         }
 
         return 0;
@@ -539,6 +543,22 @@
         }
 
         /**
+         * The map output has been lost.
+         */
+        public synchronized void mapOutputLost() throws IOException {
+            if (runstate == TaskStatus.SUCCEEDED) {
+              LOG.info("Reporting output lost:"+task.getTaskId());
+              runstate = TaskStatus.FAILED;       // change status to failure
+              synchronized (TaskTracker.this) {   // force into next heartbeat
+                runningTasks.put(task.getTaskId(), this);
+                mapTotal++;
+              }
+            } else {
+              LOG.warning("Output already reported lost:"+task.getTaskId());
+            }
+        }
+
+        /**
          * We no longer need anything from this task.  Either the 
          * controlling job is all done and the files have been copied
          * away, or the task failed and we don't need the remains.
@@ -642,6 +662,18 @@
           tip.taskFinished();
         } else {
           LOG.warning("Unknown child task finshed: "+taskid+". Ignored.");
+        }
+    }
+
+    /**
+     * A completed map task's output has been lost.
+     */
+    public synchronized void mapOutputLost(String taskid) throws IOException {
+        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
+        if (tip != null) {
+          tip.mapOutputLost();
+        } else {
+          LOG.warning("Unknown child with bad map output: "+taskid+". Ignored.");
         }
     }
 



Mime
View raw message