asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject asterixdb git commit: [ASTERIXDB-2161] Fix component id manage lifecycle
Date Thu, 23 Nov 2017 08:55:47 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 4dad5dfa8 -> 98b9d603e


[ASTERIXDB-2161] Fix component id manage lifecycle

- user model changes: no
- storage format changes: no
- interface changes: yes. The ILSMIOOperationCallback interface
is changed

Details:
- The current way of managing component ids is not correct
when multiple partitions share the same primary op
tracker. It is possible that, while a partition is empty or being
flushed, the next flush is scheduled by another partition, which
will disturb that partition. This patch fixes this by
using the same logic that maintains flushed LSNs to maintain
component ids.
- Extend the recycle-memory-component interface to indicate whether
the recycle switches to the new component or not.
- Also fixes [ASTERIXDB-2168]: advance the io callback before finishing
the flush, to ensure we do not miss the latest flushed LSNs

Change-Id: Ifc35184c4d431db9af71cab302439e165ee55f54
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2153
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/98b9d603
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/98b9d603
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/98b9d603

Branch: refs/heads/master
Commit: 98b9d603e8531139a4668af48092e9c961ee41fb
Parents: 4dad5df
Author: luochen01 <cluo8@uci.edu>
Authored: Wed Nov 22 18:25:22 2017 -0800
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Thu Nov 23 00:55:26 2017 -0800

----------------------------------------------------------------------
 .../test/dataflow/ComponentRollbackTest.java    |  28 +-
 .../TestLsmBtreeIoOpCallbackFactory.java        |   4 +-
 .../common/context/DatasetLifecycleManager.java |  29 +-
 .../AbstractLSMIOOperationCallback.java         | 107 +++++---
 .../AbstractLSMIOOperationCallbackTest.java     | 267 +++++++++++++++++++
 .../LSMBTreeIOOperationCallbackTest.java        |  67 +----
 ...SMBTreeWithBuddyIOOperationCallbackTest.java |  67 +----
 ...LSMInvertedIndexIOOperationCallbackTest.java |  67 +----
 .../LSMRTreeIOOperationCallbackTest.java        |  67 +----
 .../lsm/common/api/ILSMIOOperationCallback.java |   3 +-
 .../impls/AbstractLSMMemoryComponent.java       |   4 +-
 .../BlockingIOOperationCallbackWrapper.java     |   4 +-
 .../storage/am/lsm/common/impls/LSMHarness.java |   2 +-
 .../impls/NoOpIOOperationCallbackFactory.java   |   2 +-
 .../common/impls/StubIOOperationCallback.java   |   2 +-
 15 files changed, 414 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 70436b5..a239210 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -135,10 +135,11 @@ public class ComponentRollbackTest {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
         int partition = 0;
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                        partitioningKeys, null, null, null, false, null),
-                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        dataset =
+                new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                        NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                                partitioningKeys, null, null, null, false, null),
+                        null, DatasetType.INTERNAL, DATASET_ID, 0);
         PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                 storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition);
         IndexDataflowHelperFactory iHelperFactory =
@@ -201,12 +202,17 @@ public class ComponentRollbackTest {
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
@@ -249,6 +255,9 @@ public class ComponentRollbackTest {
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
@@ -273,6 +282,9 @@ public class ComponentRollbackTest {
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
@@ -320,6 +332,9 @@ public class ComponentRollbackTest {
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(
                     c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
@@ -338,6 +353,9 @@ public class ComponentRollbackTest {
             secondSearcher.waitUntilEntered();
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
             // now that the rollback has completed, we will unblock the search
@@ -745,6 +763,8 @@ public class ComponentRollbackTest {
                 public void run() {
                     ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
                     try {
+                        dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+                        ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
                         lsmAccessor.deleteComponents(predicate);
                     } catch (HyracksDataException e) {
                         failure = e;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index 44967e3..ddcb5b5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -20,7 +20,6 @@ package org.apache.asterix.test.dataflow;
 
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -98,8 +97,7 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback
         }
 
         @Override
-        public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
-                throws HyracksDataException {
+        public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
             super.afterFinalize(opType, newComponent);
             synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
                 if (newComponent != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index fe64d3b..8002895 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -362,9 +362,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
                 for (IndexInfo iInfo : dsr.getIndexes().values()) {
                     AbstractLSMIOOperationCallback ioCallback =
                             (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
-                    if (!(iInfo.getIndex().isCurrentMutableComponentEmpty()
-                            || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
-                            || opTracker.isFlushOnExit())) {
+                    if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
+                            || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
                         long firstLSN = ioCallback.getFirstLSN();
                         if (firstLSN < targetLSN) {
                             if (LOGGER.isLoggable(Level.INFO)) {
@@ -387,7 +386,15 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
      * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
      */
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
-        if (!dsInfo.isExternal() && dsInfo.isDurable()) {
+        if (dsInfo.isExternal()) {
+            // no memory components for external dataset
+            return;
+        }
+
+        ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
+        idGenerator.refresh();
+
+        if (dsInfo.isDurable()) {
             synchronized (logRecord) {
                 TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null, logManager.getNodeId(),
                         dsInfo.getIndexes().size());
@@ -404,16 +411,14 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
                     throw new HyracksDataException(e);
                 }
             }
-            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-                //update resource lsn
-                AbstractLSMIOOperationCallback ioOpCallback =
-                        (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
-                ioOpCallback.updateLastLSN(logRecord.getLSN());
-            }
         }
 
-        ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
-        idGenerator.refresh();
+        for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
+            //update resource lsn
+            AbstractLSMIOOperationCallback ioOpCallback =
+                    (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
+            ioOpCallback.updateLastLSN(logRecord.getLSN());
+        }
 
         if (asyncFlush) {
             for (IndexInfo iInfo : dsInfo.getIndexes().values()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index c33e2d1..1432f25 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -34,7 +34,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
-import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 
@@ -54,6 +53,12 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
     protected int readIndex;
     // Index of the currently being written to component
     protected int writeIndex;
+    // Index of the memory component to be recycled
+    protected int recycleIndex;
+    // Indicates whether this index has been scheduled to flush (no matter whether succeeds or not)
+    protected boolean hasFlushed;
+    // Keep track of the component Id of the next component being activated.
+    protected ILSMComponentId[] nextComponentIds;
 
     protected final ILSMComponentIdGenerator idGenerator;
 
@@ -66,6 +71,12 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
         flushRequested = new boolean[count];
         readIndex = 0;
         writeIndex = 0;
+        recycleIndex = 0;
+        hasFlushed = false;
+        nextComponentIds = new ILSMComponentId[count];
+        if (count > 0) {
+            nextComponentIds[0] = idGenerator.getId();
+        }
     }
 
     @Override
@@ -84,33 +95,50 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
                 if (writeIndex != readIndex) {
                     firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
                 }
+
             }
         }
     }
 
     @Override
-    public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException {
-        // The operation was complete and the next I/O operation for the LSM index didn't start yet
-        if (opType == LSMIOOperationType.FLUSH && newComponent != null) {
-            synchronized (this) {
-                flushRequested[readIndex] = false;
-                // if the component which just finished flushing is the component that will be modified next,
-                // we set its first LSN to its previous LSN
-                if (readIndex == writeIndex) {
-                    firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
+    public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
+            ILSMDiskComponent newComponent) throws HyracksDataException {
+        //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
+        if (newComponent != null) {
+            putLSNIntoMetadata(newComponent, oldComponents);
+            putComponentIdIntoMetadata(opType, newComponent, oldComponents);
+            if (opType == LSMIOOperationType.MERGE) {
+                // In case of merge, oldComponents are never null
+                LongPointable markerLsn =
+                        LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
+                                ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
+                newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
+            } else if (opType == LSMIOOperationType.FLUSH) {
+                // advance memory component indexes
+                synchronized (this) {
+                    // we've already consumed the specified LSN/component id.
+                    // Now we can advance to the next component
+                    flushRequested[readIndex] = false;
+                    // if the component which just finished flushing is the component that will be modified next,
+                    // we set its first LSN to its previous LSN
+                    if (readIndex == writeIndex) {
+                        firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
+                    }
+                    readIndex = (readIndex + 1) % mutableLastLSNs.length;
                 }
-                readIndex = (readIndex + 1) % mutableLastLSNs.length;
-            }
-            if (newComponent == EmptyComponent.INSTANCE) {
-                // This component was just deleted, we refresh the component id, when it gets recycled, it will get
-                // the new id from the component id generator.
-                // It is assumed that the component delete caller will ensure that corresponding components in secondary
-                // indexes are deleted as well
-                idGenerator.refresh();
             }
         }
     }
 
+    @Override
+    public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+        // The operation was complete and the next I/O operation for the LSM index didn't start yet
+        if (opType == LSMIOOperationType.FLUSH) {
+            hasFlushed = true;
+        }
+
+    }
+
     public void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
             throws HyracksDataException {
         newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
@@ -148,6 +176,13 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
             //Moreover, since the memory component is already being flushed, the next scheduleFlush request must fail.
             //See https://issues.apache.org/jira/browse/ASTERIXDB-1917
             mutableLastLSNs[writeIndex] = lastLSN;
+            if (hasFlushed || lsmIndex.isMemoryComponentsAllocated()) {
+                // we only (re)set next component id if either this index has been flushed (no matter succeed or not)
+                // or the memory component has been allocated
+                // This prevents the case where indexes in a partition are being allocated, while another partition
+                // tries to schedule flush
+                nextComponentIds[writeIndex] = idGenerator.getId();
+            }
         }
     }
 
@@ -164,7 +199,6 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
     }
 
     public synchronized boolean hasPendingFlush() {
-
         for (int i = 0; i < flushRequested.length; i++) {
             if (flushRequested[i]) {
                 return true;
@@ -173,23 +207,6 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
         return false;
     }
 
-    @Override
-    public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
-            ILSMDiskComponent newComponent) throws HyracksDataException {
-        //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
-        if (newComponent != null) {
-            putLSNIntoMetadata(newComponent, oldComponents);
-            putComponentIdIntoMetadata(opType, newComponent, oldComponents);
-            if (opType == LSMIOOperationType.MERGE) {
-                // In case of merge, oldComponents are never null
-                LongPointable markerLsn =
-                        LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
-                                ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
-                newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
-            }
-        }
-    }
-
     public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation. --> moves the flush pointer
@@ -208,14 +225,26 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
         return maxLSN;
     }
 
+    private synchronized ILSMComponentId getLSMComponentId() {
+        return nextComponentIds[recycleIndex];
+    }
+
     @Override
-    public void recycled(ILSMMemoryComponent component) throws HyracksDataException {
-        component.resetId(idGenerator.getId());
+    public void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException {
+        ILSMComponentId componentId = getLSMComponentId();
+        component.resetId(componentId);
+        if (componentSwitched) {
+            recycleIndex = (recycleIndex + 1) % nextComponentIds.length;
+        }
     }
 
     @Override
     public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
-        component.resetId(idGenerator.getId());
+        if (component == lsmIndex.getCurrentMemoryComponent()) {
+            // only set the component id for the first (current) memory component
+            ILSMComponentId componentId = getLSMComponentId();
+            component.resetId(componentId);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
new file mode 100644
index 0000000..0f2ea50
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.ioopcallbacks;
+
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import junit.framework.TestCase;
+
+public abstract class AbstractLSMIOOperationCallbackTest extends TestCase {
+
+    @Test
+    public void testNormalSequence() throws HyracksDataException {
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+
+        //request to flush first component
+        callback.updateLastLSN(1);
+        callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+        //request to flush second component
+        callback.updateLastLSN(2);
+        callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+        Assert.assertEquals(1, callback.getComponentLSN(null));
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+
+        Assert.assertEquals(2, callback.getComponentLSN(null));
+
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+    }
+
+    @Test
+    public void testOverWrittenLSN() throws HyracksDataException {
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+
+        //request to flush first component
+        callback.updateLastLSN(1);
+        callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+        //request to flush second component
+        callback.updateLastLSN(2);
+        callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+        //request to flush first component again
+        //this call should fail
+        callback.updateLastLSN(3);
+        //there is no corresponding beforeOperation, since the first component is being flush
+        //the scheduleFlush request would fail this time
+
+        Assert.assertEquals(1, callback.getComponentLSN(null));
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+
+        Assert.assertEquals(2, callback.getComponentLSN(null));
+        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+    }
+
+    @Test
+    public void testLostLSN() throws HyracksDataException {
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+
+        //request to flush first component
+        callback.updateLastLSN(1);
+        callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+        //request to flush second component
+        callback.updateLastLSN(2);
+        callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+        Assert.assertEquals(1, callback.getComponentLSN(null));
+
+        // the first flush is finished, but has not finalized yet (in codebase, these two calls
+        // are not synchronized)
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+
+        //request to flush first component again
+        callback.updateLastLSN(3);
+
+        // the first flush is finalized (it may be called after afterOperation for a while)
+        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+
+        // the second flush gets LSN 2
+        Assert.assertEquals(2, callback.getComponentLSN(null));
+        // the second flush is finished
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+
+        // it should get new LSN 3
+        Assert.assertEquals(3, callback.getComponentLSN(null));
+    }
+
+    @Test
+    public void testAllocateComponentId() throws HyracksDataException {
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
+        Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
+
+        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+
+        ILSMComponentId initialId = idGenerator.getId();
+        // simulate a partition being flushed before it is allocated
+        idGenerator.refresh();
+        callback.updateLastLSN(0);
+
+        callback.allocated(mockComponent);
+        checkMemoryComponent(initialId, mockComponent);
+    }
+
+    @Test
+    public void testRecycleComponentId() throws HyracksDataException {
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
+        Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
+        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+
+        ILSMComponentId id = idGenerator.getId();
+        callback.allocated(mockComponent);
+        checkMemoryComponent(id, mockComponent);
+
+        Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true);
+        for (int i = 0; i < 100; i++) {
+            // schedule a flush
+            idGenerator.refresh();
+            ILSMComponentId expectedId = idGenerator.getId();
+
+            callback.updateLastLSN(0);
+            callback.beforeOperation(LSMIOOperationType.FLUSH);
+            callback.recycled(mockComponent, true);
+
+            callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+            callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+            checkMemoryComponent(expectedId, mockComponent);
+        }
+    }
+
+    @Test
+    public void testRecycleWithoutSwitch() throws HyracksDataException {
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
+        Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
+        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+
+        ILSMComponentId id = idGenerator.getId();
+        callback.allocated(mockComponent);
+        checkMemoryComponent(id, mockComponent);
+
+        Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true);
+
+        for (int i = 0; i < 10; i++) {
+            idGenerator.refresh();
+            id = idGenerator.getId();
+            callback.updateLastLSN(0);
+            callback.recycled(mockComponent, false);
+            callback.afterFinalize(LSMIOOperationType.FLUSH, null);
+            checkMemoryComponent(id, mockComponent);
+        }
+    }
+
+    @Test
+    public void testConcurrentRecycleComponentId() throws HyracksDataException {
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
+        Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+
+        ILSMComponentId id = idGenerator.getId();
+        callback.allocated(mockComponent);
+        checkMemoryComponent(id, mockComponent);
+
+        Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true);
+
+        // schedule a flush
+        idGenerator.refresh();
+        ILSMComponentId expectedId = idGenerator.getId();
+
+        callback.updateLastLSN(0);
+        callback.beforeOperation(LSMIOOperationType.FLUSH);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+
+        // another flush is to be scheduled before the component is recycled
+        idGenerator.refresh();
+        ILSMComponentId nextId = idGenerator.getId();
+
+        // recycle the component
+        callback.recycled(mockComponent, true);
+        checkMemoryComponent(expectedId, mockComponent);
+
+        // schedule the next flush
+        callback.updateLastLSN(0);
+        callback.beforeOperation(LSMIOOperationType.FLUSH);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+        callback.recycled(mockComponent, true);
+        checkMemoryComponent(nextId, mockComponent);
+    }
+
+    private void checkMemoryComponent(ILSMComponentId expected, ILSMMemoryComponent memoryComponent)
+            throws HyracksDataException {
+        ArgumentCaptor<ILSMComponentId> argument = ArgumentCaptor.forClass(ILSMComponentId.class);
+        Mockito.verify(memoryComponent).resetId(argument.capture());
+        assertEquals(expected, argument.getValue());
+
+        Mockito.reset(memoryComponent);
+    }
+
+    private ILSMDiskComponent mockDiskComponent() {
+        ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
+        Mockito.when(component.getMetadata()).thenReturn(Mockito.mock(DiskComponentMetadata.class));
+        return component;
+    }
+
+    protected abstract AbstractLSMIOOperationCallback getIoCallback();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
index f467ee8..c22e2e3 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
@@ -19,72 +19,19 @@
 
 package org.apache.asterix.test.ioopcallbacks;
 
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
-import org.junit.Assert;
 import org.mockito.Mockito;
 
-import junit.framework.TestCase;
+public class LSMBTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
 
-public class LSMBTreeIOOperationCallbackTest extends TestCase {
-
-    public void testNormalSequence() {
-        try {
-            ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
-            Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMBTreeIOOperationCallback callback =
-                    new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
-            //request to flush first component
-            callback.updateLastLSN(1);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush second component
-            callback.updateLastLSN(2);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            Assert.assertEquals(1, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
-            Assert.assertEquals(2, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-        } catch (Exception e) {
-            Assert.fail();
-        }
-    }
-
-    public void testOverWrittenLSN() {
-        try {
-            ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
-            Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMBTreeIOOperationCallback callback =
-                    new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
-            //request to flush first component
-            callback.updateLastLSN(1);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush second component
-            callback.updateLastLSN(2);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush first component again
-            //this call should fail
-            callback.updateLastLSN(3);
-            //there is no corresponding beforeOperation, since the first component is being flush
-            //the scheduleFlush request would fail this time
-
-            Assert.assertEquals(1, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
-            Assert.assertEquals(2, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-        } catch (Exception e) {
-            Assert.fail();
-        }
+    @Override
+    protected AbstractLSMIOOperationCallback getIoCallback() {
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        return new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
index 63c46f7..356c80a 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
@@ -19,72 +19,19 @@
 
 package org.apache.asterix.test.ioopcallbacks;
 
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
-import org.junit.Assert;
 import org.mockito.Mockito;
 
-import junit.framework.TestCase;
+public class LSMBTreeWithBuddyIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
 
-public class LSMBTreeWithBuddyIOOperationCallbackTest extends TestCase {
-
-    public void testNormalSequence() {
-        try {
-            ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
-            Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMBTreeWithBuddyIOOperationCallback callback =
-                    new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
-            //request to flush first component
-            callback.updateLastLSN(1);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush second component
-            callback.updateLastLSN(2);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            Assert.assertEquals(1, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
-            Assert.assertEquals(2, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-        } catch (Exception e) {
-            Assert.fail();
-        }
-    }
-
-    public void testOverWrittenLSN() {
-        try {
-            ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
-            Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMBTreeWithBuddyIOOperationCallback callback =
-                    new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
-            //request to flush first component
-            callback.updateLastLSN(1);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush second component
-            callback.updateLastLSN(2);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush first component again
-            //this call should fail
-            callback.updateLastLSN(3);
-            //there is no corresponding beforeOperation, since the first component is being flush
-            //the scheduleFlush request would fail this time
-
-            Assert.assertEquals(1, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
-            Assert.assertEquals(2, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-        } catch (Exception e) {
-            Assert.fail();
-        }
+    @Override
+    protected AbstractLSMIOOperationCallback getIoCallback() {
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
index 1e961d8..ac4595e 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
@@ -19,72 +19,19 @@
 
 package org.apache.asterix.test.ioopcallbacks;
 
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
-import org.junit.Assert;
 import org.mockito.Mockito;
 
-import junit.framework.TestCase;
+public class LSMInvertedIndexIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
 
-public class LSMInvertedIndexIOOperationCallbackTest extends TestCase {
-
-    public void testNormalSequence() {
-        try {
-            ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
-            Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMInvertedIndexIOOperationCallback callback =
-                    new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
-            //request to flush first component
-            callback.updateLastLSN(1);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush second component
-            callback.updateLastLSN(2);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            Assert.assertEquals(1, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
-            Assert.assertEquals(2, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-        } catch (Exception e) {
-            Assert.fail();
-        }
-    }
-
-    public void testOverWrittenLSN() {
-        try {
-            ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
-            Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMInvertedIndexIOOperationCallback callback =
-                    new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
-            //request to flush first component
-            callback.updateLastLSN(1);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush second component
-            callback.updateLastLSN(2);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush first component again
-            //this call should fail
-            callback.updateLastLSN(3);
-            //there is no corresponding beforeOperation, since the first component is being flush
-            //the scheduleFlush request would fail this time
-
-            Assert.assertEquals(1, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
-            Assert.assertEquals(2, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-        } catch (Exception e) {
-            Assert.fail();
-        }
+    @Override
+    protected AbstractLSMIOOperationCallback getIoCallback() {
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        return new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
index 618f2a3..0131e3f 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
@@ -19,72 +19,19 @@
 
 package org.apache.asterix.test.ioopcallbacks;
 
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
-import org.junit.Assert;
 import org.mockito.Mockito;
 
-import junit.framework.TestCase;
+public class LSMRTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
 
-public class LSMRTreeIOOperationCallbackTest extends TestCase {
-
-    public void testNormalSequence() {
-        try {
-            ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
-            Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMRTreeIOOperationCallback callback =
-                    new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
-            //request to flush first component
-            callback.updateLastLSN(1);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush second component
-            callback.updateLastLSN(2);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            Assert.assertEquals(1, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
-            Assert.assertEquals(2, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-        } catch (Exception e) {
-            Assert.fail();
-        }
-    }
-
-    public void testOverWrittenLSN() {
-        try {
-            ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
-            Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMRTreeIOOperationCallback callback =
-                    new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
-            //request to flush first component
-            callback.updateLastLSN(1);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush second component
-            callback.updateLastLSN(2);
-            callback.beforeOperation(LSMIOOperationType.FLUSH);
-
-            //request to flush first component again
-            //this call should fail
-            callback.updateLastLSN(3);
-            //there is no corresponding beforeOperation, since the first component is being flush
-            //the scheduleFlush request would fail this time
-
-            Assert.assertEquals(1, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
-            Assert.assertEquals(2, callback.getComponentLSN(null));
-            callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-        } catch (Exception e) {
-            Assert.fail();
-        }
+    @Override
+    protected AbstractLSMIOOperationCallback getIoCallback() {
+        ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+        Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+        return new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
index e122fd4..7bb12c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
@@ -60,8 +60,9 @@ public interface ILSMIOOperationCallback {
      * This method is called when a memory component is recycled
      *
      * @param component
+     * @param componentSwitched whether recycling this component switches to the new memory component
      */
-    void recycled(ILSMMemoryComponent component) throws HyracksDataException;
+    void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException;
 
     /**
      * This method is called when a memory component is allocated

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 17dadcb..15cf8e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -61,7 +61,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
         if (state == ComponentState.INACTIVE && requestedToBeActive) {
             state = ComponentState.READABLE_WRITABLE;
             requestedToBeActive = false;
-            lsmIndex.getIOOperationCallback().recycled(this);
+            lsmIndex.getIOOperationCallback().recycled(this, true);
         }
         switch (opType) {
             case FORCE_MODIFICATION:
@@ -278,7 +278,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
         if (this.componentId != null && !componentId.missing() // for backward compatibility
                 && this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) {
             throw new IllegalStateException(
-                    "LSM memory component receives illegal id. Old id " + this.componentId + ", new id " + componentId);
+                    this + " receives illegal id. Old id " + this.componentId + ", new id " + componentId);
         }
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.log(Level.INFO, "Component Id was reset from " + this.componentId + " to " + componentId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
index 12dbb46..e464231 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
@@ -64,8 +64,8 @@ public class BlockingIOOperationCallbackWrapper implements ILSMIOOperationCallba
     }
 
     @Override
-    public void recycled(ILSMMemoryComponent component) throws HyracksDataException {
-        wrappedCallback.recycled(component);
+    public void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException {
+        wrappedCallback.recycled(component, componentSwitched);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index e1d5114..393aa6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -175,7 +175,7 @@ public class LSMHarness implements ILSMHarness {
             // Call recycled only when we change it's state is reset back to READABLE_WRITABLE
             // Otherwise, if the component is in other state, e.g., INACTIVE, or
             // READABLE_UNWRITABLE_FLUSHING, it's not considered as being recycled here.
-            lsmIndex.getIOOperationCallback().recycled(flushingComponent);
+            lsmIndex.getIOOperationCallback().recycled(flushingComponent, false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index 21d10d7..3a58c19 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -67,7 +67,7 @@ public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFac
         }
 
         @Override
-        public void recycled(ILSMMemoryComponent component) {
+        public void recycled(ILSMMemoryComponent component, boolean componentSwitched) {
             // Do nothing.
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98b9d603/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
index 238e915..88def5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
@@ -64,7 +64,7 @@ public class StubIOOperationCallback implements ILSMIOOperationCallback {
     }
 
     @Override
-    public void recycled(ILSMMemoryComponent component) {
+    public void recycled(ILSMMemoryComponent component, boolean componentSwitched) {
         // Not interested in this
     }
 


Mime
View raw message