hive-commits mailing list archives

From ser...@apache.org
Subject [24/31] hive git commit: HIVE-14644 : use metastore information on the read path appropriately (Sergey Shelukhin)
Date Mon, 12 Sep 2016 20:24:56 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 8decc94..53f24b9 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -12667,6 +12667,204 @@ class HeartbeatWriteIdResult:
   def __ne__(self, other):
     return not (self == other)
 
+class GetValidWriteIdsRequest:
+  """
+  Attributes:
+   - dbName
+   - tblName
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'dbName', None, None, ), # 1
+    (2, TType.STRING, 'tblName', None, None, ), # 2
+  )
+
+  def __init__(self, dbName=None, tblName=None,):
+    self.dbName = dbName
+    self.tblName = tblName
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.dbName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.tblName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('GetValidWriteIdsRequest')
+    if self.dbName is not None:
+      oprot.writeFieldBegin('dbName', TType.STRING, 1)
+      oprot.writeString(self.dbName)
+      oprot.writeFieldEnd()
+    if self.tblName is not None:
+      oprot.writeFieldBegin('tblName', TType.STRING, 2)
+      oprot.writeString(self.tblName)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.dbName is None:
+      raise TProtocol.TProtocolException(message='Required field dbName is unset!')
+    if self.tblName is None:
+      raise TProtocol.TProtocolException(message='Required field tblName is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.dbName)
+    value = (value * 31) ^ hash(self.tblName)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class GetValidWriteIdsResult:
+  """
+  Attributes:
+   - lowWatermarkId
+   - highWatermarkId
+   - areIdsValid
+   - ids
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I64, 'lowWatermarkId', None, None, ), # 1
+    (2, TType.I64, 'highWatermarkId', None, None, ), # 2
+    (3, TType.BOOL, 'areIdsValid', None, None, ), # 3
+    (4, TType.LIST, 'ids', (TType.I64,None), None, ), # 4
+  )
+
+  def __init__(self, lowWatermarkId=None, highWatermarkId=None, areIdsValid=None, ids=None,):
+    self.lowWatermarkId = lowWatermarkId
+    self.highWatermarkId = highWatermarkId
+    self.areIdsValid = areIdsValid
+    self.ids = ids
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I64:
+          self.lowWatermarkId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I64:
+          self.highWatermarkId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.BOOL:
+          self.areIdsValid = iprot.readBool()
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.LIST:
+          self.ids = []
+          (_etype562, _size559) = iprot.readListBegin()
+          for _i563 in xrange(_size559):
+            _elem564 = iprot.readI64()
+            self.ids.append(_elem564)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('GetValidWriteIdsResult')
+    if self.lowWatermarkId is not None:
+      oprot.writeFieldBegin('lowWatermarkId', TType.I64, 1)
+      oprot.writeI64(self.lowWatermarkId)
+      oprot.writeFieldEnd()
+    if self.highWatermarkId is not None:
+      oprot.writeFieldBegin('highWatermarkId', TType.I64, 2)
+      oprot.writeI64(self.highWatermarkId)
+      oprot.writeFieldEnd()
+    if self.areIdsValid is not None:
+      oprot.writeFieldBegin('areIdsValid', TType.BOOL, 3)
+      oprot.writeBool(self.areIdsValid)
+      oprot.writeFieldEnd()
+    if self.ids is not None:
+      oprot.writeFieldBegin('ids', TType.LIST, 4)
+      oprot.writeListBegin(TType.I64, len(self.ids))
+      for iter565 in self.ids:
+        oprot.writeI64(iter565)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.lowWatermarkId is None:
+      raise TProtocol.TProtocolException(message='Required field lowWatermarkId is unset!')
+    if self.highWatermarkId is None:
+      raise TProtocol.TProtocolException(message='Required field highWatermarkId is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.lowWatermarkId)
+    value = (value * 31) ^ hash(self.highWatermarkId)
+    value = (value * 31) ^ hash(self.areIdsValid)
+    value = (value * 31) ^ hash(self.ids)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class GetAllFunctionsResponse:
   """
   Attributes:
@@ -12693,11 +12891,11 @@ class GetAllFunctionsResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.functions = []
-          (_etype562, _size559) = iprot.readListBegin()
-          for _i563 in xrange(_size559):
-            _elem564 = Function()
-            _elem564.read(iprot)
-            self.functions.append(_elem564)
+          (_etype569, _size566) = iprot.readListBegin()
+          for _i570 in xrange(_size566):
+            _elem571 = Function()
+            _elem571.read(iprot)
+            self.functions.append(_elem571)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12714,8 +12912,8 @@ class GetAllFunctionsResponse:
     if self.functions is not None:
       oprot.writeFieldBegin('functions', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.functions))
-      for iter565 in self.functions:
-        iter565.write(oprot)
+      for iter572 in self.functions:
+        iter572.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 95f2075..ca60ba4 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2876,6 +2876,50 @@ class HeartbeatWriteIdResult
   ::Thrift::Struct.generate_accessors self
 end
 
+class GetValidWriteIdsRequest
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  DBNAME = 1
+  TBLNAME = 2
+
+  FIELDS = {
+    DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+    TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tblName is unset!') unless @tblName
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
+class GetValidWriteIdsResult
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  LOWWATERMARKID = 1
+  HIGHWATERMARKID = 2
+  AREIDSVALID = 3
+  IDS = 4
+
+  FIELDS = {
+    LOWWATERMARKID => {:type => ::Thrift::Types::I64, :name => 'lowWatermarkId'},
+    HIGHWATERMARKID => {:type => ::Thrift::Types::I64, :name => 'highWatermarkId'},
+    AREIDSVALID => {:type => ::Thrift::Types::BOOL, :name => 'areIdsValid', :optional => true},
+    IDS => {:type => ::Thrift::Types::LIST, :name => 'ids', :element => {:type => ::Thrift::Types::I64}, :optional => true}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field lowWatermarkId is unset!') unless @lowWatermarkId
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field highWatermarkId is unset!') unless @highWatermarkId
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
 class GetAllFunctionsResponse
   include ::Thrift::Struct, ::Thrift::Struct_Union
   FUNCTIONS = 1

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index 403e07f..613702f 100644
--- a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -2529,6 +2529,21 @@ module ThriftHiveMetastore
       raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'heartbeat_write_id failed: unknown result')
     end
 
+    def get_valid_write_ids(req)
+      send_get_valid_write_ids(req)
+      return recv_get_valid_write_ids()
+    end
+
+    def send_get_valid_write_ids(req)
+      send_message('get_valid_write_ids', Get_valid_write_ids_args, :req => req)
+    end
+
+    def recv_get_valid_write_ids()
+      result = receive_message(Get_valid_write_ids_result)
+      return result.success unless result.success.nil?
+      raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_valid_write_ids failed: unknown result')
+    end
+
   end
 
   class Processor < ::FacebookService::Processor 
@@ -4396,6 +4411,13 @@ module ThriftHiveMetastore
       write_result(result, oprot, 'heartbeat_write_id', seqid)
     end
 
+    def process_get_valid_write_ids(seqid, iprot, oprot)
+      args = read_args(iprot, Get_valid_write_ids_args)
+      result = Get_valid_write_ids_result.new()
+      result.success = @handler.get_valid_write_ids(args.req)
+      write_result(result, oprot, 'get_valid_write_ids', seqid)
+    end
+
   end
 
   # HELPER FUNCTIONS AND STRUCTURES
@@ -10092,5 +10114,37 @@ module ThriftHiveMetastore
     ::Thrift::Struct.generate_accessors self
   end
 
+  class Get_valid_write_ids_args
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    REQ = 1
+
+    FIELDS = {
+      REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => ::GetValidWriteIdsRequest}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Get_valid_write_ids_result
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    SUCCESS = 0
+
+    FIELDS = {
+      SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::GetValidWriteIdsResult}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
 end
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index f99bcd2..e1d41c4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -119,6 +119,7 @@ import javax.jdo.JDOException;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.sql.SQLException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.AbstractMap;
@@ -134,6 +135,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.Callable;
@@ -438,19 +440,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         updateMetrics();
         LOG.info("Finished metadata count metrics: " + initDatabaseCount + " databases, " + initTableCount +
           " tables, " + initPartCount + " partitions.");
-        metrics.addGauge(MetricsConstant.INIT_TOTAL_DATABASES, new MetricsVariable() {
+        metrics.addGauge(MetricsConstant.INIT_TOTAL_DATABASES, new MetricsVariable<Object>() {
           @Override
           public Object getValue() {
             return initDatabaseCount;
           }
         });
-        metrics.addGauge(MetricsConstant.INIT_TOTAL_TABLES, new MetricsVariable() {
+        metrics.addGauge(MetricsConstant.INIT_TOTAL_TABLES, new MetricsVariable<Object>() {
           @Override
           public Object getValue() {
             return initTableCount;
           }
         });
-        metrics.addGauge(MetricsConstant.INIT_TOTAL_PARTITIONS, new MetricsVariable() {
+        metrics.addGauge(MetricsConstant.INIT_TOTAL_PARTITIONS, new MetricsVariable<Object>() {
           @Override
           public Object getValue() {
             return initPartCount;
@@ -1264,26 +1266,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       return (ms.getType(typeName) != null);
     }
 
-    private void drop_type_core(final RawStore ms, String typeName)
-        throws NoSuchObjectException, MetaException {
-      boolean success = false;
-      try {
-        ms.openTransaction();
-        // drop any partitions
-        if (!is_type_exists(ms, typeName)) {
-          throw new NoSuchObjectException(typeName + " doesn't exist");
-        }
-        if (!ms.dropType(typeName)) {
-          throw new MetaException("Unable to drop type " + typeName);
-        }
-        success = ms.commitTransaction();
-      } finally {
-        if (!success) {
-          ms.rollbackTransaction();
-        }
-      }
-    }
-
     @Override
     public boolean drop_type(final String name) throws MetaException, NoSuchObjectException {
       startFunction("drop_type", ": " + name);
@@ -1818,7 +1800,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           //No drop part listener events fired for public listeners historically, for drop table case.
           //Limiting to internal listeners for now, to avoid unexpected calls for public listeners.
           if (listener instanceof HMSMetricsListener) {
-            for (Partition part : partsToDelete) {
+            for (@SuppressWarnings("unused") Partition part : partsToDelete) {
               listener.onDropPartition(null);
             }
           }
@@ -2294,7 +2276,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
 
 
-          partFutures.add(threadPool.submit(new Callable() {
+          partFutures.add(threadPool.submit(new Callable<Partition>() {
             @Override
             public Partition call() throws Exception {
               boolean madeDir = createLocationForAddedPartition(table, part);
@@ -2456,8 +2438,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             LOG.info("Not adding partition " + part + " as it already exists");
             continue;
           }
-          partFutures.add(threadPool.submit(new Callable() {
-            @Override public Object call() throws Exception {
+          partFutures.add(threadPool.submit(new Callable<Partition>() {
+            @Override public Partition call() throws Exception {
               boolean madeDir = createLocationForAddedPartition(table, part);
               if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) {
                 // Technically, for ifNotExists case, we could insert one and discard the other
@@ -2474,7 +2456,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         try {
           for (Future<Partition> partFuture : partFutures) {
-            Partition part = partFuture.get();
+            partFuture.get();
           }
         } catch (InterruptedException | ExecutionException e) {
           // cancel other tasks
@@ -3777,6 +3759,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               }
             }
 
+            @SuppressWarnings("deprecation")
             Deserializer s = MetaStoreUtils.getDeserializer(curConf, tbl, false);
             ret = MetaStoreUtils.getFieldsFromDeserializer(tableName, s);
           } catch (SerDeException e) {
@@ -5745,7 +5728,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           throw newMetaException(e);
         }
       }
-      endFunction("partition_name_has_valid_characters", true, null);
+      endFunction("partition_name_has_valid_characters", true, ex);
       return ret;
     }
 
@@ -6044,21 +6027,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       return new GetRoleGrantsForPrincipalResponse(roleMaps);
     }
 
-    /**
-     * Convert each MRoleMap object into a thrift RolePrincipalGrant object
-     * @param roles
-     * @return
-     */
-    private List<RolePrincipalGrant> getRolePrincipalGrants(List<Role> roles) throws MetaException {
-      List<RolePrincipalGrant> rolePrinGrantList = new ArrayList<RolePrincipalGrant>();
-      if (roles != null) {
-        for (Role role : roles) {
-          rolePrinGrantList.addAll(getMS().listRoleMembers(role.getRoleName()));
-        }
-      }
-      return rolePrinGrantList;
-    }
-
     @Override
     public AggrStats get_aggr_stats_for(PartitionsStatsRequest request)
         throws NoSuchObjectException, MetaException, TException {
@@ -6448,31 +6416,47 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       }
     }
 
+    private final Random random = new Random();
     @Override
     public GetNextWriteIdResult get_next_write_id(GetNextWriteIdRequest req) throws TException {
       RawStore ms = getMS();
       String dbName = req.getDbName(), tblName = req.getTblName();
       startFunction("get_next_write_id", " : db=" + dbName + " tbl=" + tblName);
-      Exception ex = null;
+      Exception exception = null;
       long writeId = -1;
-      // TODO# see TXN about how to handle conflicts
       try {
-        boolean ok = false;
-        ms.openTransaction();
-        try {
-          Table tbl = ms.getTable(dbName, tblName);
-          if (tbl == null) {
-            throw new NoSuchObjectException(dbName + "." + tblName);
+        int deadlockTryCount = 10;
+        int deadlockRetryBackoffMs = 200;
+        while (deadlockTryCount > 0) {
+          boolean ok = false;
+          ms.openTransaction();
+          try {
+            Table tbl = ms.getTable(dbName, tblName);
+            if (tbl == null) {
+              throw new NoSuchObjectException(dbName + "." + tblName);
+            }
+            writeId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0;
+            tbl.setMmNextWriteId(writeId + 1);
+            ms.alterTable(dbName, tblName, tbl);
+            ok = true;
+          } finally {
+            if (!ok) {
+              ms.rollbackTransaction();
+              // Exception should propagate; don't override it by breaking out of the loop.
+            } else {
+              Boolean commitResult = ms.commitTransactionExpectDeadlock();
+              if (commitResult != null) {
+                if (commitResult) break; // Assume no exception; ok to break out of the loop.
+                throw new MetaException("Failed to commit");
+              }
+            }
           }
-          writeId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0;
-          tbl.setMmNextWriteId(writeId + 1);
-          ms.alterTable(dbName, tblName, tbl);
-          ok = true;
-        } finally {
-          commitOrRollback(ms, ok);
+          LOG.warn("Getting the next write ID failed due to a deadlock; retrying");
+          Thread.sleep(random.nextInt(deadlockRetryBackoffMs));
         }
+
         // Do a separate txn after we have reserved the number. TODO: If we fail, ignore on read.
-        ok = false;
+        boolean ok = false;
         ms.openTransaction();
         try {
           Table tbl = ms.getTable(dbName, tblName);
@@ -6482,10 +6466,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           commitOrRollback(ms, ok);
         }
       } catch (Exception e) {
-        ex = e;
+        exception = e;
         throwMetaException(e);
       } finally {
-        endFunction("get_next_write_id", ex == null, ex, tblName);
+        endFunction("get_next_write_id", exception == null, exception, tblName);
       }
       return new GetNextWriteIdResult(writeId);
     }
@@ -6562,10 +6546,65 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       assert tw.getState().length() == 1;
       char state = tw.getState().charAt(0);
       if (state != MM_WRITE_OPEN) {
-        throw new MetaException("Invalid write state to finalize: " + state);
+        throw new MetaException("Invalid write state: " + state);
       }
       return tw;
     }
+
+    @Override
+    public GetValidWriteIdsResult get_valid_write_ids(
+        GetValidWriteIdsRequest req) throws TException {
+      RawStore ms = getMS();
+      String dbName = req.getDbName(), tblName = req.getTblName();
+      startFunction("get_valid_write_ids", " : db=" + dbName + " tbl=" + tblName);
+      GetValidWriteIdsResult result = new GetValidWriteIdsResult();
+      Exception ex = null;
+      try {
+        boolean ok = false;
+        ms.openTransaction();
+        try {
+          Table tbl = ms.getTable(dbName, tblName);
+          if (tbl == null) {
+            throw new InvalidObjectException(dbName + "." + tblName);
+          }
+          long nextId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0;
+          long watermarkId = tbl.isSetMmWatermarkWriteId() ? tbl.getMmWatermarkWriteId() : -1;
+          if (nextId > (watermarkId + 1)) {
+            // There may be some intermediate failed or active writes; get the valid ones.
+            List<Long> ids = ms.getWriteIds(
+                dbName, tblName, watermarkId, nextId, MM_WRITE_COMMITTED);
+            // TODO: we could optimize here and send the smaller of the lists, and also use ranges
+            if (ids != null) {
+              Iterator<Long> iter = ids.iterator();
+              long oldWatermarkId = watermarkId;
+              while (iter.hasNext()) {
+                if (iter.next() != watermarkId + 1) break;
+                ++watermarkId;
+              }
+              long removed = watermarkId - oldWatermarkId;
+              if (removed > 0) {
+                ids = ids.subList((int)removed, ids.size());
+              }
+              if (!ids.isEmpty()) {
+                result.setIds(ids);
+                result.setAreIdsValid(true);
+              }
+            }
+          }
+          result.setHighWatermarkId(nextId);
+          result.setLowWatermarkId(watermarkId);
+          ok = true;
+        } finally {
+          commitOrRollback(ms, ok);
+        }
+      } catch (Exception e) {
+        ex = e;
+        throwMetaException(e);
+      } finally {
+        endFunction("get_valid_write_ids", ex == null, ex, tblName);
+      }
+      return result;
+    }
   }
 
 
@@ -7053,7 +7092,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
   }
 
   private static MetaStoreThread instantiateThread(String classname) throws Exception {
-    Class c = Class.forName(classname);
+    Class<?> c = Class.forName(classname);
     Object o = c.newInstance();
     if (MetaStoreThread.class.isAssignableFrom(o.getClass())) {
       return (MetaStoreThread)o;
@@ -7082,7 +7121,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
     startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidWriteSetService"));
   }
-  private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
+  private static void startHouseKeeperService(HiveConf conf, Class<?> c) throws Exception {
     //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
     //should be called form it
     HouseKeeperService houseKeeper = (HouseKeeperService)c.newInstance();
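
The get_valid_write_ids handler added above returns a snapshot of MM write ids: lowWatermarkId (every id at or below it is committed), highWatermarkId (the table's next write id, so anything at or above it is unknown to this snapshot), plus an optional id list governed by areIdsValid. For example, if the stored watermark is 4, the next id is 9, and getWriteIds returns committed ids 5, 6 and 8, the loop advances the watermark to 6 and the result is low=6, high=9, ids=[8], areIdsValid=true. A minimal reader-side sketch of that interpretation follows; the helper is hypothetical, and the areIdsValid == false branch is inferred from the field name (this handler only ever sets it to true).

  // Sketch (not part of this commit): interpreting a GetValidWriteIdsResult on the read side.
  static boolean isCommitted(long writeId, long lowWatermark, long highWatermark,
      boolean areIdsValid, java.util.List<Long> ids) {
    if (writeId <= lowWatermark) return true;     // covered by the watermark
    if (writeId >= highWatermark) return false;   // allocated after this snapshot was taken
    boolean inList = ids != null && ids.contains(writeId);
    return areIdsValid ? inList : !inList;        // list holds either the valid or the invalid ids
  }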

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 6bd6d92..0325854 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2404,4 +2404,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
       String dbName, String tableName, long writeId) throws TException {
     client.heartbeat_write_id(new HeartbeatWriteIdRequest(dbName, tableName, writeId));
   }
+
+  @Override
+  public GetValidWriteIdsResult getValidWriteIds(
+      String dbName, String tableName) throws TException {
+    return client.get_valid_write_ids(new GetValidWriteIdsRequest(dbName, tableName));
+  }
 }
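
A hedged usage sketch of the new client method; the configuration object, table names and result handling are illustrative and not taken from this patch.

  // Sketch: fetch the write-id snapshot for an MM table before planning a read.
  IMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);         // assumes a configured HiveConf
  GetValidWriteIdsResult snapshot = msc.getValidWriteIds("default", "mm_table");
  long low = snapshot.getLowWatermarkId();    // ids <= low are committed
  long high = snapshot.getHighWatermarkId();  // ids >= high are outside this snapshot
  // The optional ids/areIdsValid fields refine the range in between (see the sketch above).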

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index f5d611d..8706312 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
 import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResult;
 import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
 import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
@@ -1626,4 +1627,6 @@ public interface IMetaStoreClient {
 
   void finalizeTableWrite(String dbName, String tableName, long writeId,
       boolean commit) throws TException;
+
+  GetValidWriteIdsResult getValidWriteIds(String dbName, String tableName) throws TException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 561f3e3..125a3e5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -113,15 +113,8 @@ class MetaStoreDirectSql {
   private final boolean isAggregateStatsCacheEnabled;
   private AggregateStatsCache aggrStatsCache;
 
-  public MetaStoreDirectSql(PersistenceManager pm, Configuration conf) {
+  public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, DatabaseProduct dbType) {
     this.pm = pm;
-    DatabaseProduct dbType = null;
-    try {
-      dbType = DatabaseProduct.determineDatabaseProduct(getProductName());
-    } catch (SQLException e) {
-      LOG.warn("Cannot determine database product; assuming OTHER", e);
-      dbType = DatabaseProduct.OTHER;
-    }
     this.dbType = dbType;
     int batchSize = HiveConf.getIntVar(conf, ConfVars.METASTORE_DIRECT_SQL_PARTITION_BATCH_SIZE);
     if (batchSize == DETECT_BATCHING) {

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 9dc80b1..fb3b1ad 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -25,6 +25,8 @@ import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -52,6 +54,7 @@ import javax.jdo.PersistenceManagerFactory;
 import javax.jdo.Query;
 import javax.jdo.Transaction;
 import javax.jdo.datastore.DataStoreCache;
+import javax.jdo.datastore.JDOConnection;
 import javax.jdo.identity.IntIdentity;
 
 import com.google.common.collect.Maps;
@@ -220,6 +223,7 @@ public class ObjectStore implements RawStore, Configurable {
   private boolean isInitialized = false;
   private PersistenceManager pm = null;
   private MetaStoreDirectSql directSql = null;
+  private DatabaseProduct dbType = null;
   private PartitionExpressionProxy expressionProxy = null;
   private Configuration hiveConf;
   private volatile int openTrasactionCalls = 0;
@@ -329,15 +333,37 @@ public class ObjectStore implements RawStore, Configurable {
     pm = getPersistenceManager();
     isInitialized = pm != null;
     if (isInitialized) {
+      dbType = determineDatabaseProduct();
       expressionProxy = createExpressionProxy(hiveConf);
       if (HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL)) {
-        directSql = new MetaStoreDirectSql(pm, hiveConf);
+        directSql = new MetaStoreDirectSql(pm, hiveConf, dbType);
       }
     }
     LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
         " created in the thread with id: " + Thread.currentThread().getId());
   }
 
+  private DatabaseProduct determineDatabaseProduct() {
+    try {
+      return DatabaseProduct.determineDatabaseProduct(getProductName(pm));
+    } catch (SQLException e) {
+      LOG.warn("Cannot determine database product; assuming OTHER", e);
+      return DatabaseProduct.OTHER;
+    }
+  }
+
+  private static String getProductName(PersistenceManager pm) {
+    JDOConnection jdoConn = pm.getDataStoreConnection();
+    try {
+      return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName();
+    } catch (Throwable t) {
+      LOG.warn("Error retrieving product name", t);
+      return null;
+    } finally {
+      jdoConn.close(); // We must release the connection before we call other pm methods.
+    }
+  }
+
   /**
    * Creates the proxy used to evaluate expressions. This is here to prevent circular
    * dependency - ql -&gt; metastore client &lt;-&gt metastore server -&gt ql. If server and
@@ -511,15 +537,52 @@ public class ObjectStore implements RawStore, Configurable {
     return result;
   }
 
-  /**
-   * if this is the commit of the first open call then an actual commit is
-   * called.
-   *
-   * @return Always returns true
-   */
   @Override
   @SuppressWarnings("nls")
   public boolean commitTransaction() {
+    if (!startCommitTransaction()) return false;
+
+    openTrasactionCalls--;
+    debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive());
+    if ((openTrasactionCalls == 0) && currentTransaction.isActive()) {
+      transactionStatus = TXN_STATUS.COMMITED;
+      currentTransaction.commit();
+    }
+
+    return true;
+  }
+
+  @Override
+  @CanNotRetry
+  public Boolean commitTransactionExpectDeadlock() {
+    if (!startCommitTransaction()) return false;
+
+    if (--openTrasactionCalls != 0) {
+      String msg = "commitTransactionExpectDeadlock cannot be called for a nested transaction";
+      LOG.error(msg);
+      throw new AssertionError(msg);
+    }
+
+    transactionStatus = TXN_STATUS.COMMITED;
+    try {
+      currentTransaction.commit();
+    } catch (Exception ex) {
+      Throwable candidate = ex;
+      while (candidate != null && !(candidate instanceof SQLException)) {
+        candidate = candidate.getCause();
+      }
+      if (candidate == null) throw ex;
+      if (DatabaseProduct.isDeadlock(dbType, (SQLException)candidate)) {
+        LOG.info("Deadlock exception during commit: " + candidate.getMessage());
+        return null;
+      }
+      throw ex;
+    }
+
+    return true;
+  }
+
+  private boolean startCommitTransaction() {
     if (TXN_STATUS.ROLLBACK == transactionStatus) {
       debugLog("Commit transaction: rollback");
       return false;
@@ -538,14 +601,6 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.error("Unbalanced calls to open/commit Transaction", e);
       throw e;
     }
-    openTrasactionCalls--;
-    debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive());
-
-    if ((openTrasactionCalls == 0) && currentTransaction.isActive()) {
-      transactionStatus = TXN_STATUS.COMMITED;
-      currentTransaction.commit();
-    }
-
     return true;
   }
 
@@ -1487,7 +1542,7 @@ public class ObjectStore implements RawStore, Configurable {
         .getCreateTime(), tbl.getLastAccessTime(), tbl.getRetention(),
         convertToMFieldSchemas(tbl.getPartitionKeys()), tbl.getParameters(),
         tbl.getViewOriginalText(), tbl.getViewExpandedText(),
-        tableType, tbl.isSetMmNextWriteId() ?  tbl.getMmNextWriteId() : -1,
+        tableType, tbl.isSetMmNextWriteId() ?  tbl.getMmNextWriteId() : 0,
             tbl.isSetMmWatermarkWriteId() ?  tbl.getMmWatermarkWriteId() : -1);
   }
 
@@ -2718,7 +2773,8 @@ public class ObjectStore implements RawStore, Configurable {
       boolean isConfigEnabled = HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL)
           && (HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL_DDL) || !isInTxn);
       if (isConfigEnabled && directSql == null) {
-        directSql = new MetaStoreDirectSql(pm, getConf());
+        dbType = determineDatabaseProduct();
+        directSql = new MetaStoreDirectSql(pm, getConf(), dbType);
       }
 
       if (!allowJdo && isConfigEnabled && !directSql.isCompatibleDatastore()) {
@@ -8692,16 +8748,10 @@ public class ObjectStore implements RawStore, Configurable {
     Query query = null;
     try {
       openTransaction();
-      dbName = HiveStringUtils.normalizeIdentifier(dbName);
-      tblName = HiveStringUtils.normalizeIdentifier(tblName);
-      MTable mtbl = getMTable(dbName, tblName);
-      if (mtbl == null) {
-        success = true;
-        return null;
-      }
       query = pm.newQuery(MTableWrite.class,
               "table.tableName == t1 && table.database.name == t2 && writeId == t3");
       query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.Long t3");
+      @SuppressWarnings("unchecked")
       List<MTableWrite> writes = (List<MTableWrite>) query.execute(tblName, dbName, writeId);
       pm.retrieveAll(writes);
       success = true;
@@ -8723,4 +8773,34 @@ public class ObjectStore implements RawStore, Configurable {
     }
   }
 
+  @Override
+  public List<Long> getWriteIds(String dbName, String tblName,
+      long watermarkId, long nextWriteId, char state) throws MetaException {
+    boolean success = false;
+    Query query = null;
+    try {
+      openTransaction();
+      query = pm.newQuery("select writeId from org.apache.hadoop.hive.metastore.model.MTableWrite"
+          + " where table.tableName == t1 && table.database.name == t2 && writeId >= t3"
+          + " && writeId < t4 && state == t5");
+      query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.Long t3, "
+          + "java.lang.Long t4, java.lang.String t5");
+      query.setResult("writeId");
+      query.setOrdering("writeId asc");
+      @SuppressWarnings("unchecked")
+      List<Long> writes = (List<Long>) query.executeWithArray(
+          tblName, dbName, watermarkId, nextWriteId, String.valueOf(state));
+      success = true;
+      return (writes == null || writes.isEmpty()) ? null : new ArrayList<>(writes);
+    } finally {
+      if (success) {
+        commitTransaction();
+      } else {
+        rollbackTransaction();
+      }
+      if (query != null) {
+        query.closeAll();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index c5359cf..170c07d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -94,6 +94,15 @@ public interface RawStore extends Configurable {
   public abstract boolean commitTransaction();
 
   /**
+   * Commits transaction and detects if the failure to do so is a deadlock or not.
+   * Must be called on the top level with regard to openTransaction calls; attempting to
+   * call this after several nested openTransaction calls will throw.
+   * @return true or false - same as commitTransaction; null in case of deadlock.
+   */
+  @CanNotRetry
+  public abstract Boolean commitTransactionExpectDeadlock();
+
+  /**
    * Rolls back the current transaction if it is active
    */
   @CanNotRetry
@@ -687,4 +696,6 @@ public interface RawStore extends Configurable {
   MTableWrite getTableWrite(String dbName, String tblName, long writeId) throws MetaException;
 
   void createTableWrite(Table tbl, long writeId, char state, long heartbeat);
+
+  List<Long> getWriteIds(String dbName, String tblName, long watermarkId, long nextWriteId, char state) throws MetaException;
 }
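
The contract documented above (true/false as for commitTransaction, null when the commit itself deadlocks, top level only) is what the get_next_write_id retry loop in HiveMetaStore relies on. A condensed sketch of that caller-side pattern, with illustrative retry count and backoff:

  // Sketch: top-level caller handling the tri-state result of commitTransactionExpectDeadlock.
  void runWithDeadlockRetry(RawStore ms) throws Exception {
    for (int attempt = 0; attempt < 10; ++attempt) {
      boolean ok = false;
      ms.openTransaction();
      try {
        // ... read-modify-write against the store ...
        ok = true;
      } finally {
        if (!ok) ms.rollbackTransaction();   // let the original failure propagate
      }
      Boolean committed = ms.commitTransactionExpectDeadlock();
      if (committed != null) {
        if (!committed) throw new MetaException("Failed to commit");
        return;                              // committed successfully
      }
      Thread.sleep(200);                     // null => deadlock; back off and retry from scratch
    }
    throw new MetaException("Giving up after repeated deadlocks");
  }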

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 4fbeb9e..829f0ae 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.PartFilterExprUtil;
 import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.RawStore.CanNotRetry;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -131,13 +132,27 @@ public class HBaseStore implements RawStore {
   @Override
   public boolean commitTransaction() {
     if (--txnNestLevel == 0) {
-      LOG.debug("Committing HBase transaction");
-      getHBase().commit();
+      commitInternal();
     }
     return true;
   }
 
   @Override
+  @CanNotRetry
+  public Boolean commitTransactionExpectDeadlock() {
+    if (--txnNestLevel != 0) {
+      throw new AssertionError("Cannot be called on a nested transaction");
+    }
+    commitInternal();
+    return true;
+  }
+
+  private void commitInternal() {
+    LOG.debug("Committing HBase transaction");
+    getHBase().commit();
+  }
+
+  @Override
   public void rollbackTransaction() {
     txnNestLevel = 0;
     LOG.debug("Rolling back HBase transaction");
@@ -2741,4 +2756,12 @@ public class HBaseStore implements RawStore {
     // TODO: Auto-generated method stub
     throw new UnsupportedOperationException();
   }
+ 
+
+  @Override
+  public List<Long> getWriteIds(
+      String dbName, String tblName, long watermarkId, long nextWriteId, char state) {
+    // TODO: Auto-generated method stub
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/model/package.jdo
----------------------------------------------------------------------
diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo
index 5126556..bd71056 100644
--- a/metastore/src/model/package.jdo
+++ b/metastore/src/model/package.jdo
@@ -53,7 +53,7 @@
            <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/>
         </value>
       </field>
-      <field name="ownerName">	
+      <field name="ownerName">    
         <column name="OWNER_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/>
       </field>
        <field name="ownerType">
@@ -183,10 +183,10 @@
         <column name="TBL_TYPE" length="128" jdbc-type="VARCHAR"/>
       </field>
       <field name="mmNextWriteId">
-        <column name="MM_NEXT_WRITE_ID" jdbc-type="BIGINT"/>
+        <column name="MM_NEXT_WRITE_ID" jdbc-type="BIGINT" default-value="0" />
       </field>
       <field name="mmWatermarkWriteId">
-        <column name="MM_WATERMARK_WRITE_ID" jdbc-type="BIGINT"/>
+        <column name="MM_WATERMARK_WRITE_ID" jdbc-type="BIGINT" default-value="-1" />
       </field>
     </class>
 
@@ -210,7 +210,7 @@
         <column name="PARENT_CD_ID"/>
       </field>
       <field name="parentIntegerIndex">
-	<column name="PARENT_INTEGER_IDX"/>
+    <column name="PARENT_INTEGER_IDX"/>
       </field>
       <field name="parentTable">
         <column name="PARENT_TBL_ID"/>
@@ -219,7 +219,7 @@
         <column name="CONSTRAINT_TYPE"/>
       </field>
       <field name="deleteRule">
-	<column name="DELETE_RULE"/>
+        <column name="DELETE_RULE"/>
       </field>
       <field name="updateRule">
         <column name="UPDATE_RULE"/>
@@ -288,7 +288,7 @@
           </embedded>
         </element>
       </field>
-	</class>
+    </class>
 
    <class name="MStringList" identity-type="datastore" table="Skewed_STRING_LIST" detachable="true">
      <datastore-identity>
@@ -308,7 +308,7 @@
         <column name="SD_ID"/>
       </datastore-identity>
       <field name="cd">
-      	<column name="CD_ID"/>
+          <column name="CD_ID"/>
       </field>
       <field name="location">
         <column name="LOCATION" length="4000" jdbc-type="VARCHAR"/>
@@ -1003,7 +1003,7 @@
       <field name="className">
         <column name="CLASS_NAME" length="4000" jdbc-type="VARCHAR"/>
       </field>
-      <field name="ownerName">	
+      <field name="ownerName">    
         <column name="OWNER_NAME" length="128" jdbc-type="VARCHAR"/>
       </field>
        <field name="ownerType">

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 9fffd3f..98c543f 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -878,4 +878,16 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
   public MTableWrite getTableWrite(String dbName, String tblName, long writeId) {
     return null;
   }
+
+  @Override
+  @CanNotRetry
+  public Boolean commitTransactionExpectDeadlock() {
+    return null;
+  }
+
+  @Override
+  public List<Long> getWriteIds(
+      String dbName, String tblName, long watermarkId, long nextWriteId, char state) {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index a763085..8e54b16 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -104,14 +104,17 @@ public class DummyRawStoreForJdoConnection implements RawStore {
 
   @Override
   public boolean commitTransaction() {
+    return false;
+  }
 
+  @Override
+  @CanNotRetry
+  public Boolean commitTransactionExpectDeadlock() {
     return false;
   }
 
   @Override
   public void rollbackTransaction() {
-
-
   }
 
   @Override
@@ -893,6 +896,12 @@ public class DummyRawStoreForJdoConnection implements RawStore {
   public MTableWrite getTableWrite(String dbName, String tblName, long writeId) {
     return null;
   }
+
+  @Override
+  public List<Long> getWriteIds(
+      String dbName, String tblName, long watermarkId, long nextWriteId, char state) {
+    return null;
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 42d398d..45a80e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
@@ -46,8 +47,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -71,6 +75,7 @@ import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContext;
 import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContextImpl;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
@@ -1416,6 +1421,11 @@ public class Driver implements CommandProcessor {
         return rollback(createProcessorResponse(ret));
       }
     }
+    try {
+      acquireWriteIds(plan, conf);
+    } catch (HiveException e) {
+      return handleHiveException(e, 1);
+    }
     ret = execute();
     if (ret != 0) {
       //if needRequireLock is false, the release here will do nothing because there is no lock
@@ -1458,6 +1468,34 @@ public class Driver implements CommandProcessor {
     return createProcessorResponse(ret);
   }
 
+  private static void acquireWriteIds(QueryPlan plan, HiveConf conf) throws HiveException {
+    // Output IDs are put directly into FileSinkDesc; here, we only need to take care of inputs.
+    for (ReadEntity input : plan.getInputs()) {
+      Table t = extractMmTable(input);
+      if (t == null) continue;
+      ValidWriteIds ids = Hive.get().getValidWriteIdsForTable(t.getDbName(), t.getTableName());
+      ids.addToConf(conf, t.getDbName(), t.getTableName());
+      if (plan.getFetchTask() != null) {
+        ids.addToConf(plan.getFetchTask().getFetchConf(), t.getDbName(), t.getTableName());
+      }
+    }
+  }
+
+  private static Table extractMmTable(ReadEntity input) {
+    Table t = null;
+    switch (input.getType()) {
+      case TABLE:
+        t = input.getTable();
+        break;
+      case DUMMYPARTITION:
+      case PARTITION:
+        t = input.getPartition().getTable();
+        break;
+      default: return null;
+    }
+    return (t != null && !t.isTemporary() && AcidUtils.isMmTable(t)) ? t : null;
+  }
+
   private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {
     //console.printError(cpr.toString());
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 601ad08..7375cd4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -30,6 +30,7 @@ import java.util.Properties;
 
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -695,4 +696,8 @@ public class FetchOperator implements Serializable {
       return inputFormat.getRecordReader(getInputSplit(), job, Reporter.NULL);
     }
   }
+
+  public Configuration getJobConf() {
+    return job;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 8c7d99d..93c03fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -193,4 +194,8 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
     }
   }
 
+  public Configuration getFetchConf() {
+    return fetch.getJobConf();
+  }
+
 }
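
The FileSinkOperator changes below move MM output into per-write-id subdirectories and commit manifests named via ValidWriteIds.getMmFilePrefix. A sketch of how those paths are composed; the exact prefix string produced by getMmFilePrefix and the manifest extension value are assumptions, only the call pattern comes from this patch.

  // Sketch (illustrative values): MM output layout after this change.
  Path specPath = new Path("/warehouse/db/mm_table");               // table or partition location
  long mmWriteId = 17L;                                             // from FileSinkDesc.getMmWriteId()
  String taskId = "000000_0";
  String prefix = ValidWriteIds.getMmFilePrefix(mmWriteId);         // per-write-id prefix
  Path dataFile = new Path(specPath, prefix + "/" + taskId);        // data file in the write-id subdirectory
  Path manifest = new Path(specPath,
      "_tmp." + prefix + "_" + taskId + MANIFEST_EXTENSION);        // manifest listing committed files
                                                                    // (MANIFEST_EXTENSION as in FileSinkOperator)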

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 6a0143a..e4e0153 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -40,10 +40,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
@@ -239,7 +239,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         }
       }
       if (isMmTable) {
-        Path manifestPath = new Path(specPath, "_tmp." + getMmPrefixedTaskId() + MANIFEST_EXTENSION);
+        Path manifestPath = new Path(specPath, "_tmp." + ValidWriteIds.getMmFilePrefix(
+            conf.getMmWriteId()) + "_" + taskId + MANIFEST_EXTENSION);
         Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
         try {
           try (FSDataOutputStream out = fs.create(manifestPath)) {
@@ -323,11 +324,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           }
           outPaths[filesIdx] = getTaskOutPath(taskId);
         } else {
+          String subdirPath = ValidWriteIds.getMmFilePrefix(conf.getMmWriteId()) + "/" + taskId;
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
-            finalPaths[filesIdx] = getFinalPath(getMmPrefixedTaskId(), specPath, extension);
+            finalPaths[filesIdx] = getFinalPath(subdirPath, specPath, extension);
           } else {
-            // TODO# wrong!
-            finalPaths[filesIdx] = getFinalPath(getMmPrefixedTaskId(), specPath, extension);
+            // TODO# wrong! special case #N bucketing
+            finalPaths[filesIdx] = getFinalPath(subdirPath, specPath, extension);
           }
           outPaths[filesIdx] = finalPaths[filesIdx];
         }
@@ -721,10 +723,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
   }
 
-  private String getMmPrefixedTaskId() {
-    return AcidUtils.getMmFilePrefix(conf.getMmWriteId()) + taskId;
-  }
-
   protected Writable recordValue;
 
 
@@ -1195,21 +1193,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     super.jobCloseOp(hconf, success);
   }
 
-  private static class ExecPrefixPathFilter implements PathFilter {
-    private final String prefix, tmpPrefix;
-    public ExecPrefixPathFilter(String prefix) {
-      this.prefix = prefix;
-      this.tmpPrefix = "_tmp." + prefix;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      String name = path.getName();
-      return name.startsWith(prefix) || name.startsWith(tmpPrefix);
-    }
-  }
-
-
   private void handleMmTable(Path specPath, Configuration hconf, boolean success,
       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter)
           throws IOException, HiveException {
@@ -1217,7 +1200,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     int targetLevel = (dpCtx == null) ? 1 : dpCtx.getNumDPCols();
     if (!success) {
       FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs,
-          new ExecPrefixPathFilter(AcidUtils.getMmFilePrefix(conf.getMmWriteId())));
+          new ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true));
       for (FileStatus status : statuses) {
         Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
         tryDelete(fs, status.getPath());
@@ -1225,15 +1208,19 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       return;
     }
     FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs,
-        new ExecPrefixPathFilter(AcidUtils.getMmFilePrefix(conf.getMmWriteId())));
+        new ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true));
     if (statuses == null) return;
     LinkedList<FileStatus> results = new LinkedList<>();
     List<Path> manifests = new ArrayList<>(statuses.length);
     for (FileStatus status : statuses) {
       if (status.getPath().getName().endsWith(MANIFEST_EXTENSION)) {
         manifests.add(status.getPath());
+      } else if (!status.isDirectory()) {
+        Path path = status.getPath();
+        Utilities.LOG14535.warn("Unknown file found - neither a manifest nor directory: " + path);
+        tryDelete(fs, path);
       } else {
-        results.add(status);
+        results.addAll(Lists.newArrayList(fs.listStatus(status.getPath())));
       }
     }
     HashSet<String> committed = new HashSet<>();
@@ -1254,7 +1241,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       if (!committed.remove(rfs.getPath().toString())) {
         iter.remove();
         Utilities.LOG14535.info("Deleting " + rfs.getPath() + " that was not committed");
-        tryDelete(fs, rfs.getPath());
+        // We should actually succeed here - if we fail, don't commit the query.
+        if (!fs.delete(rfs.getPath(), true)) {
+          throw new HiveException("Failed to delete an uncommitted path " + rfs.getPath());
+        }
       }
     }
     if (!committed.isEmpty()) {
@@ -1268,6 +1258,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     if (results.isEmpty()) return;
     FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]);
 
+    // TODO# dp will break - removeTempOrDuplicateFiles assumes dirs in results. Why? We recurse...
     List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
         fs, finalResults, dpCtx, conf, hconf);
     // create empty buckets if necessary
@@ -1278,7 +1269,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
 
   private void tryDelete(FileSystem fs, Path path) {
     try {
-      fs.delete(path, false);
+      fs.delete(path, true);
     } catch (IOException ex) {
       LOG.error("Failed to delete " + path, ex);
     }
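
The hunks above replace FileSinkOperator's private ExecPrefixPathFilter with the shared ValidWriteIds.IdPathFilter and switch MM file naming to ValidWriteIds.getMmFilePrefix. IdPathFilter's implementation is not part of this diff; the following is a minimal self-contained sketch of what it is assumed to do, modeled on the removed ExecPrefixPathFilter and the "mm_<writeId>_" prefix (see the AcidUtils hunk below). The boolean argument is assumed to select between matching this write id's files (the handleMmTable cleanup here) and matching everything else (the replace cleanup in Hive.java).

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Hypothetical stand-in for ValidWriteIds.IdPathFilter -- an assumption, not the
    // actual implementation. Accepts (or rejects) paths that belong to one MM write id,
    // including the "_tmp."-prefixed manifest files written by FileSinkOperator.
    public class MmWriteIdPathFilter implements PathFilter {
      private final String prefix;     // e.g. "mm_17_"
      private final String tmpPrefix;  // e.g. "_tmp.mm_17_"
      private final boolean isMatch;   // true: keep only this id's files; false: keep the rest

      public MmWriteIdPathFilter(long mmWriteId, boolean isMatch) {
        this.prefix = "mm_" + mmWriteId + "_";
        this.tmpPrefix = "_tmp." + this.prefix;
        this.isMatch = isMatch;
      }

      @Override
      public boolean accept(Path path) {
        String name = path.getName();
        boolean matches = name.startsWith(prefix) || name.startsWith(tmpPrefix);
        return isMatch == matches;
      }
    }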

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index f2389ea..3be21c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -314,17 +314,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         checkFileFormats(db, tbd, table);
 
         boolean isAcid = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID;
-        if (tbd.isMmTable()) {
-          if (tbd.getReplace()) {
-            // TODO#: would need a list of new files to support. Then, old ones only would need
-            //        to be removed from MS (and FS). Also, per-partition IOW is problematic for
-            //        the prefix case.
-            throw new HiveException("Replace and MM are not supported");
-          }
-          if (isAcid) {
-            // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move.
-            throw new HiveException("ACID and MM are not supported");
-          }
+        if (tbd.isMmTable() && isAcid) {
+          // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move.
+          throw new HiveException("ACID and MM are not supported");
         }
 
         // Create a data container

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 9e6a201..03abdc1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -123,6 +123,7 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
@@ -161,6 +162,7 @@ import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.Serializer;
@@ -192,6 +194,7 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.util.ReflectionUtil;
@@ -199,6 +202,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -651,6 +655,7 @@ public final class Utilities {
     }
   }
 
+  @VisibleForTesting
   public static TableDesc defaultTd;
   static {
     // by default we expect ^A separated strings
@@ -658,7 +663,16 @@ public final class Utilities {
     // PlanUtils.getDefaultTableDesc(String separatorCode, String columns)
     // or getBinarySortableTableDesc(List<FieldSchema> fieldSchemas) when
     // we know the column names.
-    defaultTd = PlanUtils.getDefaultTableDesc("" + Utilities.ctrlaCode);
+    /**
+     * Generate the table descriptor of MetadataTypedColumnsetSerDe with the
+     * separatorCode. MetaDataTypedColumnsetSerDe is used because LazySimpleSerDe
+     * does not support a table with a single column "col" with type
+     * "array<string>".
+     */
+    defaultTd = new TableDesc(TextInputFormat.class, IgnoreKeyTextOutputFormat.class,
+        Utilities.makeProperties(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT,
+            "" + Utilities.ctrlaCode, serdeConstants.SERIALIZATION_LIB,
+            MetadataTypedColumnsetSerDe.class.getName()));
   }
 
   public static final int carriageReturnCode = 13;
@@ -1528,14 +1542,9 @@ public final class Utilities {
           // get the missing buckets and generate empty buckets
           String taskID1 = taskIDToFile.keySet().iterator().next();
           Path bucketPath = taskIDToFile.values().iterator().next().getPath();
+          Utilities.LOG14535.info("Bucket path " + bucketPath);
           for (int j = 0; j < dpCtx.getNumBuckets(); ++j) {
-            String taskID2 = replaceTaskId(taskID1, j);
-            if (!taskIDToFile.containsKey(taskID2)) {
-              // create empty bucket, file name should be derived from taskID2
-              URI bucketUri = bucketPath.toUri();
-              String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j);
-              result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2));
-            }
+            addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j);
           }
         }
       }
@@ -1550,14 +1559,9 @@ public final class Utilities {
           // get the missing buckets and generate empty buckets for non-dynamic partition
         String taskID1 = taskIDToFile.keySet().iterator().next();
         Path bucketPath = taskIDToFile.values().iterator().next().getPath();
+        Utilities.LOG14535.info("Bucket path " + bucketPath);
         for (int j = 0; j < conf.getTable().getNumBuckets(); ++j) {
-          String taskID2 = replaceTaskId(taskID1, j);
-          if (!taskIDToFile.containsKey(taskID2)) {
-            // create empty bucket, file name should be derived from taskID2
-            URI bucketUri = bucketPath.toUri();
-            String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j);
-            result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2));
-          }
+          addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j);
         }
       }
     }
@@ -1565,6 +1569,19 @@ public final class Utilities {
     return result;
   }
 
+  private static void addBucketFileIfMissing(List<Path> result,
+      HashMap<String, FileStatus> taskIDToFile, String taskID1, Path bucketPath, int j) {
+    // TODO# this will probably break with directories cause buckets would be above (or not?)
+    String taskID2 = replaceTaskId(taskID1, j);
+    if (!taskIDToFile.containsKey(taskID2)) {
+      // create empty bucket, file name should be derived from taskID2
+      URI bucketUri = bucketPath.toUri();
+      String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j);
+      Utilities.LOG14535.info("Creating an empty bucket file " + path2);
+      result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2));
+    }
+  }
+
   public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,
       FileSystem fs) throws IOException {
 
@@ -2976,8 +2993,9 @@ public final class Utilities {
 
       // The alias may not have any path
       Path path = null;
-      for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
-        List<String> aliases = work.getPathToAliases().get(file);
+      for (Map.Entry<Path, ArrayList<String>> e : work.getPathToAliases().entrySet()) {
+        Path file = e.getKey();
+        List<String> aliases = e.getValue();
         if (aliases.contains(alias)) {
           path = file;
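
Earlier hunks in this Utilities.java diff extract addBucketFileIfMissing from the two empty-bucket loops. The helper relies on replaceTaskId and replaceTaskIdFromFilename, whose bodies are not shown here; assuming Hive's usual bucket file naming ("000000_0", "000001_0", ...), the derivation of a missing bucket's file name works roughly as sketched below, with the zero padding taken from the existing task id.

    // Illustrative sketch only: derive the file name of a missing bucket j from an
    // existing bucket file name, assuming names of the form "000000_0". The real code
    // goes through Utilities.replaceTaskId / replaceTaskIdFromFilename.
    final class BucketNameSketch {
      static String deriveBucketFileName(String existingName, int j) {
        int us = existingName.indexOf('_');
        String taskId = us < 0 ? existingName : existingName.substring(0, us);
        String suffix = us < 0 ? "" : existingName.substring(us);
        String bucket = String.valueOf(j);
        StringBuilder padded = new StringBuilder();
        for (int i = bucket.length(); i < taskId.length(); i++) {
          padded.append('0');
        }
        return padded.append(bucket).append(suffix).toString();
      }
    }

    // Example: deriveBucketFileName("000001_0", 3) -> "000003_0"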
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 1ef15cb..70b129e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1167,8 +1167,4 @@ public class AcidUtils {
     }
     return AcidOperationalProperties.parseString(resultStr);
   }
-
-  public static String getMmFilePrefix(long mmWriteId) {
-    return "mm_" + mmWriteId + "_";
-  }
 }
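
getMmFilePrefix moves from AcidUtils to ValidWriteIds; its callers in FileSinkOperator and Hive now go through the latter. The removed method fixes the directory-name format the read path has to recognize. Below is a small sketch of that prefix and of the inverse parse a reader would need; the parse helper is illustrative only and not code from this patch.

    // The prefix format comes from the removed AcidUtils.getMmFilePrefix; the parser
    // is an illustration of how a reader could recover the write id from a directory
    // name such as "mm_42_", not part of this patch.
    final class MmPrefixSketch {
      static String getMmFilePrefix(long mmWriteId) {
        return "mm_" + mmWriteId + "_";
      }

      /** Returns the write id encoded in an MM directory name, or -1 if it does not parse. */
      static long parseWriteId(String dirName) {
        if (!dirName.startsWith("mm_")) {
          return -1;
        }
        int end = dirName.indexOf('_', 3);
        if (end < 0) {
          return -1;
        }
        try {
          return Long.parseLong(dirName.substring(3, end));
        } catch (NumberFormatException e) {
          return -1;
        }
      }
    }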

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index c4b9940..0510e08 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -23,9 +23,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -39,8 +41,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
@@ -345,7 +349,10 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
    */
   private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, JobConf conf,
       InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
-      TableDesc table, List<InputSplit> result) throws IOException {
+      TableDesc table, Map<String, ValidWriteIds> writeIdMap, List<InputSplit> result)
+          throws IOException {
+    ValidWriteIds writeIds = extractWriteIds(writeIdMap, conf, table.getTableName());
+    Utilities.LOG14535.info("Observing " + table.getTableName() + ": " + writeIds);
 
     Utilities.copyTablePropertiesToConf(table, conf);
 
@@ -353,7 +360,19 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       pushFilters(conf, tableScan);
     }
 
-    FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()]));
+    if (writeIds == null) {
+      FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()]));
+    } else {
+      List<Path> finalPaths = new ArrayList<>(dirs.size());
+      for (Path dir : dirs) {
+        processForWriteIds(dir, conf, writeIds, finalPaths);
+      }
+      if (finalPaths.isEmpty()) {
+        LOG.warn("No valid inputs found in " + dirs);
+        return;
+      }
+      FileInputFormat.setInputPaths(conf, finalPaths.toArray(new Path[finalPaths.size()]));
+    }
     conf.setInputFormat(inputFormat.getClass());
 
     int headerCount = 0;
@@ -373,6 +392,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
   }
 
+  private void processForWriteIds(Path dir, JobConf conf,
+      ValidWriteIds writeIds, List<Path> finalPaths) throws IOException {
+    FileStatus[] files = dir.getFileSystem(conf).listStatus(dir); // TODO: batch?
+    for (FileStatus file : files) {
+      Path subdir = file.getPath();
+      if (!file.isDirectory()) {
+        Utilities.LOG14535.warn("Found a file not in subdirectory " + subdir);
+        continue;
+      }
+      if (!writeIds.isValidInput(subdir)) {
+        Utilities.LOG14535.warn("Ignoring an uncommitted directory " + subdir);
+        continue;
+      }
+      Utilities.LOG14535.info("Adding input " + subdir);
+      finalPaths.add(subdir);
+    }
+  }
+
   Path[] getInputPaths(JobConf job) throws IOException {
     Path[] dirs;
     if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
@@ -416,6 +453,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     StringBuilder readColumnNamesBuffer = new StringBuilder(newjob.
       get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
     // for each dir, get the InputFormat, and do getSplits.
+    Map<String, ValidWriteIds> writeIdMap = new HashMap<>();
     for (Path dir : dirs) {
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
       Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
@@ -466,7 +504,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
         addSplitsForGroup(currentDirs, currentTableScan, newjob,
             getInputFormatFromCache(currentInputFormatClass, job),
             currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length),
-            currentTable, result);
+            currentTable, writeIdMap, result);
       }
 
       currentDirs.clear();
@@ -488,7 +526,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       addSplitsForGroup(currentDirs, currentTableScan, newjob,
           getInputFormatFromCache(currentInputFormatClass, job),
           currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length),
-          currentTable, result);
+          currentTable, writeIdMap, result);
     }
 
     Utilities.clearWorkMapForConf(job);
@@ -499,6 +537,19 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     return result.toArray(new HiveInputSplit[result.size()]);
   }
 
+  private static ValidWriteIds extractWriteIds(Map<String, ValidWriteIds> writeIdMap,
+      JobConf newjob, String tableName) {
+    if (StringUtils.isBlank(tableName)) return null;
+    ValidWriteIds writeIds = writeIdMap.get(tableName);
+    if (writeIds == null) {
+      writeIds = ValidWriteIds.createFromConf(newjob, tableName);
+      writeIdMap.put(tableName, writeIds != null ? writeIds : ValidWriteIds.NO_WRITE_IDS);
+    } else if (writeIds == ValidWriteIds.NO_WRITE_IDS) {
+      writeIds = null;
+    }
+    return writeIds;
+  }
+
   private void pushProjection(final JobConf newjob, final StringBuilder readColumnsBuffer,
       final StringBuilder readColumnNamesBuffer) {
     String readColIds = readColumnsBuffer.toString();
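
The new read-path plumbing in HiveInputFormat caches one ValidWriteIds per table (with NO_WRITE_IDS as a negative-cache sentinel) and keeps only the "mm_<writeId>_" subdirectories whose write id is valid. ValidWriteIds itself is not in this diff; the sketch below is a self-contained model of the check isValidInput is assumed to perform once the write id has been parsed out of the directory name (see the parse sketch after the AcidUtils hunk above). The watermark and exception-set interpretation is an assumption drawn from the constructor call in Hive.getValidWriteIdsForTable further down, not from the ValidWriteIds source.

    import java.util.Set;

    // Assumed ValidWriteIds semantics; the field meanings are assumptions based on
    // the arguments passed in Hive.getValidWriteIdsForTable.
    final class WriteIdValidityModel {
      private final long lowWatermark;   // assumed: ids at or below this are committed
      private final long highWatermark;  // assumed: ids above this are not yet visible
      private final boolean areIdsValid; // does 'ids' enumerate valid ids or invalid ones?
      private final Set<Long> ids;       // exceptions between the watermarks, may be null

      WriteIdValidityModel(long lowWatermark, long highWatermark,
          boolean areIdsValid, Set<Long> ids) {
        this.lowWatermark = lowWatermark;
        this.highWatermark = highWatermark;
        this.areIdsValid = areIdsValid;
        this.ids = ids;
      }

      boolean isValid(long writeId) {
        if (writeId <= lowWatermark) return true;
        if (writeId > highWatermark) return false;
        if (ids == null) return true; // assumed: no exceptions recorded in this range
        return areIdsValid == ids.contains(writeId);
      }
    }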

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 2ba4fa2..f3609df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResult;
 import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.HiveObjectType;
@@ -1563,6 +1565,11 @@ public class Hive {
         if (areEventsForDmlNeeded(tbl, oldPart)) {
           newFiles = listFilesCreatedByQuery(loadPath, mmWriteId);
         }
+        if (replace) {
+          Path tableDest = tbl.getPath();
+          deleteOldPathForReplace(newPartPath, oldPartPath,
+              getConf(), new ValidWriteIds.IdPathFilter(mmWriteId, false));
+        }
       } else {
         if (replace || (oldPart == null && !isAcid)) {
           replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
@@ -1652,7 +1659,7 @@ public class Hive {
 
   private List<Path> listFilesCreatedByQuery(Path loadPath, long mmWriteId) throws HiveException {
     List<Path> newFiles = new ArrayList<Path>();
-    final String filePrefix = AcidUtils.getMmFilePrefix(mmWriteId);
+    final String filePrefix = ValidWriteIds.getMmFilePrefix(mmWriteId);
     FileStatus[] srcs;
     FileSystem srcFs;
     try {
@@ -1920,7 +1927,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       for (Future future : futures) {
         future.get();
       }
-      // TODO# we would commit the txn to metastore here
+      // TODO# special case #N - DP - we would commit the txn to metastore here
     } catch (InterruptedException | ExecutionException e) {
       LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
       //cancel other futures
@@ -1993,6 +2000,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
         }
       }
     } else {
+      if (replace) {
+        Path tableDest = tbl.getPath();
+        deleteOldPathForReplace(tableDest, tableDest, sessionConf,
+            new ValidWriteIds.IdPathFilter(mmWriteId, false));
+      }
       newFiles = listFilesCreatedByQuery(loadPath, mmWriteId);
     }
     if (!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
@@ -3376,39 +3388,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
       }
 
       if (oldPath != null) {
-        boolean oldPathDeleted = false;
-        boolean isOldPathUnderDestf = false;
-        FileStatus[] statuses = null;
-        try {
-          FileSystem oldFs = oldPath.getFileSystem(conf);
-          statuses = oldFs.listStatus(oldPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
-          // Do not delete oldPath if:
-          //  - destf is subdir of oldPath
-          isOldPathUnderDestf = isSubDir(oldPath, destf, oldFs, destFs, false);
-          if (isOldPathUnderDestf) {
-            // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
-            // existing content might result in incorrect (extra) data.
-            // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
-            // not the destf or its subdir?
-            oldPathDeleted = trashFiles(oldFs, statuses, conf);
-          }
-        } catch (IOException e) {
-          if (isOldPathUnderDestf) {
-            // if oldPath is a subdir of destf but it could not be cleaned
-            throw new HiveException("Directory " + oldPath.toString()
-                + " could not be cleaned up.", e);
-          } else {
-            //swallow the exception since it won't affect the final result
-            LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
-          }
-        }
-        if (statuses != null && statuses.length > 0) {
-          if (isOldPathUnderDestf && !oldPathDeleted) {
-            throw new HiveException("Destination directory " + destf + " has not be cleaned up.");
-          }
-        }
+        deleteOldPathForReplace(destf, oldPath, conf, FileUtils.HIDDEN_FILES_PATH_FILTER);
       }
 
+      // TODO# what are the paths that use this? MM tables will need to do this beforehand
       // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
       // destf with inherited permissions
       boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars
@@ -3442,6 +3425,37 @@ private void constructOneLBLocationMap(FileStatus fSta,
   }
 
 
+  private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf,
+      PathFilter pathFilter) throws HiveException {
+    boolean isOldPathUnderDestf = false;
+    try {
+      FileSystem oldFs = oldPath.getFileSystem(conf);
+      FileSystem destFs = destPath.getFileSystem(conf);
+      // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
+      // existing content might result in incorrect (extra) data.
+      // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
+      // not the destf or its subdir?
+      isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false);
+      if (isOldPathUnderDestf) {
+        FileStatus[] statuses = oldFs.listStatus(oldPath, pathFilter);
+        if (statuses != null && statuses.length > 0 && !trashFiles(oldFs, statuses, conf)) {
+          throw new HiveException("Destination directory " + destPath
+              + " has not been cleaned up.");
+        }
+      }
+    } catch (IOException e) {
+      if (isOldPathUnderDestf) {
+        // if oldPath is a subdir of destf but it could not be cleaned
+        throw new HiveException("Directory " + oldPath.toString()
+            + " could not be cleaned up.", e);
+      } else {
+        //swallow the exception since it won't affect the final result
+        LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
+      }
+    }
+  }
+
+
   /**
    * Trashes or deletes all files under a directory. Leaves the directory as is.
    * @param fs FileSystem to use
@@ -4007,7 +4021,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
-
   public long getNextTableWriteId(String dbName, String tableName) throws HiveException {
     try {
       return getMSC().getNextTableWriteId(dbName, tableName);
@@ -4015,4 +4028,17 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     }
   }
+
+  public ValidWriteIds getValidWriteIdsForTable(
+      String dbName, String tableName) throws HiveException {
+    try {
+      // TODO: decode ID ranges here if we use that optimization
+      GetValidWriteIdsResult result = getMSC().getValidWriteIds(dbName, tableName);
+      return new ValidWriteIds(result.getLowWatermarkId(), result.getHighWatermarkId(),
+          result.isSetAreIdsValid() && result.isAreIdsValid(),
+          result.isSetIds() ? new HashSet<Long>(result.getIds()) : null);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
 };
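
With the MoveTask restriction lifted above, the replace (insert overwrite) path for MM tables no longer throws; loadPartition and loadTable now call the extracted deleteOldPathForReplace with IdPathFilter(mmWriteId, false), which is assumed to accept everything except the current write id's directory, so old data is cleaned up while the freshly written mm_<writeId>_ directory survives. A minimal sketch of that cleanup with the filter inlined; the real code only acts when the old path is under the destination and moves files to trash via trashFiles rather than deleting them directly.

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative sketch of the assumed MM replace cleanup: remove every child of the
    // destination directory that does not belong to the current write id.
    final class MmReplaceCleanupSketch {
      static void deleteOldMmData(FileSystem fs, Path destDir, long mmWriteId)
          throws IOException {
        String keepPrefix = "mm_" + mmWriteId + "_";
        for (FileStatus status : fs.listStatus(destDir)) {
          if (!status.getPath().getName().startsWith(keepPrefix)) {
            fs.delete(status.getPath(), true); // real code: trashFiles(...)
          }
        }
      }
    }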

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index bb7001a..675bfd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1808,7 +1808,7 @@ public final class GenMapRedUtils {
 
       // Create the required temporary file in the HDFS location if the destination
       // path of the FileSinkOperator table is a blobstore path.
-      // TODO# HERE
+      // TODO# special case #N - linked FDs (unions?)
       Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath());
 
       // Change all the linked file sink descriptors

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
index e2887fd..ee67443 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
@@ -71,7 +71,6 @@ public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver {
       Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
       Set<Operator<? extends OperatorDesc>> ops = new HashSet<>();
 
-      /* TODO# wtf
       if (currTask instanceof MapRedTask) {
         MapRedTask mr = (MapRedTask) currTask;
         ops.addAll(mr.getWork().getAllOperators());
@@ -85,7 +84,7 @@ public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver {
         for (BaseWork w : sparkWork.getAllWork()) {
           ops.addAll(w.getAllOperators());
         }
-      }*/
+      }
 
       setOrAnnotateStats(ops, physicalContext.getParseContext());
       return null;

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 422be8e..93fe0e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -206,7 +206,7 @@ public abstract class TaskCompiler {
       }
     } else if (!isCStats) {
       for (LoadTableDesc ltd : loadTableWork) {
-        // TODO# HERE
+        // TODO# move task is created here; handle MM tables
         Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
         mvTask.add(tsk);
         // Check to see if we are stale'ing any indexes and auto-update them if we want

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 5cc3663..1be4d84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.JobConf;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Interner;
 
 /**
@@ -375,6 +376,7 @@ public class MapWork extends BaseWork {
   }
 
   @SuppressWarnings("nls")
+  @VisibleForTesting
   public void addMapWork(Path path, String alias, Operator<?> work,
       PartitionDesc pd) {
     ArrayList<String> curAliases = pathToAliases.get(path);

http://git-wip-us.apache.org/repos/asf/hive/blob/3e481b47/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 5dc3aa6..f055cde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -381,20 +381,6 @@ public final class PlanUtils {
   }
 
   /**
-   * Generate the table descriptor of MetadataTypedColumnsetSerDe with the
-   * separatorCode. MetaDataTypedColumnsetSerDe is used because LazySimpleSerDe
-   * does not support a table with a single column "col" with type
-   * "array<string>".
-   */
-  public static TableDesc getDefaultTableDesc(String separatorCode) {
-    return new TableDesc(
-        TextInputFormat.class, IgnoreKeyTextOutputFormat.class, Utilities
-        .makeProperties(
-            org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT,separatorCode,
-            serdeConstants.SERIALIZATION_LIB,MetadataTypedColumnsetSerDe.class.getName()));
-  }
-
-  /**
    * Generate the table descriptor for reduce key.
    */
   public static TableDesc getReduceKeyTableDesc(List<FieldSchema> fieldSchemas,

