hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1510793 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common/src: main/java/org/apache/hadoop/ipc/ test/java/org/apache/hadoop/ipc/
Date Mon, 05 Aug 2013 23:01:28 GMT
Author: daryn
Date: Mon Aug  5 23:01:27 2013
New Revision: 1510793

URL: http://svn.apache.org/r1510793
Log:
HADOOP-9832. Add RPC header to client ping (daryn)


Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1510793&r1=1510792&r2=1510793&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Aug  5 23:01:27 2013
@@ -18,10 +18,11 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FilterInputStream;
@@ -382,6 +383,7 @@ public class Client {
     private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private boolean doPing; //do we need to send ping message
     private int pingInterval; // how often sends ping to the server in msecs
+    private ByteArrayOutputStream pingRequest; // ping message
     
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -407,6 +409,15 @@ public class Client {
       this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
       this.tcpNoDelay = remoteId.getTcpNoDelay();
       this.doPing = remoteId.getDoPing();
+      if (doPing) {
+        // construct a RPC header with the callId as the ping callId
+        pingRequest = new ByteArrayOutputStream();
+        RpcRequestHeaderProto pingHeader = ProtoUtil
+            .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+                OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
+                RpcConstants.INVALID_RETRY_COUNT, clientId);
+        pingHeader.writeDelimitedTo(pingRequest);
+      }
       this.pingInterval = remoteId.getPingInterval();
       this.serviceClass = serviceClass;
       if (LOG.isDebugEnabled()) {
@@ -910,7 +921,8 @@ public class Client {
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
         synchronized (out) {
-          out.writeInt(RpcConstants.PING_CALL_ID);
+          out.writeInt(pingRequest.size());
+          pingRequest.writeTo(out);
           out.flush();
         }
       }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1510793&r1=1510792&r2=1510793&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Mon Aug  5 23:01:27 2013
@@ -27,13 +27,13 @@ public class RpcConstants {
     // Hidden Constructor
   }
   
-  public static final int PING_CALL_ID = -1;
+  public static final int AUTHORIZATION_FAILED_CALL_ID = -1;
+  public static final int INVALID_CALL_ID = -2;
+  public static final int CONNECTION_CONTEXT_CALL_ID = -3;
+  public static final int PING_CALL_ID = -4;
   
   public static final byte[] DUMMY_CLIENT_ID = new byte[0];
   
-  public static final int INVALID_CALL_ID = -2;
-
-  public static final int CONNECTION_CONTEXT_CALL_ID = -3;
   
   public static final int INVALID_RETRY_COUNT = -1;
   

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1510793&r1=1510792&r2=1510793&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Aug  5 23:01:27 2013
@@ -72,8 +72,7 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -1177,9 +1176,7 @@ public abstract class Server {
     public UserGroupInformation attemptingUser = null; // user name before auth
 
     // Fake 'call' for failed authorization response
-    private static final int AUTHORIZATION_FAILED_CALLID = -1;
-    
-    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
+    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
         RpcConstants.INVALID_RETRY_COUNT, null, this);
     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
     
@@ -1523,11 +1520,6 @@ public abstract class Server {
         if (data == null) {
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
-          if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
-            // covers the !useSasl too
-            dataLengthBuffer.clear();
-            return 0; // ping message
-          }
           checkDataLength(dataLength);
           data = ByteBuffer.allocate(dataLength);
         }
@@ -1738,13 +1730,6 @@ public abstract class Server {
         if (unwrappedData == null) {
           unwrappedDataLengthBuffer.flip();
           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
-          if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
-            if (LOG.isDebugEnabled())
-              LOG.debug("Received ping message");
-            unwrappedDataLengthBuffer.clear();
-            continue; // ping message
-          }
           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
         }
 
@@ -1913,6 +1898,8 @@ public abstract class Server {
               "SASL protocol not requested by client");
         }
         saslReadAndProcess(dis);
+      } else if (callId == PING_CALL_ID) {
+        LOG.debug("Received ping message");
       } else {
         throw new WrappedRpcServerException(
             RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1926,7 +1913,7 @@ public abstract class Server {
      */
     private void authorizeConnection() throws WrappedRpcServerException {
       try {
-        // If auth method is DIGEST, the token was obtained by the
+        // If auth method is TOKEN, the token was obtained by the
         // real user for the effective user, therefore not required to
         // authorize real user. doAs is allowed only for simple or kerberos
         // authentication

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1510793&r1=1510792&r2=1510793&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Mon Aug  5 23:01:27 2013
@@ -100,6 +100,7 @@ public class TestRPC {
     
     void ping() throws IOException;
     void slowPing(boolean shouldSlow) throws IOException;
+    void sleep(long delay) throws IOException, InterruptedException;
     String echo(String value) throws IOException;
     String[] echo(String[] value) throws IOException;
     Writable echo(Writable value) throws IOException;
@@ -146,6 +147,11 @@ public class TestRPC {
     }
     
     @Override
+    public void sleep(long delay) throws InterruptedException {
+      Thread.sleep(delay);
+    }
+    
+    @Override
     public String echo(String value) throws IOException { return value; }
 
     @Override
@@ -932,6 +938,28 @@ public class TestRPC {
     }
   }
 
+  @Test
+  public void testConnectionPing() throws Exception {
+    Configuration conf = new Configuration();
+    int pingInterval = 50;
+    conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
+    conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+        .build();
+    server.start();
+
+    final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
+        TestProtocol.versionID, server.getListenerAddress(), conf);
+    try {
+      // this call will throw exception if server couldn't decode the ping
+      proxy.sleep(pingInterval*4);
+    } finally {
+      if (proxy != null) RPC.stopProxy(proxy);
+    }
+  }
+
   public static void main(String[] args) throws IOException {
     new TestRPC().testCallsInternal(conf);
 



Mime
View raw message