hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sank...@apache.org
Subject [01/12] hive git commit: HIVE-19089: Create/Replicate Allocate write-id event (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Date Tue, 17 Apr 2018 19:15:11 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-3 0946ab207 -> 9db29e9d4


http://git-wip-us.apache.org/repos/asf/hive/blob/9db29e9d/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 9256b7a..39a0f31 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -124,6 +124,7 @@ import org.apache.hadoop.hive.metastore.datasource.BoneCPDataSourceProvider;
 import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
 import org.apache.hadoop.hive.metastore.datasource.HikariCPDataSourceProvider;
 import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
@@ -584,24 +585,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         stmt = dbConn.createStatement();
 
         if (rqst.isSetReplPolicy()) {
-          List<String> inQueries = new ArrayList<>();
-          StringBuilder prefix = new StringBuilder();
-          StringBuilder suffix = new StringBuilder();
-
-          prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where ");
-          suffix.append(" and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()));
-
-          TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, rqst.getReplSrcTxnIds(),
-                  "RTM_SRC_TXN_ID", false, false);
-
-          for (String query : inQueries) {
-            LOG.debug("Going to execute select <" + query + ">");
-            rs = stmt.executeQuery(query);
-            if (rs.next()) {
-              LOG.info("Transactions " + rqst.getReplSrcTxnIds().toString() +
-                      " are already present for repl policy " + rqst.getReplPolicy());
-              return new OpenTxnsResponse(new ArrayList<>());
+          List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt);
+          if (!targetTxnIdList.isEmpty()) {
+            if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
+              LOG.warn("target txn id number " + targetTxnIdList.toString() +
+                      " is not matching with source txn id number " + rqst.getReplSrcTxnIds().toString());
             }
+            LOG.info("Target transactions " + targetTxnIdList.toString() + " are present for repl policy :" +
+              rqst.getReplPolicy() + " and Source transaction id : " + rqst.getReplSrcTxnIds().toString());
+            return new OpenTxnsResponse(targetTxnIdList);
           }
         }
 
@@ -677,8 +669,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
 
         if (transactionalListeners != null) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
-                  EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, dbConn, sqlGenerator));
+          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                  EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
         }
 
         LOG.debug("Going to commit");
@@ -699,21 +691,27 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
-  private Long getTargetTxnId(String replPolicy, long sourceTxnId, Statement stmt) throws SQLException {
+  private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Statement stmt)
+          throws SQLException {
     ResultSet rs = null;
     try {
-      String s = "select RTM_TARGET_TXN_ID from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
-
-              " and RTM_REPL_POLICY = " + quoteString(replPolicy);
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        LOG.info("Target txn is missing for the input source txn for ReplPolicy: " +
-                quoteString(replPolicy) + " , srcTxnId: " + sourceTxnId);
-        return -1L;
+      List<String> inQueries = new ArrayList<>();
+      StringBuilder prefix = new StringBuilder();
+      StringBuilder suffix = new StringBuilder();
+      List<Long> targetTxnIdList = new ArrayList<>();
+      prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where ");
+      suffix.append(" and RTM_REPL_POLICY = " + quoteString(replPolicy));
+      TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, sourceTxnIdList,
+              "RTM_SRC_TXN_ID", false, false);
+      for (String query : inQueries) {
+        LOG.debug("Going to execute select <" + query + ">");
+        rs = stmt.executeQuery(query);
+        while (rs.next()) {
+          targetTxnIdList.add(rs.getLong(1));
+        }
       }
-      LOG.debug("targetTxnid for srcTxnId " + sourceTxnId + " is " + rs.getLong(1));
-      return rs.getLong(1);
+      LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString());
+      return targetTxnIdList;
     }  catch (SQLException e) {
       LOG.warn("failed to get target txn ids " + e.getMessage());
       throw e;
@@ -737,10 +735,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         if (rqst.isSetReplPolicy()) {
           sourceTxnId = rqst.getTxnid();
-          txnid = getTargetTxnId(rqst.getReplPolicy(), sourceTxnId, stmt);
-          if (txnid == -1) {
+          List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
+                  Collections.singletonList(sourceTxnId), stmt);
+          if (targetTxnIds.isEmpty()) {
+            LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
+                    " and repl policy " + rqst.getReplPolicy());
             return;
           }
+          assert targetTxnIds.size() == 1;
+          txnid = targetTxnIds.get(0);
         }
 
         if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
@@ -769,8 +772,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
 
         if (transactionalListeners != null) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
-                  EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, dbConn, sqlGenerator));
+          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                  EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator);
         }
 
         LOG.debug("Going to commit");
@@ -808,8 +811,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         for (Long txnId : txnids) {
           if (transactionalListeners != null) {
-            MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
-                    EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, dbConn, sqlGenerator));
+            MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                    EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator);
           }
         }
         LOG.debug("Going to commit");
@@ -880,10 +883,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         if (rqst.isSetReplPolicy()) {
           sourceTxnId = rqst.getTxnid();
-          txnid = getTargetTxnId(rqst.getReplPolicy(), sourceTxnId, stmt);
-          if (txnid == -1) {
+          List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
+                  Collections.singletonList(sourceTxnId), stmt);
+          if (targetTxnIds.isEmpty()) {
+            LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
+                    " and repl policy " + rqst.getReplPolicy());
             return;
           }
+          assert targetTxnIds.size() == 1;
+          txnid = targetTxnIds.get(0);
         }
 
         /**
@@ -1052,8 +1060,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
 
         if (transactionalListeners != null) {
-          MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
-                  EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, dbConn, sqlGenerator));
+          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                  EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator);
         }
 
         MaterializationsInvalidationCache materializationsInvalidationCache =
@@ -1225,9 +1233,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   }
 
   @Override
+  @RetrySemantics.Idempotent
   public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
           throws NoSuchTxnException, TxnAbortedException, MetaException {
-    List<Long> txnIds = rqst.getTxnIds();
+    List<Long> txnIds;
     String dbName = rqst.getDbName().toLowerCase();
     String tblName = rqst.getTableName().toLowerCase();
     try {
@@ -1235,12 +1244,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       Statement stmt = null;
       ResultSet rs = null;
       TxnStore.MutexAPI.LockHandle handle = null;
+      List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
+      List<TxnToWriteId> srcTxnToWriteIds = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
 
-        Collections.sort(txnIds); //easier to read logs
+        if (rqst.isSetReplPolicy()) {
+          srcTxnToWriteIds = rqst.getSrcTxnToWriteIdList();
+          List<Long> srcTxnIds = new ArrayList<>();
+          assert (rqst.isSetSrcTxnToWriteIdList());
+          assert (!rqst.isSetTxnIds());
+          assert (!srcTxnToWriteIds.isEmpty());
+
+          for (TxnToWriteId txnToWriteId :  srcTxnToWriteIds) {
+            srcTxnIds.add(txnToWriteId.getTxnId());
+          }
+          txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt);
+          if (srcTxnIds.size() != txnIds.size()) {
+            LOG.warn("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
+                    " and repl policy " + rqst.getReplPolicy());
+            throw new RuntimeException("This should never happen for txnIds: " + txnIds);
+          }
+        } else {
+          assert (!rqst.isSetSrcTxnToWriteIdList());
+          assert (rqst.isSetTxnIds());
+          txnIds = rqst.getTxnIds();
+        }
+
+        Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow
 
         // Check if all the input txns are in open state. Write ID should be allocated only for open transactions.
         if (!isTxnsInOpenState(txnIds, stmt)) {
@@ -1248,10 +1281,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           throw new RuntimeException("This should never happen for txnIds: " + txnIds);
         }
 
-        List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
-        List<Long> allocatedTxns = new ArrayList<>();
-        long txnId;
         long writeId;
+        String s;
+        long allocatedTxnsCount = 0;
+        long txnId;
         List<String> queries = new ArrayList<>();
         StringBuilder prefix = new StringBuilder();
         StringBuilder suffix = new StringBuilder();
@@ -1274,25 +1307,26 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             txnId = rs.getLong(1);
             writeId = rs.getLong(2);
             txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
-            allocatedTxns.add(txnId);
+            allocatedTxnsCount++;
             LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId);
           }
         }
 
-        // If all the txns in the list have already allocated write ids, then just skip new allocations
-        long numOfWriteIds = txnIds.size() - allocatedTxns.size();
-        assert(numOfWriteIds >= 0);
-        if (0 == numOfWriteIds) {
-          // If all the txns in the list have pre-allocated write ids for the given table, then just return
+        // Batch allocation should always happen atomically. Either write ids for all txns is allocated or none.
+        long numOfWriteIds = txnIds.size();
+        assert ((allocatedTxnsCount == 0) || (numOfWriteIds == allocatedTxnsCount));
+        if (allocatedTxnsCount == numOfWriteIds) {
+          // If all the txns in the list have pre-allocated write ids for the given table, then just return.
+          // This is for idempotent case.
           return new AllocateTableWriteIdsResponse(txnToWriteIds);
         }
 
         handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
 
-        // There are some txns in the list which has no write id allocated and hence go ahead and do it.
+        // There are some txns in the list which does not have write id allocated and hence go ahead and do it.
         // Get the next write id for the given table and update it with new next write id.
         // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
-        String s = sqlGenerator.addForUpdateClause(
+        s = sqlGenerator.addForUpdateClause(
                 "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
                         + " and nwi_table = " + quoteString(tblName));
         LOG.debug("Going to execute query <" + s + ">");
@@ -1300,14 +1334,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if (!rs.next()) {
           // First allocation of write id should add the table to the next_write_id meta table
           // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here
+          writeId = 1;
           s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
                   + quoteString(dbName) + "," + quoteString(tblName) + "," + String.valueOf(numOfWriteIds + 1) + ")";
           LOG.debug("Going to execute insert <" + s + ">");
           stmt.execute(s);
-          writeId = 1;
         } else {
-          // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
           writeId = rs.getLong(1);
+          // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
           s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds)
                   + " where nwi_database = " + quoteString(dbName)
                   + " and nwi_table = " + quoteString(tblName);
@@ -1319,15 +1353,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         // write ids
         List<String> rows = new ArrayList<>();
         for (long txn : txnIds) {
-          if (allocatedTxns.contains(txn)) {
-            continue;
-          }
           rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
           txnToWriteIds.add(new TxnToWriteId(txn, writeId));
           LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
           writeId++;
         }
 
+        if (rqst.isSetReplPolicy()) {
+          int lastIdx = txnToWriteIds.size()-1;
+          if ((txnToWriteIds.get(0).getWriteId() != srcTxnToWriteIds.get(0).getWriteId()) ||
+              (txnToWriteIds.get(lastIdx).getWriteId() != srcTxnToWriteIds.get(lastIdx).getWriteId())) {
+            LOG.error("Allocated write id range {} is not matching with the input write id range {}.",
+                    txnToWriteIds, srcTxnToWriteIds);
+            throw new IllegalStateException("Write id allocation failed for: " + srcTxnToWriteIds);
+          }
+        }
+
         // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
         List<String> inserts = sqlGenerator.createInsertValuesStmt(
                 "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
@@ -1336,6 +1377,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           stmt.execute(insert);
         }
 
+        if (transactionalListeners != null) {
+          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                  EventMessage.EventType.ALLOC_WRITE_ID,
+                  new AllocWriteIdEvent(txnToWriteIds, rqst.getDbName(), rqst.getTableName(), null),
+                  dbConn, sqlGenerator);
+        }
+
         LOG.debug("Going to commit");
         dbConn.commit();
         return new AllocateTableWriteIdsResponse(txnToWriteIds);

http://git-wip-us.apache.org/repos/asf/hive/blob/9db29e9d/standalone-metastore/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift
index 612afe1..5bba329 100644
--- a/standalone-metastore/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift
@@ -884,9 +884,14 @@ struct GetValidWriteIdsResponse {
 
 // Request msg to allocate table write ids for the given list of txns
 struct AllocateTableWriteIdsRequest {
-    1: required list<i64> txnIds,
-    2: required string dbName,
-    3: required string tableName,
+    1: required string dbName,
+    2: required string tableName,
+    // Either txnIds or replPolicy+srcTxnToWriteIdList can exist in a call. txnIds is used by normal flow and
+    // replPolicy+srcTxnToWriteIdList is used by replication task.
+    3: optional list<i64> txnIds,
+    4: optional string replPolicy,
+    // The list is assumed to be sorted by both txnids and write ids. The write id list is assumed to be contiguous.
+    5: optional list<TxnToWriteId> srcTxnToWriteIdList,
 }
 
 // Map for allocated write id against the txn for which it is allocated

http://git-wip-us.apache.org/repos/asf/hive/blob/9db29e9d/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index ecddc7b..74c057b 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2231,9 +2231,22 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
   @Override
   public List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName)
           throws TException {
-    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(txnIds, dbName, tableName);
-    AllocateTableWriteIdsResponse writeIds = client.allocate_table_write_ids(rqst);
-    return writeIds.getTxnToWriteIds();
+    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tableName);
+    rqst.setTxnIds(txnIds);
+    return allocateTableWriteIdsBatchIntr(rqst);
+  }
+
+  @Override
+  public List<TxnToWriteId> replAllocateTableWriteIdsBatch(String dbName, String tableName,
+                                         String replPolicy, List<TxnToWriteId> srcTxnToWriteIdList) throws TException {
+    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tableName);
+    rqst.setReplPolicy(replPolicy);
+    rqst.setSrcTxnToWriteIdList(srcTxnToWriteIdList);
+    return allocateTableWriteIdsBatchIntr(rqst);
+  }
+
+  private List<TxnToWriteId> allocateTableWriteIdsBatchIntr(AllocateTableWriteIdsRequest rqst) throws TException {
+    return client.allocate_table_write_ids(rqst).getTxnToWriteIds();
   }
 
   @Override


Mime
View raw message