hive-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (Jira)" <j...@apache.org>
Subject [jira] [Work logged] (HIVE-23789) Merge ValidTxnManager into DriverTxnHandler
Date Wed, 01 Jul 2020 08:10:00 GMT

     [ https://issues.apache.org/jira/browse/HIVE-23789?focusedWorklogId=453291&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-453291
]

ASF GitHub Bot logged work on HIVE-23789:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Jul/20 08:09
            Start Date: 01/Jul/20 08:09
    Worklog Time Spent: 10m 
      Work Description: miklosgergely commented on a change in pull request #1194:
URL: https://github.com/apache/hive/pull/1194#discussion_r448191052



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
##########
@@ -288,15 +313,231 @@ private void acquireLocksInternal() throws CommandProcessorException,
LockExcept
     }
   }
 
-  public void addHiveLocksFromContext() {
+  /**
+   *  Write the current set of valid write ids for the acid tables being operated on into the configuration
so
+   *  that it can be read by the input format.
+   */
+  private ValidTxnWriteIdList recordValidWriteIds() throws LockException {
+    String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
+    if (Strings.isNullOrEmpty(txnString)) {
+      throw new IllegalStateException("calling recordValidWriteIds() without initializing
ValidTxnList " +
+          JavaUtils.txnIdToString(driverContext.getTxnManager().getCurrentTxnId()));
+    }
+
+    ValidTxnWriteIdList txnWriteIds = getTxnWriteIds(txnString);
+    setValidWriteIds(txnWriteIds);
+
+    LOG.debug("Encoding valid txn write ids info {} txnid: {}", txnWriteIds.toString(),
+        driverContext.getTxnManager().getCurrentTxnId());
+    return txnWriteIds;
+  }
+
+  private ValidTxnWriteIdList getTxnWriteIds(String txnString) throws LockException {
+    List<String> txnTables = getTransactionalTables(getTables(true, true));
+    ValidTxnWriteIdList txnWriteIds = null;
+    if (driverContext.getCompactionWriteIds() != null) {
+      // This is kludgy: here we need to read with Compactor's snapshot/txn rather than the
snapshot of the current
+      // {@code txnMgr}, in effect simulating a "flashback query" but can't actually share
compactor's txn since it
+      // would run multiple statements.  See more comments in {@link org.apache.hadoop.hive.ql.txn.compactor.Worker}
+      // where it starts the compactor txn
+      if (txnTables.size() != 1) {
+        throw new LockException("Unexpected tables in compaction: " + txnTables);
+      }
+      txnWriteIds = new ValidTxnWriteIdList(driverContext.getCompactorTxnId());
+      txnWriteIds.addTableValidWriteIdList(driverContext.getCompactionWriteIds());
+    } else {
+      txnWriteIds = driverContext.getTxnManager().getValidWriteIds(txnTables, txnString);
+    }
+    if (driverContext.getTxnType() == TxnType.READ_ONLY && !getTables(false, true).isEmpty())
{
+      throw new IllegalStateException(String.format(
+          "Inferred transaction type '%s' doesn't conform to the actual query string '%s'",
+          driverContext.getTxnType(), driverContext.getQueryState().getQueryString()));
+    }
+    return txnWriteIds;
+  }
+
+  private void setValidWriteIds(ValidTxnWriteIdList txnWriteIds) {
+    driverContext.getConf().set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString());
+    if (driverContext.getPlan().getFetchTask() != null) {
+      // This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization
which initializes JobConf
+      // in FetchOperator before recordValidTxns() but this has to be done after locks are
acquired to avoid race
+      // conditions in ACID. This case is supported only for single source query.
+      Operator<?> source = driverContext.getPlan().getFetchTask().getWork().getSource();
+      if (source instanceof TableScanOperator) {
+        TableScanOperator tsOp = (TableScanOperator)source;
+        String fullTableName = AcidUtils.getFullTableName(tsOp.getConf().getDatabaseName(),
+            tsOp.getConf().getTableName());
+        ValidWriteIdList writeIdList = txnWriteIds.getTableValidWriteIdList(fullTableName);
+        if (tsOp.getConf().isTranscationalTable() && (writeIdList == null)) {
+          throw new IllegalStateException(String.format(
+              "ACID table: %s is missing from the ValidWriteIdList config: %s", fullTableName,
txnWriteIds.toString()));
+        }
+        if (writeIdList != null) {
+          driverContext.getPlan().getFetchTask().setValidWriteIdList(writeIdList.toString());
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks whether txn list has been invalidated while planning the query.
+   * This would happen if query requires exclusive/semi-shared lock, and there has been a
committed transaction
+   * on the table over which the lock is required.
+   */
+  boolean isValidTxnListState() throws LockException {
+    // 1) Get valid txn list.
+    String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
+    if (txnString == null) {
+      return true; // Not a transactional op, nothing more to do
+    }
+
+    // 2) Get locks that are relevant:
+    // - Exclusive for INSERT OVERWRITE, when shared write is disabled (HiveConf.TXN_WRITE_X_LOCK=false).
+    // - Excl-write for UPDATE/DELETE, when shared write is disabled, INSERT OVERWRITE -
when enabled.
+    Set<String> nonSharedLockedTables = getNonSharedLockedTables();
+    if (nonSharedLockedTables.isEmpty()) {
+      return true; // Nothing to check
+    }
+
+    // 3) Get txn tables that are being written
+    String txnWriteIdListString = driverContext.getConf().get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
+    if (Strings.isNullOrEmpty(txnWriteIdListString)) {
+      return true; // Nothing to check
+    }
+
+    GetOpenTxnsResponse openTxns = driverContext.getTxnManager().getOpenTxns();
+    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(openTxns, 0);
+    long txnId = driverContext.getTxnManager().getCurrentTxnId();
+
+    String currentTxnString;
+    if (validTxnList.isTxnRangeValid(txnId + 1, openTxns.getTxn_high_water_mark()) != ValidTxnList.RangeResponse.NONE)
{
+      // If here, there was another txn opened & committed between current SNAPSHOT generation
and locking.
+      validTxnList.removeException(txnId);
+      currentTxnString = validTxnList.toString();
+    } else {
+      currentTxnString = TxnCommonUtils.createValidReadTxnList(openTxns, txnId).toString();
+    }
+
+    if (currentTxnString.equals(txnString)) {
+      return true; // Still valid, nothing more to do
+    }
+    return checkWriteIds(currentTxnString, nonSharedLockedTables, txnWriteIdListString);
+  }
+
+  private Set<String> getNonSharedLockedTables() {
+    if (CollectionUtils.isEmpty(context.getHiveLocks())) {
+      return Collections.emptySet(); // Nothing to check
+    }
+
+    Set<String> nonSharedLockedTables = new HashSet<>();
+    for (HiveLock lock : context.getHiveLocks()) {
+      if (lock.mayContainComponents()) {
+        // The lock may have multiple components, e.g., DbHiveLock, hence we need to check
for each of them
+        for (LockComponent lockComponent : lock.getHiveLockComponents()) {
+          // We only consider tables for which we hold either an exclusive or an excl-write
lock
+          if ((lockComponent.getType() == LockType.EXCLUSIVE || lockComponent.getType() ==
LockType.EXCL_WRITE) &&
+              lockComponent.getTablename() != null && !DbTxnManager.GLOBAL_LOCKS.equals(lockComponent.getDbname()))
{
+            nonSharedLockedTables.add(TableName.getDbTable(lockComponent.getDbname(), lockComponent.getTablename()));
+          }
+        }
+      } else {
+        // The lock has a single component, e.g., SimpleHiveLock or ZooKeeperHiveLock.
+        // Pos 0 of lock paths array contains dbname, pos 1 contains tblname
+        if ((lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE || lock.getHiveLockMode() ==
HiveLockMode.SEMI_SHARED) &&
+            lock.getHiveLockObject().getPaths().length == 2) {
+          nonSharedLockedTables.add(
+              TableName.getDbTable(lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
+        }
+      }
+    }
+    return nonSharedLockedTables;
+  }
+
+  private boolean checkWriteIds(String currentTxnString, Set<String> nonSharedLockedTables,
String txnWriteIdListString)
+      throws LockException {
+    ValidTxnWriteIdList txnWriteIdList = new ValidTxnWriteIdList(txnWriteIdListString);
+    Map<String, Table> writtenTables = getTables(false, true);
+
+    ValidTxnWriteIdList currentTxnWriteIds = driverContext.getTxnManager().getValidWriteIds(
+        getTransactionalTables(writtenTables), currentTxnString);
+
+    for (Map.Entry<String, Table> tableInfo : writtenTables.entrySet()) {
+      String fullQNameForLock = TableName.getDbTable(tableInfo.getValue().getDbName(),
+          MetaStoreUtils.encodeTableName(tableInfo.getValue().getTableName()));
+      if (nonSharedLockedTables.contains(fullQNameForLock)) {
+        // Check if table is transactional
+        if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
+          ValidWriteIdList writeIdList = txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey());
+          ValidWriteIdList currentWriteIdList = currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey());
+          // Check if there was a conflicting write between current SNAPSHOT generation and
locking.
+          if (currentWriteIdList.isWriteIdRangeValid(writeIdList.getHighWatermark() + 1,
+              currentWriteIdList.getHighWatermark()) != ValidWriteIdList.RangeResponse.NONE)
{
+            return false;
+          }
+          // Check that write id is still valid
+          if (!TxnIdUtils.checkEquivalentWriteIds(writeIdList, currentWriteIdList)) {
+            // Write id has changed, it is not valid anymore, we need to recompile
+            return false;
+          }
+        }
+        nonSharedLockedTables.remove(fullQNameForLock);
+      }
+    }
+
+    if (!nonSharedLockedTables.isEmpty()) {
+      throw new LockException("Wrong state: non-shared locks contain information for tables
that have not" +
+          " been visited when trying to validate the locks from query tables.\n" +
+          "Tables: " + writtenTables.keySet() + "\n" +
+          "Remaining locks after check: " + nonSharedLockedTables);
+    }
+
+    return true; // It passes the test, it is valid
+  }
+
+  private Map<String, Table> getTables(boolean inputNeeded, boolean outputNeeded) {
+    Map<String, Table> tables = new HashMap<>();
+    if (inputNeeded) {
+      driverContext.getPlan().getInputs().forEach(input -> addTableFromEntity(input, tables));
+    }
+    if (outputNeeded) {
+      driverContext.getPlan().getOutputs().forEach(output -> addTableFromEntity(output,
tables));
+    }
+    return tables;
+  }
+
+  private void addTableFromEntity(Entity entity, Map<String, Table> tables) {
+    Table table;
+    switch (entity.getType()) {
+    case TABLE:
+      table = entity.getTable();
+      break;
+    case PARTITION:
+    case DUMMYPARTITION:
+      table = entity.getPartition().getTable();
+      break;
+    default:
+      return;
+    }
+    String fullTableName = AcidUtils.getFullTableName(table.getDbName(), table.getTableName());
+    tables.put(fullTableName, table);
+  }
+
+  private List<String> getTransactionalTables(Map<String, Table> tables) {
+    return tables.entrySet().stream()
+      .filter(entry -> AcidUtils.isTransactionalTable(entry.getValue()))
+      .map(Map.Entry::getKey)
+      .collect(Collectors.toList());
+  }
+
+  void addHiveLocksFromContext() {

Review comment:
       I agree, but in a separate jira. This one is for merging the two classes only, so we'll
have a clean and easy to understand history.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 453291)
    Time Spent: 40m  (was: 0.5h)

> Merge ValidTxnManager into DriverTxnHandler
> -------------------------------------------
>
>                 Key: HIVE-23789
>                 URL: https://issues.apache.org/jira/browse/HIVE-23789
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Hive
>            Reporter: Miklos Gergely
>            Assignee: Miklos Gergely
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Mime
View raw message