From issues-return-194809-archive-asf-public=cust-asf.ponee.io@hive.apache.org Wed Jul 1 07:55:02 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 3D062180665 for ; Wed, 1 Jul 2020 09:55:02 +0200 (CEST) Received: (qmail 81990 invoked by uid 500); 1 Jul 2020 07:55:01 -0000 Mailing-List: contact issues-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hive.apache.org Delivered-To: mailing list issues@hive.apache.org Received: (qmail 81977 invoked by uid 99); 1 Jul 2020 07:55:01 -0000 Received: from mailrelay1-us-west.apache.org (HELO mailrelay1-us-west.apache.org) (209.188.14.139) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Jul 2020 07:55:01 +0000 Received: from jira-he-de.apache.org (static.172.67.40.188.clients.your-server.de [188.40.67.172]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id A50ED42284 for ; Wed, 1 Jul 2020 07:55:00 +0000 (UTC) Received: from jira-he-de.apache.org (localhost.localdomain [127.0.0.1]) by jira-he-de.apache.org (ASF Mail Server at jira-he-de.apache.org) with ESMTP id 19FA678021A for ; Wed, 1 Jul 2020 07:55:00 +0000 (UTC) Date: Wed, 1 Jul 2020 07:55:00 +0000 (UTC) From: "ASF GitHub Bot (Jira)" To: issues@hive.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Work logged] (HIVE-23789) Merge ValidTxnManager into DriverTxnHandler MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/HIVE-23789?focusedWorklogId=453281&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-453281 ] ASF GitHub Bot logged work on HIVE-23789: ----------------------------------------- Author: ASF GitHub Bot Created on: 01/Jul/20 07:54 Start Date: 01/Jul/20 07:54 Worklog Time Spent: 10m Work Description: pvary commented on a change in pull request #1194: URL: https://github.com/apache/hive/pull/1194#discussion_r448183060 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java ########## @@ -288,15 +313,231 @@ private void acquireLocksInternal() throws CommandProcessorException, LockExcept } } - public void addHiveLocksFromContext() { + /** + * Write the current set of valid write ids for the operated acid tables into the configuration so + * that it can be read by the input format. + */ + private ValidTxnWriteIdList recordValidWriteIds() throws LockException { + String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); + if (Strings.isNullOrEmpty(txnString)) { + throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " + + JavaUtils.txnIdToString(driverContext.getTxnManager().getCurrentTxnId())); + } + + ValidTxnWriteIdList txnWriteIds = getTxnWriteIds(txnString); + setValidWriteIds(txnWriteIds); + + LOG.debug("Encoding valid txn write ids info {} txnid: {}", txnWriteIds.toString(), + driverContext.getTxnManager().getCurrentTxnId()); + return txnWriteIds; + } + + private ValidTxnWriteIdList getTxnWriteIds(String txnString) throws LockException { + List txnTables = getTransactionalTables(getTables(true, true)); + ValidTxnWriteIdList txnWriteIds = null; + if (driverContext.getCompactionWriteIds() != null) { + // This is kludgy: here we need to read with Compactor's snapshot/txn rather than the snapshot of the current + // {@code txnMgr}, in effect simulating a "flashback query" but can't actually share compactor's txn since it + // would run multiple statements. See more comments in {@link org.apache.hadoop.hive.ql.txn.compactor.Worker} + // where it start the compactor txn*/ + if (txnTables.size() != 1) { + throw new LockException("Unexpected tables in compaction: " + txnTables); + } + txnWriteIds = new ValidTxnWriteIdList(driverContext.getCompactorTxnId()); + txnWriteIds.addTableValidWriteIdList(driverContext.getCompactionWriteIds()); + } else { + txnWriteIds = driverContext.getTxnManager().getValidWriteIds(txnTables, txnString); + } + if (driverContext.getTxnType() == TxnType.READ_ONLY && !getTables(false, true).isEmpty()) { + throw new IllegalStateException(String.format( + "Inferred transaction type '%s' doesn't conform to the actual query string '%s'", + driverContext.getTxnType(), driverContext.getQueryState().getQueryString())); + } + return txnWriteIds; + } + + private void setValidWriteIds(ValidTxnWriteIdList txnWriteIds) { + driverContext.getConf().set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString()); + if (driverContext.getPlan().getFetchTask() != null) { + // This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization which initializes JobConf + // in FetchOperator before recordValidTxns() but this has to be done after locks are acquired to avoid race + // conditions in ACID. This case is supported only for single source query. + Operator source = driverContext.getPlan().getFetchTask().getWork().getSource(); + if (source instanceof TableScanOperator) { + TableScanOperator tsOp = (TableScanOperator)source; + String fullTableName = AcidUtils.getFullTableName(tsOp.getConf().getDatabaseName(), + tsOp.getConf().getTableName()); + ValidWriteIdList writeIdList = txnWriteIds.getTableValidWriteIdList(fullTableName); + if (tsOp.getConf().isTranscationalTable() && (writeIdList == null)) { + throw new IllegalStateException(String.format( + "ACID table: %s is missing from the ValidWriteIdList config: %s", fullTableName, txnWriteIds.toString())); + } + if (writeIdList != null) { + driverContext.getPlan().getFetchTask().setValidWriteIdList(writeIdList.toString()); + } + } + } + } + + /** + * Checks whether txn list has been invalidated while planning the query. + * This would happen if query requires exclusive/semi-shared lock, and there has been a committed transaction + * on the table over which the lock is required. + */ + boolean isValidTxnListState() throws LockException { + // 1) Get valid txn list. + String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); + if (txnString == null) { + return true; // Not a transactional op, nothing more to do + } + + // 2) Get locks that are relevant: + // - Exclusive for INSERT OVERWRITE, when shared write is disabled (HiveConf.TXN_WRITE_X_LOCK=false). + // - Excl-write for UPDATE/DELETE, when shared write is disabled, INSERT OVERWRITE - when enabled. + Set nonSharedLockedTables = getNonSharedLockedTables(); + if (nonSharedLockedTables.isEmpty()) { + return true; // Nothing to check + } + + // 3) Get txn tables that are being written + String txnWriteIdListString = driverContext.getConf().get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY); + if (Strings.isNullOrEmpty(txnWriteIdListString)) { + return true; // Nothing to check + } + + GetOpenTxnsResponse openTxns = driverContext.getTxnManager().getOpenTxns(); + ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(openTxns, 0); + long txnId = driverContext.getTxnManager().getCurrentTxnId(); + + String currentTxnString; + if (validTxnList.isTxnRangeValid(txnId + 1, openTxns.getTxn_high_water_mark()) != ValidTxnList.RangeResponse.NONE) { + // If here, there was another txn opened & committed between current SNAPSHOT generation and locking. + validTxnList.removeException(txnId); + currentTxnString = validTxnList.toString(); + } else { + currentTxnString = TxnCommonUtils.createValidReadTxnList(openTxns, txnId).toString(); + } + + if (currentTxnString.equals(txnString)) { + return true; // Still valid, nothing more to do + } + return checkWriteIds(currentTxnString, nonSharedLockedTables, txnWriteIdListString); + } + + private Set getNonSharedLockedTables() { + if (CollectionUtils.isEmpty(context.getHiveLocks())) { + return Collections.emptySet(); // Nothing to check + } + + Set nonSharedLockedTables = new HashSet<>(); + for (HiveLock lock : context.getHiveLocks()) { + if (lock.mayContainComponents()) { + // The lock may have multiple components, e.g., DbHiveLock, hence we need to check for each of them + for (LockComponent lockComponent : lock.getHiveLockComponents()) { + // We only consider tables for which we hold either an exclusive or a excl-write lock + if ((lockComponent.getType() == LockType.EXCLUSIVE || lockComponent.getType() == LockType.EXCL_WRITE) && + lockComponent.getTablename() != null && !DbTxnManager.GLOBAL_LOCKS.equals(lockComponent.getDbname())) { + nonSharedLockedTables.add(TableName.getDbTable(lockComponent.getDbname(), lockComponent.getTablename())); + } + } + } else { + // The lock has a single components, e.g., SimpleHiveLock or ZooKeeperHiveLock. + // Pos 0 of lock paths array contains dbname, pos 1 contains tblname + if ((lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE || lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) && + lock.getHiveLockObject().getPaths().length == 2) { + nonSharedLockedTables.add( + TableName.getDbTable(lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1])); + } + } + } + return nonSharedLockedTables; + } + + private boolean checkWriteIds(String currentTxnString, Set nonSharedLockedTables, String txnWriteIdListString) + throws LockException { + ValidTxnWriteIdList txnWriteIdList = new ValidTxnWriteIdList(txnWriteIdListString); + Map writtenTables = getTables(false, true); + + ValidTxnWriteIdList currentTxnWriteIds = driverContext.getTxnManager().getValidWriteIds( + getTransactionalTables(writtenTables), currentTxnString); + + for (Map.Entry tableInfo : writtenTables.entrySet()) { + String fullQNameForLock = TableName.getDbTable(tableInfo.getValue().getDbName(), + MetaStoreUtils.encodeTableName(tableInfo.getValue().getTableName())); + if (nonSharedLockedTables.contains(fullQNameForLock)) { + // Check if table is transactional + if (AcidUtils.isTransactionalTable(tableInfo.getValue())) { + ValidWriteIdList writeIdList = txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey()); + ValidWriteIdList currentWriteIdList = currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey()); + // Check if there was a conflicting write between current SNAPSHOT generation and locking. + if (currentWriteIdList.isWriteIdRangeValid(writeIdList.getHighWatermark() + 1, + currentWriteIdList.getHighWatermark()) != ValidWriteIdList.RangeResponse.NONE) { + return false; + } + // Check that write id is still valid + if (!TxnIdUtils.checkEquivalentWriteIds(writeIdList, currentWriteIdList)) { + // Write id has changed, it is not valid anymore, we need to recompile + return false; + } + } + nonSharedLockedTables.remove(fullQNameForLock); + } + } + + if (!nonSharedLockedTables.isEmpty()) { + throw new LockException("Wrong state: non-shared locks contain information for tables that have not" + + " been visited when trying to validate the locks from query tables.\n" + + "Tables: " + writtenTables.keySet() + "\n" + + "Remaining locks after check: " + nonSharedLockedTables); + } + + return true; // It passes the test, it is valid + } + + private Map getTables(boolean inputNeeded, boolean outputNeeded) { + Map tables = new HashMap<>(); + if (inputNeeded) { + driverContext.getPlan().getInputs().forEach(input -> addTableFromEntity(input, tables)); + } + if (outputNeeded) { + driverContext.getPlan().getOutputs().forEach(output -> addTableFromEntity(output, tables)); + } + return tables; + } + + private void addTableFromEntity(Entity entity, Map tables) { + Table table; + switch (entity.getType()) { + case TABLE: + table = entity.getTable(); + break; + case PARTITION: + case DUMMYPARTITION: + table = entity.getPartition().getTable(); + break; + default: + return; + } + String fullTableName = AcidUtils.getFullTableName(table.getDbName(), table.getTableName()); + tables.put(fullTableName, table); + } + + private List getTransactionalTables(Map tables) { + return tables.entrySet().stream() + .filter(entry -> AcidUtils.isTransactionalTable(entry.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + void addHiveLocksFromContext() { Review comment: I do not like storing lock related stuff in context. Shouldn't this be private to the DbTxnHandler? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 453281) Time Spent: 20m (was: 10m) > Merge ValidTxnManager into DriverTxnHandler > ------------------------------------------- > > Key: HIVE-23789 > URL: https://issues.apache.org/jira/browse/HIVE-23789 > Project: Hive > Issue Type: Sub-task > Components: Hive > Reporter: Miklos Gergely > Assignee: Miklos Gergely > Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)