From: luochen@apache.org
To: commits@asterixdb.apache.org
Reply-To: dev@asterixdb.apache.org
Date: Wed, 11 Oct 2017 02:22:48 -0000
Message-Id: <9155ab5241cf486caa37ccc65d2993a6@git.apache.org>
In-Reply-To: <7b509e728bf74511a4d3c879e88d3837@git.apache.org>
Subject: [2/2] asterixdb git commit: Revert "[ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy"

Revert "[ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy"

This reverts commit
21ed0f72681a20ccb6a654f9aa4d54b8d0ea9c5c.

Change-Id: I670545acd09c678f21be25313353ab306be86202
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2063
Tested-by: Jenkins
Contrib: Jenkins
Reviewed-by: Ian Maxon
Integration-Tests: Jenkins

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1dc8228b
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1dc8228b
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1dc8228b

Branch: refs/heads/master
Commit: 1dc8228b7262e47c06487443da0f735321e63afe
Parents: 860fcde
Author: Luo Chen
Authored: Tue Oct 10 11:42:29 2017 -0700
Committer: Luo Chen
Committed: Tue Oct 10 19:22:25 2017 -0700

----------------------------------------------------------------------
 .../test/dataflow/ComponentRollbackTest.java    | 16 ++---
 .../context/CorrelatedPrefixMergePolicy.java    | 48 ++++---------
 .../asterix/common/context/DatasetInfo.java     | 10 ---
 .../common/context/DatasetLifecycleManager.java | 25 +++----
 .../asterix/common/context/DatasetResource.java |  5 --
 .../context/PrimaryIndexOperationTracker.java   | 76 ++------------------
 .../CorrelatedPrefixMergePolicyTest.java        |  2 +-
 .../am/lsm/btree/impls/ExternalBTree.java       | 11 ++-
 .../lsm/btree/impls/ExternalBTreeWithBuddy.java | 11 ++-
 .../storage/am/lsm/btree/impls/LSMBTree.java    |  6 +-
 .../lsm/btree/impls/LSMBTreeFlushOperation.java |  8 +--
 .../lsm/btree/impls/LSMBTreeMergeOperation.java |  8 +--
 .../impls/LSMBTreeWithBuddyMergeOperation.java  |  7 +-
 .../storage/am/lsm/common/api/ILSMHarness.java  |  8 +--
 .../am/lsm/common/api/ILSMIOOperation.java      | 13 ----
 .../storage/am/lsm/common/api/ILSMIndex.java    |  6 +-
 .../am/lsm/common/api/ILSMIndexAccessor.java    | 14 ++--
 .../common/api/ILSMIndexOperationContext.java   |  4 --
 .../lsm/common/impls/AbstractIoOperation.java   | 35 +--------
 .../am/lsm/common/impls/AbstractLSMIndex.java   | 19 ++---
 .../impls/AbstractLSMIndexOperationContext.java | 16 -----
 .../lsm/common/impls/ConstantMergePolicy.java   |  4 +-
 .../lsm/common/impls/ExternalIndexHarness.java  |  9 ++-
 .../am/lsm/common/impls/FlushOperation.java     |  8 +--
 .../storage/am/lsm/common/impls/LSMHarness.java | 40 ++---------
 .../lsm/common/impls/LSMTreeIndexAccessor.java  | 13 ++--
 .../am/lsm/common/impls/MergeOperation.java     | 17 +++--
 .../am/lsm/common/impls/PrefixMergePolicy.java  |  2 +-
 .../lsm/common/impls/ThreadCountingTracker.java |  2 +-
 .../am/lsm/common/impls/TracedIOOperation.java  | 11 ---
 .../invertedindex/impls/LSMInvertedIndex.java   | 15 ++--
 .../impls/LSMInvertedIndexAccessor.java         | 14 ++--
 .../impls/LSMInvertedIndexFlushOperation.java   |  7 +-
 .../impls/LSMInvertedIndexMergeOperation.java   |  7 +-
 .../am/lsm/rtree/impls/ExternalRTree.java       | 12 ++--
 .../storage/am/lsm/rtree/impls/LSMRTree.java    |  4 +-
 .../lsm/rtree/impls/LSMRTreeFlushOperation.java |  8 +--
 .../lsm/rtree/impls/LSMRTreeMergeOperation.java |  7 +-
 .../impls/LSMRTreeWithAntiMatterTuples.java     |  4 +-
 .../am/lsm/btree/LSMBTreeFileManagerTest.java   |  2 +-
 .../btree/LSMBTreeFilterMergeTestDriver.java    |  4 +-
 .../am/lsm/btree/LSMBTreeMergeTestDriver.java   |  2 +-
 ...MBTreeModificationOperationCallbackTest.java |  6 +-
 .../btree/LSMBTreeScanDiskComponentsTest.java   |  8 +--
 ...TreeUpdateInPlaceScanDiskComponentsTest.java |  4 +-
 .../am/lsm/btree/LSMBTreeUpdateInPlaceTest.java |  4 +-
 .../storage/am/lsm/btree/impl/TestLsmBtree.java | 10 ++-
 .../btree/multithread/LSMBTreeTestWorker.java   |  2 +-
 .../lsm/common/test/PrefixMergePolicyTest.java  |  2 +-
 .../LSMInvertedIndexMergeTest.java              |  2 +-
 .../PartitionedLSMInvertedIndexMergeTest.java   |  2 +-
 .../multithread/LSMInvertedIndexTestWorker.java |  2 +-
 .../am/lsm/rtree/LSMRTreeMergeTestDriver.java   |  2 +-
 .../rtree/multithread/LSMRTreeTestWorker.java   |  2 +-
 .../LSMRTreeWithAntiMatterTuplesTestWorker.java |  2 +-
 55 files changed, 153 insertions(+), 435 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index 5e0e072..76bec8c 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -132,9 +132,9 @@ public class ComponentRollbackTest { public void createIndex() throws Exception { List> partitioningKeys = new ArrayList<>(); partitioningKeys.add(Collections.singletonList("key")); - dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null, - null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null, null, - false, null, false), + dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, + NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, + partitioningKeys, null, null, null, false, null, false), null, DatasetType.INTERNAL, DATASET_ID, 0); PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST); @@ -448,7 +448,7 @@ public class ComponentRollbackTest { for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); merger.waitUntilCount(1); // now that we enetered, we will rollback Rollerback 
rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn)); @@ -635,7 +635,7 @@ public class ComponentRollbackTest { for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -707,7 +707,7 @@ public class ComponentRollbackTest { for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -736,7 +736,7 @@ public class ComponentRollbackTest { } private class Rollerback { - private final Thread task; + private Thread task; private Exception failure; public Rollerback(TestLsmBtree lsmBtree, Predicate predicate) { @@ -766,7 +766,7 @@ public class ComponentRollbackTest { } private class Searcher { - private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private ExecutorService executor = Executors.newSingleThreadExecutor(); private Future task; private volatile boolean entered = false; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index e18181c..a20e660 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -31,7 +31,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy; @@ -92,17 +91,14 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { Set indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); int partition = getIndexPartition(index, indexInfos); - List dependingMerges = scheduleSecondaryIndexes(minID, maxID, + triggerScheduledMerge(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet())); - - schedulePrimaryIndex(minID, maxID, index, dependingMerges); - return true; } /** * Submit merge requests for all disk components within [minID, maxID] - * of all of secondary indexes of a given dataset in the given partition + * of all indexes of a given dataset in the given partition * * @param minID * @param maxID @@ -110,39 +106,17 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { * @param indexInfos * @throws HyracksDataException */ - private List scheduleSecondaryIndexes(long minID, long maxID, Set indexInfos) - throws HyracksDataException { - List mergeOps = new ArrayList<>(); + private void triggerScheduledMerge(long minID, long maxID, Set indexInfos) throws HyracksDataException { for (IndexInfo info : indexInfos) { ILSMIndex 
lsmIndex = info.getIndex(); - List diskComponents = lsmIndex.getDiskComponents(); - if (lsmIndex.isPrimaryIndex() || isMergeOngoing(diskComponents)) { + + List immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents()); + if (isMergeOngoing(immutableComponents)) { continue; } - List mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - mergeOps.add(accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergeableComponents, null)); - } - return mergeOps; - } - - private void schedulePrimaryIndex(long minID, long maxID, ILSMIndex primaryIndex, - List dependingMerges) throws HyracksDataException { - assert primaryIndex.isPrimaryIndex(); - List diskComponents = primaryIndex.getDiskComponents(); - List mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); - ILSMIndexAccessor accessor = - primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(primaryIndex.getIOOperationCallback(), mergeableComponents, dependingMerges); - } - - private List collectMergeableComponents(long minID, long maxID, - List diskComponents) throws HyracksDataException { - List mergableComponents = new ArrayList<>(); - for (ILSMDiskComponent component : diskComponents) { - ILSMDiskComponentId id = component.getComponentId(); - if (!id.notFound()) { + List mergableComponents = new ArrayList<>(); + for (ILSMDiskComponent component : immutableComponents) { + ILSMDiskComponentId id = component.getComponentId(); if (id.getMinId() >= minID && id.getMaxId() <= maxID) { mergableComponents.add(component); } @@ -152,8 +126,10 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { break; } } + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + 
accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } - return mergableComponents; } private int getIndexPartition(ILSMIndex index, Set indexInfos) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index 0df8dcc..71d4a96 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -34,7 +34,6 @@ public class DatasetInfo extends Info implements Comparable { private boolean isRegistered; private boolean memoryAllocated; private boolean durable; - private boolean correlated; public DatasetInfo(int datasetID) { this.indexes = new HashMap<>(); @@ -42,7 +41,6 @@ public class DatasetInfo extends Info implements Comparable { this.datasetID = datasetID; this.setRegistered(false); this.setMemoryAllocated(false); - this.setCorrelated(false); } @Override @@ -197,12 +195,4 @@ public class DatasetInfo extends Info implements Comparable { public void setLastAccess(long lastAccess) { this.lastAccess = lastAccess; } - - public void setCorrelated(boolean correlated) { - this.correlated = correlated; - } - - public boolean isCorrelated() { - return correlated; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index ad9f6a5..37bd789 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -223,7 +223,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC if (iInfo.isOpen()) { ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback(), null); + accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); } // Wait for the above flush op. @@ -417,22 +417,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } if (asyncFlush) { - PrimaryIndexOperationTracker.flushDatasetIndexes(dsInfo.getDatsetIndexInfos(), dsInfo.isCorrelated()); - } else { - List primaryIndexes = new ArrayList<>(); for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - if (iInfo.getIndex().isPrimaryIndex()) { - // primary indexes are flushed later to guarantee the correctness of the correlated merge policy - primaryIndexes.add(iInfo); - } else { - // TODO: This is not efficient since we flush the indexes sequentially. - // Think of a way to allow submitting the flush requests concurrently. - // We don't do them concurrently because this may lead to a deadlock scenario - // between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. 
- flushAndWaitForIO(dsInfo, iInfo); - } + ILSMIndexAccessor accessor = + iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); } - for (IndexInfo iInfo : primaryIndexes) { + } else { + for (IndexInfo iInfo : dsInfo.getIndexes().values()) { + // TODO: This is not efficient since we flush the indexes sequentially. + // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this + // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. flushAndWaitForIO(dsInfo, iInfo); } } @@ -597,5 +591,4 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } } } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index 5eb3c02..f2f3b93 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.LocalResource; @@ -96,10 +95,6 @@ public class DatasetResource implements Comparable { datasetInfo.setExternal(!index.hasMemoryComponents()); 
datasetInfo.setRegistered(true); datasetInfo.setDurable(((ILSMIndex) index).isDurable()); - //TODO use a general mechanism to set correlated property when we have more - // correlated merge policies - datasetInfo.setCorrelated( - ((AbstractLSMIndex) index).getMergePolicy() instanceof CorrelatedPrefixMergePolicy); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 4eb1c3a..67b25b6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -19,9 +19,6 @@ package org.apache.asterix.common.context; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -34,7 +31,6 @@ import org.apache.asterix.common.utils.TransactionUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; @@ -145,16 +141,17 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { //This method is called sequentially by LogPage.notifyFlushTerminator in 
the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { - Set indexInfos = dsInfo.getDatsetIndexInfos(); - for (IndexInfo iInfo : indexInfos) { + for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { + //get resource + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); //update resource lsn AbstractLSMIOOperationCallback ioOpCallback = - (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); + (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); ioOpCallback.updateLastLSN(logRecord.getLSN()); + //schedule flush after update + accessor.scheduleFlush(lsmIndex.getIOOperationCallback()); } - - flushDatasetIndexes(indexInfos, dsInfo.isCorrelated()); - flushLogCreated = false; } @@ -201,63 +198,4 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { return flushLogCreated; } - public static void flushDatasetIndexes(Set indexes, boolean correlated) throws HyracksDataException { - if (!correlated) { - // if not correlated, we simply schedule flushes of each index independently - for (IndexInfo iInfo : indexes) { - ILSMIndex lsmIndex = iInfo.getIndex(); - //get resource - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - //schedule flush after update - accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); - } - } else { - // otherwise, we need to schedule indexes properly s.t. 
the primary index would depend on - // all secondary indexes in the same partition - - // collect partitions - Set partitions = new HashSet<>(); - indexes.forEach(iInfo -> partitions.add(iInfo.getPartition())); - for (Integer partition : partitions) { - flushCorrelatedDatasetIndexes(indexes, partition); - } - - } - } - - private static void flushCorrelatedDatasetIndexes(Set indexes, int partition) - throws HyracksDataException { - ILSMIndex primaryIndex = null; - List flushOps = new ArrayList<>(); - for (IndexInfo iInfo : indexes) { - if (iInfo.getPartition() != partition) { - continue; - } - ILSMIndex lsmIndex = iInfo.getIndex(); - if (lsmIndex.isPrimaryIndex()) { - primaryIndex = lsmIndex; - } else { - //get resource - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - //schedule flush - ILSMIOOperation flushOp = accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); - if (flushOp != null) { - flushOps.add(flushOp); - } - } - } - - if (primaryIndex != null) { - //get resource - ILSMIndexAccessor accessor = - primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - //schedule flush after update - accessor.scheduleFlush(primaryIndex.getIOOperationCallback(), flushOps); - - } - - } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index ee795d5..9f071bb 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -236,7 +236,7 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { return null; } }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class), - Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any()); + Mockito.anyListOf(ILSMDiskComponent.class)); Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class), Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index f5c3013..3775985 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -56,7 +56,6 @@ import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICursorFactory; -import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; import org.apache.hyracks.storage.common.IIndexBulkLoader; import 
org.apache.hyracks.storage.common.IIndexCursor; @@ -171,7 +170,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { // The only reason to override the following method is that it uses a different context object // in addition, determining whether or not to keep deleted tuples is different here @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { ExternalBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, -1); opCtx.setOperation(IndexOperation.MERGE); @@ -196,11 +195,9 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName()); ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); - MergeOperation mergeOp = new LSMBTreeMergeOperation(accessor, cursor, + ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps()); - ioScheduler.scheduleOperation(mergeOp); - return mergeOp; + callback, fileManager.getBaseDir().getAbsolutePath())); } // This function should only be used when a transaction fail. 
it doesn't @@ -372,7 +369,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { // Not supported @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-BTree"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index 071a4bd..ff17905 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -319,7 +319,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd } @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw HyracksDataException.create(ErrorCode.FLUSH_NOT_SUPPORTED_IN_EXTERNAL_INDEX); } @@ -342,7 +342,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd } @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + 
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { ILSMIndexOperationContext bctx = createOpContext(NoOpOperationCallback.INSTANCE, 0); bctx.setOperation(IndexOperation.MERGE); @@ -363,12 +363,11 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd keepDeleteTuples = mergingComponents.get(mergingComponents.size() - 1) != secondDiskComponents .get(secondDiskComponents.size() - 1); } - ILSMIOOperation mergeOp = + + ioScheduler.scheduleOperation( new LSMBTreeWithBuddyMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples, ctx.getDependingOps()); - ioScheduler.scheduleOperation(mergeOp); - return mergeOp; + callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples)); } // This method creates the appropriate opContext for the targeted version http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index 00a489e..c7d45e1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -462,8 +462,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { 
LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) { ILSMIndexAccessor accessor = createAccessor(opCtx); return new LSMBTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), - componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), - opCtx.getDependingOps()); + componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } @Override @@ -612,7 +611,6 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { } ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples); return new LSMBTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), - mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), - opCtx.getDependingOps()); + mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java index 92b53de..e3424e5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java @@ -18,10 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impls; -import java.util.List; - 
import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; @@ -30,9 +27,8 @@ public class LSMBTreeFlushOperation extends FlushOperation { private final FileReference bloomFilterFlushTarget; public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, - FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier, - List dependingOps) { - super(accessor, flushTarget, callback, indexIdentifier, dependingOps); + FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, flushTarget, callback, indexIdentifier); this.bloomFilterFlushTarget = bloomFilterFlushTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java index 40ac7b1..ec96303 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java @@ -19,11 +19,8 @@ package org.apache.hyracks.storage.am.lsm.btree.impls; -import java.util.List; - import 
org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -33,9 +30,8 @@ public class LSMBTreeMergeOperation extends MergeOperation { private final FileReference bloomFilterMergeTarget; public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, - FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier, - List dependingOps) { - super(accessor, target, callback, indexIdentifier, cursor, dependingOps); + FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, target, callback, indexIdentifier, cursor); this.bloomFilterMergeTarget = bloomFilterMergeTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java index e0c1512..f682bde 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java @@ 
-18,11 +18,8 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -35,8 +32,8 @@ public class LSMBTreeWithBuddyMergeOperation extends MergeOperation { public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, - String indexIdentifier, boolean keepDeletedTuples, List dependingOps) { - super(accessor, target, callback, indexIdentifier, cursor, dependingOps); + String indexIdentifier, boolean keepDeletedTuples) { + super(accessor, target, callback, indexIdentifier, cursor); this.buddyBtreeMergeTarget = buddyBtreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; this.keepDeletedTuples = keepDeletedTuples; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 380b2f2..89c8cb9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -105,12 +105,10 @@ public interface ILSMHarness { * * @param ctx * @param callback - * @return The scheduled merge operation, used for the caller to track its status * @throws HyracksDataException * @throws IndexException */ - ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) - throws HyracksDataException; + void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; /** * Schedule full merge @@ -137,11 +135,9 @@ public interface ILSMHarness { * * @param ctx * @param callback - * @return The scheduled flush operation, used for the caller to track its status * @throws HyracksDataException */ - ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) - throws HyracksDataException; + void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; /** * Perform a flush http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java index ff1613b..c2ae786 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java @@ -18,7 +18,6 @@ */ package 
org.apache.hyracks.storage.am.lsm.common.api; -import java.util.List; import java.util.concurrent.Callable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -64,19 +63,7 @@ public interface ILSMIOOperation extends Callable { FileReference getTarget(); /** - * <<<<<<< HEAD - * * @return the accessor of the operation */ ILSMIndexAccessor getAccessor(); - - /** - * @return whether this operation has finished - */ - boolean isFinished(); - - /** - * @return a list of operations that this operation depends on - */ - List getDependingOps(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java index 5f43bcd..addeb27 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java @@ -67,13 +67,11 @@ public interface ILSMIndex extends IIndex { public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException; - ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) - throws HyracksDataException; + void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException; - ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) - 
throws HyracksDataException; + void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index b303a39..b8d64af 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -46,13 +46,9 @@ public interface ILSMIndexAccessor extends IIndexAccessor { * * @param callback * the IO operation callback - * @param dependingOps - * other operations that this operation depends on - * @return The scheduled flush operation * @throws HyracksDataException */ - ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List dependingOps) - throws HyracksDataException; + void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException; /** * Schedule a merge operation @@ -61,13 +57,11 @@ public interface ILSMIndexAccessor extends IIndexAccessor { * the merge operation callback * @param components * the components to be merged - * @param dependingOps - * other operations that this operation depends on - * @return The scheduled merge operation * @throws HyracksDataException + * @throws IndexException */ - ILSMIOOperation 
scheduleMerge(ILSMIOOperationCallback callback, List components, - List dependingOps) throws HyracksDataException; + void scheduleMerge(ILSMIOOperationCallback callback, List components) + throws HyracksDataException; /** * Schedule a full merge http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java index 66af93f..5b0378a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java @@ -56,8 +56,4 @@ public interface ILSMIndexOperationContext extends IIndexOperationContext { PermutingTupleReference getFilterTuple(); MultiComparator getFilterCmp(); - - List getDependingOps(); - - void setDependingOps(List dependingOps); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java index 
1d13c94..aee46f0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java @@ -18,10 +18,6 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; @@ -34,32 +30,13 @@ public abstract class AbstractIoOperation implements ILSMIOOperation { protected final FileReference target; protected final ILSMIOOperationCallback callback; protected final String indexIdentifier; - protected final List dependingOps; - - protected AtomicBoolean isFinished = new AtomicBoolean(false); public AbstractIoOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, - String indexIdentifier, List dependingOps) { + String indexIdentifier) { this.accessor = accessor; this.target = target; this.callback = callback; this.indexIdentifier = indexIdentifier; - this.dependingOps = dependingOps; - } - - protected abstract void callInternal() throws HyracksDataException; - - @Override - public Boolean call() throws HyracksDataException { - try { - callInternal(); - } finally { - synchronized (this) { - isFinished.set(true); - notifyAll(); - } - } - return true; } @Override @@ -86,14 +63,4 @@ public abstract class AbstractIoOperation implements ILSMIOOperation { public String getIndexIdentifier() { return indexIdentifier; } - - @Override - public List getDependingOps() { - return dependingOps; - } - - @Override - public boolean isFinished() { - return isFinished.get(); - } } 
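Note on the AbstractIoOperation hunk above: the removed call() wrapper ran callInternal() and then, in a finally block, set isFinished and called notifyAll() under the operation's own monitor, so that other threads could block until the operation completed. The following is a minimal sketch of that completion-tracking pattern only. TrackedOp, its Runnable body, and awaitFinished() are hypothetical names introduced for illustration and are not part of Hyracks or AsterixDB.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for an I/O operation; NOT the Hyracks AbstractIoOperation class.
final class TrackedOp {
    private final AtomicBoolean finished = new AtomicBoolean(false);
    private final Runnable body; // assumption: the real work is modelled as a Runnable

    TrackedOp(Runnable body) {
        this.body = body;
    }

    // Runs the body; completion is recorded and waiters are woken even if the body throws.
    Boolean call() {
        try {
            body.run();
        } finally {
            synchronized (this) {
                finished.set(true);
                notifyAll();
            }
        }
        return true;
    }

    boolean isFinished() {
        return finished.get();
    }

    // Hypothetical helper: blocks the caller until call() has completed on some thread.
    void awaitFinished() {
        synchronized (this) {
            while (!finished.get()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and surface the failure to the caller.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
        }
    }
}
```

The flag is set inside the synchronized block and re-checked in a loop by the waiter, which guards against both a missed notification and a spurious wakeup. With the revert applied, the operations no longer carry this state, and FlushOperation implements call() directly.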
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index e7b21cb..dc64f9b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -86,7 +86,6 @@ public abstract class AbstractLSMIndex implements ILSMIndex { protected final int[] treeFields; protected final int[] filterFields; protected final boolean durable; - protected final ILSMMergePolicy mergePolicy; protected boolean isActive; protected final AtomicBoolean[] flushRequests; protected boolean memoryComponentsAllocated = false; @@ -114,7 +113,6 @@ public abstract class AbstractLSMIndex implements ILSMIndex { this.inactiveDiskComponents = new LinkedList<>(); this.durable = durable; this.tracer = tracer; - this.mergePolicy = mergePolicy; lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(), tracer); isActive = false; diskComponents = new ArrayList<>(); @@ -137,7 +135,6 @@ public abstract class AbstractLSMIndex implements ILSMIndex { this.ioScheduler = ioScheduler; this.ioOpCallback = ioOpCallback; this.durable = durable; - this.mergePolicy = mergePolicy; lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled()); isActive = false; diskComponents = new LinkedList<>(); @@ -202,7 
+199,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { protected void flushMemoryComponent() throws HyracksDataException { BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback); ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(cb, null); + accessor.scheduleFlush(cb); try { cb.waitForIO(); } catch (InterruptedException e) { @@ -329,21 +326,19 @@ public abstract class AbstractLSMIndex implements ILSMIndex { } @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); opCtx.setOperation(ctx.getOperation()); opCtx.getComponentHolder().addAll(ctx.getComponentHolder()); - opCtx.setDependingOps(ctx.getDependingOps()); ILSMIOOperation flushOp = createFlushOperation(opCtx, componentFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(flushOp, tracer)); - return flushOp; } @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { List mergingComponents = ctx.getComponentHolder(); // merge must create a different op ctx @@ -351,13 +346,11 @@ public abstract class AbstractLSMIndex implements ILSMIndex { createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); opCtx.setOperation(ctx.getOperation()); opCtx.getComponentHolder().addAll(mergingComponents); - opCtx.setDependingOps(ctx.getDependingOps()); ILSMDiskComponent firstComponent = 
(ILSMDiskComponent) mergingComponents.get(0); ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1); LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent); - MergeOperation mergeOp = (MergeOperation) createMergeOperation(opCtx, mergeFileRefs, callback); + ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergeFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(mergeOp, tracer)); - return mergeOp; } private void addOperationalMutableComponents(List operationalComponents) { @@ -635,10 +628,6 @@ public abstract class AbstractLSMIndex implements ILSMIndex { : doMerge(operation); } - public ILSMMergePolicy getMergePolicy() { - return mergePolicy; - } - public abstract Set getLSMComponentPhysicalFiles(ILSMComponent newComponent); protected abstract void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java index 5fdeafd..065d465 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java @@ -27,7 +27,6 @@ import 
org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -48,7 +47,6 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera protected IndexOperation op; protected boolean accessingComponents = false; protected ISearchPredicate searchPredicate; - protected final List dependingOps; public AbstractLSMIndexOperationContext(int[] treeFields, int[] filterFields, IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback, @@ -58,7 +56,6 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera this.componentHolder = new LinkedList<>(); this.componentsToBeMerged = new LinkedList<>(); this.componentsToBeReplicated = new LinkedList<>(); - this.dependingOps = new LinkedList<>(); if (filterFields != null) { indexTuple = new PermutingTupleReference(treeFields); filterCmp = MultiComparator.create(filterCmpFactories); @@ -156,17 +153,4 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera public ISearchPredicate getSearchPredicate() { return searchPredicate; } - - @Override - public List getDependingOps() { - return dependingOps; - } - - @Override - public void setDependingOps(List dependingOps) { - this.dependingOps.clear(); - if (dependingOps != null) { - this.dependingOps.addAll(dependingOps); - } - } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java index 7ac9bfb..847b882 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java @@ -49,7 +49,7 @@ public class ConstantMergePolicy implements ILSMMergePolicy { } else if (immutableComponents.size() >= numComponents) { ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null); + accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); } } @@ -106,7 +106,7 @@ public class ConstantMergePolicy implements ILSMMergePolicy { } ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null); + accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); return true; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index b93d943..2f65b18 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -203,13 +203,13 @@ public class ExternalIndexHarness extends LSMHarness { } @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { callback.afterFinalize(LSMOperationType.MERGE, null); - return null; + return; } - return lsmIndex.scheduleMerge(ctx, callback); + lsmIndex.scheduleMerge(ctx, callback); } @Override @@ -297,10 +297,9 @@ public class ExternalIndexHarness extends LSMHarness { } @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { callback.afterFinalize(LSMOperationType.FLUSH, null); - return null; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
index 750c690..7b7f950 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
-import java.util.List;
 import java.util.Objects;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,13 +30,14 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 public class FlushOperation extends AbstractIoOperation implements Comparable {
 
     public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
-            String indexIdentifier, List dependingOps) {
-        super(accessor, target, callback, indexIdentifier, dependingOps);
+            String indexIdentifier) {
+        super(accessor, target, callback, indexIdentifier);
     }
 
     @Override
-    protected void callInternal() throws HyracksDataException {
+    public Boolean call() throws HyracksDataException {
         accessor.flush(this);
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index bc3a5a1..1069f8f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -500,13 +500,13 @@ public class LSMHarness implements ILSMHarness {
     }
 
     @Override
-    public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
             callback.afterFinalize(LSMOperationType.FLUSH, null);
-            return null;
+            return;
         }
-        return lsmIndex.scheduleFlush(ctx, callback);
+        lsmIndex.scheduleFlush(ctx, callback);
     }
 
     @Override
@@ -519,7 +519,6 @@ public class LSMHarness implements ILSMHarness {
         boolean failedOperation = false;
         try {
             newComponent = lsmIndex.flush(operation);
-            waitForDependingOps(operation);
             operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
             newComponent.markAsValid(lsmIndex.isDurable());
         } catch (Throwable e) {
@@ -538,13 +537,13 @@ public class LSMHarness implements ILSMHarness {
     }
 
     @Override
-    public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
             callback.afterFinalize(LSMOperationType.MERGE, null);
-            return null;
+            return;
         }
-        return lsmIndex.scheduleMerge(ctx, callback);
+        lsmIndex.scheduleMerge(ctx, callback);
     }
 
     @Override
@@ -571,7 +570,6 @@ public class LSMHarness implements ILSMHarness {
         boolean failedOperation = false;
         try {
             newComponent = lsmIndex.merge(operation);
-            waitForDependingOps(operation);
             operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
             newComponent.markAsValid(lsmIndex.isDurable());
         } catch (Throwable e) {
@@ -756,32 +754,6 @@ public class LSMHarness implements ILSMHarness {
         }
     }
 
-    /**
-     * Wait for depending operations to finish.
-     *
-     * @param op
-     */
-    private void waitForDependingOps(ILSMIOOperation op) throws HyracksDataException {
-        List dependingOps = op.getDependingOps();
-        if (dependingOps == null) {
-            return;
-        }
-        for (ILSMIOOperation dependingOp : dependingOps) {
-            if (dependingOp != null && !dependingOp.isFinished()) {
-                synchronized (dependingOp) {
-                    while (!dependingOp.isFinished()) {
-                        try {
-                            dependingOp.wait();
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            throw HyracksDataException.create(e);
-                        }
-                    }
-                }
-            }
-        }
-    }
-
     @Override
     public void deleteComponents(ILSMIndexOperationContext ctx, Predicate predicate)
             throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index f008fde..c0fd443 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -135,21 +135,18 @@ public class LSMTreeIndexAccessor implements ILSMIndexAccessor {
     }
 
     @Override
-    public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List dependingOps)
-            throws HyracksDataException {
+    public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
         ctx.setOperation(IndexOperation.FLUSH);
-        ctx.setDependingOps(dependingOps);
-        return lsmHarness.scheduleFlush(ctx, callback);
+        lsmHarness.scheduleFlush(ctx, callback);
     }
 
     @Override
-    public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List components,
-            List dependingOps) throws HyracksDataException {
+    public void scheduleMerge(ILSMIOOperationCallback callback, List components)
+            throws HyracksDataException {
         ctx.setOperation(IndexOperation.MERGE);
         ctx.getComponentsToBeMerged().clear();
         ctx.getComponentsToBeMerged().addAll(components);
-        ctx.setDependingOps(dependingOps);
-        return lsmHarness.scheduleMerge(ctx, callback);
+        lsmHarness.scheduleMerge(ctx, callback);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
index 2210fd0..c83d534 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
@@ -32,8 +31,8 @@ public class MergeOperation extends AbstractIoOperation {
     protected final IIndexCursor cursor;
 
     public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
-            String indexIdentifier, IIndexCursor cursor, List dependingOps) {
-        super(accessor, target, callback, indexIdentifier, dependingOps);
+            String indexIdentifier, IIndexCursor cursor) {
+        super(accessor, target, callback, indexIdentifier);
         this.cursor = cursor;
     }
 
@@ -42,6 +41,12 @@ public class MergeOperation extends AbstractIoOperation {
     }
 
     @Override
+    public Boolean call() throws HyracksDataException {
+        accessor.merge(this);
+        return true;
+    }
+
+    @Override
     public LSMIOOpertionType getIOOpertionType() {
         return LSMIOOpertionType.MERGE;
     }
@@ -49,10 +54,4 @@ public class MergeOperation extends AbstractIoOperation {
     public IIndexCursor getCursor() {
         return cursor;
     }
-
-    @Override
-    protected void callInternal() throws HyracksDataException {
-        accessor.merge(this);
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index f159232..7d7266e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -247,7 +247,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy {
         Collections.reverse(mergableComponents);
         ILSMIndexAccessor accessor =
                 index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents, null);
+        accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
index 7cdcc52..85081a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
@@ -59,7 +59,7 @@ public class ThreadCountingTracker implements ILSMOperationTracker {
                 && index.hasFlushRequestForCurrentMutableComponent()) {
             ILSMIndexAccessor accessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
+            accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index 3e35e51..9cc8022 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -19,7 +19,6 @@
 
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
-import java.util.List;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -101,16 +100,6 @@ class TracedIOOperation implements ILSMIOOperation {
     public ILSMIndexAccessor getAccessor() {
         return ioOp.getAccessor();
     }
-
-    @Override
-    public boolean isFinished() {
-        return ioOp.isFinished();
-    }
-
-    @Override
-    public List getDependingOps() {
-        return ioOp.getDependingOps();
-    }
 }
 
 class ComparableTracedIOOperation extends TracedIOOperation implements Comparable {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 60da50a..eb3924c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -691,20 +691,17 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex
             LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getLsmHarness(), opCtx),
-                componentFileRefs.getInsertIndexFileReference(),
-                componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
-                callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
+                componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
+                componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
     }
 
     @Override
     protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
-            LSMComponentFileReferences mergeFileRefs,
-            ILSMIOOperationCallback callback) throws HyracksDataException {
+            LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
         ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getLsmHarness(), opCtx);
         IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx);
-        return new LSMInvertedIndexMergeOperation(accessor, cursor,
-                mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(),
-                mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(),
-                opCtx.getDependingOps());
+        return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
+                mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
+                fileManager.getBaseDir().getAbsolutePath());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 242bc83..61fc84e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -85,11 +85,9 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd
     }
 
     @Override
-    public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List dependingOps)
-            throws HyracksDataException {
+    public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
         ctx.setOperation(IndexOperation.FLUSH);
-        ctx.setDependingOps(dependingOps);
-        return lsmHarness.scheduleFlush(ctx, callback);
+        lsmHarness.scheduleFlush(ctx, callback);
     }
 
     @Override
@@ -98,13 +96,12 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd
     }
 
     @Override
-    public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List components,
-            List dependingOps) throws HyracksDataException {
+    public void scheduleMerge(ILSMIOOperationCallback callback, List components)
+            throws HyracksDataException {
         ctx.setOperation(IndexOperation.MERGE);
         ctx.getComponentsToBeMerged().clear();
         ctx.getComponentsToBeMerged().addAll(components);
-        ctx.setDependingOps(dependingOps);
-        return lsmHarness.scheduleMerge(ctx, callback);
+        lsmHarness.scheduleMerge(ctx, callback);
     }
 
     @Override
@@ -119,7 +116,6 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd
     @Override
     public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException {
         ctx.setOperation(IndexOperation.FULL_MERGE);
-        ctx.setDependingOps(null);
         lsmHarness.scheduleFullMerge(ctx, callback);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1dc8228b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 30e1cec..2106f6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -19,10 +19,7 @@
 
 package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
 
-import java.util.List;
-
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -33,8 +30,8 @@ public class LSMInvertedIndexFlushOperation extends FlushOperation {
 
     public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget,
             FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget,
-            ILSMIOOperationCallback callback, String indexIdentifier, List dependingOps) {
-        super(accessor, flushTarget, callback, indexIdentifier, dependingOps);
+            ILSMIOOperationCallback callback, String indexIdentifier) {
+        super(accessor, flushTarget, callback, indexIdentifier);
         this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
     }
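
Reader's note (not part of the commit): the hunks above remove the "depending operations" mechanism, in which an I/O operation did its work and then blocked in waitForDependingOps() until every operation it depended on reported isFinished(). The following is a minimal, self-contained sketch of that wait/notify pattern, assuming that a finished operation sets a flag and calls notifyAll() on itself; that half of the mechanism is not visible in this part of the diff. SketchOp and DependingOpsSketch are hypothetical names and are not Hyracks classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for an LSM I/O operation; NOT the Hyracks class.
class SketchOp implements Callable<Boolean> {
    private final String name;
    private final List<SketchOp> dependingOps;
    private final List<String> publishLog;
    private boolean finished = false; // guarded by this object's monitor

    SketchOp(String name, List<SketchOp> dependingOps, List<String> publishLog) {
        this.name = name;
        this.dependingOps = dependingOps;
        this.publishLog = publishLog;
    }

    @Override
    public Boolean call() throws InterruptedException {
        try {
            // (The real operation would build its new component here.)
            // Block until every operation we depend on has finished,
            // mirroring the loop in the removed waitForDependingOps().
            for (SketchOp dep : dependingOps) {
                synchronized (dep) {
                    while (!dep.finished) {
                        dep.wait();
                    }
                }
            }
            // Only now publish the result.
            synchronized (publishLog) {
                publishLog.add(name);
            }
            return true;
        } finally {
            // Assumption: finishing marks the flag and wakes any waiters.
            synchronized (this) {
                finished = true;
                notifyAll();
            }
        }
    }
}

class DependingOpsSketch {
    // Runs a "merge" that depends on a "flush" and returns the publish order.
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        SketchOp flush = new SketchOp("flush", new ArrayList<>(), log);
        List<SketchOp> deps = new ArrayList<>();
        deps.add(flush);
        SketchOp merge = new SketchOp("merge", deps, log);
        Thread mergeThread = new Thread(() -> {
            try {
                merge.call();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            mergeThread.start(); // the merge starts first but must wait
            flush.call();
            mergeThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In this sketch the merge always publishes after the flush, whichever thread is scheduled first. After the revert, call() simply runs accessor.flush(this) or accessor.merge(this) and returns true, with no cross-operation waiting.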