hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iwasak...@apache.org
Subject hadoop git commit: HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims)
Date Tue, 05 Apr 2016 18:29:32 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 0907ce8c9 -> 2542e9bcc


HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims)

(cherry picked from commit 85ec5573eb9fd746a9295ecc6fe1ae683073aaf5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2542e9bc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2542e9bc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2542e9bc

Branch: refs/heads/branch-2
Commit: 2542e9bccfc3b9a58fca16224e3a41a7e2ddbd62
Parents: 0907ce8
Author: Masatake Iwasaki <iwasakims@apache.org>
Authored: Wed Apr 6 03:22:48 2016 +0900
Committer: Masatake Iwasaki <iwasakims@apache.org>
Committed: Wed Apr 6 03:26:33 2016 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Client.java | 57 +++++++++++++----
 .../src/main/resources/core-default.xml         |  9 ++-
 .../java/org/apache/hadoop/ipc/TestRPC.java     | 67 ++++++++++++++++++--
 .../hadoop/hdfs/client/impl/DfsClientConf.java  |  2 +-
 4 files changed, 108 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2542e9bc/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 093fe1e..efdb3f5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -239,14 +239,33 @@ public class Client {
    * 
    * @param conf Configuration
    * @return the timeout period in milliseconds. -1 if no timeout value is set
+   * @deprecated use {@link #getRpcTimeout(Configuration)} instead
    */
+  @Deprecated
   final public static int getTimeout(Configuration conf) {
+    int timeout = getRpcTimeout(conf);
+    if (timeout > 0)  {
+      return timeout;
+    }
     if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY,
         CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT)) {
       return getPingInterval(conf);
     }
     return -1;
   }
+
+  /**
+   * The time after which a RPC will timeout.
+   *
+   * @param conf Configuration
+   * @return the timeout period in milliseconds.
+   */
+  public static final int getRpcTimeout(Configuration conf) {
+    int timeout =
+        conf.getInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY,
+            CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_DEFAULT);
+    return (timeout < 0) ? 0 : timeout;
+  }
   /**
    * set the connection timeout value in configuration
    * 
@@ -386,7 +405,7 @@ public class Client {
     private Socket socket = null;                 // connected socket
     private DataInputStream in;
     private DataOutputStream out;
-    private int rpcTimeout;
+    private final int rpcTimeout;
     private int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
     private final RetryPolicy connectionRetryPolicy;
@@ -394,8 +413,9 @@ public class Client {
     private int maxRetriesOnSocketTimeouts;
     private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private final boolean tcpLowLatency; // if T then use low-delay QoS
-    private boolean doPing; //do we need to send ping message
-    private int pingInterval; // how often sends ping to the server in msecs
+    private final boolean doPing; //do we need to send ping message
+    private final int pingInterval; // how often sends ping to the server
+    private final int soTimeout; // used by ipc ping and rpc timeout
     private ByteArrayOutputStream pingRequest; // ping message
     
     // currently active calls
@@ -434,6 +454,14 @@ public class Client {
         pingHeader.writeDelimitedTo(pingRequest);
       }
       this.pingInterval = remoteId.getPingInterval();
+      if (rpcTimeout > 0) {
+        // effective rpc timeout is rounded up to multiple of pingInterval
+        // if pingInterval < rpcTimeout.
+        this.soTimeout = (doPing && pingInterval < rpcTimeout) ?
+            pingInterval : rpcTimeout;
+      } else {
+        this.soTimeout = pingInterval;
+      }
       this.serviceClass = serviceClass;
       if (LOG.isDebugEnabled()) {
         LOG.debug("The ping interval is " + this.pingInterval + " ms.");
@@ -484,12 +512,12 @@ public class Client {
 
       /* Process timeout exception
        * if the connection is not going to be closed or 
-       * is not configured to have a RPC timeout, send a ping.
-       * (if rpcTimeout is not set to be 0, then RPC should timeout.
-       * otherwise, throw the timeout exception.
+       * the RPC is not timed out yet, send a ping.
        */
-      private void handleTimeout(SocketTimeoutException e) throws IOException {
-        if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
+      private void handleTimeout(SocketTimeoutException e, int waiting)
+          throws IOException {
+        if (shouldCloseConnection.get() || !running.get() ||
+            (0 < rpcTimeout && rpcTimeout <= waiting)) {
           throw e;
         } else {
           sendPing();
@@ -503,11 +531,13 @@ public class Client {
        */
       @Override
       public int read() throws IOException {
+        int waiting = 0;
         do {
           try {
             return super.read();
           } catch (SocketTimeoutException e) {
-            handleTimeout(e);
+            waiting += soTimeout;
+            handleTimeout(e, waiting);
           }
         } while (true);
       }
@@ -520,11 +550,13 @@ public class Client {
        */
       @Override
       public int read(byte[] buf, int off, int len) throws IOException {
+        int waiting = 0;
         do {
           try {
             return super.read(buf, off, len);
           } catch (SocketTimeoutException e) {
-            handleTimeout(e);
+            waiting += soTimeout;
+            handleTimeout(e, waiting);
           }
         } while (true);
       }
@@ -632,10 +664,7 @@ public class Client {
           }
           
           NetUtils.connect(this.socket, server, connectionTimeout);
-          if (rpcTimeout > 0) {
-            pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
-          }
-          this.socket.setSoTimeout(pingInterval);
+          this.socket.setSoTimeout(soTimeout);
           return;
         } catch (ConnectTimeoutException toe) {
           /* Check for an address change and update the local reference.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2542e9bc/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 28e0a8a..f3dbc5b 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1055,7 +1055,7 @@
   <value>true</value>
   <description>Send a ping to the server when timeout on reading the response,
   if set to true. If no failure is detected, the client retries until at least
-  a byte is read.
+  a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
   </description>
 </property>
 
@@ -1072,10 +1072,9 @@
   <name>ipc.client.rpc-timeout.ms</name>
   <value>0</value>
   <description>Timeout on waiting response from server, in milliseconds.
-  Currently this timeout works only when ipc.client.ping is set to true
-  because it uses the same facilities with IPC ping.
-  The timeout overrides the ipc.ping.interval and client will throw exception
-  instead of sending ping when the interval is passed.
+  If ipc.client.ping is set to true and this rpc-timeout is greater than
+  the value of ipc.ping.interval, the effective value of the rpc-timeout is
+  rounded up to multiple of ipc.ping.interval.
   </description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2542e9bc/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 531db80..d600e01 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1130,14 +1130,67 @@ public class TestRPC extends TestRpcBase {
         .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
     server = setupTestServer(builder);
 
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
     try {
-      proxy = getClient(addr, conf);
-      proxy.sleep(null, newSleepRequest(3000));
-      fail("RPC should time out.");
-    } catch (ServiceException e) {
-      assertTrue(e.getCause() instanceof SocketTimeoutException);
-      LOG.info("got expected timeout.", e);
+      // Test RPC timeout with default ipc.client.ping.
+      try {
+        Configuration c = new Configuration(conf);
+        c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
+        proxy = getClient(addr, c);
+        proxy.sleep(null, newSleepRequest(3000));
+        fail("RPC should time out.");
+      } catch (ServiceException e) {
+        assertTrue(e.getCause() instanceof SocketTimeoutException);
+        LOG.info("got expected timeout.", e);
+      }
+
+      // Test RPC timeout when ipc.client.ping is false.
+      try {
+        Configuration c = new Configuration(conf);
+        c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
+        c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
+        proxy = getClient(addr, c);
+        proxy.sleep(null, newSleepRequest(3000));
+        fail("RPC should time out.");
+      } catch (ServiceException e) {
+        assertTrue(e.getCause() instanceof SocketTimeoutException);
+        LOG.info("got expected timeout.", e);
+      }
+
+      // Test negative timeout value.
+      try {
+        Configuration c = new Configuration(conf);
+        c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, -1);
+        proxy = getClient(addr, c);
+        proxy.sleep(null, newSleepRequest(2000));
+      } catch (ServiceException e) {
+        LOG.info("got unexpected exception.", e);
+        fail("RPC should not time out.");
+      }
+
+      // Test RPC timeout greater than ipc.ping.interval.
+      try {
+        Configuration c = new Configuration(conf);
+        c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
+        c.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
+        c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
+        proxy = getClient(addr, c);
+
+        try {
+          // should not time out because effective rpc-timeout is
+          // multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1))
+          proxy.sleep(null, newSleepRequest(1300));
+        } catch (ServiceException e) {
+          LOG.info("got unexpected exception.", e);
+          fail("RPC should not time out.");
+        }
+
+        proxy.sleep(null, newSleepRequest(2000));
+        fail("RPC should time out.");
+      } catch (ServiceException e) {
+        assertTrue(e.getCause() instanceof SocketTimeoutException);
+        LOG.info("got expected timeout.", e);
+      }
+
     } finally {
       stop(server, proxy);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2542e9bc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index ed6cd23..8848f86 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -141,7 +141,7 @@ public class DfsClientConf {
 
   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout
-    hdfsTimeout = Client.getTimeout(conf);
+    hdfsTimeout = Client.getRpcTimeout(conf);
 
     maxRetryAttempts = conf.getInt(
         Retry.MAX_ATTEMPTS_KEY,


Mime
View raw message