From notifications-return-91278-archive-asf-public=cust-asf.ponee.io@asterixdb.apache.org Mon Mar 12 09:01:28 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2316918064D for ; Mon, 12 Mar 2018 09:01:26 +0100 (CET) Received: (qmail 30477 invoked by uid 500); 12 Mar 2018 08:01:25 -0000 Mailing-List: contact notifications-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list notifications@asterixdb.apache.org Received: (qmail 30468 invoked by uid 99); 12 Mar 2018 08:01:25 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Mar 2018 08:01:25 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 495601A4F36 for ; Mon, 12 Mar 2018 08:01:25 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 2.126 X-Spam-Level: ** X-Spam-Status: No, score=2.126 tagged_above=-999 required=6.31 tests=[MISSING_HEADERS=1.207, SPF_FAIL=0.919] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id ym7-7cdihTOF for ; Mon, 12 Mar 2018 08:01:20 +0000 (UTC) Received: from vitalstatistix.ics.uci.edu (vitalstatistix.ics.uci.edu [128.195.52.38]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id 9E3855F2C2 for ; Mon, 12 Mar 2018 08:01:19 +0000 (UTC) Received: from localhost (localhost [127.0.0.1]) by vitalstatistix.ics.uci.edu (Postfix) with ESMTP id AE3701009CF; Mon, 12 Mar 2018 01:01:18 -0700 (PDT) Date: Mon, 12 Mar 2018 01:01:18 -0700 From: "Murtadha Hubail (Code Review)" CC: Jenkins , abdullah alamoudi , Ian Maxon , Till Westmann , Michael Blow , Luo Chen Reply-To: mhubail@apache.org X-Gerrit-MessageType: merged Subject: Change in asterixdb[master]: [NO ISSUE][TX] Ensure Uncommitted Atomic Txns Not Flushed X-Gerrit-Change-Id: If8d5df630f1d9119002ef91da5c282da18901acc X-Gerrit-ChangeURL: X-Gerrit-Commit: c5ca3db262e98ba17001bf295588b3862af2fc2b In-Reply-To: References: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Content-Disposition: inline User-Agent: Gerrit/2.12.7 Message-Id: <20180312080118.AE3701009CF@vitalstatistix.ics.uci.edu> Murtadha Hubail has submitted this change and it was merged. Change subject: [NO ISSUE][TX] Ensure Uncommitted Atomic Txns Not Flushed ...................................................................... [NO ISSUE][TX] Ensure Uncommitted Atomic Txns Not Flushed - user model changes: no - storage format changes: no - interface changes: yes Details: - Make all metadata indexes modifications as force modifications. - Do not decrement ops of atomic transactions until they fully commit or abort to prevent flushing partial records. - Do not schedule flush if a force modification starts before the flush log is written to disk. - Unify code path for completing operations after commit/abort in op tracker. - Remove unneeded update log commit notification. - Add test case for failing flush due to force modification. Change-Id: If8d5df630f1d9119002ef91da5c282da18901acc Reviewed-on: https://asterix-gerrit.ics.uci.edu/2456 Sonar-Qube: Jenkins Tested-by: Jenkins Contrib: Jenkins Reviewed-by: abdullah alamoudi Integration-Tests: Jenkins --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java 9 files changed, 161 insertions(+), 73 deletions(-) Approvals: Anon. E. Moose #1000171: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java index 70e5f6e..0f6adf6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java @@ -19,8 +19,10 @@ package org.apache.asterix.test.metadata; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -31,19 +33,25 @@ import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.DatasetConfig; import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.context.DatasetInfo; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.api.IMetadataIndex; import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; +import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.NodeGroup; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; +import org.apache.hyracks.test.support.TestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -245,6 +253,68 @@ } } + @Test + public void failedFlushOnUncommittedMetadataTxn() throws Exception { + ICcApplicationContext ccAppCtx = + (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext(); + final MetadataProvider metadataProvider = new MetadataProvider(ccAppCtx, null); + final MetadataTransactionContext mdTxn = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxn); + final String nodeGroupName = "ng"; + try { + final List ngNodes = Collections.singletonList("asterix_nc1"); + MetadataManager.INSTANCE.addNodegroup(mdTxn, new NodeGroup(nodeGroupName, ngNodes)); + MetadataManager.INSTANCE.commitTransaction(mdTxn); + } finally { + metadataProvider.getLocks().unlock(); + } + INcApplicationContext appCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext(); + IDatasetLifecycleManager dlcm = appCtx.getDatasetLifecycleManager(); + dlcm.flushAllDatasets(); + IMetadataIndex idx = MetadataPrimaryIndexes.NODEGROUP_DATASET; + DatasetInfo datasetInfo = dlcm.getDatasetInfo(idx.getDatasetId().getId()); + AbstractLSMIndex index = (AbstractLSMIndex) appCtx.getDatasetLifecycleManager() + .getIndex(idx.getDatasetId().getId(), idx.getResourceId()); + PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker(); + final MetadataTransactionContext mdTxn2 = MetadataManager.INSTANCE.beginTransaction(); + int mutableComponentBeforeFlush = index.getCurrentMemoryComponentIndex(); + int diskComponentsBeforeFlush = index.getDiskComponents().size(); + // lock opTracker to prevent log flusher from triggering flush + synchronized (opTracker) { + opTracker.setFlushOnExit(true); + opTracker.flushIfNeeded(); + Assert.assertTrue(opTracker.isFlushLogCreated()); + metadataProvider.setMetadataTxnContext(mdTxn2); + // make sure force operation will processed + MetadataManager.INSTANCE.dropNodegroup(mdTxn2, nodeGroupName, false); + Assert.assertEquals(1, opTracker.getNumActiveOperations()); + Assert.assertFalse(index.hasFlushRequestForCurrentMutableComponent()); + // release opTracker lock now to allow log flusher to schedule the flush + InvokeUtil.runWithTimeout(() -> { + synchronized (opTracker) { + opTracker.wait(1000); + } + }, () -> !opTracker.isFlushLogCreated(), 10, TimeUnit.SECONDS); + } + // ensure flush failed to be scheduled + datasetInfo.waitForIO(); + Assert.assertEquals(mutableComponentBeforeFlush, index.getCurrentMemoryComponentIndex()); + Assert.assertEquals(diskComponentsBeforeFlush, index.getDiskComponents().size()); + // after committing, the flush should be scheduled successfully + opTracker.setFlushOnExit(true); + MetadataManager.INSTANCE.commitTransaction(mdTxn2); + metadataProvider.getLocks().unlock(); + InvokeUtil.runWithTimeout(() -> { + synchronized (opTracker) { + opTracker.wait(1000); + } + }, () -> !opTracker.isFlushLogCreated(), 10, TimeUnit.SECONDS); + // ensure flush completed successfully and the component was switched + datasetInfo.waitForIO(); + Assert.assertNotEquals(mutableComponentBeforeFlush, index.getCurrentMemoryComponentIndex()); + Assert.assertEquals(diskComponentsBeforeFlush + 1, index.getDiskComponents().size()); + } + private void addDataset(ICcApplicationContext appCtx, Dataset source, int datasetPostfix, boolean abort) throws Exception { Dataset dataset = new Dataset(source.getDataverseName(), "ds_" + datasetPostfix, source.getDataverseName(), diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 3886115..47f7ae8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -24,6 +24,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.transactions.AbstractOperationCallback; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; @@ -95,7 +96,7 @@ } public synchronized void flushIfNeeded() throws HyracksDataException { - if (numActiveOperations.get() == 0) { + if (canSafelyFlush()) { flushIfRequested(); } } @@ -117,7 +118,8 @@ } if (needsFlush || flushOnExit) { - //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is scheduled. + // make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering + // them until the current flush is scheduled. LSMComponentId primaryId = null; for (ILSMIndex lsmIndex : indexes) { ILSMOperationTracker opTracker = lsmIndex.getOperationTracker(); @@ -137,7 +139,7 @@ LogRecord logRecord = new LogRecord(); flushOnExit = false; if (dsInfo.isDurable()) { - /** + /* * Generate a FLUSH log. * Flush will be triggered when the log is written to disk by LogFlusher. */ @@ -158,18 +160,30 @@ //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { - idGenerator.refresh(); - for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) { - //get resource - ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE); - //update resource lsn - AbstractLSMIOOperationCallback ioOpCallback = - (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); - ioOpCallback.updateLastLSN(logRecord.getLSN()); - //schedule flush after update - accessor.scheduleFlush(lsmIndex.getIOOperationCallback()); + try { + if (!canSafelyFlush()) { + // if a force modification operation started before the flush is scheduled, this flush will fail + // and a next attempt will be made when that operation completes. This is only expected for metadata + // datasets since they always use force modification + if (MetadataIndexImmutableProperties.isMetadataDataset(datasetID)) { + return; + } + throw new IllegalStateException("Operation started while index was pending scheduling a flush"); + } + idGenerator.refresh(); + for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) { + //get resource + ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE); + //update resource lsn + AbstractLSMIOOperationCallback ioOpCallback = + (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); + ioOpCallback.updateLastLSN(logRecord.getLSN()); + //schedule flush after update + accessor.scheduleFlush(lsmIndex.getIOOperationCallback()); + } + } finally { + flushLogCreated = false; } - flushLogCreated = false; } public int getNumActiveOperations() { @@ -194,14 +208,6 @@ } } - public void cleanupNumActiveOperationsForAbortedJob(int numberOfActiveOperations) { - numberOfActiveOperations *= -1; - numActiveOperations.getAndAdd(numberOfActiveOperations); - if (numActiveOperations.get() < 0) { - throw new IllegalStateException("The number of active operations cannot be negative!"); - } - } - public boolean isFlushOnExit() { return flushOnExit; } @@ -218,4 +224,7 @@ return partition; } + private boolean canSafelyFlush() { + return numActiveOperations.get() == 0; + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java index a3d5bc5..940535f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java @@ -23,9 +23,8 @@ /** * A typical transaction lifecycle goes through the following steps: - * 1. {@link ITransactionContext#register(long, ILSMIndex, IModificationOperationCallback, boolean)} + * 1. {@link ITransactionContext#register(long, int, ILSMIndex, IModificationOperationCallback, boolean)} * 2. {@link ITransactionContext#beforeOperation(long)} - * 3. {@link ITransactionContext#notifyUpdateCommitted(long)} * 4. {@link ITransactionContext#notifyEntityCommitted} * 5. {@link ITransactionContext#afterOperation(long)} * 6. {@link ITransactionContext#complete()} @@ -123,15 +122,6 @@ * @param resourceId */ void beforeOperation(long resourceId); - - /** - * Called to notify the transaction that an update log belonging - * to this transaction on index with {@code resourceId} has been - * flushed to disk. - * - * @param resourceId - */ - void notifyUpdateCommitted(long resourceId); /** * Called to notify the transaction that an entity commit diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index f81d7da..616d92b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -490,19 +490,17 @@ LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager()); switch (op) { case INSERT: - indexAccessor.insert(tuple); + indexAccessor.forceInsert(tuple); break; case DELETE: - indexAccessor.delete(tuple); + indexAccessor.forceDelete(tuple); break; case UPSERT: - indexAccessor.upsert(tuple); + indexAccessor.forceUpsert(tuple); break; default: throw new IllegalStateException("Unknown operation type: " + op); } - PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker(); - opTracker.flushIfNeeded(); // there is a window where the flush is not triggerred after an operation } finally { datasetLifecycleManager.close(resourceName); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index a630caa..21268e5 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -226,10 +226,6 @@ if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) { txnSubsystem.incrementEntityCommitCount(); } - } else if (logRecord.getLogType() == LogType.UPDATE) { - reusableTxnId.setId(logRecord.getTxnId()); - txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId); - txnCtx.notifyUpdateCommitted(logRecord.getResourceId()); } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { notifyJobTermination(); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java index b3d5e49..95cabf9 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java @@ -107,8 +107,8 @@ @Override public void complete() { try { - if (txnState.get() == ITransactionManager.ABORTED) { - cleanupForAbort(); + if (isWriteTxn()) { + cleanup(); } } finally { synchronized (txnOpTrackers) { @@ -141,5 +141,5 @@ return sb.toString(); } - protected abstract void cleanupForAbort(); + protected abstract void cleanup(); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java index 219cf07..079e99a 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java @@ -22,8 +22,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -57,16 +57,6 @@ } @Override - public void notifyUpdateCommitted(long resourceId) { - try { - opTrackers.get(resourceId).completeOperation(null, LSMOperationType.MODIFICATION, null, - callbacks.get(resourceId)); - } catch (HyracksDataException e) { - throw new ACIDException(e); - } - } - - @Override public void notifyEntityCommitted(int partition) { throw new IllegalStateException("Unexpected entity commit in atomic transaction"); } @@ -82,10 +72,26 @@ } @Override - public void cleanupForAbort() { - // each opTracker should be cleaned - opTrackers.forEach((resId, opTracker) -> ((PrimaryIndexOperationTracker) opTracker) - .cleanupNumActiveOperationsForAbortedJob(indexPendingOps.get(resId).get())); + public void cleanup() { + switch (getTxnState()) { + case ITransactionManager.ABORTED: + case ITransactionManager.COMMITTED: + for (Map.Entry opTracker : opTrackers.entrySet()) { + try { + final long resId = opTracker.getKey(); + final int idxPendingOps = indexPendingOps.get(resId).intValue(); + for (int i = 0; i < idxPendingOps; i++) { + opTracker.getValue().completeOperation(null, LSMOperationType.FORCE_MODIFICATION, null, + callbacks.get(resId)); + } + } catch (HyracksDataException e) { + throw new ACIDException(e); + } + } + break; + default: + throw new IllegalStateException("invalid state in txn clean up: " + getTxnState()); + } } @Override diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java index 9d2f54b..9fcb08b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java @@ -20,11 +20,11 @@ import java.util.HashMap; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -56,8 +56,7 @@ resourcePendingOps.put(resourceId, pendingOps); if (primaryIndex) { Pair pair = - new Pair( - (PrimaryIndexOperationTracker) index.getOperationTracker(), callback); + new Pair<>((PrimaryIndexOperationTracker) index.getOperationTracker(), callback); primaryIndexTrackers.put(partition, pair); } } @@ -66,11 +65,6 @@ @Override public void beforeOperation(long resourceId) { resourcePendingOps.get(resourceId).incrementAndGet(); - } - - @Override - public void notifyUpdateCommitted(long resourceId) { - // no op } @Override @@ -90,11 +84,18 @@ } @Override - protected void cleanupForAbort() { - for (Entry> e : primaryIndexTrackers - .entrySet()) { - AtomicInteger pendingOps = partitionPendingOps.get(e.getKey()); - e.getValue().first.cleanupNumActiveOperationsForAbortedJob(pendingOps.get()); + protected void cleanup() { + if (getTxnState() == ITransactionManager.ABORTED) { + primaryIndexTrackers.forEach((partitionId, opTracker) -> { + int pendingOps = partitionPendingOps.get(partitionId).intValue(); + for (int i = 0; i < pendingOps; i++) { + try { + opTracker.first.completeOperation(null, LSMOperationType.MODIFICATION, null, opTracker.second); + } catch (HyracksDataException ex) { + throw new ACIDException(ex); + } + } + }); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index c60e673..fb2bdeb 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -22,6 +22,8 @@ import java.nio.channels.ClosedByInterruptException; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BooleanSupplier; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.util.IOInterruptibleAction; @@ -234,4 +236,20 @@ } } } + + /** + * Runs the supplied {@code action} until {@code stopCondition} is met or timeout. + */ + public static void runWithTimeout(ThrowingAction action, BooleanSupplier stopCondition, long timeout, TimeUnit unit) + throws Exception { + long remainingTime = unit.toNanos(timeout); + final long startTime = System.nanoTime(); + while (!stopCondition.getAsBoolean()) { + if (remainingTime <= 0) { + throw new TimeoutException("Stop condition was not met after " + unit.toSeconds(timeout) + " seconds."); + } + action.run(); + remainingTime -= System.nanoTime() - startTime; + } + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2456 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: If8d5df630f1d9119002ef91da5c282da18901acc Gerrit-PatchSet: 9 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon Gerrit-Reviewer: Jenkins Gerrit-Reviewer: Luo Chen Gerrit-Reviewer: Michael Blow Gerrit-Reviewer: Murtadha Hubail Gerrit-Reviewer: Till Westmann Gerrit-Reviewer: abdullah alamoudi