spark-commits mailing list archives

From r...@apache.org
Subject [01/25] spark git commit: [SPARK-14987][SQL] inline hive-service (cli) into sql/hive-thriftserver
Date Fri, 29 Apr 2016 16:32:49 GMT
Repository: spark
Updated Branches:
  refs/heads/master b6fa7e593 -> 7feeb82cb


http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
new file mode 100644
index 0000000..1af4539
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.*;
+import org.apache.thrift.TException;
+
+/**
+ * ThriftCLIServiceClient.
+ *
+ */
+public class ThriftCLIServiceClient extends CLIServiceClient {
+  private final TCLIService.Iface cliService;
+
+  public ThriftCLIServiceClient(TCLIService.Iface cliService) {
+    this.cliService = cliService;
+  }
+
+  public void checkStatus(TStatus status) throws HiveSQLException {
+    if (TStatusCode.ERROR_STATUS.equals(status.getStatusCode())) {
+      throw new HiveSQLException(status);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#openSession(java.lang.String, java.lang.String, java.util.Map)
+   */
+  @Override
+  public SessionHandle openSession(String username, String password,
+      Map<String, String> configuration)
+          throws HiveSQLException {
+    try {
+      TOpenSessionReq req = new TOpenSessionReq();
+      req.setUsername(username);
+      req.setPassword(password);
+      req.setConfiguration(configuration);
+      TOpenSessionResp resp = cliService.OpenSession(req);
+      checkStatus(resp.getStatus());
+      return new SessionHandle(resp.getSessionHandle(), resp.getServerProtocolVersion());
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#openSessionWithImpersonation(java.lang.String, java.lang.String, java.util.Map, java.lang.String)
+   */
+  @Override
+  public SessionHandle openSessionWithImpersonation(String username, String password,
+      Map<String, String> configuration, String delegationToken) throws HiveSQLException {
+    throw new HiveSQLException("open with impersonation operation is not supported in the client");
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle)
+   */
+  @Override
+  public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
+    try {
+      TCloseSessionReq req = new TCloseSessionReq(sessionHandle.toTSessionHandle());
+      TCloseSessionResp resp = cliService.CloseSession(req);
+      checkStatus(resp.getStatus());
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getInfo(org.apache.hive.service.cli.SessionHandle, java.util.List)
+   */
+  @Override
+  public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType)
+      throws HiveSQLException {
+    try {
+      // FIXME extract the right info type
+      TGetInfoReq req = new TGetInfoReq(sessionHandle.toTSessionHandle(), infoType.toTGetInfoType());
+      TGetInfoResp resp = cliService.GetInfo(req);
+      checkStatus(resp.getStatus());
+      return new GetInfoValue(resp.getInfoValue());
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+   */
+  @Override
+  public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+      Map<String, String> confOverlay)
+          throws HiveSQLException {
+    return executeStatementInternal(sessionHandle, statement, confOverlay, false);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+   */
+  @Override
+  public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+      Map<String, String> confOverlay)
+          throws HiveSQLException {
+    return executeStatementInternal(sessionHandle, statement, confOverlay, true);
+  }
+
+  private OperationHandle executeStatementInternal(SessionHandle sessionHandle, String statement,
+      Map<String, String> confOverlay, boolean isAsync)
+          throws HiveSQLException {
+    try {
+      TExecuteStatementReq req =
+          new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement);
+      req.setConfOverlay(confOverlay);
+      req.setRunAsync(isAsync);
+      TExecuteStatementResp resp = cliService.ExecuteStatement(req);
+      checkStatus(resp.getStatus());
+      TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+      return new OperationHandle(resp.getOperationHandle(), protocol);
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle)
+   */
+  @Override
+  public OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException {
+    try {
+      TGetTypeInfoReq req = new TGetTypeInfoReq(sessionHandle.toTSessionHandle());
+      TGetTypeInfoResp resp = cliService.GetTypeInfo(req);
+      checkStatus(resp.getStatus());
+      TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+      return new OperationHandle(resp.getOperationHandle(), protocol);
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getCatalogs(org.apache.hive.service.cli.SessionHandle)
+   */
+  @Override
+  public OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException {
+    try {
+      TGetCatalogsReq req = new TGetCatalogsReq(sessionHandle.toTSessionHandle());
+      TGetCatalogsResp resp = cliService.GetCatalogs(req);
+      checkStatus(resp.getStatus());
+      TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+      return new OperationHandle(resp.getOperationHandle(), protocol);
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getSchemas(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String)
+   */
+  @Override
+  public OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName,
+      String schemaName)
+          throws HiveSQLException {
+    try {
+      TGetSchemasReq req = new TGetSchemasReq(sessionHandle.toTSessionHandle());
+      req.setCatalogName(catalogName);
+      req.setSchemaName(schemaName);
+      TGetSchemasResp resp = cliService.GetSchemas(req);
+      checkStatus(resp.getStatus());
+      TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+      return new OperationHandle(resp.getOperationHandle(), protocol);
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getTables(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String, java.lang.String, java.util.List)
+   */
+  @Override
+  public OperationHandle getTables(SessionHandle sessionHandle, String catalogName,
+      String schemaName, String tableName, List<String> tableTypes)
+          throws HiveSQLException {
+    try {
+      TGetTablesReq req = new TGetTablesReq(sessionHandle.toTSessionHandle());
+      req.setTableName(tableName);
+      req.setTableTypes(tableTypes);
+      req.setSchemaName(schemaName);
+      TGetTablesResp resp = cliService.GetTables(req);
+      checkStatus(resp.getStatus());
+      TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+      return new OperationHandle(resp.getOperationHandle(), protocol);
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getTableTypes(org.apache.hive.service.cli.SessionHandle)
+   */
+  @Override
+  public OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException {
+    try {
+      TGetTableTypesReq req = new TGetTableTypesReq(sessionHandle.toTSessionHandle());
+      TGetTableTypesResp resp = cliService.GetTableTypes(req);
+      checkStatus(resp.getStatus());
+      TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+      return new OperationHandle(resp.getOperationHandle(), protocol);
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getColumns(org.apache.hive.service.cli.SessionHandle)
+   */
+  @Override
+  public OperationHandle getColumns(SessionHandle sessionHandle,
+      String catalogName, String schemaName, String tableName, String columnName)
+          throws HiveSQLException {
+    try {
+      TGetColumnsReq req = new TGetColumnsReq();
+      req.setSessionHandle(sessionHandle.toTSessionHandle());
+      req.setCatalogName(catalogName);
+      req.setSchemaName(schemaName);
+      req.setTableName(tableName);
+      req.setColumnName(columnName);
+      TGetColumnsResp resp = cliService.GetColumns(req);
+      checkStatus(resp.getStatus());
+      TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+      return new OperationHandle(resp.getOperationHandle(), protocol);
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getFunctions(org.apache.hive.service.cli.SessionHandle)
+   */
+  @Override
+  public OperationHandle getFunctions(SessionHandle sessionHandle,
+      String catalogName, String schemaName, String functionName) throws HiveSQLException {
+    try {
+      TGetFunctionsReq req = new TGetFunctionsReq(sessionHandle.toTSessionHandle(), functionName);
+      req.setCatalogName(catalogName);
+      req.setSchemaName(schemaName);
+      TGetFunctionsResp resp = cliService.GetFunctions(req);
+      checkStatus(resp.getStatus());
+      TProtocolVersion protocol = sessionHandle.getProtocolVersion();
+      return new OperationHandle(resp.getOperationHandle(), protocol);
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle)
+   */
+  @Override
+  public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException {
+    try {
+      TGetOperationStatusReq req = new TGetOperationStatusReq(opHandle.toTOperationHandle());
+      TGetOperationStatusResp resp = cliService.GetOperationStatus(req);
+      // Checks the status of the RPC call, throws an exception in case of error
+      checkStatus(resp.getStatus());
+      OperationState opState = OperationState.getOperationState(resp.getOperationState());
+      HiveSQLException opException = null;
+      if (opState == OperationState.ERROR) {
+        opException = new HiveSQLException(resp.getErrorMessage(), resp.getSqlState(), resp.getErrorCode());
+      }
+      return new OperationStatus(opState, opException);
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#cancelOperation(org.apache.hive.service.cli.OperationHandle)
+   */
+  @Override
+  public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
+    try {
+      TCancelOperationReq req = new TCancelOperationReq(opHandle.toTOperationHandle());
+      TCancelOperationResp resp = cliService.CancelOperation(req);
+      checkStatus(resp.getStatus());
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#closeOperation(org.apache.hive.service.cli.OperationHandle)
+   */
+  @Override
+  public void closeOperation(OperationHandle opHandle)
+      throws HiveSQLException {
+    try {
+      TCloseOperationReq req  = new TCloseOperationReq(opHandle.toTOperationHandle());
+      TCloseOperationResp resp = cliService.CloseOperation(req);
+      checkStatus(resp.getStatus());
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle)
+   */
+  @Override
+  public TableSchema getResultSetMetadata(OperationHandle opHandle)
+      throws HiveSQLException {
+    try {
+      TGetResultSetMetadataReq req = new TGetResultSetMetadataReq(opHandle.toTOperationHandle());
+      TGetResultSetMetadataResp resp = cliService.GetResultSetMetadata(req);
+      checkStatus(resp.getStatus());
+      return new TableSchema(resp.getSchema());
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  @Override
+  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+      FetchType fetchType) throws HiveSQLException {
+    try {
+      TFetchResultsReq req = new TFetchResultsReq();
+      req.setOperationHandle(opHandle.toTOperationHandle());
+      req.setOrientation(orientation.toTFetchOrientation());
+      req.setMaxRows(maxRows);
+      req.setFetchType(fetchType.toTFetchType());
+      TFetchResultsResp resp = cliService.FetchResults(req);
+      checkStatus(resp.getStatus());
+      return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion());
+    } catch (HiveSQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle)
+   */
+  @Override
+  public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
+    // TODO: set the correct default fetch size
+    return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT);
+  }
+
+  @Override
+  public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+      String owner, String renewer) throws HiveSQLException {
+    TGetDelegationTokenReq req = new TGetDelegationTokenReq(
+        sessionHandle.toTSessionHandle(), owner, renewer);
+    try {
+      TGetDelegationTokenResp tokenResp = cliService.GetDelegationToken(req);
+      checkStatus(tokenResp.getStatus());
+      return tokenResp.getDelegationToken();
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  @Override
+  public void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+      String tokenStr) throws HiveSQLException {
+    TCancelDelegationTokenReq cancelReq = new TCancelDelegationTokenReq(
+          sessionHandle.toTSessionHandle(), tokenStr);
+    try {
+      TCancelDelegationTokenResp cancelResp =
+        cliService.CancelDelegationToken(cancelReq);
+      checkStatus(cancelResp.getStatus());
+      return;
+    } catch (TException e) {
+      throw new HiveSQLException(e);
+    }
+  }
+
+  @Override
+  public void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+      String tokenStr) throws HiveSQLException {
+    TRenewDelegationTokenReq cancelReq = new TRenewDelegationTokenReq(
+        sessionHandle.toTSessionHandle(), tokenStr);
+    try {
+      TRenewDelegationTokenResp renewResp =
+        cliService.RenewDelegationToken(cancelReq);
+      checkStatus(renewResp.getStatus());
+      return;
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+  }
+}
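
For context, a minimal sketch of driving this client over a plain binary Thrift transport; it is not part of the commit, and the host, port, credentials and the NOSASL assumption are illustrative only:

import java.util.HashMap;

import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;

public class ThriftCLIServiceClientSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical endpoint; assumes HiveServer2 runs the binary transport with NOSASL auth.
    TSocket transport = new TSocket("localhost", 10000);
    transport.open();
    TCLIService.Iface stub = new TCLIService.Client(new TBinaryProtocol(transport));
    ThriftCLIServiceClient client = new ThriftCLIServiceClient(stub);

    // Open a session, run one statement, fetch a batch of rows, then clean up.
    SessionHandle session = client.openSession("anonymous", "", new HashMap<String, String>());
    OperationHandle op =
        client.executeStatement(session, "SHOW TABLES", new HashMap<String, String>());
    RowSet rows = client.fetchResults(op);
    System.out.println("Fetched " + rows.numRows() + " rows");
    client.closeOperation(op);
    client.closeSession(session);
    transport.close();
  }
}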

http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
new file mode 100644
index 0000000..3b57efa
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.thrift.TCLIService.Iface;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+
+
+public class ThriftHttpCLIService extends ThriftCLIService {
+
+  public ThriftHttpCLIService(CLIService cliService) {
+    super(cliService, ThriftHttpCLIService.class.getSimpleName());
+  }
+
+  /**
+   * Configure Jetty to serve http requests. Example of a client connection URL:
+   * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
+   * e.g. http://gateway:port/hive2/servlets/thrifths2/
+   */
+  @Override
+  public void run() {
+    try {
+      // HTTP Server
+      httpServer = new org.eclipse.jetty.server.Server();
+
+      // Server thread pool
+      // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
+      String threadPoolName = "HiveServer2-HttpHandler-Pool";
+      ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+          workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+          new ThreadFactoryWithGarbageCleanup(threadPoolName));
+      ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
+      httpServer.setThreadPool(threadPool);
+
+      // Connector configs
+      SelectChannelConnector connector = new SelectChannelConnector();
+      boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
+      String schemeName = useSsl ? "https" : "http";
+      // Change connector if SSL is used
+      if (useSsl) {
+        String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
+        String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
+            HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
+        if (keyStorePath.isEmpty()) {
+          throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+              + " Not configured for SSL connection");
+        }
+        SslContextFactory sslContextFactory = new SslContextFactory();
+        String[] excludedProtocols = hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",");
+        LOG.info("HTTP Server SSL: adding excluded protocols: " + Arrays.toString(excludedProtocols));
+        sslContextFactory.addExcludeProtocols(excludedProtocols);
+        LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " +
+          Arrays.toString(sslContextFactory.getExcludeProtocols()));
+        sslContextFactory.setKeyStorePath(keyStorePath);
+        sslContextFactory.setKeyStorePassword(keyStorePassword);
+        connector = new SslSelectChannelConnector(sslContextFactory);
+      }
+      connector.setPort(portNum);
+      // Linux:yes, Windows:no
+      connector.setReuseAddress(!Shell.WINDOWS);
+      int maxIdleTime = (int) hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME,
+          TimeUnit.MILLISECONDS);
+      connector.setMaxIdleTime(maxIdleTime);
+
+      httpServer.addConnector(connector);
+
+      // Thrift configs
+      hiveAuthFactory = new HiveAuthFactory(hiveConf);
+      TProcessor processor = new TCLIService.Processor<Iface>(this);
+      TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+      // Set during the init phase of HiveServer2 if auth mode is kerberos
+      // UGI for the hive/_HOST (kerberos) principal
+      UserGroupInformation serviceUGI = cliService.getServiceUGI();
+      // UGI for the http/_HOST (SPNego) principal
+      UserGroupInformation httpUGI = cliService.getHttpUGI();
+      String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
+      TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType,
+          serviceUGI, httpUGI);
+
+      // Context handler
+      final ServletContextHandler context = new ServletContextHandler(
+          ServletContextHandler.SESSIONS);
+      context.setContextPath("/");
+      String httpPath = getHttpPath(hiveConf
+          .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
+      httpServer.setHandler(context);
+      context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
+
+      // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyReceiveDuration, etc.
+      // Finally, start the server
+      httpServer.start();
+      String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
+          + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
+          + maxWorkerThreads + " worker threads";
+      LOG.info(msg);
+      httpServer.join();
+    } catch (Throwable t) {
+      LOG.fatal(
+          "Error starting HiveServer2: could not start "
+              + ThriftHttpCLIService.class.getSimpleName(), t);
+      System.exit(-1);
+    }
+  }
+
+  /**
+   * The config parameter can be like "path", "/path", "/path/", "path/*", "/path1/path2/*" and so on.
+   * httpPath should end up as "/*", "/path/*" or "/path1/../pathN/*"
+   * @param httpPath The configured value of the HTTP endpoint path.
+   * @return The normalized servlet mapping path, always ending in "/*".
+   */
+  private String getHttpPath(String httpPath) {
+    if(httpPath == null || httpPath.equals("")) {
+      httpPath = "/*";
+    }
+    else {
+      if(!httpPath.startsWith("/")) {
+        httpPath = "/" + httpPath;
+      }
+      if(httpPath.endsWith("/")) {
+        httpPath = httpPath + "*";
+      }
+      if(!httpPath.endsWith("/*")) {
+        httpPath = httpPath + "/*";
+      }
+    }
+    return httpPath;
+  }
+}
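
As an aside (not part of the diff), a client reaches this service by POSTing Thrift payloads to the servlet path configured above; a minimal sketch, with an assumed port and an assumed "cliservice" path:

import org.apache.commons.codec.binary.Base64;
import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TOpenSessionReq;
import org.apache.hive.service.cli.thrift.TOpenSessionResp;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.THttpClient;

public class ThriftHttpClientSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative URL; port and path correspond to the HiveConf settings read by this class.
    THttpClient transport = new THttpClient("http://localhost:10001/cliservice");
    // ThriftHttpServlet (next file) expects either a valid hive.server2.auth cookie or an
    // Authorization header on each request; here we send Basic credentials (illustrative).
    String creds = Base64.encodeBase64String("anonymous:anonymous".getBytes("UTF-8"));
    transport.setCustomHeader("Authorization", "Basic " + creds);
    transport.open();

    TCLIService.Client client = new TCLIService.Client(new TBinaryProtocol(transport));
    TOpenSessionReq req = new TOpenSessionReq();
    req.setUsername("anonymous");
    req.setPassword("anonymous");
    TOpenSessionResp resp = client.OpenSession(req);
    System.out.println("Opened session: " + resp.getSessionHandle());
    transport.close();
  }
}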

http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
new file mode 100644
index 0000000..56c8cb6
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.NewCookie;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.auth.AuthenticationProviderFactory;
+import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.HttpAuthUtils;
+import org.apache.hive.service.auth.HttpAuthenticationException;
+import org.apache.hive.service.auth.PasswdAuthenticationProvider;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.CookieSigner;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+/**
+ *
+ * ThriftHttpServlet
+ *
+ */
+public class ThriftHttpServlet extends TServlet {
+
+  private static final long serialVersionUID = 1L;
+  public static final Log LOG = LogFactory.getLog(ThriftHttpServlet.class.getName());
+  private final String authType;
+  private final UserGroupInformation serviceUGI;
+  private final UserGroupInformation httpUGI;
+  private HiveConf hiveConf = new HiveConf();
+
+  // Class members for cookie based authentication.
+  private CookieSigner signer;
+  public static final String AUTH_COOKIE = "hive.server2.auth";
+  private static final Random RAN = new Random();
+  private boolean isCookieAuthEnabled;
+  private String cookieDomain;
+  private String cookiePath;
+  private int cookieMaxAge;
+  private boolean isCookieSecure;
+  private boolean isHttpOnlyCookie;
+
+  public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
+      String authType, UserGroupInformation serviceUGI, UserGroupInformation httpUGI) {
+    super(processor, protocolFactory);
+    this.authType = authType;
+    this.serviceUGI = serviceUGI;
+    this.httpUGI = httpUGI;
+    this.isCookieAuthEnabled = hiveConf.getBoolVar(
+      ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_AUTH_ENABLED);
+    // Initialize the cookie based authentication related variables.
+    if (isCookieAuthEnabled) {
+      // Generate the signer with secret.
+      String secret = Long.toString(RAN.nextLong());
+      LOG.debug("Using the random number as the secret for cookie generation " + secret);
+      this.signer = new CookieSigner(secret.getBytes());
+      this.cookieMaxAge = (int) hiveConf.getTimeVar(
+        ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE, TimeUnit.SECONDS);
+      this.cookieDomain = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_DOMAIN);
+      this.cookiePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_PATH);
+      this.isCookieSecure = hiveConf.getBoolVar(
+        ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_IS_SECURE);
+      this.isHttpOnlyCookie = hiveConf.getBoolVar(
+        ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_IS_HTTPONLY);
+    }
+  }
+
+  @Override
+  protected void doPost(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    String clientUserName = null;
+    String clientIpAddress;
+    boolean requireNewCookie = false;
+
+    try {
+      // If the cookie based authentication is already enabled, parse the
+      // request and validate the request cookies.
+      if (isCookieAuthEnabled) {
+        clientUserName = validateCookie(request);
+        requireNewCookie = (clientUserName == null);
+        if (requireNewCookie) {
+          LOG.info("Could not validate cookie sent, will try to generate a new cookie");
+        }
+      }
+      // If the cookie based authentication is not enabled or the request does
+      // not have a valid cookie, use the kerberos or password based authentication
+      // depending on the server setup.
+      if (clientUserName == null) {
+        // For a kerberos setup
+        if (isKerberosAuthMode(authType)) {
+          clientUserName = doKerberosAuth(request);
+        }
+        // For password based authentication
+        else {
+          clientUserName = doPasswdAuth(request, authType);
+        }
+      }
+      LOG.debug("Client username: " + clientUserName);
+
+      // Set the thread local username to be used for doAs if true
+      SessionManager.setUserName(clientUserName);
+
+      // find proxy user if any from query param
+      String doAsQueryParam = getDoAsQueryParam(request.getQueryString());
+      if (doAsQueryParam != null) {
+        SessionManager.setProxyUserName(doAsQueryParam);
+      }
+
+      clientIpAddress = request.getRemoteAddr();
+      LOG.debug("Client IP Address: " + clientIpAddress);
+      // Set the thread local ip address
+      SessionManager.setIpAddress(clientIpAddress);
+      // Generate new cookie and add it to the response
+      if (requireNewCookie &&
+          !authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.NOSASL.toString())) {
+        String cookieToken = HttpAuthUtils.createCookieToken(clientUserName);
+        Cookie hs2Cookie = createCookie(signer.signCookie(cookieToken));
+
+        if (isHttpOnlyCookie) {
+          response.setHeader("SET-COOKIE", getHttpOnlyCookieHeader(hs2Cookie));
+        } else {
+          response.addCookie(hs2Cookie);
+        }
+        LOG.info("Cookie added for clientUserName " + clientUserName);
+      }
+      super.doPost(request, response);
+    }
+    catch (HttpAuthenticationException e) {
+      LOG.error("Error: ", e);
+      // Send a 401 to the client
+      response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+      if(isKerberosAuthMode(authType)) {
+        response.addHeader(HttpAuthUtils.WWW_AUTHENTICATE, HttpAuthUtils.NEGOTIATE);
+      }
+      response.getWriter().println("Authentication Error: " + e.getMessage());
+    }
+    finally {
+      // Clear the thread locals
+      SessionManager.clearUserName();
+      SessionManager.clearIpAddress();
+      SessionManager.clearProxyUserName();
+    }
+  }
+
+  /**
+   * Retrieves the client name from the request cookies. If no cookie
+   * corresponds to a valid client, the function returns null.
+   * @param cookies HTTP Request cookies.
+   * @return Client username if the cookies contain a currently valid HS2 generated cookie,
+   * else null.
+   */
+  private String getClientNameFromCookie(Cookie[] cookies) {
+    // Current Cookie Name, Current Cookie Value
+    String currName, currValue;
+
+    // The main loop below iterates through all the cookies sent by the client.
+    // The HS2 generated cookies are of the format hive.server2.auth=<value>
+    // A cookie identified as a hiveserver2 generated cookie is validated
+    // by calling signer.verifyAndExtract(). If the validation passes, return the
+    // username for which the cookie was validated. If no client side
+    // cookie passes the validation, return null to the caller.
+    for (Cookie currCookie : cookies) {
+      // Get the cookie name
+      currName = currCookie.getName();
+      if (!currName.equals(AUTH_COOKIE)) {
+        // Not a HS2 generated cookie, continue.
+        continue;
+      }
+      // If we reached here, we have match for HS2 generated cookie
+      currValue = currCookie.getValue();
+      // Validate the value.
+      currValue = signer.verifyAndExtract(currValue);
+      // Retrieve the user name, do the final validation step.
+      if (currValue != null) {
+        String userName = HttpAuthUtils.getUserNameFromCookieToken(currValue);
+
+        if (userName == null) {
+          LOG.warn("Invalid cookie token " + currValue);
+          continue;
+        }
+        //We have found a valid cookie in the client request.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Validated the cookie for user " + userName);
+        }
+        return userName;
+      }
+    }
+    // No valid HS2 generated cookies found, return null
+    return null;
+  }
+
+  /**
+   * Convert cookie array to human readable cookie string
+   * @param cookies Cookie Array
+   * @return String containing all the cookies separated by a newline character.
+   * Each cookie is of the format [key]=[value]
+   */
+  private String toCookieStr(Cookie[] cookies) {
+    String cookieStr = "";
+
+    for (Cookie c : cookies) {
+      cookieStr += c.getName() + "=" + c.getValue() + " ;\n";
+    }
+    return cookieStr;
+  }
+
+  /**
+   * Validate the request cookie. This function iterates over the request cookie headers
+   * and finds a cookie that represents a valid client/server session. If it finds one, it
+   * returns the client name associated with the session. Else, it returns null.
+   * @param request The HTTP Servlet Request sent by the client
+   * @return Client Username if the request has valid HS2 cookie, else returns null
+   * @throws UnsupportedEncodingException
+   */
+  private String validateCookie(HttpServletRequest request) throws UnsupportedEncodingException {
+    // Find all the valid cookies associated with the request.
+    Cookie[] cookies = request.getCookies();
+
+    if (cookies == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No valid cookies associated with the request " + request);
+      }
+      return null;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received cookies: " + toCookieStr(cookies));
+    }
+    return getClientNameFromCookie(cookies);
+  }
+
+  /**
+   * Generate a server side cookie given the cookie value as the input.
+   * @param str Input string token.
+   * @return The generated cookie.
+   * @throws UnsupportedEncodingException
+   */
+  private Cookie createCookie(String str) throws UnsupportedEncodingException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cookie name = " + AUTH_COOKIE + " value = " + str);
+    }
+    Cookie cookie = new Cookie(AUTH_COOKIE, str);
+
+    cookie.setMaxAge(cookieMaxAge);
+    if (cookieDomain != null) {
+      cookie.setDomain(cookieDomain);
+    }
+    if (cookiePath != null) {
+      cookie.setPath(cookiePath);
+    }
+    cookie.setSecure(isCookieSecure);
+    return cookie;
+  }
+
+  /**
+   * Generate httponly cookie from HS2 cookie
+   * @param cookie HS2 generated cookie
+   * @return The httponly cookie
+   */
+  private static String getHttpOnlyCookieHeader(Cookie cookie) {
+    NewCookie newCookie = new NewCookie(cookie.getName(), cookie.getValue(),
+      cookie.getPath(), cookie.getDomain(), cookie.getVersion(),
+      cookie.getComment(), cookie.getMaxAge(), cookie.getSecure());
+    return newCookie + "; HttpOnly";
+  }
+
+  /**
+   * Do the LDAP/PAM authentication
+   * @param request
+   * @param authType
+   * @throws HttpAuthenticationException
+   */
+  private String doPasswdAuth(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String userName = getUsername(request, authType);
+    // No-op when authType is NOSASL
+    if (!authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.NOSASL.toString())) {
+      try {
+        AuthMethods authMethod = AuthMethods.getValidAuthMethod(authType);
+        PasswdAuthenticationProvider provider =
+            AuthenticationProviderFactory.getAuthenticationProvider(authMethod);
+        provider.Authenticate(userName, getPassword(request, authType));
+
+      } catch (Exception e) {
+        throw new HttpAuthenticationException(e);
+      }
+    }
+    return userName;
+  }
+
+  /**
+   * Do the GSS-API kerberos authentication.
+   * We already have a logged-in subject in the form of serviceUGI,
+   * which GSS-API will extract information from.
+   * In case of a SPNego request we use the httpUGI
+   * for authenticating the service ticket.
+   * @param request
+   * @return
+   * @throws HttpAuthenticationException
+   */
+  private String doKerberosAuth(HttpServletRequest request)
+      throws HttpAuthenticationException {
+    // Try authenticating with the http/_HOST principal
+    if (httpUGI != null) {
+      try {
+        return httpUGI.doAs(new HttpKerberosServerAction(request, httpUGI));
+      } catch (Exception e) {
+        LOG.info("Failed to authenticate with http/_HOST kerberos principal, " +
+            "trying with hive/_HOST kerberos principal");
+      }
+    }
+    // Now try with hive/_HOST principal
+    try {
+      return serviceUGI.doAs(new HttpKerberosServerAction(request, serviceUGI));
+    } catch (Exception e) {
+      LOG.error("Failed to authenticate with hive/_HOST kerberos principal");
+      throw new HttpAuthenticationException(e);
+    }
+
+  }
+
+  class HttpKerberosServerAction implements PrivilegedExceptionAction<String> {
+    HttpServletRequest request;
+    UserGroupInformation serviceUGI;
+
+    HttpKerberosServerAction(HttpServletRequest request,
+        UserGroupInformation serviceUGI) {
+      this.request = request;
+      this.serviceUGI = serviceUGI;
+    }
+
+    @Override
+    public String run() throws HttpAuthenticationException {
+      // Get own Kerberos credentials for accepting connection
+      GSSManager manager = GSSManager.getInstance();
+      GSSContext gssContext = null;
+      String serverPrincipal = getPrincipalWithoutRealm(
+          serviceUGI.getUserName());
+      try {
+        // This Oid is for the Kerberos GSS-API mechanism.
+        Oid kerberosMechOid = new Oid("1.2.840.113554.1.2.2");
+        // Oid for SPNego GSS-API mechanism.
+        Oid spnegoMechOid = new Oid("1.3.6.1.5.5.2");
+        // Oid for kerberos principal name
+        Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
+
+        // GSS name for server
+        GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
+
+        // GSS credentials for server
+        GSSCredential serverCreds = manager.createCredential(serverName,
+            GSSCredential.DEFAULT_LIFETIME,
+            new Oid[]{kerberosMechOid, spnegoMechOid},
+            GSSCredential.ACCEPT_ONLY);
+
+        // Create a GSS context
+        gssContext = manager.createContext(serverCreds);
+        // Get service ticket from the authorization header
+        String serviceTicketBase64 = getAuthHeader(request, authType);
+        byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
+        gssContext.acceptSecContext(inToken, 0, inToken.length);
+        // Authenticate or deny based on its context completion
+        if (!gssContext.isEstablished()) {
+          throw new HttpAuthenticationException("Kerberos authentication failed: " +
+              "unable to establish context with the service ticket " +
+              "provided by the client.");
+        }
+        else {
+          return getPrincipalWithoutRealmAndHost(gssContext.getSrcName().toString());
+        }
+      }
+      catch (GSSException e) {
+        throw new HttpAuthenticationException("Kerberos authentication failed: ", e);
+      }
+      finally {
+        if (gssContext != null) {
+          try {
+            gssContext.dispose();
+          } catch (GSSException e) {
+            // No-op
+          }
+        }
+      }
+    }
+
+    private String getPrincipalWithoutRealm(String fullPrincipal)
+        throws HttpAuthenticationException {
+      KerberosNameShim fullKerberosName;
+      try {
+        fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+      } catch (IOException e) {
+        throw new HttpAuthenticationException(e);
+      }
+      String serviceName = fullKerberosName.getServiceName();
+      String hostName = fullKerberosName.getHostName();
+      String principalWithoutRealm = serviceName;
+      if (hostName != null) {
+        principalWithoutRealm = serviceName + "/" + hostName;
+      }
+      return principalWithoutRealm;
+    }
+
+    private String getPrincipalWithoutRealmAndHost(String fullPrincipal)
+        throws HttpAuthenticationException {
+      KerberosNameShim fullKerberosName;
+      try {
+        fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+        return fullKerberosName.getShortName();
+      } catch (IOException e) {
+        throw new HttpAuthenticationException(e);
+      }
+    }
+  }
+
+  private String getUsername(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String creds[] = getAuthHeaderTokens(request, authType);
+    // Username must be present
+    if (creds[0] == null || creds[0].isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain a username.");
+    }
+    return creds[0];
+  }
+
+  private String getPassword(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String creds[] = getAuthHeaderTokens(request, authType);
+    // Password must be present
+    if (creds[1] == null || creds[1].isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain a password.");
+    }
+    return creds[1];
+  }
+
+  private String[] getAuthHeaderTokens(HttpServletRequest request,
+      String authType) throws HttpAuthenticationException {
+    String authHeaderBase64 = getAuthHeader(request, authType);
+    String authHeaderString = StringUtils.newStringUtf8(
+        Base64.decodeBase64(authHeaderBase64.getBytes()));
+    String[] creds = authHeaderString.split(":");
+    return creds;
+  }
+
+  /**
+   * Returns the base64 encoded auth header payload
+   * @param request
+   * @param authType
+   * @return
+   * @throws HttpAuthenticationException
+   */
+  private String getAuthHeader(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String authHeader = request.getHeader(HttpAuthUtils.AUTHORIZATION);
+    // Each http request must have an Authorization header
+    if (authHeader == null || authHeader.isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client is empty.");
+    }
+
+    String authHeaderBase64String;
+    int beginIndex;
+    if (isKerberosAuthMode(authType)) {
+      beginIndex = (HttpAuthUtils.NEGOTIATE + " ").length();
+    }
+    else {
+      beginIndex = (HttpAuthUtils.BASIC + " ").length();
+    }
+    authHeaderBase64String = authHeader.substring(beginIndex);
+    // Authorization header must have a payload
+    if (authHeaderBase64String == null || authHeaderBase64String.isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain any data.");
+    }
+    return authHeaderBase64String;
+  }
+
+  private boolean isKerberosAuthMode(String authType) {
+    return authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
+  }
+
+  private static String getDoAsQueryParam(String queryString) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("URL query string:" + queryString);
+    }
+    if (queryString == null) {
+      return null;
+    }
+    Map<String, String[]> params = javax.servlet.http.HttpUtils.parseQueryString( queryString );
+    Set<String> keySet = params.keySet();
+    for (String key: keySet) {
+      if (key.equalsIgnoreCase("doAs")) {
+        return params.get(key)[0];
+      }
+    }
+    return null;
+  }
+
+}
+
+
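
To make the Authorization header handling above concrete (a sketch, not part of the diff): getAuthHeaderTokens() strips the "Basic " scheme prefix, base64-decodes the remainder and splits it on ":", so a password-mode client builds, and the servlet recovers, the credentials roughly as follows; the username and password here are illustrative:

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils;

public class BasicAuthHeaderSketch {
  public static void main(String[] args) {
    // Client side: encode "username:password" and prepend the Basic scheme.
    String payload = "hiveuser:hivepassword";
    String header = "Basic " + Base64.encodeBase64String(StringUtils.getBytesUtf8(payload));
    System.out.println("Authorization: " + header);

    // Server side (mirrors getAuthHeaderTokens): strip the scheme, decode, split on ':'.
    String base64Part = header.substring("Basic ".length());
    String decoded = StringUtils.newStringUtf8(Base64.decodeBase64(base64Part.getBytes()));
    String[] creds = decoded.split(":");
    System.out.println("user=" + creds[0] + ", password=" + creds[1]);
  }
}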

http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
new file mode 100644
index 0000000..58e8e49
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.HiveVersionInfo;
+import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
+import org.apache.hive.service.cli.thrift.ThriftCLIService;
+import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * HiveServer2.
+ *
+ */
+public class HiveServer2 extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(HiveServer2.class);
+  private static CountDownLatch deleteSignal;
+
+  private CLIService cliService;
+  private ThriftCLIService thriftCLIService;
+  private PersistentEphemeralNode znode;
+  private String znodePath;
+  private CuratorFramework zooKeeperClient;
+  private boolean registeredWithZooKeeper = false;
+
+  public HiveServer2() {
+    super(HiveServer2.class.getSimpleName());
+    HiveConf.setLoadHiveServer2Config(true);
+  }
+
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    cliService = new CLIService(this);
+    addService(cliService);
+    if (isHTTPTransportMode(hiveConf)) {
+      thriftCLIService = new ThriftHttpCLIService(cliService);
+    } else {
+      thriftCLIService = new ThriftBinaryCLIService(cliService);
+    }
+    addService(thriftCLIService);
+    super.init(hiveConf);
+
+    // Add a shutdown hook for catching SIGTERM & SIGINT
+    final HiveServer2 hiveServer2 = this;
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        hiveServer2.stop();
+      }
+    });
+  }
+
+  public static boolean isHTTPTransportMode(HiveConf hiveConf) {
+    String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
+    if (transportMode == null) {
+      transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
+    }
+    if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
+   */
+  private final ACLProvider zooKeeperAclProvider = new ACLProvider() {
+    List<ACL> nodeAcls = new ArrayList<ACL>();
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // Read all to the world
+        nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
+        // Create/Delete/Write/Admin to the authenticated user
+        nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
+      } else {
+        // ACLs for znodes on a non-kerberized cluster
+        // Create/Read/Delete/Write/Admin to the world
+        nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
+      }
+      return nodeAcls;
+    }
+
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      return getDefaultAcl();
+    }
+  };
+
+  /**
+   * Adds a server instance to ZooKeeper as a znode.
+   *
+   * @param hiveConf
+   * @throws Exception
+   */
+  private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
+    String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
+    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+    String instanceURI = getServerInstanceURI(hiveConf);
+    byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
+    setUpZooKeeperAuth(hiveConf);
+    int sessionTimeout =
+        (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+            TimeUnit.MILLISECONDS);
+    int baseSleepTime =
+        (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+            TimeUnit.MILLISECONDS);
+    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+    // Create a CuratorFramework instance to be used as the ZooKeeper client
+    // Use the zooKeeperAclProvider to create appropriate ACLs
+    zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .sessionTimeoutMs(sessionTimeout).aclProvider(zooKeeperAclProvider)
+            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
+    zooKeeperClient.start();
+    // Create the parent znodes recursively; ignore if the parent already exists.
+    try {
+      zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+          .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+      LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
+    } catch (KeeperException e) {
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
+        throw e;
+      }
+    }
+    // Create a znode under the rootNamespace parent for this instance of the server
+    // Znode name: serverUri=host:port;version=versionInfo;sequence=sequenceNumber
+    try {
+      String pathPrefix =
+          ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+              + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
+              + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
+      znode =
+          new PersistentEphemeralNode(zooKeeperClient,
+              PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
+      znode.start();
+      // We'll wait for 120s for node creation
+      long znodeCreationTimeout = 120;
+      if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
+        throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
+      }
+      setRegisteredWithZooKeeper(true);
+      znodePath = znode.getActualPath();
+      // Set a watch on the znode
+      if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
+        // No node exists, throw exception
+        throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
+      }
+      LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
+    } catch (Exception e) {
+      LOG.fatal("Unable to create a znode for this server instance", e);
+      if (znode != null) {
+        znode.close();
+      }
+      throw (e);
+    }
+  }
+
+  /**
+   * For a kerberized cluster, we dynamically set up the client's JAAS conf.
+   *
+   * @param hiveConf
+   * @throws Exception
+   */
+  private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
+      if (principal.isEmpty()) {
+        throw new IOException("HiveServer2 Kerberos principal is empty");
+      }
+      String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+      if (keyTabFile.isEmpty()) {
+        throw new IOException("HiveServer2 Kerberos keytab is empty");
+      }
+      // Install the JAAS Configuration for the runtime
+      Utils.setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
+    }
+  }
+
+  /**
+   * The watcher class which sets the de-register flag when the znode corresponding to this server
+   * instance is deleted. Additionally, it shuts down the server if there are no more active client
+   * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
+   */
+  private class DeRegisterWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent event) {
+      if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
+        if (znode != null) {
+          try {
+            znode.close();
+            LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+                + "The server will be shut down after the last client sesssion completes.");
+          } catch (IOException e) {
+            LOG.error("Failed to close the persistent ephemeral znode", e);
+          } finally {
+            HiveServer2.this.setRegisteredWithZooKeeper(false);
+            // If there are no more active client sessions, stop the server
+            if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+              LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+                  + "instances available for dynamic service discovery. "
+                  + "The last client session has ended - will shutdown now.");
+              HiveServer2.this.stop();
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void removeServerInstanceFromZooKeeper() throws Exception {
+    setRegisteredWithZooKeeper(false);
+    if (znode != null) {
+      znode.close();
+    }
+    zooKeeperClient.close();
+    LOG.info("Server instance removed from ZooKeeper.");
+  }
+
+  public boolean isRegisteredWithZooKeeper() {
+    return registeredWithZooKeeper;
+  }
+
+  private void setRegisteredWithZooKeeper(boolean registeredWithZooKeeper) {
+    this.registeredWithZooKeeper = registeredWithZooKeeper;
+  }
+
+  private String getServerInstanceURI(HiveConf hiveConf) throws Exception {
+    if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
+      throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
+    }
+    return thriftCLIService.getServerIPAddress().getHostName() + ":"
+        + thriftCLIService.getPortNumber();
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    LOG.info("Shutting down HiveServer2");
+    HiveConf hiveConf = this.getHiveConf();
+    super.stop();
+    // Remove this server instance from ZooKeeper if dynamic service discovery is set
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+      try {
+        removeServerInstanceFromZooKeeper();
+      } catch (Exception e) {
+        LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
+      }
+    }
+    // There should already be an instance of the Tez session pool manager;
+    // if there is not, it is safe to ignore while stopping HiveServer2.
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+      try {
+        TezSessionPoolManager.getInstance().stop();
+      } catch (Exception e) {
+        LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
+            + "Shutting down HiveServer2 anyway.", e);
+      }
+    }
+
+    if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      try {
+        SparkSessionManagerImpl.getInstance().shutdown();
+      } catch(Exception ex) {
+        LOG.error("Spark session pool manager failed to stop during HiveServer2 shutdown.", ex);
+      }
+    }
+  }
+
+  private static void startHiveServer2() throws Throwable {
+    long attempts = 0, maxAttempts = 1;
+    while (true) {
+      LOG.info("Starting HiveServer2");
+      HiveConf hiveConf = new HiveConf();
+      maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
+      HiveServer2 server = null;
+      try {
+        server = new HiveServer2();
+        server.init(hiveConf);
+        server.start();
+        ShimLoader.getHadoopShims().startPauseMonitor(hiveConf);
+        // If we're supporting dynamic service discovery, we'll add the service URI for this
+        // HiveServer2 instance to ZooKeeper as a znode.
+        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+          server.addServerInstanceToZooKeeper(hiveConf);
+        }
+        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+          TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance();
+          sessionPool.setupPool(hiveConf);
+          sessionPool.startPool();
+        }
+
+        if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+          SparkSessionManagerImpl.getInstance().setup(hiveConf);
+        }
+        break;
+      } catch (Throwable throwable) {
+        if (server != null) {
+          try {
+            server.stop();
+          } catch (Throwable t) {
+            LOG.info("Exception caught when calling stop of HiveServer2 before retrying start", t);
+          } finally {
+            server = null;
+          }
+        }
+        if (++attempts >= maxAttempts) {
+          throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
+        } else {
+          LOG.warn("Error starting HiveServer2 on attempt " + attempts
+              + ", will retry in 60 seconds", throwable);
+          try {
+            Thread.sleep(60L * 1000L);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Remove all znodes corresponding to the given version number from ZooKeeper
+   *
+   * @param versionNumber
+   * @throws Exception
+   */
+  static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
+    HiveConf hiveConf = new HiveConf();
+    String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
+    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+    int baseSleepTime = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+    CuratorFramework zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
+    zooKeeperClient.start();
+    List<String> znodePaths =
+        zooKeeperClient.getChildren().forPath(
+            ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+    List<String> znodePathsUpdated;
+    // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
+    for (int i = 0; i < znodePaths.size(); i++) {
+      String znodePath = znodePaths.get(i);
+      deleteSignal = new CountDownLatch(1);
+      if (znodePath.contains("version=" + versionNumber + ";")) {
+        String fullZnodePath =
+            ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+                + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath;
+        LOG.warn("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper");
+        System.out.println("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper");
+        zooKeeperClient.delete().guaranteed().inBackground(new DeleteCallBack())
+            .forPath(fullZnodePath);
+        // Wait for the delete to complete
+        deleteSignal.await();
+        // Get the updated path list
+        znodePathsUpdated =
+            zooKeeperClient.getChildren().forPath(
+                ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+        // Gives a list of any new paths that may have been created to maintain the persistent ephemeral node
+        znodePathsUpdated.removeAll(znodePaths);
+        // Add the new paths to the znodes list. We'll try for their removal as well.
+        znodePaths.addAll(znodePathsUpdated);
+      }
+    }
+    zooKeeperClient.close();
+  }
+
+  private static class DeleteCallBack implements BackgroundCallback {
+    @Override
+    public void processResult(CuratorFramework zooKeeperClient, CuratorEvent event)
+        throws Exception {
+      if (event.getType() == CuratorEventType.DELETE) {
+        deleteSignal.countDown();
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    HiveConf.setLoadHiveServer2Config(true);
+    try {
+      ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
+      ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
+
+      // NOTE: It is critical to do this here so that log4j is reinitialized
+      // before any of the other core hive classes are loaded
+      String initLog4jMessage = LogUtils.initHiveLog4j();
+      LOG.debug(initLog4jMessage);
+      HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
+
+      // Log the debug message from "oproc" after log4j has been initialized properly
+      LOG.debug(oproc.getDebugMessage().toString());
+
+      // Call the executor which will execute the appropriate command based on the parsed options
+      oprocResponse.getServerOptionsExecutor().execute();
+    } catch (LogInitializationException e) {
+      LOG.error("Error initializing log: " + e.getMessage(), e);
+      System.exit(-1);
+    }
+  }
+
+  /**
+   * ServerOptionsProcessor.
+   * Processes the arguments given to HiveServer2 (e.g. -hiveconf property=value),
+   * sets them as System properties, and creates an appropriate response object
+   * whose executor runs the corresponding command based on the parsed options.
+   */
+  static class ServerOptionsProcessor {
+    private final Options options = new Options();
+    private org.apache.commons.cli.CommandLine commandLine;
+    private final String serverName;
+    private final StringBuilder debugMessage = new StringBuilder();
+
+    @SuppressWarnings("static-access")
+    ServerOptionsProcessor(String serverName) {
+      this.serverName = serverName;
+      // -hiveconf x=y
+      options.addOption(OptionBuilder
+          .withValueSeparator()
+          .hasArgs(2)
+          .withArgName("property=value")
+          .withLongOpt("hiveconf")
+          .withDescription("Use value for given property")
+          .create());
+      // -deregister <versionNumber>
+      options.addOption(OptionBuilder
+          .hasArgs(1)
+          .withArgName("versionNumber")
+          .withLongOpt("deregister")
+          .withDescription("Deregister all instances of given version from dynamic service discovery")
+          .create());
+      options.addOption(new Option("H", "help", false, "Print help information"));
+    }
+
+    ServerOptionsProcessorResponse parse(String[] argv) {
+      try {
+        commandLine = new GnuParser().parse(options, argv);
+        // Process --hiveconf
+        // Get hiveconf param values and set the System property values
+        Properties confProps = commandLine.getOptionProperties("hiveconf");
+        for (String propKey : confProps.stringPropertyNames()) {
+          // Save the logging message for log4j output later, after log4j has been initialized properly
+          debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n");
+          System.setProperty(propKey, confProps.getProperty(propKey));
+        }
+
+        // Process --help
+        if (commandLine.hasOption('H')) {
+          return new ServerOptionsProcessorResponse(new HelpOptionExecutor(serverName, options));
+        }
+
+        // Process --deregister
+        if (commandLine.hasOption("deregister")) {
+          return new ServerOptionsProcessorResponse(new DeregisterOptionExecutor(
+              commandLine.getOptionValue("deregister")));
+        }
+      } catch (ParseException e) {
+        // Error out & exit - we were not able to parse the args successfully
+        System.err.println("Error starting HiveServer2 with given arguments: ");
+        System.err.println(e.getMessage());
+        System.exit(-1);
+      }
+      // Default executor, when no option is specified
+      return new ServerOptionsProcessorResponse(new StartOptionExecutor());
+    }
+
+    StringBuilder getDebugMessage() {
+      return debugMessage;
+    }
+  }
+
+  /**
+   * The response sent back from {@link ServerOptionsProcessor#parse(String[])}
+   */
+  static class ServerOptionsProcessorResponse {
+    private final ServerOptionsExecutor serverOptionsExecutor;
+
+    ServerOptionsProcessorResponse(ServerOptionsExecutor serverOptionsExecutor) {
+      this.serverOptionsExecutor = serverOptionsExecutor;
+    }
+
+    ServerOptionsExecutor getServerOptionsExecutor() {
+      return serverOptionsExecutor;
+    }
+  }
+
+  /**
+   * The executor interface for running the appropriate HiveServer2 command based on parsed options
+   */
+  interface ServerOptionsExecutor {
+    void execute();
+  }
+
+  /**
+   * HelpOptionExecutor: executes the --help option by printing out the usage
+   */
+  static class HelpOptionExecutor implements ServerOptionsExecutor {
+    private final Options options;
+    private final String serverName;
+
+    HelpOptionExecutor(String serverName, Options options) {
+      this.options = options;
+      this.serverName = serverName;
+    }
+
+    @Override
+    public void execute() {
+      new HelpFormatter().printHelp(serverName, options);
+      System.exit(0);
+    }
+  }
+
+  /**
+   * StartOptionExecutor: starts HiveServer2.
+   * This is the default executor, when no option is specified.
+   */
+  static class StartOptionExecutor implements ServerOptionsExecutor {
+    @Override
+    public void execute() {
+      try {
+        startHiveServer2();
+      } catch (Throwable t) {
+        LOG.fatal("Error starting HiveServer2", t);
+        System.exit(-1);
+      }
+    }
+  }
+
+  /**
+   * DeregisterOptionExecutor: executes the --deregister option by deregistering from ZooKeeper
+   * all HiveServer2 instances of a specific version.
+   */
+  static class DeregisterOptionExecutor implements ServerOptionsExecutor {
+    private final String versionNumber;
+
+    DeregisterOptionExecutor(String versionNumber) {
+      this.versionNumber = versionNumber;
+    }
+
+    @Override
+    public void execute() {
+      try {
+        deleteServerInstancesFromZooKeeper(versionNumber);
+      } catch (Exception e) {
+        LOG.fatal("Error deregistering HiveServer2 instances for version: " + versionNumber
+            + " from ZooKeeper", e);
+        System.out.println("Error deregistering HiveServer2 instances for version: " + versionNumber
+            + " from ZooKeeper." + e);
+        System.exit(-1);
+      }
+      System.exit(0);
+    }
+  }
+}
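
As a point of reference only (not part of this commit), the sketch below shows how a Curator client could enumerate the HiveServer2 instances that the dynamic service discovery code above registers. The localhost:2181 ensemble, the default /hiveserver2 root namespace, and the class name ListHiveServer2Instances are assumptions; the child znode names follow the serverUri=host:port;version=versionInfo;sequence=sequenceNumber pattern built in addServerInstanceToZooKeeper.

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    // Hypothetical stand-alone client: lists the registered HiveServer2 instances and
    // prints the instance URI stored as each child znode's data.
    public class ListHiveServer2Instances {
      public static void main(String[] args) throws Exception {
        // Assumed ensemble and namespace; adjust both for a real deployment.
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();
        try {
          // Children look like: serverUri=host:port;version=...;sequence=...
          List<String> instances = zk.getChildren().forPath("/hiveserver2");
          for (String child : instances) {
            byte[] data = zk.getData().forPath("/hiveserver2/" + child);
            System.out.println(child + " -> " + new String(data, StandardCharsets.UTF_8));
          }
        } finally {
          zk.close();
        }
      }
    }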

http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java
new file mode 100644
index 0000000..fb8141a
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hive.service.server;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.hive.metastore.RawStore;
+
+/**
+ * A ThreadFactory for constructing new HiveServer2 threads that lets you plug
+ * in custom cleanup code to be called before this thread is GC-ed.
+ * Currently cleans up the following:
+ * 1. ThreadLocal RawStore object:
+ * In case of an embedded metastore, HiveServer2 threads (foreground & background)
+ * end up caching a ThreadLocal RawStore object. The ThreadLocal RawStore object has
+ * an instance of PersistenceManagerFactory & PersistenceManager.
+ * The PersistenceManagerFactory keeps a cache of PersistenceManager objects,
+ * which are only removed when PersistenceManager#close method is called.
+ * HiveServer2 uses ExecutorService for managing thread pools for foreground & background threads.
+ * ExecutorService unfortunately does not provide any hooks that are called
+ * when a thread from the pool is terminated.
+ * As a solution, we're using this ThreadFactory to keep a cache of RawStore objects per thread.
+ * And we are doing clean shutdown in the finalizer for each thread.
+ */
+public class ThreadFactoryWithGarbageCleanup implements ThreadFactory {
+
+  private static Map<Long, RawStore> threadRawStoreMap = new ConcurrentHashMap<Long, RawStore>();
+
+  private final String namePrefix;
+
+  public ThreadFactoryWithGarbageCleanup(String threadPoolName) {
+    namePrefix = threadPoolName;
+  }
+
+  @Override
+  public Thread newThread(Runnable runnable) {
+    Thread newThread = new ThreadWithGarbageCleanup(runnable);
+    newThread.setName(namePrefix + ": Thread-" + newThread.getId());
+    return newThread;
+  }
+
+  public static Map<Long, RawStore> getThreadRawStoreMap() {
+    return threadRawStoreMap;
+  }
+}
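
As an illustration only (the actual wiring lives in HiveServer2's service classes, not in this file), a thread pool could adopt the factory described in the javadoc above roughly as follows. The pool name "HiveServer2-Background-Pool", the pool size, and the class name BackgroundPoolExample are placeholders.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;

    // Hypothetical wiring: hand the cleanup-aware factory to an ExecutorService so that
    // every pooled thread is a ThreadWithGarbageCleanup whose cached RawStore is shut
    // down when the thread is finalized.
    public class BackgroundPoolExample {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(
            4, new ThreadFactoryWithGarbageCleanup("HiveServer2-Background-Pool"));
        pool.submit(new Runnable() {
          @Override
          public void run() {
            System.out.println("Running on " + Thread.currentThread().getName());
          }
        });
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
      }
    }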

http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java
new file mode 100644
index 0000000..8ee9810
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hive.service.server;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.RawStore;
+
+/**
+ * The Thread subclass used by ThreadFactoryWithGarbageCleanup to construct new HiveServer2
+ * server threads. In particular, it ensures an orderly cleanup when it is terminated by its
+ * corresponding ExecutorService.
+ */
+public class ThreadWithGarbageCleanup extends Thread {
+  private static final Log LOG = LogFactory.getLog(ThreadWithGarbageCleanup.class);
+
+  Map<Long, RawStore> threadRawStoreMap =
+      ThreadFactoryWithGarbageCleanup.getThreadRawStoreMap();
+
+  public ThreadWithGarbageCleanup(Runnable runnable) {
+    super(runnable);
+  }
+
+  /**
+   * Add any Thread specific garbage cleanup code here.
+   * Currently, it shuts down the RawStore object for this thread if it is not null.
+   */
+  @Override
+  public void finalize() throws Throwable {
+    cleanRawStore();
+    super.finalize();
+  }
+
+  private void cleanRawStore() {
+    Long threadId = this.getId();
+    RawStore threadLocalRawStore = threadRawStoreMap.get(threadId);
+    if (threadLocalRawStore != null) {
+      LOG.debug("RawStore: " + threadLocalRawStore + ", for the thread: " +
+          this.getName()  +  " will be closed now.");
+      threadLocalRawStore.shutdown();
+      threadRawStoreMap.remove(threadId);
+    }
+  }
+
+  /**
+   * Cache the ThreadLocal RawStore object. Called from the corresponding thread.
+   */
+  public void cacheThreadLocalRawStore() {
+    Long threadId = this.getId();
+    RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore();
+    if (threadLocalRawStore != null && !threadRawStoreMap.containsKey(threadId)) {
+      LOG.debug("Adding RawStore: " + threadLocalRawStore + ", for the thread: " +
+          this.getName() + " to threadRawStoreMap for future cleanup.");
+      threadRawStoreMap.put(threadId, threadLocalRawStore);
+    }
+  }
+}
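
The cleanup above only has something to do if the worker thread first registers its RawStore via cacheThreadLocalRawStore(). The sketch below shows one way a task could do that; it is not the commit's own call site (in HiveServer2 the call lives in the session/operation handling code), and the class name CacheRawStoreExample is a placeholder.

    import org.apache.hive.service.server.ThreadWithGarbageCleanup;

    // Hypothetical task submitted to a pool built with ThreadFactoryWithGarbageCleanup:
    // after touching the embedded metastore, register the thread-local RawStore so that
    // finalize() on the thread can shut it down later.
    public class CacheRawStoreExample implements Runnable {
      @Override
      public void run() {
        // ... work that touches the embedded metastore goes here ...
        Thread current = Thread.currentThread();
        if (current instanceof ThreadWithGarbageCleanup) {
          ((ThreadWithGarbageCleanup) current).cacheThreadLocalRawStore();
        }
      }
    }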


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

