hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1505589 - in /hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/io/retry/ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Date Mon, 22 Jul 2013 04:17:22 GMT
Author: suresh
Date: Mon Jul 22 04:17:22 2013
New Revision: 1505589

URL: http://svn.apache.org/r1505589
Log:
HADOOP-9716. Merge 1505052 from branch-2

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1505589&r1=1505588&r2=1505589&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt	(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt	Mon Jul 22 04:17:22 2013
@@ -170,6 +170,9 @@ Release 2.1.0-beta - 2013-07-02
     HADOOP-9734. Common protobuf definitions for GetUserMappingsProtocol,
     RefreshAuthorizationPolicyProtocol and RefreshUserMappingsProtocol (jlowe)
 
+    HADOOP-9716. Rpc retries should use the same call ID as the original call.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1505589&r1=1505588&r2=1505589&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java	(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java	Mon Jul 22 04:17:22 2013
@@ -18,8 +18,10 @@
 package org.apache.hadoop.io.retry;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,10 +29,12 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
-import org.apache.hadoop.util.ThreadUtil;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.util.ThreadUtil;
 
 class RetryInvocationHandler implements RpcInvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
@@ -45,13 +49,13 @@ class RetryInvocationHandler implements 
   private final RetryPolicy defaultPolicy;
   private final Map<String,RetryPolicy> methodNameToPolicyMap;
   private Object currentProxy;
-  
-  public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+
+  RetryInvocationHandler(FailoverProxyProvider proxyProvider,
       RetryPolicy retryPolicy) {
     this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
   }
 
-  public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+  RetryInvocationHandler(FailoverProxyProvider proxyProvider,
       RetryPolicy defaultPolicy,
       Map<String, RetryPolicy> methodNameToPolicyMap) {
     this.proxyProvider = proxyProvider;
@@ -70,6 +74,8 @@ class RetryInvocationHandler implements 
     
     // The number of times this method invocation has been failed over.
     int invocationFailoverCount = 0;
+    final boolean isRpc = isRpcInvocation();
+    final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
     int retries = 0;
     while (true) {
       // The number of times this invocation handler has ever been failed over,
@@ -79,6 +85,10 @@ class RetryInvocationHandler implements 
       synchronized (proxyProvider) {
         invocationAttemptFailoverCount = proxyProviderFailoverCount;
       }
+
+      if (isRpc) {
+        Client.setCallId(callId);
+      }
       try {
         Object ret = invokeMethod(method, args);
         hasMadeASuccessfulCall = true;
@@ -167,6 +177,14 @@ class RetryInvocationHandler implements 
     }
   }
 
+  private boolean isRpcInvocation() {
+    if (!Proxy.isProxyClass(currentProxy.getClass())) {
+      return false;
+    }
+    final InvocationHandler ih = Proxy.getInvocationHandler(currentProxy);
+    return ih instanceof RpcInvocationHandler;
+  }
+
   @Override
   public void close() throws IOException {
     proxyProvider.close();

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1505589&r1=1505588&r2=1505589&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java	(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java	Mon Jul 22 04:17:22 2013
@@ -89,6 +89,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.CodedOutputStream;
 
@@ -102,11 +103,22 @@ public class Client {
   
   public static final Log LOG = LogFactory.getLog(Client.class);
 
+  /** A counter for generating call IDs. */
+  private static final AtomicInteger callIdCounter = new AtomicInteger();
+
+  private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
+
+  /** Set call id for the next call. */
+  public static void setCallId(int cid) {
+    Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
+    Preconditions.checkState(callId.get() == null);
+    callId.set(cid);
+  }
+
   private Hashtable<ConnectionId, Connection> connections =
     new Hashtable<ConnectionId, Connection>();
 
   private Class<? extends Writable> valueClass;   // class of call values
-  private final AtomicInteger counter = new AtomicInteger(); // call ID sequence
   private AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final private Configuration conf;
 
@@ -205,10 +217,14 @@ public class Client {
     return refCount==0;
   }
 
+  Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
+    return new Call(rpcKind, rpcRequest);
+  }
+
   /** 
    * Class that represents an RPC call
    */
-  private class Call {
+  static class Call {
     final int id;               // call id
     final Writable rpcRequest;  // the serialized rpc request
     Writable rpcResponse;       // null if rpc has error
@@ -216,10 +232,17 @@ public class Client {
     final RPC.RpcKind rpcKind;      // Rpc EngineKind
     boolean done;               // true when call is done
 
-    protected Call(RPC.RpcKind rpcKind, Writable param) {
+    private Call(RPC.RpcKind rpcKind, Writable param) {
       this.rpcKind = rpcKind;
       this.rpcRequest = param;
-      this.id = nextCallId();
+
+      final Integer id = callId.get();
+      if (id == null) {
+        this.id = nextCallId();
+      } else {
+        callId.set(null);
+        this.id = id;
+      }
     }
 
     /** Indicate when the call is complete and the
@@ -1287,7 +1310,7 @@ public class Client {
   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
       ConnectionId remoteId, int serviceClass)
       throws InterruptedException, IOException {
-    Call call = new Call(rpcKind, rpcRequest);
+    final Call call = createCall(rpcKind, rpcRequest);
     Connection connection = getConnection(remoteId, call, serviceClass);
     try {
       connection.sendRpcRequest(call);                 // send the rpc request
@@ -1574,9 +1597,9 @@ public class Client {
    * versions of the client did not mask off the sign bit, so a server may still
    * see a negative call ID if it receives connections from an old client.
    * 
-   * @return int next valid call ID
+   * @return next call ID
    */
-  private int nextCallId() {
-    return counter.getAndIncrement() & 0x7FFFFFFF;
+  public static int nextCallId() {
+    return callIdCounter.getAndIncrement() & 0x7FFFFFFF;
   }
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1505589&r1=1505588&r2=1505589&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java	(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java	Mon Jul 22 04:17:22 2013
@@ -124,7 +124,7 @@ public class ProtobufRpcEngine implement
     /**
      * This constructor takes a connectionId, instead of creating a new one.
      */
-    public Invoker(Class<?> protocol, Client.ConnectionId connId,
+    private Invoker(Class<?> protocol, Client.ConnectionId connId,
         Configuration conf, SocketFactory factory) {
       this.remoteId = connId;
       this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1505589&r1=1505588&r2=1505589&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java	(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java	Mon Jul 22 04:17:22 2013
@@ -275,7 +275,7 @@ public abstract class Server {
    * 
    * @return int sequential ID number of currently active RPC call
    */
-  public static int getCallId() {
+  static int getCallId() {
     Call call = CurCall.get();
     return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
   }
@@ -461,12 +461,12 @@ public abstract class Server {
     private final RPC.RpcKind rpcKind;
     private final byte[] clientId;
 
-    public Call(int id, Writable param, Connection connection) {
+    private Call(int id, Writable param, Connection connection) {
       this(id, param, connection, RPC.RpcKind.RPC_BUILTIN,
           RpcConstants.DUMMY_CLIENT_ID);
     }
 
-    public Call(int id, Writable param, Connection connection,
+    private Call(int id, Writable param, Connection connection,
         RPC.RpcKind kind, byte[] clientId) {
       this.callId = id;
       this.rpcRequest = param;
@@ -479,7 +479,7 @@ public abstract class Server {
     
     @Override
     public String toString() {
-      return rpcRequest.toString() + " from " + connection.toString();
+      return rpcRequest + " from " + connection + " Call#" + callId;
     }
 
     public void setResponse(ByteBuffer response) {
@@ -984,8 +984,7 @@ public abstract class Server {
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": responding to #" + call.callId + " from " +
-                      call.connection);
+            LOG.debug(getName() + ": responding to " + call);
           }
           //
           // Send as much data as we can in the non-blocking fashion
@@ -1004,8 +1003,8 @@ public abstract class Server {
               done = false;            // more calls pending to be sent.
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
-                        call.connection + " Wrote " + numBytes + " bytes.");
+              LOG.debug(getName() + ": responding to " + call
+                  + " Wrote " + numBytes + " bytes.");
             }
           } else {
             //
@@ -1032,9 +1031,8 @@ public abstract class Server {
               }
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
-                        call.connection + " Wrote partial " + numBytes + 
-                        " bytes.");
+              LOG.debug(getName() + ": responding to " + call
+                  + " Wrote partial " + numBytes + " bytes.");
             }
           }
           error = false;              // everything went off well
@@ -2002,8 +2000,7 @@ public abstract class Server {
         try {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": has Call#" + call.callId + 
-                "for RpcKind " + call.rpcKind + " from " + call.connection);
+            LOG.debug(getName() + ": " + call + " for RpcKind " + call.rpcKind);
           }
           String errorClass = null;
           String error = null;

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1505589&r1=1505588&r2=1505589&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java	(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java	Mon Jul 22 04:17:22 2013
@@ -18,40 +18,49 @@
 
 package org.apache.hadoop.ipc;
 
-import org.apache.commons.logging.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.ipc.Server.Connection;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.net.ConnectTimeoutException;
-import org.apache.hadoop.net.NetUtils;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.File;
 import java.io.DataOutput;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
-import javax.net.SocketFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
 
-import org.junit.Test;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import javax.net.SocketFactory;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -637,6 +646,48 @@ public class TestIPC {
     assertRetriesOnSocketTimeouts(conf, 4);
   }
 
+  private static class CallId {
+    int id = RpcConstants.INVALID_CALL_ID;
+  }
+
+  /**
+   * Test if the rpc server uses the call id generated by the rpc client.
+   */
+  @Test
+  public void testCallIds() throws Exception {
+    final CallId callId = new CallId();
+
+    // Override client to store the call id
+    final Client client = new Client(LongWritable.class, conf) {
+      @Override
+      Call createCall(RpcKind rpcKind, Writable rpcRequest) {
+        final Call call = super.createCall(rpcKind, rpcRequest);
+        callId.id = call.id;
+        return call;
+      }
+    };
+
+    // Attach a listener that tracks every call ID received by the server.
+    final TestServer server = new TestServer(1, false);
+    server.callListener = new Runnable() {
+      @Override
+      public void run() {
+        Assert.assertEquals(callId.id, Server.getCallId());
+      }
+    };
+
+    try {
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      server.start();
+      final SerialCaller caller = new SerialCaller(client, addr, 10);
+      caller.run();
+      assertFalse(caller.failed);
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
+
   /**
    * Tests that client generates a unique sequential call ID for each RPC call,
    * even if multiple threads are using the same client.
@@ -683,8 +734,9 @@ public class TestIPC {
     // of client call ID, so we must sort the call IDs before checking that it
     // contains every expected value.
     Collections.sort(callIds);
+    final int startID = callIds.get(0).intValue();
     for (int i = 0; i < expectedCallCount; ++i) {
-      assertEquals(i, callIds.get(i).intValue());
+      assertEquals(startID + i, callIds.get(i).intValue());
     }
   }
 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1505589&r1=1505588&r2=1505589&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java	(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java	Mon Jul 22 04:17:22 2013
@@ -105,10 +105,10 @@ public class TestIPCServerResponder exte
           byte[] bytes = new byte[byteSize];
           System.arraycopy(BYTES, 0, bytes, 0, byteSize);
           Writable param = new BytesWritable(bytes);
-          Writable value = client.call(param, address);
+          client.call(param, address);
           Thread.sleep(RANDOM.nextInt(20));
         } catch (Exception e) {
-          LOG.fatal("Caught: " + e);
+          LOG.fatal("Caught Exception", e);
           failed = true;
         }
       }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java?rev=1505589&r1=1505588&r2=1505589&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java	(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java	Mon Jul 22 04:17:22 2013
@@ -151,10 +151,8 @@ public class TestProtoBufRpc {
 
   private static TestRpcService getClient() throws IOException {
     // Set RPC engine to protobuf RPC engine
-    RPC.setProtocolEngine(conf, TestRpcService.class,
-        ProtobufRpcEngine.class);
-        return RPC.getProxy(TestRpcService.class, 0, addr,
-        conf);
+    RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
+    return RPC.getProxy(TestRpcService.class, 0, addr, conf);
   }
   
   private static TestRpcService2 getClient2() throws IOException {
@@ -191,6 +189,7 @@ public class TestProtoBufRpc {
       RemoteException re = (RemoteException)e.getCause();
       RpcServerException rse = (RpcServerException) re
           .unwrapRemoteException(RpcServerException.class);
+      Assert.assertNotNull(rse);
       Assert.assertTrue(re.getErrorCode().equals(
           RpcErrorCodeProto.ERROR_RPC_SERVER));
     }
@@ -246,6 +245,7 @@ public class TestProtoBufRpc {
         .setMessage(shortString).build();
     // short message goes through
     EchoResponseProto echoResponse = client.echo2(null, echoRequest);
+    Assert.assertEquals(shortString, echoResponse.getMessage());
     
     final String longString = StringUtils.repeat("X", 4096);
     echoRequest = EchoRequestProto.newBuilder()

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1505589&r1=1505588&r2=1505589&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java	(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java	Mon Jul 22 04:17:22 2013
@@ -94,7 +94,7 @@ public class TestRPC {
 
   int datasize = 1024*100;
   int numThreads = 50;
-	
+
   public interface TestProtocol extends VersionedProtocol {
     public static final long versionID = 1L;
     
@@ -360,8 +360,7 @@ public class TestRPC {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
 
       // create a client
-      proxy = (TestProtocol)RPC.getProxy(
-          TestProtocol.class, TestProtocol.versionID, addr, conf);
+      proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
       
       assertEquals(addr, RPC.getServerAddress(proxy));
     } finally {
@@ -388,8 +387,7 @@ public class TestRPC {
     InetSocketAddress addr = NetUtils.getConnectAddress(server);
 
     // create a client
-    proxy = (TestProtocol)RPC.getProxy(
-        TestProtocol.class, TestProtocol.versionID, addr, conf);
+    proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
 
     SlowRPC slowrpc = new SlowRPC(proxy);
     Thread thread = new Thread(slowrpc, "SlowRPC");
@@ -432,8 +430,7 @@ public class TestRPC {
     server.start();
 
     InetSocketAddress addr = NetUtils.getConnectAddress(server);
-    proxy = (TestProtocol)RPC.getProxy(
-        TestProtocol.class, TestProtocol.versionID, addr, conf);
+    proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
       
     proxy.ping();
 
@@ -557,8 +554,7 @@ public class TestRPC {
     InetSocketAddress addr = NetUtils.getConnectAddress(server);
     
     try {
-      proxy = (TestProtocol)RPC.getProxy(
-          TestProtocol.class, TestProtocol.versionID, addr, conf);
+      proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
       proxy.ping();
 
       if (expectFailure) {
@@ -660,7 +656,7 @@ public class TestRPC {
   
   @Test
   public void testStopProxy() throws IOException {
-    StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+    StoppedProtocol proxy = RPC.getProxy(StoppedProtocol.class,
         StoppedProtocol.versionID, null, conf);
     StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
         Proxy.getInvocationHandler(proxy);
@@ -671,7 +667,7 @@ public class TestRPC {
   
   @Test
   public void testWrappedStopProxy() throws IOException {
-    StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+    StoppedProtocol wrappedProxy = RPC.getProxy(StoppedProtocol.class,
         StoppedProtocol.versionID, null, conf);
     StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
         Proxy.getInvocationHandler(wrappedProxy);
@@ -701,8 +697,7 @@ public class TestRPC {
     final InetSocketAddress addr = NetUtils.getConnectAddress(server);
     TestProtocol proxy = null;
     try {
-      proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
-          TestProtocol.versionID, addr, conf);
+      proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
       proxy.echo("");
     } catch (RemoteException e) {
       LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
@@ -730,7 +725,7 @@ public class TestRPC {
     proxy = null;
     try {
       UserGroupInformation.setConfiguration(conf);
-      proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+      proxy = RPC.getProxy(TestProtocol.class,
           TestProtocol.versionID, mulitServerAddr, conf);
       proxy.echo("");
     } catch (RemoteException e) {
@@ -846,7 +841,7 @@ public class TestRPC {
     server.start();
     InetSocketAddress addr = NetUtils.getConnectAddress(server);
 
-    final TestProtocol proxy = (TestProtocol) RPC.getProxy(
+    final TestProtocol proxy = RPC.getProxy(
         TestProtocol.class, TestProtocol.versionID, addr, conf);
     // Connect to the server
     proxy.ping();
@@ -883,7 +878,7 @@ public class TestRPC {
     
     for (int i = 0; i < numConcurrentRPC; i++) {
       final int num = i;
-      final TestProtocol proxy = (TestProtocol) RPC.getProxy(
+      final TestProtocol proxy = RPC.getProxy(
       TestProtocol.class, TestProtocol.versionID, addr, conf);
       Thread rpcThread = new Thread(new Runnable() {
         @Override



Mime
View raw message