From r...@apache.org
Subject [02/25] spark git commit: [SPARK-14987][SQL] inline hive-service (cli) into sql/hive-thriftserver
Date Fri, 29 Apr 2016 16:32:50 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
new file mode 100644
index 0000000..cc3e807
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
+import org.apache.hadoop.hive.common.cli.IHiveFileProcessor;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.exec.FetchFormatter;
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.processors.SetProcessor;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.common.util.HiveVersionInfo;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.FetchType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
+import org.apache.hive.service.cli.operation.GetCatalogsOperation;
+import org.apache.hive.service.cli.operation.GetColumnsOperation;
+import org.apache.hive.service.cli.operation.GetFunctionsOperation;
+import org.apache.hive.service.cli.operation.GetSchemasOperation;
+import org.apache.hive.service.cli.operation.GetTableTypesOperation;
+import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
+import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.Operation;
+import org.apache.hive.service.cli.operation.OperationManager;
+import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadWithGarbageCleanup;
+
+/**
+ * HiveSession
+ *
+ */
+public class HiveSessionImpl implements HiveSession {
+  private final SessionHandle sessionHandle;
+  private String username;
+  private final String password;
+  private HiveConf hiveConf;
+  private SessionState sessionState;
+  private String ipAddress;
+  private static final String FETCH_WORK_SERDE_CLASS =
+      "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+  private static final Log LOG = LogFactory.getLog(HiveSessionImpl.class);
+  private SessionManager sessionManager;
+  private OperationManager operationManager;
+  private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
+  private boolean isOperationLogEnabled;
+  private File sessionLogDir;
+  private volatile long lastAccessTime;
+  private volatile long lastIdleTime;
+
+  public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
+      HiveConf serverhiveConf, String ipAddress) {
+    this.username = username;
+    this.password = password;
+    this.sessionHandle = new SessionHandle(protocol);
+    this.hiveConf = new HiveConf(serverhiveConf);
+    this.ipAddress = ipAddress;
+
+    try {
+      // In non-impersonation mode, map scheduler queue to current user
+      // if fair scheduler is configured.
+      if (! hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+        hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
+        ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username);
+      }
+    } catch (IOException e) {
+      LOG.warn("Error setting scheduler queue: " + e, e);
+    }
+    // Set an explicit session name to control the download directory name
+    hiveConf.set(ConfVars.HIVESESSIONID.varname,
+        sessionHandle.getHandleIdentifier().toString());
+    // Use thrift transportable formatter
+    hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER,
+        FetchFormatter.ThriftFormatter.class.getName());
+    hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue());
+  }
+
+  /**
+   * Opens a new HiveServer2 session for the client connection.
+   * Creates a new SessionState object that will be associated with this HiveServer2 session.
+   * When the server executes multiple queries in the same session,
+   * this SessionState object is reused across multiple queries.
+   * Note that if doAs is true, this call goes through a proxy object,
+   * which wraps the method logic in a UserGroupInformation#doAs.
+   * That is why it is important to create the SessionState here rather than in the constructor.
+   */
+  @Override
+  public void open(Map<String, String> sessionConfMap) throws HiveSQLException {
+    sessionState = new SessionState(hiveConf, username);
+    sessionState.setUserIpAddress(ipAddress);
+    sessionState.setIsHiveServerQuery(true);
+    SessionState.start(sessionState);
+    try {
+      sessionState.reloadAuxJars();
+    } catch (IOException e) {
+      String msg = "Failed to load reloadable jar file path: " + e;
+      LOG.error(msg, e);
+      throw new HiveSQLException(msg, e);
+    }
+    // Process global init file: .hiverc
+    processGlobalInitFile();
+    if (sessionConfMap != null) {
+      configureSession(sessionConfMap);
+    }
+    lastAccessTime = System.currentTimeMillis();
+    lastIdleTime = lastAccessTime;
+  }
+
+  /**
+   * Processes the global .hiverc file on the HiveServer2 side.
+   */
+  private class GlobalHivercFileProcessor extends HiveFileProcessor {
+    @Override
+    protected BufferedReader loadFile(String fileName) throws IOException {
+      FileInputStream initStream = new FileInputStream(fileName);
+      return new BufferedReader(new InputStreamReader(initStream));
+    }
+
+    @Override
+    protected int processCmd(String cmd) {
+      int rc = 0;
+      String trimmedCmd = cmd.trim();
+      try {
+        executeStatementInternal(trimmedCmd, null, false);
+      } catch (HiveSQLException e) {
+        rc = -1;
+        LOG.warn("Failed to execute HQL command in global .hiverc file.", e);
+      }
+      return rc;
+    }
+  }
+
+  private void processGlobalInitFile() {
+    IHiveFileProcessor processor = new GlobalHivercFileProcessor();
+
+    try {
+      String hiverc = hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION);
+      if (hiverc != null) {
+        File hivercFile = new File(hiverc);
+        if (hivercFile.isDirectory()) {
+          hivercFile = new File(hivercFile, SessionManager.HIVERCFILE);
+        }
+        if (hivercFile.isFile()) {
+          LOG.info("Running global init file: " + hivercFile);
+          int rc = processor.processFile(hivercFile.getAbsolutePath());
+          if (rc != 0) {
+            LOG.error("Failed on initializing global .hiverc file");
+          }
+        } else {
+          LOG.debug("Global init file " + hivercFile + " does not exist");
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed on initializing global .hiverc file", e);
+    }
+  }
+
+  private void configureSession(Map<String, String> sessionConfMap) throws HiveSQLException {
+    SessionState.setCurrentSessionState(sessionState);
+    for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
+      String key = entry.getKey();
+      if (key.startsWith("set:")) {
+        try {
+          SetProcessor.setVariable(key.substring(4), entry.getValue());
+        } catch (Exception e) {
+          throw new HiveSQLException(e);
+        }
+      } else if (key.startsWith("use:")) {
+        SessionState.get().setCurrentDatabase(entry.getValue());
+      } else {
+        hiveConf.verifyAndSet(key, entry.getValue());
+      }
+    }
+  }
+
+  @Override
+  public void setOperationLogSessionDir(File operationLogRootDir) {
+    sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString());
+    isOperationLogEnabled = true;
+    if (!sessionLogDir.exists()) {
+      if (!sessionLogDir.mkdir()) {
+        LOG.warn("Unable to create operation log session directory: " +
+            sessionLogDir.getAbsolutePath());
+        isOperationLogEnabled = false;
+      }
+    }
+    if (isOperationLogEnabled) {
+      LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
+    }
+  }
+
+  @Override
+  public boolean isOperationLogEnabled() {
+    return isOperationLogEnabled;
+  }
+
+  @Override
+  public File getOperationLogSessionDir() {
+    return sessionLogDir;
+  }
+
+  @Override
+  public TProtocolVersion getProtocolVersion() {
+    return sessionHandle.getProtocolVersion();
+  }
+
+  @Override
+  public SessionManager getSessionManager() {
+    return sessionManager;
+  }
+
+  @Override
+  public void setSessionManager(SessionManager sessionManager) {
+    this.sessionManager = sessionManager;
+  }
+
+  private OperationManager getOperationManager() {
+    return operationManager;
+  }
+
+  @Override
+  public void setOperationManager(OperationManager operationManager) {
+    this.operationManager = operationManager;
+  }
+
+  protected synchronized void acquire(boolean userAccess) {
+    // Make sure that this HiveServer2 session's SessionState is
+    // stored in the thread local for the handler thread.
+    SessionState.setCurrentSessionState(sessionState);
+    if (userAccess) {
+      lastAccessTime = System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * 1. We'll remove the ThreadLocal SessionState as this thread might now serve
+   * other requests.
+   * 2. We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+   * when this thread is garbage collected later.
+   * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+   */
+  protected synchronized void release(boolean userAccess) {
+    SessionState.detachSession();
+    if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+      ThreadWithGarbageCleanup currentThread =
+          (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+      currentThread.cacheThreadLocalRawStore();
+    }
+    if (userAccess) {
+      lastAccessTime = System.currentTimeMillis();
+    }
+    if (opHandleSet.isEmpty()) {
+      lastIdleTime = System.currentTimeMillis();
+    } else {
+      lastIdleTime = 0;
+    }
+  }
+
+  @Override
+  public SessionHandle getSessionHandle() {
+    return sessionHandle;
+  }
+
+  @Override
+  public String getUsername() {
+    return username;
+  }
+
+  @Override
+  public String getPassword() {
+    return password;
+  }
+
+  @Override
+  public HiveConf getHiveConf() {
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, FETCH_WORK_SERDE_CLASS);
+    return hiveConf;
+  }
+
+  @Override
+  public IMetaStoreClient getMetaStoreClient() throws HiveSQLException {
+    try {
+      return Hive.get(getHiveConf()).getMSC();
+    } catch (HiveException e) {
+      throw new HiveSQLException("Failed to get metastore connection", e);
+    } catch (MetaException e) {
+      throw new HiveSQLException("Failed to get metastore connection", e);
+    }
+  }
+
+  @Override
+  public GetInfoValue getInfo(GetInfoType getInfoType)
+      throws HiveSQLException {
+    acquire(true);
+    try {
+      switch (getInfoType) {
+      case CLI_SERVER_NAME:
+        return new GetInfoValue("Hive");
+      case CLI_DBMS_NAME:
+        return new GetInfoValue("Apache Hive");
+      case CLI_DBMS_VER:
+        return new GetInfoValue(HiveVersionInfo.getVersion());
+      case CLI_MAX_COLUMN_NAME_LEN:
+        return new GetInfoValue(128);
+      case CLI_MAX_SCHEMA_NAME_LEN:
+        return new GetInfoValue(128);
+      case CLI_MAX_TABLE_NAME_LEN:
+        return new GetInfoValue(128);
+      case CLI_TXN_CAPABLE:
+      default:
+        throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
+      }
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
+      throws HiveSQLException {
+    return executeStatementInternal(statement, confOverlay, false);
+  }
+
+  @Override
+  public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay)
+      throws HiveSQLException {
+    return executeStatementInternal(statement, confOverlay, true);
+  }
+
+  private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
+      boolean runAsync)
+          throws HiveSQLException {
+    acquire(true);
+
+    OperationManager operationManager = getOperationManager();
+    ExecuteStatementOperation operation = operationManager
+        .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync);
+    OperationHandle opHandle = operation.getHandle();
+    try {
+      operation.run();
+      opHandleSet.add(opHandle);
+      return opHandle;
+    } catch (HiveSQLException e) {
+      // Referring to SQLOperation.java, there is no chance that a HiveSQLException is thrown
+      // while the async background operation has also been submitted to the thread pool
+      // successfully. So clean up the opHandle directly when a HiveSQLException is caught.
+      operationManager.closeOperation(opHandle);
+      throw e;
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public OperationHandle getTypeInfo()
+      throws HiveSQLException {
+    acquire(true);
+
+    OperationManager operationManager = getOperationManager();
+    GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
+    OperationHandle opHandle = operation.getHandle();
+    try {
+      operation.run();
+      opHandleSet.add(opHandle);
+      return opHandle;
+    } catch (HiveSQLException e) {
+      operationManager.closeOperation(opHandle);
+      throw e;
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public OperationHandle getCatalogs()
+      throws HiveSQLException {
+    acquire(true);
+
+    OperationManager operationManager = getOperationManager();
+    GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
+    OperationHandle opHandle = operation.getHandle();
+    try {
+      operation.run();
+      opHandleSet.add(opHandle);
+      return opHandle;
+    } catch (HiveSQLException e) {
+      operationManager.closeOperation(opHandle);
+      throw e;
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public OperationHandle getSchemas(String catalogName, String schemaName)
+      throws HiveSQLException {
+    acquire(true);
+
+    OperationManager operationManager = getOperationManager();
+    GetSchemasOperation operation =
+        operationManager.newGetSchemasOperation(getSession(), catalogName, schemaName);
+    OperationHandle opHandle = operation.getHandle();
+    try {
+      operation.run();
+      opHandleSet.add(opHandle);
+      return opHandle;
+    } catch (HiveSQLException e) {
+      operationManager.closeOperation(opHandle);
+      throw e;
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public OperationHandle getTables(String catalogName, String schemaName, String tableName,
+      List<String> tableTypes)
+          throws HiveSQLException {
+    acquire(true);
+
+    OperationManager operationManager = getOperationManager();
+    MetadataOperation operation =
+        operationManager.newGetTablesOperation(getSession(), catalogName, schemaName, tableName, tableTypes);
+    OperationHandle opHandle = operation.getHandle();
+    try {
+      operation.run();
+      opHandleSet.add(opHandle);
+      return opHandle;
+    } catch (HiveSQLException e) {
+      operationManager.closeOperation(opHandle);
+      throw e;
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public OperationHandle getTableTypes()
+      throws HiveSQLException {
+    acquire(true);
+
+    OperationManager operationManager = getOperationManager();
+    GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
+    OperationHandle opHandle = operation.getHandle();
+    try {
+      operation.run();
+      opHandleSet.add(opHandle);
+      return opHandle;
+    } catch (HiveSQLException e) {
+      operationManager.closeOperation(opHandle);
+      throw e;
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public OperationHandle getColumns(String catalogName, String schemaName,
+      String tableName, String columnName)  throws HiveSQLException {
+    acquire(true);
+    String addedJars = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.JAR);
+    if (StringUtils.isNotBlank(addedJars)) {
+       IMetaStoreClient metastoreClient = getSession().getMetaStoreClient();
+       metastoreClient.setHiveAddedJars(addedJars);
+    }
+    OperationManager operationManager = getOperationManager();
+    GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
+        catalogName, schemaName, tableName, columnName);
+    OperationHandle opHandle = operation.getHandle();
+    try {
+      operation.run();
+      opHandleSet.add(opHandle);
+      return opHandle;
+    } catch (HiveSQLException e) {
+      operationManager.closeOperation(opHandle);
+      throw e;
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
+      throws HiveSQLException {
+    acquire(true);
+
+    OperationManager operationManager = getOperationManager();
+    GetFunctionsOperation operation = operationManager
+        .newGetFunctionsOperation(getSession(), catalogName, schemaName, functionName);
+    OperationHandle opHandle = operation.getHandle();
+    try {
+      operation.run();
+      opHandleSet.add(opHandle);
+      return opHandle;
+    } catch (HiveSQLException e) {
+      operationManager.closeOperation(opHandle);
+      throw e;
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public void close() throws HiveSQLException {
+    try {
+      acquire(true);
+      // Iterate through the opHandles and close their operations
+      for (OperationHandle opHandle : opHandleSet) {
+        operationManager.closeOperation(opHandle);
+      }
+      opHandleSet.clear();
+      // Clean up the session log directory.
+      cleanupSessionLogDir();
+      HiveHistory hiveHist = sessionState.getHiveHistory();
+      if (null != hiveHist) {
+        hiveHist.closeStream();
+      }
+      try {
+        sessionState.close();
+      } finally {
+        sessionState = null;
+      }
+    } catch (IOException ioe) {
+      throw new HiveSQLException("Failure to close", ioe);
+    } finally {
+      if (sessionState != null) {
+        try {
+          sessionState.close();
+        } catch (Throwable t) {
+          LOG.warn("Error closing session", t);
+        }
+        sessionState = null;
+      }
+      release(true);
+    }
+  }
+
+  private void cleanupSessionLogDir() {
+    if (isOperationLogEnabled) {
+      try {
+        FileUtils.forceDelete(sessionLogDir);
+      } catch (Exception e) {
+        LOG.error("Failed to cleanup session log dir: " + sessionHandle, e);
+      }
+    }
+  }
+
+  @Override
+  public SessionState getSessionState() {
+    return sessionState;
+  }
+
+  @Override
+  public String getUserName() {
+    return username;
+  }
+
+  @Override
+  public void setUserName(String userName) {
+    this.username = userName;
+  }
+
+  @Override
+  public long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  @Override
+  public void closeExpiredOperations() {
+    OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+    if (handles.length > 0) {
+      List<Operation> operations = operationManager.removeExpiredOperations(handles);
+      if (!operations.isEmpty()) {
+        closeTimedOutOperations(operations);
+      }
+    }
+  }
+
+  @Override
+  public long getNoOperationTime() {
+    return lastIdleTime > 0 ? System.currentTimeMillis() - lastIdleTime : 0;
+  }
+
+  private void closeTimedOutOperations(List<Operation> operations) {
+    acquire(false);
+    try {
+      for (Operation operation : operations) {
+        opHandleSet.remove(operation.getHandle());
+        try {
+          operation.close();
+        } catch (Exception e) {
+          LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e);
+        }
+      }
+    } finally {
+      release(false);
+    }
+  }
+
+  @Override
+  public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
+    acquire(true);
+    try {
+      sessionManager.getOperationManager().cancelOperation(opHandle);
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
+    acquire(true);
+    try {
+      operationManager.closeOperation(opHandle);
+      opHandleSet.remove(opHandle);
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
+    acquire(true);
+    try {
+      return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
+    } finally {
+      release(true);
+    }
+  }
+
+  @Override
+  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+      long maxRows, FetchType fetchType) throws HiveSQLException {
+    acquire(true);
+    try {
+      if (fetchType == FetchType.QUERY_OUTPUT) {
+        return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
+      }
+      return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
+    } finally {
+      release(true);
+    }
+  }
+
+  protected HiveSession getSession() {
+    return this;
+  }
+
+  @Override
+  public String getIpAddress() {
+    return ipAddress;
+  }
+
+  @Override
+  public void setIpAddress(String ipAddress) {
+    this.ipAddress = ipAddress;
+  }
+
+  @Override
+  public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer)
+      throws HiveSQLException {
+    HiveAuthFactory.verifyProxyAccess(getUsername(), owner, getIpAddress(), getHiveConf());
+    return authFactory.getDelegationToken(owner, renewer);
+  }
+
+  @Override
+  public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+      throws HiveSQLException {
+    HiveAuthFactory.verifyProxyAccess(getUsername(), getUserFromToken(authFactory, tokenStr),
+        getIpAddress(), getHiveConf());
+    authFactory.cancelDelegationToken(tokenStr);
+  }
+
+  @Override
+  public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+      throws HiveSQLException {
+    HiveAuthFactory.verifyProxyAccess(getUsername(), getUserFromToken(authFactory, tokenStr),
+        getIpAddress(), getHiveConf());
+    authFactory.renewDelegationToken(tokenStr);
+  }
+
+  // extract the real user from the given token string
+  private String getUserFromToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException {
+    return authFactory.getUserFromToken(tokenStr);
+  }
+}

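A note for readers tracing configureSession above: each entry of the client-supplied overlay map is routed by key prefix. Keys starting with "set:" are handed to SetProcessor.setVariable, keys starting with "use:" switch the current database to the entry's value, and everything else is applied to the session's HiveConf via verifyAndSet. A minimal sketch of building such a map (the keys and values here are illustrative, not part of the commit):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SessionConfOverlaySketch {
      public static void main(String[] args) {
        Map<String, String> sessionConfMap = new LinkedHashMap<>();
        // "set:"-prefixed keys go through SetProcessor.setVariable(key.substring(4), value)
        sessionConfMap.put("set:hivevar:region", "us-east");
        // "use:"-prefixed keys switch the current database to the entry's value
        sessionConfMap.put("use:database", "analytics_db");
        // everything else lands in hiveConf.verifyAndSet(key, value)
        sessionConfMap.put("hive.exec.parallel", "true");
        sessionConfMap.forEach((k, v) -> System.out.println(k + " -> " + v));
      }
    }
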
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
new file mode 100644
index 0000000..a29e5d1
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.thrift.TProtocolVersion;
+
+/**
+ * HiveSessionImplwithUGI.
+ * A HiveSession that carries the connecting user's UGI and delegation token, if required.
+ */
+public class HiveSessionImplwithUGI extends HiveSessionImpl {
+  public static final String HS2TOKEN = "HiveServer2ImpersonationToken";
+
+  private UserGroupInformation sessionUgi = null;
+  private String delegationTokenStr = null;
+  private Hive sessionHive = null;
+  private HiveSession proxySession = null;
+  static final Log LOG = LogFactory.getLog(HiveSessionImplwithUGI.class);
+
+  public HiveSessionImplwithUGI(TProtocolVersion protocol, String username, String password,
+      HiveConf hiveConf, String ipAddress, String delegationToken) throws HiveSQLException {
+    super(protocol, username, password, hiveConf, ipAddress);
+    setSessionUGI(username);
+    setDelegationToken(delegationToken);
+
+    // create a new metastore connection for this particular user session
+    Hive.set(null);
+    try {
+      sessionHive = Hive.get(getHiveConf());
+    } catch (HiveException e) {
+      throw new HiveSQLException("Failed to setup metastore connection", e);
+    }
+  }
+
+  // setup appropriate UGI for the session
+  public void setSessionUGI(String owner) throws HiveSQLException {
+    if (owner == null) {
+      throw new HiveSQLException("No username provided for impersonation");
+    }
+    if (UserGroupInformation.isSecurityEnabled()) {
+      try {
+        sessionUgi = UserGroupInformation.createProxyUser(
+            owner, UserGroupInformation.getLoginUser());
+      } catch (IOException e) {
+        throw new HiveSQLException("Couldn't setup proxy user", e);
+      }
+    } else {
+      sessionUgi = UserGroupInformation.createRemoteUser(owner);
+    }
+  }
+
+  public UserGroupInformation getSessionUgi() {
+    return this.sessionUgi;
+  }
+
+  public String getDelegationToken() {
+    return this.delegationTokenStr;
+  }
+
+  @Override
+  protected synchronized void acquire(boolean userAccess) {
+    super.acquire(userAccess);
+    // if we have a metastore connection with impersonation, then set it first
+    if (sessionHive != null) {
+      Hive.set(sessionHive);
+    }
+  }
+
+  /**
+   * Close the file systems for the session and remove it from the FileSystem cache.
+   * Cancel the session's delegation token and close the metastore connection
+   */
+  @Override
+  public void close() throws HiveSQLException {
+    try {
+      acquire(true);
+      cancelDelegationToken();
+    } finally {
+      try {
+        super.close();
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(sessionUgi);
+        } catch (IOException ioe) {
+          throw new HiveSQLException("Could not clean up file-system handles for UGI: "
+              + sessionUgi, ioe);
+        }
+      }
+    }
+  }
+
+  /**
+   * Enable the delegation token for the session:
+   * save the token string and set token.signature in the Hive conf. The metastore client uses
+   * this token.signature to determine whether to use Kerberos or the delegation token.
+   * @throws HiveSQLException
+   */
+  private void setDelegationToken(String delegationTokenStr) throws HiveSQLException {
+    this.delegationTokenStr = delegationTokenStr;
+    if (delegationTokenStr != null) {
+      getHiveConf().set("hive.metastore.token.signature", HS2TOKEN);
+      try {
+        Utils.setTokenStr(sessionUgi, delegationTokenStr, HS2TOKEN);
+      } catch (IOException e) {
+        throw new HiveSQLException("Couldn't setup delegation token in the ugi", e);
+      }
+    }
+  }
+
+  // If the session has a delegation token obtained from the metastore, then cancel it
+  private void cancelDelegationToken() throws HiveSQLException {
+    if (delegationTokenStr != null) {
+      try {
+        Hive.get(getHiveConf()).cancelDelegationToken(delegationTokenStr);
+      } catch (HiveException e) {
+        throw new HiveSQLException("Couldn't cancel delegation token", e);
+      }
+      // close the metastore connection created with this delegation token
+      Hive.closeCurrent();
+    }
+  }
+
+  @Override
+  protected HiveSession getSession() {
+    assert proxySession != null;
+
+    return proxySession;
+  }
+
+  public void setProxySession(HiveSession proxySession) {
+    this.proxySession = proxySession;
+  }
+
+  @Override
+  public String getDelegationToken(HiveAuthFactory authFactory, String owner,
+      String renewer) throws HiveSQLException {
+    return authFactory.getDelegationToken(owner, renewer);
+  }
+
+  @Override
+  public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+      throws HiveSQLException {
+    authFactory.cancelDelegationToken(tokenStr);
+  }
+
+  @Override
+  public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+      throws HiveSQLException {
+    authFactory.renewDelegationToken(tokenStr);
+  }
+
+}

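The impersonation setup in setSessionUGI above reduces to a small pattern: on a Kerberos-secured cluster the session runs as a proxy user layered on the server's login principal, otherwise as a plain remote user. A self-contained sketch of that decision (the class and method names are ours):

    import java.io.IOException;
    import org.apache.hadoop.security.UserGroupInformation;

    public class SessionUgiSketch {
      static UserGroupInformation ugiFor(String owner) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) {
          // secure cluster: impersonate 'owner' on top of the server's login user
          return UserGroupInformation.createProxyUser(owner, UserGroupInformation.getLoginUser());
        }
        // insecure cluster: a simple remote user is enough
        return UserGroupInformation.createRemoteUser(owner);
      }
    }

Note that the proxy identity is only honored by downstream Hadoop services when the usual hadoop.proxyuser.* host/group settings permit it.
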
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionProxy.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionProxy.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionProxy.java
new file mode 100644
index 0000000..5b10521
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionProxy.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.cli.HiveSQLException;
+
+/**
+ * Proxy wrapper on HiveSession that executes operations
+ * by impersonating the given user.
+ */
+public class HiveSessionProxy implements InvocationHandler {
+  private final HiveSession base;
+  private final UserGroupInformation ugi;
+
+  public HiveSessionProxy(HiveSession hiveSession, UserGroupInformation ugi) {
+    this.base = hiveSession;
+    this.ugi = ugi;
+  }
+
+  public static HiveSession getProxy(HiveSession hiveSession, UserGroupInformation ugi)
+      throws IllegalArgumentException, HiveSQLException {
+    return (HiveSession)Proxy.newProxyInstance(HiveSession.class.getClassLoader(),
+        new Class<?>[] {HiveSession.class},
+        new HiveSessionProxy(hiveSession, ugi));
+  }
+
+  @Override
+  public Object invoke(Object arg0, final Method method, final Object[] args)
+      throws Throwable {
+    try {
+      if (method.getDeclaringClass() == HiveSessionBase.class) {
+        return invoke(method, args);
+      }
+      return ugi.doAs(
+        new PrivilegedExceptionAction<Object> () {
+          @Override
+          public Object run() throws HiveSQLException {
+            return invoke(method, args);
+          }
+        });
+    } catch (UndeclaredThrowableException e) {
+      Throwable innerException = e.getCause();
+      if (innerException instanceof PrivilegedActionException) {
+        throw innerException.getCause();
+      } else {
+        throw e.getCause();
+      }
+    }
+  }
+
+  private Object invoke(final Method method, final Object[] args) throws HiveSQLException {
+    try {
+      return method.invoke(base, args);
+    } catch (InvocationTargetException e) {
+      if (e.getCause() instanceof HiveSQLException) {
+        throw (HiveSQLException)e.getCause();
+      }
+      throw new RuntimeException(e.getCause());
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
+

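HiveSessionProxy is a standard java.lang.reflect dynamic proxy: every interface call is replayed against the real session inside ugi.doAs, so the Hadoop and metastore calls underneath execute as the impersonated user. A generic sketch of the same pattern (our own class, and it omits the exception unwrapping the real proxy performs):

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public final class DoAsProxySketch {
      @SuppressWarnings("unchecked")
      public static <T> T wrap(Class<T> iface, T target, UserGroupInformation ugi) {
        InvocationHandler handler = (proxy, method, args) ->
            // run the reflective call as the impersonated user
            ugi.doAs((PrivilegedExceptionAction<Object>) () -> method.invoke(target, args));
        return (T) Proxy.newProxyInstance(
            iface.getClassLoader(), new Class<?>[] {iface}, handler);
      }
    }

The production code above additionally unwraps InvocationTargetException and UndeclaredThrowableException so callers see the original HiveSQLException rather than a reflection wrapper.
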
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
new file mode 100644
index 0000000..36a30b1
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
+import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.operation.OperationManager;
+import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
+
+/**
+ * SessionManager.
+ *
+ */
+public class SessionManager extends CompositeService {
+
+  private static final Log LOG = LogFactory.getLog(SessionManager.class);
+  public static final String HIVERCFILE = ".hiverc";
+  private HiveConf hiveConf;
+  private final Map<SessionHandle, HiveSession> handleToSession =
+      new ConcurrentHashMap<SessionHandle, HiveSession>();
+  private final OperationManager operationManager = new OperationManager();
+  private ThreadPoolExecutor backgroundOperationPool;
+  private boolean isOperationLogEnabled;
+  private File operationLogRootDir;
+
+  private long checkInterval;
+  private long sessionTimeout;
+  private boolean checkOperation;
+
+  private volatile boolean shutdown;
+  // The HiveServer2 instance running this service
+  private final HiveServer2 hiveServer2;
+
+  public SessionManager(HiveServer2 hiveServer2) {
+    super(SessionManager.class.getSimpleName());
+    this.hiveServer2 = hiveServer2;
+  }
+
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    // Create the operation log root directory if operation logging is enabled
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+      initOperationLogRootDir();
+    }
+    createBackgroundOperationPool();
+    addService(operationManager);
+    super.init(hiveConf);
+  }
+
+  private void createBackgroundOperationPool() {
+    int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+    LOG.info("HiveServer2: Background operation thread pool size: " + poolSize);
+    int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
+    LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize);
+    long keepAliveTime = HiveConf.getTimeVar(
+        hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS);
+    LOG.info(
+        "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds");
+
+    // Create a thread pool with #poolSize threads
+    // Threads terminate when they are idle for more than the keepAliveTime
+    // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
+    String threadPoolName = "HiveServer2-Background-Pool";
+    backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,
+        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize),
+        new ThreadFactoryWithGarbageCleanup(threadPoolName));
+    backgroundOperationPool.allowCoreThreadTimeOut(true);
+
+    checkInterval = HiveConf.getTimeVar(
+        hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+    sessionTimeout = HiveConf.getTimeVar(
+        hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+    checkOperation = HiveConf.getBoolVar(hiveConf,
+        ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION);
+  }
+
+  private void initOperationLogRootDir() {
+    operationLogRootDir = new File(
+        hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION));
+    isOperationLogEnabled = true;
+
+    if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) {
+      LOG.warn("The operation log root directory exists, but it is not a directory: " +
+          operationLogRootDir.getAbsolutePath());
+      isOperationLogEnabled = false;
+    }
+
+    if (!operationLogRootDir.exists()) {
+      if (!operationLogRootDir.mkdirs()) {
+        LOG.warn("Unable to create operation log root directory: " +
+            operationLogRootDir.getAbsolutePath());
+        isOperationLogEnabled = false;
+      }
+    }
+
+    if (isOperationLogEnabled) {
+      LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath());
+      try {
+        FileUtils.forceDeleteOnExit(operationLogRootDir);
+      } catch (IOException e) {
+        LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " +
+            operationLogRootDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    if (checkInterval > 0) {
+      startTimeoutChecker();
+    }
+  }
+
+  private void startTimeoutChecker() {
+    final long interval = Math.max(checkInterval, 3000L);  // minimum 3 seconds
+    Runnable timeoutChecker = new Runnable() {
+      @Override
+      public void run() {
+        for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+          long current = System.currentTimeMillis();
+          for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
+            if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
+                && (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
+              SessionHandle handle = session.getSessionHandle();
+              LOG.warn("Session " + handle + " is Timed-out (last access : " +
+                  new Date(session.getLastAccessTime()) + ") and will be closed");
+              try {
+                closeSession(handle);
+              } catch (HiveSQLException e) {
+                LOG.warn("Exception is thrown closing session " + handle, e);
+              }
+            } else {
+              session.closeExpiredOperations();
+            }
+          }
+        }
+      }
+
+      private void sleepInterval(long interval) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+      }
+    };
+    backgroundOperationPool.execute(timeoutChecker);
+  }
+
+  @Override
+  public synchronized void stop() {
+    super.stop();
+    shutdown = true;
+    if (backgroundOperationPool != null) {
+      backgroundOperationPool.shutdown();
+      long timeout = hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
+      try {
+        backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
+            " seconds has been exceeded. RUNNING background operations will be shut down", e);
+      }
+      backgroundOperationPool = null;
+    }
+    cleanupLoggingRootDir();
+  }
+
+  private void cleanupLoggingRootDir() {
+    if (isOperationLogEnabled) {
+      try {
+        FileUtils.forceDelete(operationLogRootDir);
+      } catch (Exception e) {
+        LOG.warn("Failed to cleanup root dir of HS2 logging: " + operationLogRootDir
+            .getAbsolutePath(), e);
+      }
+    }
+  }
+
+  public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,
+      Map<String, String> sessionConf) throws HiveSQLException {
+    return openSession(protocol, username, password, ipAddress, sessionConf, false, null);
+  }
+
+  /**
+   * Opens a new session and creates a session handle.
+   * The username passed to this method is the effective username.
+   * If withImpersonation is true (i.e. doAs is true), we wrap all the calls on the HiveSession
+   * within a UGI.doAs, where the UGI corresponds to the effective user.
+   * @see org.apache.hive.service.cli.thrift.ThriftCLIService#getUserName()
+   *
+   * @param protocol
+   * @param username
+   * @param password
+   * @param ipAddress
+   * @param sessionConf
+   * @param withImpersonation
+   * @param delegationToken
+   * @return
+   * @throws HiveSQLException
+   */
+  public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,
+      Map<String, String> sessionConf, boolean withImpersonation, String delegationToken)
+          throws HiveSQLException {
+    HiveSession session;
+    // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
+    // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
+    if (withImpersonation) {
+      HiveSessionImplwithUGI sessionWithUGI = new HiveSessionImplwithUGI(protocol, username, password,
+          hiveConf, ipAddress, delegationToken);
+      session = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi());
+      sessionWithUGI.setProxySession(session);
+    } else {
+      session = new HiveSessionImpl(protocol, username, password, hiveConf, ipAddress);
+    }
+    session.setSessionManager(this);
+    session.setOperationManager(operationManager);
+    try {
+      session.open(sessionConf);
+    } catch (Exception e) {
+      try {
+        session.close();
+      } catch (Throwable t) {
+        LOG.warn("Error closing session", t);
+      }
+      session = null;
+      throw new HiveSQLException("Failed to open new session: " + e, e);
+    }
+    if (isOperationLogEnabled) {
+      session.setOperationLogSessionDir(operationLogRootDir);
+    }
+    try {
+      executeSessionHooks(session);
+    } catch (Exception e) {
+      try {
+        session.close();
+      } catch (Throwable t) {
+        LOG.warn("Error closing session", t);
+      }
+      session = null;
+      throw new HiveSQLException("Failed to execute session hooks", e);
+    }
+    handleToSession.put(session.getSessionHandle(), session);
+    return session.getSessionHandle();
+  }
+
+  public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
+    HiveSession session = handleToSession.remove(sessionHandle);
+    if (session == null) {
+      throw new HiveSQLException("Session does not exist!");
+    }
+    try {
+      session.close();
+    } finally {
+      // Shutdown HiveServer2 if it has been deregistered from ZooKeeper and has no active sessions
+      if (hiveServer2 != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)
+          && !hiveServer2.isRegisteredWithZooKeeper()) {
+        // Asynchronously shutdown this instance of HiveServer2,
+        // if there are no active client sessions
+        if (getOpenSessionCount() == 0) {
+          LOG.info("This instance of HiveServer2 has been removed from the list of server "
+              + "instances available for dynamic service discovery. "
+              + "The last client session has ended - will shutdown now.");
+          Thread shutdownThread = new Thread() {
+            @Override
+            public void run() {
+              hiveServer2.stop();
+            }
+          };
+          shutdownThread.start();
+        }
+      }
+    }
+  }
+
+  public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException {
+    HiveSession session = handleToSession.get(sessionHandle);
+    if (session == null) {
+      throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle);
+    }
+    return session;
+  }
+
+  public OperationManager getOperationManager() {
+    return operationManager;
+  }
+
+  private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
+    @Override
+    protected synchronized String initialValue() {
+      return null;
+    }
+  };
+
+  public static void setIpAddress(String ipAddress) {
+    threadLocalIpAddress.set(ipAddress);
+  }
+
+  public static void clearIpAddress() {
+    threadLocalIpAddress.remove();
+  }
+
+  public static String getIpAddress() {
+    return threadLocalIpAddress.get();
+  }
+
+  private static ThreadLocal<String> threadLocalUserName = new ThreadLocal<String>(){
+    @Override
+    protected synchronized String initialValue() {
+      return null;
+    }
+  };
+
+  public static void setUserName(String userName) {
+    threadLocalUserName.set(userName);
+  }
+
+  public static void clearUserName() {
+    threadLocalUserName.remove();
+  }
+
+  public static String getUserName() {
+    return threadLocalUserName.get();
+  }
+
+  private static ThreadLocal<String> threadLocalProxyUserName = new ThreadLocal<String>(){
+    @Override
+    protected synchronized String initialValue() {
+      return null;
+    }
+  };
+
+  public static void setProxyUserName(String userName) {
+    LOG.debug("setting proxy user name based on query param to: " + userName);
+    threadLocalProxyUserName.set(userName);
+  }
+
+  public static String getProxyUserName() {
+    return threadLocalProxyUserName.get();
+  }
+
+  public static void clearProxyUserName() {
+    threadLocalProxyUserName.remove();
+  }
+
+  // execute session hooks
+  private void executeSessionHooks(HiveSession session) throws Exception {
+    List<HiveSessionHook> sessionHooks = HookUtils.getHooks(hiveConf,
+        HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK, HiveSessionHook.class);
+    for (HiveSessionHook sessionHook : sessionHooks) {
+      sessionHook.run(new HiveSessionHookContextImpl(session));
+    }
+  }
+
+  public Future<?> submitBackgroundOperation(Runnable r) {
+    return backgroundOperationPool.submit(r);
+  }
+
+  public int getOpenSessionCount() {
+    return handleToSession.size();
+  }
+}
+

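The background pool created in createBackgroundOperationPool above is the classic fixed-size-but-collapsible executor: core threads equal max threads, a bounded queue absorbs overflow, and allowCoreThreadTimeOut lets an idle server drop its workers. A runnable sketch with illustrative sizes (the real values come from the HIVE_SERVER2_ASYNC_EXEC_* conf vars):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class BackgroundPoolSketch {
      public static void main(String[] args) {
        int poolSize = 100;     // stands in for HIVE_SERVER2_ASYNC_EXEC_THREADS
        int queueSize = 100;    // stands in for HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE
        long keepAlive = 10;    // stands in for HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME (seconds)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            poolSize, poolSize, keepAlive, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(queueSize));
        // let idle core threads die after keepAlive instead of pinning poolSize threads
        pool.allowCoreThreadTimeOut(true);
        pool.execute(() -> System.out.println("background operation"));
        pool.shutdown();
      }
    }

Once the queue is full, further submissions are rejected (the default AbortPolicy throws RejectedExecutionException) rather than queued unboundedly, which is what caps concurrent async operations per server.
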
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
new file mode 100644
index 0000000..ac63537
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.ICLIService;
+
+
+/**
+ * EmbeddedThriftBinaryCLIService.
+ *
+ */
+public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
+
+  public EmbeddedThriftBinaryCLIService() {
+    super(new CLIService(null));
+    isEmbedded = true;
+    HiveConf.setLoadHiveServer2Config(true);
+  }
+
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    cliService.init(hiveConf);
+    cliService.start();
+    super.init(hiveConf);
+  }
+
+  public ICLIService getService() {
+    return cliService;
+  }
+}

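EmbeddedThriftBinaryCLIService skips the socket entirely: init both initializes and starts the wrapped CLIService, and getService hands it back for in-process use. A hedged usage sketch (class name is ours):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.service.cli.ICLIService;
    import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService;

    public class EmbeddedCliSketch {
      public static void main(String[] args) {
        EmbeddedThriftBinaryCLIService embedded = new EmbeddedThriftBinaryCLIService();
        embedded.init(new HiveConf());   // also starts the underlying CLIService
        ICLIService cli = embedded.getService();
        // ... open sessions and execute statements against 'cli' in-process ...
      }
    }
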
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
new file mode 100644
index 0000000..6c9efba
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+
+
+public class ThriftBinaryCLIService extends ThriftCLIService {
+
+  public ThriftBinaryCLIService(CLIService cliService) {
+    super(cliService, ThriftBinaryCLIService.class.getSimpleName());
+  }
+
+  @Override
+  public void run() {
+    try {
+      // Server thread pool
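+      // (A SynchronousQueue has no capacity: each connection is handed straight
+      // to a worker, the pool grows on demand up to maxWorkerThreads, and idle
+      // workers are reclaimed after workerKeepAliveTime seconds.)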
+      String threadPoolName = "HiveServer2-Handler-Pool";
+      ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+          workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+          new ThreadFactoryWithGarbageCleanup(threadPoolName));
+
+      // Thrift configs
+      hiveAuthFactory = new HiveAuthFactory(hiveConf);
+      TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
+      TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
+      TServerSocket serverSocket = null;
+      List<String> sslVersionBlacklist = new ArrayList<String>();
+      for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
+        sslVersionBlacklist.add(sslVersion);
+      }
+      if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
+        serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
+      } else {
+        String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
+        if (keyStorePath.isEmpty()) {
+          throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+              + " Not configured for SSL connection");
+        }
+        String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
+            HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
+        serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
+            keyStorePassword, sslVersionBlacklist);
+      }
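+      // For the SSL branch above, the keystore location and password come from
+      // HiveConf. In a typical hive-site.xml this would look roughly like
+      // (key names per HiveConf.ConfVars; shown for illustration only):
+      //   hive.server2.use.SSL=true
+      //   hive.server2.keystore.path=/path/to/keystore.jks
+      //   hive.server2.keystore.password=...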
+
+      // Server args
+      int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
+      int requestTimeout = (int) hiveConf.getTimeVar(
+          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
+      int beBackoffSlotLength = (int) hiveConf.getTimeVar(
+          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
+      TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
+          .processorFactory(processorFactory).transportFactory(transportFactory)
+          .protocolFactory(new TBinaryProtocol.Factory())
+          .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
+          .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
+          .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
+          .executorService(executorService);
+
+      // TCP Server
+      server = new TThreadPoolServer(sargs);
+      server.setServerEventHandler(serverEventHandler);
+      String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+          + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
+      LOG.info(msg);
+      server.serve();
+    } catch (Throwable t) {
+      LOG.fatal(
+          "Error starting HiveServer2: could not start "
+              + ThriftBinaryCLIService.class.getSimpleName(), t);
+      System.exit(-1);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
new file mode 100644
index 0000000..5a0f1c8
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -0,0 +1,740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
+import org.apache.hive.service.ServiceUtils;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.TSetIpAddressProcessor;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.FetchType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * ThriftCLIService.
+ *
+ * Common base class for the binary and HTTP Thrift transports: implements the
+ * TCLIService.Iface RPC methods by delegating to a CLIService, while
+ * subclasses supply run() to start the concrete Thrift server.
+ */
+public abstract class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
+
+  public static final Log LOG = LogFactory.getLog(ThriftCLIService.class.getName());
+
+  protected CLIService cliService;
+  private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+  protected static HiveAuthFactory hiveAuthFactory;
+
+  protected int portNum;
+  protected InetAddress serverIPAddress;
+  protected String hiveHost;
+  protected TServer server;
+  protected org.eclipse.jetty.server.Server httpServer;
+
+  private boolean isStarted = false;
+  protected boolean isEmbedded = false;
+
+  protected HiveConf hiveConf;
+
+  protected int minWorkerThreads;
+  protected int maxWorkerThreads;
+  protected long workerKeepAliveTime;
+
+  protected TServerEventHandler serverEventHandler;
+  protected ThreadLocal<ServerContext> currentServerContext;
+
+  static class ThriftCLIServerContext implements ServerContext {
+    private SessionHandle sessionHandle = null;
+
+    public void setSessionHandle(SessionHandle sessionHandle) {
+      this.sessionHandle = sessionHandle;
+    }
+
+    public SessionHandle getSessionHandle() {
+      return sessionHandle;
+    }
+  }
+
+  public ThriftCLIService(CLIService service, String serviceName) {
+    super(serviceName);
+    this.cliService = service;
+    currentServerContext = new ThreadLocal<ServerContext>();
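+    // The event handler below tracks a per-connection ServerContext: created on
+    // connect, recorded in currentServerContext for each processed request, and
+    // used on disconnect to close any session the client left open.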
+    serverEventHandler = new TServerEventHandler() {
+      @Override
+      public ServerContext createContext(
+          TProtocol input, TProtocol output) {
+        return new ThriftCLIServerContext();
+      }
+
+      @Override
+      public void deleteContext(ServerContext serverContext,
+          TProtocol input, TProtocol output) {
+        ThriftCLIServerContext context = (ThriftCLIServerContext)serverContext;
+        SessionHandle sessionHandle = context.getSessionHandle();
+        if (sessionHandle != null) {
+          LOG.info("Session disconnected without closing properly, close it now");
+          try {
+            cliService.closeSession(sessionHandle);
+          } catch (HiveSQLException e) {
+            LOG.warn("Failed to close session: " + e, e);
+          }
+        }
+      }
+
+      @Override
+      public void preServe() {
+      }
+
+      @Override
+      public void processContext(ServerContext serverContext,
+          TTransport input, TTransport output) {
+        currentServerContext.set(serverContext);
+      }
+    };
+  }
+
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    // Initialize common server configs needed in both binary & http modes
+    String portString;
+    hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+    if (hiveHost == null) {
+      hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+    }
+    try {
+      if (hiveHost != null && !hiveHost.isEmpty()) {
+        serverIPAddress = InetAddress.getByName(hiveHost);
+      } else {
+        serverIPAddress = InetAddress.getLocalHost();
+      }
+    } catch (UnknownHostException e) {
+      throw new ServiceException(e);
+    }
+    // HTTP mode
+    if (HiveServer2.isHTTPTransportMode(hiveConf)) {
+      workerKeepAliveTime =
+          hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
+              TimeUnit.SECONDS);
+      portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
+      if (portString != null) {
+        portNum = Integer.parseInt(portString);
+      } else {
+        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
+      }
+    }
+    // Binary mode
+    else {
+      workerKeepAliveTime =
+          hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+      portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+      if (portString != null) {
+        portNum = Integer.parseInt(portString);
+      } else {
+        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+      }
+    }
+    minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+    maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+    super.init(hiveConf);
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    if (!isStarted && !isEmbedded) {
+      new Thread(this).start();
+      isStarted = true;
+    }
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (isStarted && !isEmbedded) {
+      if (server != null) {
+        server.stop();
+        LOG.info("Thrift server has stopped");
+      }
+      if ((httpServer != null) && httpServer.isStarted()) {
+        try {
+          httpServer.stop();
+          LOG.info("Http server has stopped");
+        } catch (Exception e) {
+          LOG.error("Error stopping Http server: ", e);
+        }
+      }
+      isStarted = false;
+    }
+    super.stop();
+  }
+
+  public int getPortNumber() {
+    return portNum;
+  }
+
+  public InetAddress getServerIPAddress() {
+    return serverIPAddress;
+  }
+
+  @Override
+  public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
+      throws TException {
+    TGetDelegationTokenResp resp = new TGetDelegationTokenResp();
+
+    if (hiveAuthFactory == null) {
+      resp.setStatus(unsecureTokenErrorStatus());
+    } else {
+      try {
+        String token = cliService.getDelegationToken(
+            new SessionHandle(req.getSessionHandle()),
+            hiveAuthFactory, req.getOwner(), req.getRenewer());
+        resp.setDelegationToken(token);
+        resp.setStatus(OK_STATUS);
+      } catch (HiveSQLException e) {
+        LOG.error("Error obtaining delegation token", e);
+        TStatus tokenErrorStatus = HiveSQLException.toTStatus(e);
+        tokenErrorStatus.setSqlState("42000");
+        resp.setStatus(tokenErrorStatus);
+      }
+    }
+    return resp;
+  }
+
+  @Override
+  public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req)
+      throws TException {
+    TCancelDelegationTokenResp resp = new TCancelDelegationTokenResp();
+
+    if (hiveAuthFactory == null) {
+      resp.setStatus(unsecureTokenErrorStatus());
+    } else {
+      try {
+        cliService.cancelDelegationToken(new SessionHandle(req.getSessionHandle()),
+            hiveAuthFactory, req.getDelegationToken());
+        resp.setStatus(OK_STATUS);
+      } catch (HiveSQLException e) {
+        LOG.error("Error canceling delegation token", e);
+        resp.setStatus(HiveSQLException.toTStatus(e));
+      }
+    }
+    return resp;
+  }
+
+  @Override
+  public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req)
+      throws TException {
+    TRenewDelegationTokenResp resp = new TRenewDelegationTokenResp();
+    if (hiveAuthFactory == null) {
+      resp.setStatus(unsecureTokenErrorStatus());
+    } else {
+      try {
+        cliService.renewDelegationToken(new SessionHandle(req.getSessionHandle()),
+            hiveAuthFactory, req.getDelegationToken());
+        resp.setStatus(OK_STATUS);
+      } catch (HiveSQLException e) {
+        LOG.error("Error obtaining renewing token", e);
+        resp.setStatus(HiveSQLException.toTStatus(e));
+      }
+    }
+    return resp;
+  }
+
+  private TStatus unsecureTokenErrorStatus() {
+    TStatus errorStatus = new TStatus(TStatusCode.ERROR_STATUS);
+    errorStatus.setErrorMessage("Delegation token is only supported for remote " +
+        "clients with Kerberos authentication");
+    return errorStatus;
+  }
+
+  @Override
+  public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
+    LOG.info("Client protocol version: " + req.getClient_protocol());
+    TOpenSessionResp resp = new TOpenSessionResp();
+    try {
+      SessionHandle sessionHandle = getSessionHandle(req, resp);
+      resp.setSessionHandle(sessionHandle.toTSessionHandle());
+      // TODO: set real configuration map
+      resp.setConfiguration(new HashMap<String, String>());
+      resp.setStatus(OK_STATUS);
+      ThriftCLIServerContext context =
+        (ThriftCLIServerContext)currentServerContext.get();
+      if (context != null) {
+        context.setSessionHandle(sessionHandle);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error opening session: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  private String getIpAddress() {
+    String clientIpAddress;
+    // Http transport mode.
+    // We set the thread local ip address, in ThriftHttpServlet.
+    if (cliService.getHiveConf().getVar(
+        ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+      clientIpAddress = SessionManager.getIpAddress();
+    }
+    else {
+      // Kerberos
+      if (isKerberosAuthMode()) {
+        clientIpAddress = hiveAuthFactory.getIpAddress();
+      }
+      // Except kerberos, NOSASL
+      else {
+        clientIpAddress = TSetIpAddressProcessor.getUserIpAddress();
+      }
+    }
+    LOG.debug("Client's IP Address: " + clientIpAddress);
+    return clientIpAddress;
+  }
+
+  /**
+   * Returns the effective username.
+   * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user.
+   * 2. If hive.server2.allow.user.substitution = true: the username of the end user
+   * that the connecting user is trying to proxy for. This includes a check that
+   * the connecting user is allowed to proxy for the end user.
+   * @param req the open-session request, consulted for a fallback username and session conf
+   * @return the effective (possibly substituted) username
+   * @throws HiveSQLException if proxy-user substitution is not permitted
+   */
+  private String getUserName(TOpenSessionReq req) throws HiveSQLException {
+    String userName = null;
+    // Kerberos
+    if (isKerberosAuthMode()) {
+      userName = hiveAuthFactory.getRemoteUser();
+    }
+    // Except kerberos, NOSASL
+    if (userName == null) {
+      userName = TSetIpAddressProcessor.getUserName();
+    }
+    // Http transport mode.
+    // We set the thread local username, in ThriftHttpServlet.
+    if (cliService.getHiveConf().getVar(
+        ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+      userName = SessionManager.getUserName();
+    }
+    if (userName == null) {
+      userName = req.getUsername();
+    }
+
+    userName = getShortName(userName);
+    String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress());
+    LOG.debug("Client's username: " + effectiveClientUser);
+    return effectiveClientUser;
+  }
+
+  private String getShortName(String userName) {
+    String ret = null;
+    if (userName != null) {
+      int indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName);
+      ret = (indexOfDomainMatch <= 0) ? userName :
+          userName.substring(0, indexOfDomainMatch);
+    }
+
+    return ret;
+  }
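+
+  // Illustrative examples (assuming ServiceUtils.indexOfDomainMatch returns the
+  // index of the first '/' or '@'):
+  //   getShortName("bob@EXAMPLE.COM")                  -> "bob"
+  //   getShortName("bob/host.example.com@EXAMPLE.COM") -> "bob"
+  //   getShortName("bob")                              -> "bob"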
+
+  /**
+   * Create a session handle for the given open-session request.
+   * @param req the open-session request
+   * @param res the response, on which the negotiated protocol version is set
+   * @return the handle of the newly opened session
+   * @throws HiveSQLException
+   * @throws LoginException
+   * @throws IOException
+   */
+  SessionHandle getSessionHandle(TOpenSessionReq req, TOpenSessionResp res)
+      throws HiveSQLException, LoginException, IOException {
+    String userName = getUserName(req);
+    String ipAddress = getIpAddress();
+    TProtocolVersion protocol = getMinVersion(CLIService.SERVER_VERSION,
+        req.getClient_protocol());
+    SessionHandle sessionHandle;
+    if (cliService.getHiveConf().getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+        (userName != null)) {
+      String delegationTokenStr = getDelegationToken(userName);
+      sessionHandle = cliService.openSessionWithImpersonation(protocol, userName,
+          req.getPassword(), ipAddress, req.getConfiguration(), delegationTokenStr);
+    } else {
+      sessionHandle = cliService.openSession(protocol, userName, req.getPassword(),
+          ipAddress, req.getConfiguration());
+    }
+    res.setServerProtocolVersion(protocol);
+    return sessionHandle;
+  }
+
+
+  private String getDelegationToken(String userName)
+      throws HiveSQLException, LoginException, IOException {
+    if (userName == null || !cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
+        .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString())) {
+      return null;
+    }
+    try {
+      return cliService.getDelegationTokenFromMetaStore(userName);
+    } catch (UnsupportedOperationException e) {
+      // The delegation token is not applicable in the given deployment mode
+    }
+    return null;
+  }
+
+  private TProtocolVersion getMinVersion(TProtocolVersion... versions) {
+    TProtocolVersion[] values = TProtocolVersion.values();
+    // Start from the highest known protocol version and lower it to the
+    // smallest value seen among the arguments.
+    int current = values[values.length - 1].getValue();
+    for (TProtocolVersion version : versions) {
+      if (current > version.getValue()) {
+        current = version.getValue();
+      }
+    }
+    // Map the numeric value back to its enum constant.
+    for (TProtocolVersion version : values) {
+      if (version.getValue() == current) {
+        return version;
+      }
+    }
+    // Unreachable: current always matches one of the enum values.
+    throw new IllegalArgumentException(
+        "Cannot map protocol version value " + current + " to a TProtocolVersion");
+  }
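+
+  // For example (hypothetical values): if the server's latest supported
+  // protocol version is V8 and a client requests V6, getMinVersion(V8, V6)
+  // returns V6.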
+
+  @Override
+  public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
+    TCloseSessionResp resp = new TCloseSessionResp();
+    try {
+      SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+      cliService.closeSession(sessionHandle);
+      resp.setStatus(OK_STATUS);
+      ThriftCLIServerContext context =
+        (ThriftCLIServerContext)currentServerContext.get();
+      if (context != null) {
+        context.setSessionHandle(null);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error closing session: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
+    TGetInfoResp resp = new TGetInfoResp();
+    try {
+      GetInfoValue getInfoValue =
+          cliService.getInfo(new SessionHandle(req.getSessionHandle()),
+              GetInfoType.getGetInfoType(req.getInfoType()));
+      resp.setInfoValue(getInfoValue.toTGetInfoValue());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting info: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
+    TExecuteStatementResp resp = new TExecuteStatementResp();
+    try {
+      SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+      String statement = req.getStatement();
+      Map<String, String> confOverlay = req.getConfOverlay();
+      boolean runAsync = req.isRunAsync();
+      OperationHandle operationHandle = runAsync ?
+          cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
+          : cliService.executeStatement(sessionHandle, statement, confOverlay);
+      resp.setOperationHandle(operationHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error executing statement: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
+    TGetTypeInfoResp resp = new TGetTypeInfoResp();
+    try {
+      OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(operationHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting type info: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
+    TGetCatalogsResp resp = new TGetCatalogsResp();
+    try {
+      OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting catalogs: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
+    TGetSchemasResp resp = new TGetSchemasResp();
+    try {
+      OperationHandle opHandle = cliService.getSchemas(
+          new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting schemas: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
+    TGetTablesResp resp = new TGetTablesResp();
+    try {
+      OperationHandle opHandle = cliService
+          .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+              req.getSchemaName(), req.getTableName(), req.getTableTypes());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting tables: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
+    TGetTableTypesResp resp = new TGetTableTypesResp();
+    try {
+      OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting table types: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
+    TGetColumnsResp resp = new TGetColumnsResp();
+    try {
+      OperationHandle opHandle = cliService.getColumns(
+          new SessionHandle(req.getSessionHandle()),
+          req.getCatalogName(),
+          req.getSchemaName(),
+          req.getTableName(),
+          req.getColumnName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting columns: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
+    TGetFunctionsResp resp = new TGetFunctionsResp();
+    try {
+      OperationHandle opHandle = cliService.getFunctions(
+          new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+          req.getSchemaName(), req.getFunctionName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting functions: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
+    TGetOperationStatusResp resp = new TGetOperationStatusResp();
+    try {
+      OperationStatus operationStatus = cliService.getOperationStatus(
+          new OperationHandle(req.getOperationHandle()));
+      resp.setOperationState(operationStatus.getState().toTOperationState());
+      HiveSQLException opException = operationStatus.getOperationException();
+      if (opException != null) {
+        resp.setSqlState(opException.getSQLState());
+        resp.setErrorCode(opException.getErrorCode());
+        resp.setErrorMessage(opException.getMessage());
+      }
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting operation status: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
+    TCancelOperationResp resp = new TCancelOperationResp();
+    try {
+      cliService.cancelOperation(new OperationHandle(req.getOperationHandle()));
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error cancelling operation: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
+    TCloseOperationResp resp = new TCloseOperationResp();
+    try {
+      cliService.closeOperation(new OperationHandle(req.getOperationHandle()));
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error closing operation: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req)
+      throws TException {
+    TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
+    try {
+      TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
+      resp.setSchema(schema.toTTableSchema());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting result set metadata: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
+    TFetchResultsResp resp = new TFetchResultsResp();
+    try {
+      RowSet rowSet = cliService.fetchResults(
+          new OperationHandle(req.getOperationHandle()),
+          FetchOrientation.getFetchOrientation(req.getOrientation()),
+          req.getMaxRows(),
+          FetchType.getFetchType(req.getFetchType()));
+      resp.setResults(rowSet.toTRowSet());
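+      // hasMoreRows is always reported as false here; clients presumably detect
+      // the end of data by receiving an empty batch.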
+      resp.setHasMoreRows(false);
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error fetching results: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public abstract void run();
+
+  /**
+   * If a proxy user name is provided, check that the real (connecting) user has
+   * the privilege to substitute for it.
+   * @param realUser the authenticated connecting user
+   * @param sessionConf session configuration, which may carry the proxy user name
+   * @param ipAddress the client IP address, used in the proxy-access check
+   * @return the proxy user if substitution applies, otherwise the real user
+   * @throws HiveSQLException if substitution is not allowed or the check fails
+   */
+  private String getProxyUser(String realUser, Map<String, String> sessionConf,
+      String ipAddress) throws HiveSQLException {
+    String proxyUser = null;
+    // Http transport mode.
+    // We set the thread local proxy username, in ThriftHttpServlet.
+    if (cliService.getHiveConf().getVar(
+        ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+      proxyUser = SessionManager.getProxyUserName();
+      LOG.debug("Proxy user from query string: " + proxyUser);
+    }
+
+    if (proxyUser == null && sessionConf != null && sessionConf.containsKey(HiveAuthFactory.HS2_PROXY_USER)) {
+      String proxyUserFromThriftBody = sessionConf.get(HiveAuthFactory.HS2_PROXY_USER);
+      LOG.debug("Proxy user from thrift body: " + proxyUserFromThriftBody);
+      proxyUser = proxyUserFromThriftBody;
+    }
+
+    if (proxyUser == null) {
+      return realUser;
+    }
+
+    // check whether substitution is allowed
+    if (!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ALLOW_USER_SUBSTITUTION)) {
+      throw new HiveSQLException("Proxy user substitution is not allowed");
+    }
+
+    // If there's no authentication, then directly substitute the user
+    if (HiveAuthFactory.AuthTypes.NONE.toString().
+        equalsIgnoreCase(hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION))) {
+      return proxyUser;
+    }
+
+    // Verify proxy user privilege of the realUser for the proxyUser
+    HiveAuthFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hiveConf);
+    LOG.debug("Verified proxy user: " + proxyUser);
+    return proxyUser;
+  }
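+
+  // A sketch of the substitution flow (names illustrative): a gateway
+  // authenticated as "hue" opens a session carrying
+  // HiveAuthFactory.HS2_PROXY_USER=alice in its session conf; with user
+  // substitution enabled and Hadoop proxyuser rules permitting it,
+  // getProxyUser("hue", sessionConf, ip) returns "alice".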
+
+  private boolean isKerberosAuthMode() {
+    return cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
+        .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
+  }
+}

