hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r396605 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/
Date Mon, 24 Apr 2006 17:02:52 GMT
Author: cutting
Date: Mon Apr 24 10:02:39 2006
New Revision: 396605

URL: http://svn.apache.org/viewcvs?rev=396605&view=rev
Log:
Fix HADOOP-157.  Make dfs client wait long enough for locks on abandoned files to expire when
creating files, so that when a task that writes to dfs fails, its replacements do not also
immediately fail when they try to open the same files.  Contributed by Owen O'Malley.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RemoteException.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=396605&r1=396604&r2=396605&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Apr 24 10:02:39 2006
@@ -72,6 +72,14 @@
 20. Fix HADOOP-69.  Don't throw a NullPointerException when getting
     hints for non-existing file split.  (Bryan Pendelton via cutting)
 
+21. Fix HADOOP-157.  When a task that writes dfs files (e.g., a reduce
+    task) failed and was retried, it would fail again and again,
+    eventually failing the job.  The problem was that dfs did not yet
+    know that the failed task had abandoned the files, and would not
+    yet let another task create files with the same names.  Dfs now
+    retries when creating a file long enough for locks on abandoned
+    files to expire.  (omalley via cutting)
+
 
 Release 0.1.1 - 2006-04-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=396605&r1=396604&r2=396605&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Mon Apr 24 10:02:39 2006
@@ -86,7 +86,7 @@
      * A null response means the NameNode could not allocate a block,
      * and that the caller should try again.
      */
-    public LocatedBlock addBlock(String src, String clientMachine) throws IOException;
+    public LocatedBlock addBlock(String src, String clientName) throws IOException;
 
     /**
      * A client that wants to abandon writing to the current file

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=396605&r1=396604&r2=396605&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Apr 24 10:02:39 2006
@@ -353,6 +353,9 @@
                         namenode.renewLease(clientName);
                         lastRenewed = System.currentTimeMillis();
                     } catch (IOException ie) {
+                      String err = StringUtils.stringifyException(ie);
+                      LOG.warning("Problem renewing lease for " + clientName +
+                                  ": " + err);
                     }
                 }
                 try {
@@ -679,31 +682,15 @@
          */
         private synchronized void nextBlockOutputStream() throws IOException {
             boolean retry = false;
-            long start = System.currentTimeMillis();
+            long startTime = System.currentTimeMillis();
             do {
                 retry = false;
                 
-                long localstart = System.currentTimeMillis();
-                boolean blockComplete = false;
-                LocatedBlock lb = null;                
-                while (! blockComplete) {
-                    if (firstTime) {
-                        lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite, replication);
-                    } else {
-                        lb = namenode.addBlock(src.toString(), localName);
-                    }
-
-                    if (lb == null) {
-                        try {
-                            Thread.sleep(400);
-                            if (System.currentTimeMillis() - localstart > 5000) {
-                                LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");
-                            }
-                        } catch (InterruptedException ie) {
-                        }
-                    } else {
-                        blockComplete = true;
-                    }
+                LocatedBlock lb;
+                if (firstTime) {
+                  lb = locateNewBlock();
+                } else {
+                  lb = locateFollowingBlock(startTime);
                 }
 
                 block = lb.getBlock();
@@ -721,7 +708,7 @@
                 } catch (IOException ie) {
                     // Connection failed.  Let's wait a little bit and retry
                     try {
-                        if (System.currentTimeMillis() - start > 5000) {
+                        if (System.currentTimeMillis() - startTime > 5000) {
                             LOG.info("Waiting to find target node: " + target);
                         }
                         Thread.sleep(6000);
@@ -754,6 +741,65 @@
                 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
             } while (retry);
             firstTime = false;
+        }
+
+        private LocatedBlock locateNewBlock() throws IOException {     
+          int retries = 3;
+          while (true) {
+            while (true) {
+              try {
+                return namenode.create(src.toString(), clientName.toString(),
+                    localName, overwrite, replication);
+              } catch (RemoteException e) {
+                if (--retries == 0 || 
+                    "org.apache.hadoop.dfs.NameNode.AlreadyBeingCreatedException".
+                        equals(e.getClassName())) {
+                  throw e;
+                } else {
+                  // because failed tasks take upto LEASE_PERIOD to
+                  // release their pendingCreates files, if the file
+                  // we want to create is already being created, 
+                  // wait and try again.
+                  LOG.info(StringUtils.stringifyException(e));
+                  try {
+                    Thread.sleep(LEASE_PERIOD);
+                  } catch (InterruptedException ie) {
+                  }
+                }
+              }
+            }
+          }
+        }
+        
+        private LocatedBlock locateFollowingBlock(long start
+                                                  ) throws IOException {     
+          int retries = 5;
+          while (true) {
+            long localstart = System.currentTimeMillis();
+            while (true) {
+              try {
+                return namenode.addBlock(src.toString(), 
+                                         clientName.toString());
+              } catch (RemoteException e) {
+                if (--retries == 0 || 
+                    "org.apache.hadoop.dfs.NameNode.NotReplicatedYetException".
+                        equals(e.getClassName())) {
+                  throw e;
+                } else {
+                  LOG.info(StringUtils.stringifyException(e));
+                  if (System.currentTimeMillis() - localstart > 5000) {
+                    LOG.info("Waiting for replication for " + 
+                             (System.currentTimeMillis() - localstart)/1000 + 
+                             " seconds");
+                  }
+                  try {
+                    Thread.sleep(400);
+                  } catch (InterruptedException ie) {
+                  }
+                }                
+              }
+            }
+          } 
         }
 
         /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=396605&r1=396604&r2=396605&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Apr 24 10:02:39 2006
@@ -233,8 +233,9 @@
      * writes data.  Subsequent items in the list must be provided in
      * the connection to the first datanode.
      * @return Return an array that consists of the block, plus a set
-     * of machines, or null if src is invalid for creation (based on
-     * {@link FSDirectory#isValidToCreate(UTF8)}.
+     * of machines
+     * @throws IOException if the filename is invalid
+     *         {@link FSDirectory#isValidToCreate(UTF8)}.
      */
     public synchronized Object[] startFile( UTF8 src, 
                                             UTF8 holder, 
@@ -242,11 +243,12 @@
                                             boolean overwrite,
                                             short replication 
                                           ) throws IOException {
+      try {
         if (pendingCreates.get(src) != null) {
-          LOG.warning("Cannot create file " + src + " for " + holder +
+          String msg = "Cannot create file " + src + " for " + holder +
                        " on " + clientMachine + 
-                       " because pendingCreates is non-null.");
-          return null;
+                       " because pendingCreates is non-null.";
+          throw new NameNode.AlreadyBeingCreatedException(msg);
         }
 
         if( replication > maxReplication )
@@ -261,27 +263,28 @@
             + "Requested replication " + replication
             + " is less than the required minimum " + minReplication );
         
-        boolean fileValid = dir.isValidToCreate(src);
-        if (overwrite && ! fileValid) {
+        if (!dir.isValidToCreate(src)) {
+          if (overwrite) {
             delete(src);
-            fileValid = true;
-        }
-  
-        if ( ! fileValid) {
-          LOG.warning("Cannot start file because it is invalid. src=" + src);
-          return null;
+          } else {
+            throw new IOException("Can't create file " + src + 
+                                  ", because the filename is invalid.");
+          }
         }
 
         // Get the array of replication targets 
         DatanodeInfo targets[] = chooseTargets(replication, null, clientMachine);
         if (targets.length < this.minReplication) {
-            LOG.warning("Target-length is " + targets.length +
-                ", below MIN_REPLICATION (" + this.minReplication+ ")");
-            return null;
-        }
+            throw new IOException("Target-length is " + targets.length +
+                                  ", below MIN_REPLICATION (" + 
+                                  minReplication+ ")");
+       }
 
         // Reserve space for this pending file
-        pendingCreates.put(src, new FileUnderConstruction( replication ));
+        pendingCreates.put(src, 
+                           new FileUnderConstruction(replication, 
+                                                     holder,
+                                                     clientMachine));
         LOG.fine("Adding " + src + " to pendingCreates for " + holder);
         synchronized (leases) {
             Lease lease = (Lease) leases.get(holder);
@@ -302,6 +305,10 @@
         results[0] = allocateBlock(src);
         results[1] = targets;
         return results;
+      } catch (IOException ie) {
+        LOG.warning(ie.getMessage());
+        throw ie;
+      }
     }
 
     /**
@@ -315,29 +322,42 @@
      * are replicated.  Will return an empty 2-elt array if we want the
      * client to "try again later".
      */
-    public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientMachine) {
-        Object results[] = null;
+    public synchronized Object[] getAdditionalBlock(UTF8 src, 
+                                                    UTF8 clientName
+                                                    ) throws IOException {
         FileUnderConstruction pendingFile = 
           (FileUnderConstruction) pendingCreates.get(src);
-        if (dir.getFile(src) == null && pendingFile != null) {
-            results = new Object[2];
-
-            //
-            // If we fail this, bad things happen!
-            //
-            if (checkFileProgress(src)) {
-                // Get the array of replication targets 
-                DatanodeInfo targets[] = chooseTargets(pendingFile.getReplication(), null, clientMachine);
-                if (targets.length < this.minReplication) {
-                    return null;
-                }
+        // make sure that we still have the lease on this file
+        if (pendingFile == null) {
+          throw new NameNode.LeaseExpiredException("No lease on " + src);
+        }
+        if (!pendingFile.getClientName().equals(clientName)) {
+          throw new NameNode.LeaseExpiredException("Lease mismatch on " + src + 
+              " owned by " + pendingFile.getClientName() + 
+              " and appended by " + clientName);
+        }
+        if (dir.getFile(src) != null) {
+          throw new IOException("File " + src + " created during write");
+        }
 
-                // Create next block
-                results[0] = allocateBlock(src);
-                results[1] = targets;
-            }
+        //
+        // If we fail this, bad things happen!
+        //
+        if (!checkFileProgress(src)) {
+          throw new NameNode.NotReplicatedYetException("Not replicated yet");
         }
-        return results;
+        
+        // Get the array of replication targets 
+        DatanodeInfo targets[] = chooseTargets(pendingFile.getReplication(), 
+            null, pendingFile.getClientMachine());
+        if (targets.length < this.minReplication) {
+          throw new IOException("File " + src + " could only be replicated to " +
+                                targets.length + " nodes, instead of " +
+                                minReplication);
+        }
+        
+        // Create next block
+        return new Object[]{allocateBlock(src), targets};
     }
 
     /**
@@ -347,8 +367,10 @@
         //
         // Remove the block from the pending creates list
         //
-        Vector pendingVector = (Vector) pendingCreates.get(src);
-        if (pendingVector != null) {
+        FileUnderConstruction pendingFile = 
+          (FileUnderConstruction) pendingCreates.get(src);
+        if (pendingFile != null) {
+            Vector pendingVector = pendingFile.getBlocks();
             for (Iterator it = pendingVector.iterator(); it.hasNext(); ) {
                 Block cur = (Block) it.next();
                 if (cur.compareTo(b) == 0) {
@@ -368,7 +390,9 @@
                                                    UTF8 holder
                                                    ) throws IOException {
       LOG.info("abandoning file in progress on " + src.toString());
-      internalReleaseCreate(src, holder);
+      synchronized (leases) {
+        internalReleaseCreate(src, holder);
+      }
     }
 
     /**
@@ -387,9 +411,11 @@
             return STILL_WAITING;
         }
         
-        FileUnderConstruction pendingFile = (FileUnderConstruction) pendingCreates.get(src);
-        int nrBlocks = pendingFile.size();
-        Block pendingBlocks[] = (Block[]) pendingFile.toArray(new Block[nrBlocks]);
+        FileUnderConstruction pendingFile = 
+            (FileUnderConstruction) pendingCreates.get(src);
+        Vector blocks = pendingFile.getBlocks();
+        int nrBlocks = blocks.size();
+        Block pendingBlocks[] = (Block[]) blocks.toArray(new Block[nrBlocks]);
 
         //
         // We have the pending blocks, but they won't have
@@ -473,7 +499,7 @@
         Block b = new Block();
         FileUnderConstruction v = 
           (FileUnderConstruction) pendingCreates.get(src);
-        v.add(b);
+        v.getBlocks().add(b);
         pendingCreateBlocks.add(b);
         return b;
     }
@@ -486,7 +512,7 @@
         FileUnderConstruction v = 
           (FileUnderConstruction) pendingCreates.get(src);
 
-        for (Iterator it = v.iterator(); it.hasNext(); ) {
+        for (Iterator it = v.getBlocks().iterator(); it.hasNext(); ) {
             Block b = (Block) it.next();
             TreeSet containingNodes = (TreeSet) blocksMap.get(b);
            if (containingNodes == null || containingNodes.size() < this.minReplication) {
@@ -639,8 +665,8 @@
     class Lease implements Comparable {
         public UTF8 holder;
         public long lastUpdate;
-        TreeSet locks = new TreeSet();
-        TreeSet creates = new TreeSet();
+        private TreeSet locks = new TreeSet();
+        private TreeSet creates = new TreeSet();
 
         public Lease(UTF8 holder) {
             this.holder = holder;
@@ -803,7 +829,7 @@
           if (v != null) {
             LOG.info("Removing " + src + " from pendingCreates for " + 
                      holder + " (failure)");
-            for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
+            for (Iterator it2 = v.getBlocks().iterator(); it2.hasNext(); ) {
               Block b = (Block) it2.next();
               pendingCreateBlocks.remove(b);
             }
@@ -1416,15 +1442,35 @@
      * 
      * @author shv
      */
-    private class FileUnderConstruction extends Vector {
+    private class FileUnderConstruction {
       private short blockReplication; // file replication
+      private Vector blocks;
+      private UTF8 clientName;         // lease holder
+      private UTF8 clientMachine;
       
-      FileUnderConstruction( short replication ) throws IOException {
+      FileUnderConstruction(short replication,
+                            UTF8 clientName,
+                            UTF8 clientMachine) throws IOException {
         this.blockReplication = replication;
+        this.blocks = new Vector();
+        this.clientName = clientName;
+        this.clientMachine = clientMachine;
       }
       
       public short getReplication() {
         return this.blockReplication;
+      }
+      
+      public Vector getBlocks() {
+        return blocks;
+      }
+      
+      public UTF8 getClientName() {
+        return clientName;
+      }
+      
+      public UTF8 getClientMachine() {
+        return clientMachine;
       }
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=396605&r1=396604&r2=396605&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Apr 24 10:02:39 2006
@@ -18,7 +18,7 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.LogFormatter;
+import org.apache.hadoop.util.*;
 
 import java.io.*;
 import java.util.logging.*;
@@ -140,6 +140,37 @@
     }
 
     /**
+     * The exception that happens when you ask to create a file that already
+     * is being created, but is not closed yet.
+     * @author Owen O'Malley
+     */
+    public static class AlreadyBeingCreatedException extends IOException {
+      public AlreadyBeingCreatedException(String msg) {
+        super(msg);
+      }
+    }
+    
+    /**
+     * The lease that was being used to create this file has expired.
+     * @author Owen O'Malley
+     */
+    public static class LeaseExpiredException extends IOException {
+      public LeaseExpiredException(String msg) {
+        super(msg);
+      }
+    }
+    
+    /**
+     * The file has not finished being written to enough datanodes yet.
+     * @author Owen O'Malley
+     */
+    public static class NotReplicatedYetException extends IOException {
+      public NotReplicatedYetException(String msg) {
+        super(msg);
+      }
+    }
+    
+    /**
      */
     public LocatedBlock create(String src, 
                                String clientName, 
@@ -152,9 +183,6 @@
                                                 new UTF8(clientMachine), 
                                                 overwrite,
                                                 replication);
-        if (results == null)
-            throw new IOException("Cannot create file " + src + " on client " + clientName);
-
         Block b = (Block) results[0];
         DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
         return new LocatedBlock(b, targets);
@@ -162,27 +190,14 @@
 
     /**
      */
-    public LocatedBlock addBlock(String src, String clientMachine) throws IOException {
-        int retries = 5;
-        Object results[] = namesystem.getAdditionalBlock(new UTF8(src), new UTF8(clientMachine));
-        while (results != null && results[0] == null && retries > 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException ie) {
-            }
-            results = namesystem.getAdditionalBlock(new UTF8(src), new UTF8(clientMachine));
-            retries--;
-        }
-
-        if (results == null) {
-            throw new IOException("Cannot obtain additional block for file " + src);
-        } else if (results[0] == null) {
-            return null;
-        } else {
-            Block b = (Block) results[0];
-            DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
-            return new LocatedBlock(b, targets);
-        }
+    public LocatedBlock addBlock(String src, 
+                                 String clientName) throws IOException {
+        UTF8 src8 = new UTF8(src);
+        UTF8 client8 = new UTF8(clientName);
+        Object[] results = namesystem.getAdditionalBlock(src8, client8);
+        Block b = (Block) results[0];
+        DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
+        return new LocatedBlock(b, targets);            
     }
 
     /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=396605&r1=396604&r2=396605&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Mon Apr 24 10:02:39 2006
@@ -29,8 +29,6 @@
 import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
 
-import java.rmi.RemoteException;
-
 import java.util.Hashtable;
 import java.util.logging.Logger;
 import java.util.logging.Level;
@@ -39,6 +37,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.UTF8;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
@@ -65,7 +64,7 @@
     int id;                                       // call id
     Writable param;                               // parameter
     Writable value;                               // value, null if error
-    String error;                                 // error, null if value
+    RemoteException error;                        // error, null if value
     long lastActivity;                            // time of last i/o
     boolean done;                                 // true when call is done
 
@@ -89,7 +88,7 @@
     }
 
     /** Update lastActivity with the current time. */
-    public synchronized void setResult(Writable value, String error) {
+    public synchronized void setResult(Writable value, RemoteException error) {
       this.value = value;
       this.error = error;
       this.done = true;
@@ -157,9 +156,10 @@
           Call call = (Call)calls.remove(new Integer(id));
           boolean isError = in.readBoolean();     // read if error
           if (isError) {
-            UTF8 utf8 = new UTF8();
-            utf8.readFields(in);                  // read error string
-            call.setResult(null, utf8.toString());
+            RemoteException ex = 
+              new RemoteException(WritableUtils.readString(in),
+                                  WritableUtils.readString(in));
+            call.setResult(null, ex);
           } else {
             Writable value = makeValue();
             try {
@@ -300,7 +300,7 @@
       } while (!call.done && wait > 0);
 
       if (call.error != null) {
-        throw new RemoteException(call.error);
+        throw call.error;
       } else if (!call.done) {
         throw new IOException("timed out waiting for response");
       } else {

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RemoteException.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RemoteException.java?rev=396605&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RemoteException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RemoteException.java Mon Apr 24 10:02:39 2006
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+public class RemoteException extends IOException {
+  private String className;
+  
+  public RemoteException(String className, String msg) {
+    super(msg);
+    this.className = className;
+  }
+  
+  public String getClassName() {
+    return className;
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=396605&r1=396604&r2=396605&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon Apr 24 10:02:39 2006
@@ -38,6 +38,7 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.UTF8;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
@@ -210,15 +211,14 @@
             LOG.fine(getName() + ": has #" + call.id + " from " +
                      call.connection.socket.getInetAddress().getHostAddress());
           
+          String errorClass = null;
           String error = null;
           Writable value = null;
           try {
             value = call(call.param);             // make the call
-          } catch (IOException e) {
-            LOG.log(Level.INFO, getName() + " call error: " + e, e);
-            error = getStackTrace(e);
-          } catch (Exception e) {
+          } catch (Throwable e) {
             LOG.log(Level.INFO, getName() + " call error: " + e, e);
+            errorClass = e.getClass().getName();
             error = getStackTrace(e);
           }
             
@@ -226,9 +226,12 @@
           synchronized (out) {
             out.writeInt(call.id);                // write call id
             out.writeBoolean(error!=null);        // write error flag
-            if (error != null)
-              value = new UTF8(error);
-            value.write(out);                     // write value
+            if (error == null) {
+              value.write(out);
+            } else {
+              WritableUtils.writeString(out, errorClass);
+              WritableUtils.writeString(out, error);
+            }
             out.flush();
           }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=396605&r1=396604&r2=396605&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon Apr 24 10:02:39 2006
@@ -269,7 +269,7 @@
       logStream(process.getInputStream());        // normally empty
       
       int exit_code = process.waitFor();
-      if (exit_code != 0) {
+      if (!killed && exit_code != 0) {
         throw new IOException("Task process exit with nonzero status of " +
                               exit_code + ".");
       }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=396605&r1=396604&r2=396605&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Apr 24 10:02:39 2006
@@ -282,11 +282,16 @@
             synchronized (this) {
                 for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
                     TaskInProgress tip = (TaskInProgress) it.next();
+                    long timeSinceLastReport = System.currentTimeMillis() - 
+                                               tip.getLastProgressReport();
                     if ((tip.getRunState() == TaskStatus.RUNNING) &&
-                        (System.currentTimeMillis() - tip.getLastProgressReport() > this.taskTimeout) &&
+                        (timeSinceLastReport > this.taskTimeout) &&
                         !tip.wasKilled) {
-                        LOG.info("Task " + tip.getTask().getTaskId() + " timed out.  Killing.");
-                        tip.reportDiagnosticInfo("Timed out.");
+                        String msg = "Task failed to report status for " +
+                                     (timeSinceLastReport / 1000) + 
+                                     " seconds. Killing.";
+                        LOG.info(tip.getTask().getTaskId() + ": " + msg);
+                        tip.reportDiagnosticInfo(msg);
                         tip.killAndCleanup(true);
                     }
                 }



Mime
View raw message