hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amareshw...@apache.org
Subject hive git commit: HIVE-13415 : Decouple Sessions from thrift binary transport (Rajat Khandelwal, reviewed by Szehon Ho)
Date Fri, 15 Apr 2016 11:46:39 GMT
Repository: hive
Updated Branches:
  refs/heads/master 3fec161da -> b30fe72e0


HIVE-13415 : Decouple Sessions from thrift binary transport (Rajat Khandelwal, reviewed by
Szehon Ho)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b30fe72e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b30fe72e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b30fe72e

Branch: refs/heads/master
Commit: b30fe72e021a4adb83436c4f19b0f400d1f44edf
Parents: 3fec161
Author: Rajat Khandelwal <prongs@apache.org>
Authored: Fri Apr 15 17:16:26 2016 +0530
Committer: Amareshwari Sriramadasu <amareshwari@apache.org>
Committed: Fri Apr 15 17:16:26 2016 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../cli/thrift/ThriftBinaryCLIService.java      |  64 ++++++++-
 .../service/cli/thrift/ThriftCLIService.java    |  57 --------
 .../cli/TestRetryingThriftCLIServiceClient.java | 130 +++++++++++++++----
 4 files changed, 171 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b30fe72e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c7e5b33..5cf1609 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2304,6 +2304,8 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h",
         new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false),
         "The check interval for session/operation timeout, which can be disabled by setting
to zero or negative value."),
+    HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT("hive.server2.close.session.on.disconnect",
true,
+      "Session will be closed when connection is closed. Set this to false to have session
outlive its parent connection."),
     HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", "7d",
         new TimeValidator(TimeUnit.MILLISECONDS),
         "Session will be closed when it's not accessed for this duration, which can be disabled
by setting to zero or negative value."),

http://git-wip-us.apache.org/repos/asf/hive/blob/b30fe72e/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index cf575a4..d9c7b2e 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -24,16 +24,25 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportFactory;
 
 
@@ -94,7 +103,60 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
 
       // TCP Server
       server = new TThreadPoolServer(sargs);
-      server.setServerEventHandler(serverEventHandler);
+      server.setServerEventHandler(new TServerEventHandler() {
+        @Override
+        public ServerContext createContext(
+          TProtocol input, TProtocol output) {
+          Metrics metrics = MetricsFactory.getInstance();
+          if (metrics != null) {
+            try {
+              metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+              metrics.incrementCounter(MetricsConstant.CUMULATIVE_CONNECTION_COUNT);
+            } catch (Exception e) {
+              LOG.warn("Error Reporting JDO operation to Metrics system", e);
+            }
+          }
+          return new ThriftCLIServerContext();
+        }
+
+        @Override
+        public void deleteContext(ServerContext serverContext,
+          TProtocol input, TProtocol output) {
+          Metrics metrics = MetricsFactory.getInstance();
+          if (metrics != null) {
+            try {
+              metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+            } catch (Exception e) {
+              LOG.warn("Error Reporting JDO operation to Metrics system", e);
+            }
+          }
+          ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext;
+          SessionHandle sessionHandle = context.getSessionHandle();
+          if (sessionHandle != null) {
+            LOG.info("Session disconnected without closing properly. ");
+            try {
+              boolean close = cliService.getSessionManager().getSession(sessionHandle).getHiveConf()
+                .getBoolVar(ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT);
+              LOG.info((close ? "" : "Not ") + "Closing the session: " + sessionHandle);
+              if (close) {
+                cliService.closeSession(sessionHandle);
+              }
+            } catch (HiveSQLException e) {
+              LOG.warn("Failed to close session: " + e, e);
+            }
+          }
+        }
+
+        @Override
+        public void preServe() {
+        }
+
+        @Override
+        public void processContext(ServerContext serverContext,
+          TTransport input, TTransport output) {
+          currentServerContext.set(serverContext);
+        }
+      });
       String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port
"
           + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
       LOG.info(msg);

http://git-wip-us.apache.org/repos/asf/hive/blob/b30fe72e/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index be9833d..e789a38 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -28,9 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.auth.login.LoginException;
 
-import org.apache.hadoop.hive.common.metrics.common.Metrics;
-import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
-import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.common.ServerUtils;
@@ -38,7 +35,6 @@ import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.ServiceException;
-import org.apache.hive.service.ServiceUtils;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
 import org.apache.hive.service.cli.CLIService;
@@ -97,11 +93,8 @@ import org.apache.hive.service.rpc.thrift.TStatus;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,7 +127,6 @@ public abstract class ThriftCLIService extends AbstractService implements
TCLISe
   protected int maxWorkerThreads;
   protected long workerKeepAliveTime;
 
-  protected TServerEventHandler serverEventHandler;
   protected ThreadLocal<ServerContext> currentServerContext;
 
   static class ThriftCLIServerContext implements ServerContext {
@@ -153,55 +145,6 @@ public abstract class ThriftCLIService extends AbstractService implements
TCLISe
     super(serviceName);
     this.cliService = service;
     currentServerContext = new ThreadLocal<ServerContext>();
-    serverEventHandler = new TServerEventHandler() {
-      @Override
-      public ServerContext createContext(
-          TProtocol input, TProtocol output) {
-        Metrics metrics = MetricsFactory.getInstance();
-        if (metrics != null) {
-          try {
-            metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
-            metrics.incrementCounter(MetricsConstant.CUMULATIVE_CONNECTION_COUNT);
-          } catch (Exception e) {
-            LOG.warn("Error Reporting JDO operation to Metrics system", e);
-          }
-        }
-        return new ThriftCLIServerContext();
-      }
-
-      @Override
-      public void deleteContext(ServerContext serverContext,
-          TProtocol input, TProtocol output) {
-        Metrics metrics = MetricsFactory.getInstance();
-        if (metrics != null) {
-          try {
-            metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
-          } catch (Exception e) {
-            LOG.warn("Error Reporting JDO operation to Metrics system", e);
-          }
-        }
-        ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext;
-        SessionHandle sessionHandle = context.getSessionHandle();
-        if (sessionHandle != null) {
-          LOG.info("Session disconnected without closing properly, close it now");
-          try {
-            cliService.closeSession(sessionHandle);
-          } catch (HiveSQLException e) {
-            LOG.warn("Failed to close session: " + e, e);
-          }
-        }
-      }
-
-      @Override
-      public void preServe() {
-      }
-
-      @Override
-      public void processContext(ServerContext serverContext,
-          TTransport input, TTransport output) {
-        currentServerContext.set(serverContext);
-      }
-    };
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b30fe72e/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
index 3bd82e6..d36f6c0 100644
--- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
+++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
@@ -19,13 +19,17 @@
 package org.apache.hive.service.cli;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.Service;
 import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.cli.thrift.RetryingThriftCLIServiceClient;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+
+import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -41,6 +45,38 @@ import static org.junit.Assert.*;
  */
 public class TestRetryingThriftCLIServiceClient {
   protected static ThriftCLIService service;
+  private HiveConf hiveConf;
+  private HiveServer2 server;
+
+  @Before
+  public void init() {
+    hiveConf = new HiveConf();
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost");
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString());
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary");
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3);
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT,
3);
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s");
+  }
+
+  private void startHiveServer() throws InterruptedException {
+    // Start hive server2
+    server = new HiveServer2();
+    server.init(hiveConf);
+    server.start();
+    Thread.sleep(5000);
+    System.out.println("## HiveServer started");
+  }
+
+  private void stopHiveServer() {
+    if (server != null) {
+      // kill server
+      server.stop();
+    }
+  }
 
   static class RetryingThriftCLIServiceClientTest extends RetryingThriftCLIServiceClient
{
     int callCount = 0;
@@ -74,31 +110,14 @@ public class TestRetryingThriftCLIServiceClient {
       return super.connect(conf);
     }
   }
+
   @Test
   public void testRetryBehaviour() throws Exception {
-    // Start hive server2
-    HiveConf hiveConf = new HiveConf();
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost");
-    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString());
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary");
-    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3);
-    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT,
3);
-    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10);
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s");
-
-    final HiveServer2 server = new HiveServer2();
-    server.init(hiveConf);
-    server.start();
-    Thread.sleep(5000);
-    System.out.println("## HiveServer started");
-
+    startHiveServer();
     // Check if giving invalid address causes retry in connection attempt
     hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 17000);
     try {
-      CLIServiceClient cliServiceClient =
-        RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+      RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
       fail("Expected to throw exception for invalid port");
     } catch (HiveSQLException sqlExc) {
       assertTrue(sqlExc.getCause() instanceof TTransportException);
@@ -112,16 +131,14 @@ public class TestRetryingThriftCLIServiceClient {
       = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
     System.out.println("## Created client");
 
-    // kill server
-    server.stop();
+    stopHiveServer();
     Thread.sleep(5000);
 
     // submit few queries
     try {
-      Map<String, String> confOverlay = new HashMap<String, String>();
       RetryingThriftCLIServiceClientTest.handlerInst.callCount = 0;
       RetryingThriftCLIServiceClientTest.handlerInst.connectCount = 0;
-      SessionHandle session = cliServiceClient.openSession("anonymous", "anonymous");
+      cliServiceClient.openSession("anonymous", "anonymous");
     } catch (HiveSQLException exc) {
       exc.printStackTrace();
       assertTrue(exc.getCause() instanceof TException);
@@ -131,4 +148,69 @@ public class TestRetryingThriftCLIServiceClient {
       cliServiceClient.closeTransport();
     }
   }
+
+  @Test
+  public void testTransportClose() throws InterruptedException, HiveSQLException {
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT,
0);
+    try {
+      startHiveServer();
+      RetryingThriftCLIServiceClient.CLIServiceClientWrapper client
+        = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+      client.closeTransport();
+      try {
+        client.openSession("anonymous", "anonymous");
+        fail("Shouldn't be able to open session when transport is closed.");
+      } catch(HiveSQLException ignored) {
+
+      }
+    } finally {
+      hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT,
3);
+      stopHiveServer();
+    }
+  }
+
+  @Test
+  public void testSessionLifeAfterTransportClose() throws InterruptedException, HiveSQLException
{
+    try {
+      startHiveServer();
+      CLIService service = null;
+      for (Service s : server.getServices()) {
+        if (s instanceof CLIService) {
+          service = (CLIService) s;
+        }
+      }
+      if (service == null) {
+        service = new CLIService(server);
+      }
+      RetryingThriftCLIServiceClient.CLIServiceClientWrapper client
+        = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+      Map<String, String> conf = new HashMap<>();
+      conf.put(HiveConf.ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT.varname, "false");
+      SessionHandle sessionHandle = client.openSession("anonymous", "anonymous", conf);
+      assertNotNull(sessionHandle);
+      HiveSession session = service.getSessionManager().getSession(sessionHandle);
+      OperationHandle op1 = session.executeStatementAsync("show databases", null);
+      assertNotNull(op1);
+      client.closeTransport();
+      // Verify that session wasn't closed on transport close.
+      assertEquals(session, service.getSessionManager().getSession(sessionHandle));
+      // Should be able to execute without failure in the session whose transport has been
closed.
+      OperationHandle op2 = session.executeStatementAsync("show databases", null);
+      assertNotNull(op2);
+      // Make new client, since transport was closed for the last one.
+      client = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+      client.closeSession(sessionHandle);
+      // operations will be lost once owning session is closed.
+      for (OperationHandle op: new OperationHandle[]{op1, op2}) {
+        try {
+          client.getOperationStatus(op);
+          fail("Should have failed.");
+        } catch (HiveSQLException ignored) {
+
+        }
+      }
+    } finally {
+      stopHiveServer();
+    }
+  }
 }


Mime
View raw message