asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [NO ISSUE][STO] Recover from failure in memory allocation ca...
Date Sun, 28 Jan 2018 23:27:29 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2336

Change subject: [NO ISSUE][STO] Recover from failure in memory allocation callback
......................................................................

[NO ISSUE][STO] Recover from failure in memory allocation callback

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Previously, if an exception is thrown in the
  ILSMIOOperationCallback.allocated call, then the memory component
  is allocated but the flag memoryComponentsAllocated is false.
- Any subsequent attempt to modify the index will try to allocate
  the component but since it has already been allocated, it will fail
  with the exception: File is already mapped.
- In this change, if an exception is thrown from the callback, then
  the component is de-allocated before throwing the exception.
- Test is case is added.

Change-Id: I80e605461df18c7f6d7785cd7504ca3acb4f45b1
---
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
M asterixdb/asterix-app/src/test/resources/log4j2-test.xml
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
13 files changed, 653 insertions(+), 372 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/36/2336/1

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 9828424..a46b029 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -20,64 +20,38 @@
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
 import java.util.function.Predicate;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
 import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Flusher;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Merger;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.test.CountAnswer;
-import org.apache.hyracks.api.test.FrameWriterTestUtils;
-import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -87,46 +61,16 @@
 
 public class ComponentRollbackTest {
 
-    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
-    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
-            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
-    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
-            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
-    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
-    private static final ARecordType META_TYPE = null;
-    private static final GenerationFunction[] META_GEN_FUNCTION = null;
-    private static final boolean[] UNIQUE_META_FIELDS = null;
-    private static final int[] KEY_INDEXES = { 0 };
-    private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
-    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
-    private static final int TOTAL_NUM_OF_RECORDS = 10000;
-    private static final int RECORDS_PER_COMPONENT = 1000;
-    private static final int DATASET_ID = 101;
-    private static final String DATAVERSE_NAME = "TestDV";
-    private static final String DATASET_NAME = "TestDS";
-    private static final String DATA_TYPE_NAME = "DUMMY";
-    private static final String NODE_GROUP_NAME = "DEFAULT";
     private static final Predicate<ILSMComponent> memoryComponentsPredicate = c -> c instanceof ILSMMemoryComponent;
-    private static final StorageComponentProvider storageManager = new StorageComponentProvider();
     private static TestNodeController nc;
     private static TestLsmBtree lsmBtree;
     private static NCAppRuntimeContext ncAppCtx;
     private static IDatasetLifecycleManager dsLifecycleMgr;
-    private static Dataset dataset;
     private static IHyracksTaskContext ctx;
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
     private static LSMInsertDeleteOperatorNodePushable insertOp;
-    public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() {
-        @Override
-        public void before(Semaphore smeaphore) {
-            smeaphore.release();
-        }
-
-        @Override
-        public void after() {
-        }
-    };
+    private static final int PARTITION = 0;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -149,27 +93,18 @@
 
     @Before
     public void createIndex() throws Exception {
-        List<List<String>> partitioningKeys = new ArrayList<>();
-        partitioningKeys.add(Collections.singletonList("key"));
-        int partition = 0;
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null,
-                        PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null),
-                null, DatasetType.INTERNAL, DATASET_ID, 0);
-        PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition);
+        PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
         IndexDataflowHelperFactory iHelperFactory =
                 new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
         JobId jobId = nc.newJobId();
-        ctx = nc.createTestContext(jobId, partition, false);
-        indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        ctx = nc.createTestContext(jobId, PARTITION, false);
+        indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
         indexDataflowHelper.open();
         lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
         indexDataflowHelper.close();
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
-        insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, storageManager, null).getLeft();
+        insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
     }
 
     @After
@@ -177,26 +112,18 @@
         indexDataflowHelper.destroy();
     }
 
-    static void allowAllOps(TestLsmBtree lsmBtree) {
-        lsmBtree.addModifyCallback(ALLOW_CALLBACK);
-        lsmBtree.addFlushCallback(ALLOW_CALLBACK);
-        lsmBtree.addSearchCallback(ALLOW_CALLBACK);
-        lsmBtree.addMergeCallback(ALLOW_CALLBACK);
-    }
-
     @Test
     public void testRollbackWhileNoOp() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -210,28 +137,28 @@
             }
             insertOp.close();
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
-
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -239,28 +166,21 @@
     }
 
     public void flush(boolean async) throws Exception {
-        flush(dsLifecycleMgr, lsmBtree, dataset, async);
-    }
-
-    public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
-            boolean async) throws Exception {
-        waitForOperations(lsmBtree);
-        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async);
+        StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, async);
     }
 
     @Test
     public void testRollbackThenInsert() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -279,23 +199,22 @@
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
 
             // insert again
             nc.newJobId();
             txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                     new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
-            insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                    KEY_INDICATORS_LIST, storageManager, null).getLeft();
+            insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
             insertOp.open();
-            for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+            for (int j = 0; j < StorageTestUtils.RECORDS_PER_COMPONENT; j++) {
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
             }
@@ -304,16 +223,16 @@
             }
             insertOp.close();
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -324,16 +243,15 @@
     public void testRollbackWhileSearch() {
         try {
             // allow all operations but search
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearSearchCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -353,42 +271,43 @@
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(
                     c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // search now and ensure
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
             // rollback the last disk component
             // re-block searches
             lsmBtree.clearSearchCallbacks();
-            Searcher secondSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree,
-                    TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            Searcher secondSearcher = new Searcher(nc, PARTITION, lsmBtree,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
             // wait till firstSearcher enter the components
             secondSearcher.waitUntilEntered();
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
 
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(secondSearcher.result());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -399,15 +318,14 @@
     public void testRollbackWhileFlush() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -426,7 +344,7 @@
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // disable flushes
             lsmBtree.clearFlushCallbacks();
             Flusher firstFlusher = new Flusher(lsmBtree);
@@ -435,7 +353,7 @@
             // now that we enetered, we will rollback. This will not proceed since it is waiting for the flush to complete
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             // now that the rollback has completed, we will search
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             //unblock the flush
             lsmBtree.allowFlush(1);
             // ensure rollback completed
@@ -443,7 +361,7 @@
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -454,16 +372,15 @@
     public void testRollbackWhileMerge() {
         try {
             // allow all operations but merge
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearMergeCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -482,7 +399,7 @@
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // Now, we will start a full merge
             Merger merger = new Merger(lsmBtree);
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -499,7 +416,7 @@
             Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
             // rollback is now waiting for the merge to complete
             // we will search
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             //unblock the merge
             lsmBtree.allowMerge(1);
             // ensure rollback completes
@@ -507,8 +424,8 @@
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure that we rolled back the merged component
-            searchAndAssertCount(nc, 0, dataset, storageManager,
-                    TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS
+                    - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -519,15 +436,14 @@
     public void testRollbackWhileFlushAndSearchFlushExistsFirst() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -546,7 +462,7 @@
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // disable flushes
             // disable searches
             lsmBtree.clearFlushCallbacks();
@@ -554,21 +470,21 @@
             Flusher firstFlusher = new Flusher(lsmBtree);
             flush(true);
             firstFlusher.waitUntilCount(1);
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback rollback a memory component
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             //unblock the flush
             lsmBtree.allowFlush(1);
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // ensure current mem component is not modified
             rollerback.complete();
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure the rollback was no op since it waits for ongoing flushes
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -579,15 +495,14 @@
     public void testRollbackWhileFlushAndSearchSearchExistsFirst() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -606,7 +521,7 @@
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // disable flushes
             // disable searches
             lsmBtree.clearFlushCallbacks();
@@ -614,13 +529,13 @@
             flush(true);
             firstFlusher.waitUntilCount(1);
             lsmBtree.clearSearchCallbacks();
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             // The rollback will be waiting for the flush to complete
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             //unblock the flush
@@ -629,7 +544,7 @@
             rollerback.complete();
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -640,16 +555,15 @@
     public void testRollbackWhileMergeAndSearchMergeExitsFirst() {
         try {
             // allow all operations except merge
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearMergeCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -668,7 +582,7 @@
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // Now, we will start a merge
             Merger merger = new Merger(lsmBtree);
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -683,7 +597,7 @@
             merger.waitUntilCount(1);
             // we will block search
             lsmBtree.clearSearchCallbacks();
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
@@ -692,13 +606,13 @@
             // unblock the merge
             lsmBtree.allowMerge(1);
             // unblock the search
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             rollerback.complete();
             // now that the rollback has completed, we will search
-            searchAndAssertCount(nc, 0, dataset, storageManager,
-                    TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS
+                    - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT));
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
         } catch (Throwable e) {
@@ -711,16 +625,15 @@
     public void testRollbackWhileMergeAndSearchSearchExitsFirst() {
         try {
             // allow all operations except merge
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearMergeCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -739,7 +652,7 @@
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // Now, we will start a merge
             Merger merger = new Merger(lsmBtree);
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -754,22 +667,22 @@
             merger.waitUntilCount(1);
             // we will block search
             lsmBtree.clearSearchCallbacks();
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
             Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
             // unblock the search
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // even though rollback has been called, it is still waiting for the merge to complete
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             //unblock the merge
             lsmBtree.allowMerge(1);
             rollerBack.complete();
-            searchAndAssertCount(nc, 0, dataset, storageManager,
-                    TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS
+                    - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT));
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
         } catch (Throwable e) {
@@ -789,7 +702,7 @@
                 public void run() {
                     ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
                     try {
-                        dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+                        dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
                         ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
                         lsmAccessor.deleteComponents(predicate);
                     } catch (HyracksDataException e) {
@@ -805,102 +718,6 @@
             task.join();
             if (failure != null) {
                 throw failure;
-            }
-        }
-    }
-
-    static class Searcher {
-        private final ExecutorService executor = Executors.newSingleThreadExecutor();
-        private Future<Boolean> task;
-        private volatile boolean entered = false;
-
-        public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
-                TestLsmBtree lsmBtree, int numOfRecords) {
-            lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
-
-                @Override
-                public void before(Semaphore sem) {
-                    synchronized (Searcher.this) {
-                        entered = true;
-                        Searcher.this.notifyAll();
-                    }
-                }
-
-                @Override
-                public void after() {
-                }
-            });
-            Callable<Boolean> callable = new Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    searchAndAssertCount(nc, partition, dataset, storageManager, numOfRecords);
-                    return true;
-                }
-            };
-            task = executor.submit(callable);
-        }
-
-        boolean result() throws Exception {
-            return task.get();
-        }
-
-        synchronized void waitUntilEntered() throws InterruptedException {
-            while (!entered) {
-                this.wait();
-            }
-        }
-    }
-
-    private class Merger {
-        private volatile int count = 0;
-
-        public Merger(TestLsmBtree lsmBtree) {
-            lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() {
-
-                @Override
-                public void before(Semaphore smeaphore) {
-                    synchronized (Merger.this) {
-                        count++;
-                        Merger.this.notifyAll();
-                    }
-                }
-
-                @Override
-                public void after() {
-                }
-            });
-        }
-
-        synchronized void waitUntilCount(int count) throws InterruptedException {
-            while (this.count != count) {
-                this.wait();
-            }
-        }
-    }
-
-    private class Flusher {
-        private volatile int count = 0;
-
-        public Flusher(TestLsmBtree lsmBtree) {
-            lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() {
-
-                @Override
-                public void before(Semaphore smeaphore) {
-                    synchronized (Flusher.this) {
-                        count++;
-                        Flusher.this.notifyAll();
-                    }
-                }
-
-                @Override
-                public void after() {
-                }
-            });
-        }
-
-        synchronized void waitUntilCount(int count) throws InterruptedException {
-            while (this.count != count) {
-                this.wait();
             }
         }
     }
@@ -923,50 +740,5 @@
                 return false;
             }
         }
-    }
-
-    static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
-            StorageComponentProvider storageManager, int numOfRecords)
-            throws HyracksDataException, AlgebricksException {
-        JobId jobId = nc.newJobId();
-        IHyracksTaskContext ctx = nc.createTestContext(jobId, partition, false);
-        TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
-                Collections.emptyList(), Collections.emptyList(), false);
-        IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
-                new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
-        emptyTupleOp.open();
-        emptyTupleOp.close();
-        Assert.assertEquals(numOfRecords, countOp.getCount());
-    }
-
-    public static void waitForOperations(ILSMIndex index) throws InterruptedException {
-        // wait until number of activeOperation reaches 0
-        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
-        long maxWaitTime = 60000L; // 1 minute
-        long before = System.currentTimeMillis();
-        while (opTracker.getNumActiveOperations() > 0) {
-            Thread.sleep(5); // NOSONAR: Test code with a timeout
-            if (System.currentTimeMillis() - before > maxWaitTime) {
-                throw new IllegalStateException(
-                        (System.currentTimeMillis() - before) + "ms passed without completing the frame operation");
-            }
-        }
-    }
-
-    public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
-            Collection<FrameWriterOperation> exceptionThrowingOperations,
-            Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
-        CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
-                exceptionThrowingOperations, errorThrowingOperations);
-        CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
-                exceptionThrowingOperations, errorThrowingOperations);
-        CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
-                exceptionThrowingOperations, errorThrowingOperations);
-        CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
-                exceptionThrowingOperations, errorThrowingOperations);
-        CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
-                exceptionThrowingOperations, errorThrowingOperations);
-        return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
-                closeAnswer, deepCopyInputFrames);
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
new file mode 100644
index 0000000..8bafd32
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class IoCallbackFailureTest {
+
+    private static final int PARTITION = 0;
+    private static TestNodeController nc;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "cc.conf";
+        nc = new TestNodeController(configPath, false);
+        nc.init();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        System.out.println("TearDown");
+        nc.deInit();
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Test
+    public void testTempFailureInAllocateCallback() throws Exception {
+        PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
+        IndexDataflowHelperFactory iHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+        JobId jobId = nc.newJobId();
+        IHyracksTaskContext ctx = nc.createTestContext(jobId, PARTITION, false);
+        IIndexDataflowHelper indexDataflowHelper =
+                iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
+        indexDataflowHelper.open();
+        TestLsmBtree lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+        indexDataflowHelper.close();
+        LSMInsertDeleteOperatorNodePushable insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+        StorageTestUtils.allowAllOps(lsmBtree);
+        ITestOpCallback<ILSMMemoryComponent> failCallback = new ITestOpCallback<ILSMMemoryComponent>() {
+            @SuppressWarnings("deprecation")
+            @Override
+            public void before(ILSMMemoryComponent c) throws HyracksDataException {
+                throw new HyracksDataException("Fail on allocate callback");
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+                // No Op
+            }
+        };
+        lsmBtree.addIoAllocateCallback(failCallback);
+        boolean expectedExceptionThrown = false;
+        try {
+            insert(nc, lsmBtree, ctx, insertOp, StorageTestUtils.TOTAL_NUM_OF_RECORDS,
+                    StorageTestUtils.RECORDS_PER_COMPONENT);
+        } catch (Exception e) {
+            expectedExceptionThrown = true;
+        }
+        Assert.assertTrue(expectedExceptionThrown);
+        // Clear the callback and retry
+        lsmBtree.clearIoAllocateCallback();
+        jobId = nc.newJobId();
+        ctx = nc.createTestContext(jobId, PARTITION, false);
+        insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+        insert(nc, lsmBtree, ctx, insertOp, StorageTestUtils.TOTAL_NUM_OF_RECORDS,
+                StorageTestUtils.RECORDS_PER_COMPONENT);
+    }
+
+    private static void insert(TestNodeController nc, TestLsmBtree lsmBtree, IHyracksTaskContext ctx,
+            LSMInsertDeleteOperatorNodePushable insertOp, int totalNumRecords, int recordsPerComponent)
+            throws Exception {
+        NCAppRuntimeContext ncAppCtx = nc.getAppRuntimeContext();
+        IDatasetLifecycleManager dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+        TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+        ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+        boolean failed = false;
+        try {
+            try {
+                insertOp.open();
+                VSizeFrame frame = new VSizeFrame(ctx);
+                FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+                for (int j = 0; j < totalNumRecords; j++) {
+                    // flush every recordsPerComponent records
+                    if (j % recordsPerComponent == 0 && j + 1 != totalNumRecords) {
+                        if (tupleAppender.getTupleCount() > 0) {
+                            tupleAppender.write(insertOp, true);
+                        }
+                        StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, false);
+                    }
+                    ITupleReference tuple = tupleGenerator.next();
+                    DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+                }
+                if (tupleAppender.getTupleCount() > 0) {
+                    tupleAppender.write(insertOp, true);
+                }
+            } catch (Throwable th) {
+                failed = true;
+                insertOp.fail();
+                throw th;
+            } finally {
+                insertOp.close();
+            }
+        } finally {
+            if (failed) {
+                nc.getTransactionManager().abortTransaction(txnCtx.getTxnId());
+            } else {
+                nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index b39a5c6..367d0b9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -189,8 +189,8 @@
         }
         // allow all operations
         for (int i = 0; i < NUM_PARTITIONS; i++) {
-            ComponentRollbackTest.allowAllOps(primaryLsmBtrees[i]);
-            ComponentRollbackTest.allowAllOps(secondaryLsmBtrees[i]);
+            StorageTestUtils.allowAllOps(primaryLsmBtrees[i]);
+            StorageTestUtils.allowAllOps(secondaryLsmBtrees[i]);
             actors[i].add(new Request(Request.Action.INSERT_OPEN));
         }
     }
@@ -224,9 +224,9 @@
             }
             ensureDone(actors[0]);
             // search now and ensure partition 0 has all the records
-            ComponentRollbackTest.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // and that partition 1 has no records
-            ComponentRollbackTest.searchAndAssertCount(nc, 1, dataset, storageManager, 0);
+            StorageTestUtils.searchAndAssertCount(nc, 1, dataset, storageManager, 0);
             // and that partition 0 has numFlushes disk components
             Assert.assertEquals(totalNumOfComponents, primaryLsmBtrees[0].getDiskComponents().size());
             // and that partition 1 has no disk components
@@ -655,7 +655,7 @@
                         if (tupleAppender.getTupleCount() > 0) {
                             tupleAppender.write(insertOps[partition], true);
                         }
-                        ComponentRollbackTest.waitForOperations(primaryLsmBtrees[partition]);
+                        StorageTestUtils.waitForOperations(primaryLsmBtrees[partition]);
                         break;
                     default:
                         break;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 7bc7a88..c452548 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -45,7 +45,7 @@
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
-import org.apache.asterix.test.dataflow.ComponentRollbackTest.Searcher;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -147,7 +147,7 @@
     }
 
     void unblockSearch(TestLsmBtree lsmBtree) {
-        lsmBtree.addSearchCallback(ComponentRollbackTest.ALLOW_CALLBACK);
+        lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
         lsmBtree.allowSearch(1);
     }
 
@@ -155,7 +155,7 @@
     public void testCursorSwitchSucceed() {
         try {
             // allow all operations
-            ComponentRollbackTest.allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             // except search
             lsmBtree.clearSearchCallbacks();
             insertOp.open();
@@ -170,7 +170,7 @@
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+                    StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -183,7 +183,7 @@
             firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
-            ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+            StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // unblock the search
             unblockSearch(lsmBtree);
@@ -201,7 +201,7 @@
     public void testCursorSwitchFails() {
         try {
             // allow all operations
-            ComponentRollbackTest.allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             // except search
             lsmBtree.clearSearchCallbacks();
             insertOp.open();
@@ -216,7 +216,7 @@
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+                    StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -229,7 +229,7 @@
             firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
-            ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+            StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // merge all components
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -255,7 +255,7 @@
             throws HyracksDataException, AlgebricksException {
         nc.newJobId();
         TestTupleCounterFrameWriter countOp =
-                ComponentRollbackTest.create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+                StorageTestUtils.create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
                         Collections.emptyList(), Collections.emptyList(), false);
         IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
                 new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
new file mode 100644
index 0000000..e7a455c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.Assert;
+
+public class StorageTestUtils {
+
+    public static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    public static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    public static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    public static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    public static final ARecordType META_TYPE = null;
+    public static final GenerationFunction[] META_GEN_FUNCTION = null;
+    public static final boolean[] UNIQUE_META_FIELDS = null;
+    public static final int[] KEY_INDEXES = { 0 };
+    public static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+    public static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    public static final int TOTAL_NUM_OF_RECORDS = 10000;
+    public static final int RECORDS_PER_COMPONENT = 1000;
+    public static final int DATASET_ID = 101;
+    public static final String DATAVERSE_NAME = "TestDV";
+    public static final String DATASET_NAME = "TestDS";
+    public static final String DATA_TYPE_NAME = "DUMMY";
+    public static final String NODE_GROUP_NAME = "DEFAULT";
+    public static final StorageComponentProvider STORAGE_MANAGER = new StorageComponentProvider();
+    public static final List<List<String>> PARTITIONING_KEYS =
+            new ArrayList<>(Collections.singletonList(Collections.singletonList(RECORD_TYPE.getFieldNames()[0])));
+    public static final TestDataset DATASET =
+            new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                    NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                            PARTITIONING_KEYS, null, null, null, false, null),
+                    null, DatasetType.INTERNAL, DATASET_ID, 0);
+    public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() {
+        @Override
+        public void before(Semaphore smeaphore) {
+            smeaphore.release();
+        }
+
+        @Override
+        public void after() {
+        }
+    };
+
+    private StorageTestUtils() {
+    }
+
+    static void allowAllOps(TestLsmBtree lsmBtree) {
+        lsmBtree.addModifyCallback(ALLOW_CALLBACK);
+        lsmBtree.addFlushCallback(ALLOW_CALLBACK);
+        lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+        lsmBtree.addMergeCallback(ALLOW_CALLBACK);
+    }
+
+    public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, int partition)
+            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        return nc.createPrimaryIndex(DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, STORAGE_MANAGER, KEY_INDEXES,
+                KEY_INDICATORS_LIST, partition);
+    }
+
+    public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
+            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, STORAGE_MANAGER, null).getLeft();
+    }
+
+    public static TupleGenerator getTupleGenerator() {
+        return new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION,
+                UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+    }
+
+    public static void searchAndAssertCount(TestNodeController nc, int partition, int numOfRecords)
+            throws HyracksDataException, AlgebricksException {
+        searchAndAssertCount(nc, partition, DATASET, STORAGE_MANAGER, numOfRecords);
+    }
+
+    public static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
+            StorageComponentProvider storageManager, int numOfRecords)
+            throws HyracksDataException, AlgebricksException {
+        JobId jobId = nc.newJobId();
+        IHyracksTaskContext ctx = nc.createTestContext(jobId, partition, false);
+        TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+                Collections.emptyList(), Collections.emptyList(), false);
+        IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+                new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
+        emptyTupleOp.open();
+        emptyTupleOp.close();
+        Assert.assertEquals(numOfRecords, countOp.getCount());
+    }
+
+    public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+            Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+        CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
+                exceptionThrowingOperations, errorThrowingOperations);
+        return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
+                closeAnswer, deepCopyInputFrames);
+    }
+
+    public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, boolean async)
+            throws Exception {
+        flush(dsLifecycleMgr, lsmBtree, DATASET, async);
+    }
+
+    public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
+            boolean async) throws Exception {
+        waitForOperations(lsmBtree);
+        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async);
+    }
+
+    public static void waitForOperations(ILSMIndex index) throws InterruptedException {
+        // wait until number of activeOperation reaches 0
+        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+        long maxWaitTime = 60000L; // 1 minute
+        long before = System.currentTimeMillis();
+        while (opTracker.getNumActiveOperations() > 0) {
+            Thread.sleep(5); // NOSONAR: Test code with a timeout
+            if (System.currentTimeMillis() - before > maxWaitTime) {
+                throw new IllegalStateException(
+                        (System.currentTimeMillis() - before) + "ms passed without completing the frame operation");
+            }
+        }
+    }
+
+    public static class Searcher {
+        private final ExecutorService executor = Executors.newSingleThreadExecutor();
+        private Future<Boolean> task;
+        private volatile boolean entered = false;
+
+        public Searcher(TestNodeController nc, int partition, TestLsmBtree lsmBtree, int numOfRecords) {
+            this(nc, partition, DATASET, STORAGE_MANAGER, lsmBtree, numOfRecords);
+        }
+
+        public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
+                TestLsmBtree lsmBtree, int numOfRecords) {
+            lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore sem) {
+                    synchronized (Searcher.this) {
+                        entered = true;
+                        Searcher.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            });
+            Callable<Boolean> callable = new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    searchAndAssertCount(nc, partition, dataset, storageManager, numOfRecords);
+                    return true;
+                }
+            };
+            task = executor.submit(callable);
+        }
+
+        public boolean result() throws Exception {
+            return task.get();
+        }
+
+        public synchronized void waitUntilEntered() throws InterruptedException {
+            while (!entered) {
+                this.wait();
+            }
+        }
+    }
+
+    public static class Merger {
+        private volatile int count = 0;
+
+        public Merger(TestLsmBtree lsmBtree) {
+            lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore smeaphore) {
+                    synchronized (Merger.this) {
+                        count++;
+                        Merger.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            });
+        }
+
+        public synchronized void waitUntilCount(int count) throws InterruptedException {
+            while (this.count != count) {
+                this.wait();
+            }
+        }
+    }
+
+    public static class Flusher {
+        private volatile int count = 0;
+
+        public Flusher(TestLsmBtree lsmBtree) {
+            lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore smeaphore) {
+                    synchronized (Flusher.this) {
+                        count++;
+                        Flusher.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            });
+        }
+
+        public synchronized void waitUntilCount(int count) throws InterruptedException {
+            while (this.count != count) {
+                this.wait();
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
index 3edb700..7b0ad23 100644
--- a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
+++ b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
@@ -35,7 +35,7 @@
     <Logger name="org.apache.asterix" level="INFO" additivity="false">
       <AppenderRef ref="InfoLog"/>
     </Logger>
-    <Logger name="org.apache.asterix.test" level="WARN">
+    <Logger name="org.apache.asterix.test" level="WARN" additivity="false">
       <AppenderRef ref="Console"/>
     </Logger>
   </Loggers>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index f892585..c72d402 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -76,7 +76,8 @@
     void setState(ComponentState state);
 
     /**
-     * Allocates memory to this component, create and activate it
+     * Allocates memory to this component, create and activate it.
+     * This method is atomic. If an exception is thrown, then the call had no effect.
      *
      * @throws HyracksDataException
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index e9f410d..749b3ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -438,9 +438,29 @@
         if (memoryComponentsAllocated || memoryComponents == null) {
             return;
         }
-        for (ILSMMemoryComponent c : memoryComponents) {
-            c.allocate();
-            ioOpCallback.allocated(c);
+        int i = 0;
+        boolean allocated = false;
+        try {
+            for (; i < memoryComponents.size(); i++) {
+                allocated = false;
+                ILSMMemoryComponent c = memoryComponents.get(i);
+                c.allocate();
+                allocated = true;
+                ioOpCallback.allocated(c);
+            }
+        } finally {
+            if (i < memoryComponents.size()) {
+                // something went wrong
+                if (allocated) {
+                    ILSMMemoryComponent c = memoryComponents.get(i);
+                    c.deallocate();
+                }
+                // deallocate all previous components
+                for (int j = i - 1; j >= 0; j--) {
+                    ILSMMemoryComponent c = memoryComponents.get(j);
+                    c.deallocate();
+                }
+            }
         }
         memoryComponentsAllocated = true;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index c0bef7d..afefb3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -237,25 +237,49 @@
 
     @Override
     public final void allocate() throws HyracksDataException {
+        boolean allocated = false;
         ((IVirtualBufferCache) getIndex().getBufferCache()).open();
-        doAllocate();
+        try {
+            doAllocate();
+            allocated = true;
+        } finally {
+            if (!allocated) {
+                ((IVirtualBufferCache) getIndex().getBufferCache()).close();
+            }
+        }
     }
 
     protected void doAllocate() throws HyracksDataException {
-        getIndex().create();
-        getIndex().activate();
+        boolean created = false;
+        boolean activated = false;
+        try {
+            getIndex().create();
+            created = true;
+            getIndex().activate();
+            activated = true;
+        } finally {
+            if (created && !activated) {
+                getIndex().destroy();
+            }
+        }
     }
 
     @Override
     public final void deallocate() throws HyracksDataException {
-        doDeallocate();
-        getIndex().getBufferCache().close();
+        try {
+            doDeallocate();
+        } finally {
+            getIndex().getBufferCache().close();
+        }
     }
 
     protected void doDeallocate() throws HyracksDataException {
-        getIndex().deactivate();
-        getIndex().destroy();
-        componentId = null;
+        try {
+            getIndex().deactivate();
+            getIndex().destroy();
+        } finally {
+            componentId = null;
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index d192351..38b7f38 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -107,7 +108,10 @@
     @Override
     public int createFile(FileReference fileRef) throws HyracksDataException {
         synchronized (fileMapManager) {
-            return fileMapManager.registerFile(fileRef);
+            int fileId = fileMapManager.registerFile(fileRef);
+            LOGGER.log(Level.INFO, "File " + fileRef.getRelativePath() + "(" + fileId
+                    + ") has been registered.. Coming from " + Arrays.toString(new Throwable().getStackTrace()));
+            return fileId;
         }
     }
 
@@ -118,7 +122,10 @@
                 if (fileMapManager.isMapped(fileRef)) {
                     return fileMapManager.lookupFileId(fileRef);
                 }
-                return fileMapManager.registerFile(fileRef);
+                int fileId = fileMapManager.registerFile(fileRef);
+                LOGGER.log(Level.INFO, "File " + fileRef.getRelativePath() + "(" + fileId
+                        + ") has been registered.. Coming from " + Arrays.toString(new Throwable().getStackTrace()));
+                return fileId;
             }
         } finally {
             logStats();
@@ -152,7 +159,9 @@
     @Override
     public void deleteFile(int fileId) throws HyracksDataException {
         synchronized (fileMapManager) {
-            fileMapManager.unregisterFile(fileId);
+            FileReference fileRef = fileMapManager.unregisterFile(fileId);
+            LOGGER.log(Level.INFO, "File " + fileRef.getRelativePath() + "(" + fileId
+                    + ") has been unregistered.. Coming from " + Arrays.toString(new Throwable().getStackTrace()));
         }
         int reclaimedPages = 0;
         for (int i = 0; i < buckets.length; i++) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
index 816550b..19b4856 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
@@ -34,6 +34,7 @@
      * Initializes the persistent state of an index.
      * An index cannot be created if it is in the activated state.
      * Calling create on an index that is deactivated has the effect of clearing the index.
+     * This method is atomic. If an exception is thrown, then the call had no effect.
      *
      * @throws HyracksDataException
      *             if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
index acc3347..e888238 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
@@ -18,8 +18,10 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.impl;
 
-public interface ITestOpCallback<T> {
-    void before(T t);
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-    void after();
+public interface ITestOpCallback<T> {
+    void before(T t) throws HyracksDataException;
+
+    void after() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index bf3bb31..3c781a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -109,7 +109,7 @@
         }
     }
 
-    public static <T> void callback(ITestOpCallback<T> callback, T t) {
+    public static <T> void callback(ITestOpCallback<T> callback, T t) throws HyracksDataException {
         if (callback != null) {
             callback.before(t);
         }
@@ -344,7 +344,7 @@
         }
     }
 
-    public void beforeIoOperationCalled() {
+    public void beforeIoOperationCalled() throws HyracksDataException {
         synchronized (ioBeforeCallbacks) {
             for (ITestOpCallback<Void> callback : ioBeforeCallbacks) {
                 callback.before(null);
@@ -352,7 +352,7 @@
         }
     }
 
-    public void beforeIoOperationReturned() {
+    public void beforeIoOperationReturned() throws HyracksDataException {
         synchronized (ioBeforeCallbacks) {
             for (ITestOpCallback<Void> callback : ioBeforeCallbacks) {
                 callback.after();
@@ -360,7 +360,7 @@
         }
     }
 
-    public void afterIoOperationCalled() {
+    public void afterIoOperationCalled() throws HyracksDataException {
         synchronized (ioAfterOpCallbacks) {
             for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) {
                 callback.before(null);
@@ -368,7 +368,7 @@
         }
     }
 
-    public void afterIoOperationReturned() {
+    public void afterIoOperationReturned() throws HyracksDataException {
         synchronized (ioAfterOpCallbacks) {
             for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) {
                 callback.after();
@@ -376,7 +376,7 @@
         }
     }
 
-    public void afterIoFinalizeCalled() {
+    public void afterIoFinalizeCalled() throws HyracksDataException {
         synchronized (ioAfterFinalizeCallbacks) {
             for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) {
                 callback.before(null);
@@ -384,7 +384,7 @@
         }
     }
 
-    public void afterIoFinalizeReturned() {
+    public void afterIoFinalizeReturned() throws HyracksDataException {
         synchronized (ioAfterFinalizeCallbacks) {
             for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) {
                 callback.after();
@@ -392,7 +392,7 @@
         }
     }
 
-    public void recycledCalled(ILSMMemoryComponent component) {
+    public void recycledCalled(ILSMMemoryComponent component) throws HyracksDataException {
         synchronized (ioRecycleCallbacks) {
             for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) {
                 callback.before(component);
@@ -400,7 +400,7 @@
         }
     }
 
-    public void recycledReturned(ILSMMemoryComponent component) {
+    public void recycledReturned(ILSMMemoryComponent component) throws HyracksDataException {
         synchronized (ioRecycleCallbacks) {
             for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) {
                 callback.after();
@@ -408,7 +408,7 @@
         }
     }
 
-    public void allocatedCalled(ILSMMemoryComponent component) {
+    public void allocatedCalled(ILSMMemoryComponent component) throws HyracksDataException {
         synchronized (ioAllocateCallbacks) {
             for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) {
                 callback.before(component);
@@ -416,7 +416,7 @@
         }
     }
 
-    public void allocatedReturned(ILSMMemoryComponent component) {
+    public void allocatedReturned(ILSMMemoryComponent component) throws HyracksDataException {
         synchronized (ioAllocateCallbacks) {
             for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) {
                 callback.after();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2336
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I80e605461df18c7f6d7785cd7504ca3acb4f45b1
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>


Mime
View raw message