Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EC85111A63 for ; Thu, 28 Aug 2014 03:16:40 +0000 (UTC) Received: (qmail 73951 invoked by uid 500); 28 Aug 2014 03:16:40 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 73901 invoked by uid 500); 28 Aug 2014 03:16:40 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 73890 invoked by uid 99); 28 Aug 2014 03:16:40 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Aug 2014 03:16:40 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Aug 2014 03:16:00 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9E3952388C4E; Thu, 28 Aug 2014 03:15:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1621031 [10/10] - in /hive/branches/cbo: ./ common/src/java/org/apache/hadoop/hive/conf/ contrib/src/test/results/clientnegative/ contrib/src/test/results/clientpositive/ data/files/ hbase-handler/src/test/results/negative/ hcatalog/core/s... Date: Thu, 28 Aug 2014 03:15:19 -0000 To: commits@hive.apache.org From: gunther@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140828031532.9E3952388C4E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hive/branches/cbo/service/src/gen/thrift/gen-cpp/TCLIService_types.h URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/gen/thrift/gen-cpp/TCLIService_types.h?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/gen/thrift/gen-cpp/TCLIService_types.h (original) +++ hive/branches/cbo/service/src/gen/thrift/gen-cpp/TCLIService_types.h Thu Aug 28 03:15:13 2014 @@ -3602,14 +3602,18 @@ class TGetResultSetMetadataResp { void swap(TGetResultSetMetadataResp &a, TGetResultSetMetadataResp &b); +typedef struct _TFetchResultsReq__isset { + _TFetchResultsReq__isset() : fetchType(true) {} + bool fetchType; +} _TFetchResultsReq__isset; class TFetchResultsReq { public: - static const char* ascii_fingerprint; // = "1B96A8C05BA9DD699FC8CD842240ABDE"; - static const uint8_t binary_fingerprint[16]; // = {0x1B,0x96,0xA8,0xC0,0x5B,0xA9,0xDD,0x69,0x9F,0xC8,0xCD,0x84,0x22,0x40,0xAB,0xDE}; + static const char* ascii_fingerprint; // = "B4CB1E4F8F8F4D50183DD372AD11753A"; + static const uint8_t binary_fingerprint[16]; // = {0xB4,0xCB,0x1E,0x4F,0x8F,0x8F,0x4D,0x50,0x18,0x3D,0xD3,0x72,0xAD,0x11,0x75,0x3A}; - TFetchResultsReq() : orientation((TFetchOrientation::type)0), maxRows(0) { + TFetchResultsReq() : orientation((TFetchOrientation::type)0), maxRows(0), fetchType(0) { orientation = (TFetchOrientation::type)0; } @@ -3619,6 +3623,9 @@ class TFetchResultsReq { TOperationHandle operationHandle; TFetchOrientation::type orientation; int64_t maxRows; + int16_t fetchType; + + _TFetchResultsReq__isset __isset; void __set_operationHandle(const TOperationHandle& val) { operationHandle = val; @@ -3632,6 +3639,11 @@ class TFetchResultsReq { maxRows = val; } + void __set_fetchType(const int16_t val) { + fetchType = val; + __isset.fetchType = true; + } + bool operator == (const TFetchResultsReq & rhs) const { if (!(operationHandle == rhs.operationHandle)) @@ -3640,6 +3652,10 @@ class TFetchResultsReq { return false; if (!(maxRows == rhs.maxRows)) return false; + if (__isset.fetchType != rhs.__isset.fetchType) + return false; + else if (__isset.fetchType && !(fetchType == rhs.fetchType)) + return false; return true; } bool operator != (const TFetchResultsReq &rhs) const { Modified: hive/branches/cbo/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java (original) +++ hive/branches/cbo/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java Thu Aug 28 03:15:13 2014 @@ -37,6 +37,7 @@ public class TFetchResultsReq implements private static final org.apache.thrift.protocol.TField OPERATION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1); private static final org.apache.thrift.protocol.TField ORIENTATION_FIELD_DESC = new org.apache.thrift.protocol.TField("orientation", org.apache.thrift.protocol.TType.I32, (short)2); private static final org.apache.thrift.protocol.TField MAX_ROWS_FIELD_DESC = new org.apache.thrift.protocol.TField("maxRows", org.apache.thrift.protocol.TType.I64, (short)3); + private static final org.apache.thrift.protocol.TField FETCH_TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("fetchType", org.apache.thrift.protocol.TType.I16, (short)4); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -47,6 +48,7 @@ public class TFetchResultsReq implements private TOperationHandle operationHandle; // required private TFetchOrientation orientation; // required private long maxRows; // required + private short fetchType; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -56,7 +58,8 @@ public class TFetchResultsReq implements * @see TFetchOrientation */ ORIENTATION((short)2, "orientation"), - MAX_ROWS((short)3, "maxRows"); + MAX_ROWS((short)3, "maxRows"), + FETCH_TYPE((short)4, "fetchType"); private static final Map byName = new HashMap(); @@ -77,6 +80,8 @@ public class TFetchResultsReq implements return ORIENTATION; case 3: // MAX_ROWS return MAX_ROWS; + case 4: // FETCH_TYPE + return FETCH_TYPE; default: return null; } @@ -118,7 +123,9 @@ public class TFetchResultsReq implements // isset id assignments private static final int __MAXROWS_ISSET_ID = 0; + private static final int __FETCHTYPE_ISSET_ID = 1; private byte __isset_bitfield = 0; + private _Fields optionals[] = {_Fields.FETCH_TYPE}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -128,6 +135,8 @@ public class TFetchResultsReq implements new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TFetchOrientation.class))); tmpMap.put(_Fields.MAX_ROWS, new org.apache.thrift.meta_data.FieldMetaData("maxRows", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.FETCH_TYPE, new org.apache.thrift.meta_data.FieldMetaData("fetchType", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I16))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TFetchResultsReq.class, metaDataMap); } @@ -135,6 +144,8 @@ public class TFetchResultsReq implements public TFetchResultsReq() { this.orientation = org.apache.hive.service.cli.thrift.TFetchOrientation.FETCH_NEXT; + this.fetchType = (short)0; + } public TFetchResultsReq( @@ -161,6 +172,7 @@ public class TFetchResultsReq implements this.orientation = other.orientation; } this.maxRows = other.maxRows; + this.fetchType = other.fetchType; } public TFetchResultsReq deepCopy() { @@ -174,6 +186,8 @@ public class TFetchResultsReq implements setMaxRowsIsSet(false); this.maxRows = 0; + this.fetchType = (short)0; + } public TOperationHandle getOperationHandle() { @@ -252,6 +266,28 @@ public class TFetchResultsReq implements __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MAXROWS_ISSET_ID, value); } + public short getFetchType() { + return this.fetchType; + } + + public void setFetchType(short fetchType) { + this.fetchType = fetchType; + setFetchTypeIsSet(true); + } + + public void unsetFetchType() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __FETCHTYPE_ISSET_ID); + } + + /** Returns true if field fetchType is set (has been assigned a value) and false otherwise */ + public boolean isSetFetchType() { + return EncodingUtils.testBit(__isset_bitfield, __FETCHTYPE_ISSET_ID); + } + + public void setFetchTypeIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __FETCHTYPE_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case OPERATION_HANDLE: @@ -278,6 +314,14 @@ public class TFetchResultsReq implements } break; + case FETCH_TYPE: + if (value == null) { + unsetFetchType(); + } else { + setFetchType((Short)value); + } + break; + } } @@ -292,6 +336,9 @@ public class TFetchResultsReq implements case MAX_ROWS: return Long.valueOf(getMaxRows()); + case FETCH_TYPE: + return Short.valueOf(getFetchType()); + } throw new IllegalStateException(); } @@ -309,6 +356,8 @@ public class TFetchResultsReq implements return isSetOrientation(); case MAX_ROWS: return isSetMaxRows(); + case FETCH_TYPE: + return isSetFetchType(); } throw new IllegalStateException(); } @@ -353,6 +402,15 @@ public class TFetchResultsReq implements return false; } + boolean this_present_fetchType = true && this.isSetFetchType(); + boolean that_present_fetchType = true && that.isSetFetchType(); + if (this_present_fetchType || that_present_fetchType) { + if (!(this_present_fetchType && that_present_fetchType)) + return false; + if (this.fetchType != that.fetchType) + return false; + } + return true; } @@ -375,6 +433,11 @@ public class TFetchResultsReq implements if (present_maxRows) builder.append(maxRows); + boolean present_fetchType = true && (isSetFetchType()); + builder.append(present_fetchType); + if (present_fetchType) + builder.append(fetchType); + return builder.toHashCode(); } @@ -416,6 +479,16 @@ public class TFetchResultsReq implements return lastComparison; } } + lastComparison = Boolean.valueOf(isSetFetchType()).compareTo(typedOther.isSetFetchType()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetFetchType()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fetchType, typedOther.fetchType); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -455,6 +528,12 @@ public class TFetchResultsReq implements sb.append("maxRows:"); sb.append(this.maxRows); first = false; + if (isSetFetchType()) { + if (!first) sb.append(", "); + sb.append("fetchType:"); + sb.append(this.fetchType); + first = false; + } sb.append(")"); return sb.toString(); } @@ -540,6 +619,14 @@ public class TFetchResultsReq implements org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // FETCH_TYPE + if (schemeField.type == org.apache.thrift.protocol.TType.I16) { + struct.fetchType = iprot.readI16(); + struct.setFetchTypeIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -566,6 +653,11 @@ public class TFetchResultsReq implements oprot.writeFieldBegin(MAX_ROWS_FIELD_DESC); oprot.writeI64(struct.maxRows); oprot.writeFieldEnd(); + if (struct.isSetFetchType()) { + oprot.writeFieldBegin(FETCH_TYPE_FIELD_DESC); + oprot.writeI16(struct.fetchType); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -586,6 +678,14 @@ public class TFetchResultsReq implements struct.operationHandle.write(oprot); oprot.writeI32(struct.orientation.getValue()); oprot.writeI64(struct.maxRows); + BitSet optionals = new BitSet(); + if (struct.isSetFetchType()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetFetchType()) { + oprot.writeI16(struct.fetchType); + } } @Override @@ -598,6 +698,11 @@ public class TFetchResultsReq implements struct.setOrientationIsSet(true); struct.maxRows = iprot.readI64(); struct.setMaxRowsIsSet(true); + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.fetchType = iprot.readI16(); + struct.setFetchTypeIsSet(true); + } } } Modified: hive/branches/cbo/service/src/gen/thrift/gen-py/TCLIService/ttypes.py URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/gen/thrift/gen-py/TCLIService/ttypes.py?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/gen/thrift/gen-py/TCLIService/ttypes.py (original) +++ hive/branches/cbo/service/src/gen/thrift/gen-py/TCLIService/ttypes.py Thu Aug 28 03:15:13 2014 @@ -5752,6 +5752,7 @@ class TFetchResultsReq: - operationHandle - orientation - maxRows + - fetchType """ thrift_spec = ( @@ -5759,12 +5760,14 @@ class TFetchResultsReq: (1, TType.STRUCT, 'operationHandle', (TOperationHandle, TOperationHandle.thrift_spec), None, ), # 1 (2, TType.I32, 'orientation', None, 0, ), # 2 (3, TType.I64, 'maxRows', None, None, ), # 3 + (4, TType.I16, 'fetchType', None, 0, ), # 4 ) - def __init__(self, operationHandle=None, orientation=thrift_spec[2][4], maxRows=None,): + def __init__(self, operationHandle=None, orientation=thrift_spec[2][4], maxRows=None, fetchType=thrift_spec[4][4],): self.operationHandle = operationHandle self.orientation = orientation self.maxRows = maxRows + self.fetchType = fetchType def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -5791,6 +5794,11 @@ class TFetchResultsReq: self.maxRows = iprot.readI64(); else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I16: + self.fetchType = iprot.readI16(); + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -5813,6 +5821,10 @@ class TFetchResultsReq: oprot.writeFieldBegin('maxRows', TType.I64, 3) oprot.writeI64(self.maxRows) oprot.writeFieldEnd() + if self.fetchType is not None: + oprot.writeFieldBegin('fetchType', TType.I16, 4) + oprot.writeI16(self.fetchType) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() Modified: hive/branches/cbo/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb (original) +++ hive/branches/cbo/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb Thu Aug 28 03:15:13 2014 @@ -1598,11 +1598,13 @@ class TFetchResultsReq OPERATIONHANDLE = 1 ORIENTATION = 2 MAXROWS = 3 + FETCHTYPE = 4 FIELDS = { OPERATIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'operationHandle', :class => ::TOperationHandle}, ORIENTATION => {:type => ::Thrift::Types::I32, :name => 'orientation', :default => 0, :enum_class => ::TFetchOrientation}, - MAXROWS => {:type => ::Thrift::Types::I64, :name => 'maxRows'} + MAXROWS => {:type => ::Thrift::Types::I64, :name => 'maxRows'}, + FETCHTYPE => {:type => ::Thrift::Types::I16, :name => 'fetchType', :default => 0, :optional => true} } def struct_fields; FIELDS; end Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIService.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIService.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIService.java Thu Aug 28 03:15:13 2014 @@ -46,7 +46,6 @@ import org.apache.hive.service.Composite import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.operation.Operation; -import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; @@ -67,7 +66,6 @@ public class CLIService extends Composit private HiveConf hiveConf; private SessionManager sessionManager; - private IMetaStoreClient metastoreClient; private UserGroupInformation serviceUGI; private UserGroupInformation httpUGI; @@ -80,11 +78,8 @@ public class CLIService extends Composit this.hiveConf = hiveConf; sessionManager = new SessionManager(); addService(sessionManager); - /** - * If auth mode is Kerberos, do a kerberos login for the service from the keytab - */ - if (hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION).equalsIgnoreCase( - HiveAuthFactory.AuthTypes.KERBEROS.toString())) { + // If the hadoop cluster is secure, do a kerberos login for the service from the keytab + if (ShimLoader.getHadoopShims().isSecurityEnabled()) { try { HiveAuthFactory.loginFromKeytab(hiveConf); this.serviceUGI = ShimLoader.getHadoopShims().getUGIForConf(hiveConf); @@ -132,21 +127,23 @@ public class CLIService extends Composit } catch (IOException eIO) { throw new ServiceException("Error setting stage directories", eIO); } - + // Initialize and test a connection to the metastore + IMetaStoreClient metastoreClient = null; try { - // Initialize and test a connection to the metastore metastoreClient = new HiveMetaStoreClient(hiveConf); metastoreClient.getDatabases("default"); } catch (Exception e) { throw new ServiceException("Unable to connect to MetaStore!", e); } + finally { + if (metastoreClient != null) { + metastoreClient.close(); + } + } } @Override public synchronized void stop() { - if (metastoreClient != null) { - metastoreClient.close(); - } super.stop(); } @@ -170,7 +167,7 @@ public class CLIService extends Composit throws HiveSQLException { SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, null, configuration, true, delegationToken); - LOG.debug(sessionHandle + ": openSession()"); + LOG.debug(sessionHandle + ": openSessionWithImpersonation()"); return sessionHandle; } @@ -423,25 +420,20 @@ public class CLIService extends Composit } /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) + * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) */ @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) + public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { - RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().fetchResults(opHandle, orientation, maxRows); - LOG.debug(opHandle + ": fetchResults()"); - return rowSet; + return fetchResults(opHandle, Operation.DEFAULT_FETCH_ORIENTATION, + Operation.DEFAULT_FETCH_MAX_ROWS, FetchType.QUERY_OUTPUT); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) - */ @Override - public RowSet fetchResults(OperationHandle opHandle) - throws HiveSQLException { + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException { RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().fetchResults(opHandle); + .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType); LOG.debug(opHandle + ": fetchResults()"); return rowSet; } Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java Thu Aug 28 03:15:13 2014 @@ -28,19 +28,17 @@ import org.apache.hive.service.auth.Hive * */ public abstract class CLIServiceClient implements ICLIService { + private static final long DEFAULT_MAX_ROWS = 1000; public SessionHandle openSession(String username, String password) throws HiveSQLException { return openSession(username, password, Collections.emptyMap()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) - */ @Override public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { // TODO: provide STATIC default value - return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000); + return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, DEFAULT_MAX_ROWS, FetchType.QUERY_OUTPUT); } @Override Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java Thu Aug 28 03:15:13 2014 @@ -181,13 +181,10 @@ public class EmbeddedCLIServiceClient ex return cliService.getResultSetMetadata(opHandle); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) - */ @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException { - return cliService.fetchResults(opHandle, orientation, maxRows); + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException { + return cliService.fetchResults(opHandle, orientation, maxRows, fetchType); } Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/ICLIService.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/ICLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/ICLIService.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/ICLIService.java Thu Aug 28 03:15:13 2014 @@ -27,79 +27,78 @@ import org.apache.hive.service.auth.Hive public interface ICLIService { - public abstract SessionHandle openSession(String username, String password, + SessionHandle openSession(String username, String password, Map configuration) throws HiveSQLException; - public abstract SessionHandle openSessionWithImpersonation(String username, String password, + SessionHandle openSessionWithImpersonation(String username, String password, Map configuration, String delegationToken) throws HiveSQLException; - public abstract void closeSession(SessionHandle sessionHandle) + void closeSession(SessionHandle sessionHandle) throws HiveSQLException; - public abstract GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType) + GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType) throws HiveSQLException; - public abstract OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException; - public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle, + OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException; - public abstract OperationHandle getTypeInfo(SessionHandle sessionHandle) + OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException; - public abstract OperationHandle getCatalogs(SessionHandle sessionHandle) + OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException; - public abstract OperationHandle getSchemas(SessionHandle sessionHandle, + OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, String schemaName) throws HiveSQLException; - public abstract OperationHandle getTables(SessionHandle sessionHandle, + OperationHandle getTables(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, List tableTypes) throws HiveSQLException; - public abstract OperationHandle getTableTypes(SessionHandle sessionHandle) + OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException; - public abstract OperationHandle getColumns(SessionHandle sessionHandle, + OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException; - public abstract OperationHandle getFunctions(SessionHandle sessionHandle, + OperationHandle getFunctions(SessionHandle sessionHandle, String catalogName, String schemaName, String functionName) throws HiveSQLException; - public abstract OperationStatus getOperationStatus(OperationHandle opHandle) + OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException; - public abstract void cancelOperation(OperationHandle opHandle) + void cancelOperation(OperationHandle opHandle) throws HiveSQLException; - public abstract void closeOperation(OperationHandle opHandle) + void closeOperation(OperationHandle opHandle) throws HiveSQLException; - public abstract TableSchema getResultSetMetadata(OperationHandle opHandle) + TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - public abstract RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, - long maxRows) - throws HiveSQLException; - - public abstract RowSet fetchResults(OperationHandle opHandle) + RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException; - public abstract String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, + RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException; + + String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException; - public abstract void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, + void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; - public abstract void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, + void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java Thu Aug 28 03:15:13 2014 @@ -42,11 +42,8 @@ public class GetCatalogsOperation extend rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); setState(OperationState.FINISHED); } Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java Thu Aug 28 03:15:13 2014 @@ -114,11 +114,8 @@ public class GetColumnsOperation extends this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java Thu Aug 28 03:15:13 2014 @@ -68,11 +68,8 @@ public class GetFunctionsOperation exten this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { if ((null == catalogName || "".equals(catalogName)) Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java Thu Aug 28 03:15:13 2014 @@ -50,11 +50,8 @@ public class GetSchemasOperation extends this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java Thu Aug 28 03:15:13 2014 @@ -50,11 +50,8 @@ public class GetTableTypesOperation exte rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { for (TableType type : TableType.values()) { Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java Thu Aug 28 03:15:13 2014 @@ -71,11 +71,8 @@ public class GetTablesOperation extends this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java Thu Aug 28 03:15:13 2014 @@ -79,11 +79,8 @@ public class GetTypeInfoOperation extend rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { for (Type type : Type.values()) { Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java Thu Aug 28 03:15:13 2014 @@ -94,11 +94,8 @@ public class HiveCommandOperation extend IOUtils.cleanup(LOG, parentSession.getSessionState().err); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.operation.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { String command = getStatement().trim(); @@ -136,6 +133,7 @@ public class HiveCommandOperation extend setState(OperationState.CLOSED); tearDownSessionIO(); cleanTmpFile(); + cleanupOperationLog(); } /* (non-Javadoc) Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java Thu Aug 28 03:15:13 2014 @@ -46,6 +46,7 @@ public abstract class MetadataOperation @Override public void close() throws HiveSQLException { setState(OperationState.CLOSED); + cleanupOperationLog(); } /** Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/Operation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/Operation.java Thu Aug 28 03:15:13 2014 @@ -17,6 +17,8 @@ */ package org.apache.hive.service.cli.operation; +import java.io.File; +import java.io.FileNotFoundException; import java.util.EnumSet; import java.util.concurrent.Future; @@ -41,11 +43,14 @@ public abstract class Operation { private final OperationHandle opHandle; private HiveConf configuration; public static final Log LOG = LogFactory.getLog(Operation.class.getName()); + public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT; public static final long DEFAULT_FETCH_MAX_ROWS = 100; protected boolean hasResultSet; protected volatile HiveSQLException operationException; protected final boolean runAsync; protected volatile Future backgroundHandle; + protected OperationLog operationLog; + protected boolean isOperationLogEnabled; protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); @@ -106,6 +111,11 @@ public abstract class Operation { opHandle.setHasResultSet(hasResultSet); } + + public OperationLog getOperationLog() { + return operationLog; + } + protected final OperationState setState(OperationState newState) throws HiveSQLException { state.validateTransition(newState); this.state = newState; @@ -138,7 +148,97 @@ public abstract class Operation { return OperationState.ERROR.equals(state); } - public abstract void run() throws HiveSQLException; + protected void createOperationLog() { + if (parentSession.isOperationLogEnabled()) { + File operationLogFile = new File(parentSession.getOperationLogSessionDir(), + opHandle.getHandleIdentifier().toString()); + isOperationLogEnabled = true; + + // create log file + try { + if (operationLogFile.exists()) { + LOG.warn("The operation log file should not exist, but it is already there: " + + operationLogFile.getAbsolutePath()); + operationLogFile.delete(); + } + if (!operationLogFile.createNewFile()) { + // the log file already exists and cannot be deleted. + // If it can be read/written, keep its contents and use it. + if (!operationLogFile.canRead() || !operationLogFile.canWrite()) { + LOG.warn("The already existed operation log file cannot be recreated, " + + "and it cannot be read or written: " + operationLogFile.getAbsolutePath()); + isOperationLogEnabled = false; + return; + } + } + } catch (Exception e) { + LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e); + isOperationLogEnabled = false; + return; + } + + // create OperationLog object with above log file + try { + operationLog = new OperationLog(opHandle.toString(), operationLogFile); + } catch (FileNotFoundException e) { + LOG.warn("Unable to instantiate OperationLog object for operation: " + + opHandle, e); + isOperationLogEnabled = false; + return; + } + + // register this operationLog to current thread + OperationLog.setCurrentOperationLog(operationLog); + } + } + + protected void unregisterOperationLog() { + if (isOperationLogEnabled) { + OperationLog.removeCurrentOperationLog(); + } + } + + /** + * Invoked before runInternal(). + * Set up some preconditions, or configurations. + */ + protected void beforeRun() { + createOperationLog(); + } + + /** + * Invoked after runInternal(), even if an exception is thrown in runInternal(). + * Clean up resources, which was set up in beforeRun(). + */ + protected void afterRun() { + unregisterOperationLog(); + } + + /** + * Implemented by subclass of Operation class to execute specific behaviors. + * @throws HiveSQLException + */ + protected abstract void runInternal() throws HiveSQLException; + + public void run() throws HiveSQLException { + beforeRun(); + try { + runInternal(); + } finally { + afterRun(); + } + } + + protected void cleanupOperationLog() { + if (isOperationLogEnabled) { + if (operationLog == null) { + LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] " + + "logging is enabled, but its OperationLog object cannot be found."); + } else { + operationLog.close(); + } + } + } // TODO: make this abstract and implement in subclasses. public void cancel() throws HiveSQLException { Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Thu Aug 28 03:15:13 2014 @@ -18,6 +18,7 @@ package org.apache.hive.service.cli.operation; +import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -25,22 +26,19 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hive.service.AbstractService; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationState; -import org.apache.hive.service.cli.OperationStatus; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.log4j.*; /** * OperationManager. * */ public class OperationManager extends AbstractService { - + private static final String DEFAULT_LAYOUT_PATTERN = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"; private final Log LOG = LogFactory.getLog(OperationManager.class.getName()); private HiveConf hiveConf; @@ -54,7 +52,11 @@ public class OperationManager extends Ab @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; - + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { + initOperationLogCapture(); + } else { + LOG.debug("Operation level logging is turned off"); + } super.init(hiveConf); } @@ -70,6 +72,30 @@ public class OperationManager extends Ab super.stop(); } + private void initOperationLogCapture() { + // There should be a ConsoleAppender. Copy its Layout. + Logger root = Logger.getRootLogger(); + Layout layout = null; + + Enumeration appenders = root.getAllAppenders(); + while (appenders.hasMoreElements()) { + Appender ap = (Appender) appenders.nextElement(); + if (ap.getClass().equals(ConsoleAppender.class)) { + layout = ap.getLayout(); + break; + } + } + + if (layout == null) { + layout = new PatternLayout(DEFAULT_LAYOUT_PATTERN); + LOG.info("Cannot find a Layout from a ConsoleAppender. Using default Layout pattern."); + } + + // Register another Appender (with the same layout) that talks to us. + Appender ap = new LogDivertAppender(layout, this); + root.addAppender(ap); + } + public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runAsync) throws HiveSQLException { @@ -191,4 +217,39 @@ public class OperationManager extends Ab throws HiveSQLException { return getOperation(opHandle).getNextRowSet(orientation, maxRows); } + + public RowSet getOperationLogRowSet(OperationHandle opHandle, + FetchOrientation orientation, long maxRows) + throws HiveSQLException { + // get the OperationLog object from the operation + OperationLog operationLog = getOperation(opHandle).getOperationLog(); + if (operationLog == null) { + throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle); + } + + // read logs + List logs = operationLog.readOperationLog(orientation, maxRows); + + // convert logs to RowSet + TableSchema tableSchema = new TableSchema(getLogSchema()); + RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion()); + for (String log : logs) { + rowSet.addRow(new String[] {log}); + } + + return rowSet; + } + + private Schema getLogSchema() { + Schema schema = new Schema(); + FieldSchema fieldSchema = new FieldSchema(); + fieldSchema.setName("operation_log"); + fieldSchema.setType("string"); + schema.addToFieldSchemas(fieldSchema); + return schema; + } + + public OperationLog getOperationLogByThread() { + return OperationLog.getCurrentOperationLog(); + } } Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Thu Aug 28 03:15:13 2014 @@ -60,6 +60,7 @@ import org.apache.hive.service.cli.RowSe import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.server.ThreadWithGarbageCleanup; /** * SQLOperation. @@ -134,7 +135,7 @@ public class SQLOperation extends Execut } } - private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException { + private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException { try { // In Hive server mode, we are not able to retry in the FetchTask // case, when calling fetch queries since execute() has returned. @@ -164,50 +165,63 @@ public class SQLOperation extends Execut } @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.PENDING); final HiveConf opConfig = getConfigForOperation(); prepare(opConfig); if (!shouldRunAsync()) { - runInternal(opConfig); + runQuery(opConfig); } else { + // We'll pass ThreadLocals in the background thread from the foreground (handler) thread final SessionState parentSessionState = SessionState.get(); - // current Hive object needs to be set in aysnc thread in case of remote metastore. - // The metastore client in Hive is associated with right user - final Hive sessionHive = getCurrentHive(); - // current UGI will get used by metastore when metsatore is in embedded mode - // so this needs to get passed to the new async thread + // ThreadLocal Hive object needs to be set in background thread. + // The metastore client in Hive is associated with right user. + final Hive parentHive = getSessionHive(); + // Current UGI will get used by metastore when metsatore is in embedded mode + // So this needs to get passed to the new background thread final UserGroupInformation currentUGI = getCurrentUGI(opConfig); - // Runnable impl to call runInternal asynchronously, // from a different thread Runnable backgroundOperation = new Runnable() { - @Override public void run() { PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() { @Override public Object run() throws HiveSQLException { - - // Storing the current Hive object necessary when doAs is enabled - // User information is part of the metastore client member in Hive - Hive.set(sessionHive); + Hive.set(parentHive); SessionState.setCurrentSessionState(parentSessionState); + // Set current OperationLog in this async thread for keeping on saving query log. + registerCurrentOperationLog(); try { - runInternal(opConfig); + runQuery(opConfig); } catch (HiveSQLException e) { setOperationException(e); LOG.error("Error running hive query: ", e); + } finally { + unregisterOperationLog(); } return null; } }; + try { ShimLoader.getHadoopShims().doAs(currentUGI, doAsAction); } catch (Exception e) { setOperationException(new HiveSQLException(e)); LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e); } + finally { + /** + * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup + * when this thread is garbage collected later. + * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() + */ + if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { + ThreadWithGarbageCleanup currentThread = + (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); + currentThread.cacheThreadLocalRawStore(); + } + } } }; try { @@ -223,6 +237,12 @@ public class SQLOperation extends Execut } } + /** + * Returns the current UGI on the stack + * @param opConfig + * @return UserGroupInformation + * @throws HiveSQLException + */ private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException { try { return ShimLoader.getHadoopShims().getUGIForConf(opConfig); @@ -231,11 +251,28 @@ public class SQLOperation extends Execut } } - private Hive getCurrentHive() throws HiveSQLException { + /** + * Returns the ThreadLocal Hive for the current thread + * @return Hive + * @throws HiveSQLException + */ + private Hive getSessionHive() throws HiveSQLException { try { return Hive.get(); } catch (HiveException e) { - throw new HiveSQLException("Failed to get current Hive object", e); + throw new HiveSQLException("Failed to get ThreadLocal Hive object", e); + } + } + + private void registerCurrentOperationLog() { + if (isOperationLogEnabled) { + if (operationLog == null) { + LOG.warn("Failed to get current OperationLog object of Operation: " + + getHandle().getHandleIdentifier()); + isOperationLogEnabled = false; + return; + } + OperationLog.setCurrentOperationLog(operationLog); } } @@ -267,6 +304,7 @@ public class SQLOperation extends Execut @Override public void close() throws HiveSQLException { cleanup(OperationState.CLOSED); + cleanupOperationLog(); } @Override Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSession.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Thu Aug 28 03:15:13 2014 @@ -23,13 +23,7 @@ import java.util.Map; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hive.service.auth.HiveAuthFactory; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.GetInfoType; -import org.apache.hive.service.cli.GetInfoValue; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; public interface HiveSession extends HiveSessionBase { @@ -144,10 +138,8 @@ public interface HiveSession extends Hiv public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException; - - public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException; + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException; public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException; Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java Thu Aug 28 03:15:13 2014 @@ -24,6 +24,7 @@ import org.apache.hive.service.cli.Sessi import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import java.io.File; import java.util.Map; /** @@ -38,40 +39,57 @@ public interface HiveSessionBase { * Set the session manager for the session * @param sessionManager */ - public void setSessionManager(SessionManager sessionManager); + void setSessionManager(SessionManager sessionManager); /** * Get the session manager for the session */ - public SessionManager getSessionManager(); + SessionManager getSessionManager(); /** * Set operation manager for the session * @param operationManager */ - public void setOperationManager(OperationManager operationManager); + void setOperationManager(OperationManager operationManager); /** * Initialize the session * @param sessionConfMap */ - public void initialize(Map sessionConfMap) throws Exception; + void initialize(Map sessionConfMap) throws Exception; - public SessionHandle getSessionHandle(); + /** + * Check whether operation logging is enabled and session dir is created successfully + */ + boolean isOperationLogEnabled(); + + /** + * Get the session dir, which is the parent dir of operation logs + * @return a file representing the parent directory of operation logs + */ + File getOperationLogSessionDir(); + + /** + * Set the session dir, which is the parent dir of operation logs + * @param operationLogRootDir the parent dir of the session dir + */ + void setOperationLogSessionDir(File operationLogRootDir); + + SessionHandle getSessionHandle(); - public String getUsername(); + String getUsername(); - public String getPassword(); + String getPassword(); - public HiveConf getHiveConf(); + HiveConf getHiveConf(); - public SessionState getSessionState(); + SessionState getSessionState(); - public String getUserName(); + String getUserName(); - public void setUserName(String userName); + void setUserName(String userName); - public String getIpAddress(); + String getIpAddress(); - public void setIpAddress(String ipAddress); + void setIpAddress(String ipAddress); } Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Thu Aug 28 03:15:13 2014 @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.cli.HiveFileProcessor; @@ -44,14 +45,7 @@ import org.apache.hadoop.hive.ql.process import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.GetInfoType; -import org.apache.hive.service.cli.GetInfoValue; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.SessionHandle; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; import org.apache.hive.service.cli.operation.ExecuteStatementOperation; import org.apache.hive.service.cli.operation.GetCatalogsOperation; import org.apache.hive.service.cli.operation.GetColumnsOperation; @@ -62,6 +56,7 @@ import org.apache.hive.service.cli.opera import org.apache.hive.service.cli.operation.MetadataOperation; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.ThreadWithGarbageCleanup; /** * HiveSession @@ -86,6 +81,8 @@ public class HiveSessionImpl implements private OperationManager operationManager; private IMetaStoreClient metastoreClient = null; private final Set opHandleSet = new HashSet(); + private boolean isOperationLogEnabled; + private File sessionLogDir; public HiveSessionImpl(TProtocolVersion protocol, String username, String password, HiveConf serverhiveConf, String ipAddress) { @@ -95,14 +92,19 @@ public class HiveSessionImpl implements this.hiveConf = new HiveConf(serverhiveConf); this.ipAddress = ipAddress; - // set an explicit session name to control the download directory name + // Set an explicit session name to control the download directory name hiveConf.set(ConfVars.HIVESESSIONID.varname, sessionHandle.getHandleIdentifier().toString()); - // use thrift transportable formatter + // Use thrift transportable formatter hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER, FetchFormatter.ThriftFormatter.class.getName()); hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue()); + /** + * Create a new SessionState object that will be associated with this HiveServer2 session. + * When the server executes multiple queries in the same session, + * this SessionState object is reused across multiple queries. + */ sessionState = new SessionState(hiveConf, username); sessionState.setUserIpAddress(ipAddress); sessionState.setIsHiveServerQuery(true); @@ -111,11 +113,9 @@ public class HiveSessionImpl implements @Override public void initialize(Map sessionConfMap) throws Exception { - //process global init file: .hiverc + // Process global init file: .hiverc processGlobalInitFile(); - SessionState.setCurrentSessionState(sessionState); - - //set conf properties specified by user from client side + // Set conf properties specified by user from client side if (sessionConfMap != null) { configureSession(sessionConfMap); } @@ -169,6 +169,7 @@ public class HiveSessionImpl implements } private void configureSession(Map sessionConfMap) throws Exception { + SessionState.setCurrentSessionState(sessionState); for (Map.Entry entry : sessionConfMap.entrySet()) { String key = entry.getKey(); if (key.startsWith("set:")) { @@ -182,6 +183,34 @@ public class HiveSessionImpl implements } @Override + public void setOperationLogSessionDir(File operationLogRootDir) { + sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString()); + isOperationLogEnabled = true; + + if (!sessionLogDir.exists()) { + if (!sessionLogDir.mkdir()) { + LOG.warn("Unable to create operation log session directory: " + + sessionLogDir.getAbsolutePath()); + isOperationLogEnabled = false; + } + } + + if (isOperationLogEnabled) { + LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath()); + } + } + + @Override + public boolean isOperationLogEnabled() { + return isOperationLogEnabled; + } + + @Override + public File getOperationLogSessionDir() { + return sessionLogDir; + } + + @Override public TProtocolVersion getProtocolVersion() { return sessionHandle.getProtocolVersion(); } @@ -211,14 +240,26 @@ public class HiveSessionImpl implements } protected synchronized void acquire() throws HiveSQLException { - // need to make sure that the this connections session state is - // stored in the thread local for sessions. + // Need to make sure that the this HiveServer2's session's session state is + // stored in the thread local for the handler thread. SessionState.setCurrentSessionState(sessionState); } + /** + * 1. We'll remove the ThreadLocal SessionState as this thread might now serve + * other requests. + * 2. We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup + * when this thread is garbage collected later. + * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() + */ protected synchronized void release() { assert sessionState != null; SessionState.detachSession(); + if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { + ThreadWithGarbageCleanup currentThread = + (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); + currentThread.cacheThreadLocalRawStore(); + } } @Override @@ -468,7 +509,7 @@ public class HiveSessionImpl implements try { acquire(); /** - * For metadata operations like getTables(), getColumns() etc, + * For metadata operations like getTables(), getColumns() etc, * the session allocates a private metastore handler which should be * closed at the end of the session */ @@ -480,6 +521,9 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); } opHandleSet.clear(); + // Cleanup session log directory. + cleanupSessionLogDir(); + HiveHistory hiveHist = sessionState.getHiveHistory(); if (null != hiveHist) { hiveHist.closeStream(); @@ -492,6 +536,16 @@ public class HiveSessionImpl implements } } + private void cleanupSessionLogDir() { + if (isOperationLogEnabled) { + try { + FileUtils.forceDelete(sessionLogDir); + } catch (Exception e) { + LOG.error("Failed to cleanup session log dir: " + sessionHandle, e); + } + } + } + @Override public SessionState getSessionState() { return sessionState; @@ -539,22 +593,17 @@ public class HiveSessionImpl implements } @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException { - acquire(); - try { - return sessionManager.getOperationManager() - .getOperationNextRowSet(opHandle, orientation, maxRows); - } finally { - release(); - } - } - - @Override - public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException { acquire(); try { - return sessionManager.getOperationManager().getOperationNextRowSet(opHandle); + if (fetchType == FetchType.QUERY_OUTPUT) { + return sessionManager.getOperationManager() + .getOperationNextRowSet(opHandle, orientation, maxRows); + } else { + return sessionManager.getOperationManager() + .getOperationLogRowSet(opHandle, orientation, maxRows); + } } finally { release(); } Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Thu Aug 28 03:15:13 2014 @@ -18,6 +18,8 @@ package org.apache.hive.service.cli.session; +import java.io.File; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -26,6 +28,7 @@ import java.util.concurrent.LinkedBlocki import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -38,6 +41,7 @@ import org.apache.hive.service.cli.HiveS import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; /** * SessionManager. @@ -52,6 +56,8 @@ public class SessionManager extends Comp new ConcurrentHashMap(); private final OperationManager operationManager = new OperationManager(); private ThreadPoolExecutor backgroundOperationPool; + private boolean isOperationLogEnabled; + private File operationLogRootDir; public SessionManager() { super("SessionManager"); @@ -64,22 +70,31 @@ public class SessionManager extends Comp } catch (HiveException e) { throw new RuntimeException("Error applying authorization policy on hive configuration", e); } - this.hiveConf = hiveConf; + //Create operation log root directory, if operation logging is enabled + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { + initOperationLogRootDir(); + } + createBackgroundOperationPool(); + addService(operationManager); + super.init(hiveConf); + } + + private void createBackgroundOperationPool() { int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize); + LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize); int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); - LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize); + LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize); int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME); - LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime); + LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime); // Create a thread pool with #backgroundPoolSize threads // Threads terminate when they are idle for more than the keepAliveTime - // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + String threadPoolName = "HiveServer2-Background-Pool"; backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, - keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize)); + keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); backgroundOperationPool.allowCoreThreadTimeOut(true); - addService(operationManager); - super.init(hiveConf); } private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException { @@ -91,6 +106,36 @@ public class SessionManager extends Comp ss.applyAuthorizationPolicy(); } + private void initOperationLogRootDir() { + operationLogRootDir = new File( + hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION)); + isOperationLogEnabled = true; + + if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) { + LOG.warn("The operation log root directory exists, but it is not a directory: " + + operationLogRootDir.getAbsolutePath()); + isOperationLogEnabled = false; + } + + if (!operationLogRootDir.exists()) { + if (!operationLogRootDir.mkdirs()) { + LOG.warn("Unable to create operation log root directory: " + + operationLogRootDir.getAbsolutePath()); + isOperationLogEnabled = false; + } + } + + if (isOperationLogEnabled) { + LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath()); + try { + FileUtils.forceDeleteOnExit(operationLogRootDir); + } catch (IOException e) { + LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " + + operationLogRootDir.getAbsolutePath(), e); + } + } + } + @Override public synchronized void start() { super.start(); @@ -109,6 +154,18 @@ public class SessionManager extends Comp " seconds has been exceeded. RUNNING background operations will be shut down", e); } } + cleanupLoggingRootDir(); + } + + private void cleanupLoggingRootDir() { + if (isOperationLogEnabled) { + try { + FileUtils.forceDelete(operationLogRootDir); + } catch (Exception e) { + LOG.warn("Failed to cleanup root dir of HS2 logging: " + operationLogRootDir + .getAbsolutePath(), e); + } + } } public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress, @@ -132,6 +189,9 @@ public class SessionManager extends Comp session.setOperationManager(operationManager); try { session.initialize(sessionConf); + if (isOperationLogEnabled) { + session.setOperationLogSessionDir(operationLogRootDir); + } session.open(); } catch (Exception e) { throw new HiveSQLException("Failed to open new session", e); Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Thu Aug 28 03:15:13 2014 @@ -19,12 +19,17 @@ package org.apache.hive.service.cli.thrift; import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TThreadPoolServer; @@ -65,6 +70,11 @@ public class ThriftBinaryCLIService exte minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); + workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME); + String threadPoolName = "HiveServer2-Handler-Pool"; + ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, + workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); TServerSocket serverSocket = null; if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { @@ -84,8 +94,7 @@ public class ThriftBinaryCLIService exte .processorFactory(processorFactory) .transportFactory(transportFactory) .protocolFactory(new TBinaryProtocol.Factory()) - .minWorkerThreads(minWorkerThreads) - .maxWorkerThreads(maxWorkerThreads); + .executorService(executorService); server = new TThreadPoolServer(sargs); Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Thu Aug 28 03:15:13 2014 @@ -29,20 +29,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.AbstractService; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.TSetIpAddressProcessor; -import org.apache.hive.service.cli.CLIService; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.GetInfoType; -import org.apache.hive.service.cli.GetInfoValue; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationStatus; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.SessionHandle; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; import org.apache.hive.service.cli.session.SessionManager; import org.apache.thrift.TException; import org.apache.thrift.server.TServer; @@ -71,6 +61,7 @@ public abstract class ThriftCLIService e protected int minWorkerThreads; protected int maxWorkerThreads; + protected int workerKeepAliveTime; protected static HiveAuthFactory hiveAuthFactory; @@ -242,7 +233,9 @@ public abstract class ThriftCLIService e if (userName == null) { userName = req.getUsername(); } - return getProxyUser(userName, req.getConfiguration(), getIpAddress()); + String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress()); + LOG.debug("Client's username: " + effectiveClientUser); + return effectiveClientUser; } /** @@ -532,7 +525,8 @@ public abstract class ThriftCLIService e RowSet rowSet = cliService.fetchResults( new OperationHandle(req.getOperationHandle()), FetchOrientation.getFetchOrientation(req.getOrientation()), - req.getMaxRows()); + req.getMaxRows(), + FetchType.getFetchType(req.getFetchType())); resp.setResults(rowSet.toTRowSet()); resp.setHasMoreRows(false); resp.setStatus(OK_STATUS); Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java Thu Aug 28 03:15:13 2014 @@ -22,18 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.hive.service.auth.HiveAuthFactory; -import org.apache.hive.service.cli.CLIServiceClient; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.GetInfoType; -import org.apache.hive.service.cli.GetInfoValue; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationState; -import org.apache.hive.service.cli.OperationStatus; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.RowSetFactory; -import org.apache.hive.service.cli.SessionHandle; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; import org.apache.thrift.TException; /** @@ -377,17 +366,15 @@ public class ThriftCLIServiceClient exte } } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) - */ @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException { + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, + FetchType fetchType) throws HiveSQLException { try { TFetchResultsReq req = new TFetchResultsReq(); req.setOperationHandle(opHandle.toTOperationHandle()); req.setOrientation(orientation.toTFetchOrientation()); req.setMaxRows(maxRows); + req.setFetchType(fetchType.toTFetchType()); TFetchResultsResp resp = cliService.FetchResults(req); checkStatus(resp.getStatus()); return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion()); @@ -404,7 +391,7 @@ public class ThriftCLIServiceClient exte @Override public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { // TODO: set the correct default fetch size - return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000); + return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT); } @Override Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Thu Aug 28 03:15:13 2014 @@ -18,6 +18,11 @@ package org.apache.hive.service.cli.thrift; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; @@ -26,6 +31,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes; import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; @@ -36,7 +42,7 @@ import org.eclipse.jetty.server.ssl.SslS import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; public class ThriftHttpCLIService extends ThriftCLIService { @@ -63,13 +69,17 @@ public class ThriftHttpCLIService extend minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS); + workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME); String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); httpServer = new org.eclipse.jetty.server.Server(); - QueuedThreadPool threadPool = new QueuedThreadPool(); - threadPool.setMinThreads(minWorkerThreads); - threadPool.setMaxThreads(maxWorkerThreads); + String threadPoolName = "HiveServer2-HttpHandler-Pool"; + ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, + workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); + + ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService); httpServer.setThreadPool(threadPool); SelectChannelConnector connector = new SelectChannelConnector();;