hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sze...@apache.org
Subject [38/44] hive git commit: HIVE-10143 : HS2 fails to clean up Spark client state on timeout [Spark Branch] (Marcelo Vanzin via Szehon)
Date Thu, 23 Apr 2015 02:34:47 GMT
HIVE-10143 : HS2 fails to clean up Spark client state on timeout [Spark Branch] (Marcelo Vanzin via Szehon)

git-svn-id: https://svn.apache.org/repos/asf/hive/branches/spark@1670202 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/93d5925d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/93d5925d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/93d5925d

Branch: refs/heads/master
Commit: 93d5925d683d521a05e4a3d85bf60e3fb28115ee
Parents: 5f331e3
Author: Szehon Ho <szehon@apache.org>
Authored: Mon Mar 30 21:29:26 2015 +0000
Committer: Szehon Ho <szehon@cloudera.com>
Committed: Wed Apr 22 19:33:53 2015 -0700

----------------------------------------------------------------------
 .../apache/hive/spark/client/rpc/RpcServer.java | 11 +++++--
 .../apache/hive/spark/client/rpc/TestRpc.java   | 32 ++++++++++++++++++--
 2 files changed, 38 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/93d5925d/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index b923acf..32d4c46 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -35,6 +35,7 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -126,6 +127,12 @@ public class RpcServer implements Closeable {
    */
   public Future<Rpc> registerClient(final String clientId, String secret,
       RpcDispatcher serverDispatcher) {
+    return registerClient(clientId, secret, serverDispatcher, config.getServerConnectTimeoutMs());
+  }
+
+  @VisibleForTesting
+  Future<Rpc> registerClient(final String clientId, String secret,
+      RpcDispatcher serverDispatcher, long clientTimeoutMs) {
     final Promise<Rpc> promise = group.next().newPromise();
 
     Runnable timeout = new Runnable() {
@@ -135,7 +142,7 @@ public class RpcServer implements Closeable {
       }
     };
     ScheduledFuture<?> timeoutFuture = group.schedule(timeout,
-        config.getServerConnectTimeoutMs(),
+        clientTimeoutMs,
         TimeUnit.MILLISECONDS);
     final ClientInfo client = new ClientInfo(clientId, promise, secret, serverDispatcher,
         timeoutFuture);
@@ -147,7 +154,7 @@ public class RpcServer implements Closeable {
     promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
       @Override
       public void operationComplete(Promise<Rpc> p) {
-        if (p.isCancelled()) {
+        if (!p.isSuccess()) {
           pendingClients.remove(clientId);
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/93d5925d/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
index 8207514..d7969c9 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.security.sasl.SaslException;
 
@@ -41,9 +42,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 public class TestRpc {
 
@@ -190,6 +189,33 @@ public class TestRpc {
     assertEquals(outbound.message, reply.message);
   }
 
+  @Test
+  public void testClientTimeout() throws Exception {
+    Map<String, String> conf = ImmutableMap.<String,String>builder()
+      .putAll(emptyConfig)
+      .build();
+    RpcServer server = autoClose(new RpcServer(conf));
+    String secret = server.createSecret();
+
+    try {
+      autoClose(server.registerClient("client", secret, new TestDispatcher(), 1L).get());
+      fail("Server should have timed out client.");
+    } catch (ExecutionException ee) {
+      assertTrue(ee.getCause() instanceof TimeoutException);
+    }
+
+    NioEventLoopGroup eloop = new NioEventLoopGroup();
+    Future<Rpc> clientRpcFuture = Rpc.createClient(conf, eloop,
+        "localhost", server.getPort(), "client", secret, new TestDispatcher());
+    try {
+      autoClose(clientRpcFuture.get());
+      fail("Client should have failed to connect to server.");
+    } catch (ExecutionException ee) {
+      // Error should not be a timeout.
+      assertFalse(ee.getCause() instanceof TimeoutException);
+    }
+  }
+
   private void transfer(Rpc serverRpc, Rpc clientRpc) {
     EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
     EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();


Mime
View raw message