Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 89C41200D39 for ; Wed, 4 Oct 2017 07:53:16 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 88A281609DE; Wed, 4 Oct 2017 05:53:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 324B4160BDA for ; Wed, 4 Oct 2017 07:53:14 +0200 (CEST) Received: (qmail 96424 invoked by uid 500); 4 Oct 2017 05:53:13 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 96345 invoked by uid 99); 4 Oct 2017 05:53:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Oct 2017 05:53:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CADB8F56FF; Wed, 4 Oct 2017 05:53:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.apache.org Date: Wed, 04 Oct 2017 05:53:12 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/4] asterixdb git commit: [NO ISSUE][STO] Component Deletes Through flushes and merges archived-at: Wed, 04 Oct 2017 05:53:16 -0000 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java new file mode 100644 index 0000000..4511f42 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.dataflow; + +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.external.indexing.FilesIndexDescription; +import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.metadata.api.IResourceFactoryProvider; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.utils.IndexUtil; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.algebricks.data.ITypeTraitProvider; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; +import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtreeLocalResourceFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; +import org.apache.hyracks.storage.common.IResourceFactory; +import org.apache.hyracks.storage.common.IStorageManager; + +public class TestLsmBTreeResourceFactoryProvider implements IResourceFactoryProvider { + + public static final TestLsmBTreeResourceFactoryProvider 
INSTANCE = new TestLsmBTreeResourceFactoryProvider(); + + private TestLsmBTreeResourceFactoryProvider() { + } + + @Override + public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Dataset dataset, Index index, + ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, + Map mergePolicyProperties, ITypeTraits[] filterTypeTraits, + IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException { + int[] filterFields = IndexUtil.getFilterFields(dataset, index, filterTypeTraits); + int[] btreeFields = IndexUtil.getBtreeFieldsIfFiltered(dataset, index); + IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider(); + ITypeTraits[] typeTraits = getTypeTraits(mdProvider, dataset, index, recordType, metaType); + IBinaryComparatorFactory[] cmpFactories = getCmpFactories(mdProvider, dataset, index, recordType, metaType); + int[] bloomFilterFields = getBloomFilterFields(dataset, index); + boolean durable = !dataset.isTemp(); + double bloomFilterFalsePositiveRate = mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(); + ILSMOperationTrackerFactory opTrackerFactory = dataset.getIndexOperationTrackerFactory(index); + ILSMIOOperationCallbackFactory ioOpCallbackFactory = dataset.getIoOperationCallbackFactory(index); + IStorageManager storageManager = storageComponentProvider.getStorageManager(); + IMetadataPageManagerFactory metadataPageManagerFactory = + storageComponentProvider.getMetadataPageManagerFactory(); + ILSMIOOperationSchedulerProvider ioSchedulerProvider = + storageComponentProvider.getIoOperationSchedulerProvider(); + AsterixVirtualBufferCacheProvider vbcProvider = new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()); + return new TestLsmBtreeLocalResourceFactory(storageManager, typeTraits, cmpFactories, filterTypeTraits, + filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, + vbcProvider, 
ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, bloomFilterFields, + bloomFilterFalsePositiveRate, index.isPrimaryIndex(), btreeFields); + } + + private static ITypeTraits[] getTypeTraits(MetadataProvider metadataProvider, Dataset dataset, Index index, + ARecordType recordType, ARecordType metaType) throws AlgebricksException { + ITypeTraits[] primaryTypeTraits = dataset.getPrimaryTypeTraits(metadataProvider, recordType, metaType); + if (index.isPrimaryIndex()) { + return primaryTypeTraits; + } else if (dataset.getDatasetType() == DatasetType.EXTERNAL + && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) { + return FilesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS; + } + int numPrimaryKeys = dataset.getPrimaryKeys().size(); + int numSecondaryKeys = index.getKeyFieldNames().size(); + ITypeTraitProvider typeTraitProvider = metadataProvider.getStorageComponentProvider().getTypeTraitProvider(); + ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys]; + for (int i = 0; i < numSecondaryKeys; i++) { + ARecordType sourceType; + List keySourceIndicators = index.getKeyFieldSourceIndicators(); + if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) { + sourceType = recordType; + } else { + sourceType = metaType; + } + Pair keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i), + index.getKeyFieldNames().get(i), sourceType); + IAType keyType = keyTypePair.first; + secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType); + } + // Add serializers and comparators for primary index fields. 
+ for (int i = 0; i < numPrimaryKeys; i++) { + secondaryTypeTraits[numSecondaryKeys + i] = primaryTypeTraits[i]; + } + return secondaryTypeTraits; + } + + private static IBinaryComparatorFactory[] getCmpFactories(MetadataProvider metadataProvider, Dataset dataset, + Index index, ARecordType recordType, ARecordType metaType) throws AlgebricksException { + IBinaryComparatorFactory[] primaryCmpFactories = + dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType); + if (index.isPrimaryIndex()) { + return dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType); + } else if (dataset.getDatasetType() == DatasetType.EXTERNAL + && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) { + return FilesIndexDescription.FILES_INDEX_COMP_FACTORIES; + } + int numPrimaryKeys = dataset.getPrimaryKeys().size(); + int numSecondaryKeys = index.getKeyFieldNames().size(); + IBinaryComparatorFactoryProvider cmpFactoryProvider = + metadataProvider.getStorageComponentProvider().getComparatorFactoryProvider(); + IBinaryComparatorFactory[] secondaryCmpFactories = + new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys]; + for (int i = 0; i < numSecondaryKeys; i++) { + ARecordType sourceType; + List keySourceIndicators = index.getKeyFieldSourceIndicators(); + if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) { + sourceType = recordType; + } else { + sourceType = metaType; + } + Pair keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i), + index.getKeyFieldNames().get(i), sourceType); + IAType keyType = keyTypePair.first; + secondaryCmpFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(keyType, true); + } + // Add serializers and comparators for primary index fields. 
+ for (int i = 0; i < numPrimaryKeys; i++) { + secondaryCmpFactories[numSecondaryKeys + i] = primaryCmpFactories[i]; + } + return secondaryCmpFactories; + } + + private static int[] getBloomFilterFields(Dataset dataset, Index index) throws AlgebricksException { + if (index.isPrimaryIndex()) { + return dataset.getPrimaryBloomFilterFields(); + } else if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + if (index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) { + return FilesIndexDescription.BLOOM_FILTER_FIELDS; + } else { + return new int[] { index.getKeyFieldNames().size() }; + } + } + int numKeys = index.getKeyFieldNames().size(); + int[] bloomFilterKeyFields = new int[numKeys]; + for (int i = 0; i < numKeys; i++) { + bloomFilterKeyFields[i] = i; + } + return bloomFilterKeyFields; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java new file mode 100644 index 0000000..1bec41e --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.dataflow; + +import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent; + +public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackFactory { + + private static final long serialVersionUID = 1L; + + public static TestLsmBtreeIoOpCallbackFactory INSTANCE = new TestLsmBtreeIoOpCallbackFactory(); + private static volatile int completedFlushes = 0; + private static volatile int completedMerges = 0; + private static volatile int rollbackFlushes = 0; + private static volatile int rollbackMerges = 0; + private static volatile int failedFlushes = 0; + private static volatile int failedMerges = 0; + + private TestLsmBtreeIoOpCallbackFactory() { + } + + @Override + public synchronized ILSMIOOperationCallback createIoOpCallback() { + completedFlushes = 0; + completedMerges = 0; + rollbackFlushes = 0; + rollbackMerges = 0; + // Whenever this is called, it resets the counter + // However, the counters for the failed operations are never reset since we expect them + // To be always 0 + return new TestLsmBtreeIoOpCallback(); + } + + public int getTotalFlushes() { + return completedFlushes + rollbackFlushes; + } + + 
public int getTotalMerges() { + return completedMerges + rollbackMerges; + } + + public int getTotalIoOps() { + return getTotalFlushes() + getTotalMerges(); + } + + public int getRollbackFlushes() { + return rollbackFlushes; + } + + public int getRollbackMerges() { + return rollbackMerges; + } + + public int getCompletedFlushes() { + return completedFlushes; + } + + public int getCompletedMerges() { + return completedMerges; + } + + public static int getFailedFlushes() { + return failedFlushes; + } + + public static int getFailedMerges() { + return failedMerges; + } + + public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback { + @Override + public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) { + super.afterFinalize(opType, newComponent); + synchronized (INSTANCE) { + if (newComponent != null) { + if (newComponent == EmptyComponent.INSTANCE) { + if (opType == LSMOperationType.FLUSH) { + rollbackFlushes++; + } else { + rollbackMerges++; + } + } else { + if (opType == LSMOperationType.FLUSH) { + completedFlushes++; + } else { + completedMerges++; + } + } + } else { + recordFailure(opType); + } + INSTANCE.notifyAll(); + } + } + + private void recordFailure(LSMOperationType opType) { + if (opType == LSMOperationType.FLUSH) { + failedFlushes++; + } else { + failedMerges++; + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index 5bb9d49..a20e660 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -77,7 +77,7 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { @Override protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException { - List immutableComponents = new ArrayList<>(index.getImmutableComponents()); + List immutableComponents = new ArrayList<>(index.getDiskComponents()); // Reverse the components order so that we look at components from oldest to newest. Collections.reverse(immutableComponents); @@ -110,7 +110,7 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { for (IndexInfo info : indexInfos) { ILSMIndex lsmIndex = info.getIndex(); - List immutableComponents = new ArrayList<>(lsmIndex.getImmutableComponents()); + List immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents()); if (isMergeOngoing(immutableComponents)) { continue; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java index f357aea..ba0f928 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java @@ -34,7 +34,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId; -import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil; +import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; // A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback { @@ -100,9 +100,9 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } } - public void putLSNIntoMetadata(ILSMDiskComponent index, List oldComponents) + public void putLSNIntoMetadata(ILSMDiskComponent newComponent, List oldComponents) throws HyracksDataException { - index.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents))); + newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents))); } public static long getTreeIndexLSN(DiskComponentMetadata md) throws HyracksDataException { @@ -188,10 +188,11 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC putLSNIntoMetadata(newComponent, oldComponents); putComponentIdIntoMetadata(newComponent, oldComponents); if (opType == LSMOperationType.MERGE) { - LongPointable markerLsn = LongPointable.FACTORY - .createPointable(ComponentMetadataUtil.getLong(oldComponents.get(0).getMetadata(), - ComponentMetadataUtil.MARKER_LSN_KEY, ComponentMetadataUtil.NOT_FOUND)); - newComponent.getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, markerLsn); + // In case of merge, oldComponents are never null + LongPointable markerLsn = + LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(), + ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND)); + newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn); } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java index bbe2c4f..abe474b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java @@ -26,7 +26,7 @@ import org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; -import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil; +import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; /** * A basic callback used to write marker to transaction logs @@ -52,17 +52,17 @@ public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback { private long getLsn() { long lsn; try { - lsn = ComponentMetadataUtil.getLong(index.getCurrentMemoryComponent().getMetadata(), - ComponentMetadataUtil.MARKER_LSN_KEY, ComponentMetadataUtil.NOT_FOUND); + lsn = ComponentUtils.getLong(index.getCurrentMemoryComponent().getMetadata(), ComponentUtils.MARKER_LSN_KEY, + ComponentUtils.NOT_FOUND); } catch (HyracksDataException e) { // Should never happen since this is a memory component throw new IllegalStateException(e); } - if (lsn == ComponentMetadataUtil.NOT_FOUND) { + if (lsn == ComponentUtils.NOT_FOUND) { synchronized (index.getOperationTracker()) { // look for it in previous memory component if 
exists lsn = lsnFromImmutableMemoryComponents(); - if (lsn == ComponentMetadataUtil.NOT_FOUND) { + if (lsn == ComponentUtils.NOT_FOUND) { // look for it in disk component lsn = lsnFromDiskComponents(); } @@ -72,26 +72,26 @@ public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback { } private long lsnFromDiskComponents() { - List diskComponents = index.getImmutableComponents(); + List diskComponents = index.getDiskComponents(); for (ILSMDiskComponent c : diskComponents) { try { - long lsn = ComponentMetadataUtil.getLong(c.getMetadata(), ComponentMetadataUtil.MARKER_LSN_KEY, - ComponentMetadataUtil.NOT_FOUND); - if (lsn != ComponentMetadataUtil.NOT_FOUND) { + long lsn = ComponentUtils.getLong(c.getMetadata(), ComponentUtils.MARKER_LSN_KEY, + ComponentUtils.NOT_FOUND); + if (lsn != ComponentUtils.NOT_FOUND) { return lsn; } } catch (HyracksDataException e) { throw new IllegalStateException("Unable to read metadata page. Disk Error?", e); } } - return ComponentMetadataUtil.NOT_FOUND; + return ComponentUtils.NOT_FOUND; } private long lsnFromImmutableMemoryComponents() { List memComponents = index.getMemoryComponents(); int numOtherMemComponents = memComponents.size() - 1; int next = index.getCurrentMemoryComponentIndex(); - long lsn = ComponentMetadataUtil.NOT_FOUND; + long lsn = ComponentUtils.NOT_FOUND; for (int i = 0; i < numOtherMemComponents; i++) { next = next - 1; if (next < 0) { @@ -100,13 +100,13 @@ public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback { ILSMMemoryComponent c = index.getMemoryComponents().get(next); if (c.isReadable()) { try { - lsn = ComponentMetadataUtil.getLong(c.getMetadata(), ComponentMetadataUtil.MARKER_LSN_KEY, - ComponentMetadataUtil.NOT_FOUND); + lsn = ComponentUtils.getLong(c.getMetadata(), ComponentUtils.MARKER_LSN_KEY, + ComponentUtils.NOT_FOUND); } catch (HyracksDataException e) { // Should never happen since this is a memory component throw new IllegalStateException(e); } - if (lsn != 
ComponentMetadataUtil.NOT_FOUND) { + if (lsn != ComponentUtils.NOT_FOUND) { return lsn; } } @@ -117,7 +117,7 @@ public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback { @Override public void after(long lsn) { pointable.setLong(lsn); - index.getCurrentMemoryComponent().getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, pointable); + index.getCurrentMemoryComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, pointable); } public ILSMIndex getIndex() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index c18ecc2..9f071bb 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -218,7 +218,7 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { } ILSMIndex index = Mockito.mock(ILSMIndex.class); - Mockito.when(index.getImmutableComponents()).thenReturn(components); + Mockito.when(index.getDiskComponents()).thenReturn(components); ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class); Mockito.doAnswer(new Answer() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java index 2eb84fb..12f49a8 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java @@ -92,7 +92,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI super.open(); primaryIndexHelper.open(); primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance(); - diskComponents = new ILSMDiskComponent[primaryIndex.getImmutableComponents().size()]; + diskComponents = new ILSMDiskComponent[primaryIndex.getDiskComponents().size()]; secondaryIndexHelper.open(); secondaryIndex = (ILSMIndex) secondaryIndexHelper.getIndexInstance(); @@ -221,7 +221,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI } private void activateComponents() throws HyracksDataException { - List primaryComponents = primaryIndex.getImmutableComponents(); + List primaryComponents = primaryIndex.getDiskComponents(); for (int i = diskComponents.length - 1; i >= 0; i--) { // start from the oldest component to the newest component if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index 9c9fd17..75e193b 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -965,6 +965,12 @@ test-jar + org.apache.hyracks + hyracks-storage-am-lsm-btree-test + ${hyracks.version} + test-jar + + commons-io commons-io 2.5 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index ef3f13b..a071345 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -129,6 +129,10 @@ public class ErrorCode { public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93; public static final int CANNOT_READ_CLOSED_FILE = 94; public static final int TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME = 95; + public static final int ILLEGAL_ATTEMPT_TO_ENTER_EMPTY_COMPONENT = 96; + public static final int ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT = 97; + public static final int A_FLUSH_OPERATION_HAS_FAILED = 98; + public static final int A_MERGE_OPERATION_HAS_FAILED = 99; // Compilation error codes. 
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 40df3d8..ae579b4 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -112,5 +112,9 @@ 93 = Job %1$s has not been created yet 94 = Cannot read closed file (%1$s) 95 = Tuple of size %1$s cannot fit into an empty frame +96 = Illegal attempt to enter empty component +97 = Illegal attempt to exit empty component +98 = A flush operation has failed +99 = A merge operation has failed 10000 = The given rule collection %1$s is not an instance of the List class. 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java index 71a3f71..ff47d27 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java @@ -33,5 +33,7 @@ public enum IndexOperation { FULL_MERGE, FLUSH, REPLICATE, - DISK_COMPONENT_SCAN + DISK_COMPONENT_SCAN, + DELETE_MEMORY_COMPONENT, + DELETE_DISK_COMPONENTS } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index 86d926f..3775985 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -141,7 +141,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { // is needed. 
// It only needs to return the newer list @Override - public List getImmutableComponents() { + public List getDiskComponents() { if (version == 0) { return diskComponents; } else if (version == 1) { @@ -195,7 +195,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName()); ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); - ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, + ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath())); } @@ -376,7 +376,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { // Not supported @Override - public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException { throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-BTree"); } @@ -465,7 +465,7 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { if (isTransaction) { // Since this is a transaction component, validate and // deactivate. 
it could later be added or deleted - markAsValid(component); + component.markAsValid(durable); BTree btree = ((LSMBTreeDiskComponent) component).getBTree(); BloomFilter bloomFilter = ((LSMBTreeDiskComponent) component).getBloomFilter(); btree.deactivate(); @@ -506,11 +506,6 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { } } - @Override - public String toString() { - return "LSMTwoPCBTree [" + fileManager.getBaseDir() + "]"; - } - // The accessor for disk only indexes don't use modification callback and always carry the target index version with them @Override public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index 7462c7a..ff17905 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -325,7 +325,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd } @Override - public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException { throw HyracksDataException.create(ErrorCode.FLUSH_NOT_SUPPORTED_IN_EXTERNAL_INDEX); } @@ -364,10 +364,10 @@ public 
class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd .get(secondDiskComponents.size() - 1); } - ioScheduler.scheduleOperation(new LSMBTreeWithBuddyMergeOperation(accessor, mergingComponents, cursor, - relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(), - relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), - keepDeleteTuples)); + ioScheduler.scheduleOperation( + new LSMBTreeWithBuddyMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), + relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), + callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples)); } // This method creates the appropriate opContext for the targeted version @@ -378,12 +378,11 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd } @Override - public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException { LSMBTreeWithBuddyMergeOperation mergeOp = (LSMBTreeWithBuddyMergeOperation) operation; IIndexCursor cursor = mergeOp.getCursor(); ISearchPredicate btreeSearchPred = new RangePredicate(null, null, true, true, null, null); ILSMIndexOperationContext opCtx = ((LSMBTreeWithBuddySortedCursor) cursor).getOpCtx(); - opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents()); search(opCtx, cursor, btreeSearchPred); LSMBTreeWithBuddyDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), @@ -477,15 +476,6 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd } } - @Override - public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException { - LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) lsmComponent; - // Flush the bloom filter first. 
- markAsValidInternal(component.getBTree().getBufferCache(), component.getBloomFilter()); - markAsValidInternal(component.getBTree()); - markAsValidInternal(component.getBuddyBTree()); - } - // This function is used when a new component is to be committed -- is // called by the harness. @Override @@ -629,7 +619,7 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd if (isTransaction) { // Since this is a transaction component, validate and // deactivate. it could later be added or deleted - markAsValid(component); + component.markAsValid(durable); BTree btree = ((LSMBTreeWithBuddyDiskComponent) component).getBTree(); BTree buddyBtree = ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree(); BloomFilter bloomFilter = ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter(); @@ -816,15 +806,14 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd @Override protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx, - ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException { + LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) + throws HyracksDataException { return null; } @Override protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, - List mergingComponents, LSMComponentFileReferences mergeFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException { + LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { return null; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index 1920592..a0d5a22 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -64,7 +64,6 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFilterManager; -import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICursorFactory; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; @@ -295,7 +294,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { } @Override - public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException { LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation; LSMBTreeMemoryComponent flushingComponent = (LSMBTreeMemoryComponent) flushOp.getFlushingComponent(); IIndexAccessor accessor = flushingComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE, @@ -361,15 +360,12 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { } @Override - public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException { + public ILSMDiskComponent 
doMerge(ILSMIOOperation operation) throws HyracksDataException { LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation; IIndexCursor cursor = mergeOp.getCursor(); RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null); - ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx(); - opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents()); - search(opCtx, cursor, rangePred); + search(mergeOp.getAccessor().getOpContext(), cursor, rangePred); List mergedComponents = mergeOp.getMergingComponents(); - long numElements = 0L; if (hasBloomFilter) { //count elements in btree for creating Bloomfilter @@ -400,9 +396,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples); getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree()); } - componentBulkLoader.end(); - return mergedComponent; } @@ -464,22 +458,10 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { } @Override - public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException { - // The order of forcing the dirty page to be flushed is critical. The - // bloom filter must be always done first. 
- LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) lsmComponent; - if (hasBloomFilter) { - markAsValidInternal(component.getBTree().getBufferCache(), component.getBloomFilter()); - } - markAsValidInternal(component.getBTree()); - } - - @Override protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx, - ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs, - ILSMIOOperationCallback callback) { + LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) { ILSMIndexAccessor accessor = createAccessor(opCtx); - return new LSMBTreeFlushOperation(accessor, flushingComponent, componentFileRefs.getInsertIndexFileReference(), + return new LSMBTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } @@ -552,11 +534,6 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { } @Override - public String toString() { - return "LSMBTree [" + fileManager.getBaseDir() + "]"; - } - - @Override public Set getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) { Set files = new HashSet<>(); LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) lsmComponent; @@ -625,16 +602,15 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { @Override protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, - List mergingComponents, LSMComponentFileReferences mergeFileRefs, - ILSMIOOperationCallback callback) { + LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) { boolean returnDeletedTuples = false; ILSMIndexAccessor accessor = createAccessor(opCtx); + List mergingComponents = opCtx.getComponentHolder(); if (mergingComponents.get(mergingComponents.size() - 1) != diskComponents.get(diskComponents.size() - 1)) { returnDeletedTuples = true; } ITreeIndexCursor cursor = new 
LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples); - return new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, - mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, - fileManager.getBaseDir().getAbsolutePath()); + return new LSMBTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), + mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java index 33bb60e..4d60ab4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java @@ -24,6 +24,7 @@ import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent { private final BTree btree; @@ -68,4 +69,14 @@ public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent { public String toString() { return getClass().getSimpleName() + 
":" + btree.getFileReference().getRelativePath(); } + + @Override + public void markAsValid(boolean persist) throws HyracksDataException { + // The order of forcing the dirty page to be flushed is critical. + // The bloom filter must be always done first. + if (bloomFilter != null && persist) { + ComponentUtils.markAsValid(btree.getBufferCache(), bloomFilter, persist); + } + ComponentUtils.markAsValid(btree, persist); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java index 4a06778..e3424e5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java @@ -21,16 +21,14 @@ package org.apache.hyracks.storage.am.lsm.btree.impls; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; public class LSMBTreeFlushOperation extends FlushOperation { private final FileReference bloomFilterFlushTarget; - public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, ILSMMemoryComponent flushingComponent, - FileReference flushTarget, 
FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, - String indexIdentifier) { - super(accessor, flushingComponent, flushTarget, callback, indexIdentifier); + public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, + FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, flushTarget, callback, indexIdentifier); this.bloomFilterFlushTarget = bloomFilterFlushTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java index 0cc76f2..ec96303 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java @@ -19,11 +19,8 @@ package org.apache.hyracks.storage.am.lsm.btree.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -32,10 +29,9 @@ public class LSMBTreeMergeOperation extends MergeOperation { private final FileReference 
bloomFilterMergeTarget; - public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, List mergingComponents, - ITreeIndexCursor cursor, FileReference target, FileReference bloomFilterMergeTarget, - ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, target, callback, indexIdentifier, mergingComponents, cursor); + public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, + FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, target, callback, indexIdentifier, cursor); this.bloomFilterMergeTarget = bloomFilterMergeTarget; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java index 0ba7c30..4aee950 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java @@ -24,6 +24,7 @@ import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; public class 
LSMBTreeWithBuddyDiskComponent extends AbstractLSMDiskComponent { @@ -78,4 +79,11 @@ public class LSMBTreeWithBuddyDiskComponent extends AbstractLSMDiskComponent { public String toString() { return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath(); } + + @Override + public void markAsValid(boolean persist) throws HyracksDataException { + ComponentUtils.markAsValid(btree.getBufferCache(), bloomFilter, persist); + ComponentUtils.markAsValid(btree, persist); + ComponentUtils.markAsValid(buddyBtree, persist); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java index 2817f3a..f682bde 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java @@ -18,11 +18,8 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; 
@@ -33,11 +30,10 @@ public class LSMBTreeWithBuddyMergeOperation extends MergeOperation { private final FileReference bloomFilterMergeTarget; private final boolean keepDeletedTuples; - public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, List mergingComponents, - ITreeIndexCursor cursor, FileReference target, FileReference buddyBtreeMergeTarget, - FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier, - boolean keepDeletedTuples) { - super(accessor, target, callback, indexIdentifier, mergingComponents, cursor); + public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, + FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, + String indexIdentifier, boolean keepDeletedTuples) { + super(accessor, target, callback, indexIdentifier, cursor); this.buddyBtreeMergeTarget = buddyBtreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; this.keepDeletedTuples = keepDeletedTuples; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java index aed641d..b101315 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java @@ -22,6 +22,7 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; public interface ILSMDiskComponent extends ILSMComponent { + @Override default LSMComponentType getType() { return LSMComponentType.DISK; @@ -49,9 +50,19 @@ public interface ILSMDiskComponent extends ILSMComponent { /** * Return the component Id of this disk component from its metadata + * * @return * @throws HyracksDataException */ ILSMDiskComponentId getComponentId() throws HyracksDataException; + /** + * Mark the component as valid + * + * @param persist + * whether the call should force data to disk before returning + * @throws HyracksDataException + */ + void markAsValid(boolean persist) throws HyracksDataException; + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index c0a3f2d..89c8cb9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -19,6 +19,7 @@ package org.apache.hyracks.storage.am.lsm.common.api; import java.util.List; +import java.util.function.Predicate; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; @@ -242,4 +243,14 @@ public interface ILSMHarness { */ void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor 
accessor, FrameTupleReference tuple, IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException; + + /** + * Rollback components that match the passed predicate + * + * @param ctx + * @param predicate + * @throws HyracksDataException + */ + void deleteComponents(ILSMIndexOperationContext ctx, Predicate predicate) + throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java index a13f82c..c2ae786 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java @@ -58,9 +58,12 @@ public interface ILSMIOOperation extends Callable { Boolean call() throws HyracksDataException; /** - * The target of the io operation - * - * @return + * @return The target of the io operation */ FileReference getTarget(); + + /** + * @return the accessor of the operation + */ + ILSMIndexAccessor getAccessor(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java index ae2c93d..addeb27 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java @@ -57,7 +57,7 @@ public interface ILSMIndex extends IIndex { /** * components with lower indexes are newer than components with higher index */ - List getImmutableComponents(); + List getDiskComponents(); boolean isPrimaryIndex(); @@ -99,15 +99,6 @@ public interface ILSMIndex extends IIndex { void addInactiveDiskComponent(ILSMDiskComponent diskComponent); - /** - * Persist the LSM component - * - * @param lsmComponent - * , the component to be persistent - * @throws HyracksDataException - */ - void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException; - boolean isCurrentMutableComponentEmpty() throws HyracksDataException; void scheduleReplication(ILSMIndexOperationContext ctx, List diskComponents, boolean bulkload, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index 1042df2..b8d64af 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -19,6 +19,7 @@ package org.apache.hyracks.storage.am.lsm.common.api; import java.util.List; +import java.util.function.Predicate; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; @@ -36,6 +37,11 @@ import org.apache.hyracks.storage.common.IIndexCursor; public interface ILSMIndexAccessor extends IIndexAccessor { /** + * @return the operation context associated with the accessor + */ + ILSMIndexOperationContext getOpContext(); + + /** * Schedule a flush operation * * @param callback @@ -245,4 +251,13 @@ public interface ILSMIndexAccessor extends IIndexAccessor { * If the BufferCache throws while un/pinning or un/latching. */ void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException; + + /** + * Delete components that match the passed predicate + * NOTE: This call can only be made when the caller knows that data modification has been stopped + * + * @param predicate + * @throws HyracksDataException + */ + void deleteComponents(Predicate predicate) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java index 6e51f83..aee46f0 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java @@ -54,6 +54,7 @@ public abstract class AbstractIoOperation implements ILSMIOOperation { return target; } + @Override public ILSMIndexAccessor getAccessor() { return accessor; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index 64b8fec..4386d52 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -24,7 +24,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; -import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil; +import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent { @@ -103,9 +103,9 @@ public abstract class 
AbstractLSMDiskComponent extends AbstractLSMComponent impl @Override public ILSMDiskComponentId getComponentId() throws HyracksDataException { - long minID = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY, + long minID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY, ILSMDiskComponentId.NOT_FOUND); - long maxID = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY, + long maxID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY, ILSMDiskComponentId.NOT_FOUND); //TODO: do we need to throw an exception when ID is not found? return new LSMDiskComponentId(minID, maxID); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index fe6d20f..c471cfb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -35,8 +35,6 @@ import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; -import 
org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.common.impls.AbstractSearchPredicate; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; @@ -191,15 +189,14 @@ public abstract class AbstractLSMIndex implements ILSMIndex { throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_INACTIVE_INDEX); } if (flush) { - flushMemoryComponents(); + flushMemoryComponent(); } deactivateDiskComponents(); deactivateMemoryComponents(); isActive = false; } - // What if more than one memory component needs flushing?? - protected void flushMemoryComponents() throws HyracksDataException { + protected void flushMemoryComponent() throws HyracksDataException { BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback); ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleFlush(cb); @@ -278,6 +275,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { case UPDATE: case PHYSICALDELETE: case FLUSH: + case DELETE_MEMORY_COMPONENT: case DELETE: case UPSERT: operationalComponents.add(memoryComponents.get(cmc)); @@ -305,6 +303,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { break; case MERGE: + case DELETE_DISK_COMPONENTS: operationalComponents.addAll(ctx.getComponentsToBeMerged()); break; case FULL_MERGE: @@ -329,28 +328,28 @@ public abstract class AbstractLSMIndex implements ILSMIndex { @Override public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { - ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0); LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - 
opCtx.setOperation(IndexOperation.FLUSH); - opCtx.getComponentHolder().add(flushingComponent); - ILSMIOOperation flushOp = createFlushOperation(opCtx, flushingComponent, componentFileRefs, callback); + opCtx.setOperation(ctx.getOperation()); + opCtx.getComponentHolder().addAll(ctx.getComponentHolder()); + ILSMIOOperation flushOp = createFlushOperation(opCtx, componentFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(flushOp, tracer)); } @Override public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { + List mergingComponents = ctx.getComponentHolder(); // merge must create a different op ctx AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - opCtx.setOperation(IndexOperation.MERGE); - List mergingComponents = ctx.getComponentHolder(); + opCtx.setOperation(ctx.getOperation()); + opCtx.getComponentHolder().addAll(mergingComponents); ILSMDiskComponent firstComponent = (ILSMDiskComponent) mergingComponents.get(0); ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1); LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent); - ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergingComponents, mergeFileRefs, callback); + ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergeFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(mergeOp, tracer)); } @@ -391,28 +390,11 @@ public abstract class AbstractLSMIndex implements ILSMIndex { memoryComponentsAllocated = true; } - protected void markAsValidInternal(ITreeIndex treeIndex) throws HyracksDataException { - int fileId = treeIndex.getFileId(); - IBufferCache bufferCache = treeIndex.getBufferCache(); - treeIndex.getPageManager().close(); - // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page 
- // won't be flushed to disk because it won't be dirty until the write latch has been released. - // Force modified metadata page to disk. - // If the index is not durable, then the flush is not necessary. - if (durable) { - bufferCache.force(fileId, true); - } - } - - protected void markAsValidInternal(IBufferCache bufferCache, BloomFilter filter) throws HyracksDataException { - if (durable) { - bufferCache.force(filter.getFileId(), true); - } - } - @Override public void addDiskComponent(ILSMDiskComponent c) throws HyracksDataException { - diskComponents.add(0, c); + if (c != EmptyComponent.INSTANCE) { + diskComponents.add(0, c); + } } @Override @@ -420,7 +402,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex { throws HyracksDataException { int swapIndex = diskComponents.indexOf(mergedComponents.get(0)); diskComponents.removeAll(mergedComponents); - diskComponents.add(swapIndex, newComponent); + if (newComponent != EmptyComponent.INSTANCE) { + diskComponents.add(swapIndex, newComponent); + } } @Override @@ -430,7 +414,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { } @Override - public List getImmutableComponents() { + public List getDiskComponents() { return diskComponents; } @@ -477,8 +461,10 @@ public abstract class AbstractLSMIndex implements ILSMIndex { } @Override - public String toString() { - return "LSMIndex [" + fileManager.getBaseDir() + "]"; + public final String toString() { + return "{\"class\" : \"" + getClass().getSimpleName() + "\", \"dir\" : \"" + fileManager.getBaseDir() + + "\", \"memory\" : " + (memoryComponents == null ? 
0 : memoryComponents.size()) + ", \"disk\" : " + + diskComponents.size() + "}"; } @Override @@ -626,6 +612,22 @@ public abstract class AbstractLSMIndex implements ILSMIndex { return size; } + @Override + public final ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { + ILSMIndexAccessor accessor = operation.getAccessor(); + ILSMIndexOperationContext opCtx = accessor.getOpContext(); + return opCtx.getOperation() == IndexOperation.DELETE_MEMORY_COMPONENT ? EmptyComponent.INSTANCE + : doFlush(operation); + } + + @Override + public final ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException { + ILSMIndexAccessor accessor = operation.getAccessor(); + ILSMIndexOperationContext opCtx = accessor.getOpContext(); + return opCtx.getOperation() == IndexOperation.DELETE_DISK_COMPONENTS ? EmptyComponent.INSTANCE + : doMerge(operation); + } + public abstract Set getLSMComponentPhysicalFiles(ILSMComponent newComponent); protected abstract void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException; @@ -659,11 +661,13 @@ public abstract class AbstractLSMIndex implements ILSMIndex { throws HyracksDataException; protected abstract ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx, - ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException; + LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException; protected abstract ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, - List mergingComponents, LSMComponentFileReferences mergeFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException; + LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException; + + protected abstract ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException; + + protected 
abstract ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java index 6b9af7e..847b882 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java @@ -36,7 +36,7 @@ public class ConstantMergePolicy implements ILSMMergePolicy { @Override public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException { - List immutableComponents = index.getImmutableComponents(); + List immutableComponents = index.getDiskComponents(); if (!areComponentsMergable(immutableComponents)) { return; @@ -84,7 +84,7 @@ public class ConstantMergePolicy implements ILSMMergePolicy { * there will be no new merge either in this situation. 
*/ - List immutableComponents = index.getImmutableComponents(); + List immutableComponents = index.getDiskComponents(); int totalImmutableComponentCount = immutableComponents.size(); // [case 1] http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java new file mode 100644 index 0000000..0134dca --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; + +public class EmptyComponent implements ILSMDiskComponent { + public static final EmptyComponent INSTANCE = new EmptyComponent(); + + private EmptyComponent() { + } + + @Override + public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) throws HyracksDataException { + throw HyracksDataException.create(ErrorCode.ILLEGAL_ATTEMPT_TO_ENTER_EMPTY_COMPONENT); + } + + @Override + public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent) + throws HyracksDataException { + throw HyracksDataException.create(ErrorCode.ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT); + } + + @Override + public ComponentState getState() { + return ComponentState.INACTIVE; + } + + @Override + public ILSMComponentFilter getLSMComponentFilter() { + return null; + } + + @Override + public DiskComponentMetadata getMetadata() { + return EmptyDiskComponentMetadata.INSTANCE; + } + + @Override + public long getComponentSize() { + return 0; + } + + @Override + public int getFileReferenceCount() { + return 0; + } + + @Override + public void destroy() throws HyracksDataException { + // No Op + } + + @Override + public ILSMDiskComponentId getComponentId() throws HyracksDataException { + return null; + } + + @Override + public void markAsValid(boolean persist) throws HyracksDataException { + // No Op + } + +}