Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0CE88200D45 for ; Thu, 23 Nov 2017 09:55:54 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 0B594160BFE; Thu, 23 Nov 2017 08:55:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DC31F160BEF for ; Thu, 23 Nov 2017 09:55:51 +0100 (CET) Received: (qmail 31574 invoked by uid 500); 23 Nov 2017 08:55:51 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 31564 invoked by uid 99); 23 Nov 2017 08:55:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Nov 2017 08:55:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D2137E0779; Thu, 23 Nov 2017 08:55:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.apache.org Message-Id: <71ceb3f26f524732836844d87ca278d1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: asterixdb git commit: [ASTERIXDB-2161] Fix component id manage lifecycle Date: Thu, 23 Nov 2017 08:55:47 +0000 (UTC) archived-at: Thu, 23 Nov 2017 08:55:54 -0000 Repository: asterixdb Updated Branches: refs/heads/master 4dad5dfa8 -> 98b9d603e [ASTERIXDB-2161] Fix component id manage lifecycle - user model changes: no - storage format changes: no - interface changes: yes. The interface of LMSIOOperationCallback is changed Details: - The current way of management component ids is not correct, in presence of that multiple partitions sharing the same primary op tracker. It's possible when a partition is empty/being flushed, the next flush is scheduled by another partition, which will disturb the partition. This patch fixes this by using the same logic of maintaining flushed LSNs to maintain component id. - Extend recycle memory component interface to indicate whether it switches the new component or not. - Also fixes [ASTERIXDB-2168] to ensure we do not miss latest flushed LSNs by advancing io callback before finishing flush Change-Id: Ifc35184c4d431db9af71cab302439e165ee55f54 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2153 Integration-Tests: Jenkins Tested-by: Jenkins Contrib: Jenkins Reviewed-by: abdullah alamoudi Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/98b9d603 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/98b9d603 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/98b9d603 Branch: refs/heads/master Commit: 98b9d603e8531139a4668af48092e9c961ee41fb Parents: 4dad5df Author: luochen01 Authored: Wed Nov 22 18:25:22 2017 -0800 Committer: abdullah alamoudi Committed: Thu Nov 23 00:55:26 2017 -0800 ---------------------------------------------------------------------- .../test/dataflow/ComponentRollbackTest.java | 28 +- .../TestLsmBtreeIoOpCallbackFactory.java | 4 +- .../common/context/DatasetLifecycleManager.java | 29 +- .../AbstractLSMIOOperationCallback.java | 107 +++++--- .../AbstractLSMIOOperationCallbackTest.java | 267 +++++++++++++++++++ .../LSMBTreeIOOperationCallbackTest.java | 67 +---- ...SMBTreeWithBuddyIOOperationCallbackTest.java | 67 +---- ...LSMInvertedIndexIOOperationCallbackTest.java | 67 +---- .../LSMRTreeIOOperationCallbackTest.java | 67 +---- .../lsm/common/api/ILSMIOOperationCallback.java | 3 +- .../impls/AbstractLSMMemoryComponent.java | 4 +- .../BlockingIOOperationCallbackWrapper.java | 4 +- .../storage/am/lsm/common/impls/LSMHarness.java | 2 +- .../impls/NoOpIOOperationCallbackFactory.java | 2 +- .../common/impls/StubIOOperationCallback.java | 2 +- 15 files changed, 414 insertions(+), 306 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index 70436b5..a239210 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -135,10 +135,11 @@ public class ComponentRollbackTest { List> partitioningKeys = new ArrayList<>(); partitioningKeys.add(Collections.singletonList("key")); int partition = 0; - dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, - NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, - partitioningKeys, null, null, null, false, null), - null, DatasetType.INTERNAL, DATASET_ID, 0); + dataset = + new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, + NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, + partitioningKeys, null, null, null, false, null), + null, DatasetType.INTERNAL, DATASET_ID, 0); PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition); IndexDataflowHelperFactory iHelperFactory = @@ -201,12 +202,17 @@ public class ComponentRollbackTest { Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT); // rollback the last disk component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); + + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT)); @@ -249,6 +255,9 @@ public class ComponentRollbackTest { Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); + + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT); @@ -273,6 +282,9 @@ public class ComponentRollbackTest { // rollback the last disk component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); + + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT)); @@ -320,6 +332,9 @@ public class ComponentRollbackTest { firstSearcher.waitUntilEntered(); // now that we enetered, we will rollback ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); + + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents( c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified())); @@ -338,6 +353,9 @@ public class ComponentRollbackTest { secondSearcher.waitUntilEntered(); lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); + + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); // now that the rollback has completed, we will unblock the search @@ -745,6 +763,8 @@ public class ComponentRollbackTest { public void run() { ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); try { + dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); lsmAccessor.deleteComponents(predicate); } catch (HyracksDataException e) { failure = e; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java index 44967e3..ddcb5b5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java @@ -20,7 +20,6 @@ package org.apache.asterix.test.dataflow; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; @@ -98,8 +97,7 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback } @Override - public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) - throws HyracksDataException { + public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) { super.afterFinalize(opType, newComponent); synchronized (TestLsmBtreeIoOpCallbackFactory.this) { if (newComponent != null) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index fe64d3b..8002895 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -362,9 +362,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC for (IndexInfo iInfo : dsr.getIndexes().values()) { AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); - if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() - || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated() - || opTracker.isFlushOnExit())) { + if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush() + || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) { long firstLSN = ioCallback.getFirstLSN(); if (firstLSN < targetLSN) { if (LOGGER.isLoggable(Level.INFO)) { @@ -387,7 +386,15 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled */ private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException { - if (!dsInfo.isExternal() && dsInfo.isDurable()) { + if (dsInfo.isExternal()) { + // no memory components for external dataset + return; + } + + ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID()); + idGenerator.refresh(); + + if (dsInfo.isDurable()) { synchronized (logRecord) { TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null, logManager.getNodeId(), dsInfo.getIndexes().size()); @@ -404,16 +411,14 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC throw new HyracksDataException(e); } } - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - //update resource lsn - AbstractLSMIOOperationCallback ioOpCallback = - (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); - ioOpCallback.updateLastLSN(logRecord.getLSN()); - } } - ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID()); - idGenerator.refresh(); + for (IndexInfo iInfo : dsInfo.getIndexes().values()) { + //update resource lsn + AbstractLSMIOOperationCallback ioOpCallback = + (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); + ioOpCallback.updateLastLSN(logRecord.getLSN()); + } if (asyncFlush) { for (IndexInfo iInfo : dsInfo.getIndexes().values()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java index c33e2d1..1432f25 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java @@ -34,7 +34,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; -import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; @@ -54,6 +53,12 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC protected int readIndex; // Index of the currently being written to component protected int writeIndex; + // Index of the memory component to be recycled + protected int recycleIndex; + // Indicates whether this index has been scheduled to flush (no matter whether succeeds or not) + protected boolean hasFlushed; + // Keep track of the component Id of the next component being activated. + protected ILSMComponentId[] nextComponentIds; protected final ILSMComponentIdGenerator idGenerator; @@ -66,6 +71,12 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC flushRequested = new boolean[count]; readIndex = 0; writeIndex = 0; + recycleIndex = 0; + hasFlushed = false; + nextComponentIds = new ILSMComponentId[count]; + if (count > 0) { + nextComponentIds[0] = idGenerator.getId(); + } } @Override @@ -84,33 +95,50 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC if (writeIndex != readIndex) { firstLSNs[writeIndex] = mutableLastLSNs[writeIndex]; } + } } } @Override - public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException { - // The operation was complete and the next I/O operation for the LSM index didn't start yet - if (opType == LSMIOOperationType.FLUSH && newComponent != null) { - synchronized (this) { - flushRequested[readIndex] = false; - // if the component which just finished flushing is the component that will be modified next, - // we set its first LSN to its previous LSN - if (readIndex == writeIndex) { - firstLSNs[writeIndex] = mutableLastLSNs[writeIndex]; + public void afterOperation(LSMIOOperationType opType, List oldComponents, + ILSMDiskComponent newComponent) throws HyracksDataException { + //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here + if (newComponent != null) { + putLSNIntoMetadata(newComponent, oldComponents); + putComponentIdIntoMetadata(opType, newComponent, oldComponents); + if (opType == LSMIOOperationType.MERGE) { + // In case of merge, oldComponents are never null + LongPointable markerLsn = + LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(), + ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND)); + newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn); + } else if (opType == LSMIOOperationType.FLUSH) { + // advance memory component indexes + synchronized (this) { + // we've already consumed the specified LSN/component id. + // Now we can advance to the next component + flushRequested[readIndex] = false; + // if the component which just finished flushing is the component that will be modified next, + // we set its first LSN to its previous LSN + if (readIndex == writeIndex) { + firstLSNs[writeIndex] = mutableLastLSNs[writeIndex]; + } + readIndex = (readIndex + 1) % mutableLastLSNs.length; } - readIndex = (readIndex + 1) % mutableLastLSNs.length; - } - if (newComponent == EmptyComponent.INSTANCE) { - // This component was just deleted, we refresh the component id, when it gets recycled, it will get - // the new id from the component id generator. - // It is assumed that the component delete caller will ensure that corresponding components in secondary - // indexes are deleted as well - idGenerator.refresh(); } } } + @Override + public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) { + // The operation was complete and the next I/O operation for the LSM index didn't start yet + if (opType == LSMIOOperationType.FLUSH) { + hasFlushed = true; + } + + } + public void putLSNIntoMetadata(ILSMDiskComponent newComponent, List oldComponents) throws HyracksDataException { newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents))); @@ -148,6 +176,13 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC //Moreover, since the memory component is already being flushed, the next scheduleFlush request must fail. //See https://issues.apache.org/jira/browse/ASTERIXDB-1917 mutableLastLSNs[writeIndex] = lastLSN; + if (hasFlushed || lsmIndex.isMemoryComponentsAllocated()) { + // we only (re)set next component id if either this index has been flushed (no matter succeed or not) + // or the memory component has been allocated + // This prevents the case where indexes in a partition are being allocated, while another partition + // tries to schedule flush + nextComponentIds[writeIndex] = idGenerator.getId(); + } } } @@ -164,7 +199,6 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } public synchronized boolean hasPendingFlush() { - for (int i = 0; i < flushRequested.length; i++) { if (flushRequested[i]) { return true; @@ -173,23 +207,6 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC return false; } - @Override - public void afterOperation(LSMIOOperationType opType, List oldComponents, - ILSMDiskComponent newComponent) throws HyracksDataException { - //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here - if (newComponent != null) { - putLSNIntoMetadata(newComponent, oldComponents); - putComponentIdIntoMetadata(opType, newComponent, oldComponents); - if (opType == LSMIOOperationType.MERGE) { - // In case of merge, oldComponents are never null - LongPointable markerLsn = - LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(), - ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND)); - newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn); - } - } - } - public long getComponentLSN(List diskComponents) throws HyracksDataException { if (diskComponents == null) { // Implies a flush IO operation. --> moves the flush pointer @@ -208,14 +225,26 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC return maxLSN; } + private synchronized ILSMComponentId getLSMComponentId() { + return nextComponentIds[recycleIndex]; + } + @Override - public void recycled(ILSMMemoryComponent component) throws HyracksDataException { - component.resetId(idGenerator.getId()); + public void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException { + ILSMComponentId componentId = getLSMComponentId(); + component.resetId(componentId); + if (componentSwitched) { + recycleIndex = (recycleIndex + 1) % nextComponentIds.length; + } } @Override public void allocated(ILSMMemoryComponent component) throws HyracksDataException { - component.resetId(idGenerator.getId()); + if (component == lsmIndex.getCurrentMemoryComponent()) { + // only set the component id for the first (current) memory component + ILSMComponentId componentId = getLSMComponentId(); + component.resetId(componentId); + } } /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java new file mode 100644 index 0000000..0f2ea50 --- /dev/null +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.test.ioopcallbacks; + +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent; +import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import junit.framework.TestCase; + +public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { + + @Test + public void testNormalSequence() throws HyracksDataException { + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); + LSMBTreeIOOperationCallback callback = + new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); + + //request to flush first component + callback.updateLastLSN(1); + callback.beforeOperation(LSMIOOperationType.FLUSH); + + //request to flush second component + callback.updateLastLSN(2); + callback.beforeOperation(LSMIOOperationType.FLUSH); + + Assert.assertEquals(1, callback.getComponentLSN(null)); + callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); + callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + + Assert.assertEquals(2, callback.getComponentLSN(null)); + + callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); + callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + } + + @Test + public void testOverWrittenLSN() throws HyracksDataException { + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); + + LSMBTreeIOOperationCallback callback = + new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); + + //request to flush first component + callback.updateLastLSN(1); + callback.beforeOperation(LSMIOOperationType.FLUSH); + + //request to flush second component + callback.updateLastLSN(2); + callback.beforeOperation(LSMIOOperationType.FLUSH); + + //request to flush first component again + //this call should fail + callback.updateLastLSN(3); + //there is no corresponding beforeOperation, since the first component is being flush + //the scheduleFlush request would fail this time + + Assert.assertEquals(1, callback.getComponentLSN(null)); + callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); + callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + + Assert.assertEquals(2, callback.getComponentLSN(null)); + callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + } + + @Test + public void testLostLSN() throws HyracksDataException { + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); + + LSMBTreeIOOperationCallback callback = + new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); + + //request to flush first component + callback.updateLastLSN(1); + callback.beforeOperation(LSMIOOperationType.FLUSH); + + //request to flush second component + callback.updateLastLSN(2); + callback.beforeOperation(LSMIOOperationType.FLUSH); + + Assert.assertEquals(1, callback.getComponentLSN(null)); + + // the first flush is finished, but has not finalized yet (in codebase, these two calls + // are not synchronized) + callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); + + //request to flush first component again + callback.updateLastLSN(3); + + // the first flush is finalized (it may be called after afterOperation for a while) + callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + + // the second flush gets LSN 2 + Assert.assertEquals(2, callback.getComponentLSN(null)); + // the second flush is finished + callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); + callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + + // it should get new LSN 3 + Assert.assertEquals(3, callback.getComponentLSN(null)); + } + + @Test + public void testAllocateComponentId() throws HyracksDataException { + ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(); + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); + Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); + + LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator); + + ILSMComponentId initialId = idGenerator.getId(); + // simulate a partition is flushed before allocated + idGenerator.refresh(); + callback.updateLastLSN(0); + + callback.allocated(mockComponent); + checkMemoryComponent(initialId, mockComponent); + } + + @Test + public void testRecycleComponentId() throws HyracksDataException { + ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(); + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); + Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); + LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator); + + ILSMComponentId id = idGenerator.getId(); + callback.allocated(mockComponent); + checkMemoryComponent(id, mockComponent); + + Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true); + for (int i = 0; i < 100; i++) { + // schedule a flush + idGenerator.refresh(); + ILSMComponentId expectedId = idGenerator.getId(); + + callback.updateLastLSN(0); + callback.beforeOperation(LSMIOOperationType.FLUSH); + callback.recycled(mockComponent, true); + + callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); + callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + checkMemoryComponent(expectedId, mockComponent); + } + } + + @Test + public void testRecycleWithoutSwitch() throws HyracksDataException { + ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(); + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); + Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); + LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator); + + ILSMComponentId id = idGenerator.getId(); + callback.allocated(mockComponent); + checkMemoryComponent(id, mockComponent); + + Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true); + + for (int i = 0; i < 10; i++) { + idGenerator.refresh(); + id = idGenerator.getId(); + callback.updateLastLSN(0); + callback.recycled(mockComponent, false); + callback.afterFinalize(LSMIOOperationType.FLUSH, null); + checkMemoryComponent(id, mockComponent); + } + } + + @Test + public void testConcurrentRecycleComponentId() throws HyracksDataException { + ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(); + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); + Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator); + + ILSMComponentId id = idGenerator.getId(); + callback.allocated(mockComponent); + checkMemoryComponent(id, mockComponent); + + Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true); + + // schedule a flush + idGenerator.refresh(); + ILSMComponentId expectedId = idGenerator.getId(); + + callback.updateLastLSN(0); + callback.beforeOperation(LSMIOOperationType.FLUSH); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + + // another flush is to be scheduled before the component is recycled + idGenerator.refresh(); + ILSMComponentId nextId = idGenerator.getId(); + + // recycle the component + callback.recycled(mockComponent, true); + checkMemoryComponent(expectedId, mockComponent); + + // schedule the next flush + callback.updateLastLSN(0); + callback.beforeOperation(LSMIOOperationType.FLUSH); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.recycled(mockComponent, true); + checkMemoryComponent(nextId, mockComponent); + } + + private void checkMemoryComponent(ILSMComponentId expected, ILSMMemoryComponent memoryComponent) + throws HyracksDataException { + ArgumentCaptor argument = ArgumentCaptor.forClass(ILSMComponentId.class); + Mockito.verify(memoryComponent).resetId(argument.capture()); + assertEquals(expected, argument.getValue()); + + Mockito.reset(memoryComponent); + } + + private ILSMDiskComponent mockDiskComponent() { + ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class); + Mockito.when(component.getMetadata()).thenReturn(Mockito.mock(DiskComponentMetadata.class)); + return component; + } + + protected abstract AbstractLSMIOOperationCallback getIoCallback(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java index f467ee8..c22e2e3 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java @@ -19,72 +19,19 @@ package org.apache.asterix.test.ioopcallbacks; +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; -import org.junit.Assert; import org.mockito.Mockito; -import junit.framework.TestCase; +public class LSMBTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest { -public class LSMBTreeIOOperationCallbackTest extends TestCase { - - public void testNormalSequence() { - try { - ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); - Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMBTreeIOOperationCallback callback = - new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); - - //request to flush first component - callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush second component - callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - - Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - } catch (Exception e) { - Assert.fail(); - } - } - - public void testOverWrittenLSN() { - try { - ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); - Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMBTreeIOOperationCallback callback = - new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); - - //request to flush first component - callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush second component - callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush first component again - //this call should fail - callback.updateLastLSN(3); - //there is no corresponding beforeOperation, since the first component is being flush - //the scheduleFlush request would fail this time - - Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - - Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - } catch (Exception e) { - Assert.fail(); - } + @Override + protected AbstractLSMIOOperationCallback getIoCallback() { + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + return new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java index 63c46f7..356c80a 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java @@ -19,72 +19,19 @@ package org.apache.asterix.test.ioopcallbacks; +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; -import org.junit.Assert; import org.mockito.Mockito; -import junit.framework.TestCase; +public class LSMBTreeWithBuddyIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest { -public class LSMBTreeWithBuddyIOOperationCallbackTest extends TestCase { - - public void testNormalSequence() { - try { - ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); - Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMBTreeWithBuddyIOOperationCallback callback = - new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); - - //request to flush first component - callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush second component - callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - - Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - } catch (Exception e) { - Assert.fail(); - } - } - - public void testOverWrittenLSN() { - try { - ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); - Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMBTreeWithBuddyIOOperationCallback callback = - new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); - - //request to flush first component - callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush second component - callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush first component again - //this call should fail - callback.updateLastLSN(3); - //there is no corresponding beforeOperation, since the first component is being flush - //the scheduleFlush request would fail this time - - Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - - Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - } catch (Exception e) { - Assert.fail(); - } + @Override + protected AbstractLSMIOOperationCallback getIoCallback() { + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java index 1e961d8..ac4595e 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java @@ -19,72 +19,19 @@ package org.apache.asterix.test.ioopcallbacks; +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; -import org.junit.Assert; import org.mockito.Mockito; -import junit.framework.TestCase; +public class LSMInvertedIndexIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest { -public class LSMInvertedIndexIOOperationCallbackTest extends TestCase { - - public void testNormalSequence() { - try { - ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); - Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMInvertedIndexIOOperationCallback callback = - new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); - - //request to flush first component - callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush second component - callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - - Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - } catch (Exception e) { - Assert.fail(); - } - } - - public void testOverWrittenLSN() { - try { - ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); - Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMInvertedIndexIOOperationCallback callback = - new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); - - //request to flush first component - callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush second component - callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush first component again - //this call should fail - callback.updateLastLSN(3); - //there is no corresponding beforeOperation, since the first component is being flush - //the scheduleFlush request would fail this time - - Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - - Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - } catch (Exception e) { - Assert.fail(); - } + @Override + protected AbstractLSMIOOperationCallback getIoCallback() { + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + return new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java index 618f2a3..0131e3f 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java @@ -19,72 +19,19 @@ package org.apache.asterix.test.ioopcallbacks; +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; -import org.junit.Assert; import org.mockito.Mockito; -import junit.framework.TestCase; +public class LSMRTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest { -public class LSMRTreeIOOperationCallbackTest extends TestCase { - - public void testNormalSequence() { - try { - ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); - Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMRTreeIOOperationCallback callback = - new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); - - //request to flush first component - callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush second component - callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - - Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - } catch (Exception e) { - Assert.fail(); - } - } - - public void testOverWrittenLSN() { - try { - ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); - Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMRTreeIOOperationCallback callback = - new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); - - //request to flush first component - callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush second component - callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); - - //request to flush first component again - //this call should fail - callback.updateLastLSN(3); - //there is no corresponding beforeOperation, since the first component is being flush - //the scheduleFlush request would fail this time - - Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - - Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); - } catch (Exception e) { - Assert.fail(); - } + @Override + protected AbstractLSMIOOperationCallback getIoCallback() { + ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); + Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); + return new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java index e122fd4..7bb12c4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java @@ -60,8 +60,9 @@ public interface ILSMIOOperationCallback { * This method is called when a memory component is recycled * * @param component + * @param componentSwitched */ - void recycled(ILSMMemoryComponent component) throws HyracksDataException; + void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException; /** * This method is called when a memory component is allocated http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java index 17dadcb..15cf8e5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java @@ -61,7 +61,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im if (state == ComponentState.INACTIVE && requestedToBeActive) { state = ComponentState.READABLE_WRITABLE; requestedToBeActive = false; - lsmIndex.getIOOperationCallback().recycled(this); + lsmIndex.getIOOperationCallback().recycled(this, true); } switch (opType) { case FORCE_MODIFICATION: @@ -278,7 +278,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im if (this.componentId != null && !componentId.missing() // for backward compatibility && this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) { throw new IllegalStateException( - "LSM memory component receives illegal id. Old id " + this.componentId + ", new id " + componentId); + this + " receives illegal id. Old id " + this.componentId + ", new id " + componentId); } if (LOGGER.isLoggable(Level.INFO)) { LOGGER.log(Level.INFO, "Component Id was reset from " + this.componentId + " to " + componentId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java index 12dbb46..e464231 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java @@ -64,8 +64,8 @@ public class BlockingIOOperationCallbackWrapper implements ILSMIOOperationCallba } @Override - public void recycled(ILSMMemoryComponent component) throws HyracksDataException { - wrappedCallback.recycled(component); + public void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException { + wrappedCallback.recycled(component, componentSwitched); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index e1d5114..393aa6a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -175,7 +175,7 @@ public class LSMHarness implements ILSMHarness { // Call recycled only when we change it's state is reset back to READABLE_WRITABLE // Otherwise, if the component is in other state, e.g., INACTIVE, or // READABLE_UNWRITABLE_FLUSHING, it's not considered as being recycled here. - lsmIndex.getIOOperationCallback().recycled(flushingComponent); + lsmIndex.getIOOperationCallback().recycled(flushingComponent, false); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java index 21d10d7..3a58c19 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java @@ -67,7 +67,7 @@ public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFac } @Override - public void recycled(ILSMMemoryComponent component) { + public void recycled(ILSMMemoryComponent component, boolean componentSwitched) { // Do nothing. } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java index 238e915..88def5e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java @@ -64,7 +64,7 @@ public class StubIOOperationCallback implements ILSMIOOperationCallback { } @Override - public void recycled(ILSMMemoryComponent component) { + public void recycled(ILSMMemoryComponent component, boolean componentSwitched) { // Not interested in this }