asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [2/2] asterixdb git commit: [NO ISSUE][STO] Recover from failure in memory allocation callback
Date Wed, 31 Jan 2018 05:24:26 GMT
[NO ISSUE][STO] Recover from failure in memory allocation callback

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Previously, if an exception was thrown in the
  ILSMIOOperationCallback.allocated call, the memory component
  was allocated but the flag memoryComponentsAllocated was left false.
- Any subsequent attempt to modify the index would try to allocate
  the component again and, since it had already been allocated, would
  fail with the exception: File is already mapped.
- In this change, if an exception is thrown from the callback, then
  the component is de-allocated before throwing the exception.
- A test case is added.

Change-Id: I80e605461df18c7f6d7785cd7504ca3acb4f45b1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2336
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/caf43069
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/caf43069
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/caf43069

Branch: refs/heads/master
Commit: caf43069baf53259b10087745d931e341f109be1
Parents: 248e18a
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Tue Jan 30 16:13:01 2018 -0800
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Tue Jan 30 21:24:08 2018 -0800

----------------------------------------------------------------------
 .../test/dataflow/ComponentRollbackTest.java    | 428 +++++--------------
 .../test/dataflow/IoCallbackFailureTest.java    | 159 +++++++
 .../dataflow/MultiPartitionLSMIndexTest.java    |  10 +-
 .../SearchCursorComponentSwitchTest.java        |  18 +-
 .../asterix/test/dataflow/StorageTestUtils.java | 293 +++++++++++++
 .../am/common/impls/AbstractTreeIndex.java      |  28 +-
 .../am/lsm/common/api/ILSMMemoryComponent.java  |   3 +-
 .../am/lsm/common/impls/AbstractLSMIndex.java   |  26 +-
 .../impls/AbstractLSMMemoryComponent.java       |  31 +-
 .../impls/MultitenantVirtualBufferCache.java    |  10 +-
 .../apache/hyracks/storage/common/IIndex.java   |   1 +
 .../am/lsm/btree/impl/ITestOpCallback.java      |   6 +-
 .../storage/am/lsm/btree/impl/TestLsmBtree.java |  22 +-
 13 files changed, 664 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/caf43069/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 9828424..a46b029 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -20,64 +20,38 @@ package org.apache.asterix.test.dataflow;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
 import java.util.function.Predicate;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
 import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Flusher;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Merger;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.test.CountAnswer;
-import org.apache.hyracks.api.test.FrameWriterTestUtils;
-import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -87,46 +61,16 @@ import org.junit.Test;
 
 public class ComponentRollbackTest {
 
-    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
-    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
-            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
-    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
-            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
-    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
-    private static final ARecordType META_TYPE = null;
-    private static final GenerationFunction[] META_GEN_FUNCTION = null;
-    private static final boolean[] UNIQUE_META_FIELDS = null;
-    private static final int[] KEY_INDEXES = { 0 };
-    private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
-    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
-    private static final int TOTAL_NUM_OF_RECORDS = 10000;
-    private static final int RECORDS_PER_COMPONENT = 1000;
-    private static final int DATASET_ID = 101;
-    private static final String DATAVERSE_NAME = "TestDV";
-    private static final String DATASET_NAME = "TestDS";
-    private static final String DATA_TYPE_NAME = "DUMMY";
-    private static final String NODE_GROUP_NAME = "DEFAULT";
     private static final Predicate<ILSMComponent> memoryComponentsPredicate = c -> c instanceof ILSMMemoryComponent;
-    private static final StorageComponentProvider storageManager = new StorageComponentProvider();
     private static TestNodeController nc;
     private static TestLsmBtree lsmBtree;
     private static NCAppRuntimeContext ncAppCtx;
     private static IDatasetLifecycleManager dsLifecycleMgr;
-    private static Dataset dataset;
     private static IHyracksTaskContext ctx;
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
     private static LSMInsertDeleteOperatorNodePushable insertOp;
-    public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() {
-        @Override
-        public void before(Semaphore smeaphore) {
-            smeaphore.release();
-        }
-
-        @Override
-        public void after() {
-        }
-    };
+    private static final int PARTITION = 0;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -149,27 +93,18 @@ public class ComponentRollbackTest {
 
     @Before
     public void createIndex() throws Exception {
-        List<List<String>> partitioningKeys = new ArrayList<>();
-        partitioningKeys.add(Collections.singletonList("key"));
-        int partition = 0;
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null,
-                        PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null),
-                null, DatasetType.INTERNAL, DATASET_ID, 0);
-        PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition);
+        PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
         IndexDataflowHelperFactory iHelperFactory =
                 new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
         JobId jobId = nc.newJobId();
-        ctx = nc.createTestContext(jobId, partition, false);
-        indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        ctx = nc.createTestContext(jobId, PARTITION, false);
+        indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
         indexDataflowHelper.open();
         lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
         indexDataflowHelper.close();
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
-        insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, storageManager, null).getLeft();
+        insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
     }
 
     @After
@@ -177,26 +112,18 @@ public class ComponentRollbackTest {
         indexDataflowHelper.destroy();
     }
 
-    static void allowAllOps(TestLsmBtree lsmBtree) {
-        lsmBtree.addModifyCallback(ALLOW_CALLBACK);
-        lsmBtree.addFlushCallback(ALLOW_CALLBACK);
-        lsmBtree.addSearchCallback(ALLOW_CALLBACK);
-        lsmBtree.addMergeCallback(ALLOW_CALLBACK);
-    }
-
     @Test
     public void testRollbackWhileNoOp() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -210,28 +137,28 @@ public class ComponentRollbackTest {
             }
             insertOp.close();
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
-
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -239,28 +166,21 @@ public class ComponentRollbackTest {
     }
 
     public void flush(boolean async) throws Exception {
-        flush(dsLifecycleMgr, lsmBtree, dataset, async);
-    }
-
-    public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
-            boolean async) throws Exception {
-        waitForOperations(lsmBtree);
-        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async);
+        StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, async);
     }
 
     @Test
     public void testRollbackThenInsert() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -279,23 +199,22 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
 
             // insert again
             nc.newJobId();
             txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                     new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
-            insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                    KEY_INDICATORS_LIST, storageManager, null).getLeft();
+            insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
             insertOp.open();
-            for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+            for (int j = 0; j < StorageTestUtils.RECORDS_PER_COMPONENT; j++) {
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
             }
@@ -304,16 +223,16 @@ public class ComponentRollbackTest {
             }
             insertOp.close();
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -324,16 +243,15 @@ public class ComponentRollbackTest {
     public void testRollbackWhileSearch() {
         try {
             // allow all operations but search
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearSearchCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -353,42 +271,43 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(
                     c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // search now and ensure
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
             // rollback the last disk component
             // re-block searches
             lsmBtree.clearSearchCallbacks();
-            Searcher secondSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree,
-                    TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            Searcher secondSearcher = new Searcher(nc, PARTITION, lsmBtree,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
             // wait till firstSearcher enter the components
             secondSearcher.waitUntilEntered();
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
 
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(secondSearcher.result());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+                    StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -399,15 +318,14 @@ public class ComponentRollbackTest {
     public void testRollbackWhileFlush() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -426,7 +344,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // disable flushes
             lsmBtree.clearFlushCallbacks();
             Flusher firstFlusher = new Flusher(lsmBtree);
@@ -435,7 +353,7 @@ public class ComponentRollbackTest {
             // now that we enetered, we will rollback. This will not proceed since it is waiting for the flush to complete
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             // now that the rollback has completed, we will search
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             //unblock the flush
             lsmBtree.allowFlush(1);
             // ensure rollback completed
@@ -443,7 +361,7 @@ public class ComponentRollbackTest {
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -454,16 +372,15 @@ public class ComponentRollbackTest {
     public void testRollbackWhileMerge() {
         try {
             // allow all operations but merge
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearMergeCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -482,7 +399,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // Now, we will start a full merge
             Merger merger = new Merger(lsmBtree);
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -499,7 +416,7 @@ public class ComponentRollbackTest {
             Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
             // rollback is now waiting for the merge to complete
             // we will search
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             //unblock the merge
             lsmBtree.allowMerge(1);
             // ensure rollback completes
@@ -507,8 +424,8 @@ public class ComponentRollbackTest {
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure that we rolled back the merged component
-            searchAndAssertCount(nc, 0, dataset, storageManager,
-                    TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS
+                    - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -519,15 +436,14 @@ public class ComponentRollbackTest {
     public void testRollbackWhileFlushAndSearchFlushExistsFirst() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -546,7 +462,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // disable flushes
             // disable searches
             lsmBtree.clearFlushCallbacks();
@@ -554,21 +470,21 @@ public class ComponentRollbackTest {
             Flusher firstFlusher = new Flusher(lsmBtree);
             flush(true);
             firstFlusher.waitUntilCount(1);
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we entered, we will rollback a memory component
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             //unblock the flush
             lsmBtree.allowFlush(1);
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // ensure current mem component is not modified
             rollerback.complete();
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure the rollback was no op since it waits for ongoing flushes
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -579,15 +495,14 @@ public class ComponentRollbackTest {
     public void testRollbackWhileFlushAndSearchSearchExistsFirst() {
         try {
             // allow all operations
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -606,7 +521,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // disable flushes
             // disable searches
             lsmBtree.clearFlushCallbacks();
@@ -614,13 +529,13 @@ public class ComponentRollbackTest {
             flush(true);
             firstFlusher.waitUntilCount(1);
             lsmBtree.clearSearchCallbacks();
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we entered, we will rollback
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             // The rollback will be waiting for the flush to complete
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             //unblock the flush
@@ -629,7 +544,7 @@ public class ComponentRollbackTest {
             rollerback.complete();
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -640,16 +555,15 @@ public class ComponentRollbackTest {
     public void testRollbackWhileMergeAndSearchMergeExitsFirst() {
         try {
             // allow all operations except merge
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearMergeCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -668,7 +582,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // Now, we will start a merge
             Merger merger = new Merger(lsmBtree);
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -683,7 +597,7 @@ public class ComponentRollbackTest {
             merger.waitUntilCount(1);
             // we will block search
             lsmBtree.clearSearchCallbacks();
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we entered, we will rollback
@@ -692,13 +606,13 @@ public class ComponentRollbackTest {
             // unblock the merge
             lsmBtree.allowMerge(1);
             // unblock the search
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             rollerback.complete();
             // now that the rollback has completed, we will search
-            searchAndAssertCount(nc, 0, dataset, storageManager,
-                    TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS
+                    - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT));
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
         } catch (Throwable e) {
@@ -711,16 +625,15 @@ public class ComponentRollbackTest {
     public void testRollbackWhileMergeAndSearchSearchExitsFirst() {
         try {
             // allow all operations except merge
-            allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearMergeCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+            for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
                 // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
@@ -739,7 +652,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // Now, we will start a merge
             Merger merger = new Merger(lsmBtree);
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -754,22 +667,22 @@ public class ComponentRollbackTest {
             merger.waitUntilCount(1);
             // we will block search
             lsmBtree.clearSearchCallbacks();
-            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
             // now that we entered, we will rollback
             Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
             // unblock the search
-            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // even though rollback has been called, it is still waiting for the merge to complete
-            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             //unblock the merge
             lsmBtree.allowMerge(1);
             rollerBack.complete();
-            searchAndAssertCount(nc, 0, dataset, storageManager,
-                    TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+            StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS
+                    - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT));
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
         } catch (Throwable e) {
@@ -789,7 +702,7 @@ public class ComponentRollbackTest {
                 public void run() {
                     ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
                     try {
-                        dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+                        dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
                         ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
                         lsmAccessor.deleteComponents(predicate);
                     } catch (HyracksDataException e) {
@@ -809,102 +722,6 @@ public class ComponentRollbackTest {
         }
     }
 
-    static class Searcher {
-        private final ExecutorService executor = Executors.newSingleThreadExecutor();
-        private Future<Boolean> task;
-        private volatile boolean entered = false;
-
-        public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
-                TestLsmBtree lsmBtree, int numOfRecords) {
-            lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
-
-                @Override
-                public void before(Semaphore sem) {
-                    synchronized (Searcher.this) {
-                        entered = true;
-                        Searcher.this.notifyAll();
-                    }
-                }
-
-                @Override
-                public void after() {
-                }
-            });
-            Callable<Boolean> callable = new Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    searchAndAssertCount(nc, partition, dataset, storageManager, numOfRecords);
-                    return true;
-                }
-            };
-            task = executor.submit(callable);
-        }
-
-        boolean result() throws Exception {
-            return task.get();
-        }
-
-        synchronized void waitUntilEntered() throws InterruptedException {
-            while (!entered) {
-                this.wait();
-            }
-        }
-    }
-
-    private class Merger {
-        private volatile int count = 0;
-
-        public Merger(TestLsmBtree lsmBtree) {
-            lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() {
-
-                @Override
-                public void before(Semaphore smeaphore) {
-                    synchronized (Merger.this) {
-                        count++;
-                        Merger.this.notifyAll();
-                    }
-                }
-
-                @Override
-                public void after() {
-                }
-            });
-        }
-
-        synchronized void waitUntilCount(int count) throws InterruptedException {
-            while (this.count != count) {
-                this.wait();
-            }
-        }
-    }
-
-    private class Flusher {
-        private volatile int count = 0;
-
-        public Flusher(TestLsmBtree lsmBtree) {
-            lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() {
-
-                @Override
-                public void before(Semaphore smeaphore) {
-                    synchronized (Flusher.this) {
-                        count++;
-                        Flusher.this.notifyAll();
-                    }
-                }
-
-                @Override
-                public void after() {
-                }
-            });
-        }
-
-        synchronized void waitUntilCount(int count) throws InterruptedException {
-            while (this.count != count) {
-                this.wait();
-            }
-        }
-    }
-
     private static class DiskComponentLsnPredicate implements Predicate<ILSMComponent> {
         private final long lsn;
 
@@ -924,49 +741,4 @@ public class ComponentRollbackTest {
             }
         }
     }
-
-    static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
-            StorageComponentProvider storageManager, int numOfRecords)
-            throws HyracksDataException, AlgebricksException {
-        JobId jobId = nc.newJobId();
-        IHyracksTaskContext ctx = nc.createTestContext(jobId, partition, false);
-        TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
-                Collections.emptyList(), Collections.emptyList(), false);
-        IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
-                new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
-        emptyTupleOp.open();
-        emptyTupleOp.close();
-        Assert.assertEquals(numOfRecords, countOp.getCount());
-    }
-
-    public static void waitForOperations(ILSMIndex index) throws InterruptedException {
-        // wait until number of activeOperation reaches 0
-        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
-        long maxWaitTime = 60000L; // 1 minute
-        long before = System.currentTimeMillis();
-        while (opTracker.getNumActiveOperations() > 0) {
-            Thread.sleep(5); // NOSONAR: Test code with a timeout
-            if (System.currentTimeMillis() - before > maxWaitTime) {
-                throw new IllegalStateException(
-                        (System.currentTimeMillis() - before) + "ms passed without completing the frame operation");
-            }
-        }
-    }
-
-    public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
-            Collection<FrameWriterOperation> exceptionThrowingOperations,
-            Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
-        CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
-                exceptionThrowingOperations, errorThrowingOperations);
-        CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
-                exceptionThrowingOperations, errorThrowingOperations);
-        CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
-                exceptionThrowingOperations, errorThrowingOperations);
-        CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
-                exceptionThrowingOperations, errorThrowingOperations);
-        CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
-                exceptionThrowingOperations, errorThrowingOperations);
-        return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
-                closeAnswer, deepCopyInputFrames);
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/caf43069/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
new file mode 100644
index 0000000..8bafd32
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class IoCallbackFailureTest {
+
+    private static final int PARTITION = 0;
+    private static TestNodeController nc;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "cc.conf";
+        nc = new TestNodeController(configPath, false);
+        nc.init();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        System.out.println("TearDown");
+        nc.deInit();
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Test
+    public void testTempFailureInAllocateCallback() throws Exception {
+        PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
+        IndexDataflowHelperFactory iHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+        JobId jobId = nc.newJobId();
+        IHyracksTaskContext ctx = nc.createTestContext(jobId, PARTITION, false);
+        IIndexDataflowHelper indexDataflowHelper =
+                iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
+        indexDataflowHelper.open();
+        TestLsmBtree lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+        indexDataflowHelper.close();
+        LSMInsertDeleteOperatorNodePushable insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+        StorageTestUtils.allowAllOps(lsmBtree);
+        ITestOpCallback<ILSMMemoryComponent> failCallback = new ITestOpCallback<ILSMMemoryComponent>() {
+            @SuppressWarnings("deprecation")
+            @Override
+            public void before(ILSMMemoryComponent c) throws HyracksDataException {
+                throw new HyracksDataException("Fail on allocate callback");
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+                // No Op
+            }
+        };
+        lsmBtree.addIoAllocateCallback(failCallback);
+        boolean expectedExceptionThrown = false;
+        try {
+            insert(nc, lsmBtree, ctx, insertOp, StorageTestUtils.TOTAL_NUM_OF_RECORDS,
+                    StorageTestUtils.RECORDS_PER_COMPONENT);
+        } catch (Exception e) {
+            expectedExceptionThrown = true;
+        }
+        Assert.assertTrue(expectedExceptionThrown);
+        // Clear the callback and retry
+        lsmBtree.clearIoAllocateCallback();
+        jobId = nc.newJobId();
+        ctx = nc.createTestContext(jobId, PARTITION, false);
+        insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+        insert(nc, lsmBtree, ctx, insertOp, StorageTestUtils.TOTAL_NUM_OF_RECORDS,
+                StorageTestUtils.RECORDS_PER_COMPONENT);
+    }
+
+    private static void insert(TestNodeController nc, TestLsmBtree lsmBtree, IHyracksTaskContext ctx,
+            LSMInsertDeleteOperatorNodePushable insertOp, int totalNumRecords, int recordsPerComponent)
+            throws Exception {
+        NCAppRuntimeContext ncAppCtx = nc.getAppRuntimeContext();
+        IDatasetLifecycleManager dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+        TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+        ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+        boolean failed = false;
+        try {
+            try {
+                insertOp.open();
+                VSizeFrame frame = new VSizeFrame(ctx);
+                FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+                for (int j = 0; j < totalNumRecords; j++) {
+                    // flush every recordsPerComponent records
+                    if (j % recordsPerComponent == 0 && j + 1 != totalNumRecords) {
+                        if (tupleAppender.getTupleCount() > 0) {
+                            tupleAppender.write(insertOp, true);
+                        }
+                        StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, false);
+                    }
+                    ITupleReference tuple = tupleGenerator.next();
+                    DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+                }
+                if (tupleAppender.getTupleCount() > 0) {
+                    tupleAppender.write(insertOp, true);
+                }
+            } catch (Throwable th) {
+                failed = true;
+                insertOp.fail();
+                throw th;
+            } finally {
+                insertOp.close();
+            }
+        } finally {
+            if (failed) {
+                nc.getTransactionManager().abortTransaction(txnCtx.getTxnId());
+            } else {
+                nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/caf43069/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index b39a5c6..367d0b9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -189,8 +189,8 @@ public class MultiPartitionLSMIndexTest {
         }
         // allow all operations
         for (int i = 0; i < NUM_PARTITIONS; i++) {
-            ComponentRollbackTest.allowAllOps(primaryLsmBtrees[i]);
-            ComponentRollbackTest.allowAllOps(secondaryLsmBtrees[i]);
+            StorageTestUtils.allowAllOps(primaryLsmBtrees[i]);
+            StorageTestUtils.allowAllOps(secondaryLsmBtrees[i]);
             actors[i].add(new Request(Request.Action.INSERT_OPEN));
         }
     }
@@ -224,9 +224,9 @@ public class MultiPartitionLSMIndexTest {
             }
             ensureDone(actors[0]);
             // search now and ensure partition 0 has all the records
-            ComponentRollbackTest.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // and that partition 1 has no records
-            ComponentRollbackTest.searchAndAssertCount(nc, 1, dataset, storageManager, 0);
+            StorageTestUtils.searchAndAssertCount(nc, 1, dataset, storageManager, 0);
             // and that partition 0 has numFlushes disk components
             Assert.assertEquals(totalNumOfComponents, primaryLsmBtrees[0].getDiskComponents().size());
             // and that partition 1 has no disk components
@@ -655,7 +655,7 @@ public class MultiPartitionLSMIndexTest {
                         if (tupleAppender.getTupleCount() > 0) {
                             tupleAppender.write(insertOps[partition], true);
                         }
-                        ComponentRollbackTest.waitForOperations(primaryLsmBtrees[partition]);
+                        StorageTestUtils.waitForOperations(primaryLsmBtrees[partition]);
                         break;
                     default:
                         break;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/caf43069/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 7bc7a88..c452548 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -45,7 +45,7 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
-import org.apache.asterix.test.dataflow.ComponentRollbackTest.Searcher;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -147,7 +147,7 @@ public class SearchCursorComponentSwitchTest {
     }
 
     void unblockSearch(TestLsmBtree lsmBtree) {
-        lsmBtree.addSearchCallback(ComponentRollbackTest.ALLOW_CALLBACK);
+        lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
         lsmBtree.allowSearch(1);
     }
 
@@ -155,7 +155,7 @@ public class SearchCursorComponentSwitchTest {
     public void testCursorSwitchSucceed() {
         try {
             // allow all operations
-            ComponentRollbackTest.allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             // except search
             lsmBtree.clearSearchCallbacks();
             insertOp.open();
@@ -170,7 +170,7 @@ public class SearchCursorComponentSwitchTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+                    StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -183,7 +183,7 @@ public class SearchCursorComponentSwitchTest {
             firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
-            ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+            StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // unblock the search
             unblockSearch(lsmBtree);
@@ -201,7 +201,7 @@ public class SearchCursorComponentSwitchTest {
     public void testCursorSwitchFails() {
         try {
             // allow all operations
-            ComponentRollbackTest.allowAllOps(lsmBtree);
+            StorageTestUtils.allowAllOps(lsmBtree);
             // except search
             lsmBtree.clearSearchCallbacks();
             insertOp.open();
@@ -216,7 +216,7 @@ public class SearchCursorComponentSwitchTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+                    StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -229,7 +229,7 @@ public class SearchCursorComponentSwitchTest {
             firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
-            ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+            StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // merge all components
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -255,7 +255,7 @@ public class SearchCursorComponentSwitchTest {
             throws HyracksDataException, AlgebricksException {
         nc.newJobId();
         TestTupleCounterFrameWriter countOp =
-                ComponentRollbackTest.create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+                StorageTestUtils.create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
                         Collections.emptyList(), Collections.emptyList(), false);
         IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
                 new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/caf43069/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
new file mode 100644
index 0000000..e7a455c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.Assert;
+
+public class StorageTestUtils {
+
+    public static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    public static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    public static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    public static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    public static final ARecordType META_TYPE = null;
+    public static final GenerationFunction[] META_GEN_FUNCTION = null;
+    public static final boolean[] UNIQUE_META_FIELDS = null;
+    public static final int[] KEY_INDEXES = { 0 };
+    public static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+    public static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    public static final int TOTAL_NUM_OF_RECORDS = 10000;
+    public static final int RECORDS_PER_COMPONENT = 1000;
+    public static final int DATASET_ID = 101;
+    public static final String DATAVERSE_NAME = "TestDV";
+    public static final String DATASET_NAME = "TestDS";
+    public static final String DATA_TYPE_NAME = "DUMMY";
+    public static final String NODE_GROUP_NAME = "DEFAULT";
+    public static final StorageComponentProvider STORAGE_MANAGER = new StorageComponentProvider();
+    public static final List<List<String>> PARTITIONING_KEYS =
+            new ArrayList<>(Collections.singletonList(Collections.singletonList(RECORD_TYPE.getFieldNames()[0])));
+    public static final TestDataset DATASET =
+            new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                    NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                            PARTITIONING_KEYS, null, null, null, false, null),
+                    null, DatasetType.INTERNAL, DATASET_ID, 0);
+    public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() {
+        @Override
+        public void before(Semaphore smeaphore) {
+            smeaphore.release();
+        }
+
+        @Override
+        public void after() {
+        }
+    };
+
+    private StorageTestUtils() {
+    }
+
+    static void allowAllOps(TestLsmBtree lsmBtree) {
+        lsmBtree.addModifyCallback(ALLOW_CALLBACK);
+        lsmBtree.addFlushCallback(ALLOW_CALLBACK);
+        lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+        lsmBtree.addMergeCallback(ALLOW_CALLBACK);
+    }
+
+    public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, int partition)
+            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        return nc.createPrimaryIndex(DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, STORAGE_MANAGER, KEY_INDEXES,
+                KEY_INDICATORS_LIST, partition);
+    }
+
+    public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
+            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, STORAGE_MANAGER, null).getLeft();
+    }
+
+    public static TupleGenerator getTupleGenerator() {
+        return new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION,
+                UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+    }
+
+    public static void searchAndAssertCount(TestNodeController nc, int partition, int numOfRecords)
+            throws HyracksDataException, AlgebricksException {
+        searchAndAssertCount(nc, partition, DATASET, STORAGE_MANAGER, numOfRecords);
+    }
+
+    public static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
+            StorageComponentProvider storageManager, int numOfRecords)
+            throws HyracksDataException, AlgebricksException {
+        JobId jobId = nc.newJobId();
+        IHyracksTaskContext ctx = nc.createTestContext(jobId, partition, false);
+        TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+                Collections.emptyList(), Collections.emptyList(), false);
+        IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+                new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
+        emptyTupleOp.open();
+        emptyTupleOp.close();
+        Assert.assertEquals(numOfRecords, countOp.getCount());
+    }
+
+    public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+            Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+        CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
+                exceptionThrowingOperations, errorThrowingOperations);
+        return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
+                closeAnswer, deepCopyInputFrames);
+    }
+
+    public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, boolean async)
+            throws Exception {
+        flush(dsLifecycleMgr, lsmBtree, DATASET, async);
+    }
+
+    public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
+            boolean async) throws Exception {
+        waitForOperations(lsmBtree);
+        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async);
+    }
+
+    public static void waitForOperations(ILSMIndex index) throws InterruptedException {
+        // wait until the number of active operations reaches 0
+        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+        long maxWaitTime = 60000L; // 1 minute
+        long before = System.currentTimeMillis();
+        while (opTracker.getNumActiveOperations() > 0) {
+            Thread.sleep(5); // NOSONAR: Test code with a timeout
+            if (System.currentTimeMillis() - before > maxWaitTime) {
+                throw new IllegalStateException(
+                        (System.currentTimeMillis() - before) + "ms passed without completing the frame operation");
+            }
+        }
+    }
+
+    public static class Searcher {
+        private final ExecutorService executor = Executors.newSingleThreadExecutor();
+        private Future<Boolean> task;
+        private volatile boolean entered = false;
+
+        public Searcher(TestNodeController nc, int partition, TestLsmBtree lsmBtree, int numOfRecords) {
+            this(nc, partition, DATASET, STORAGE_MANAGER, lsmBtree, numOfRecords);
+        }
+
+        public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
+                TestLsmBtree lsmBtree, int numOfRecords) {
+            lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore sem) {
+                    synchronized (Searcher.this) {
+                        entered = true;
+                        Searcher.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            });
+            Callable<Boolean> callable = new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    searchAndAssertCount(nc, partition, dataset, storageManager, numOfRecords);
+                    return true;
+                }
+            };
+            task = executor.submit(callable);
+        }
+
+        public boolean result() throws Exception {
+            return task.get();
+        }
+
+        public synchronized void waitUntilEntered() throws InterruptedException {
+            while (!entered) {
+                this.wait();
+            }
+        }
+    }
+
+    public static class Merger {
+        private volatile int count = 0;
+
+        public Merger(TestLsmBtree lsmBtree) {
+            lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore smeaphore) {
+                    synchronized (Merger.this) {
+                        count++;
+                        Merger.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            });
+        }
+
+        public synchronized void waitUntilCount(int count) throws InterruptedException {
+            while (this.count != count) {
+                this.wait();
+            }
+        }
+    }
+
+    public static class Flusher {
+        private volatile int count = 0;
+
+        public Flusher(TestLsmBtree lsmBtree) {
+            lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore smeaphore) {
+                    synchronized (Flusher.this) {
+                        count++;
+                        Flusher.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            });
+        }
+
+        public synchronized void waitUntilCount(int count) throws InterruptedException {
+            while (this.count != count) {
+                this.wait();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/caf43069/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 453431d..905c99d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -81,12 +81,28 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
             throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_ACTIVE_INDEX);
         }
         fileId = bufferCache.createFile(file);
-        bufferCache.openFile(fileId);
-        freePageManager.open(fileId);
-        freePageManager.init(interiorFrameFactory, leafFrameFactory);
-        setRootPage();
-        freePageManager.close();
-        bufferCache.closeFile(fileId);
+        boolean failed = true;
+        try {
+            bufferCache.openFile(fileId);
+            failed = false;
+        } finally {
+            if (failed) {
+                bufferCache.deleteFile(fileId);
+            }
+        }
+        failed = true;
+        try {
+            freePageManager.open(fileId);
+            freePageManager.init(interiorFrameFactory, leafFrameFactory);
+            setRootPage();
+            freePageManager.close();
+            failed = false;
+        } finally {
+            bufferCache.closeFile(fileId);
+            if (failed) {
+                bufferCache.deleteFile(fileId);
+            }
+        }
     }
 
     private void setRootPage() throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/caf43069/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index f892585..c72d402 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -76,7 +76,8 @@ public interface ILSMMemoryComponent extends ILSMComponent {
     void setState(ComponentState state);
 
     /**
-     * Allocates memory to this component, create and activate it
+     * Allocates memory to this component, then creates and activates it.
+     * This method is atomic: if an exception is thrown, the call has no effect.
      *
      * @throws HyracksDataException
      */


Mime
View raw message