Date: Tue, 10 Oct 2017 19:28:50 -0700
From: "Luo Chen (Code Review)"
Reply-To: cluo8@uci.edu
Delivered-To: mailing list notifications@asterixdb.apache.org
X-Gerrit-MessageType: newchange
Subject: Change in asterixdb[master]: [ASTERIXDB-2103][STO] Too many disk components for Correlate...
X-Gerrit-Change-Id: I8c9bac74a7f4fed3a424e239acd352d074a270f3
X-Gerrit-Commit: 3455a58f5d7a30e799ebc6aae4648fe51b9a04bb
User-Agent: Gerrit/2.12.7

Luo Chen has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2066

Change subject: [ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy
......................................................................

[ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Currently, CorrelatedMergePolicy uses component IDs to ensure that disk
components of the primary and secondary indexes are merged together, but
without synchronization. This results in too many disk components for the
secondary InvertedIndex. The reason is that a secondary index can miss a
round of merges if the merge policy finds that the corresponding secondary
components are not available (they are either being merged or being
flushed). Even though flow control on secondary indexes guarantees that the
secondary index catches up the next time, it is still possible that the
primary component is finalized, in which case the secondary components that
missed this round of merges are never merged again.

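The failure mode described in the details above can be illustrated with a toy model. This is only a sketch under stated assumptions: `IdRange`, `collectMergeable`, `merge`, `flush`, and `simulate` are hypothetical names invented for illustration and are not the AsterixDB/Hyracks classes; a "finalized" primary component is modeled simply by the policy never selecting its ID range again.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: every name here is a hypothetical stand-in,
// not one of the AsterixDB/Hyracks classes touched by this change.
class StrandedComponentsSketch {

    /** ID range [min, max] covered by one disk component. */
    static final class IdRange {
        final long min;
        final long max;

        IdRange(long min, long max) {
            this.min = min;
            this.max = max;
        }
    }

    /** Returns the components whose ID range lies entirely inside [minId, maxId]. */
    static List<IdRange> collectMergeable(long minId, long maxId, List<IdRange> components) {
        List<IdRange> result = new ArrayList<>();
        for (IdRange c : components) {
            if (c.min >= minId && c.max <= maxId) {
                result.add(c);
            }
        }
        return result;
    }

    /** Replaces the selected components with one component covering their union. */
    static List<IdRange> merge(List<IdRange> components, List<IdRange> selected) {
        if (selected.size() < 2) {
            return components; // nothing to merge
        }
        List<IdRange> result = new ArrayList<>(components);
        result.removeAll(selected); // identity-based removal is enough here
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (IdRange c : selected) {
            min = Math.min(min, c.min);
            max = Math.max(max, c.max);
        }
        result.add(new IdRange(min, max));
        return result;
    }

    /** A flush adds one new disk component with a single ID. */
    static void flush(List<IdRange> index, long id) {
        index.add(new IdRange(id, id));
    }

    /** Returns {primary component count, secondary component count}. */
    static int[] simulate() {
        List<IdRange> primary = new ArrayList<>();
        List<IdRange> secondary = new ArrayList<>();
        for (long id = 1; id <= 4; id++) {
            flush(primary, id);
            flush(secondary, id);
        }

        // Round 1: the policy picks [1, 4]. The primary index merges, but the
        // secondary index is assumed to be busy and misses this round.
        primary = merge(primary, collectMergeable(1, 4, primary));

        for (long id = 5; id <= 8; id++) {
            flush(primary, id);
            flush(secondary, id);
        }

        // Round 2: assume the primary component [1, 4] is finalized, so the
        // policy only ever picks [5, 8] from now on.
        primary = merge(primary, collectMergeable(5, 8, primary));
        secondary = merge(secondary, collectMergeable(5, 8, secondary));

        return new int[] { primary.size(), secondary.size() };
    }

    public static void main(String[] args) {
        int[] counts = simulate();
        System.out.println("primary=" + counts[0] + ", secondary=" + counts[1]);
    }
}
```

In this model the primary index ends with two components while the secondary index keeps five, because the four components that missed round 1 fall outside every later ID range.
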
This patch fixes this bug by:
- Adding a mechanism of depending operations to LSM IO operations. An
  operation finishes only after all of its depending operations have
  finished.
- For the correlated merge policy, making the flush/merge of the primary
  index depend on all flushes/merges of the secondary indexes. This ensures
  that when the correlated policy schedules a merge, all related components
  of all indexes are available to merge.

Change-Id: I8c9bac74a7f4fed3a424e239acd352d074a270f3
---
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
M 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
55 files changed, 435 insertions(+), 153 deletions(-)

  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/66/2066/1

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index 76bec8c..5e0e072 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -132,9 +132,9 @@ public void createIndex() throws Exception { List> partitioningKeys = new ArrayList<>(); partitioningKeys.add(Collections.singletonList("key")); - dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, - NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, - partitioningKeys, null, null, null, false, null, false), + dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null, + null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null, null, + false, null, false), null, DatasetType.INTERNAL, DATASET_ID, 0); PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST); @@ -448,7 +448,7 @@ for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); merger.waitUntilCount(1); // now that we enetered, we will rollback Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn)); @@ -635,7 +635,7 @@ for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -707,7 +707,7 @@ for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -736,7 +736,7 @@ } private class Rollerback { - private Thread task; + private final Thread task; private Exception failure; public Rollerback(TestLsmBtree lsmBtree, Predicate predicate) { @@ -766,7 +766,7 @@ } private class Searcher { - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private Future task; private volatile boolean entered = false; diff 
--git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index a20e660..e18181c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -31,6 +31,7 @@ import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy; @@ -91,14 +92,17 @@ Set indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); int partition = getIndexPartition(index, indexInfos); - triggerScheduledMerge(minID, maxID, + List dependingMerges = scheduleSecondaryIndexes(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet())); + + schedulePrimaryIndex(minID, maxID, index, dependingMerges); + return true; } /** * Submit merge requests for all disk components within [minID, maxID] - * of all indexes of a given dataset in the given partition + * of all of secondary indexes of a given dataset in the given partition * * @param minID * @param maxID @@ -106,17 +110,39 @@ * @param indexInfos * @throws HyracksDataException */ - private void triggerScheduledMerge(long minID, long maxID, Set indexInfos) throws HyracksDataException { + private List scheduleSecondaryIndexes(long minID, long maxID, Set indexInfos) + throws HyracksDataException { + List mergeOps = new 
ArrayList<>(); for (IndexInfo info : indexInfos) { ILSMIndex lsmIndex = info.getIndex(); - - List immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents()); - if (isMergeOngoing(immutableComponents)) { + List diskComponents = lsmIndex.getDiskComponents(); + if (lsmIndex.isPrimaryIndex() || isMergeOngoing(diskComponents)) { continue; } - List mergableComponents = new ArrayList<>(); - for (ILSMDiskComponent component : immutableComponents) { - ILSMDiskComponentId id = component.getComponentId(); + List mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + mergeOps.add(accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergeableComponents, null)); + } + return mergeOps; + } + + private void schedulePrimaryIndex(long minID, long maxID, ILSMIndex primaryIndex, + List dependingMerges) throws HyracksDataException { + assert primaryIndex.isPrimaryIndex(); + List diskComponents = primaryIndex.getDiskComponents(); + List mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); + ILSMIndexAccessor accessor = + primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + accessor.scheduleMerge(primaryIndex.getIOOperationCallback(), mergeableComponents, dependingMerges); + } + + private List collectMergeableComponents(long minID, long maxID, + List diskComponents) throws HyracksDataException { + List mergableComponents = new ArrayList<>(); + for (ILSMDiskComponent component : diskComponents) { + ILSMDiskComponentId id = component.getComponentId(); + if (!id.notFound()) { if (id.getMinId() >= minID && id.getMaxId() <= maxID) { mergableComponents.add(component); } @@ -126,10 +152,8 @@ break; } } - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - 
accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } + return mergableComponents; } private int getIndexPartition(ILSMIndex index, Set indexInfos) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index 71d4a96..0df8dcc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -34,6 +34,7 @@ private boolean isRegistered; private boolean memoryAllocated; private boolean durable; + private boolean correlated; public DatasetInfo(int datasetID) { this.indexes = new HashMap<>(); @@ -41,6 +42,7 @@ this.datasetID = datasetID; this.setRegistered(false); this.setMemoryAllocated(false); + this.setCorrelated(false); } @Override @@ -195,4 +197,12 @@ public void setLastAccess(long lastAccess) { this.lastAccess = lastAccess; } + + public void setCorrelated(boolean correlated) { + this.correlated = correlated; + } + + public boolean isCorrelated() { + return correlated; + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 37bd789..ad9f6a5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -223,7 +223,7 @@ if (iInfo.isOpen()) { ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); + accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback(), null); } // Wait for the above flush 
op. @@ -417,16 +417,22 @@ } if (asyncFlush) { - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - ILSMIndexAccessor accessor = - iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); - } + PrimaryIndexOperationTracker.flushDatasetIndexes(dsInfo.getDatsetIndexInfos(), dsInfo.isCorrelated()); } else { + List primaryIndexes = new ArrayList<>(); for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - // TODO: This is not efficient since we flush the indexes sequentially. - // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this - // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. + if (iInfo.getIndex().isPrimaryIndex()) { + // primary indexes are flushed later to guarantee the correctness of the correlated merge policy + primaryIndexes.add(iInfo); + } else { + // TODO: This is not efficient since we flush the indexes sequentially. + // Think of a way to allow submitting the flush requests concurrently. + // We don't do them concurrently because this may lead to a deadlock scenario + // between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. 
+ flushAndWaitForIO(dsInfo, iInfo); + } + } + for (IndexInfo iInfo : primaryIndexes) { flushAndWaitForIO(dsInfo, iInfo); } } @@ -591,4 +597,5 @@ } } } + } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index f2f3b93..5eb3c02 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -23,6 +23,7 @@ import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.LocalResource; @@ -95,6 +96,10 @@ datasetInfo.setExternal(!index.hasMemoryComponents()); datasetInfo.setRegistered(true); datasetInfo.setDurable(((ILSMIndex) index).isDurable()); + //TODO use a general mechanism to set correlated property when we have more + // correlated merge policies + datasetInfo.setCorrelated( + ((AbstractLSMIndex) index).getMergePolicy() instanceof CorrelatedPrefixMergePolicy); } } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 67b25b6..4eb1c3a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -19,6 +19,9 @@ package org.apache.asterix.common.context; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; 
import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -31,6 +34,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; @@ -141,17 +145,16 @@ //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { - for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { - //get resource - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + Set indexInfos = dsInfo.getDatsetIndexInfos(); + for (IndexInfo iInfo : indexInfos) { //update resource lsn AbstractLSMIOOperationCallback ioOpCallback = - (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); + (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); ioOpCallback.updateLastLSN(logRecord.getLSN()); - //schedule flush after update - accessor.scheduleFlush(lsmIndex.getIOOperationCallback()); } + + flushDatasetIndexes(indexInfos, dsInfo.isCorrelated()); + flushLogCreated = false; } @@ -198,4 +201,63 @@ return flushLogCreated; } + public static void flushDatasetIndexes(Set indexes, boolean correlated) throws HyracksDataException { + if (!correlated) { + // if not correlated, we simply schedule flushes of each index independently + for (IndexInfo iInfo : indexes) { + ILSMIndex lsmIndex = iInfo.getIndex(); + //get resource + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, 
NoOpOperationCallback.INSTANCE); + //schedule flush after update + accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); + } + } else { + // otherwise, we need to schedule indexes properly s.t. the primary index would depend on + // all secondary indexes in the same partition + + // collect partitions + Set partitions = new HashSet<>(); + indexes.forEach(iInfo -> partitions.add(iInfo.getPartition())); + for (Integer partition : partitions) { + flushCorrelatedDatasetIndexes(indexes, partition); + } + + } + } + + private static void flushCorrelatedDatasetIndexes(Set indexes, int partition) + throws HyracksDataException { + ILSMIndex primaryIndex = null; + List flushOps = new ArrayList<>(); + for (IndexInfo iInfo : indexes) { + if (iInfo.getPartition() != partition) { + continue; + } + ILSMIndex lsmIndex = iInfo.getIndex(); + if (lsmIndex.isPrimaryIndex()) { + primaryIndex = lsmIndex; + } else { + //get resource + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + //schedule flush + ILSMIOOperation flushOp = accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); + if (flushOp != null) { + flushOps.add(flushOp); + } + } + } + + if (primaryIndex != null) { + //get resource + ILSMIndexAccessor accessor = + primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + //schedule flush after update + accessor.scheduleFlush(primaryIndex.getIOOperationCallback(), flushOps); + + } + + } + } diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index 9f071bb..ee795d5 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -236,7 +236,7 @@ return null; } }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class), - Mockito.anyListOf(ILSMDiskComponent.class)); + Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any()); Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class), Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index 3775985..f5c3013 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -56,6 +56,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICursorFactory; +import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; import org.apache.hyracks.storage.common.IIndexBulkLoader; import org.apache.hyracks.storage.common.IIndexCursor; @@ -170,7 +171,7 @@ // The only reason to override the following method is that it uses a different context object // in addition, determining whether or not to keep deleted tuples is different here @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback 
callback) throws HyracksDataException { ExternalBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, -1); opCtx.setOperation(IndexOperation.MERGE); @@ -195,9 +196,11 @@ LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName()); ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); - ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, cursor, + MergeOperation mergeOp = new LSMBTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath())); + callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps()); + ioScheduler.scheduleOperation(mergeOp); + return mergeOp; } // This function should only be used when a transaction fail. it doesn't @@ -369,7 +372,7 @@ // Not supported @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-BTree"); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index ff17905..071a4bd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -319,7 +319,7 @@ } @Override - public void 
scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw HyracksDataException.create(ErrorCode.FLUSH_NOT_SUPPORTED_IN_EXTERNAL_INDEX); } @@ -342,7 +342,7 @@ } @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { ILSMIndexOperationContext bctx = createOpContext(NoOpOperationCallback.INSTANCE, 0); bctx.setOperation(IndexOperation.MERGE); @@ -363,11 +363,12 @@ keepDeleteTuples = mergingComponents.get(mergingComponents.size() - 1) != secondDiskComponents .get(secondDiskComponents.size() - 1); } - - ioScheduler.scheduleOperation( + ILSMIOOperation mergeOp = new LSMBTreeWithBuddyMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples)); + callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples, ctx.getDependingOps()); + ioScheduler.scheduleOperation(mergeOp); + return mergeOp; } // This method creates the appropriate opContext for the targeted version diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index c7d45e1..00a489e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -462,7 
+462,8 @@ LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) { ILSMIndexAccessor accessor = createAccessor(opCtx); return new LSMBTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), - componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); + componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), + opCtx.getDependingOps()); } @Override @@ -611,6 +612,7 @@ } ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples); return new LSMBTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), - mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); + mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), + opCtx.getDependingOps()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java index e3424e5..92b53de 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java @@ -18,7 +18,10 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impls; +import java.util.List; + import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; @@ 
-27,8 +30,9 @@ private final FileReference bloomFilterFlushTarget; public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, - FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, flushTarget, callback, indexIdentifier); + FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier, + List<ILSMIOOperation> dependingOps) { + super(accessor, flushTarget, callback, indexIdentifier, dependingOps); this.bloomFilterFlushTarget = bloomFilterFlushTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java index ec96303..40ac7b1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java @@ -19,8 +19,11 @@ package org.apache.hyracks.storage.am.lsm.btree.impls; +import java.util.List; + import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -30,8 +33,9 @@ private final FileReference bloomFilterMergeTarget; public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, - FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, target,
callback, indexIdentifier, cursor); + FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier, + List<ILSMIOOperation> dependingOps) { + super(accessor, target, callback, indexIdentifier, cursor, dependingOps); this.bloomFilterMergeTarget = bloomFilterMergeTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java index f682bde..e0c1512 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java @@ -18,8 +18,11 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impls; +import java.util.List; + import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -32,8 +35,8 @@ public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, - String indexIdentifier, boolean keepDeletedTuples) { - super(accessor, target, callback, indexIdentifier, cursor); + String indexIdentifier, boolean keepDeletedTuples, List<ILSMIOOperation> dependingOps) { + super(accessor, target, callback, indexIdentifier, cursor, dependingOps); this.buddyBtreeMergeTarget =
buddyBtreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; this.keepDeletedTuples = keepDeletedTuples; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 89c8cb9..380b2f2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -105,10 +105,12 @@ * * @param ctx * @param callback + * @return The scheduled merge operation, used for the caller to track its status * @throws HyracksDataException * @throws IndexException */ - void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + throws HyracksDataException; /** * Schedule full merge @@ -135,9 +137,11 @@ * * @param ctx * @param callback + * @return The scheduled flush operation, used for the caller to track its status * @throws HyracksDataException */ - void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + throws HyracksDataException; /** * Perform a flush diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java index c2ae786..ff1613b 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.common.api; +import java.util.List; import java.util.concurrent.Callable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -63,7 +64,19 @@ FileReference getTarget(); /** + * <<<<<<< HEAD + * * @return the accessor of the operation */ ILSMIndexAccessor getAccessor(); + + /** + * @return whether this operation has finished + */ + boolean isFinished(); + + /** + * @return a list of operations that this operation depends on + */ + List getDependingOps(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java index addeb27..5f43bcd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java @@ -67,11 +67,13 @@ public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException; - void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + throws HyracksDataException; ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException; - void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation 
scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + throws HyracksDataException; ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index b8d64af..b303a39 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -46,9 +46,13 @@ * * @param callback * the IO operation callback + * @param dependingOps + * other operations that this operation depends on + * @return The scheduled flush operation * @throws HyracksDataException */ - void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException; + ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List dependingOps) + throws HyracksDataException; /** * Schedule a merge operation @@ -57,11 +61,13 @@ * the merge operation callback * @param components * the components to be merged + * @param dependingOps + * other operations that this operation depends on + * @return The scheduled merge operation * @throws HyracksDataException - * @throws IndexException */ - void scheduleMerge(ILSMIOOperationCallback callback, List components) - throws HyracksDataException; + ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List components, + List dependingOps) throws HyracksDataException; /** * Schedule a full merge diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java index 5b0378a..66af93f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java @@ -56,4 +56,8 @@ PermutingTupleReference getFilterTuple(); MultiComparator getFilterCmp(); + + List<ILSMIOOperation> getDependingOps(); + + void setDependingOps(List<ILSMIOOperation> dependingOps); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java index aee46f0..1d13c94 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java @@ -18,6 +18,10 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; @@ -30,13 +34,32 @@ protected final FileReference target; protected final ILSMIOOperationCallback callback; protected final String indexIdentifier; + protected final List<ILSMIOOperation> dependingOps; + + protected AtomicBoolean isFinished = new AtomicBoolean(false); public AbstractIoOperation(ILSMIndexAccessor accessor, FileReference target,
ILSMIOOperationCallback callback, - String indexIdentifier) { + String indexIdentifier, List<ILSMIOOperation> dependingOps) { this.accessor = accessor; this.target = target; this.callback = callback; this.indexIdentifier = indexIdentifier; + this.dependingOps = dependingOps; + } + + protected abstract void callInternal() throws HyracksDataException; + + @Override + public Boolean call() throws HyracksDataException { + try { + callInternal(); + } finally { + synchronized (this) { + isFinished.set(true); + notifyAll(); + } + } + return true; } @Override @@ -63,4 +86,14 @@ public String getIndexIdentifier() { return indexIdentifier; } + + @Override + public List<ILSMIOOperation> getDependingOps() { + return dependingOps; + } + + @Override + public boolean isFinished() { + return isFinished.get(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index dc64f9b..e7b21cb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -86,6 +86,7 @@ protected final int[] treeFields; protected final int[] filterFields; protected final boolean durable; + protected final ILSMMergePolicy mergePolicy; protected boolean isActive; protected final AtomicBoolean[] flushRequests; protected boolean memoryComponentsAllocated = false; @@ -113,6 +114,7 @@ this.inactiveDiskComponents = new LinkedList<>(); this.durable = durable; this.tracer = tracer; + this.mergePolicy = mergePolicy; lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(), tracer); isActive = false; diskComponents = new ArrayList<>(); @@
-135,6 +137,7 @@ this.ioScheduler = ioScheduler; this.ioOpCallback = ioOpCallback; this.durable = durable; + this.mergePolicy = mergePolicy; lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled()); isActive = false; diskComponents = new LinkedList<>(); @@ -199,7 +202,7 @@ protected void flushMemoryComponent() throws HyracksDataException { BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback); ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(cb); + accessor.scheduleFlush(cb, null); try { cb.waitForIO(); } catch (InterruptedException e) { @@ -326,19 +329,21 @@ } @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); opCtx.setOperation(ctx.getOperation()); opCtx.getComponentHolder().addAll(ctx.getComponentHolder()); + opCtx.setDependingOps(ctx.getDependingOps()); ILSMIOOperation flushOp = createFlushOperation(opCtx, componentFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(flushOp, tracer)); + return flushOp; } @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { List mergingComponents = ctx.getComponentHolder(); // merge must create a different op ctx @@ -346,11 +351,13 @@ createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); opCtx.setOperation(ctx.getOperation()); 
opCtx.getComponentHolder().addAll(mergingComponents); + opCtx.setDependingOps(ctx.getDependingOps()); ILSMDiskComponent firstComponent = (ILSMDiskComponent) mergingComponents.get(0); ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1); LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent); - ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergeFileRefs, callback); + MergeOperation mergeOp = (MergeOperation) createMergeOperation(opCtx, mergeFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(mergeOp, tracer)); + return mergeOp; } private void addOperationalMutableComponents(List operationalComponents) { @@ -628,6 +635,10 @@ : doMerge(operation); } + public ILSMMergePolicy getMergePolicy() { + return mergePolicy; + } + public abstract Set getLSMComponentPhysicalFiles(ILSMComponent newComponent); protected abstract void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java index 065d465..5fdeafd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java @@ -27,6 +27,7 @@ import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -47,6 +48,7 @@ protected IndexOperation op; protected boolean accessingComponents = false; protected ISearchPredicate searchPredicate; + protected final List<ILSMIOOperation> dependingOps; public AbstractLSMIndexOperationContext(int[] treeFields, int[] filterFields, IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback, @@ -56,6 +58,7 @@ this.componentHolder = new LinkedList<>(); this.componentsToBeMerged = new LinkedList<>(); this.componentsToBeReplicated = new LinkedList<>(); + this.dependingOps = new LinkedList<>(); if (filterFields != null) { indexTuple = new PermutingTupleReference(treeFields); filterCmp = MultiComparator.create(filterCmpFactories); @@ -153,4 +156,17 @@ public ISearchPredicate getSearchPredicate() { return searchPredicate; } + + @Override + public List<ILSMIOOperation> getDependingOps() { + return dependingOps; + } + + @Override + public void setDependingOps(List<ILSMIOOperation> dependingOps) { + this.dependingOps.clear(); + if (dependingOps != null) { + this.dependingOps.addAll(dependingOps); + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java index 847b882..7ac9bfb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java @@ -49,7 +49,7 @@ } else if (immutableComponents.size() >=
numComponents) { ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); + accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null); } } @@ -106,7 +106,7 @@ } ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); + accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null); return true; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index 2f65b18..b93d943 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -203,13 +203,13 @@ } @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { callback.afterFinalize(LSMOperationType.MERGE, null); - return; + return null; } - lsmIndex.scheduleMerge(ctx, callback); + return lsmIndex.scheduleMerge(ctx, callback); } @Override @@ -297,9 +297,10 @@ } @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws 
HyracksDataException { callback.afterFinalize(LSMOperationType.FLUSH, null); + return null; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java index 7b7f950..750c690 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.List; import java.util.Objects; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -30,14 +31,13 @@ public class FlushOperation extends AbstractIoOperation implements Comparable { public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, - String indexIdentifier) { - super(accessor, target, callback, indexIdentifier); + String indexIdentifier, List dependingOps) { + super(accessor, target, callback, indexIdentifier, dependingOps); } @Override - public Boolean call() throws HyracksDataException { + protected void callInternal() throws HyracksDataException { accessor.flush(this); - return true; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 1069f8f..bc3a5a1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -500,13 +500,13 @@ } @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) { callback.afterFinalize(LSMOperationType.FLUSH, null); - return; + return null; } - lsmIndex.scheduleFlush(ctx, callback); + return lsmIndex.scheduleFlush(ctx, callback); } @Override @@ -519,6 +519,7 @@ boolean failedOperation = false; try { newComponent = lsmIndex.flush(operation); + waitForDependingOps(operation); operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { @@ -537,13 +538,13 @@ } @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { callback.afterFinalize(LSMOperationType.MERGE, null); - return; + return null; } - lsmIndex.scheduleMerge(ctx, callback); + return lsmIndex.scheduleMerge(ctx, callback); } @Override @@ -570,6 +571,7 @@ boolean failedOperation = false; try { newComponent = lsmIndex.merge(operation); + waitForDependingOps(operation); operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { @@ -754,6 +756,32 @@ } } + /** + * Wait for depending operations to finish. 
+ * + * @param op + */ + private void waitForDependingOps(ILSMIOOperation op) throws HyracksDataException { + List dependingOps = op.getDependingOps(); + if (dependingOps == null) { + return; + } + for (ILSMIOOperation dependingOp : dependingOps) { + if (dependingOp != null && !dependingOp.isFinished()) { + synchronized (dependingOp) { + while (!dependingOp.isFinished()) { + try { + dependingOp.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } + } + } + } + } + @Override public void deleteComponents(ILSMIndexOperationContext ctx, Predicate predicate) throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index c0fd443..f008fde 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -135,18 +135,21 @@ } @Override - public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException { + public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List dependingOps) + throws HyracksDataException { ctx.setOperation(IndexOperation.FLUSH); - lsmHarness.scheduleFlush(ctx, callback); + ctx.setDependingOps(dependingOps); + return lsmHarness.scheduleFlush(ctx, callback); } @Override - public void scheduleMerge(ILSMIOOperationCallback callback, List components) - throws HyracksDataException { + public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List components, + List dependingOps) throws HyracksDataException { 
         ctx.setOperation(IndexOperation.MERGE);
         ctx.getComponentsToBeMerged().clear();
         ctx.getComponentsToBeMerged().addAll(components);
-        lsmHarness.scheduleMerge(ctx, callback);
+        ctx.setDependingOps(dependingOps);
+        return lsmHarness.scheduleMerge(ctx, callback);
     }
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
index c83d534..2210fd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
@@ -31,19 +32,13 @@
     protected final IIndexCursor cursor;
     public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
-            String indexIdentifier, IIndexCursor cursor) {
-        super(accessor, target, callback, indexIdentifier);
+            String indexIdentifier, IIndexCursor cursor, List dependingOps) {
+        super(accessor, target, callback, indexIdentifier, dependingOps);
         this.cursor = cursor;
     }
     public List getMergingComponents() {
         return accessor.getOpContext().getComponentHolder();
-    }
-
-    @Override
-    public Boolean call() throws HyracksDataException {
-        accessor.merge(this);
-        return true;
     }
     @Override
@@ -54,4 +49,10 @@
     public IIndexCursor getCursor() {
         return cursor;
     }
+
+    @Override
+    protected void callInternal() throws HyracksDataException {
+        accessor.merge(this);
+
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 7d7266e..f159232 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -247,7 +247,7 @@
         Collections.reverse(mergableComponents);
         ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
+        accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents, null);
     }
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
index 85081a1..7cdcc52 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
@@ -59,7 +59,7 @@
                 && index.hasFlushRequestForCurrentMutableComponent()) {
             ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+            accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index 9cc8022..3e35e51 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
 import java.util.logging.Logger;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -100,6 +101,16 @@
     public ILSMIndexAccessor getAccessor() {
         return ioOp.getAccessor();
     }
+
+    @Override
+    public boolean isFinished() {
+        return ioOp.isFinished();
+    }
+
+    @Override
+    public List getDependingOps() {
+        return ioOp.getDependingOps();
+    }
 }
 class ComparableTracedIOOperation extends TracedIOOperation implements Comparable {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index eb3924c..60da50a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -691,17 +691,20 @@
             LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
         return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getLsmHarness(), opCtx),
-                componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
-                componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+                componentFileRefs.getInsertIndexFileReference(),
+                componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
+                callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
     }
     @Override
     protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
-            LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
+            LSMComponentFileReferences mergeFileRefs,
+            ILSMIOOperationCallback callback) throws HyracksDataException {
         ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getLsmHarness(), opCtx);
         IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx);
-        return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
-                mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
-                fileManager.getBaseDir().getAbsolutePath());
+        return new LSMInvertedIndexMergeOperation(accessor, cursor,
+                mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(),
+                mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(),
+                opCtx.getDependingOps());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 61fc84e..242bc83 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -85,9 +85,11 @@
     }
     @Override
-    public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
+    public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List dependingOps)
+            throws HyracksDataException {
         ctx.setOperation(IndexOperation.FLUSH);
-        lsmHarness.scheduleFlush(ctx, callback);
+        ctx.setDependingOps(dependingOps);
+        return lsmHarness.scheduleFlush(ctx, callback);
     }
     @Override
@@ -96,12 +98,13 @@
     }
     @Override
-    public void scheduleMerge(ILSMIOOperationCallback callback, List components)
-            throws HyracksDataException {
+    public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List components,
+            List dependingOps) throws HyracksDataException {
         ctx.setOperation(IndexOperation.MERGE);
         ctx.getComponentsToBeMerged().clear();
         ctx.getComponentsToBeMerged().addAll(components);
-        lsmHarness.scheduleMerge(ctx, callback);
+        ctx.setDependingOps(dependingOps);
+        return lsmHarness.scheduleMerge(ctx, callback);
     }
     @Override
@@ -116,6 +119,7 @@
     @Override
     public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException {
         ctx.setOperation(IndexOperation.FULL_MERGE);
+        ctx.setDependingOps(null);
         lsmHarness.scheduleFullMerge(ctx, callback);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 2106f6a..30e1cec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -19,7 +19,10 @@
 package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+import java.util.List;
+
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -30,8 +33,8 @@
     public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget,
             FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget,
-            ILSMIOOperationCallback callback, String indexIdentifier) {
-        super(accessor, flushTarget, callback, indexIdentifier);
+            ILSMIOOperationCallback callback, String indexIdentifier, List dependingOps) {
+        super(accessor, flushTarget, callback, indexIdentifier, dependingOps);
         this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 2c1db0f..8361f24 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -19,7 +19,10 @@
 package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+import java.util.List;
+
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -31,8 +34,8 @@
     public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target,
             FileReference deletedKeysBTreeMergeTarget, FileReference bloomFilterMergeTarget,
-            ILSMIOOperationCallback callback, String indexIdentifier) {
-        super(accessor, target, callback, indexIdentifier, cursor);
+            ILSMIOOperationCallback callback, String indexIdentifier, List dependingOps) {
+        super(accessor, target, callback, indexIdentifier, cursor, dependingOps);
         this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index 6595403..110d873 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -416,7 +416,7 @@
     // Not supported
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-RTree");
     }
@@ -623,7 +623,7 @@
     // The only change the the schedule merge is the method used to create the
     // opCtx. first line <- in schedule merge, we->
     @Override
-    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         ILSMIndexOperationContext rctx = createOpContext(NoOpOperationCallback.INSTANCE, -1);
         rctx.setOperation(IndexOperation.MERGE);
@@ -634,10 +634,12 @@
                 (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1));
         ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), rctx, buddyBTreeFields);
         // create the merge operation.
-        LSMRTreeMergeOperation mergeOp = new LSMRTreeMergeOperation(accessor, cursor,
-                relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
-                relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+        ILSMIOOperation mergeOp =
+                new LSMRTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(),
+                        relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
+                        callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps());
         ioScheduler.scheduleOperation(mergeOp);
+        return mergeOp;
     }
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index ca0e4e1..a4d6a65 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -420,7 +420,7 @@
         LSMRTreeAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields);
         return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(),
                 componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
-                callback, fileManager.getBaseDir().getAbsolutePath());
+                callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
     }
     @Override
@@ -430,6 +430,6 @@
         ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields);
         return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
                 mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
-                fileManager.getBaseDir().getAbsolutePath());
+                fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 6991c56..a08d854 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -18,7 +18,10 @@
  */
 package org.apache.hyracks.storage.am.lsm.rtree.impls;
+import java.util.List;
+
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -29,8 +32,9 @@
     private final FileReference bloomFilterFlushTarget;
     public LSMRTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, FileReference btreeFlushTarget,
-            FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
-        super(accessor, flushTarget, callback, indexIdentifier);
+            FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier,
+            List dependingOps) {
+        super(accessor, flushTarget, callback, indexIdentifier, dependingOps);
         this.btreeFlushTarget = btreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index 83872cf..a07e57b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hyracks.storage.am.lsm.rtree.impls;
+import java.util.List;
+
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -30,8 +33,8 @@
     public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target,
             FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback,
-            String indexIdentifier) {
-        super(accessor, target, callback, indexIdentifier, cursor);
+            String indexIdentifier, List dependingOps) {
+        super(accessor, target, callback, indexIdentifier, cursor, dependingOps);
         this.btreeMergeTarget = btreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 1e15455..a18f10c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -332,7 +332,7 @@
             throws HyracksDataException {
         ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
         return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), null, null,
-                callback, fileManager.getBaseDir().getAbsolutePath());
+                callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
     }
     @Override
@@ -346,6 +346,6 @@
         ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples);
         ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
         return new MergeOperation(accessor, mergeFileRefs.getInsertIndexFileReference(), callback,
-                fileManager.getBaseDir().getAbsolutePath(), cursor);
+                fileManager.getBaseDir().getAbsolutePath(), cursor, opCtx.getDependingOps());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
index 5d6d8de..4f7515e 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
@@ -72,7 +72,7 @@
         accessor.insert(tuple);
         // Flush to generate a disk component
-        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
         // Make sure the disk component was generated
         LSMBTree btree = (LSMBTree) ctx.getIndex();
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
index c5eb97c..98faf01 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
@@ -117,7 +117,7 @@
             StubIOOperationCallback stub = new StubIOOperationCallback();
             BlockingIOOperationCallbackWrapper waiter = new BlockingIOOperationCallbackWrapper(stub);
-            accessor.scheduleFlush(waiter);
+            accessor.scheduleFlush(waiter, null);
             waiter.waitForIO();
             if (minMax != null) {
                 Pair obsMinMax =
@@ -146,7 +146,7 @@
             }
         }
         accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                ((LSMBTree) ctx.getIndex()).getDiskComponents());
+                ((LSMBTree) ctx.getIndex()).getDiskComponents(), null);
         flushedComponents = ((LSMBTree) ctx.getIndex()).getDiskComponents();
         Pair mergedMinMax =
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index 7dac1e5..15dbfdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -76,7 +76,7 @@
         ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
         accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                ((LSMBTree) ctx.getIndex()).getDiskComponents());
+                ((LSMBTree) ctx.getIndex()).getDiskComponents(), null);
         orderedIndexTestUtils.checkPointSearches(ctx);
         orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
index b633614..11ecf0b 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
@@ -81,7 +81,7 @@
                 }
                 if (j == 1) {
-                    accessor.scheduleFlush(ioOpCallback);
+                    accessor.scheduleFlush(ioOpCallback, null);
                     ioOpCallback.waitForIO();
                     isFoundNull = true;
                 } else {
@@ -94,7 +94,7 @@
                 }
                 if (j == 1) {
-                    accessor.scheduleFlush(ioOpCallback);
+                    accessor.scheduleFlush(ioOpCallback, null);
                     ioOpCallback.waitForIO();
                     isFoundNull = true;
                 } else {
@@ -106,7 +106,7 @@
                     accessor.delete(tuple);
                 }
-                accessor.scheduleFlush(ioOpCallback);
+                accessor.scheduleFlush(ioOpCallback, null);
                 ioOpCallback.waitForIO();
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
index acbeaef..f3cf9de 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
@@ -19,7 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.btree;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
@@ -116,17 +116,17 @@
         //component 2 contains 1 and 2
         upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes));
         upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes));
-        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
         //component 1 contains 1 and -2
         upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes));
         deleteTuple(ctx, fieldSerdes, getValue(2, fieldSerdes));
-        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
         //component 0 contains 2 and 3
         upsertTuple(ctx, fieldSerdes, getValue(3, fieldSerdes));
         upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes));
-        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
         LSMBTree btree = (LSMBTree) ctx.getIndex();
         Assert.assertEquals("Check disk components", 3, btree.getDiskComponents().size());
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
index fbcbcc2..a7efd6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
@@ -19,7 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.btree;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
@@ -352,7 +352,7 @@
         op1.performOperation(ctx, AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT);
         op2.performOperation(ctx, AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT / AccessMethodTestsConfig.BTREE_NUM_INSERT_ROUNDS);
-        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
         LSMBTree btree = (LSMBTree) ctx.getIndex();
         Assert.assertEquals("Check disk components", 1, btree.getDiskComponents().size());
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
index e059faa..adcb3d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
@@ -109,7 +109,7 @@
                 }
                 if (j == 1) {
-                    lsmAccessor.scheduleFlush(ioOpCallback);
+                    lsmAccessor.scheduleFlush(ioOpCallback, null);
                     ioOpCallback.waitForIO();
                     isFoundNull = true;
                     isUpdated = false;
@@ -124,7 +124,7 @@
                 }
                 if (j == 1) {
-                    lsmAccessor.scheduleFlush(ioOpCallback);
+                    lsmAccessor.scheduleFlush(ioOpCallback, null);
                     ioOpCallback.waitForIO();
                 } else {
                     isFoundNull = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 6c1a406..0e16280 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -103,17 +103,19 @@
     }
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
-        super.scheduleFlush(ctx, callback);
+        ILSMIOOperation flushOp = super.scheduleFlush(ctx, callback);
         numScheduledFlushes++;
+        return flushOp;
     }
     @Override
-    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
-        super.scheduleMerge(ctx, callback);
+        ILSMIOOperation mergeOp = super.scheduleMerge(ctx, callback);
         numScheduledMerges++;
+        return mergeOp;
     }
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 1667e47..377ce05 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -119,7 +119,7 @@
             case MERGE:
                 accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                        lsmBTree.getDiskComponents());
+                        lsmBTree.getDiskComponents(), null);
                 break;
             default:
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
index e521b4b..eaa0321 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
@@ -221,7 +221,7 @@
                 return null;
             }
         }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class),
-                Mockito.anyListOf(ILSMDiskComponent.class));
+                Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any());
         Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class),
                 Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
index d093aac..fd744b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
@@ -58,7 +58,7 @@
             }
             // Perform merge.
             invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                    ((LSMInvertedIndex) invIndex).getDiskComponents());
+                    ((LSMInvertedIndex) invIndex).getDiskComponents(), null);
             validateAndCheckIndex(testCtx);
             runTinySearchWorkload(testCtx, tupleGen);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
index 3dc7262..cfca8f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -60,7 +60,7 @@
             }
             // Perform merge.
             invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                    ((LSMInvertedIndex) invIndex).getDiskComponents());
+                    ((LSMInvertedIndex) invIndex).getDiskComponents(), null);
             validateAndCheckIndex(testCtx);
             runTinySearchWorkload(testCtx, tupleGen);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
index 2345698..594e019 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
@@ -116,7 +116,7 @@
             case MERGE: {
                 accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                        invIndex.getDiskComponents());
+                        invIndex.getDiskComponents(), null);
                 break;
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index 9209a3e..58c71dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -78,7 +78,7 @@
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
             accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                    ((AbstractLSMRTree) ctx.getIndex()).getDiskComponents());
+                    ((AbstractLSMRTree) ctx.getIndex()).getDiskComponents(), null);
             rTreeTestUtils.checkScan(ctx);
             rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index fe4870b..4630c28 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -79,7 +79,7 @@
             case MERGE:
                 accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                        lsmRTree.getDiskComponents());
+                        lsmRTree.getDiskComponents(), null);
                 break;
             default:
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 2855f2e..bbada89 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -68,7 +68,7 @@
             case MERGE:
                 accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                        ((AbstractLSMRTree) lsmRTree).getDiskComponents());
+                        ((AbstractLSMRTree) lsmRTree).getDiskComponents(), null);
                 break;
             default:
--
To view, visit https://asterix-gerrit.ics.uci.edu/2066
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8c9bac74a7f4fed3a424e239acd352d074a270f3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen