hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sunc...@apache.org
Subject svn commit: r1665615 - in /hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift: ThriftBinaryCLIService.java ThriftCLIService.java
Date Tue, 10 Mar 2015 16:54:17 GMT
Author: sunchao
Date: Tue Mar 10 16:54:17 2015
New Revision: 1665615

URL: http://svn.apache.org/r1665615
Log:
HIVE-9601 - New Beeline queries will hang if Beeline terminates improperly [Spark Branch]
(Jimmy via Chao)

Modified:
    hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
    hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1665615&r1=1665614&r2=1665615&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Tue Mar 10 16:54:17 2015
@@ -92,6 +92,7 @@ public class ThriftBinaryCLIService exte
 
       // TCP Server
       server = new TThreadPoolServer(sargs);
+      server.setServerEventHandler(serverEventHandler);
       server.serve();
       String msg = "Started " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
           + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1665615&r1=1665614&r2=1665615&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Mar 10 16:54:17 2015
@@ -36,11 +36,25 @@ import org.apache.hive.service.ServiceEx
 import org.apache.hive.service.ServiceUtils;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
-import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.FetchType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
 
 /**
  * ThriftCLIService.
@@ -52,7 +66,6 @@ public abstract class ThriftCLIService e
 
   protected CLIService cliService;
   private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
-  private static final TStatus ERROR_STATUS = new TStatus(TStatusCode.ERROR_STATUS);
   protected static HiveAuthFactory hiveAuthFactory;
 
   protected int portNum;
@@ -70,9 +83,57 @@ public abstract class ThriftCLIService e
   protected int maxWorkerThreads;
   protected long workerKeepAliveTime;
 
-  public ThriftCLIService(CLIService cliService, String serviceName) {
+  protected TServerEventHandler serverEventHandler;
+  protected ThreadLocal<ServerContext> currentServerContext;
+
+  static class ThriftCLIServerContext implements ServerContext {
+    private SessionHandle sessionHandle = null;
+
+    public void setSessionHandle(SessionHandle sessionHandle) {
+      this.sessionHandle = sessionHandle;
+    }
+
+    public SessionHandle getSessionHandle() {
+      return sessionHandle;
+    }
+  }
+
+  public ThriftCLIService(CLIService service, String serviceName) {
     super(serviceName);
-    this.cliService = cliService;
+    this.cliService = service;
+    currentServerContext = new ThreadLocal<ServerContext>();
+    serverEventHandler = new TServerEventHandler() {
+      @Override
+      public ServerContext createContext(
+          TProtocol input, TProtocol output) {
+        return new ThriftCLIServerContext();
+      }
+
+      @Override
+      public void deleteContext(ServerContext serverContext,
+          TProtocol input, TProtocol output) {
+        ThriftCLIServerContext context = (ThriftCLIServerContext)serverContext;
+        SessionHandle sessionHandle = context.getSessionHandle();
+        if (sessionHandle != null) {
+          LOG.info("Session disconnected without closing properly, close it now");
+          try {
+            cliService.closeSession(sessionHandle);
+          } catch (HiveSQLException e) {
+            LOG.warn("Failed to close session: " + e, e);
+          }
+        }
+      }
+
+      @Override
+      public void preServe() {
+      }
+
+      @Override
+      public void processContext(ServerContext serverContext,
+          TTransport input, TTransport output) {
+        currentServerContext.set(serverContext);
+      }
+    };
   }
 
   @Override
@@ -238,6 +299,11 @@ public abstract class ThriftCLIService e
       // TODO: set real configuration map
       resp.setConfiguration(new HashMap<String, String>());
       resp.setStatus(OK_STATUS);
+      ThriftCLIServerContext context =
+        (ThriftCLIServerContext)currentServerContext.get();
+      if (context != null) {
+        context.setSessionHandle(sessionHandle);
+      }
     } catch (Exception e) {
       LOG.warn("Error opening session: ", e);
       resp.setStatus(HiveSQLException.toTStatus(e));
@@ -381,6 +447,11 @@ public abstract class ThriftCLIService e
       SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
       cliService.closeSession(sessionHandle);
       resp.setStatus(OK_STATUS);
+      ThriftCLIServerContext context =
+        (ThriftCLIServerContext)currentServerContext.get();
+      if (context != null) {
+        context.setSessionHandle(null);
+      }
     } catch (Exception e) {
       LOG.warn("Error closing session: ", e);
       resp.setStatus(HiveSQLException.toTStatus(e));



Mime
View raw message