hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1129983 - in /hadoop/common/branches/branch-0.22: CHANGES.txt src/java/org/apache/hadoop/ipc/Server.java src/test/core/org/apache/hadoop/ipc/TestIPC.java
Date Wed, 01 Jun 2011 02:02:19 GMT
Author: todd
Date: Wed Jun  1 02:02:19 2011
New Revision: 1129983

URL: http://svn.apache.org/viewvc?rev=1129983&view=rev
Log:
HADOOP-7121. Exceptions while serializing IPC call responses are not handled well. Contributed
by Todd Lipcon.

Modified:
    hadoop/common/branches/branch-0.22/CHANGES.txt
    hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java

Modified: hadoop/common/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/CHANGES.txt?rev=1129983&r1=1129982&r2=1129983&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/CHANGES.txt Wed Jun  1 02:02:19 2011
@@ -494,6 +494,9 @@ Release 0.22.0 - Unreleased
 
     HADOOP-7146. RPC server leaks file descriptors (todd)
 
+    HADOOP-7121. Exceptions while serializing IPC call responses are not
+    handled well. (todd)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java?rev=1129983&r1=1129982&r2=1129983&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java Wed Jun
 1 02:02:19 2011
@@ -904,9 +904,9 @@ public abstract class Server {
     public UserGroupInformation attemptingUser = null; // user name before auth
 
     // Fake 'call' for failed authorization response
-    private static final int AUTHROIZATION_FAILED_CALLID = -1;
+    private static final int AUTHORIZATION_FAILED_CALLID = -1;
     private final Call authFailedCall = 
-      new Call(AUTHROIZATION_FAILED_CALLID, null, this);
+      new Call(AUTHORIZATION_FAILED_CALLID, null, this);
     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
     // Fake 'call' for SASL context setup
     private static final int SASL_CALLID = -33;
@@ -1341,9 +1341,22 @@ public abstract class Server {
         
       if (LOG.isDebugEnabled())
         LOG.debug(" got #" + id);
-
-      Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
-      param.readFields(dis);        
+      Writable param;
+      try {
+        param = ReflectionUtils.newInstance(paramClass, conf);//read param
+        param.readFields(dis);
+      } catch (Throwable t) {
+        LOG.warn("Unable to read call parameters for client " +
+                 getHostAddress(), t);
+        final Call readParamsFailedCall = new Call(id, null, this);
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+            t.getClass().getName(),
+            "IPC server unable to read call parameters: " + t.getMessage());
+        responder.doRespond(readParamsFailedCall);
+        return;
+      }
         
       Call call = new Call(id, param, this);
       callQueue.put(call);              // queue the call; maybe blocked here
@@ -1569,7 +1582,18 @@ public abstract class Server {
     out.writeInt(status.state);           // write status
 
     if (status == Status.SUCCESS) {
-      rv.write(out);
+      try {
+        rv.write(out);
+      } catch (Throwable t) {
+        LOG.warn("Error serializing call response for call " + call, t);
+        // Call back to same function - this is OK since the
+        // buffer is reset at the top, and since status is changed
+        // to ERROR it won't infinite loop.
+        setupResponse(response, call, Status.ERROR,
+            null, t.getClass().getName(),
+            StringUtils.stringifyException(t));
+        return;
+      }
     } else {
       WritableUtils.writeString(out, errorClass);
       WritableUtils.writeString(out, error);

Modified: hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java?rev=1129983&r1=1129982&r2=1129983&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java Wed
Jun  1 02:02:19 2011
@@ -28,30 +28,37 @@ import org.apache.hadoop.net.NetUtils;
 import java.util.Random;
 import java.io.DataInput;
 import java.io.File;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import javax.net.SocketFactory;
 
-import junit.framework.TestCase;
+import org.junit.Test;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assume;
 
 /** Unit tests for IPC. */
-public class TestIPC extends TestCase {
+public class TestIPC {
   public static final Log LOG =
     LogFactory.getLog(TestIPC.class);
   
   final private static Configuration conf = new Configuration();
   final static private int PING_INTERVAL = 1000;
   final static private int MIN_SLEEP_TIME = 1000;
+
+  /**
+   * Flag used to turn off the fault injection behavior
+   * of the various writables.
+   **/
+  static boolean WRITABLE_FAULTS_ENABLED = true;
   
   static {
     Client.setPingInterval(conf, PING_INTERVAL);
   }
-  public TestIPC(String name) { super(name); }
 
   private static final Random RANDOM = new Random();
 
@@ -62,11 +69,19 @@ public class TestIPC extends TestCase {
 
   private static class TestServer extends Server {
     private boolean sleep;
+    private Class<? extends Writable> responseClass;
 
-    public TestServer(int handlerCount, boolean sleep) 
+    public TestServer(int handlerCount, boolean sleep) throws IOException {
+      this(handlerCount, sleep, LongWritable.class, null);
+    }
+    
+    public TestServer(int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass) 
       throws IOException {
-      super(ADDRESS, 0, LongWritable.class, handlerCount, conf);
+      super(ADDRESS, 0, paramClass, handlerCount, conf);
       this.sleep = sleep;
+      this.responseClass = responseClass;
     }
 
     @Override
@@ -78,7 +93,15 @@ public class TestIPC extends TestCase {
           Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
         } catch (InterruptedException e) {}
       }
-      return param;                               // echo param as result
+      if (responseClass != null) {
+        try {
+          return responseClass.newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }  
+      } else {
+        return param;                               // echo param as result
+      }
     }
   }
 
@@ -148,6 +171,7 @@ public class TestIPC extends TestCase {
     }
   }
 
+  @Test
   public void testSerial() throws Exception {
     testSerial(3, false, 2, 5, 100);
     testSerial(3, true, 2, 5, 10);
@@ -180,6 +204,7 @@ public class TestIPC extends TestCase {
     server.stop();
   }
 	
+  @Test
   public void testParallel() throws Exception {
     testParallel(10, false, 2, 4, 2, 4, 100);
   }
@@ -222,6 +247,7 @@ public class TestIPC extends TestCase {
     }
   }
 	
+  @Test
   public void testStandAloneClient() throws Exception {
     testParallel(10, false, 2, 4, 2, 4, 100);
     Client client = new Client(LongWritable.class, conf);
@@ -242,83 +268,179 @@ public class TestIPC extends TestCase {
               message.contains(causeText));
     }
   }
+  
+  static void maybeThrowIOE() throws IOException {
+    if (WRITABLE_FAULTS_ENABLED) {
+      throw new IOException("Injected fault");
+    }
+  }
 
-  private static class LongErrorWritable extends LongWritable {
-    private final static String ERR_MSG = 
-      "Come across an exception while reading";
-    
-    LongErrorWritable() {}
-    
-    LongErrorWritable(long longValue) {
-      super(longValue);
+  static void maybeThrowRTE() {
+    if (WRITABLE_FAULTS_ENABLED) {
+      throw new RuntimeException("Injected fault");
     }
-    
+  }
+
+  @SuppressWarnings("unused")
+  private static class IOEOnReadWritable extends LongWritable {
+    public IOEOnReadWritable() {}
+
     public void readFields(DataInput in) throws IOException {
       super.readFields(in);
-      throw new IOException(ERR_MSG);
+      maybeThrowIOE();
     }
   }
   
-  private static class LongRTEWritable extends LongWritable {
-    private final static String ERR_MSG = 
-      "Come across an runtime exception while reading";
-    
-    LongRTEWritable() {}
-    
-    LongRTEWritable(long longValue) {
-      super(longValue);
-    }
+  @SuppressWarnings("unused")
+  private static class RTEOnReadWritable extends LongWritable {
+    public RTEOnReadWritable() {}
     
     public void readFields(DataInput in) throws IOException {
       super.readFields(in);
-      throw new RuntimeException(ERR_MSG);
+      maybeThrowRTE();
     }
   }
+  
+  @SuppressWarnings("unused")
+  private static class IOEOnWriteWritable extends LongWritable {
+    public IOEOnWriteWritable() {}
 
-  public void testErrorClient() throws Exception {
-    // start server
-    Server server = new TestServer(1, false);
-    InetSocketAddress addr = NetUtils.getConnectAddress(server);
-    server.start();
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      maybeThrowIOE();
+    }
+  }
 
-    // start client
-    Client client = new Client(LongErrorWritable.class, conf);
-    try {
-      client.call(new LongErrorWritable(RANDOM.nextLong()),
-              addr, null, null, 0, conf);
-      fail("Expected an exception to have been thrown");
-    } catch (IOException e) {
-      // check error
-      Throwable cause = e.getCause();
-      assertTrue(cause instanceof IOException);
-      assertEquals(LongErrorWritable.ERR_MSG, cause.getMessage());
+  @SuppressWarnings("unused")
+  private static class RTEOnWriteWritable extends LongWritable {
+    public RTEOnWriteWritable() {}
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      maybeThrowRTE();
     }
   }
   
-  public void testRuntimeExceptionWritable() throws Exception {
+  /**
+   * Generic test case for exceptions thrown at some point in the IPC
+   * process.
+   * 
+   * @param clientParamClass - client writes this writable for parameter
+   * @param serverParamClass - server reads this writable for parameter
+   * @param serverResponseClass - server writes this writable for response
+   * @param clientResponseClass - client reads this writable for response
+   */
+  private void doErrorTest(
+      Class<? extends LongWritable> clientParamClass,
+      Class<? extends LongWritable> serverParamClass,
+      Class<? extends LongWritable> serverResponseClass,
+      Class<? extends LongWritable> clientResponseClass) throws Exception {
+    
     // start server
-    Server server = new TestServer(1, false);
+    Server server = new TestServer(1, false,
+        serverParamClass, serverResponseClass);
     InetSocketAddress addr = NetUtils.getConnectAddress(server);
     server.start();
 
     // start client
-    Client client = new Client(LongRTEWritable.class, conf);
+    WRITABLE_FAULTS_ENABLED = true;
+    Client client = new Client(clientResponseClass, conf);
     try {
-      client.call(new LongRTEWritable(RANDOM.nextLong()),
-              addr, null, null, 0, conf);
-      fail("Expected an exception to have been thrown");
-    } catch (IOException e) {
-      // check error
-      Throwable cause = e.getCause();
-      assertTrue(cause instanceof IOException);
-      // it's double-wrapped
-      Throwable cause2 = cause.getCause();
-      assertTrue(cause2 instanceof RuntimeException);
+      LongWritable param = clientParamClass.newInstance();
 
-      assertEquals(LongRTEWritable.ERR_MSG, cause2.getMessage());
+      try {
+        client.call(param, addr, null, null, 0, conf);
+        fail("Expected an exception to have been thrown");
+      } catch (Throwable t) {
+        assertExceptionContains(t, "Injected fault");
+      }
+      
+      // Doing a second call with faults disabled should return fine --
+      // ie the internal state of the client or server should not be broken
+      // by the failed call
+      WRITABLE_FAULTS_ENABLED = false;
+      client.call(param, addr, null, null, 0, conf);
+      
+    } finally {
+      server.stop();
     }
   }
 
+  @Test
+  public void testIOEOnClientWriteParam() throws Exception {
+    doErrorTest(IOEOnWriteWritable.class,
+        LongWritable.class,
+        LongWritable.class,
+        LongWritable.class);
+  }
+  
+  @Test
+  public void testRTEOnClientWriteParam() throws Exception {
+    doErrorTest(RTEOnWriteWritable.class,
+        LongWritable.class,
+        LongWritable.class,
+        LongWritable.class);
+  }
+
+  @Test
+  public void testIOEOnServerReadParam() throws Exception {
+    doErrorTest(LongWritable.class,
+        IOEOnReadWritable.class,
+        LongWritable.class,
+        LongWritable.class);
+  }
+  
+  @Test
+  public void testRTEOnServerReadParam() throws Exception {
+    doErrorTest(LongWritable.class,
+        RTEOnReadWritable.class,
+        LongWritable.class,
+        LongWritable.class);
+  }
+
+  
+  @Test
+  public void testIOEOnServerWriteResponse() throws Exception {
+    doErrorTest(LongWritable.class,
+        LongWritable.class,
+        IOEOnWriteWritable.class,
+        LongWritable.class);
+  }
+  
+  @Test
+  public void testRTEOnServerWriteResponse() throws Exception {
+    doErrorTest(LongWritable.class,
+        LongWritable.class,
+        RTEOnWriteWritable.class,
+        LongWritable.class);
+  }
+  
+  @Test
+  public void testIOEOnClientReadResponse() throws Exception {
+    doErrorTest(LongWritable.class,
+        LongWritable.class,
+        LongWritable.class,
+        IOEOnReadWritable.class);
+  }
+  
+  @Test
+  public void testRTEOnClientReadResponse() throws Exception {
+    doErrorTest(LongWritable.class,
+        LongWritable.class,
+        LongWritable.class,
+        RTEOnReadWritable.class);
+  }
+  
+  private static void assertExceptionContains(
+      Throwable t, String substring) {
+    String msg = StringUtils.stringifyException(t);
+    assertTrue("Exception should contain substring '" + substring + "':\n" +
+        msg, msg.contains(substring));
+    LOG.info("Got expected exception", t);
+  }
+  
   /**
    * Test that, if the socket factory throws an IOE, it properly propagates
    * to the client.
@@ -384,9 +506,9 @@ public class TestIPC extends TestCase {
 
   public static void main(String[] args) throws Exception {
 
-    //new TestIPC("test").testSerial(5, false, 2, 10, 1000);
+    //new TestIPC().testSerial(5, false, 2, 10, 1000);
 
-    new TestIPC("test").testParallel(10, false, 2, 4, 2, 4, 1000);
+    new TestIPC().testParallel(10, false, 2, 4, 2, 4, 1000);
 
   }
 



Mime
View raw message