hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From the...@apache.org
Subject [1/3] hive git commit: HIVE-17483 : HS2 kill command to kill queries using query id (Teddy Choi, reviewed by Thejas Nair )
Date Tue, 26 Sep 2017 21:04:31 GMT
Repository: hive
Updated Branches:
  refs/heads/master ccd02c837 -> 2d698b679


http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
index acc8c3a..8421e8f 100644
--- a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
+++ b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
@@ -7588,3 +7588,138 @@ class TProgressUpdateResp:
 
   def __ne__(self, other):
     return not (self == other)
+
+class TGetQueryIdReq:
+  """
+  Attributes:
+   - operationHandle
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRUCT, 'operationHandle', (TOperationHandle, TOperationHandle.thrift_spec),
None, ), # 1
+  )
+
+  def __init__(self, operationHandle=None,):
+    self.operationHandle = operationHandle
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans,
TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRUCT:
+          self.operationHandle = TOperationHandle()
+          self.operationHandle.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec
is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('TGetQueryIdReq')
+    if self.operationHandle is not None:
+      oprot.writeFieldBegin('operationHandle', TType.STRUCT, 1)
+      self.operationHandle.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.operationHandle is None:
+      raise TProtocol.TProtocolException(message='Required field operationHandle is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.operationHandle)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class TGetQueryIdResp:
+  """
+  Attributes:
+   - queryId
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'queryId', None, None, ), # 1
+  )
+
+  def __init__(self, queryId=None,):
+    self.queryId = queryId
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans,
TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.queryId = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec
is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('TGetQueryIdResp')
+    if self.queryId is not None:
+      oprot.writeFieldBegin('queryId', TType.STRING, 1)
+      oprot.writeString(self.queryId)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.queryId is None:
+      raise TProtocol.TProtocolException(message='Required field queryId is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.queryId)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service.rb
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service.rb
index a50fe25..11e058c 100644
--- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service.rb
+++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service.rb
@@ -326,6 +326,21 @@ module TCLIService
       raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT,
'RenewDelegationToken failed: unknown result')
     end
 
+    def GetQueryId(req)
+      send_GetQueryId(req)
+      return recv_GetQueryId()
+    end
+
+    def send_GetQueryId(req)
+      send_message('GetQueryId', GetQueryId_args, :req => req)
+    end
+
+    def recv_GetQueryId()
+      result = receive_message(GetQueryId_result)
+      return result.success unless result.success.nil?
+      raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT,
'GetQueryId failed: unknown result')
+    end
+
   end
 
   class Processor
@@ -478,6 +493,13 @@ module TCLIService
       write_result(result, oprot, 'RenewDelegationToken', seqid)
     end
 
+    def process_GetQueryId(seqid, iprot, oprot)
+      args = read_args(iprot, GetQueryId_args)
+      result = GetQueryId_result.new()
+      result.success = @handler.GetQueryId(args.req)
+      write_result(result, oprot, 'GetQueryId', seqid)
+    end
+
   end
 
   # HELPER FUNCTIONS AND STRUCTURES
@@ -1154,5 +1176,37 @@ module TCLIService
     ::Thrift::Struct.generate_accessors self
   end
 
+  class GetQueryId_args
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    REQ = 1
+
+    FIELDS = {
+      REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => ::TGetQueryIdReq}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class GetQueryId_result
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    SUCCESS = 0
+
+    FIELDS = {
+      SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class =>
::TGetQueryIdResp}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
 end
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
index 6695aee..994df1e 100644
--- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
+++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
@@ -1917,3 +1917,37 @@ class TProgressUpdateResp
   ::Thrift::Struct.generate_accessors self
 end
 
+class TGetQueryIdReq
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  OPERATIONHANDLE = 1
+
+  FIELDS = {
+    OPERATIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'operationHandle',
:class => ::TOperationHandle}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required
field operationHandle is unset!') unless @operationHandle
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
+class TGetQueryIdResp
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  QUERYID = 1
+
+  FIELDS = {
+    QUERYID => {:type => ::Thrift::Types::STRING, :name => 'queryId'}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required
field queryId is unset!') unless @queryId
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index 689b948..9b13ea7 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -43,6 +43,7 @@ import org.apache.hive.service.ServiceUtils;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.cli.operation.Operation;
 import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
 import org.apache.hive.service.server.HiveServer2;
 import org.slf4j.Logger;
@@ -606,6 +607,15 @@ public class CLIService extends CompositeService implements ICLIService
{
     LOG.info(sessionHandle  + ": renewDelegationToken()");
   }
 
+  @Override
+  public String getQueryId(TOperationHandle opHandle) throws HiveSQLException {
+    Operation operation = sessionManager.getOperationManager().getOperation(
+        new OperationHandle(opHandle));
+    final String queryId = operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
+    LOG.debug(opHandle + ": getQueryId() " + queryId);
+    return queryId;
+  }
+
   public SessionManager getSessionManager() {
     return sessionManager;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
index 43fbb00..98125d3 100644
--- a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
 
 
 /**
@@ -226,4 +227,9 @@ public class EmbeddedCLIServiceClient extends CLIServiceClient {
     return cliService.getCrossReference(sessionHandle, primaryCatalog, primarySchema,
       primaryTable, foreignCatalog, foreignSchema, foreignTable);
   }
+
+  @Override
+  public String getQueryId(TOperationHandle operationHandle) throws HiveSQLException {
+    return cliService.getQueryId(operationHandle);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/cli/ICLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/ICLIService.java b/service/src/java/org/apache/hive/service/cli/ICLIService.java
index 9f2039c..8c993a5 100644
--- a/service/src/java/org/apache/hive/service/cli/ICLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/ICLIService.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
 
 public interface ICLIService {
 
@@ -78,6 +79,8 @@ public interface ICLIService {
   OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate)
       throws HiveSQLException;
 
+  String getQueryId(TOperationHandle operationHandle) throws HiveSQLException;
+
   void cancelOperation(OperationHandle opHandle)
       throws HiveSQLException;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 46f524d..1cf4392 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.QueryInfo;
 import org.apache.hadoop.hive.ql.log.LogDivertAppender;
 import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest;
 import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -49,6 +50,12 @@ import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.server.KillQueryImpl;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +67,8 @@ public class OperationManager extends AbstractService {
   private final Logger LOG = LoggerFactory.getLogger(OperationManager.class.getName());
   private final ConcurrentHashMap<OperationHandle, Operation> handleToOperation =
       new ConcurrentHashMap<OperationHandle, Operation>();
+  private final ConcurrentHashMap<String, Operation> queryIdOperation =
+      new ConcurrentHashMap<String, Operation>();
 
   //Following fields for displaying queries on WebUI
   private Object webuiLock = new Object();
@@ -183,8 +192,13 @@ public class OperationManager extends AbstractService {
     return handleToOperation.get(operationHandle);
   }
 
+  private String getQueryId(Operation operation) {
+    return operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
+  }
+
   private void addOperation(Operation operation) {
     LOG.info("Adding operation: " + operation.getHandle());
+    queryIdOperation.put(getQueryId(operation), operation);
     handleToOperation.put(operation.getHandle(), operation);
     if (operation instanceof SQLOperation) {
       synchronized (webuiLock) {
@@ -196,6 +210,7 @@ public class OperationManager extends AbstractService {
 
   private Operation removeOperation(OperationHandle opHandle) {
     Operation operation = handleToOperation.remove(opHandle);
+    queryIdOperation.remove(getQueryId(operation));
     if (operation instanceof SQLOperation) {
       removeSafeQueryInfo(opHandle);
     }
@@ -215,11 +230,7 @@ public class OperationManager extends AbstractService {
         }
       }
 
-      handleToOperation.remove(operationHandle, operation);
-      if (operation instanceof SQLOperation) {
-        removeSafeQueryInfo(operationHandle);
-      }
-      return operation;
+      return removeOperation(operationHandle);
     }
     return null;
   }
@@ -400,4 +411,8 @@ public class OperationManager extends AbstractService {
       return historicalQueryInfos.get(handle);
     }
   }
+
+  public Operation getOperationByQueryId(String queryId) {
+    return queryIdOperation.get(queryId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 906565c..02efc19 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -73,6 +73,7 @@ import org.apache.hive.service.cli.operation.MetadataOperation;
 import org.apache.hive.service.cli.operation.Operation;
 import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.server.KillQueryImpl;
 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,6 +161,14 @@ public class HiveSessionImpl implements HiveSession {
     sessionState.setIsHiveServerQuery(true);
     sessionState.setForwardedAddresses(SessionManager.getForwardedAddresses());
     sessionState.setIsUsingThriftJDBCBinarySerDe(updateIsUsingThriftJDBCBinarySerDe());
+    try {
+      if (sessionManager != null) {
+        sessionState.setHiveServer2Host(sessionManager.getHiveServer2HostName());
+      }
+    } catch (Exception e) {
+      throw new HiveSQLException(e);
+    }
+    sessionState.setKillQuery(new KillQueryImpl(operationManager));
     SessionState.start(sessionState);
     try {
       sessionState.loadAuxJars();

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 5082689..9b2ae57 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -571,5 +571,12 @@ public class SessionManager extends CompositeService {
   public int getOpenSessionCount() {
     return handleToSession.size();
   }
+
+  public String getHiveServer2HostName() throws Exception {
+    if (hiveServer2 == null) {
+      return null;
+    }
+    return hiveServer2.getServerHost();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
index 0e76c91..71e53b7 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
@@ -47,6 +47,7 @@ import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.thrift.TApplicationException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -195,6 +196,11 @@ public class RetryingThriftCLIServiceClient implements InvocationHandler
{
     }
 
     @Override
+    public String getQueryId(TOperationHandle operationHandle) throws HiveSQLException {
+      return cliService.getQueryId(operationHandle);
+    }
+
+    @Override
     public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
       cliService.cancelOperation(opHandle);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 9880fc6..7012a25 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -88,6 +88,8 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
 import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
 import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
 import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysResp;
+import org.apache.hive.service.rpc.thrift.TGetQueryIdReq;
+import org.apache.hive.service.rpc.thrift.TGetQueryIdResp;
 import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
 import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
 import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
@@ -802,6 +804,15 @@ public abstract class ThriftCLIService extends AbstractService implements
TCLISe
   }
 
   @Override
+  public TGetQueryIdResp GetQueryId(TGetQueryIdReq req) throws TException {
+    try {
+      return new TGetQueryIdResp(cliService.getQueryId(req.getOperationHandle()));
+    } catch (HiveSQLException e) {
+      throw new TException(e);
+    }
+  }
+
+  @Override
   public abstract void run();
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
index 617bc40..d43f125 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -65,6 +65,7 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
 import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
 import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
 import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysResp;
+import org.apache.hive.service.rpc.thrift.TGetQueryIdReq;
 import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
 import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
 import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
@@ -77,6 +78,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
 import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
@@ -553,4 +555,13 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
       throw new HiveSQLException(e);
     }
   }
+
+  @Override
+  public String getQueryId(TOperationHandle operationHandle) throws HiveSQLException {
+    try {
+      return cliService.GetQueryId(new TGetQueryIdReq(operationHandle)).getQueryId();
+    } catch (TException e) {
+      throw new HiveSQLException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index e5f4491..a55cf59 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -481,7 +481,7 @@ public class HiveServer2 extends CompositeService {
         + thriftCLIService.getPortNumber();
   }
 
-  private String getServerHost() throws Exception {
+  public String getServerHost() throws Exception {
     if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
       throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
new file mode 100644
index 0000000..5412371
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.operation.Operation;
+import org.apache.hive.service.cli.operation.OperationManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KillQueryImpl implements KillQuery {
+  private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
+
+  private final OperationManager operationManager;
+
+  public KillQueryImpl(OperationManager operationManager) {
+    this.operationManager = operationManager;
+  }
+
+  @Override
+  public void killQuery(String queryId) throws HiveException {
+    try {
+      Operation operation = operationManager.getOperationByQueryId(queryId);
+      if (operation == null) {
+        LOG.info("Query not found: " + queryId);
+      } else {
+        OperationHandle handle = operation.getHandle();
+        operationManager.cancelOperation(handle);
+      }
+    } catch (HiveSQLException e) {
+      throw new HiveException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2d698b67/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
index efc2e1a..5f418c7 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
@@ -78,8 +78,7 @@ public class TestSessionManagerMetrics {
     conf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false);
     MetricsFactory.init(conf);
 
-    HiveServer2 hs2 = new HiveServer2();
-    sm = new SessionManager(hs2);
+    sm = new SessionManager(null);
     sm.init(conf);
 
     metrics = (CodahaleMetrics) MetricsFactory.getInstance();


Mime
View raw message