hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [1/8] hive git commit: HIVE-13354 : Add ability to specify Compaction options per table and per request (Wei Zheng, reviewed by Eugene Koifman)
Date Fri, 27 May 2016 22:16:14 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.1 4b553358b -> e276929df


http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 4db9680..8d88cd7 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -9984,6 +9984,7 @@ class CompactionRequest:
    - partitionname
    - type
    - runas
+   - properties
   """
 
   thrift_spec = (
@@ -9993,14 +9994,16 @@ class CompactionRequest:
     (3, TType.STRING, 'partitionname', None, None, ), # 3
     (4, TType.I32, 'type', None, None, ), # 4
     (5, TType.STRING, 'runas', None, None, ), # 5
+    (6, TType.MAP, 'properties', (TType.STRING,None,TType.STRING,None), None, ), # 6
   )
 
-  def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, runas=None,):
+  def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None,):
     self.dbname = dbname
     self.tablename = tablename
     self.partitionname = partitionname
     self.type = type
     self.runas = runas
+    self.properties = properties
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -10036,6 +10039,17 @@ class CompactionRequest:
           self.runas = iprot.readString()
         else:
           iprot.skip(ftype)
+      elif fid == 6:
+        if ftype == TType.MAP:
+          self.properties = {}
+          (_ktype463, _vtype464, _size462 ) = iprot.readMapBegin()
+          for _i466 in xrange(_size462):
+            _key467 = iprot.readString()
+            _val468 = iprot.readString()
+            self.properties[_key467] = _val468
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -10066,6 +10080,14 @@ class CompactionRequest:
       oprot.writeFieldBegin('runas', TType.STRING, 5)
       oprot.writeString(self.runas)
       oprot.writeFieldEnd()
+    if self.properties is not None:
+      oprot.writeFieldBegin('properties', TType.MAP, 6)
+      oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.properties))
+      for kiter469,viter470 in self.properties.items():
+        oprot.writeString(kiter469)
+        oprot.writeString(viter470)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -10086,6 +10108,7 @@ class CompactionRequest:
     value = (value * 31) ^ hash(self.partitionname)
     value = (value * 31) ^ hash(self.type)
     value = (value * 31) ^ hash(self.runas)
+    value = (value * 31) ^ hash(self.properties)
     return value
 
   def __repr__(self):
@@ -10387,11 +10410,11 @@ class ShowCompactResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.compacts = []
-          (_etype465, _size462) = iprot.readListBegin()
-          for _i466 in xrange(_size462):
-            _elem467 = ShowCompactResponseElement()
-            _elem467.read(iprot)
-            self.compacts.append(_elem467)
+          (_etype474, _size471) = iprot.readListBegin()
+          for _i475 in xrange(_size471):
+            _elem476 = ShowCompactResponseElement()
+            _elem476.read(iprot)
+            self.compacts.append(_elem476)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -10408,8 +10431,8 @@ class ShowCompactResponse:
     if self.compacts is not None:
       oprot.writeFieldBegin('compacts', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.compacts))
-      for iter468 in self.compacts:
-        iter468.write(oprot)
+      for iter477 in self.compacts:
+        iter477.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -10490,10 +10513,10 @@ class AddDynamicPartitions:
       elif fid == 4:
         if ftype == TType.LIST:
           self.partitionnames = []
-          (_etype472, _size469) = iprot.readListBegin()
-          for _i473 in xrange(_size469):
-            _elem474 = iprot.readString()
-            self.partitionnames.append(_elem474)
+          (_etype481, _size478) = iprot.readListBegin()
+          for _i482 in xrange(_size478):
+            _elem483 = iprot.readString()
+            self.partitionnames.append(_elem483)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -10527,8 +10550,8 @@ class AddDynamicPartitions:
     if self.partitionnames is not None:
       oprot.writeFieldBegin('partitionnames', TType.LIST, 4)
       oprot.writeListBegin(TType.STRING, len(self.partitionnames))
-      for iter475 in self.partitionnames:
-        oprot.writeString(iter475)
+      for iter484 in self.partitionnames:
+        oprot.writeString(iter484)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.operationType is not None:
@@ -10814,11 +10837,11 @@ class NotificationEventResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.events = []
-          (_etype479, _size476) = iprot.readListBegin()
-          for _i480 in xrange(_size476):
-            _elem481 = NotificationEvent()
-            _elem481.read(iprot)
-            self.events.append(_elem481)
+          (_etype488, _size485) = iprot.readListBegin()
+          for _i489 in xrange(_size485):
+            _elem490 = NotificationEvent()
+            _elem490.read(iprot)
+            self.events.append(_elem490)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -10835,8 +10858,8 @@ class NotificationEventResponse:
     if self.events is not None:
       oprot.writeFieldBegin('events', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.events))
-      for iter482 in self.events:
-        iter482.write(oprot)
+      for iter491 in self.events:
+        iter491.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -10957,10 +10980,10 @@ class InsertEventRequestData:
       if fid == 1:
         if ftype == TType.LIST:
           self.filesAdded = []
-          (_etype486, _size483) = iprot.readListBegin()
-          for _i487 in xrange(_size483):
-            _elem488 = iprot.readString()
-            self.filesAdded.append(_elem488)
+          (_etype495, _size492) = iprot.readListBegin()
+          for _i496 in xrange(_size492):
+            _elem497 = iprot.readString()
+            self.filesAdded.append(_elem497)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -10977,8 +11000,8 @@ class InsertEventRequestData:
     if self.filesAdded is not None:
       oprot.writeFieldBegin('filesAdded', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.filesAdded))
-      for iter489 in self.filesAdded:
-        oprot.writeString(iter489)
+      for iter498 in self.filesAdded:
+        oprot.writeString(iter498)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11131,10 +11154,10 @@ class FireEventRequest:
       elif fid == 5:
         if ftype == TType.LIST:
           self.partitionVals = []
-          (_etype493, _size490) = iprot.readListBegin()
-          for _i494 in xrange(_size490):
-            _elem495 = iprot.readString()
-            self.partitionVals.append(_elem495)
+          (_etype502, _size499) = iprot.readListBegin()
+          for _i503 in xrange(_size499):
+            _elem504 = iprot.readString()
+            self.partitionVals.append(_elem504)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11167,8 +11190,8 @@ class FireEventRequest:
     if self.partitionVals is not None:
       oprot.writeFieldBegin('partitionVals', TType.LIST, 5)
       oprot.writeListBegin(TType.STRING, len(self.partitionVals))
-      for iter496 in self.partitionVals:
-        oprot.writeString(iter496)
+      for iter505 in self.partitionVals:
+        oprot.writeString(iter505)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11355,12 +11378,12 @@ class GetFileMetadataByExprResult:
       if fid == 1:
         if ftype == TType.MAP:
           self.metadata = {}
-          (_ktype498, _vtype499, _size497 ) = iprot.readMapBegin()
-          for _i501 in xrange(_size497):
-            _key502 = iprot.readI64()
-            _val503 = MetadataPpdResult()
-            _val503.read(iprot)
-            self.metadata[_key502] = _val503
+          (_ktype507, _vtype508, _size506 ) = iprot.readMapBegin()
+          for _i510 in xrange(_size506):
+            _key511 = iprot.readI64()
+            _val512 = MetadataPpdResult()
+            _val512.read(iprot)
+            self.metadata[_key511] = _val512
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -11382,9 +11405,9 @@ class GetFileMetadataByExprResult:
     if self.metadata is not None:
       oprot.writeFieldBegin('metadata', TType.MAP, 1)
       oprot.writeMapBegin(TType.I64, TType.STRUCT, len(self.metadata))
-      for kiter504,viter505 in self.metadata.items():
-        oprot.writeI64(kiter504)
-        viter505.write(oprot)
+      for kiter513,viter514 in self.metadata.items():
+        oprot.writeI64(kiter513)
+        viter514.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.isSupported is not None:
@@ -11454,10 +11477,10 @@ class GetFileMetadataByExprRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype509, _size506) = iprot.readListBegin()
-          for _i510 in xrange(_size506):
-            _elem511 = iprot.readI64()
-            self.fileIds.append(_elem511)
+          (_etype518, _size515) = iprot.readListBegin()
+          for _i519 in xrange(_size515):
+            _elem520 = iprot.readI64()
+            self.fileIds.append(_elem520)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11489,8 +11512,8 @@ class GetFileMetadataByExprRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter512 in self.fileIds:
-        oprot.writeI64(iter512)
+      for iter521 in self.fileIds:
+        oprot.writeI64(iter521)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.expr is not None:
@@ -11564,11 +11587,11 @@ class GetFileMetadataResult:
       if fid == 1:
         if ftype == TType.MAP:
           self.metadata = {}
-          (_ktype514, _vtype515, _size513 ) = iprot.readMapBegin()
-          for _i517 in xrange(_size513):
-            _key518 = iprot.readI64()
-            _val519 = iprot.readString()
-            self.metadata[_key518] = _val519
+          (_ktype523, _vtype524, _size522 ) = iprot.readMapBegin()
+          for _i526 in xrange(_size522):
+            _key527 = iprot.readI64()
+            _val528 = iprot.readString()
+            self.metadata[_key527] = _val528
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -11590,9 +11613,9 @@ class GetFileMetadataResult:
     if self.metadata is not None:
       oprot.writeFieldBegin('metadata', TType.MAP, 1)
       oprot.writeMapBegin(TType.I64, TType.STRING, len(self.metadata))
-      for kiter520,viter521 in self.metadata.items():
-        oprot.writeI64(kiter520)
-        oprot.writeString(viter521)
+      for kiter529,viter530 in self.metadata.items():
+        oprot.writeI64(kiter529)
+        oprot.writeString(viter530)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.isSupported is not None:
@@ -11653,10 +11676,10 @@ class GetFileMetadataRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype525, _size522) = iprot.readListBegin()
-          for _i526 in xrange(_size522):
-            _elem527 = iprot.readI64()
-            self.fileIds.append(_elem527)
+          (_etype534, _size531) = iprot.readListBegin()
+          for _i535 in xrange(_size531):
+            _elem536 = iprot.readI64()
+            self.fileIds.append(_elem536)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11673,8 +11696,8 @@ class GetFileMetadataRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter528 in self.fileIds:
-        oprot.writeI64(iter528)
+      for iter537 in self.fileIds:
+        oprot.writeI64(iter537)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11780,20 +11803,20 @@ class PutFileMetadataRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype532, _size529) = iprot.readListBegin()
-          for _i533 in xrange(_size529):
-            _elem534 = iprot.readI64()
-            self.fileIds.append(_elem534)
+          (_etype541, _size538) = iprot.readListBegin()
+          for _i542 in xrange(_size538):
+            _elem543 = iprot.readI64()
+            self.fileIds.append(_elem543)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.LIST:
           self.metadata = []
-          (_etype538, _size535) = iprot.readListBegin()
-          for _i539 in xrange(_size535):
-            _elem540 = iprot.readString()
-            self.metadata.append(_elem540)
+          (_etype547, _size544) = iprot.readListBegin()
+          for _i548 in xrange(_size544):
+            _elem549 = iprot.readString()
+            self.metadata.append(_elem549)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11815,15 +11838,15 @@ class PutFileMetadataRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter541 in self.fileIds:
-        oprot.writeI64(iter541)
+      for iter550 in self.fileIds:
+        oprot.writeI64(iter550)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.metadata is not None:
       oprot.writeFieldBegin('metadata', TType.LIST, 2)
       oprot.writeListBegin(TType.STRING, len(self.metadata))
-      for iter542 in self.metadata:
-        oprot.writeString(iter542)
+      for iter551 in self.metadata:
+        oprot.writeString(iter551)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.type is not None:
@@ -11931,10 +11954,10 @@ class ClearFileMetadataRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype546, _size543) = iprot.readListBegin()
-          for _i547 in xrange(_size543):
-            _elem548 = iprot.readI64()
-            self.fileIds.append(_elem548)
+          (_etype555, _size552) = iprot.readListBegin()
+          for _i556 in xrange(_size552):
+            _elem557 = iprot.readI64()
+            self.fileIds.append(_elem557)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11951,8 +11974,8 @@ class ClearFileMetadataRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter549 in self.fileIds:
-        oprot.writeI64(iter549)
+      for iter558 in self.fileIds:
+        oprot.writeI64(iter558)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -12181,11 +12204,11 @@ class GetAllFunctionsResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.functions = []
-          (_etype553, _size550) = iprot.readListBegin()
-          for _i554 in xrange(_size550):
-            _elem555 = Function()
-            _elem555.read(iprot)
-            self.functions.append(_elem555)
+          (_etype562, _size559) = iprot.readListBegin()
+          for _i563 in xrange(_size559):
+            _elem564 = Function()
+            _elem564.read(iprot)
+            self.functions.append(_elem564)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12202,8 +12225,8 @@ class GetAllFunctionsResponse:
     if self.functions is not None:
       oprot.writeFieldBegin('functions', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.functions))
-      for iter556 in self.functions:
-        iter556.write(oprot)
+      for iter565 in self.functions:
+        iter565.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index c7e7cb4..0964cd8 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2242,13 +2242,15 @@ class CompactionRequest
   PARTITIONNAME = 3
   TYPE = 4
   RUNAS = 5
+  PROPERTIES = 6
 
   FIELDS = {
     DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'},
     TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename'},
     PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname', :optional => true},
     TYPE => {:type => ::Thrift::Types::I32, :name => 'type', :enum_class => ::CompactionType},
-    RUNAS => {:type => ::Thrift::Types::STRING, :name => 'runas', :optional => true}
+    RUNAS => {:type => ::Thrift::Types::STRING, :name => 'runas', :optional => true},
+    PROPERTIES => {:type => ::Thrift::Types::MAP, :name => 'properties', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true}
   }
 
   def struct_fields; FIELDS; end

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 7d5ddee..4487828 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2146,6 +2146,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
   }
 
   @Override
+  @Deprecated
   public void compact(String dbname, String tableName, String partitionName,  CompactionType type)
       throws TException {
     CompactionRequest cr = new CompactionRequest();
@@ -2158,6 +2159,19 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
   }
 
   @Override
+  public void compact(String dbname, String tableName, String partitionName, CompactionType type,
+                      Map<String, String> tblproperties) throws TException {
+    CompactionRequest cr = new CompactionRequest();
+    if (dbname == null) cr.setDbname(DEFAULT_DATABASE_NAME);
+    else cr.setDbname(dbname);
+    cr.setTablename(tableName);
+    if (partitionName != null) cr.setPartitionname(partitionName);
+    cr.setType(type);
+    cr.setProperties(tblproperties);
+    client.compact(cr);
+  }
+
+  @Override
   public ShowCompactResponse showCompactions() throws TException {
     return client.show_compact(new ShowCompactRequest());
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 06a1b58..b6fe502 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -1436,10 +1436,28 @@ public interface IMetaStoreClient {
    * @param type Whether this is a major or minor compaction.
    * @throws TException
    */
+  @Deprecated
   void compact(String dbname, String tableName, String partitionName,  CompactionType type)
       throws TException;
 
   /**
+   * Send a request to compact a table or partition.  This will not block until the compaction is
+   * complete.  It will instead put a request on the queue for that table or partition to be
+   * compacted.  No checking is done on the dbname, tableName, or partitionName to make sure they
+   * refer to valid objects.  It is assumed this has already been done by the caller.
+   * @param dbname Name of the database the table is in.  If null, this will be assumed to be
+   *               'default'.
+   * @param tableName Name of the table to be compacted.  This cannot be null.  If partitionName
+   *                  is null, this must be a non-partitioned table.
+   * @param partitionName Name of the partition to be compacted
+   * @param type Whether this is a major or minor compaction.
+   * @param tblproperties the list of tblproperties to override for this compact. Can be null.
+   * @throws TException
+   */
+  void compact(String dbname, String tableName, String partitionName, CompactionType type,
+               Map<String, String> tblproperties) throws TException;
+
+  /**
    * Get a list of all current compactions.
    * @return List of all current compactions.  This includes compactions waiting to happen,
    * in progress, and finished but waiting to clean the existing files.

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index bea1473..85e0885 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -37,6 +37,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
   String workerId;
   long start;
   public String runAs;
+  public String properties;
   public boolean tooManyAborts = false;
   /**
    * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) 
@@ -102,6 +103,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
       "partName:" + partName + "," +
       "state:" + state + "," +
       "type:" + type + "," +
+      "properties:" + properties + "," +
       "runAs:" + runAs + "," +
       "tooManyAborts:" + tooManyAborts + "," +
       "highestTxnId:" + highestTxnId;
@@ -120,12 +122,13 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     fullCi.partName = rs.getString(4);
     fullCi.state = rs.getString(5).charAt(0);//cq_state
     fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0));
-    fullCi.workerId = rs.getString(7);
-    fullCi.start = rs.getLong(8);
-    fullCi.runAs = rs.getString(9);
-    fullCi.highestTxnId = rs.getLong(10);
-    fullCi.metaInfo = rs.getBytes(11);
-    fullCi.hadoopJobId = rs.getString(12);
+    fullCi.properties = rs.getString(7);
+    fullCi.workerId = rs.getString(8);
+    fullCi.start = rs.getLong(9);
+    fullCi.runAs = rs.getString(10);
+    fullCi.highestTxnId = rs.getLong(11);
+    fullCi.metaInfo = rs.getBytes(12);
+    fullCi.hadoopJobId = rs.getString(13);
     return fullCi;
   }
   static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException {
@@ -135,12 +138,13 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     pStmt.setString(4, ci.partName);
     pStmt.setString(5, Character.toString(ci.state));
     pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type)));
-    pStmt.setString(7, ci.workerId);
-    pStmt.setLong(8, ci.start);
-    pStmt.setLong(9, endTime);
-    pStmt.setString(10, ci.runAs);
-    pStmt.setLong(11, ci.highestTxnId);
-    pStmt.setBytes(12, ci.metaInfo);
-    pStmt.setString(13, ci.hadoopJobId);
+    pStmt.setString(7, ci.properties);
+    pStmt.setString(8, ci.workerId);
+    pStmt.setLong(9, ci.start);
+    pStmt.setLong(10, endTime);
+    pStmt.setString(11, ci.runAs);
+    pStmt.setLong(12, ci.highestTxnId);
+    pStmt.setBytes(13, ci.metaInfo);
+    pStmt.setString(14, ci.hadoopJobId);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index d2d6462..75a4d87 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -168,7 +168,7 @@ class CompactionTxnHandler extends TxnHandler {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
-          "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
+          "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         if (!rs.next()) {
@@ -184,6 +184,7 @@ class CompactionTxnHandler extends TxnHandler {
           info.tableName = rs.getString(3);
           info.partName = rs.getString(4);
           info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
+          info.properties = rs.getString(6);
           // Now, update this record as being worked on by this worker.
           long now = getDbTime(dbConn);
           s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
@@ -328,7 +329,7 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
+        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
         if(rs.next()) {
           info = CompactionInfo.loadFullFromCompactionQueue(rs);
         }
@@ -344,7 +345,7 @@ class CompactionTxnHandler extends TxnHandler {
           LOG.debug("Going to rollback");
           dbConn.rollback();
         }
-        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
         info.state = SUCCEEDED_STATE;
         CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
         updCount = pStmt.executeUpdate();
@@ -837,7 +838,7 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
+        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
         if(rs.next()) {
           ci = CompactionInfo.loadFullFromCompactionQueue(rs);
           String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
@@ -865,7 +866,7 @@ class CompactionTxnHandler extends TxnHandler {
         }
         close(rs, stmt, null);
 
-        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
         CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
         int updCount = pStmt.executeUpdate();
         LOG.debug("Going to commit");

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index facce54..60674eb 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -123,6 +123,7 @@ public final class TxnDbUtil {
           " CQ_PARTITION varchar(767)," +
           " CQ_STATE char(1) NOT NULL," +
           " CQ_TYPE char(1) NOT NULL," +
+          " CQ_TBLPROPERTIES varchar(2048)," +
           " CQ_WORKER_ID varchar(128)," +
           " CQ_START bigint," +
           " CQ_RUN_AS varchar(128)," +
@@ -140,6 +141,7 @@ public final class TxnDbUtil {
         " CC_PARTITION varchar(767)," +
         " CC_STATE char(1) NOT NULL," +
         " CC_TYPE char(1) NOT NULL," +
+        " CC_TBLPROPERTIES varchar(2048)," +
         " CC_WORKER_ID varchar(128)," +
         " CC_START bigint," +
         " CC_END bigint," +

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 82d685d..9c1b399 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -39,6 +39,7 @@ import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 
@@ -1366,6 +1367,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         String partName = rqst.getPartitionname();
         if (partName != null) buf.append("cq_partition, ");
         buf.append("cq_state, cq_type");
+        if (rqst.getProperties() != null) {
+          buf.append(", cq_tblproperties");
+        }
         if (rqst.getRunas() != null) buf.append(", cq_run_as");
         buf.append(") values (");
         buf.append(id);
@@ -1394,6 +1398,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             dbConn.rollback();
             throw new MetaException("Unexpected compaction type " + rqst.getType().toString());
         }
+        if (rqst.getProperties() != null) {
+          buf.append("', '");
+          buf.append(new StringableMap(rqst.getProperties()).toString());
+        }
         if (rqst.getRunas() != null) {
           buf.append("', '");
           buf.append(rqst.getRunas());

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index b829d9d..644aed1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -30,8 +30,10 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 public class TxnUtils {
@@ -209,4 +211,56 @@ public class TxnUtils {
     long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
     return sizeInBytes / 1024 > queryMemoryLimit;
   }
+
+  public static class StringableMap extends HashMap<String, String> {
+
+    public StringableMap(String s) {
+      String[] parts = s.split(":", 2);
+      // read that many chars
+      int numElements = Integer.parseInt(parts[0]);
+      s = parts[1];
+      for (int i = 0; i < numElements; i++) {
+        parts = s.split(":", 2);
+        int len = Integer.parseInt(parts[0]);
+        String key = null;
+        if (len > 0) key = parts[1].substring(0, len);
+        parts = parts[1].substring(len).split(":", 2);
+        len = Integer.parseInt(parts[0]);
+        String value = null;
+        if (len > 0) value = parts[1].substring(0, len);
+        s = parts[1].substring(len);
+        put(key, value);
+      }
+    }
+
+    public StringableMap(Map<String, String> m) {
+      super(m);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      buf.append(size());
+      buf.append(':');
+      if (size() > 0) {
+        for (Map.Entry<String, String> entry : entrySet()) {
+          int length = (entry.getKey() == null) ? 0 : entry.getKey().length();
+          buf.append(entry.getKey() == null ? 0 : length);
+          buf.append(':');
+          if (length > 0) buf.append(entry.getKey());
+          length = (entry.getValue() == null) ? 0 : entry.getValue().length();
+          buf.append(length);
+          buf.append(':');
+          if (length > 0) buf.append(entry.getValue());
+        }
+      }
+      return buf.toString();
+    }
+
+    public Properties toProperties() {
+      Properties props = new Properties();
+      props.putAll(this);
+      return props;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 717589a..bc39994 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -1789,7 +1789,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       }
       partName = partitions.get(0).getName();
     }
-    db.compact(tbl.getDbName(), tbl.getTableName(), partName, desc.getCompactionType());
+    db.compact(tbl.getDbName(), tbl.getTableName(), partName, desc.getCompactionType(), desc.getProps());
     console.printInfo("Compaction enqueued.");
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 3fa1233..379eddc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3456,16 +3456,18 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param partName name of the partition, if null table will be compacted (valid only for
    *                 non-partitioned tables).
    * @param compactType major or minor
+   * @param tblproperties the list of tblproperties to overwrite for this compaction
    * @throws HiveException
    */
-  public void compact(String dbname, String tableName, String partName,  String compactType)
+  public void compact(String dbname, String tableName, String partName, String compactType,
+                      Map<String, String> tblproperties)
       throws HiveException {
     try {
       CompactionType cr = null;
       if ("major".equals(compactType)) cr = CompactionType.MAJOR;
       else if ("minor".equals(compactType)) cr = CompactionType.MINOR;
       else throw new RuntimeException("Unknown compaction type " + compactType);
-      getMSC().compact(dbname, tableName, partName, cr);
+      getMSC().compact(dbname, tableName, partName, cr, tblproperties);
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 0d735b9..5b32f56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1747,6 +1747,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
     AlterTableSimpleDesc desc = new AlterTableSimpleDesc(
         tableName, newPartSpec, type);
 
+    if (ast.getChildCount() > 1) {
+      HashMap<String, String> mapProp = getProps((ASTNode) (ast.getChild(1)).getChild(0));
+      desc.setProps(mapProp);
+    }
+
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc), conf));
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index e0a84c1..c411f5e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -1363,8 +1363,8 @@ alterStatementSuffixBucketNum
 alterStatementSuffixCompact
 @init { msgs.push("compaction request"); }
 @after { msgs.pop(); }
-    : KW_COMPACT compactType=StringLiteral
-    -> ^(TOK_ALTERTABLE_COMPACT $compactType)
+    : KW_COMPACT compactType=StringLiteral (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)?
+    -> ^(TOK_ALTERTABLE_COMPACT $compactType tableProperties?)
     ;
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
index d819d15..2ae70bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
@@ -33,6 +33,7 @@ public class AlterTableSimpleDesc extends DDLDesc {
   private String compactionType;
 
   AlterTableTypes type;
+  private Map<String, String> props;
 
   public AlterTableSimpleDesc() {
   }
@@ -99,4 +100,11 @@ public class AlterTableSimpleDesc extends DDLDesc {
     return compactionType;
   }
 
+  public Map<String, String> getProps() {
+    return props;
+  }
+
+  public void setProps(Map<String, String> props) {
+    this.props = props;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 931be90..3a5a325 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -40,7 +41,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -93,12 +93,16 @@ public class CompactorMR {
   static final private String DELTA_DIRS = "hive.compactor.delta.dirs";
   static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search";
   static final private String TMPDIR = "_tmp";
+  static final private String TBLPROPS_PREFIX = "tblprops.";
+  static final private String COMPACTOR_PREFIX = "compactor.";
+
+  private JobConf mrJob;  // the MR job for compaction
 
   public CompactorMR() {
   }
 
   private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
-                                    ValidTxnList txns) {
+                                    ValidTxnList txns, CompactionInfo ci) {
     JobConf job = new JobConf(conf);
     job.setJobName(jobName);
     job.setOutputKeyClass(NullWritable.class);
@@ -124,9 +128,52 @@ public class CompactorMR {
     job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
     job.setInt(NUM_BUCKETS, sd.getNumBuckets());
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable
+    if (ci.properties != null) { // override MR properties and general tblproperties if applicable
+      overrideTblProps(job, t.getParameters(), ci.properties);
+    }
     setColumnTypes(job, sd.getCols());
     return job;
   }
+
+  /**
+   * Parse tblproperties specified on "ALTER TABLE ... COMPACT ... WITH OVERWRITE TBLPROPERTIES ..."
+   * and override two categories of properties:
+   * 1. properties of the compactor MR job (with prefix "compactor.")
+   * 2. general hive properties (with prefix "tblprops.")
+   * @param job the compactor MR job
+   * @param tblproperties existing tblproperties
+   * @param properties table properties
+   */
+  private void overrideTblProps(JobConf job, Map<String, String> tblproperties, String properties) {
+    StringableMap stringableMap = new StringableMap(properties);
+    overrideMRProps(job, stringableMap);
+    // mingle existing tblproperties with those specified on the ALTER TABLE command
+    for (String key : stringableMap.keySet()) {
+      if (key.startsWith(TBLPROPS_PREFIX)) {
+        String propKey = key.substring(9);  // 9 is the length of "tblprops.". We only keep the rest
+        tblproperties.put(propKey, stringableMap.get(key));
+      }
+    }
+    // re-set TABLE_PROPS with reloaded tblproperties
+    job.set(TABLE_PROPS, new StringableMap(tblproperties).toString());
+  }
+
+  /**
+   * Parse tblproperties to override relevant properties of compactor MR job with specified values.
+   * For example, compactor.mapreuce.map.memory.mb=1024
+   * @param job the compactor MR job
+   * @param properties table properties
+   */
+  private void overrideMRProps(JobConf job, Map<String, String> properties) {
+    for (String key : properties.keySet()) {
+      if (key.startsWith(COMPACTOR_PREFIX)) {
+        String mrKey = key.substring(10); // 10 is the length of "compactor." We only keep the rest.
+        job.set(mrKey, properties.get(key));
+      }
+    }
+  }
+
   /**
    * Run Compaction which may consist of several jobs on the cluster.
    * @param conf Hive configuration file
@@ -143,7 +190,7 @@ public class CompactorMR {
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
-    JobConf job = createBaseJobConf(conf, jobName, t, sd, txns);
+    JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci);
 
     // Figure out and encode what files we need to read.  We do this here (rather than in
     // getSplits below) because as part of this we discover our minimum and maximum transactions,
@@ -168,11 +215,11 @@ public class CompactorMR {
         "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
       int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
       for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
-        JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns);
+        JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns, ci);
         launchCompactionJob(jobMinorCompact,
           null, CompactionType.MINOR, null,
           parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
-          maxDeltastoHandle, -1);
+          maxDeltastoHandle, -1, conf);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
       dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
@@ -211,14 +258,14 @@ public class CompactorMR {
     }
 
     launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
-      dir.getCurrentDirectories().size(), dir.getObsolete().size());
+      dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf);
 
     su.gatherStats();
   }
   private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
                                    StringableList dirsToSearch,
                                    List<AcidUtils.ParsedDelta> parsedDeltas,
-                                   int curDirNumber, int obsoleteDirNumber) throws IOException {
+                                   int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf) throws IOException {
     job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
     if(dirsToSearch == null) {
       dirsToSearch = new StringableList();
@@ -240,6 +287,10 @@ public class CompactorMR {
     job.setLong(MIN_TXN, minTxn);
     job.setLong(MAX_TXN, maxTxn);
 
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      mrJob = job;
+    }
+
     LOG.info("Submitting " + compactionType + " compaction job '" +
       job.getJobName() + "' to " + job.getQueueName() + " queue.  " +
       "(current delta dirs count=" + curDirNumber +
@@ -274,6 +325,10 @@ public class CompactorMR {
     HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
   }
 
+  public JobConf getMrJob() {
+    return mrJob;
+  }
+
   static class CompactorInputSplit implements InputSplit {
     private long length = 0;
     private List<String> locations;
@@ -623,58 +678,6 @@ public class CompactorMR {
 
   }
 
-  static class StringableMap extends HashMap<String, String> {
-
-    StringableMap(String s) {
-      String[] parts = s.split(":", 2);
-      // read that many chars
-      int numElements = Integer.parseInt(parts[0]);
-      s = parts[1];
-      for (int i = 0; i < numElements; i++) {
-        parts = s.split(":", 2);
-        int len = Integer.parseInt(parts[0]);
-        String key = null;
-        if (len > 0) key = parts[1].substring(0, len);
-        parts = parts[1].substring(len).split(":", 2);
-        len = Integer.parseInt(parts[0]);
-        String value = null;
-        if (len > 0) value = parts[1].substring(0, len);
-        s = parts[1].substring(len);
-        put(key, value);
-      }
-    }
-
-    StringableMap(Map<String, String> m) {
-      super(m);
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder buf = new StringBuilder();
-      buf.append(size());
-      buf.append(':');
-      if (size() > 0) {
-        for (Map.Entry<String, String> entry : entrySet()) {
-          int length = (entry.getKey() == null) ? 0 : entry.getKey().length();
-          buf.append(entry.getKey() == null ? 0 : length);
-          buf.append(':');
-          if (length > 0) buf.append(entry.getKey());
-          length = (entry.getValue() == null) ? 0 : entry.getValue().length();
-          buf.append(length);
-          buf.append(':');
-          if (length > 0) buf.append(entry.getValue());
-        }
-      }
-      return buf.toString();
-    }
-
-    public Properties toProperties() {
-      Properties props = new Properties();
-      props.putAll(this);
-      return props;
-    }
-  }
-
   static class StringableList extends ArrayList<Path> {
     StringableList() {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a55fa1c..8152c89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -58,6 +59,8 @@ public class Initiator extends CompactorThread {
   static final private String CLASS_NAME = Initiator.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
+  static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
+
   private long checkInterval;
 
   @Override
@@ -144,7 +147,7 @@ public class Initiator extends CompactorThread {
               /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
               * Long term we should consider having a thread pool here and running checkForCompactionS
               * in parallel*/
-              CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
+              CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, t.getParameters(), runAs);
               if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact " +
@@ -213,6 +216,7 @@ public class Initiator extends CompactorThread {
   private CompactionType checkForCompaction(final CompactionInfo ci,
                                             final ValidTxnList txns,
                                             final StorageDescriptor sd,
+                                            final Map<String, String> tblproperties,
                                             final String runAs)
       throws IOException, InterruptedException {
     // If it's marked as too many aborted, we already know we need to compact
@@ -222,7 +226,7 @@ public class Initiator extends CompactorThread {
       return CompactionType.MAJOR;
     }
     if (runJobAsSelf(runAs)) {
-      return determineCompactionType(ci, txns, sd);
+      return determineCompactionType(ci, txns, sd, tblproperties);
     } else {
       LOG.info("Going to initiate as user " + runAs);
       UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
@@ -230,7 +234,7 @@ public class Initiator extends CompactorThread {
       CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
         @Override
         public CompactionType run() throws Exception {
-          return determineCompactionType(ci, txns, sd);
+          return determineCompactionType(ci, txns, sd, tblproperties);
         }
       });
       try {
@@ -244,7 +248,7 @@ public class Initiator extends CompactorThread {
   }
 
   private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
-                                                 StorageDescriptor sd)
+                                                 StorageDescriptor sd, Map<String, String> tblproperties)
       throws IOException, InterruptedException {
     boolean noBase = false;
     Path location = new Path(sd.getLocation());
@@ -282,8 +286,11 @@ public class Initiator extends CompactorThread {
     if (baseSize == 0 && deltaSize > 0) {
       noBase = true;
     } else {
-      float deltaPctThreshold = HiveConf.getFloatVar(conf,
+      String deltaPctProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
           HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ?
+          HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) :
+          Float.parseFloat(deltaPctProp);
       boolean bigEnough =   (float)deltaSize/(float)baseSize > deltaPctThreshold;
       if (LOG.isDebugEnabled()) {
         StringBuilder msg = new StringBuilder("delta size: ");
@@ -299,8 +306,11 @@ public class Initiator extends CompactorThread {
       if (bigEnough) return CompactionType.MAJOR;
     }
 
-    int deltaNumThreshold = HiveConf.getIntVar(conf,
+    String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
         HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ?
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) :
+        Integer.parseInt(deltaNumProp);
     boolean enough = deltas.size() > deltaNumThreshold;
     if (enough) {
       LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 767c10c..666f13b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -56,6 +57,7 @@ public class Worker extends CompactorThread {
   static final private int baseThreadNum = 10002;
 
   private String name;
+  private JobConf mrJob; // the MR job for compaction
 
   /**
    * Get the hostname that this worker is run on.  Made static and public so that other classes
@@ -180,6 +182,9 @@ public class Worker extends CompactorThread {
             }
           }
           txnHandler.markCompacted(ci);
+          if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+            mrJob = mr.getMrJob();
+          }
         } catch (Exception e) {
           LOG.error("Caught exception while trying to compact " + ci +
               ".  Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
@@ -213,6 +218,10 @@ public class Worker extends CompactorThread {
     setName(name.toString());
   }
 
+  public JobConf getMrJob() {
+    return mrJob;
+  }
+
   static final class StatsUpdater {
     static final private Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e276929d/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index cf7eb70..ef7804c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -77,19 +78,19 @@ public class TestWorker extends CompactorTest {
   @Test
   public void stringableMap() throws Exception {
     // Empty map case
-    CompactorMR.StringableMap m = new CompactorMR.StringableMap(new HashMap<String, String>());
+    StringableMap m = new StringableMap(new HashMap<String, String>());
     String s = m.toString();
     Assert.assertEquals("0:", s);
-    m = new CompactorMR.StringableMap(s);
+    m = new StringableMap(s);
     Assert.assertEquals(0, m.size());
 
     Map<String, String> base = new HashMap<String, String>();
     base.put("mary", "poppins");
     base.put("bert", null);
     base.put(null, "banks");
-    m = new CompactorMR.StringableMap(base);
+    m = new StringableMap(base);
     s = m.toString();
-    m = new CompactorMR.StringableMap(s);
+    m = new StringableMap(s);
     Assert.assertEquals(3, m.size());
     Map<String, Boolean> saw = new HashMap<String, Boolean>(3);
     saw.put("mary", false);


Mime
View raw message